Skip to content

Commit

Permalink
plugin: add queue validation
Browse files Browse the repository at this point in the history
Add validation for an optional queue argument
when a job is submitted. The queue is first checked
to exist in the queues map. It is then checked to
determine if it is a valid queue for a user/bank to specify
when submitting their job.

If all checks pass, the queue's associated integer priority
is added to the bank_info struct for the user/bank job.
  • Loading branch information
cmoussa1 committed Mar 7, 2022
1 parent 93fceca commit 5a5cc6e
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,21 @@ static int validate_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, struct queue_info>::iterator q_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"{s:i, s{s{s{s?s, s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

Expand All @@ -342,6 +344,26 @@ static int validate_cb (flux_plugin_t *p,
"user/default bank entry does not exist");
}

// make sure that if a queue is passed in, it 1) exists, and 2) is a valid
// queue for the user to run jobs in
if (queue != NULL) {
// check #1) the queue passed in exists in the queues map
q_it = queues.find (queue);
if (q_it == queues.end ())
return flux_jobtap_reject_job (p, args, "Queue does not exist");

// check #2) the queue passed in is a valid option to pass for user
std::vector<std::string>::iterator vect_it;
vect_it = std::find (bank_it->second.queues.begin (),
bank_it->second.queues.end (), queue);

if (vect_it == bank_it->second.queues.end ())
return flux_jobtap_reject_job (p, args, "Queue not valid for user");
else
// add priority associated with the passed in queue to bank_info
bank_it->second.queue_factor = queues[queue].priority;
}

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
Expand Down

0 comments on commit 5a5cc6e

Please sign in to comment.