From 5a5cc6e06b992e8329bc24d15793f3a6eeeb8a96 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Fri, 11 Feb 2022 09:04:44 -0800 Subject: [PATCH] plugin: add queue validation Add validation for an optional queue argument when a job is submitted. The queue is first checked to exist in the queues map. It is then checked to determine if it is a valid queue for a user/bank to specify when submitting their job. If all checks pass, the queue's associated integer priority is added to the bank_info struct for the user/bank job. --- src/plugins/mf_priority.cpp | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 21465e1f9..f49194978 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -305,19 +305,21 @@ static int validate_cb (flux_plugin_t *p, { int userid; char *bank = NULL; + char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; std::map>::iterator it; std::map::iterator bank_it; + std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}}", + "{s:i, s{s{s{s?s, s?s}}}}", "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank) < 0) { + "bank", &bank, "queue", &queue) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -342,6 +344,26 @@ static int validate_cb (flux_plugin_t *p, "user/default bank entry does not exist"); } + // make sure that if a queue is passed in, it 1) exists, and 2) is a valid + // queue for the user to run jobs in + if (queue != NULL) { + // check #1) the queue passed in exists in the queues map + q_it = queues.find (queue); + if (q_it == queues.end ()) + return flux_jobtap_reject_job (p, args, "Queue does not exist"); + + // check #2) the queue passed in is a valid option to pass for user + std::vector::iterator vect_it; + vect_it = std::find (bank_it->second.queues.begin (), + bank_it->second.queues.end (), queue); + + if (vect_it == bank_it->second.queues.end ()) + return flux_jobtap_reject_job (p, args, "Queue not valid for user"); + else + // add priority associated with the passed in queue to bank_info + bank_it->second.queue_factor = queues[queue].priority; + } + max_run_jobs = bank_it->second.max_run_jobs; fairshare = bank_it->second.fairshare; cur_active_jobs = bank_it->second.cur_active_jobs;