Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add max_active_jobs per-user/bank limit #201

Merged
merged 6 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def create_db(
job_usage real DEFAULT 0.0 NOT NULL,
fairshare real DEFAULT 0.5 NOT NULL,
max_running_jobs int(11) DEFAULT 5 NOT NULL ON CONFLICT REPLACE DEFAULT 5,
max_active_jobs int(11) DEFAULT 7 NOT NULL ON CONFLICT REPLACE DEFAULT 7,
qos tinytext DEFAULT '' NOT NULL ON CONFLICT REPLACE DEFAULT '',
PRIMARY KEY (username, bank)
);"""
Expand Down
9 changes: 7 additions & 2 deletions src/bindings/python/fluxacct/accounting/user_subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def add_user(
uid=65534,
shares=1,
max_running_jobs=5,
max_active_jobs=7,
qos="",
):

Expand Down Expand Up @@ -102,8 +103,9 @@ def add_user(
"""
INSERT INTO association_table (creation_time, mod_time, deleted,
username, userid, bank, default_bank,
shares, max_running_jobs, qos)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
shares, max_running_jobs,
max_active_jobs, qos)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
int(time.time()),
Expand All @@ -115,6 +117,7 @@ def add_user(
default_bank,
shares,
max_running_jobs,
max_active_jobs,
qos,
),
)
Expand Down Expand Up @@ -163,6 +166,7 @@ def edit_user(
default_bank=None,
shares=None,
max_running_jobs=None,
max_active_jobs=None,
qos=None,
):
params = locals()
Expand All @@ -172,6 +176,7 @@ def edit_user(
"default_bank",
"shares",
"max_running_jobs",
"max_active_jobs",
"qos",
]
for field in editable_fields:
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/flux-account-priority-update.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def bulk_update(path):
# fetch all rows from association_table (will print out tuples)
for row in cur.execute(
"""SELECT userid, bank, default_bank,
fairshare, max_running_jobs FROM association_table"""
fairshare, max_running_jobs, max_active_jobs FROM association_table"""
):
# create a JSON payload with the results of the query
single_user_data = {
Expand All @@ -63,6 +63,7 @@ def bulk_update(path):
"def_bank": str(row[2]),
"fairshare": float(row[3]),
"max_running_jobs": int(row[4]),
"max_active_jobs": int(row[5]),
}
bulk_user_data.append(single_user_data)

Expand Down
14 changes: 14 additions & 0 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ def add_add_user_arg(subparsers):
default=5,
metavar="MAX_RUNNING_JOBS",
)
subparser_add_user.add_argument(
"--max-active-jobs",
help="max number of both pending and running jobs",
default=7,
metavar="max_active_jobs",
)
subparser_add_user.add_argument(
"--qos",
help="quality of service",
Expand Down Expand Up @@ -128,6 +134,12 @@ def add_edit_user_arg(subparsers):
default=None,
metavar="MAX_RUNNING_JOBS",
)
# --max-active-jobs for edit-user: the default is None (not 7) so that it
# matches the sibling --max-running-jobs edit option and the
# max_active_jobs=None default of edit_user(). With a concrete default here,
# every edit-user invocation would pass 7 to edit_user() even when the flag
# was not given on the command line.
# NOTE(review): presumably edit_user() skips fields that are None -- confirm.
subparser_edit_user.add_argument(
    "--max-active-jobs",
    help="max number of both pending and running jobs",
    default=None,
    metavar="max_active_jobs",
)
subparser_edit_user.add_argument(
"--qos",
help="quality of service",
Expand Down Expand Up @@ -439,6 +451,7 @@ def select_accounting_function(args, conn, output_file, parser):
args.userid,
args.shares,
args.max_running_jobs,
args.max_active_jobs,
args.qos,
)
elif args.func == "delete_user":
Expand All @@ -451,6 +464,7 @@ def select_accounting_function(args, conn, output_file, parser):
args.default_bank,
args.shares,
args.max_running_jobs,
args.max_active_jobs,
args.qos,
)
elif args.func == "view_job_records":
Expand Down
41 changes: 27 additions & 14 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ std::map<int, std::string> users_def_bank;

// Per-user, per-bank accounting state kept by the mf_priority plugin; entries
// are reached as users[uid][bank] in rec_update_cb.
// NOTE(review): this span comes from a diff hunk (-33,8 +33,10), so it shows
// both the removed members (max_running_jobs, current_jobs) and the members
// that replace them (max_run_jobs, cur_run_jobs) plus the two new
// *_active_jobs members.
struct bank_info {
// fairshare value from the "fairshare" key of the rec_update payload;
// validate_cb rejects the job when it is 0
double fairshare;
// removed side of the hunk: old name of max_run_jobs
int max_running_jobs;
// removed side of the hunk: old name of cur_run_jobs
int current_jobs;
// limit taken from the payload's "max_running_jobs" key; depend_cb adds a
// "max-jobs-limit" dependency when cur_run_jobs equals this (and it is > 0);
// validate_cb treats -1 as a special test-only value
int max_run_jobs;
// counter incremented in the priority_cb hunk and decremented in the
// inactive_cb hunk -- NOTE(review): enclosing functions only partly visible
int cur_run_jobs;
// limit taken from the payload's "max_active_jobs" key; validate_cb rejects
// a job when this is > 0 and cur_active_jobs >= max_active_jobs
int max_active_jobs;
// counter incremented at the end of validate_cb and decremented alongside
// cur_run_jobs in the inactive_cb hunk
int cur_active_jobs;
// job ids currently held; once cur_run_jobs < max_run_jobs the front id has
// its "max-jobs-limit" dependency removed
std::vector<long int> held_jobs;
};

Expand Down Expand Up @@ -105,7 +107,7 @@ static void rec_update_cb (flux_t *h,
void *arg)
{
char *bank, *def_bank = NULL;
int uid, max_running_jobs = 0;
int uid, max_running_jobs, max_active_jobs = 0;
double fshare = 0.0;
json_t *data, *jtemp = NULL;
json_error_t error;
Expand All @@ -128,19 +130,21 @@ static void rec_update_cb (flux_t *h,
for (int i = 0; i < num_data; i++) {
json_t *el = json_array_get(data, i);

if (json_unpack_ex (el, &error, 0, "{s:i, s:s, s:s, s:F, s:i}",
if (json_unpack_ex (el, &error, 0, "{s:i, s:s, s:s, s:F, s:i, s:i}",
"userid", &uid,
"bank", &bank,
"def_bank", &def_bank,
"fairshare", &fshare,
"max_running_jobs", &max_running_jobs) < 0)
"max_running_jobs", &max_running_jobs,
"max_active_jobs", &max_active_jobs) < 0)
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);

struct bank_info *b;
b = &users[uid][bank];

b->fairshare = fshare;
b->max_running_jobs = max_running_jobs;
b->max_run_jobs = max_running_jobs;
b->max_active_jobs = max_active_jobs;

users_def_bank[uid] = def_bank;
}
Expand Down Expand Up @@ -207,7 +211,7 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

b->current_jobs++;
b->cur_run_jobs++;

return 0;
}
Expand All @@ -225,7 +229,7 @@ static int validate_cb (flux_plugin_t *p,
{
int userid;
char *bank = NULL;
int current_jobs, max_jobs = 0;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
Expand Down Expand Up @@ -262,18 +266,24 @@ static int validate_cb (flux_plugin_t *p,
"user/default bank entry does not exist");
}

max_jobs = bank_it->second.max_running_jobs;
current_jobs = bank_it->second.current_jobs;
max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
max_active_jobs = bank_it->second.max_active_jobs;

// if a user's fairshare value is 0, that means they shouldn't be able
// to run jobs on a system
if (fairshare == 0)
return flux_jobtap_reject_job (p, args, "user fairshare value is 0");

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs)
return flux_jobtap_reject_job (p, args, "user has max active jobs");

// special case where the user/bank bank_info struct is set to NULL; used
// for testing the "if (b == NULL)" checks
if (max_jobs == -1) {
if (max_run_jobs == -1) {
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info",
Expand All @@ -292,6 +302,8 @@ static int validate_cb (flux_plugin_t *p,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");

bank_it->second.cur_active_jobs++;

return 0;
}

Expand Down Expand Up @@ -332,7 +344,7 @@ static int depend_cb (flux_plugin_t *p,

// if user has already hit their max running jobs count, add a job
// dependency to hold job until an already running job has finished
if ((b->max_running_jobs > 0) && (b->current_jobs == b->max_running_jobs)) {
if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) {
if (flux_jobtap_dependency_add (p, id, "max-jobs-limit") < 0) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0, "failed to add " \
Expand Down Expand Up @@ -380,11 +392,12 @@ static int inactive_cb (flux_plugin_t *p,
return -1;
}

b->current_jobs--;
b->cur_run_jobs--;
b->cur_active_jobs--;

// if the user/bank combo has any currently held jobs and the user is now
// under their max jobs limit, remove the dependency from first held job
if ((b->held_jobs.size () > 0) && (b->current_jobs < b->max_running_jobs)) {
if ((b->held_jobs.size () > 0) && (b->cur_run_jobs < b->max_run_jobs)) {
long int jobid = b->held_jobs.front ();

if (flux_jobtap_dependency_remove (p, jobid, "max-jobs-limit") < 0)
Expand Down
10 changes: 5 additions & 5 deletions t/t1001-mf-priority-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ test_expect_success 'create fake_payload.py' '
# create an array of JSON payloads
bulk_update_data = {
"data" : [
{"userid": userid, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 10},
{"userid": userid, "bank": "account2", "def_bank": "account3", "fairshare": 0.11345, "max_running_jobs": 10}
{"userid": userid, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 10, "max_active_jobs": 12},
{"userid": userid, "bank": "account2", "def_bank": "account3", "fairshare": 0.11345, "max_running_jobs": 10, "max_active_jobs": 12}
]
}
flux.Flux().rpc("job-manager.mf_priority.rec_update", json.dumps(bulk_update_data)).get()
Expand Down Expand Up @@ -141,7 +141,7 @@ test_expect_success 'create a fake payload with a 0 fairshare key-value pair' '
# create an array of JSON payloads
bulk_update_data = {
"data" : [
{"userid": userid, "bank": "account4", "def_bank": "account3", "fairshare": 0.0, "max_running_jobs": 10}
{"userid": userid, "bank": "account4", "def_bank": "account3", "fairshare": 0.0, "max_running_jobs": 10, "max_active_jobs": 12}
]
}
flux.Flux().rpc("job-manager.mf_priority.rec_update", json.dumps(bulk_update_data)).get()
Expand All @@ -162,7 +162,7 @@ test_expect_success 'pass special key to user/bank struct to nullify information
cat <<-EOF >null_struct.json
{
"data" : [
{"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": -1}
{"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": -1, "max_active_jobs": 12}
]
}
EOF
Expand All @@ -182,7 +182,7 @@ test_expect_success 'resend user/bank information with valid data and successful
cat <<-EOF >valid_info.json
{
"data" : [
{"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 2}
{"userid": 5011, "bank": "account3", "def_bank": "account3", "fairshare": 0.45321, "max_running_jobs": 2, "max_active_jobs": 4}
]
}
EOF
Expand Down
14 changes: 7 additions & 7 deletions t/t1002-mf-priority-small-no-tie.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ test_expect_success 'create a group of users with unique fairshare values' '
cat <<-EOF >fake_small_no_tie.json
{
"data" : [
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.285714, "max_running_jobs": 5},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.142857, "max_running_jobs": 5},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.428571, "max_running_jobs": 5},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.714286, "max_running_jobs": 5},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.571429, "max_running_jobs": 5},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.857143, "max_running_jobs": 5}
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.285714, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.142857, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.428571, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.714286, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.571429, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.857143, "max_running_jobs": 5, "max_active_jobs": 7}
]
}
EOF
Expand Down
16 changes: 8 additions & 8 deletions t/t1003-mf-priority-small-tie.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ test_expect_success 'create a group of users with some ties in fairshare values'
cat <<-EOF >fake_small_tie.json
{
"data" : [
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.75, "max_running_jobs": 5},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5},
{"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 0.75, "max_running_jobs": 5},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.875, "max_running_jobs": 5}
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.5, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 0.75, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 1.0, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.875, "max_running_jobs": 5, "max_active_jobs": 7}
]
}
EOF
Expand Down
18 changes: 9 additions & 9 deletions t/t1004-mf-priority-small-tie-all.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ test_expect_success 'create a group of users with many ties in fairshare values'
cat <<-EOF >fake_small_tie_all.json
{
"data" : [
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 1, "max_running_jobs": 5},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 1, "max_running_jobs": 5},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5},
{"userid": 5033, "bank": "account3", "def_bank": "account3", "fairshare": 1, "max_running_jobs": 5}
{"userid": 5011, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5012, "bank": "account1", "def_bank": "account1", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5013, "bank": "account1", "def_bank": "account1", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5021, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5022, "bank": "account2", "def_bank": "account2", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5023, "bank": "account2", "def_bank": "account2", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5031, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5032, "bank": "account3", "def_bank": "account3", "fairshare": 0.666667, "max_running_jobs": 5, "max_active_jobs": 7},
{"userid": 5033, "bank": "account3", "def_bank": "account3", "fairshare": 1, "max_running_jobs": 5, "max_active_jobs": 7}
]
}
EOF
Expand Down
Loading