Skip to content

Commit

Permalink
fixup! plugin: add max_jobs limit support
Browse files Browse the repository at this point in the history
  • Loading branch information
cmoussa1 committed Aug 2, 2021
1 parent 0c16e28 commit fd38c6e
Showing 1 changed file with 56 additions and 50 deletions.
106 changes: 56 additions & 50 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ extern "C" {
#include <cinttypes>
#include <vector>

std::map<int, std::map<std::string, double>> users;
std::map<int, std::map<std::string, std::vector<int>>> users_mj;
std::map<int, std::map<std::string, struct bank_info>> users;
std::map<int, std::string> users_def_bank;

// Per-(user, bank) accounting record stored in the `users` map.
//
// Every member carries a default initializer so that a default-initialized
// instance (e.g. `struct bank_info b;`) never holds indeterminate values,
// even if a code path forgets to assign one of the fields.
struct bank_info {
    // fairshare value for this user/bank; used as the fairshare factor
    // when a job's priority is calculated
    double fairshare = 0.0;
    // upper bound that current_jobs is compared against when a job is
    // validated
    int max_jobs = 0;
    // running count of jobs attributed to this user/bank; incremented when
    // a job passes validation and decremented when a job becomes inactive
    int current_jobs = 0;
};

/******************************************************************************
* *
Expand Down Expand Up @@ -63,19 +69,12 @@ int64_t priority_calculation (flux_plugin_t *p,
if (urgency == FLUX_JOB_URGENCY_EXPEDITE)
return FLUX_JOB_PRIORITY_MAX;

// search element in map of maps by key
it = users.find (userid);
bank = static_cast<char *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank"));

// search for correct fshare value using passed-in bank; otherwise, use
// a default bank
if (bank != NULL) {
inner_it = it->second.find (bank);
fshare_factor = inner_it->second;
}
else {
inner_it = it->second.find ("default");
fshare_factor = inner_it->second;
}
fshare_factor = users[userid][bank].fairshare;

priority = (fshare_weight * fshare_factor) + (urgency - 16);

Expand Down Expand Up @@ -113,13 +112,14 @@ static void rec_update_cb (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "flux_respond");

if (strcmp (bank, default_bank) == 0) {
users[std::atoi (uid)]["default"] = std::stod (fshare);
users_mj[std::atoi (uid)]["default"] = { std::atoi (max_jobs), 0 };
}
struct bank_info b;

b.fairshare = std::atof (fshare);
b.max_jobs = std::atoi (max_jobs);
b.current_jobs = 0;

users[std::atoi (uid)][bank] = std::stod (fshare);
users_mj[std::atoi (uid)][bank] = { std::atoi (max_jobs), 0 };
users[std::atoi (uid)][bank] = b;
users_def_bank[std::atoi (uid)] = default_bank;

return;
error:
Expand Down Expand Up @@ -184,11 +184,10 @@ static int validate_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
std::map<int, std::map<std::string, double>>::iterator it;
std::map<std::string, double>::iterator inner_it;
int current_jobs, max_jobs = 0;

std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -200,31 +199,44 @@ static int validate_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

// searching element in std::map by key
// make sure user belongs to flux-accounting DB
it = users.find (userid);
if (it == users.end ())
return flux_jobtap_reject_job (p, args,
"user not found in flux-accounting DB");

// make sure user belongs to bank they specified; if no bank was passed in,
// look up their default bank
if (bank != NULL) {
inner_it = it->second.find (std::string (bank));
if (inner_it == it->second.end ())
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return flux_jobtap_reject_job (p, args,
"user does not belong to specified bank");
"user does not belong to specified bank");
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
}

mj_it = users_mj.find (userid);
try {
max_jobs = users.at (userid).at (bank).max_jobs;
current_jobs = users.at (userid).at (bank).current_jobs;
} catch (const std::out_of_range& oor) {
return flux_jobtap_reject_job (p, args,
"could not look up user's active job count");
}

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank",
bank,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");

if (mj_inner_it->second[1] >= mj_inner_it->second[0])
// make sure user has not already hit their max active jobs count
if (current_jobs >= max_jobs)
return flux_jobtap_reject_job (p, args,
"user has max number of jobs submitted");
"user has max number of jobs submitted");

// increment user's current (active) jobs count
mj_inner_it->second[1]++;
users[userid][bank].current_jobs++;

return 0;
}
Expand All @@ -236,32 +248,26 @@ static int inactive_cb (flux_plugin_t *p,
void *data)
{
int userid;
char *bank = NULL;

std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;
char *bank;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"{s:i}",
"userid", &userid) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

mj_it = users_mj.find (userid);

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);
bank = static_cast<char *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank"));

mj_inner_it->second[1]--;
users[userid][bank].current_jobs--;

return 0;
}
Expand Down

0 comments on commit fd38c6e

Please sign in to comment.