From a14c2926eb3d552a6ab6845bba8b30d851d8a5fe Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 10 Feb 2022 09:10:00 -0800 Subject: [PATCH 1/7] plugin: add queues members to bank_info struct Add two new members to the bank_info struct: - queues, a vector of strings which will hold all available queues a particular user/bank row can run jobs in. The queues are passed in as a comma-delimited string, and then parsed and pushed one-by-one into the "queues" member in the bank_info struct. - queue_factor: an integer to hold the associated priority of a queue passed in from a user/bank job. --- src/plugins/mf_priority.cpp | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 8f97d912..98a99d87 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -27,6 +27,7 @@ extern "C" { #include #include #include +#include #define BANK_INFO_MISSING -9 @@ -40,6 +41,8 @@ struct bank_info { int max_active_jobs; int cur_active_jobs; std::vector held_jobs; + std::vector queues; + int queue_factor; }; /****************************************************************************** @@ -93,6 +96,19 @@ int64_t priority_calculation (flux_plugin_t *p, } +static void split_string (char *queues, struct bank_info *b) +{ + std::stringstream s_stream; + + s_stream << queues; // create string stream from string + while (s_stream.good ()) { + std::string substr; + getline (s_stream, substr, ','); // get string delimited by comma + b->queues.push_back (substr); + } +} + + /****************************************************************************** * * * Callbacks * @@ -108,12 +124,13 @@ static void rec_update_cb (flux_t *h, const flux_msg_t *msg, void *arg) { - char *bank, *def_bank = NULL; + char *bank, *def_bank, *queues = NULL; int uid, max_running_jobs, max_active_jobs = 0; double fshare = 0.0; json_t *data, *jtemp = NULL; json_error_t error; int num_data = 
0; + std::stringstream s_stream; if (flux_request_unpack (msg, NULL, "{s:o}", "data", &data) < 0) { flux_log_error (h, "failed to unpack custom_priority.trigger msg"); @@ -132,13 +149,15 @@ static void rec_update_cb (flux_t *h, for (int i = 0; i < num_data; i++) { json_t *el = json_array_get(data, i); - if (json_unpack_ex (el, &error, 0, "{s:i, s:s, s:s, s:F, s:i, s:i}", + if (json_unpack_ex (el, &error, 0, + "{s:i, s:s, s:s, s:F, s:i, s:i, s:s}", "userid", &uid, "bank", &bank, "def_bank", &def_bank, "fairshare", &fshare, "max_running_jobs", &max_running_jobs, - "max_active_jobs", &max_active_jobs) < 0) + "max_active_jobs", &max_active_jobs, + "queues", &queues) < 0) flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text); struct bank_info *b; @@ -148,6 +167,9 @@ static void rec_update_cb (flux_t *h, b->max_run_jobs = max_running_jobs; b->max_active_jobs = max_active_jobs; + // split queues comma-delimited string and add it to b->queues vector + split_string (queues, b); + users_def_bank[uid] = def_bank; } From 1320fbefdcd63423e1657c6d33bfc030b029c5bb Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 10 Feb 2022 09:11:45 -0800 Subject: [PATCH 2/7] bulk_update: add queues key-value pair to RPC Add the values from the queues column in the association_table to the RPC that is sent from the database to the priority plugin. This column represents the available queues to each user/bank row in the association_table. Add "queues" key-value pairs to the sample payloads in existing sharness tests. Add a "default" queue that won't affect the results of calculating priorities in the existing sharness tests. 
--- src/cmd/flux-account-priority-update.py | 4 +- t/t1001-mf-priority-basic.t | 52 ++++++++++++++-- t/t1002-mf-priority-small-no-tie.t | 35 ++++++++--- t/t1003-mf-priority-small-tie.t | 37 ++++++++--- t/t1004-mf-priority-small-tie-all.t | 39 +++++++++--- t/t1005-max-jobs-limits.t | 81 +++++++++++++++++++++++-- t/t1008-mf-priority-update.t | 4 ++ t/t1012-mf-priority-load.t | 12 ++++ 8 files changed, 229 insertions(+), 35 deletions(-) diff --git a/src/cmd/flux-account-priority-update.py b/src/cmd/flux-account-priority-update.py index 7c5dfff4..41e7ffad 100755 --- a/src/cmd/flux-account-priority-update.py +++ b/src/cmd/flux-account-priority-update.py @@ -54,7 +54,8 @@ def bulk_update(path): # fetch all rows from association_table (will print out tuples) for row in cur.execute( """SELECT userid, bank, default_bank, - fairshare, max_running_jobs, max_active_jobs FROM association_table""" + fairshare, max_running_jobs, max_active_jobs, + queues FROM association_table""" ): # create a JSON payload with the results of the query single_user_data = { @@ -64,6 +65,7 @@ def bulk_update(path): "fairshare": float(row[3]), "max_running_jobs": int(row[4]), "max_active_jobs": int(row[5]), + "queues": str(row[6]), } bulk_user_data.append(single_user_data) diff --git a/t/t1001-mf-priority-basic.t b/t/t1001-mf-priority-basic.t index 3bd158dc..e827790d 100755 --- a/t/t1001-mf-priority-basic.t +++ b/t/t1001-mf-priority-basic.t @@ -44,11 +44,39 @@ test_expect_success 'create fake_payload.py' ' # create an array of JSON payloads bulk_update_data = { "data" : [ - {"userid": userid, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 10, "max_active_jobs": 12}, - {"userid": userid, "bank": "account2", "def_bank": "account3", "fairshare": 0.11345, "max_running_jobs": 10, "max_active_jobs": 12} + { + "userid": userid, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 10, + "max_active_jobs": 12, + "queues": 
"standby,special" + }, + { + "userid": userid, + "bank": "account2", + "def_bank": "account3", + "fairshare": 0.11345, + "max_running_jobs": 10, + "max_active_jobs": 12, + "queues": "standby" + } ] } flux.Flux().rpc("job-manager.mf_priority.rec_update", json.dumps(bulk_update_data)).get() + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() EOF ' @@ -156,7 +184,15 @@ test_expect_success 'pass special key to user/bank struct to nullify information cat <<-EOF >null_struct.json { "data" : [ - {"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": -1, "max_active_jobs": 12} + { + "userid": 5011, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": -1, + "max_active_jobs": 12, + "queues": "standby,special" + } ] } EOF @@ -176,7 +212,15 @@ test_expect_success 'resend user/bank information with valid data and successful cat <<-EOF >valid_info.json { "data" : [ - {"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 2, "max_active_jobs": 4} + { + "userid": 5011, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 2, + "max_active_jobs": 4, + "queues": "standby,special" + } ] } EOF diff --git a/t/t1002-mf-priority-small-no-tie.t b/t/t1002-mf-priority-small-no-tie.t index 8ebd8950..a5b9d346 100755 --- a/t/t1002-mf-priority-small-no-tie.t +++ b/t/t1002-mf-priority-small-no-tie.t @@ -25,13 +25,13 @@ test_expect_success 'create a group of users with unique fairshare values' ' cat <<-EOF >fake_small_no_tie.json { "data" : [ - {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.285714, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5012, "bank": "account1", 
"def_bank": "account1", "fairshare": 0.142857, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.428571, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.714286, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.571429, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.857143, "max_running_jobs": 5, "max_active_jobs": 7} + {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.285714, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.142857, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.428571, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.714286, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.571429, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.857143, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""} ] } EOF @@ -41,6 +41,27 @@ test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} fake_small_no_tie.json ' +test_expect_success 'add a default queue and send it to the plugin' ' + cat <<-EOF 
>fake_payload.py + import flux + import json + + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() + EOF + flux python fake_payload.py +' + test_expect_success 'stop the queue' ' flux queue stop ' diff --git a/t/t1003-mf-priority-small-tie.t b/t/t1003-mf-priority-small-tie.t index e5c7b2a5..7848144c 100755 --- a/t/t1003-mf-priority-small-tie.t +++ b/t/t1003-mf-priority-small-tie.t @@ -25,14 +25,14 @@ test_expect_success 'create a group of users with some ties in fairshare values' cat <<-EOF >fake_small_tie.json { "data" : [ - {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.875, "max_running_jobs": 5, "max_active_jobs": 7} + {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, 
"max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.875, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""} ] } EOF @@ -42,6 +42,27 @@ test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} fake_small_tie.json ' +test_expect_success 'add a default queue and send it to the plugin' ' + cat <<-EOF >fake_payload.py + import flux + import json + + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() + EOF + flux python fake_payload.py +' + test_expect_success 'stop the queue' ' flux queue stop ' diff --git a/t/t1004-mf-priority-small-tie-all.t b/t/t1004-mf-priority-small-tie-all.t index c6d3bc89..59c889de 100755 --- a/t/t1004-mf-priority-small-tie-all.t +++ b/t/t1004-mf-priority-small-tie-all.t @@ -25,15 +25,15 @@ test_expect_success 'create a group of users with many ties in fairshare values' cat <<-EOF >fake_small_tie_all.json { "data" : [ - {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, 
"max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7}, - {"userid": 5033, "bank": "account3", "def_bank": "account3", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7} + {"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5031, "bank": "account3", "def_bank": 
"account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""}, + {"userid": 5033, "bank": "account3", "def_bank": "account3", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7, "queues": ""} ] } EOF @@ -43,6 +43,27 @@ test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} fake_small_tie_all.json ' +test_expect_success 'add a default queue and send it to the plugin' ' + cat <<-EOF >fake_payload.py + import flux + import json + + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() + EOF + flux python fake_payload.py +' + test_expect_success 'stop the queue' ' flux queue stop ' diff --git a/t/t1005-max-jobs-limits.t b/t/t1005-max-jobs-limits.t index 008e3a8f..ea8cd0db 100755 --- a/t/t1005-max-jobs-limits.t +++ b/t/t1005-max-jobs-limits.t @@ -25,8 +25,24 @@ test_expect_success 'create fake_user.json' ' cat <<-EOF >fake_user.json { "data" : [ - {"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 2, "max_active_jobs": 4}, - {"userid": 5011, "bank": "account2", "def_bank": "account3", "fairshare": 0.11345, "max_running_jobs": 1, "max_active_jobs": 2} + { + "userid": 5011, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 2, + "max_active_jobs": 4, + "queues": "" + }, + { + "userid": 5011, + "bank": "account2", + "def_bank": "account3", + "fairshare": 0.11345, + "max_running_jobs": 1, + "max_active_jobs": 2, + "queues": "" + } ] } EOF @@ -36,6 +52,27 @@ test_expect_success 'update plugin with sample test data' ' flux python ${SEND_PAYLOAD} fake_user.json 
' +test_expect_success 'add a default queue and send it to the plugin' ' + cat <<-EOF >fake_payload.py + import flux + import json + + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() + EOF + flux python fake_payload.py +' + test_expect_success 'submit max number of jobs' ' jobid1=$(flux python ${SUBMIT_AS} 5011 sleep 60) && jobid2=$(flux python ${SUBMIT_AS} 5011 sleep 60) @@ -79,7 +116,15 @@ test_expect_success 'increase the max jobs count of the user' ' cat <<-EOF >new_max_running_jobs_limit.json { "data" : [ - {"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 3, "max_active_jobs": 4} + { + "userid": 5011, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 3, + "max_active_jobs": 4, + "queues": "" + } ] } EOF @@ -120,7 +165,15 @@ test_expect_success 'update max_active_jobs limit' ' cat <<-EOF >new_max_active_jobs_limit.json { "data" : [ - {"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 3, "max_active_jobs": 5} + { + "userid": 5011, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 3, + "max_active_jobs": 5, + "queues": "" + } ] } EOF @@ -169,8 +222,24 @@ test_expect_success 'create another user with the same limits in multiple banks' cat <<-EOF >fake_user2.json { "data" : [ - {"userid": 5012, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 1, "max_active_jobs": 2}, - {"userid": 5012, "bank": "account2", "def_bank": "account3", "fairshare": 0.11345, "max_running_jobs": 1, "max_active_jobs": 2} + { + "userid": 5012, + "bank": "account3", + "def_bank": "account3", + "fairshare": 0.45321, + "max_running_jobs": 1, + 
"max_active_jobs": 2, + "queues": "" + }, + { + "userid": 5012, + "bank": "account2", + "def_bank": "account3", + "fairshare": 0.11345, + "max_running_jobs": 1, + "max_active_jobs": 2, + "queues": "" + } ] } EOF diff --git a/t/t1008-mf-priority-update.t b/t/t1008-mf-priority-update.t index afa1b0c4..19b4a4b0 100755 --- a/t/t1008-mf-priority-update.t +++ b/t/t1008-mf-priority-update.t @@ -36,6 +36,10 @@ test_expect_success 'add some users to the DB' ' flux account -p ${DB_PATH} add-user --username=user5013 --userid=5013 --bank=account1 ' +test_expect_success 'add a queue to the DB' ' + flux account -p ${DB_PATH} add-queue default --priority=0 +' + test_expect_success 'send the user information to the plugin' ' flux account-priority-update -p $(pwd)/FluxAccountingTest.db ' diff --git a/t/t1012-mf-priority-load.t b/t/t1012-mf-priority-load.t index a324221d..20d344fa 100755 --- a/t/t1012-mf-priority-load.t +++ b/t/t1012-mf-priority-load.t @@ -37,6 +37,18 @@ test_expect_success 'create fake_payload.py' ' ] } flux.Flux().rpc("job-manager.mf_priority.rec_update", json.dumps(bulk_update_data)).get() + bulk_queue_data = { + "data" : [ + { + "queue": "default", + "priority": 0, + "min_nodes_per_job": 0, + "max_nodes_per_job": 5, + "max_time_per_job": 64000 + } + ] + } + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get() flux.Flux().rpc("job-manager.mf_priority.reprioritize") EOF ' From 9161d212fe3c497bd83db777d66cf49335bcaf1e Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 10 Feb 2022 09:59:22 -0800 Subject: [PATCH 3/7] bulk_update: add queue payload section Add another section to bulk_update() which grabs queue information from the queue_table and sends it to the priority plugin. 
--- src/cmd/flux-account-priority-update.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/cmd/flux-account-priority-update.py b/src/cmd/flux-account-priority-update.py index 41e7ffad..b5aa7e5e 100755 --- a/src/cmd/flux-account-priority-update.py +++ b/src/cmd/flux-account-priority-update.py @@ -50,6 +50,7 @@ def bulk_update(path): data = {} bulk_user_data = [] + bulk_q_data = [] # fetch all rows from association_table (will print out tuples) for row in cur.execute( @@ -72,8 +73,28 @@ def bulk_update(path): data = {"data": bulk_user_data} flux.Flux().rpc("job-manager.mf_priority.rec_update", json.dumps(data)).get() + + # fetch all rows from queue_table + for row in cur.execute("SELECT * FROM queue_table"): + # create a JSON payload with the results of the query + single_q_data = { + "queue": str(row[0]), + "min_nodes_per_job": int(row[1]), + "max_nodes_per_job": int(row[2]), + "max_time_per_job": int(row[3]), + "priority": int(row[4]), + } + bulk_q_data.append(single_q_data) + + data = {"data": bulk_q_data} + + flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(data)).get() + flux.Flux().rpc("job-manager.mf_priority.reprioritize") + # close the DB cursor + cur.close() + def main(): parser = argparse.ArgumentParser( From 3cf8d1645510646f9820d62a0e3fd75ec0b82875 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 10 Feb 2022 10:00:47 -0800 Subject: [PATCH 4/7] plugin: add queue unpack callback Add a new callback function to the plugin which will receive and store queue information from the flux-accounting database to a map with the name of a queue as the key, and a struct of information about that queue as the value. These queue values will be used to further calculate job priorities if one is passed in. 
--- src/plugins/mf_priority.cpp | 66 ++++++++++++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 98a99d87..7345ec57 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -32,6 +32,7 @@ extern "C" { #define BANK_INFO_MISSING -9 std::map> users; +std::map queues; std::map users_def_bank; struct bank_info { @@ -45,6 +46,13 @@ struct bank_info { int queue_factor; }; +struct queue_info { + int min_nodes_per_job; + int max_nodes_per_job; + int max_time_per_job; + int priority; +}; + /****************************************************************************** * * * Helper Functions * @@ -178,6 +186,61 @@ static void rec_update_cb (flux_t *h, flux_respond_error (h, msg, errno, flux_msg_last_error (msg)); } +/* + * Unpack a payload from an external bulk update service and place it in the + * queues map data structure. + */ +static void rec_q_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + char *queue = NULL; + int min_nodes_per_job, max_nodes_per_job, max_time_per_job, priority = 0; + json_t *data, *jtemp = NULL; + json_error_t error; + int num_data = 0; + + if (flux_request_unpack (msg, NULL, "{s:o}", "data", &data) < 0) { + flux_log_error (h, "failed to unpack custom_priority.trigger msg"); + goto error; + } + + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "flux_respond"); + + if (!data || !json_is_array (data)) { + flux_log (h, LOG_ERR, "mf_priority: invalid queue info payload"); + goto error; + } + num_data = json_array_size (data); + + for (int i = 0; i < num_data; i++) { + json_t *el = json_array_get(data, i); + + if (json_unpack_ex (el, &error, 0, + "{s:s, s:i, s:i, s:i, s:i}", + "queue", &queue, + "min_nodes_per_job", &min_nodes_per_job, + "max_nodes_per_job", &max_nodes_per_job, + "max_time_per_job", &max_time_per_job, + "priority", &priority) < 0) + flux_log (h, LOG_ERR, "mf_priority unpack: 
%s", error.text); + + struct queue_info *q; + q = &queues[queue]; + + q->min_nodes_per_job = min_nodes_per_job; + q->max_nodes_per_job = max_nodes_per_job; + q->max_time_per_job = max_time_per_job; + q->priority = priority; + } + + return; +error: + flux_respond_error (h, msg, errno, flux_msg_last_error (msg)); +} + static void reprior_cb (flux_t *h, flux_msg_handler_t *mh, @@ -640,7 +703,8 @@ extern "C" int flux_plugin_init (flux_plugin_t *p) { if (flux_plugin_register (p, "mf_priority", tab) < 0 || flux_jobtap_service_register (p, "rec_update", rec_update_cb, p) - || flux_jobtap_service_register (p, "reprioritize", reprior_cb, p) < 0) + || flux_jobtap_service_register (p, "reprioritize", reprior_cb, p) + || flux_jobtap_service_register (p, "rec_q_update", rec_q_cb, p) < 0) return -1; return 0; } From 31df2e512c64fbc8fc325a2f7715f94358753eee Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Fri, 11 Feb 2022 09:04:44 -0800 Subject: [PATCH 5/7] plugin: add queue validation Add validation for an optional queue argument when a job is submitted. The queue is first checked to exist in the queues map. It is then checked to determine if it is a valid queue for a user/bank to specify when submitting their job. If no queue is specified, the plugin will look for a "default" queue and use its associated priority. If no default queue is added, jobs trying to use this default queue will be rejected with a message saying that no default queue exists. It is up to the sys admin or scheduler operator to ensure that at least a default queue exists in the queue_table of the flux-accounting DB. If all checks pass, the queue's associated integer priority is added to the bank_info struct for the user/bank job. 
--- src/plugins/mf_priority.cpp | 111 ++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 6 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 7345ec57..a1c96f16 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -30,6 +30,9 @@ extern "C" { #include #define BANK_INFO_MISSING -9 +#define NO_SUCH_QUEUE -5 +#define INVALID_QUEUE -6 +#define NO_DEFAULT_QUEUE -7 std::map> users; std::map queues; @@ -104,6 +107,42 @@ int64_t priority_calculation (flux_plugin_t *p, } +static int get_queue_info ( + char *queue, + std::map::iterator bank_it) +{ + std::map::iterator q_it; + + // make sure that if a queue is passed in, it 1) exists, and 2) is a valid + // queue for the user to run jobs in + if (queue != NULL) { + // check #1) the queue passed in exists in the queues map + q_it = queues.find (queue); + if (q_it == queues.end ()) + return NO_SUCH_QUEUE; + + // check #2) the queue passed in is a valid option to pass for user + std::vector::iterator vect_it; + vect_it = std::find (bank_it->second.queues.begin (), + bank_it->second.queues.end (), queue); + + if (vect_it == bank_it->second.queues.end ()) + return INVALID_QUEUE; + else + // add priority associated with the passed in queue to bank_info + return queues[queue].priority; + } else { + // no queue was specified, so use default queue and associated priority + q_it = queues.find ("default"); + + if (q_it == queues.end ()) + return NO_DEFAULT_QUEUE; + else + return queues["default"].priority; + } +} + + static void split_string (char *queues, struct bank_info *b) { std::stringstream s_stream; @@ -117,6 +156,35 @@ static void split_string (char *queues, struct bank_info *b) } +int check_queue_factor (flux_plugin_t *p, + int queue_factor, + char *queue, + char *prefix = (char *) "") +{ + if (queue_factor == NO_SUCH_QUEUE) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority", + 0, + "%sQueue does not exist: %s", + prefix, 
queue); + return -1; + } else if (queue_factor == INVALID_QUEUE) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "%sQueue not valid for user: %s", + prefix, queue); + return -1; + } + else if (queue_factor == NO_DEFAULT_QUEUE) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "No default queue exists"); + return -1; + } + + return 0; +} + + /****************************************************************************** * * * Callbacks * @@ -271,17 +339,18 @@ static int priority_cb (flux_plugin_t *p, { int urgency, userid; char *bank = NULL; + char *queue = NULL; int64_t priority; struct bank_info *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s}}}}", + "{s:i, s:i, s{s{s{s?s, s?s}}}}", "urgency", &urgency, "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank) < 0) { + "bank", &bank, "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -336,6 +405,13 @@ static int priority_cb (flux_plugin_t *p, return flux_jobtap_priority_unavail (p, args); } + // fetch priority associated with passed-in queue (or default queue) + bank_it->second.queue_factor = get_queue_info (queue, bank_it); + if (check_queue_factor (p, + bank_it->second.queue_factor, + queue) < 0) + return -1; + // if we get here, the bank was unknown when this job was first // accepted, and therefore the active and run job counts for this // job need to be incremented here @@ -407,19 +483,21 @@ static int validate_cb (flux_plugin_t *p, { int userid; char *bank = NULL; + char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; std::map>::iterator it; std::map::iterator bank_it; + std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}}", + "{s:i, s{s{s{s?s, s?s}}}}", "userid", &userid, "jobspec", "attributes", 
"system", - "bank", &bank) < 0) { + "bank", &bank, "queue", &queue) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -447,6 +525,18 @@ static int validate_cb (flux_plugin_t *p, "user/default bank entry does not exist"); } + // fetch priority associated with passed-in queue (or default queue) + bank_it->second.queue_factor = get_queue_info (queue, bank_it); + + if (bank_it->second.queue_factor == NO_SUCH_QUEUE) + return flux_jobtap_reject_job (p, args, "Queue does not exist: %s", + queue); + else if (bank_it->second.queue_factor == INVALID_QUEUE) + return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", + queue); + else if (bank_it->second.queue_factor == NO_DEFAULT_QUEUE) + return flux_jobtap_reject_job (p, args, "No default queue exists"); + max_run_jobs = bank_it->second.max_run_jobs; fairshare = bank_it->second.fairshare; cur_active_jobs = bank_it->second.cur_active_jobs; @@ -473,6 +563,7 @@ static int new_cb (flux_plugin_t *p, { int userid; char *bank = NULL; + char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; struct bank_info *b; @@ -483,10 +574,10 @@ static int new_cb (flux_plugin_t *p, flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}}", + "{s:i, s{s{s{s?s, s?s}}}}", "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank) < 0) { + "bank", &bank, "queue", &queue) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -533,6 +624,14 @@ static int new_cb (flux_plugin_t *p, } } + // fetch priority associated with passed-in queue (or default queue) + bank_it->second.queue_factor = get_queue_info (queue, bank_it); + if (check_queue_factor (p, + bank_it->second.queue_factor, + queue, + (char *) "job.new: ") < 0) + return -1; + max_run_jobs = bank_it->second.max_run_jobs; fairshare = bank_it->second.fairshare; cur_active_jobs = bank_it->second.cur_active_jobs; From 
7255493e9524c5cbf00e053c48cbca57e7730647 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 14 Feb 2022 08:13:47 -0800 Subject: [PATCH 6/7] plugin: add queue_factor to priority calculation --- src/plugins/mf_priority.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index a1c96f16..3d7bd6ed 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -67,7 +67,11 @@ struct queue_info { * * fairshare: the ratio between the amount of resources allocated vs. resources * consumed. + * * urgency: a user-controlled factor to prioritize their own jobs. + * + * queue: a factor that can further affect the priority of a job based on the + * queue passed in. */ int64_t priority_calculation (flux_plugin_t *p, flux_plugin_arg_t *args, @@ -76,10 +80,12 @@ int64_t priority_calculation (flux_plugin_t *p, int urgency) { double fshare_factor = 0.0, priority = 0.0; - int fshare_weight; + int queue_factor = 0; + int fshare_weight, queue_weight; struct bank_info *b; fshare_weight = 100000; + queue_weight = 10000; if (urgency == FLUX_JOB_URGENCY_HOLD) return FLUX_JOB_PRIORITY_MIN; @@ -100,10 +106,16 @@ int64_t priority_calculation (flux_plugin_t *p, } fshare_factor = b->fairshare; + queue_factor = b->queue_factor; - priority = (fshare_weight * fshare_factor) + (urgency - 16); + priority = round ((fshare_weight * fshare_factor) + + (queue_weight * queue_factor) + + (urgency - 16)); + + if (priority < 0) + return FLUX_JOB_PRIORITY_MIN; - return abs (round (priority)); + return priority; } From 76a897e088901f3798d6410a2586086d167fff6e Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Fri, 11 Feb 2022 09:06:34 -0800 Subject: [PATCH 7/7] t: add queue priority sharness tests --- t/Makefile.am | 3 +- t/t1013-mf-priority-queues.t | 159 +++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 1 deletion(-) create mode 100755 
t/t1013-mf-priority-queues.t diff --git a/t/Makefile.am b/t/Makefile.am index 584ce9e6..52e0605a 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -13,7 +13,8 @@ TESTSCRIPTS = \ t1009-pop-db.t \ t1010-update-usage.t \ t1011-job-archive-interface.t \ - t1012-mf-priority-load.t + t1012-mf-priority-load.t \ + t1013-mf-priority-queues.t dist_check_SCRIPTS = \ $(TESTSCRIPTS) \ diff --git a/t/t1013-mf-priority-queues.t b/t/t1013-mf-priority-queues.t new file mode 100755 index 00000000..d36d9e61 --- /dev/null +++ b/t/t1013-mf-priority-queues.t @@ -0,0 +1,159 @@ +#!/bin/bash + +test_description='Test multi-factor priority plugin queue support with a single user' + +. `dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p $(pwd)/FluxAccountingTest.db create-db +' + +test_expect_success 'add some banks to the DB' ' + flux account -p ${DB_PATH} add-bank root 1 && + flux account -p ${DB_PATH} add-bank --parent-bank=root account1 1 && + flux account -p ${DB_PATH} add-bank --parent-bank=root account2 1 +' + +test_expect_success 'add some queues to the DB' ' + flux account -p ${DB_PATH} add-queue standby --priority=0 && + flux account -p ${DB_PATH} add-queue expedite --priority=10000 && + flux account -p ${DB_PATH} add-queue bronze --priority=200 && + flux account -p ${DB_PATH} add-queue silver --priority=300 && + flux account -p ${DB_PATH} add-queue gold --priority=400 +' + 
+test_expect_success 'add a user to the DB' ' + flux account -p ${DB_PATH} add-user --username=user5011 \ + --userid=5011 --bank=account1 --queues="standby,bronze,silver,gold,expedite" && + flux account -p ${DB_PATH} add-user --username=user5011 \ + --userid=5011 --bank=account2 --queues="standby" +' + +test_expect_success 'view user information' ' + flux account -p ${DB_PATH} view-user user5011 +' + +test_expect_success 'send the user and queue information to the plugin' ' + flux account-priority-update -p $(pwd)/FluxAccountingTest.db +' + +test_expect_success 'stop the queue' ' + flux queue stop +' + +test_expect_success 'trying to submit a job without specifying a queue should attempt to use default queue' ' + test_must_fail flux python ${SUBMIT_AS} 5011 -n1 hostname > no_default_queue.out 2>&1 && + test_debug "no_default_queue.out" && + grep "No default queue exists" no_default_queue.out +' + +test_expect_success 'adding a default queue allows users to run jobs without specifying a queue' ' + flux account -p ${DB_PATH} add-queue default --priority=1000 && + flux account-priority-update -p $(pwd)/FluxAccountingTest.db && + jobid0=$(flux python ${SUBMIT_AS} 5011 -n1 hostname) && + flux job wait-event -f json $jobid0 priority | jq '.context.priority' > job0.test && + cat <<-EOF >job0.expected && + 10050000 + EOF + test_cmp job0.expected job0.test && + flux job cancel $jobid0 +' + +test_expect_success 'submit a job using a queue the user does not belong to' ' + test_must_fail flux python ${SUBMIT_AS} 5011 --setattr=system.bank=account2 \ + --setattr=system.queue=expedite -n1 hostname > unavail_queue.out 2>&1 && + test_debug "unavail_queue.out" && + grep "Queue not valid for user: expedite" unavail_queue.out +' + +test_expect_success 'submit a job using a nonexistent queue' ' + test_must_fail flux python ${SUBMIT_AS} 5011 --setattr=system.queue=foo \ + -n1 hostname > bad_queue.out 2>&1 && + test_debug "bad_queue.out" && + grep "Queue does not exist: foo" 
bad_queue.out +' + +test_expect_success 'submit a job using standby queue, which should not increase job priority' ' + jobid1=$(flux python ${SUBMIT_AS} 5011 --job-name=standby \ + --setattr=system.bank=account1 --setattr=system.queue=standby -n1 hostname) && + flux job wait-event -f json $jobid1 priority | jq '.context.priority' > job1.test && + cat <<-EOF >job1.expected && + 50000 + EOF + test_cmp job1.expected job1.test +' + +test_expect_success 'submit a job using expedite queue, which should increase priority' ' + jobid2=$(flux python ${SUBMIT_AS} 5011 --job-name=expedite \ + --setattr=system.bank=account1 --setattr=system.queue=expedite -n1 hostname) && + flux job wait-event -f json $jobid2 priority | jq '.context.priority' > job2.test && + cat <<-EOF >job2.expected && + 100050000 + EOF + test_cmp job2.expected job2.test +' + +test_expect_success 'submit a job using the rest of the available queues' ' + jobid3=$(flux python ${SUBMIT_AS} 5011 --job-name=bronze --setattr=system.queue=bronze -n1 hostname) && + jobid4=$(flux python ${SUBMIT_AS} 5011 --job-name=silver --setattr=system.queue=silver -n1 hostname) && + jobid5=$(flux python ${SUBMIT_AS} 5011 --job-name=gold --setattr=system.queue=gold -n1 hostname) +' + +test_expect_success 'check order of job queue' ' + flux jobs -A --suppress-header --format={name} > multi_queues.test && + cat <<-EOF >multi_queues.expected && + expedite + gold + silver + bronze + standby + EOF + test_cmp multi_queues.expected multi_queues.test +' + +test_expect_success 'cancel existing jobs' ' + flux job cancel $jobid1 && + flux job cancel $jobid2 && + flux job cancel $jobid3 && + flux job cancel $jobid4 && + flux job cancel $jobid5 +' + +test_expect_success 'unload mf_priority.so' ' + flux jobtap remove mf_priority.so +' + +test_expect_success 'submit a job to a nonexistent queue with no plugin information loaded' ' + jobid6=$(flux python ${SUBMIT_AS} 5011 --setattr=system.queue=foo -n1 hostname) && + test $(flux jobs -no {state} 
${jobid6}) = PRIORITY +' + +test_expect_success 'reload mf_priority.so and update it with the sample test data again' ' + flux jobtap load ${MULTI_FACTOR_PRIORITY} && + test $(flux jobs -no {state} ${jobid6}) = PRIORITY && + flux account-priority-update -p $(pwd)/FluxAccountingTest.db +' + +test_expect_success 'ensure job exception was raised saying that the queue does not exist' ' + flux job wait-event -v ${jobid6} exception > nonexistent_queue.test && + grep "Queue does not exist: foo" nonexistent_queue.test +' + +test_done