Skip to content

Commit

Permalink
plugin: add queue validation
Browse files Browse the repository at this point in the history
Add validation for an optional queue argument
when a job is submitted. The queue is first checked
to exist in the queues map. It is then checked to
determine if it is a valid queue for a user/bank to specify
when submitting their job.

If no queue is specified, the plugin will look for a "default"
queue and use its associated priority. If no default queue is
added, jobs trying to use this default queue will be rejected
with a message saying that no default queue exists. It is up
to the sys admin or scheduler operator to ensure that at least
a default queue exists in the queue_table of the flux-accounting
DB.

If all checks pass, the queue's associated integer priority
is added to the bank_info struct for the user/bank job.
  • Loading branch information
cmoussa1 committed Apr 14, 2022
1 parent 3cf8d16 commit 31df2e5
Showing 1 changed file with 105 additions and 6 deletions.
111 changes: 105 additions & 6 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ extern "C" {
#include <sstream>

#define BANK_INFO_MISSING -9
#define NO_SUCH_QUEUE -5
#define INVALID_QUEUE -6
#define NO_DEFAULT_QUEUE -7

std::map<int, std::map<std::string, struct bank_info>> users;
std::map<std::string, struct queue_info> queues;
Expand Down Expand Up @@ -104,6 +107,42 @@ int64_t priority_calculation (flux_plugin_t *p,
}


static int get_queue_info (
char *queue,
std::map<std::string, struct bank_info>::iterator bank_it)
{
std::map<std::string, struct queue_info>::iterator q_it;

// make sure that if a queue is passed in, it 1) exists, and 2) is a valid
// queue for the user to run jobs in
if (queue != NULL) {
// check #1) the queue passed in exists in the queues map
q_it = queues.find (queue);
if (q_it == queues.end ())
return NO_SUCH_QUEUE;

// check #2) the queue passed in is a valid option to pass for user
std::vector<std::string>::iterator vect_it;
vect_it = std::find (bank_it->second.queues.begin (),
bank_it->second.queues.end (), queue);

if (vect_it == bank_it->second.queues.end ())
return INVALID_QUEUE;
else
// add priority associated with the passed in queue to bank_info
return queues[queue].priority;
} else {
// no queue was specified, so use default queue and associated priority
q_it = queues.find ("default");

if (q_it == queues.end ())
return NO_DEFAULT_QUEUE;
else
return queues["default"].priority;
}
}


static void split_string (char *queues, struct bank_info *b)
{
std::stringstream s_stream;
Expand All @@ -117,6 +156,35 @@ static void split_string (char *queues, struct bank_info *b)
}


int check_queue_factor (flux_plugin_t *p,
int queue_factor,
char *queue,
char *prefix = (char *) "")
{
if (queue_factor == NO_SUCH_QUEUE) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority",
0,
"%sQueue does not exist: %s",
prefix, queue);
return -1;
} else if (queue_factor == INVALID_QUEUE) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"%sQueue not valid for user: %s",
prefix, queue);
return -1;
}
else if (queue_factor == NO_DEFAULT_QUEUE) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"No default queue exists");
return -1;
}

return 0;
}


/******************************************************************************
* *
* Callbacks *
Expand Down Expand Up @@ -271,17 +339,18 @@ static int priority_cb (flux_plugin_t *p,
{
int urgency, userid;
char *bank = NULL;
char *queue = NULL;
int64_t priority;
struct bank_info *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:i, s{s{s{s?s}}}}",
"{s:i, s:i, s{s{s{s?s, s?s}}}}",
"urgency", &urgency,
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"bank", &bank, "queue", &queue) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
Expand Down Expand Up @@ -336,6 +405,13 @@ static int priority_cb (flux_plugin_t *p,
return flux_jobtap_priority_unavail (p, args);
}

// fetch priority associated with passed-in queue (or default queue)
bank_it->second.queue_factor = get_queue_info (queue, bank_it);
if (check_queue_factor (p,
bank_it->second.queue_factor,
queue) < 0)
return -1;

// if we get here, the bank was unknown when this job was first
// accepted, and therefore the active and run job counts for this
// job need to be incremented here
Expand Down Expand Up @@ -407,19 +483,21 @@ static int validate_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, struct queue_info>::iterator q_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"{s:i, s{s{s{s?s, s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

Expand Down Expand Up @@ -447,6 +525,18 @@ static int validate_cb (flux_plugin_t *p,
"user/default bank entry does not exist");
}

// fetch priority associated with passed-in queue (or default queue)
bank_it->second.queue_factor = get_queue_info (queue, bank_it);

if (bank_it->second.queue_factor == NO_SUCH_QUEUE)
return flux_jobtap_reject_job (p, args, "Queue does not exist: %s",
queue);
else if (bank_it->second.queue_factor == INVALID_QUEUE)
return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s",
queue);
else if (bank_it->second.queue_factor == NO_DEFAULT_QUEUE)
return flux_jobtap_reject_job (p, args, "No default queue exists");

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
Expand All @@ -473,6 +563,7 @@ static int new_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;
struct bank_info *b;
Expand All @@ -483,10 +574,10 @@ static int new_cb (flux_plugin_t *p,
flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"{s:i, s{s{s{s?s, s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

Expand Down Expand Up @@ -533,6 +624,14 @@ static int new_cb (flux_plugin_t *p,
}
}

// fetch priority associated with passed-in queue (or default queue)
bank_it->second.queue_factor = get_queue_info (queue, bank_it);
if (check_queue_factor (p,
bank_it->second.queue_factor,
queue,
(char *) "job.new: ") < 0)
return -1;

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
Expand Down

0 comments on commit 31df2e5

Please sign in to comment.