Skip to content

Commit

Permalink
plugin: add max_jobs limit support
Browse files Browse the repository at this point in the history
Add support for the priority plugin to keep count of
a user's active jobs and reject jobs that are submitted
while the user already has their max number of jobs
currently active.

The number of active jobs is kept track through a map-of-maps
data structure containing all of the users in the flux-accounting
DB along with their separate bank/max_jobs limits. When a job enters
job.validate, their max_jobs counter is incremented and is
decremented when the job enters job.state.inactive. If a user already
has their max number of active jobs running and tries to submit a
new job, the new job will be rejected with an error message.
  • Loading branch information
cmoussa1 committed Jul 26, 2021
1 parent 569b10f commit 849e40b
Showing 1 changed file with 62 additions and 3 deletions.
65 changes: 62 additions & 3 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ extern "C" {
#include <cassert>
#include <algorithm>
#include <cinttypes>
#include <vector>

std::map<int, std::map<std::string, double>> users;
std::map<int, std::map<std::string, std::vector<int>>> users_mj;

/******************************************************************************
* *
Expand Down Expand Up @@ -96,13 +98,14 @@ static void rec_update_cb (flux_t *h,
const flux_msg_t *msg,
void *arg)
{
char *uid, *fshare, *bank, *default_bank;
char *uid, *fshare, *bank, *default_bank, *max_jobs;

if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s, s:s}",
if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s, s:s, s:s}",
"userid", &uid,
"bank", &bank,
"default_bank", &default_bank,
"fairshare", &fshare) < 0) {
"fairshare", &fshare,
"max_jobs", &max_jobs) < 0) {
flux_log_error (h, "failed to unpack custom_priority.trigger msg");
goto error;
}
Expand All @@ -112,8 +115,10 @@ static void rec_update_cb (flux_t *h,

if (strcmp (bank, default_bank) == 0)
users[std::atoi (uid)]["default"] = std::stod (fshare);
users_mj[std::atoi (uid)]["default"] = { std::atoi (max_jobs), 0 };

users[std::atoi (uid)][bank] = std::stod (fshare);
users_mj[std::atoi (uid)][bank] = { std::atoi (max_jobs), 0 };

return;
error:
Expand Down Expand Up @@ -181,6 +186,9 @@ static int validate_cb (flux_plugin_t *p,
std::map<int, std::map<std::string, double>>::iterator it;
std::map<std::string, double>::iterator inner_it;

std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -204,6 +212,56 @@ static int validate_cb (flux_plugin_t *p,
"user does not belong to specified bank");
}

mj_it = users_mj.find (userid);

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);

if (mj_inner_it->second[1] >= mj_inner_it->second[0])
return flux_jobtap_reject_job (p, args,
"user has max number of jobs submitted");

// increment user's max_jobs count
mj_inner_it->second[1]++;

return 0;
}


static int inactive_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int userid;
char *bank = NULL;

std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

mj_it = users_mj.find (userid);

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);

mj_inner_it->second[1]--;

return 0;
}

Expand All @@ -212,6 +270,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.validate", validate_cb, NULL },
{ "job.state.priority", priority_cb, NULL },
{ "job.priority.get", priority_cb, NULL },
{ "job.state.inactive", inactive_cb, NULL },
{ 0 },
};

Expand Down

0 comments on commit 849e40b

Please sign in to comment.