diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index bd6595b9..71783268 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -106,3 +106,8 @@ jobs:
     - name: coverage report
       if: success() && matrix.coverage
       run: DOCKER_REPO= bash <(curl -s https://codecov.io/bash)
+
+    - name: annotate errors
+      if: failure() || cancelled()
+      env: ${{matrix.env}}
+      run: src/test/checks-annotate.sh
diff --git a/configure.ac b/configure.ac
index 8a15d2f9..0b7fffd7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -107,6 +107,9 @@ AC_SUBST(acct_db_path)
 AC_CONFIG_MACRO_DIR([config])
 X_AC_EXPAND_INSTALL_DIRS
 
+fluxplugin_ldflags="-avoid-version -export-symbols-regex '^flux_plugin_init\$\$' --disable-static -shared -export-dynamic"
+AC_SUBST(fluxplugin_ldflags)
+
 AC_CONFIG_FILES([Makefile
 	src/Makefile
 	src/common/Makefile
@@ -125,6 +128,7 @@ AC_CONFIG_FILES([Makefile
 	src/bindings/python/fluxacct/accounting/Makefile
 	src/bindings/python/fluxacct/accounting/__init__.py
 	src/cmd/Makefile
+	src/plugins/Makefile
 	t/Makefile
 	])
 AC_OUTPUT
diff --git a/src/Makefile.am b/src/Makefile.am
index c98e1813..7df3050c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1 +1 @@
-SUBDIRS = common cmd fairness bindings
+SUBDIRS = common cmd fairness bindings plugins
diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py
index 47d80b69..c3bcdc6c 100755
--- a/src/bindings/python/fluxacct/accounting/create_db.py
+++ b/src/bindings/python/fluxacct/accounting/create_db.py
@@ -87,15 +87,16 @@ def create_db(
     conn.execute(
         """
             CREATE TABLE IF NOT EXISTS association_table (
-                creation_time bigint(20) NOT NULL,
-                mod_time bigint(20) DEFAULT 0 NOT NULL,
-                deleted tinyint(4) DEFAULT 0 NOT NULL,
-                username tinytext NOT NULL,
-                admin_level smallint(6) DEFAULT 1 NOT NULL,
-                bank tinytext NOT NULL,
-                shares int(11) DEFAULT 1 NOT NULL,
-                job_usage real DEFAULT 0.0 NOT NULL,
-                fairshare real DEFAULT 0.0 NOT NULL,
+                creation_time bigint(20) NOT NULL,
+                mod_time bigint(20) DEFAULT 0 NOT NULL,
+                deleted tinyint(4) DEFAULT 0 NOT NULL,
+                username tinytext NOT NULL,
+                userid int(11) DEFAULT 65534 NOT NULL,
+                admin_level smallint(6) DEFAULT 1 NOT NULL,
+                bank tinytext NOT NULL,
+                shares int(11) DEFAULT 1 NOT NULL,
+                job_usage real DEFAULT 0.0 NOT NULL,
+                fairshare real DEFAULT 0.5 NOT NULL,
                 PRIMARY KEY (username, bank)
             );"""
     )
@@ -122,6 +123,7 @@ def create_db(
         """
             CREATE TABLE IF NOT EXISTS job_usage_factor_table (
                 username tinytext NOT NULL,
+                userid int(11) NOT NULL,
                 bank tinytext NOT NULL,
                 last_job_timestamp real DEFAULT 0.0,
                 PRIMARY KEY (username, bank)
diff --git a/src/bindings/python/fluxacct/accounting/job_archive_interface.py b/src/bindings/python/fluxacct/accounting/job_archive_interface.py
index 088a5c39..686ddb24 100755
--- a/src/bindings/python/fluxacct/accounting/job_archive_interface.py
+++ b/src/bindings/python/fluxacct/accounting/job_archive_interface.py
@@ -267,7 +267,7 @@ def fetch_usg_bins(acct_conn, user=None, bank=None):
         ),
     )
 
-    for val in dataframe.iloc[0].values[3:]:
+    for val in dataframe.iloc[0].values[4:]:
         if isinstance(val, float):
            past_usage_factors.append(val)
 
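Note: the `values[3:]` → `values[4:]` change above follows directly from the new `userid` column. A minimal sketch (column order taken from the `job_usage_factor_table` definition above; the row values and bin names are made up) of why the usage bins now begin at index 4:

```python
# A job_usage_factor_table row now leads with four identity/bookkeeping
# columns before the per-bin floats:
#   username, userid, bank, last_job_timestamp, <usage bins...>
row = ["1001", 1001, "C", 0.0, 0.5, 0.25, 0.125]

# Before this patch there was no userid, so the bins began at index 3;
# with userid inserted, the first usage bin sits at index 4.
past_usage_factors = [val for val in row[4:] if isinstance(val, float)]
assert past_usage_factors == [0.5, 0.25, 0.125]
```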
diff --git a/src/bindings/python/fluxacct/accounting/test/test_create_db.py b/src/bindings/python/fluxacct/accounting/test/test_create_db.py
index 9b8bdff2..11dbee37 100755
--- a/src/bindings/python/fluxacct/accounting/test/test_create_db.py
+++ b/src/bindings/python/fluxacct/accounting/test/test_create_db.py
@@ -55,10 +55,10 @@ def test_02_create_association(self):
         conn.execute(
             """
             INSERT INTO association_table
-            (creation_time, mod_time, deleted, username, admin_level,
+            (creation_time, mod_time, deleted, username, userid, admin_level,
             bank, shares)
             VALUES
-            (0, 0, 0, "test user", 1, "test account", 0)
+            (0, 0, 0, "test user", 1234, 1, "test account", 0)
             """
         )
         cursor = conn.cursor()
diff --git a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py
index c7e12603..b61941e9 100755
--- a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py
+++ b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py
@@ -73,10 +73,10 @@ def setUpClass(self):
         b.add_bank(acct_conn, bank="D", parent_bank="B", shares=1)
 
         # add users
-        u.add_user(acct_conn, username="1001", bank="C")
-        u.add_user(acct_conn, username="1002", bank="C")
-        u.add_user(acct_conn, username="1003", bank="D")
-        u.add_user(acct_conn, username="1004", bank="D")
+        u.add_user(acct_conn, username="1001", uid="1001", bank="C")
+        u.add_user(acct_conn, username="1002", uid="1002", bank="C")
+        u.add_user(acct_conn, username="1003", uid="1003", bank="D")
+        u.add_user(acct_conn, username="1004", uid="1004", bank="D")
 
         jobid = 100
         interval = 0  # add to job timestamps to diversify job-archive records
diff --git a/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py b/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py
index 25b4dd3f..f34f6b9e 100755
--- a/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py
+++ b/src/bindings/python/fluxacct/accounting/test/test_user_subcommands.py
@@ -32,6 +32,7 @@ def test_01_add_valid_user(self):
         u.add_user(
             acct_conn,
             username="fluxuser",
+            uid="1234",
             admin_level="1",
             bank="acct",
             shares="10",
@@ -44,12 +45,13 @@ def test_01_add_valid_user(self):
 
         self.assertEqual(num_rows_assoc_table, num_rows_job_usage_factor_table)
 
-    # adding a user with the same primary key (user_name, account) should
+    # adding a user with the same primary key (username, account) should
     # return an IntegrityError
     def test_02_add_duplicate_primary_key(self):
         u.add_user(
             acct_conn,
             username="fluxuser",
+            uid="1234",
             admin_level="1",
             bank="acct",
             shares="10",
@@ -57,6 +59,7 @@ def test_02_add_duplicate_primary_key(self):
         u.add_user(
             acct_conn,
             username="fluxuser",
+            uid="1234",
             admin_level="1",
             bank="acct",
             shares="10",
@@ -70,6 +73,7 @@ def test_03_add_duplicate_user(self):
         u.add_user(
             acct_conn,
             username="dup_user",
+            uid="5678",
             admin_level="1",
             bank="acct",
             shares="10",
@@ -77,6 +81,7 @@ def test_03_add_duplicate_user(self):
         u.add_user(
             acct_conn,
             username="dup_user",
+            uid="5678",
             admin_level="1",
             bank="other_acct",
             shares="10",
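Note: the duplicate-key tests above lean on SQLite enforcing the composite primary key `(username, bank)`. A self-contained sketch of that behavior (in-memory database, trimmed-down schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE association_table"
    " (username TEXT, bank TEXT, PRIMARY KEY (username, bank))"
)
conn.execute("INSERT INTO association_table VALUES ('dup_user', 'acct')")
# same user under a different bank is a distinct primary key: accepted
conn.execute("INSERT INTO association_table VALUES ('dup_user', 'other_acct')")
try:
    # identical (username, bank) pair: rejected
    conn.execute("INSERT INTO association_table VALUES ('dup_user', 'acct')")
except sqlite3.IntegrityError as exc:
    print(exc)  # e.g. "UNIQUE constraint failed: ..."
```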
diff --git a/src/bindings/python/fluxacct/accounting/user_subcommands.py b/src/bindings/python/fluxacct/accounting/user_subcommands.py
index 362b5ed5..47d7535b 100755
--- a/src/bindings/python/fluxacct/accounting/user_subcommands.py
+++ b/src/bindings/python/fluxacct/accounting/user_subcommands.py
@@ -11,6 +11,7 @@
 ###############################################################
 import sqlite3
 import time
+import pwd
 
 import pandas as pd
 
@@ -30,7 +31,32 @@ def view_user(conn, user):
         print(e_database_error)
 
 
-def add_user(conn, username, bank, admin_level=1, shares=1):
+def get_uid(username):
+    try:
+        return pwd.getpwnam(username).pw_uid
+    except KeyError:
+        return str(username)
+
+
+def add_user(
+    conn,
+    username,
+    bank,
+    uid=65534,
+    admin_level=1,
+    shares=1,
+):
+
+    # get uid of user
+    fetched_uid = get_uid(username)
+
+    try:
+        if isinstance(fetched_uid, int):
+            uid = fetched_uid
+        else:
+            raise KeyError
+    except KeyError as key_error:
+        print(key_error)
 
     try:
         # insert the user values into association_table
@@ -41,17 +67,19 @@ def add_user(conn, username, bank, admin_level=1, shares=1):
             mod_time,
             deleted,
             username,
+            userid,
             admin_level,
             bank,
             shares
             )
-            VALUES (?, ?, ?, ?, ?, ?, ?)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
             """,
             (
                 int(time.time()),
                 int(time.time()),
                 0,
                 username,
+                uid,
                 admin_level,
                 bank,
                 shares,
@@ -64,12 +92,14 @@ def add_user(conn, username, bank, admin_level=1, shares=1):
             """
             INSERT INTO job_usage_factor_table (
             username,
+            userid,
             bank
             )
-            VALUES (?, ?)
+            VALUES (?, ?, ?)
             """,
             (
                 username,
+                uid,
                 bank,
             ),
         )
diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py
index b4f7f493..ab59ad1d 100755
--- a/src/cmd/flux-account.py
+++ b/src/cmd/flux-account.py
@@ -52,6 +52,11 @@ def add_add_user_arg(subparsers):
         help="username",
         metavar="USERNAME",
     )
+    subparser_add_user.add_argument(
+        "--userid",
+        help="userid",
+        metavar="USERID",
+    )
     subparser_add_user.add_argument(
         "--admin-level",
         help="admin level",
@@ -295,6 +300,7 @@ def select_accounting_function(args, conn, output_file, parser):
             conn,
             args.username,
             args.bank,
+            args.userid,
             args.admin_level,
             args.shares,
             args.max_jobs,
diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am
new file mode 100644
index 00000000..9b1bfba7
--- /dev/null
+++ b/src/plugins/Makefile.am
@@ -0,0 +1,9 @@
+AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS)
+
+AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS)
+
+AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared
+
+lib_LTLIBRARIES = mf_priority.la
+mf_priority_la_SOURCES = mf_priority.cpp
+mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
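Note: `add_user()` above keeps its `uid=65534` default (the conventional "nobody" uid) whenever `get_uid()` cannot resolve the username through `pwd`. A sketch of that resolution path, with an illustrative helper name:

```python
import pwd

def resolve_uid(username, default=65534):
    # mirrors get_uid()/add_user(): take the system uid when the
    # username resolves, otherwise keep the "nobody" default
    try:
        return pwd.getpwnam(username).pw_uid
    except KeyError:
        return default

print(resolve_uid("root"))          # 0 on most systems
print(resolve_uid("no-such-user"))  # 65534
```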
diff --git a/src/plugins/bulk_update.py b/src/plugins/bulk_update.py
new file mode 100755
index 00000000..1ebe86b3
--- /dev/null
+++ b/src/plugins/bulk_update.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+
+###############################################################
+# Copyright 2020 Lawrence Livermore National Security, LLC
+# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+#
+# This file is part of the Flux resource manager framework.
+# For details, see https://github.com/flux-framework.
+#
+# SPDX-License-Identifier: LGPL-3.0
+###############################################################
+import flux
+import argparse
+import sys
+import os
+import sqlite3
+
+import fluxacct.accounting
+
+
+def set_db_loc(args):
+    path = args.path if args.path else fluxacct.accounting.db_path
+
+    return path
+
+
+def est_sqlite_conn(path):
+    # try to open database file; exits with 1 if database file not found
+    if not os.path.isfile(path):
+        print(f"Database file does not exist: {path}", file=sys.stderr)
+        sys.exit(1)
+
+    db_uri = "file:" + path + "?mode=rw"
+    try:
+        conn = sqlite3.connect(db_uri, uri=True)
+        # set foreign keys constraint
+        conn.execute("PRAGMA foreign_keys = 1")
+    except sqlite3.OperationalError:
+        print(f"Unable to open database file: {db_uri}", file=sys.stderr)
+        sys.exit(1)
+
+    return conn
+
+
+def bulk_update(path):
+    conn = est_sqlite_conn(path)
+    cur = conn.cursor()
+
+    # fetch every row from association_table and send each one as an RPC
+    for row in cur.execute("SELECT userid, bank, fairshare FROM association_table"):
+        # create a JSON payload with the results of the query
+        data = {"userid": str(row[0]), "bank": str(row[1]), "fairshare": str(row[2])}
+
+        flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get()
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="""
+        Description: Send a bulk update of user information from a
+        flux-accounting database to the multi-factor priority plugin.
+        """
+    )
+
+    parser.add_argument(
+        "-p", "--path", dest="path", help="specify location of database file"
+    )
+    args = parser.parse_args()
+
+    path = set_db_loc(args)
+
+    bulk_update(path)
+
+
+if __name__ == "__main__":
+    main()
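Note: one detail of the `rec_update` contract worth calling out: `bulk_update.py` stringifies every field, and `mf_priority.cpp` converts them back with `atoi()`/`stod()`. A minimal single-row sketch (the bank name and fairshare value are made up, and this must run inside a Flux instance):

```python
import flux

# every value crosses the RPC as a string; the plugin parses them back
payload = {"userid": "1234", "bank": "bankA", "fairshare": "0.25"}
flux.Flux().rpc("job-manager.mf_priority.rec_update", payload).get()
```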
diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp
new file mode 100644
index 00000000..db8bbbe0
--- /dev/null
+++ b/src/plugins/mf_priority.cpp
@@ -0,0 +1,230 @@
+/************************************************************\
+ * Copyright 2021 Lawrence Livermore National Security, LLC
+ * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * SPDX-License-Identifier: LGPL-3.0
+\************************************************************/
+
+/* mf_priority.cpp - custom basic job priority plugin
+ *
+ */
+
+extern "C" {
+#if HAVE_CONFIG_H
+#include "config.h"
+#endif
+}
+#include <flux/core.h>
+#include <flux/jobtap.h>
+#include <map>
+#include <string>
+#include <iterator>
+#include <cmath>
+#include <cstdlib>
+#include <cstdint>
+
+std::map<int, std::map<std::string, double>> users;
+
+/******************************************************************************
+ *                                                                            *
+ *                              Helper Functions                              *
+ *                                                                            *
+ *****************************************************************************/
+
+/*
+ * Calculate a user's job priority using the following factors:
+ *
+ * fairshare: the ratio between the amount of resources allocated vs. resources
+ * consumed.
+ * urgency: a user-controlled factor to prioritize their own jobs.
+ */
+int64_t priority_calculation (flux_plugin_t *p,
+                              flux_plugin_arg_t *args,
+                              int userid,
+                              char *bank,
+                              int urgency)
+{
+    double fshare_factor = 0.0, priority = 0.0;
+    int fshare_weight;
+
+    std::map<int, std::map<std::string, double>>::iterator it;
+    std::map<std::string, double>::iterator inner_it;
+
+    fshare_weight = 100000;
+
+    if (urgency == FLUX_JOB_URGENCY_HOLD)
+        return FLUX_JOB_PRIORITY_MIN;
+
+    if (urgency == FLUX_JOB_URGENCY_EXPEDITE)
+        return FLUX_JOB_PRIORITY_MAX;
+
+    // search element in map of maps by key
+    it = users.find (userid);
+
+    // search for correct fshare value using passed-in bank; otherwise, use
+    // a default bank
+    if (bank != NULL) {
+        inner_it = it->second.find (bank);
+        fshare_factor = inner_it->second;
+    }
+    else {
+        inner_it = it->second.find ("default");
+        fshare_factor = inner_it->second;
+    }
+
+    priority = (fshare_weight * fshare_factor) + (urgency - 16);
+
+    return abs (round (priority));
+}
+
+
+/******************************************************************************
+ *                                                                            *
+ *                                 Callbacks                                  *
+ *                                                                            *
+ *****************************************************************************/
+
+/*
+ * Unpack a payload from an external bulk update service and place it in the
+ * users map data structure.
+ */
+static void rec_update_cb (flux_t *h,
+                           flux_msg_handler_t *mh,
+                           const flux_msg_t *msg,
+                           void *arg)
+{
+    char *uid, *fshare, *bank;
+
+    if (flux_request_unpack (msg, NULL, "{s:s, s:s, s:s}",
+                             "userid", &uid,
+                             "bank", &bank,
+                             "fairshare", &fshare) < 0) {
+        flux_log_error (h, "failed to unpack custom_priority.trigger msg");
+        goto error;
+    }
+
+    if (flux_respond (h, msg, NULL) < 0)
+        flux_log_error (h, "flux_respond");
+
+    // if the user being added does not yet have any entries in the map,
+    // treat their first bank as the "default" bank
+    if (users.count (std::atoi (uid)) == 0)
+        users[std::atoi (uid)]["default"] = std::stod (fshare);
+
+    users[std::atoi (uid)][bank] = std::stod (fshare);
+
+    return;
+error:
+    flux_respond_error (h, msg, errno, flux_msg_last_error (msg));
+}
+
+
+/*
+ * Unpack the urgency and userid from a submitted job and call
+ * priority_calculation (), which will return a new job priority to be packed.
+ */
+static int priority_cb (flux_plugin_t *p,
+                        const char *topic,
+                        flux_plugin_arg_t *args,
+                        void *data)
+{
+    int urgency, userid;
+    char *bank = NULL;
+    int64_t priority;
+
+    flux_t *h = flux_jobtap_get_flux (p);
+    if (flux_plugin_arg_unpack (args,
+                                FLUX_PLUGIN_ARG_IN,
+                                "{s:i, s:i, s{s{s{s?s}}}}",
+                                "urgency", &urgency,
+                                "userid", &userid,
+                                "jobspec", "attributes", "system",
+                                "bank", &bank) < 0) {
+        flux_log (h,
+                  LOG_ERR,
+                  "flux_plugin_arg_unpack: %s",
+                  flux_plugin_arg_strerror (args));
+        return -1;
+    }
+
+    priority = priority_calculation (p, args, userid, bank, urgency);
+
+    if (flux_plugin_arg_pack (args,
+                              FLUX_PLUGIN_ARG_OUT,
+                              "{s:I}",
+                              "priority",
+                              priority) < 0) {
+        flux_log (h,
+                  LOG_ERR,
+                  "flux_plugin_arg_pack: %s",
+                  flux_plugin_arg_strerror (args));
+        return -1;
+    }
+    return 0;
+}
+
+
+/*
+ * Look up the userid of the submitted job in the users map; if the user is
+ * not found in the map, reject the job saying the user wasn't found in the
+ * flux-accounting database.
+ */
+static int validate_cb (flux_plugin_t *p,
+                        const char *topic,
+                        flux_plugin_arg_t *args,
+                        void *data)
+{
+    int userid;
+    char *bank = NULL;
+    std::map<int, std::map<std::string, double>>::iterator it;
+    std::map<std::string, double>::iterator inner_it;
+
+    flux_t *h = flux_jobtap_get_flux (p);
+    if (flux_plugin_arg_unpack (args,
+                                FLUX_PLUGIN_ARG_IN,
+                                "{s:i, s{s{s{s?s}}}}",
+                                "userid", &userid,
+                                "jobspec", "attributes", "system",
+                                "bank", &bank) < 0) {
+        return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
+    }
+
+    // searching element in std::map by key
+    it = users.find (userid);
+    if (it == users.end ())
+        return flux_jobtap_reject_job (p, args,
+                                       "user not found in flux-accounting DB");
+
+    if (bank != NULL) {
+        inner_it = it->second.find (std::string (bank));
+        if (inner_it == it->second.end ())
+            return flux_jobtap_reject_job (p, args,
+                                  "user does not belong to specified bank");
+    }
+
+    return 0;
+}
+
+
+static const struct flux_plugin_handler tab[] = {
+    { "job.validate", validate_cb, NULL },
+    { "job.state.priority", priority_cb, NULL },
+    { "job.priority.get", priority_cb, NULL },
+    { 0 },
+};
+
+
+extern "C" int flux_plugin_init (flux_plugin_t *p)
+{
+    if (flux_plugin_register (p, "mf_priority", tab) < 0
+        || flux_jobtap_service_register (p, "rec_update", rec_update_cb, p) < 0)
+        return -1;
+    return 0;
+}
+
+/*
+ * vi:tabstop=4 shiftwidth=4 expandtab
+ */
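Note: for readers who would rather not trace the C++, here is a Python sketch of the plugin's bookkeeping and priority math as written above: a per-user map of bank → fairshare in which a user's first reported bank is mirrored under a "default" key, plus the hold/expedite short-circuits.

```python
FLUX_JOB_URGENCY_HOLD = 0
FLUX_JOB_URGENCY_EXPEDITE = 31
FLUX_JOB_URGENCY_DEFAULT = 16
FLUX_JOB_PRIORITY_MIN = 0
FLUX_JOB_PRIORITY_MAX = 4294967295

users = {}  # userid -> {bank: fairshare}

def rec_update(userid, bank, fairshare):
    # the first record seen for a user doubles as their "default" bank
    if userid not in users:
        users[userid] = {"default": fairshare}
    users[userid][bank] = fairshare

def priority(userid, bank, urgency):
    if urgency == FLUX_JOB_URGENCY_HOLD:
        return FLUX_JOB_PRIORITY_MIN
    if urgency == FLUX_JOB_URGENCY_EXPEDITE:
        return FLUX_JOB_PRIORITY_MAX
    fshare_weight = 100000
    fshare = users[userid][bank if bank else "default"]
    return round(fshare_weight * fshare + (urgency - FLUX_JOB_URGENCY_DEFAULT))

rec_update(1234, "account3", 0.45321)
assert priority(1234, None, FLUX_JOB_URGENCY_DEFAULT) == 45321
assert priority(1234, "account3", 15) == 45320
```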
diff --git a/src/test/checks-annotate.sh b/src/test/checks-annotate.sh
new file mode 100755
index 00000000..6a169117
--- /dev/null
+++ b/src/test/checks-annotate.sh
@@ -0,0 +1,101 @@
+#!/bin/bash
+#
+# Post-process testsuite logs and outputs after a failure
+#
+# Uses GH Workflow commands for GH Actions
+#
+error() {
+    printf "::error::$@\n"
+}
+catfile() {
+    if test -f $1; then
+        printf "::group::$1\n"
+        cat $1
+        printf "::endgroup::\n"
+    fi
+}
+catfile_error() {
+    error "Found $1"
+    catfile $1
+}
+annotate_test_log() {
+    #
+    # Look through test logfiles for various failure indicators and
+    # emit an annotation '::error::' to the logfile if found:
+    #
+    local test=$1
+
+    # Emit an annotation for each failed test ('not ok')
+    grep 'not ok' ${test}.log | while read line; do
+        printf "::error file=${test}.t::%s\n" "${line}"
+    done
+
+    # Emit an annotation for TAP ERROR lines:
+    grep '^ERROR: ' ${test}.log | while read line; do
+        printf "::error file=${test}.t::%s\n" "${line}"
+    done
+
+    # Emit an annotation for chain-lint errors:
+    grep '^error: bug in the test script' ${test}.log | while read line; do
+        printf "::error file=${test}.t::%s\n" "${line}"
+    done
+
+    # Emit an annotation for anything that looks like an ASan error:
+    sed -n 's/==[0-9][0-9]*==ERROR: //p' ${test}.log | while read line; do
+        printf "::error file=${test}.t::%s\n" "${line}"
+    done
+}
+
+#
+# Check all testsuite *.trs files for results that
+# were not 'SKIP' or 'PASS':
+#
+logfile=/tmp/check-errors.$$
+cat /dev/null >$logfile
+
+errors=0
+total=0
+for trs in $(find . -name *.trs); do
+    : $((total++))
+    result=$(sed -n 's/^.*global-test-result: *//p' ${trs})
+    if test "$result" != "PASS" -a "$result" != "SKIP"; then
+        testbase=${trs//.trs}
+        annotate_test_log $testbase >> $logfile
+        catfile ${testbase}.output >> $logfile
+        catfile ${testbase}.log >> $logfile
+        : $((errors++))
+    fi
+done
+if test $errors -gt 0; then
+    printf "::warning::"
+fi
+printf "Found ${errors} errors from ${total} tests in testsuite\n"
+cat $logfile
+rm $logfile
+
+#
+# Find and emit all *.asan.* files from test:
+#
+export -f catfile_error
+export -f catfile
+export -f error
+find . -name *.asan.* | xargs -i bash -c 'catfile_error {}'
+
+#
+# Check for any expected tests that were not run:
+#
+ls -1 t/*.t | sort >/tmp/expected
+ls -1 t/*.trs | sed 's/rs$//' | sort >/tmp/actual
+comm -23 /tmp/expected /tmp/actual > missing
+if test -s missing; then
+    error "Detected $(wc -l missing) missing tests:"
+    for f in $(cat missing); do
+        printf "$f\n"
+        file=${f//.t}
+        test -f ${file}.log && catfile ${file}.log
+        test -f ${file}.output && catfile ${file}.output
+    done
+else
+    printf "No missing test runs detected\n"
+fi
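Note: the script above relies on GitHub Actions workflow commands (`::error::`, `::group::`/`::endgroup::`, `::warning::`), which the runner interprets from stdout. The annotation pattern from `annotate_test_log()`, sketched in Python with fabricated log lines:

```python
# emit one GitHub annotation per failing sharness test found in a log
log_lines = [
    "ok 1 - load multi-factor priority plugin",
    "not ok 2 - submit a job with default urgency",
]
for line in log_lines:
    if line.startswith("not ok"):
        # "file=" attaches the message to that file in the PR checks view
        print(f"::error file=t/t1001-mf-priority.t::{line}")
```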
diff --git a/src/test/docker/bionic/Dockerfile b/src/test/docker/bionic/Dockerfile
index cf53ae92..d780e53d 100644
--- a/src/test/docker/bionic/Dockerfile
+++ b/src/test/docker/bionic/Dockerfile
@@ -1,4 +1,4 @@
-FROM fluxrm/flux-core:bionic
+FROM fluxrm/flux-core:bionic-v0.28.0
 
 ARG USER=flux
 ARG UID=1000
diff --git a/src/test/docker/centos8/Dockerfile b/src/test/docker/centos8/Dockerfile
index a15dd5a4..f4e0f2af 100644
--- a/src/test/docker/centos8/Dockerfile
+++ b/src/test/docker/centos8/Dockerfile
@@ -1,4 +1,4 @@
-FROM fluxrm/flux-core:centos8
+FROM fluxrm/flux-core:centos8-v0.28.0
 
 ARG USER=flux
 ARG UID=1000
diff --git a/t/Makefile.am b/t/Makefile.am
index f0e5a3a2..322c8c3d 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -1,7 +1,8 @@
 # This list is included in both TESTS and dist_check_SCRIPTS.
 TESTSCRIPTS = \
 	t0000-sharness.t \
-	t1000-print-hierarchy.t
+	t1000-print-hierarchy.t \
+	t1001-mf-priority.t
 
 dist_check_SCRIPTS = \
 	$(TESTSCRIPTS) \
diff --git a/t/rc/rc1-job b/t/rc/rc1-job
new file mode 100755
index 00000000..075193e0
--- /dev/null
+++ b/t/rc/rc1-job
@@ -0,0 +1,73 @@
+#!/bin/bash -e
+
+idset_from_count() {
+    if test $1 -eq 1; then
+        echo "0"
+    else
+        echo "0-$(($1 - 1))"
+    fi
+}
+
+set_fake_resources() {
+    cores=${1}
+    ranklist=$(idset_from_count $(flux getattr size))
+    corelist=$(idset_from_count ${cores})
+    R=$(flux R encode -r${ranklist} -c${corelist})
+    echo Setting fake resource.R="$R" >&2
+    flux kvs put resource.R="$R"
+}
+
+RANK=$(flux getattr rank)
+
+# Usage: modload {all|<rank>} modname [args ...]
+modload() {
+    local where=$1; shift
+    if test "$where" = "all" || test $where -eq $RANK; then
+        flux module load $*
+    fi
+}
+
+
+modload 0 content-sqlite
+modload all kvs
+modload all kvs-watch
+
+modload 0 job-manager
+
+modload all job-ingest
+modload all job-info
+modload 0 job-list
+modload all barrier
+modload 0 heartbeat
+
+if test $RANK -eq 0; then
+    # Set fake resources for testing
+    set_fake_resources ${TEST_UNDER_FLUX_CORES_PER_RANK:-2}
+fi
+modload all resource noverify
+
+if [ "${TEST_UNDER_FLUX_NO_JOB_EXEC}" != "y" ]
+then
+    modload 0 job-exec
+fi
+
+# mirror sched-simple default of limited=8
+if [ "${TEST_UNDER_FLUX_SCHED_SIMPLE_MODE}x" != "x" ]
+then
+    mode=${TEST_UNDER_FLUX_SCHED_SIMPLE_MODE}
+else
+    mode="limited=8"
+fi
+
+modload 0 sched-simple mode=${mode}
+#--setbit 0x2 enables creation of reason_pending field
+if [ $RANK -eq 0 ]
+then
+    flux module debug --setbit 0x2 sched-simple
+fi
+
+test $RANK -ne 0 || flux admin cleanup-push <<-EOT
+	flux queue stop
+	flux job cancelall -f --states RUN
+	flux queue idle
+EOT
diff --git a/t/rc/rc1-kvs b/t/rc/rc1-kvs
new file mode 100755
index 00000000..d26dbdfc
--- /dev/null
+++ b/t/rc/rc1-kvs
@@ -0,0 +1,16 @@
+#!/bin/bash -e
+
+RANK=$(flux getattr rank)
+
+# Usage: modload {all|<rank>} modname [args ...]
+modload() {
+    local where=$1; shift
+    if test "$where" = "all" || test $where -eq $RANK; then
+        flux module load $*
+    fi
+}
+
+modload 0 content-sqlite
+modload all kvs
+modload all kvs-watch
+modload 0 heartbeat
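Note: `idset_from_count()` in `rc1-job` builds the idset string ("0" for one id, otherwise an inclusive range) that `flux R encode -r/-c` expects. The same computation in Python, for illustration:

```python
def idset_from_count(n):
    # "0" for a single id, otherwise the inclusive range "0-(n-1)"
    return "0" if n == 1 else f"0-{n - 1}"

assert idset_from_count(1) == "0"
assert idset_from_count(4) == "0-3"
```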
diff --git a/t/rc/rc3-job b/t/rc/rc3-job
new file mode 100755
index 00000000..ed019ace
--- /dev/null
+++ b/t/rc/rc3-job
@@ -0,0 +1,31 @@
+#!/bin/bash -e
+
+RANK=$(flux getattr rank)
+
+# Usage: modrm {all|<rank>} modname
+modrm() {
+    local where=$1; shift
+    if test "$where" = "all" || test $where -eq $RANK; then
+        flux module remove -f $*
+    fi
+}
+
+if [ "${TEST_UNDER_FLUX_NO_EXEC}" != "y" ]
+then
+    modrm 0 job-exec
+fi
+modrm 0 heartbeat
+modrm 0 sched-simple
+modrm all resource
+modrm 0 job-list
+modrm all job-info
+modrm 0 job-manager
+modrm all barrier
+modrm all kvs-watch
+modrm all job-ingest
+
+modrm all kvs
+
+flux content flush
+
+modrm 0 content-sqlite
diff --git a/t/rc/rc3-kvs b/t/rc/rc3-kvs
new file mode 100755
index 00000000..77625805
--- /dev/null
+++ b/t/rc/rc3-kvs
@@ -0,0 +1,17 @@
+#!/bin/bash -e
+
+RANK=$(flux getattr rank)
+
+# Usage: modrm {all|<rank>} modname
+modrm() {
+    local where=$1; shift
+    if test "$where" = "all" || test $where -eq $RANK; then
+        flux module remove -f $*
+    fi
+}
+modrm 0 heartbeat
+modrm all kvs-watch
+modrm all kvs
+
+flux content flush
+modrm 0 content-sqlite
diff --git a/t/sharness.d/flux-sharness.sh b/t/sharness.d/flux-sharness.sh
index ddf2273f..d40d991a 100644
--- a/t/sharness.d/flux-sharness.sh
+++ b/t/sharness.d/flux-sharness.sh
@@ -123,7 +123,7 @@ test_under_flux() {
         logopts="-o -Slog-filename=${log_file},-Slog-forward-level=7"
     TEST_UNDER_FLUX_ACTIVE=t \
     TERM=${ORIGINAL_TERM} \
-      exec flux start --bootstrap=selfpmi --size=${size} \
+      exec flux start --test-size=${size} \
       ${RC1_PATH+-o -Sbroker.rc1_path=${RC1_PATH}} \
       ${RC3_PATH+-o -Sbroker.rc3_path=${RC3_PATH}} \
       ${logopts} \
diff --git a/t/t1001-mf-priority.t b/t/t1001-mf-priority.t
new file mode 100755
index 00000000..250cb7fd
--- /dev/null
+++ b/t/t1001-mf-priority.t
@@ -0,0 +1,127 @@
+#!/bin/bash
+
+test_description='Test multi-factor priority plugin'
+
+. `dirname $0`/sharness.sh
+MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
+# BULK_UPDATE=${FLUX_BUILD_DIR}/t/scripts/send_fake_payloads.py
+
+export TEST_UNDER_FLUX_NO_JOB_EXEC=y
+export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
+test_under_flux 1 job
+
+flux setattr log-stderr-level 1
+
+test_expect_success 'load multi-factor priority plugin' '
+	flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY}
+'
+
+test_expect_success 'check that mf_priority plugin is loaded' '
+	flux jobtap list | grep mf_priority
+'
+
+test_expect_success 'try to submit a job when user does not exist in DB' '
+	test_must_fail flux mini submit -n1 hostname > failure.out 2>&1 &&
+	test_debug "cat failure.out" &&
+	grep "user not found in flux-accounting DB" failure.out
+'
+
+test_expect_success 'send an empty payload to make sure unpack fails' '
+	cat <<-EOF >bad_payload.py &&
+	import flux
+
+	#create a JSON payload
+	flux.Flux().rpc("job-manager.mf_priority.rec_update", {}).get()
+	EOF
+	test_must_fail flux python bad_payload.py &&
+	flux dmesg | grep "failed to unpack custom_priority.trigger msg: Protocol error"
+'
+
+test_expect_success 'create fake_payload.py' '
+	cat <<-EOF >fake_payload.py
+	import flux
+	import pwd
+	import getpass
+
+	username = getpass.getuser()
+	userid = pwd.getpwnam(username).pw_uid
+	# create a JSON payload
+	data = {"userid": str(userid), "bank": "account3", "fairshare": "0.45321"}
+	flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get()
+	data = {"userid": str(userid), "bank": "account2", "fairshare": "0.11345"}
+	flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get()
+	EOF
+'
+
+test_expect_success 'update plugin with sample test data' '
+	flux python fake_payload.py
+'
+
+test_expect_success 'submit a job with default urgency' '
+	jobid=$(flux mini submit --setattr=system.bank=account3 -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job1.test &&
+	cat <<-EOF >job1.expected &&
+	45321
+	EOF
+	test_cmp job1.expected job1.test
+'
+
+test_expect_success 'submit a job with custom urgency' '
+	jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=15 -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job2.test &&
+	cat <<-EOF >job2.expected &&
+	45320
+	EOF
+	test_cmp job2.expected job2.test
+'
+
+test_expect_success 'submit a job with urgency of 0' '
+	jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=0 -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job3.test &&
+	cat <<-EOF >job3.expected &&
+	0
+	EOF
+	test_cmp job3.expected job3.test &&
+	flux job cancel $jobid
+'
+
+test_expect_success 'submit a job with urgency of 31' '
+	jobid=$(flux mini submit --setattr=system.bank=account3 --urgency=31 -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job4.test &&
+	cat <<-EOF >job4.expected &&
+	4294967295
+	EOF
+	test_cmp job4.expected job4.test
+'
+
+test_expect_success 'submit a job with other bank' '
+	jobid=$(flux mini submit --setattr=system.bank=account2 -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job5.test &&
+	cat <<-EOF >job5.expected &&
+	11345
+	EOF
+	test_cmp job5.expected job5.test
+'
+test_expect_success 'submit a job using default bank' '
+	jobid=$(flux mini submit -n1 hostname) &&
+	flux job wait-event -f json $jobid priority | jq '.context.priority' > job6.test &&
+	cat <<-EOF >job6.expected &&
+	45321
+	EOF
+	test_cmp job6.expected job6.test
+'
+
+test_expect_success 'submit a job using a bank the user does not belong to' '
+	test_must_fail flux mini submit --setattr=system.bank=account1 -n1 hostname > bad_bank.out 2>&1 &&
+	test_debug "cat bad_bank.out" &&
+	grep "user does not belong to specified bank" bad_bank.out
+'
+
+test_expect_success 'reject job when invalid bank format is passed in' '
+	test_must_fail flux mini submit --setattr=system.bank=1 -n1 hostname > invalid_fmt.out 2>&1 &&
+	test_debug "cat invalid_fmt.out" &&
+	grep "unable to unpack bank arg" invalid_fmt.out
+'
+
+test_done
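Note: the expected priorities asserted throughout t1001 fall straight out of the plugin's formula. With fairshare 0.45321 on account3 and weight 100000, default urgency 16 yields 45321 and urgency 15 subtracts one; urgency 0 is FLUX_JOB_URGENCY_HOLD (priority pinned to 0), urgency 31 is FLUX_JOB_URGENCY_EXPEDITE (FLUX_JOB_PRIORITY_MAX), and the account2 fairshare of 0.11345 yields 11345:

```python
# spot-check the numbers the tests above compare against
assert round(100000 * 0.45321 + (16 - 16)) == 45321   # default urgency
assert round(100000 * 0.45321 + (15 - 16)) == 45320   # urgency=15
assert round(100000 * 0.11345 + (16 - 16)) == 11345   # other bank (account2)
assert 4294967295 == (1 << 32) - 1                    # urgency=31 -> PRIORITY_MAX
```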