From f4d71dcc7f5d478391669abb618fe126c883a691 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 10 May 2021 10:51:27 -0700 Subject: [PATCH 01/14] association_table: add userid field --- .../python/fluxacct/accounting/create_db.py | 20 ++++++----- .../fluxacct/accounting/user_subcommands.py | 36 +++++++++++++++++-- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py index 47d80b69..66f2707f 100755 --- a/src/bindings/python/fluxacct/accounting/create_db.py +++ b/src/bindings/python/fluxacct/accounting/create_db.py @@ -87,15 +87,16 @@ def create_db( conn.execute( """ CREATE TABLE IF NOT EXISTS association_table ( - creation_time bigint(20) NOT NULL, - mod_time bigint(20) DEFAULT 0 NOT NULL, - deleted tinyint(4) DEFAULT 0 NOT NULL, - username tinytext NOT NULL, - admin_level smallint(6) DEFAULT 1 NOT NULL, - bank tinytext NOT NULL, - shares int(11) DEFAULT 1 NOT NULL, - job_usage real DEFAULT 0.0 NOT NULL, - fairshare real DEFAULT 0.0 NOT NULL, + creation_time bigint(20) NOT NULL, + mod_time bigint(20) DEFAULT 0 NOT NULL, + deleted tinyint(4) DEFAULT 0 NOT NULL, + username tinytext NOT NULL, + userid int(11) DEFAULT 65534 NOT NULL, + admin_level smallint(6) DEFAULT 1 NOT NULL, + bank tinytext NOT NULL, + shares int(11) DEFAULT 1 NOT NULL, + job_usage real DEFAULT 0.0 NOT NULL, + fairshare real DEFAULT 0.0 NOT NULL, PRIMARY KEY (username, bank) );""" ) @@ -122,6 +123,7 @@ def create_db( """ CREATE TABLE IF NOT EXISTS job_usage_factor_table ( username tinytext NOT NULL, + userid int(11) NOT NULL, bank tinytext NOT NULL, last_job_timestamp real DEFAULT 0.0, PRIMARY KEY (username, bank) diff --git a/src/bindings/python/fluxacct/accounting/user_subcommands.py b/src/bindings/python/fluxacct/accounting/user_subcommands.py index 362b5ed5..47d7535b 100755 --- a/src/bindings/python/fluxacct/accounting/user_subcommands.py +++ 
b/src/bindings/python/fluxacct/accounting/user_subcommands.py @@ -11,6 +11,7 @@ ############################################################### import sqlite3 import time +import pwd import pandas as pd @@ -30,7 +31,32 @@ def view_user(conn, user): print(e_database_error) -def add_user(conn, username, bank, admin_level=1, shares=1): +def get_uid(username): + try: + return pwd.getpwnam(username).pw_uid + except KeyError: + return str(username) + + +def add_user( + conn, + username, + bank, + uid=65534, + admin_level=1, + shares=1, +): + + # get uid of user + fetched_uid = get_uid(username) + + try: + if isinstance(fetched_uid, int): + uid = fetched_uid + else: + raise KeyError + except KeyError as key_error: + print(key_error) try: # insert the user values into association_table @@ -41,17 +67,19 @@ def add_user(conn, username, bank, admin_level=1, shares=1): mod_time, deleted, username, + userid, admin_level, bank, shares ) - VALUES (?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( int(time.time()), int(time.time()), 0, username, + uid, admin_level, bank, shares, @@ -64,12 +92,14 @@ def add_user(conn, username, bank, admin_level=1, shares=1): """ INSERT INTO job_usage_factor_table ( username, + userid, bank ) - VALUES (?, ?) + VALUES (?, ?, ?) """, ( username, + uid, bank, ), ) From 52f57cb66dba7da7f73d9325664370cd720251a0 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 10 May 2021 10:57:51 -0700 Subject: [PATCH 02/14] fetch_usg_bins: change index of dataframe values Now that the userid field exists in the job_usage_factor_table, edit the index of values to look for in the dataframe to be moved one to the right. 
--- .../python/fluxacct/accounting/job_archive_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bindings/python/fluxacct/accounting/job_archive_interface.py b/src/bindings/python/fluxacct/accounting/job_archive_interface.py index 088a5c39..686ddb24 100755 --- a/src/bindings/python/fluxacct/accounting/job_archive_interface.py +++ b/src/bindings/python/fluxacct/accounting/job_archive_interface.py @@ -267,7 +267,7 @@ def fetch_usg_bins(acct_conn, user=None, bank=None): ), ) - for val in dataframe.iloc[0].values[3:]: + for val in dataframe.iloc[0].values[4:]: if isinstance(val, float): past_usage_factors.append(val) From 8ae03b4878e090c903f2480a037f73ec9be7d11c Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 10 May 2021 11:00:49 -0700 Subject: [PATCH 03/14] test: change tests to account for new userid field --- .../python/fluxacct/accounting/test/test_create_db.py | 4 ++-- .../accounting/test/test_job_archive_interface.py | 8 ++++---- .../fluxacct/accounting/test/test_user_subcommands.py | 7 ++++++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/test/test_create_db.py b/src/bindings/python/fluxacct/accounting/test/test_create_db.py index 9b8bdff2..11dbee37 100755 --- a/src/bindings/python/fluxacct/accounting/test/test_create_db.py +++ b/src/bindings/python/fluxacct/accounting/test/test_create_db.py @@ -55,10 +55,10 @@ def test_02_create_association(self): conn.execute( """ INSERT INTO association_table - (creation_time, mod_time, deleted, username, admin_level, + (creation_time, mod_time, deleted, username, userid, admin_level, bank, shares) VALUES - (0, 0, 0, "test user", 1, "test account", 0) + (0, 0, 0, "test user", 1234, 1, "test account", 0) """ ) cursor = conn.cursor() diff --git a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py index c7e12603..b61941e9 100755 
--- a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py +++ b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py @@ -73,10 +73,10 @@ def setUpClass(self): b.add_bank(acct_conn, bank="D", parent_bank="B", shares=1) # add users - u.add_user(acct_conn, username="1001", bank="C") - u.add_user(acct_conn, username="1002", bank="C") - u.add_user(acct_conn, username="1003", bank="D") - u.add_user(acct_conn, username="1004", bank="D") + u.add_user(acct_conn, username="1001", uid="1001", bank="C") + u.add_user(acct_conn, username="1002", uid="1002", bank="C") + u.add_user(acct_conn, username="1003", uid="1003", bank="D") + u.add_user(acct_conn, username="1004", uid="1004", bank="D") jobid = 100 interval = 0 # add to job timestamps to diversify job-archive records diff --git a/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py b/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py index 25b4dd3f..f34f6b9e 100755 --- a/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py +++ b/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py @@ -32,6 +32,7 @@ def test_01_add_valid_user(self): u.add_user( acct_conn, username="fluxuser", + uid="1234", admin_level="1", bank="acct", shares="10", @@ -44,12 +45,13 @@ def test_01_add_valid_user(self): self.assertEqual(num_rows_assoc_table, num_rows_job_usage_factor_table) - # adding a user with the same primary key (user_name, account) should + # adding a user with the same primary key (username, account) should # return an IntegrityError def test_02_add_duplicate_primary_key(self): u.add_user( acct_conn, username="fluxuser", + uid="1234", admin_level="1", bank="acct", shares="10", @@ -57,6 +59,7 @@ def test_02_add_duplicate_primary_key(self): u.add_user( acct_conn, username="fluxuser", + uid="1234", admin_level="1", bank="acct", shares="10", @@ -70,6 +73,7 @@ def test_03_add_duplicate_user(self): u.add_user( acct_conn, 
username="dup_user", + uid="5678", admin_level="1", bank="acct", shares="10", @@ -77,6 +81,7 @@ def test_03_add_duplicate_user(self): u.add_user( acct_conn, username="dup_user", + uid="5678", admin_level="1", bank="other_acct", shares="10", From 901b64afa4643dfdf734a5724b5e08a8715bc676 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 11 May 2021 09:54:36 -0700 Subject: [PATCH 04/14] flux-account: add uid arg to add-user subcommand --- src/cmd/flux-account.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py index b4f7f493..ab59ad1d 100755 --- a/src/cmd/flux-account.py +++ b/src/cmd/flux-account.py @@ -52,6 +52,11 @@ def add_add_user_arg(subparsers): help="username", metavar="USERNAME", ) + subparser_add_user.add_argument( + "--userid", + help="userid", + metavar="USERID", + ) subparser_add_user.add_argument( "--admin-level", help="admin level", @@ -295,6 +300,7 @@ def select_accounting_function(args, conn, output_file, parser): conn, args.username, args.bank, + args.userid, args.admin_level, args.shares, args.max_jobs, From 25c6c4f6a65ed27a72c093eb14beba777faf9000 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 24 May 2021 10:24:23 -0700 Subject: [PATCH 05/14] association_table: change default val of fairshare Problem: The current default value for the fairshare field in the association table is 0.0, which in terms of fairshare, indicates that the user has used way more resources than allocated, which is an inaccurate representation when a user is first added to the flux-accounting database. Solution: Change the default value for fairshare to 0.5, which means that the user has used exactly the amount of resources allocated to them. 
--- src/bindings/python/fluxacct/accounting/create_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py index 66f2707f..c3bcdc6c 100755 --- a/src/bindings/python/fluxacct/accounting/create_db.py +++ b/src/bindings/python/fluxacct/accounting/create_db.py @@ -96,7 +96,7 @@ def create_db( bank tinytext NOT NULL, shares int(11) DEFAULT 1 NOT NULL, job_usage real DEFAULT 0.0 NOT NULL, - fairshare real DEFAULT 0.0 NOT NULL, + fairshare real DEFAULT 0.5 NOT NULL, PRIMARY KEY (username, bank) );""" ) From 66114ef689ca8c51893a4e7a18cb4800fec66f7c Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 26 May 2021 10:54:37 -0700 Subject: [PATCH 06/14] plugins: add multifactor priority plugin Add a new plugin to flux-accounting: a multi-factor priority plugin which takes factors from a flux-accounting DB to generate an integer priority value for a submitted job. --- src/plugins/mf_priority.cpp | 230 ++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 src/plugins/mf_priority.cpp diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp new file mode 100644 index 00000000..db8bbbe0 --- /dev/null +++ b/src/plugins/mf_priority.cpp @@ -0,0 +1,230 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework.
+ * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* mf_priority.cpp - multi-factor job priority plugin + * + */ + +extern "C" { +#if HAVE_CONFIG_H +#include "config.h" +#endif +} +#include +#include +#include +#include +#include +#include +#include +#include + +std::map> users; + +/****************************************************************************** + * * + * Helper Functions * + * * + *****************************************************************************/ + +/* + * Calculate a user's job priority using the following factors: + * + * fairshare: the ratio between the amount of resources allocated vs. resources + * consumed. + * urgency: a user-controlled factor to prioritize their own jobs. + */ +int64_t priority_calculation (flux_plugin_t *p, + flux_plugin_arg_t *args, + int userid, + char *bank, + int urgency) +{ + double fshare_factor = 0.0, priority = 0.0; + int fshare_weight; + + std::map>::iterator it; + std::map::iterator inner_it; + + fshare_weight = 100000; + + if (urgency == FLUX_JOB_URGENCY_HOLD) + return FLUX_JOB_PRIORITY_MIN; + + if (urgency == FLUX_JOB_URGENCY_EXPEDITE) + return FLUX_JOB_PRIORITY_MAX; + + // search element in map of maps by key + it = users.find (userid); + + // search for correct fshare value using passed-in bank; otherwise, use + // a default bank + if (bank != NULL) { + inner_it = it->second.find (bank); + fshare_factor = inner_it->second; + } + else { + inner_it = it->second.find ("default"); + fshare_factor = inner_it->second; + } + + priority = (fshare_weight * fshare_factor) + (urgency - 16); + + return abs (round (priority)); +} + + +/****************************************************************************** + * * + * Callbacks * + * * + *****************************************************************************/ + +/* + * Unpack a payload from an external bulk update service and place it in the + * multimap data structure.
+ */ +static void rec_update_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + char *uid, *fshare, *bank; + + if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s}", + "userid", &uid, + "bank", &bank, + "fairshare", &fshare) < 0) { + flux_log_error (h, "failed to unpack custom_priority.trigger msg"); + goto error; + } + + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "flux_respond"); + + // if the user being added does not yet have any entries in the map, + // treat their first bank as the "default" bank + if (users.count (std::atoi (uid)) == 0) + users[std::atoi (uid)]["default"] = std::stod (fshare); + + users[std::atoi (uid)][bank] = std::stod (fshare); + + return; +error: + flux_respond_error (h, msg, errno, flux_msg_last_error (msg)); +} + + +/* + * Unpack the urgency and userid from a submitted job and call + * priority_calculation (), which will return a new job priority to be packed. + */ +static int priority_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + int urgency, userid; + char *bank = NULL; + int64_t priority; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:i, s:i, s{s{s{s?s}}}}", + "urgency", &urgency, + "userid", &userid, + "jobspec", "attributes", "system", + "bank", &bank) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } + + priority = priority_calculation (p, args, userid, bank, urgency); + + if (flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:I}", + "priority", + priority) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_pack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } + return 0; +} + + +/* + * Look up the userid of the submitted job in the multimap; if user is not found + * in the map, reject the job saying the user wasn't found in the + * flux-accounting database.
+ */ +static int validate_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + int userid; + char *bank = NULL; + std::map>::iterator it; + std::map::iterator inner_it; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:i, s{s{s{s?s}}}}", + "userid", &userid, + "jobspec", "attributes", "system", + "bank", &bank) < 0) { + return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); + } + + // searching element in std::map by key + it = users.find (userid); + if (it == users.end ()) + return flux_jobtap_reject_job (p, args, + "user not found in flux-accounting DB"); + + if (bank != NULL) { + inner_it = it->second.find (std::string (bank)); + if (inner_it == it->second.end ()) + return flux_jobtap_reject_job (p, args, + "user does not belong to specified bank"); + } + + return 0; +} + + +static const struct flux_plugin_handler tab[] = { + { "job.validate", validate_cb, NULL }, + { "job.state.priority", priority_cb, NULL }, + { "job.priority.get", priority_cb, NULL }, + { 0 }, +}; + + +extern "C" int flux_plugin_init (flux_plugin_t *p) +{ + if (flux_plugin_register (p, "mf_priority", tab) < 0 + || flux_jobtap_service_register (p, "rec_update", rec_update_cb, p) < 0) + return -1; + return 0; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ From 783d2228e0ee747a351ab80e214f8718ff320af4 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 26 May 2021 10:56:56 -0700 Subject: [PATCH 07/14] plugins: add external service Add an external service for the multi-factor priority plugin which is responsible for querying data from a flux-accounting DB and pushing it to the plugin. 
--- src/plugins/bulk_update.py | 76 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100755 src/plugins/bulk_update.py diff --git a/src/plugins/bulk_update.py b/src/plugins/bulk_update.py new file mode 100755 index 00000000..1ebe86b3 --- /dev/null +++ b/src/plugins/bulk_update.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import flux +import argparse +import sys +import os +import sqlite3 + +import fluxacct.accounting + + +def set_db_loc(args): + path = args.path if args.path else fluxacct.accounting.db_path + + return path + + +def est_sqlite_conn(path): + # try to open database file; will exit with status 1 if database file not found + if not os.path.isfile(path): + print(f"Database file does not exist: {path}", file=sys.stderr) + sys.exit(1) + + db_uri = "file:" + path + "?mode=rw" + try: + conn = sqlite3.connect(db_uri, uri=True) + # set foreign keys constraint + conn.execute("PRAGMA foreign_keys = 1") + except sqlite3.OperationalError: + print(f"Unable to open database file: {db_uri}", file=sys.stderr) + sys.exit(1) + + return conn + + +def bulk_update(path): + conn = est_sqlite_conn(path) + cur = conn.cursor() + + # fetch all rows from association_table (each row is a tuple) + for row in cur.execute("SELECT userid, bank, fairshare FROM association_table"): + # create a JSON payload with the results of the query + data = {"userid": str(row[0]), "bank": str(row[1]), "fairshare": str(row[2])} + + flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get() + + +def main(): + parser = argparse.ArgumentParser( + description=""" + Description:
Send a bulk update of user information from a + flux-accounting database to the multi-factor priority plugin. + """ + ) + + parser.add_argument( + "-p", "--path", dest="path", help="specify location of database file" + ) + args = parser.parse_args() + + path = set_db_loc(args) + + bulk_update(path) + + +if __name__ == "__main__": + main() From ea3cdb64865b70b990b24d5207f0b5cec0ea79ed Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 7 Jun 2021 08:37:56 -0700 Subject: [PATCH 08/14] make: add Makefile for plugin --- configure.ac | 4 ++++ src/Makefile.am | 2 +- src/plugins/Makefile.am | 9 +++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 src/plugins/Makefile.am diff --git a/configure.ac b/configure.ac index 8a15d2f9..0b7fffd7 100644 --- a/configure.ac +++ b/configure.ac @@ -107,6 +107,9 @@ AC_SUBST(acct_db_path) AC_CONFIG_MACRO_DIR([config]) X_AC_EXPAND_INSTALL_DIRS +fluxplugin_ldflags="-avoid-version -export-symbols-regex '^flux_plugin_init\$\$' --disable-static -shared -export-dynamic" +AC_SUBST(fluxplugin_ldflags) + AC_CONFIG_FILES([Makefile src/Makefile src/common/Makefile @@ -125,6 +128,7 @@ AC_CONFIG_FILES([Makefile src/bindings/python/fluxacct/accounting/Makefile src/bindings/python/fluxacct/accounting/__init__.py src/cmd/Makefile + src/plugins/Makefile t/Makefile ]) AC_OUTPUT diff --git a/src/Makefile.am b/src/Makefile.am index c98e1813..7df3050c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1 +1 @@ -SUBDIRS = common cmd fairness bindings +SUBDIRS = common cmd fairness bindings plugins diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am new file mode 100644 index 00000000..9b1bfba7 --- /dev/null +++ b/src/plugins/Makefile.am @@ -0,0 +1,9 @@ +AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS) + +AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS) + +AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared + +lib_LTLIBRARIES = mf_priority.la +mf_priority_la_SOURCES = mf_priority.cpp +mf_priority_la_LDFLAGS 
= $(fluxplugin_ldflags) -module From 7f8979495daac21490426616450003f23e3895e4 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 1 Jun 2021 11:15:32 -0700 Subject: [PATCH 09/14] docker: specify latest version of flux-core --- src/test/docker/bionic/Dockerfile | 2 +- src/test/docker/centos8/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/docker/bionic/Dockerfile b/src/test/docker/bionic/Dockerfile index cf53ae92..d780e53d 100644 --- a/src/test/docker/bionic/Dockerfile +++ b/src/test/docker/bionic/Dockerfile @@ -1,4 +1,4 @@ -FROM fluxrm/flux-core:bionic +FROM fluxrm/flux-core:bionic-v0.28.0 ARG USER=flux ARG UID=1000 diff --git a/src/test/docker/centos8/Dockerfile b/src/test/docker/centos8/Dockerfile index a15dd5a4..f4e0f2af 100644 --- a/src/test/docker/centos8/Dockerfile +++ b/src/test/docker/centos8/Dockerfile @@ -1,4 +1,4 @@ -FROM fluxrm/flux-core:centos8 +FROM fluxrm/flux-core:centos8-v0.28.0 ARG USER=flux ARG UID=1000 From be6d976e10f0cdc807d756c4efb4bf447b9cb56d Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 27 May 2021 08:03:05 -0700 Subject: [PATCH 10/14] t: add rc directory Add an rc directory to load modules for running tests under a flux instance. 
--- t/rc/rc1-job | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++++ t/rc/rc1-kvs | 16 ++++++++++++ t/rc/rc3-job | 31 ++++++++++++++++++++++ t/rc/rc3-kvs | 17 ++++++++++++ 4 files changed, 137 insertions(+) create mode 100755 t/rc/rc1-job create mode 100755 t/rc/rc1-kvs create mode 100755 t/rc/rc3-job create mode 100755 t/rc/rc3-kvs diff --git a/t/rc/rc1-job b/t/rc/rc1-job new file mode 100755 index 00000000..075193e0 --- /dev/null +++ b/t/rc/rc1-job @@ -0,0 +1,73 @@ +#!/bin/bash -e + +idset_from_count() { + if test $1 -eq 1; then + echo "0" + else + echo "0-$(($1 - 1))" + fi +} + +set_fake_resources() { + cores=${1} + ranklist=$(idset_from_count $(flux getattr size)) + corelist=$(idset_from_count ${cores}) + R=$(flux R encode -r${ranklist} -c${corelist}) + echo Setting fake resource.R="$R" >&2 + flux kvs put resource.R="$R" +} + +RANK=$(flux getattr rank) + +# Usage: modload {all|<rank>} modname [args ...] +modload() { + local where=$1; shift + if test "$where" = "all" || test $where -eq $RANK; then + flux module load $* + fi +} + + +modload 0 content-sqlite +modload all kvs +modload all kvs-watch + +modload 0 job-manager + +modload all job-ingest +modload all job-info +modload 0 job-list +modload all barrier +modload 0 heartbeat + +if test $RANK -eq 0; then + # Set fake resources for testing + set_fake_resources ${TEST_UNDER_FLUX_CORES_PER_RANK:-2} +fi +modload all resource noverify + +if [ "${TEST_UNDER_FLUX_NO_JOB_EXEC}" != "y" ] +then + modload 0 job-exec +fi + +# mirror sched-simple default of limited=8 +if [ "${TEST_UNDER_FLUX_SCHED_SIMPLE_MODE}x" != "x" ] +then + mode=${TEST_UNDER_FLUX_SCHED_SIMPLE_MODE} +else + mode="limited=8" +fi + +modload 0 sched-simple mode=${mode} +#--setbit 0x2 enables creation of reason_pending field +if [ $RANK -eq 0 ] +then + flux module debug --setbit 0x2 sched-simple +fi + +test $RANK -ne 0 || flux admin cleanup-push <<-EOT + flux queue stop + flux job cancelall -f --states RUN + flux queue idle +EOT diff --git a/t/rc/rc1-kvs
b/t/rc/rc1-kvs new file mode 100755 index 00000000..d26dbdfc --- /dev/null +++ b/t/rc/rc1-kvs @@ -0,0 +1,16 @@ +#!/bin/bash -e + +RANK=$(flux getattr rank) + +# Usage: modload {all|<rank>} modname [args ...] +modload() { + local where=$1; shift + if test "$where" = "all" || test $where -eq $RANK; then + flux module load $* + fi +} + +modload 0 content-sqlite +modload all kvs +modload all kvs-watch +modload 0 heartbeat diff --git a/t/rc/rc3-job b/t/rc/rc3-job new file mode 100755 index 00000000..ed019ace --- /dev/null +++ b/t/rc/rc3-job @@ -0,0 +1,31 @@ +#!/bin/bash -e + +RANK=$(flux getattr rank) + +# Usage: modrm {all|<rank>} modname +modrm() { + local where=$1; shift + if test "$where" = "all" || test $where -eq $RANK; then + flux module remove -f $* + fi +} + +if [ "${TEST_UNDER_FLUX_NO_EXEC}" != "y" ] +then + modrm 0 job-exec +fi +modrm 0 heartbeat +modrm 0 sched-simple +modrm all resource +modrm 0 job-list +modrm all job-info +modrm 0 job-manager +modrm all barrier +modrm all kvs-watch +modrm all job-ingest + +modrm all kvs + +flux content flush + +modrm 0 content-sqlite diff --git a/t/rc/rc3-kvs b/t/rc/rc3-kvs new file mode 100755 index 00000000..77625805 --- /dev/null +++ b/t/rc/rc3-kvs @@ -0,0 +1,17 @@ +#!/bin/bash -e + +RANK=$(flux getattr rank) + +# Usage: modrm {all|<rank>} modname +modrm() { + local where=$1; shift + if test "$where" = "all" || test $where -eq $RANK; then + flux module remove -f $* + fi +} +modrm 0 heartbeat +modrm all kvs-watch +modrm all kvs + +flux content flush +modrm 0 content-sqlite From bd152a161d3ba5b041a8c6ead7ba4f8c6bed8f09 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 1 Jun 2021 11:17:13 -0700 Subject: [PATCH 11/14] sharness: drop --bootstrap=selfpmi --- t/sharness.d/flux-sharness.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/sharness.d/flux-sharness.sh b/t/sharness.d/flux-sharness.sh index ddf2273f..d40d991a 100644 --- a/t/sharness.d/flux-sharness.sh +++ b/t/sharness.d/flux-sharness.sh @@ -123,7 +123,7
@@ test_under_flux() { logopts="-o -Slog-filename=${log_file},-Slog-forward-level=7" TEST_UNDER_FLUX_ACTIVE=t \ TERM=${ORIGINAL_TERM} \ - exec flux start --bootstrap=selfpmi --size=${size} \ + exec flux start --test-size=${size} \ ${RC1_PATH+-o -Sbroker.rc1_path=${RC1_PATH}} \ ${RC3_PATH+-o -Sbroker.rc3_path=${RC3_PATH}} \ ${logopts} \ From c9aec4ab2d8382107c7af199e86fbf4b9ad6f6bc Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 7 Jun 2021 10:03:00 -0700 Subject: [PATCH 12/14] .github: add checks-annotate.sh --- .github/workflows/main.yml | 5 ++ src/test/checks-annotate.sh | 101 ++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100755 src/test/checks-annotate.sh diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bd6595b9..71783268 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -106,3 +106,8 @@ jobs: - name: coverage report if: success() && matrix.coverage run: DOCKER_REPO= bash <(curl -s https://codecov.io/bash) + + - name: annotate errors + if: failure() || cancelled() + env: ${{matrix.env}} + run: src/test/checks-annotate.sh diff --git a/src/test/checks-annotate.sh b/src/test/checks-annotate.sh new file mode 100755 index 00000000..6a169117 --- /dev/null +++ b/src/test/checks-annotate.sh @@ -0,0 +1,101 @@ +#!/bin/bash +# +# Post-process testsuite logs and outputs after a failure +# +# Uses GH Workflow commands for GH Actions +# +error() { + printf "::error::$@\n" +} +catfile() { + if test -f $1; then + printf "::group::$1\n" + cat $1 + printf "::endgroup::\n" + fi +} +catfile_error() { + error "Found $1" + catfile $1 +} +annotate_test_log() { + # + # Look through test logfiles for various failure indicators and + # emit an annotation '::error::' to the logfile if found: + # + local test=$1 + + # Emit an annotation for each failed test ('not ok') + grep 'not ok' ${test}.log | while read line; do + printf "::error file=${test}.t::%s\n" "${line}" + done + + # Emit an 
annotation for TAP ERROR lines: + grep '^ERROR: ' ${test}.log | while read line; do + printf "::error file=${test}.t::%s\n" "${line}" + done + + # Emit an annotation for chain-lint errors: + grep '^error: bug in the test script' ${test}.log | while read line; do + printf "::error file=${test}.t::%s\n" "${line}" + done + + # Emit an annotation for anything that looks like an ASan error: + sed -n 's/==[0-9][0-9]*==ERROR: //p' ${test}.log | while read line; do + printf "::error file=${test}.t::%s\n" "${line}" + done +} + +# +# Check all testsuite *.trs files and check for results that +# were not 'SKIP' or 'PASS': +# +logfile=/tmp/check-errors.$$ +cat /dev/null >$logfile + +errors=0 +total=0 +for trs in $(find . -name *.trs); do + : $((total++)) + result=$(sed -n 's/^.*global-test-result: *//p' ${trs}) + if test "$result" != "PASS" -a "$result" != "SKIP"; then + testbase=${trs//.trs} + annotate_test_log $testbase >> $logfile + catfile ${testbase}.output >> $logfile + catfile ${testbase}.log >> $logfile + : $((errors++)) + fi +done +if test $errors -gt 0; then + printf "::warning::" +fi +printf "Found ${errors} errors from ${total} tests in testsuite\n" +cat $logfile +rm $logfile + +# +# Find and emit all *.asan.* files from test: +# +export -f catfile_error +export -f catfile +export -f error +find . 
-name *.asan.* | xargs -i bash -c 'catfile_error {}' + + +# +# Check for any expected tests that were not run: +# +ls -1 t/*.t | sort >/tmp/expected +ls -1 t/*.trs | sed 's/rs$//' | sort >/tmp/actual +comm -23 /tmp/expected /tmp/actual > missing +if test -s missing; then + error "Detected $(wc -l missing) missing tests:" + for f in $(cat missing); do + printf "$f\n" + file=${f//.t} + test -f ${file}.log && catfile ${file}.log + test -f ${file}.output && catfile ${file}.output + done +else + printf "No missing test runs detected\n" +fi From 14214d47a48599921142240d4343969d71c84891 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 27 May 2021 08:05:14 -0700 Subject: [PATCH 13/14] t: add tests for multi-factor priority plugin --- t/t1001-mf-priority.t | 127 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100755 t/t1001-mf-priority.t diff --git a/t/t1001-mf-priority.t b/t/t1001-mf-priority.t new file mode 100755 index 00000000..250cb7fd --- /dev/null +++ b/t/t1001-mf-priority.t @@ -0,0 +1,127 @@ +#!/bin/bash + +test_description='Test multi-factor priority plugin' + +. 
`dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +# BULK_UPDATE=${FLUX_BUILD_DIR}/t/scripts/send_fake_payloads.py + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'try to submit a job when user does not exist in DB' ' + test_must_fail flux mini submit -n1 hostname > failure.out 2>&1 && + test_debug "cat failure.out" && + grep "user not found in flux-accounting DB" failure.out +' + +test_expect_success 'send an empty payload to make sure unpack fails' ' + cat <<-EOF >bad_payload.py && + import flux + + #create a JSON payload + flux.Flux().rpc("job-manager.mf_priority.rec_update", {}).get() + EOF + test_must_fail flux python bad_payload.py && + flux dmesg | grep "failed to unpack custom_priority.trigger msg: Protocol error" +' + +test_expect_success 'create fake_payload.py' ' + cat <<-EOF >fake_payload.py + import flux + import pwd + import getpass + + username = getpass.getuser() + userid = pwd.getpwnam(username).pw_uid + # create a JSON payload + data = {"userid": str(userid), "bank": "account3", "fairshare": "0.45321"} + flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get() + data = {"userid": str(userid), "bank": "account2", "fairshare": "0.11345"} + flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get() + EOF +' + +test_expect_success 'update plugin with sample test data' ' + flux python fake_payload.py +' + +test_expect_success 'submit a job with default urgency' ' + jobid=$(flux mini submit --setattr=system.bank=account3 -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job1.test && + cat 
<<-EOF >job1.expected && + 45321 + EOF + test_cmp job1.expected job1.test +' + +test_expect_success 'submit a job with custom urgency' ' + jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=15 -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job2.test && + cat <<-EOF >job2.expected && + 45320 + EOF + test_cmp job2.expected job2.test +' + +test_expect_success 'submit a job with urgency of 0' ' + jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=0 -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job3.test && + cat <<-EOF >job3.expected && + 0 + EOF + test_cmp job3.expected job3.test && + flux job cancel $jobid +' + +test_expect_success 'submit a job with urgency of 31' ' + jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=31 -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job4.test && + cat <<-EOF >job4.expected && + 4294967295 + EOF + test_cmp job4.expected job4.test +' + +test_expect_success 'submit a job with other bank' ' + jobid=$(flux mini submit --setattr=system.bank=account2 -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job5.test && + cat <<-EOF >job5.expected && + 11345 + EOF + test_cmp job5.expected job5.test +' + +test_expect_success 'submit a job using default bank' ' + jobid=$(flux mini submit -n1 hostname) && + flux job wait-event -f json $jobid priority | jq '.context.priority' > job6.test && + cat <<-EOF >job6.expected && + 45321 + EOF + test_cmp job6.expected job6.test +' + +test_expect_success 'submit a job using a bank the user does not belong to' ' + test_must_fail flux mini submit --setattr=system.bank=account1 -n1 hostname > bad_bank.out 2>&1 && + test_debug "cat bad_bank.out" && + grep "user does not belong to specified bank" bad_bank.out +' + +test_expect_success 'reject job when invalid bank format is passed in' ' + test_must_fail 
flux mini submit --setattr=system.bank=1 -n1 hostname > invalid_fmt.out 2>&1 && + test_debug "cat invalid_fmt.out" && + grep "unable to unpack bank arg" invalid_fmt.out +' + +test_done From b69f2527995f07dac0fd1ca3cbcea1fff72e0c75 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 27 May 2021 08:07:02 -0700 Subject: [PATCH 14/14] make: add t1001-mf-priority to make check --- t/Makefile.am | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/t/Makefile.am b/t/Makefile.am index f0e5a3a2..322c8c3d 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -1,7 +1,8 @@ # This list is included in both TESTS and dist_check_SCRIPTS. TESTSCRIPTS = \ t0000-sharness.t \ - t1000-print-hierarchy.t + t1000-print-hierarchy.t \ + t1001-mf-priority.t dist_check_SCRIPTS = \ $(TESTSCRIPTS) \