diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index f59dc22a9..edc3eedac 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -27,8 +27,14 @@ extern "C" { #include #include -std::map> users; -std::map>> users_mj; +std::map> users; +std::map users_def_bank; + +struct bank_info { + double fairshare; + int max_jobs; + int current_jobs; +}; /****************************************************************************** * * @@ -63,18 +69,25 @@ int64_t priority_calculation (flux_plugin_t *p, if (urgency == FLUX_JOB_URGENCY_EXPEDITE) return FLUX_JOB_PRIORITY_MAX; - // search element in map of maps by key - it = users.find (userid); + bank = static_cast (flux_jobtap_job_aux_get ( + p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:bank")); - // search for correct fshare value using passed-in bank; otherwise, use - // a default bank - if (bank != NULL) { - inner_it = it->second.find (bank); - fshare_factor = inner_it->second; + if (bank == NULL) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "plugin", + 3, "mf_priority: bank is NULL; " + "holding job"); + return 0; } - else { - inner_it = it->second.find ("default"); - fshare_factor = inner_it->second; + + try { + fshare_factor = users.at(userid).at(bank).fairshare; + } catch (const std::out_of_range& oor) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "plugin", + 3, "mf_priority: cannot find fairshare; " + "holding job"); + return 0; } priority = (fshare_weight * fshare_factor) + (urgency - 16); @@ -113,13 +126,14 @@ static void rec_update_cb (flux_t *h, if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "flux_respond"); - if (strcmp (bank, default_bank) == 0) { - users[std::atoi (uid)]["default"] = std::stod (fshare); - users_mj[std::atoi (uid)]["default"] = { std::atoi (max_jobs), 0 }; - } + struct bank_info b; + + b.fairshare = std::atof (fshare); + b.max_jobs = std::atoi (max_jobs); + b.current_jobs = 0; - users[std::atoi (uid)][bank] = std::stod (fshare); - users_mj[std::atoi (uid)][bank] = { std::atoi (max_jobs), 0 }; + users[std::atoi (uid)][bank] = b; + users_def_bank[std::atoi (uid)] = default_bank; return; error: @@ -184,11 +198,10 @@ static int validate_cb (flux_plugin_t *p, { int userid; char *bank = NULL; - std::map>::iterator it; - std::map::iterator inner_it; + int current_jobs, max_jobs = 0; - std::map>>::iterator mj_it; - std::map>::iterator mj_inner_it; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -200,31 +213,44 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } - // searching element in std::map by key + // make sure user belongs to flux-accounting DB it = users.find (userid); if (it == users.end ()) return flux_jobtap_reject_job (p, args, "user not found in flux-accounting DB"); + // make sure user belongs to bank they specified; if no bank was passed in, + // look up their default bank if (bank != NULL) { - inner_it = it->second.find (std::string (bank)); - if (inner_it == it->second.end ()) + bank_it = it->second.find (std::string (bank)); + if (bank_it == it->second.end ()) return flux_jobtap_reject_job (p, args, - "user does not belong to specified bank"); + "user does not belong to specified bank"); + } else { + bank = const_cast (users_def_bank[userid].c_str ()); } - mj_it = users_mj.find (userid); + try { + max_jobs = users.at(userid).at(bank).max_jobs; + current_jobs = users.at(userid).at(bank).current_jobs; + } catch (const std::out_of_range& oor) { + return flux_jobtap_reject_job (p, args, + "could not look up user's active job count"); + } - mj_inner_it = (bank == NULL) ? - mj_it->second.find ("default") : - mj_it->second.find (bank); + if (flux_jobtap_job_aux_set (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:bank", + bank, + NULL) < 0) + flux_log_error (h, "flux_jobtap_job_aux_set"); - if (mj_inner_it->second[1] >= mj_inner_it->second[0]) + // make sure user has not already hit their max active jobs count + if (max_jobs > 0 && current_jobs >= max_jobs) return flux_jobtap_reject_job (p, args, - "user has max number of jobs submitted"); + "user has max number of jobs submitted"); - // increment user's max_jobs count - mj_inner_it->second[1]++; + users[userid][bank].current_jobs++; return 0; } @@ -236,18 +262,13 @@ static int inactive_cb (flux_plugin_t *p, void *data) { int userid; - char *bank = NULL; - - std::map>>::iterator mj_it; - std::map>::iterator mj_inner_it; + char *bank; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}}", - "userid", &userid, - "jobspec", "attributes", "system", - "bank", &bank) < 0) { + "{s:i}", + "userid", &userid) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -255,13 +276,23 @@ static int inactive_cb (flux_plugin_t *p, return -1; } - mj_it = users_mj.find (userid); + bank = static_cast (flux_jobtap_job_aux_get ( + p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:bank")); - mj_inner_it = (bank == NULL) ? - mj_it->second.find ("default") : - mj_it->second.find (bank); + if (bank == NULL) + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "plugin", + 3, "mf_priority: bank is NULL"); - mj_inner_it->second[1]--; + + try { + users.at(userid).at(bank).current_jobs--; + } catch (const std::out_of_range& oor) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "plugin", + 3, "mf_priority: could not decrement " + "current jobs count"); + } return 0; }