Skip to content

Commit

Permalink
fixup! plugin: add max_jobs limit support
Browse files Browse the repository at this point in the history
  • Loading branch information
cmoussa1 committed Jul 29, 2021
1 parent eae42be commit cfd5d39
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extern "C" {

std::map<int, std::map<std::string, double>> users;
std::map<int, std::map<std::string, std::vector<int>>> users_mj;
std::map<int, std::string> users_def_bank;

/******************************************************************************
* *
Expand Down Expand Up @@ -113,14 +114,14 @@ static void rec_update_cb (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "flux_respond");

if (strcmp (bank, default_bank) == 0) {
if (strcmp (bank, default_bank) == 0)
users[std::atoi (uid)]["default"] = std::stod (fshare);
users_mj[std::atoi (uid)]["default"] = { std::atoi (max_jobs), 0 };
}

users[std::atoi (uid)][bank] = std::stod (fshare);
users_mj[std::atoi (uid)][bank] = { std::atoi (max_jobs), 0 };

users_def_bank[std::atoi (uid)] = default_bank;

return;
error:
flux_respond_error (h, msg, errno, flux_msg_last_error (msg));
Expand Down Expand Up @@ -190,6 +191,8 @@ static int validate_cb (flux_plugin_t *p,
std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;

std::map<int, std::string>::iterator def_bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -215,9 +218,26 @@ static int validate_cb (flux_plugin_t *p,

mj_it = users_mj.find (userid);

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);
if (bank == NULL) {
def_bank_it = users_def_bank.find (userid);
mj_inner_it = mj_it->second.find (def_bank_it->second);

if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank",
&def_bank_it->second[0],
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");
} else {
mj_inner_it = mj_it->second.find (bank);

if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank",
&bank[0],
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");
}

if (mj_inner_it->second[1] >= mj_inner_it->second[0])
return flux_jobtap_reject_job (p, args,
Expand All @@ -236,30 +256,31 @@ static int inactive_cb (flux_plugin_t *p,
void *data)
{
int userid;
char *bank = NULL;
void *bank;

std::map<int, std::map<std::string, std::vector<int>>>::iterator mj_it;
std::map<std::string, std::vector<int>>::iterator mj_inner_it;

std::map<int, std::string>::iterator def_bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank) < 0) {
"{s:i}",
"userid", &userid) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

mj_it = users_mj.find (userid);
bank = flux_jobtap_job_aux_get (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank");

mj_inner_it = (bank == NULL) ?
mj_it->second.find ("default") :
mj_it->second.find (bank);
mj_it = users_mj.find (userid);
mj_inner_it = mj_it->second.find (static_cast<char *> (bank));

mj_inner_it->second[1]--;

Expand Down

0 comments on commit cfd5d39

Please sign in to comment.