[WIP] priority plugin: add configurable plugin factor weights for priority calculation #220

Closed
4 changes: 3 additions & 1 deletion src/bindings/python/fluxacct/accounting/Makefile.am
@@ -3,6 +3,7 @@ acctpy_PYTHON = \
	user_subcommands.py \
	bank_subcommands.py \
	queue_subcommands.py \
	plugin_factor_subcommands.py \
	job_archive_interface.py \
	create_db.py

@@ -16,7 +17,8 @@ TESTSCRIPTS = \
	test/test_example.py \
	test/test_job_archive_interface.py \
	test/test_user_subcommands.py \
	test/test_queue_subcommands.py
	test/test_queue_subcommands.py \
	test/test_plugin_factor_subcommands.py

dist_check_SCRIPTS = \
	$(TESTSCRIPTS)
22 changes: 22 additions & 0 deletions src/bindings/python/fluxacct/accounting/create_db.py
@@ -174,5 +174,27 @@ def create_db(
            PRIMARY KEY (queue)
        );"""
    )
    logging.info("Created queue_table successfully")

    # Plugin Weight Table
    # stores the weights for all of the priority factors in the multi-factor
    # priority plugin
    logging.info("Creating plugin_factor_table in DB...")
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS plugin_factor_table (
            factor tinytext NOT NULL,
            weight int(11) DEFAULT 1,
            PRIMARY KEY (factor)
        );"""
    )
    logging.info("Created plugin_factor_table successfully")
    conn.execute(
        "INSERT INTO plugin_factor_table (factor, weight) VALUES ('fairshare', 100000);"
    )
    conn.execute(
        "INSERT INTO plugin_factor_table (factor, weight) VALUES ('queue', 10000);"
    )
    conn.commit()

    conn.close()
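A quick way to sanity-check the seeded defaults is to create a fresh database and dump the new table. This is only an illustrative sketch; the file name "example.db" is arbitrary:

import sqlite3

from fluxacct.accounting import create_db as c

c.create_db("example.db")  # creates all tables and seeds plugin_factor_table
conn = sqlite3.connect("example.db")
for factor, weight in conn.execute("SELECT factor, weight FROM plugin_factor_table"):
    print(factor, weight)  # expected: fairshare 100000, queue 10000
conn.close()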
53 changes: 53 additions & 0 deletions src/bindings/python/fluxacct/accounting/plugin_factor_subcommands.py
@@ -0,0 +1,53 @@
#!/usr/bin/env python3

###############################################################
# Copyright 2020 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import sqlite3


def view_factor(conn, factor):
    cur = conn.cursor()
    try:
        # get the information pertaining to a plugin weight in the DB
        cur.execute("SELECT * FROM plugin_factor_table WHERE factor=?", (factor,))
        rows = cur.fetchall()
        headers = [description[0] for description in cur.description]
        if not rows:
            print("Factor not found in plugin_factor_table")
        else:
            # print column names of plugin_factor_table
            for header in headers:
                print(header.ljust(18), end=" ")
            print()
            for row in rows:
                for col in list(row):
                    print(str(col).ljust(18), end=" ")
                print()
    except sqlite3.OperationalError as e_database_error:
        print(e_database_error)


def edit_factor(conn, factor, weight=None):
    try:
        update_stmt = "UPDATE plugin_factor_table SET weight=? WHERE factor=?"
        updated_weight = int(weight)  # validate that the weight is an integer

        conn.execute(
            update_stmt,
            (
                updated_weight,
                factor,
            ),
        )

        # commit changes
        conn.commit()
    except (TypeError, ValueError):
        raise ValueError("Weight must be an integer")
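Taken together, a minimal round trip through these helpers might look like the sketch below, assuming a database created with create_db() (the file name is arbitrary):

import sqlite3

from fluxacct.accounting import create_db as c
from fluxacct.accounting import plugin_factor_subcommands as p

c.create_db("example.db")
conn = sqlite3.connect("example.db")
p.view_factor(conn, "fairshare")               # prints the factor and its default weight, 100000
p.edit_factor(conn, "fairshare", weight=1500)  # store a new weight
p.view_factor(conn, "fairshare")               # now shows 1500
conn.close()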
@@ -54,6 +54,7 @@ def test_01_tables_exist(self):
            "job_usage_factor_table",
            "t_half_life_period_table",
            "queue_table",
            "plugin_factor_table",
        ]
        self.assertEqual(list_of_tables, expected)

68 changes: 68 additions & 0 deletions src/bindings/python/fluxacct/accounting/test/test_plugin_factor_subcommands.py
@@ -0,0 +1,68 @@
#!/usr/bin/env python3

###############################################################
# Copyright 2020 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import unittest
import os
import sqlite3

from fluxacct.accounting import create_db as c
from fluxacct.accounting import plugin_factor_subcommands as p


class TestAccountingCLI(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # create test accounting database
        c.create_db("TestPluginFactorSubcommands.db")
        global acct_conn
        global cur

        acct_conn = sqlite3.connect("TestPluginFactorSubcommands.db")
        cur = acct_conn.cursor()

    # edit the weight for the fairshare factor
    def test_01_edit_fairshare_factor_successfully(self):
        p.edit_factor(acct_conn, factor="fairshare", weight=1500)
        cur.execute("SELECT weight FROM plugin_factor_table WHERE factor='fairshare'")
        row = cur.fetchone()

        self.assertEqual(row[0], 1500)

    # edit the weight for the queue factor
    def test_02_edit_queue_factor_successfully(self):
        p.edit_factor(acct_conn, factor="queue", weight=200)
        cur.execute("SELECT weight FROM plugin_factor_table WHERE factor='queue'")
        row = cur.fetchone()

        self.assertEqual(row[0], 200)

    # try to edit a factor with a bad type
    def test_03_edit_factor_bad_type(self):
        with self.assertRaises(ValueError):
            p.edit_factor(acct_conn, factor="fairshare", weight="foo")

    # remove test database
    @classmethod
    def tearDownClass(cls):
        acct_conn.close()
        os.remove("TestPluginFactorSubcommands.db")


def suite():
    suite = unittest.TestSuite()

    return suite


if __name__ == "__main__":
    from pycotap import TAPTestRunner

    unittest.main(testRunner=TAPTestRunner())
14 changes: 14 additions & 0 deletions src/cmd/flux-account-priority-update.py
@@ -51,6 +51,7 @@ def bulk_update(path):
    data = {}
    bulk_user_data = []
    bulk_q_data = []
    bulk_factor_data = []

    # fetch all rows from association_table (will print out tuples)
    for row in cur.execute(
@@ -90,6 +91,19 @@

    flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(data)).get()

    # fetch all factors and their associated weights from the plugin_factor_table
    for row in cur.execute("SELECT * FROM plugin_factor_table"):
        # create a JSON payload with the results of the query
        single_factor_data = {
            "factor": str(row[0]),
            "weight": int(row[1]),
        }
        bulk_factor_data.append(single_factor_data)

    data = {"data": bulk_factor_data}

    flux.Flux().rpc("job-manager.mf_priority.rec_fac_update", json.dumps(data)).get()

    flux.Flux().rpc("job-manager.mf_priority.reprioritize")

    # close DB connection
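For reference, the rec_fac_update payload assembled above has the same shape the sharness tests later in this PR construct by hand:

{
    "data": [
        {"factor": "fairshare", "weight": 100000},
        {"factor": "queue", "weight": 10000}
    ]
}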
30 changes: 30 additions & 0 deletions src/cmd/flux-account.py
@@ -18,6 +18,7 @@
from fluxacct.accounting import job_archive_interface as jobs
from fluxacct.accounting import create_db as c
from fluxacct.accounting import queue_subcommands as qu
from fluxacct.accounting import plugin_factor_subcommands as p


def add_path_arg(parser):
@@ -376,6 +377,29 @@ def add_delete_queue_arg(subparsers):
    subparser_delete_queue.add_argument("queue", help="queue name", metavar="QUEUE")


def add_view_factor_arg(subparsers):
    subparser_view_factor = subparsers.add_parser(
        "view-plugin-factor", help="view a plugin factor and its associated weight"
    )
    subparser_view_factor.set_defaults(func="view_factor")
    subparser_view_factor.add_argument(
        "factor", type=str, help="factor name", metavar="FACTOR"
    )


def add_edit_factor_arg(subparsers):
    subparser_edit_factor = subparsers.add_parser(
        "edit-plugin-factor", help="edit a plugin factor's associated weight"
    )
    subparser_edit_factor.set_defaults(func="edit_factor")
    subparser_edit_factor.add_argument(
        "factor", type=str, help="factor name", metavar="FACTOR"
    )
    subparser_edit_factor.add_argument(
        "--weight", type=int, help="associated weight", default=None, metavar="WEIGHT"
    )


def add_arguments_to_parser(parser, subparsers):
    add_path_arg(parser)
    add_output_file_arg(parser)
@@ -394,6 +418,8 @@ def add_arguments_to_parser(parser, subparsers):
    add_view_queue_arg(subparsers)
    add_edit_queue_arg(subparsers)
    add_delete_queue_arg(subparsers)
    add_view_factor_arg(subparsers)
    add_edit_factor_arg(subparsers)


def set_db_location(args):
@@ -498,6 +524,10 @@ def select_accounting_function(args, conn, output_file, parser):
            args.max_time_per_job,
            args.priority,
        )
    elif args.func == "view_factor":
        p.view_factor(conn, args.factor)
    elif args.func == "edit_factor":
        p.edit_factor(conn, args.factor, args.weight)
    else:
        print(parser.print_usage())
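With these subcommands wired into the parser, an administrator can inspect and adjust the weights from the command line, e.g. flux account view-plugin-factor fairshare or flux account edit-plugin-factor fairshare --weight=1500 (the weight value here is just an example), then run flux-account-priority-update.py to push the new values out to the plugin.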
56 changes: 53 additions & 3 deletions src/plugins/mf_priority.cpp
@@ -37,6 +37,7 @@ extern "C" {
std::map<int, std::map<std::string, struct bank_info>> users;
std::map<std::string, struct queue_info> queues;
std::map<int, std::string> users_def_bank;
std::map<std::string, int> plugin_factors;

struct bank_info {
    double fairshare;
@@ -84,8 +85,9 @@ int64_t priority_calculation (flux_plugin_t *p,
    int fshare_weight, queue_weight;
    struct bank_info *b;

    fshare_weight = 100000;
    queue_weight = 10000;
    // fetch the configured weights for each priority factor; note that
    // std::map::operator[] default-inserts a weight of 0 for any factor
    // that has not yet arrived via a rec_fac_update RPC
    fshare_weight = plugin_factors["fairshare"];
    queue_weight = plugin_factors["queue"];

    if (urgency == FLUX_JOB_URGENCY_HOLD)
        return FLUX_JOB_PRIORITY_MIN;
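For intuition: assuming the plugin still combines factors as a weighted sum (that arithmetic sits outside this hunk), an association with a fairshare value of 0.5 would contribute 0.5 * 100000 = 50000 priority points under the default fairshare weight, and only 0.5 * 1500 = 750 after an administrator lowers the weight to 1500 with edit-plugin-factor.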
@@ -340,6 +342,53 @@ static void reprior_cb (flux_t *h,
}


/*
 * Unpack a payload of priority factor weights from an external bulk
 * update service and place them in the plugin_factors map.
 */
static void rec_f_cb (flux_t *h,
                      flux_msg_handler_t *mh,
                      const flux_msg_t *msg,
                      void *arg)
{
    char *factor = NULL;
    int weight = 0;
    json_t *data = NULL;
    json_error_t error;
    int num_data = 0;

    if (flux_request_unpack (msg, NULL, "{s:o}", "data", &data) < 0) {
        flux_log_error (h, "failed to unpack mf_priority.rec_fac_update msg");
        goto error;
    }

    if (!data || !json_is_array (data)) {
        flux_log (h, LOG_ERR, "mf_priority: invalid rec_fac_update payload");
        errno = EINVAL;
        goto error;
    }
    num_data = json_array_size (data);

    for (int i = 0; i < num_data; i++) {
        json_t *el = json_array_get (data, i);

        if (json_unpack_ex (el, &error, 0,
                            "{s:s, s:i}",
                            "factor", &factor,
                            "weight", &weight) < 0) {
            flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);
            continue;  // skip a malformed entry rather than storing a NULL key
        }

        plugin_factors[factor] = weight;
    }

    // respond only after the payload has been validated and applied so a
    // request can never receive both a success and an error response
    if (flux_respond (h, msg, NULL) < 0)
        flux_log_error (h, "flux_respond");

    return;
error:
    flux_respond_error (h, msg, errno, flux_msg_last_error (msg));
}


/*
 * Unpack the urgency and userid from a submitted job and call
 * priority_calculation (), which will return a new job priority to be packed.
@@ -810,7 +859,8 @@ extern "C" int flux_plugin_init (flux_plugin_t *p)
    if (flux_plugin_register (p, "mf_priority", tab) < 0
        || flux_jobtap_service_register (p, "rec_update", rec_update_cb, p)
        || flux_jobtap_service_register (p, "reprioritize", reprior_cb, p)
        || flux_jobtap_service_register (p, "rec_q_update", rec_q_cb, p) < 0)
        || flux_jobtap_service_register (p, "rec_q_update", rec_q_cb, p)
        || flux_jobtap_service_register (p, "rec_fac_update", rec_f_cb, p) < 0)
        return -1;

    struct queue_info *q;
3 changes: 2 additions & 1 deletion t/Makefile.am
@@ -15,7 +15,8 @@ TESTSCRIPTS = \
	t1011-job-archive-interface.t \
	t1012-mf-priority-load.t \
	t1013-mf-priority-queues.t \
	t1014-mf-priority-dne.t
	t1014-mf-priority-dne.t \
	t1015-mf-priority-weights.t

dist_check_SCRIPTS = \
	$(TESTSCRIPTS) \
7 changes: 7 additions & 0 deletions t/t1001-mf-priority-basic.t
@@ -77,6 +77,13 @@ test_expect_success 'create fake_payload.py' '
	    ]
	}
	flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get()
	bulk_fac_data = {
	    "data" : [
	        {"factor": "fairshare", "weight": 100000},
	        {"factor": "queue", "weight": 10000}
	    ]
	}
	flux.Flux().rpc("job-manager.mf_priority.rec_fac_update", json.dumps(bulk_fac_data)).get()
	EOF
'

9 changes: 8 additions & 1 deletion t/t1002-mf-priority-small-no-tie.t
@@ -41,7 +41,7 @@ test_expect_success 'send the user information to the plugin' '
	flux python ${SEND_PAYLOAD} fake_small_no_tie.json
'

test_expect_success 'add a default queue and send it to the plugin' '
test_expect_success 'add a default queue and plugin weights and send them to the plugin' '
	cat <<-EOF >fake_payload.py
	import flux
	import json
@@ -58,6 +58,13 @@
	    ]
	}
	flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(bulk_queue_data)).get()
	bulk_fac_data = {
	    "data" : [
	        {"factor": "fairshare", "weight": 100000},
	        {"factor": "queue", "weight": 10000}
	    ]
	}
	flux.Flux().rpc("job-manager.mf_priority.rec_fac_update", json.dumps(bulk_fac_data)).get()
	EOF
	flux python fake_payload.py
'