diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index cb91203ae..02a4a620d 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -25,8 +25,10 @@ extern "C" { #include #include #include +#include std::map> users; +std::map>> users_mj; /****************************************************************************** * * @@ -96,13 +98,14 @@ static void rec_update_cb (flux_t *h, const flux_msg_t *msg, void *arg) { - char *uid, *fshare, *bank, *default_bank; + char *uid, *fshare, *bank, *default_bank, *max_jobs; - if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s, s:s}", + if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s, s:s, s:s}", "userid", &uid, "bank", &bank, "default_bank", &default_bank, - "fairshare", &fshare) < 0) { + "fairshare", &fshare, + "max_jobs", &max_jobs) < 0) { flux_log_error (h, "failed to unpack custom_priority.trigger msg"); goto error; } @@ -112,8 +115,10 @@ static void rec_update_cb (flux_t *h, if (strcmp (bank, default_bank) == 0) users[std::atoi (uid)]["default"] = std::stod (fshare); + users_mj[std::atoi (uid)]["default"] = { std::atoi (max_jobs), 0 }; users[std::atoi (uid)][bank] = std::stod (fshare); + users_mj[std::atoi (uid)][bank] = { std::atoi (max_jobs), 0 }; return; error: @@ -181,6 +186,9 @@ static int validate_cb (flux_plugin_t *p, std::map>::iterator it; std::map::iterator inner_it; + std::map>>::iterator mj_it; + std::map>::iterator mj_inner_it; + flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -204,6 +212,56 @@ static int validate_cb (flux_plugin_t *p, "user does not belong to specified bank"); } + mj_it = users_mj.find (userid); + + mj_inner_it = (bank == NULL) ? + mj_it->second.find ("default") : + mj_it->second.find (bank); + + if (mj_inner_it->second[1] >= mj_inner_it->second[0]) + return flux_jobtap_reject_job (p, args, + "user has max number of jobs submitted"); + + // increment user's max_jobs count + mj_inner_it->second[1]++; + + return 0; +} + + +static int inactive_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + int userid; + char *bank = NULL; + + std::map>>::iterator mj_it; + std::map>::iterator mj_inner_it; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:i, s{s{s{s?s}}}}", + "userid", &userid, + "jobspec", "attributes", "system", + "bank", &bank) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } + + mj_it = users_mj.find (userid); + + mj_inner_it = (bank == NULL) ? + mj_it->second.find ("default") : + mj_it->second.find (bank); + + mj_inner_it->second[1]--; + return 0; } @@ -212,6 +270,7 @@ static const struct flux_plugin_handler tab[] = { { "job.validate", validate_cb, NULL }, { "job.state.priority", priority_cb, NULL }, { "job.priority.get", priority_cb, NULL }, + { "job.state.inactive", inactive_cb, NULL }, { 0 }, };