Skip to content

Commit

Permalink
plugin: add queue validation
Browse files Browse the repository at this point in the history
Add validation for an optional queue argument
when a job is submitted. The queue is first checked
to exist in the queues map. It is then checked to
determine if it is a valid queue for a user/bank to specify
when submitting their job.

If no queue is specified, the plugin will look for a "default"
queue and use its associated priority. If no default queue is
added, jobs trying to use this default queue will be rejected
with a message saying that no default queue exists. It is up
to the sys admin or scheduler operator to ensure that at least
a default queue exists in the queue_table of the flux-accounting
DB.

If all checks pass, the queue's associated integer priority
is added to the bank_info struct for the user/bank job.
  • Loading branch information
cmoussa1 committed Mar 8, 2022
1 parent 83e73ae commit 5eff0c7
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,21 @@ static int validate_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, struct queue_info>::iterator q_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"{s:i, s{s{s{s?s, s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

Expand All @@ -342,6 +344,35 @@ static int validate_cb (flux_plugin_t *p,
"user/default bank entry does not exist");
}

// make sure that if a queue is passed in, it 1) exists, and 2) is a valid
// queue for the user to run jobs in
if (queue != NULL) {
// check #1) the queue passed in exists in the queues map
q_it = queues.find (queue);
if (q_it == queues.end ())
return flux_jobtap_reject_job (p, args, "Queue does not exist");

// check #2) the queue passed in is a valid option to pass for user
std::vector<std::string>::iterator vect_it;
vect_it = std::find (bank_it->second.queues.begin (),
bank_it->second.queues.end (), queue);

if (vect_it == bank_it->second.queues.end ())
return flux_jobtap_reject_job (p, args, "Queue not valid for user");
else
// add priority associated with the passed in queue to bank_info
bank_it->second.queue_factor = queues[queue].priority;
} else {
// no queue was specified, so use default queue and associated priority
q_it = queues.find ("default");

if (q_it == queues.end ())
return flux_jobtap_reject_job (p, args, "No default queue exists");
else
// add priority associated with the passed in queue to bank_info
bank_it->second.queue_factor = queues["default"].priority;
}

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
Expand Down

0 comments on commit 5eff0c7

Please sign in to comment.