From 02e0200547700920db686ba75cecefbb99fa50f5 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2022 11:13:19 -0700 Subject: [PATCH 01/13] testsuite: relax queue command error string checks Problem: t2240-queue-cmd.t checks for very specific error messages in some tests, which makes the test brittle when messages change. Relax the tests so that verbatim output is not required, just the substantive portion of the message. --- t/t2240-queue-cmd.t | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/t/t2240-queue-cmd.t b/t/t2240-queue-cmd.t index 797d4a782396..86c367884f2e 100755 --- a/t/t2240-queue-cmd.t +++ b/t/t2240-queue-cmd.t @@ -259,45 +259,32 @@ test_expect_success 'flux-queue: status allowed for guest' ' test_expect_success 'flux-queue: stop denied for guest' ' test_must_fail runas_guest flux queue stop 2>guest_stop.err && - cat <<-EOT >guest_alloc.exp && - flux-queue: alloc-admin: Request requires owner credentials - EOT - test_cmp guest_alloc.exp guest_stop.err + grep "requires owner credentials" guest_stop.err ' test_expect_success 'flux-queue: start denied for guest' ' test_must_fail runas_guest flux queue start 2>guest_start.err && - test_cmp guest_alloc.exp guest_start.err + grep "requires owner credentials" guest_start.err ' test_expect_success 'flux-queue: disable denied for guest' ' test_must_fail runas_guest flux queue disable foo 2>guest_dis.err && - cat <<-EOT >guest_submit.exp && - flux-queue: submit-admin: Request requires owner credentials - EOT - test_cmp guest_submit.exp guest_dis.err + grep "requires owner credentials" guest_dis.err ' test_expect_success 'flux-queue: enable denied for guest' ' test_must_fail runas_guest flux queue enable 2>guest_ena.err && - test_cmp guest_submit.exp guest_ena.err + grep "requires owner credentials" guest_ena.err ' test_expect_success 'flux-queue: drain denied for guest' ' test_must_fail runas_guest flux queue drain 2>guest_drain.err && - cat <<-EOT 
>guest_drain.exp && - flux-queue: drain: Request requires owner credentials - EOT - test_cmp guest_drain.exp guest_drain.err + grep "requires owner credentials" guest_drain.err ' test_expect_success 'flux-queue: idle denied for guest' ' test_must_fail runas_guest flux queue idle 2>guest_idle.err && - cat <<-EOT >guest_idle.exp && - flux-queue: idle: Request requires owner credentials - EOT - test_cmp guest_idle.exp guest_idle.err + grep "requires owner credentials" guest_idle.err ' - test_done From 94372ec875e5b4c72942c84399dd758a368d79de Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 07:00:08 -0700 Subject: [PATCH 02/13] testsuite: configure queues where needed Problem: some tests submit jobs with queues that are not present in the TOML configuration, but when the job manager becomes queue-aware, this will no longer be possible. Add [queues] configuration to tests where appropriate. --- t/t2260-job-list.t | 16 +++++++++++++++- t/t2800-jobs-cmd.t | 32 ++++++++++++++++++++++++-------- t/t2801-top-cmd.t | 10 +++++++++- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/t/t2260-job-list.t b/t/t2260-job-list.t index 1af25be026b3..8b84f576a6b3 100755 --- a/t/t2260-job-list.t +++ b/t/t2260-job-list.t @@ -6,7 +6,9 @@ test_description='Test flux job list services' . 
$(dirname $0)/sharness.sh -test_under_flux 4 job +mkdir -p conf.d + +test_under_flux 4 job -o,--config-path=$(pwd)/conf.d RPC=${FLUX_BUILD_DIR}/t/request/rpc listRPC="flux python ${SHARNESS_TEST_SRCDIR}/job-list/list-rpc.py" @@ -649,6 +651,13 @@ test_expect_success HAVE_JQ 'flux job list output no queue if queue not set' ' flux job list -s inactive | grep $jobid | jq -e ".queue == null" ' +test_expect_success 'reconfigure with one queue' ' + cat >conf.d/config.toml <<-EOT && + [queues.foo] + EOT + flux config reload +' + test_expect_success HAVE_JQ 'flux job list outputs queue' ' jobid=`flux mini submit --wait --queue=foo /bin/true | flux job id` && echo $jobid > jobqueue2.id && @@ -656,6 +665,11 @@ test_expect_success HAVE_JQ 'flux job list outputs queue' ' flux job list -s inactive | grep $jobid | jq -e ".queue == \"foo\"" ' +test_expect_success 'reconfigure with no queues' ' + cp /dev/null conf.d/config.toml && + flux config reload +' + test_expect_success 'reload the job-list module' ' flux module reload job-list ' diff --git a/t/t2800-jobs-cmd.t b/t/t2800-jobs-cmd.t index e0d64dfcaf05..db69b6d97fe2 100755 --- a/t/t2800-jobs-cmd.t +++ b/t/t2800-jobs-cmd.t @@ -4,7 +4,17 @@ test_description='Test flux jobs command' . 
$(dirname $0)/sharness.sh -test_under_flux 4 job +mkdir -p conf.d +cat >conf.d/config.toml <<-EOT +[policy] +jobspec.defaults.system.queue = "defaultqueue" + +[queues.defaultqueue] +[queues.queue1] +[queues.queue2] +EOT + +test_under_flux 4 job -o,--config-path=$(pwd)/conf.d RPC=${FLUX_BUILD_DIR}/t/request/rpc runpty="${SHARNESS_TEST_SRCDIR}/scripts/runpty.py --line-buffer -f asciicast" @@ -499,7 +509,7 @@ test_expect_success 'flux-jobs --format={name} works' ' ' # in job submissions above: completed jobs should be in queue1, running jobs -# in queue2, and the rest in no queue +# in queue2, and the rest in defaultqueue test_expect_success 'flux-jobs --format={queue} works' ' flux jobs --filter=completed -no "{queue}" > jobqueueCD.out && for i in `seq 1 $(state_count completed)`; do @@ -511,9 +521,9 @@ test_expect_success 'flux-jobs --format={queue} works' ' echo "queue2" >> jobqueueR.exp done && test_cmp jobqueueR.out jobqueueR.exp && - flux jobs --filter=failed -no "{queue},{queue:h}" > jobqueueF.out && + flux jobs --filter=failed -no "{queue}" > jobqueueF.out && for i in `seq 1 $(state_count failed)`; do - echo ",-" >> jobqueueF.exp + echo "defaultqueue" >> jobqueueF.exp done && test_cmp jobqueueF.out jobqueueF.exp ' @@ -1162,8 +1172,13 @@ ingest_module () flux module ${cmd} job-ingest $* } -test_expect_success 'reload job-ingest without validator' ' - ingest_module reload disable-validator +test_expect_success 'disable ingest preprocessing' ' + cat >conf.d/ingest.toml <<-EOT && + [ingest] + frobnicator.disable = true + validator.disable = true + EOT + flux config reload ' test_expect_success HAVE_JQ 'create illegal jobspec with empty command array' ' @@ -1188,8 +1203,9 @@ test_expect_success HAVE_JQ 'flux jobs works on job with illegal jobspec' ' test_cmp list_illegal_jobspec.out list_illegal_jobspec.exp ' -test_expect_success 'reload job-ingest with defaults' ' - ingest_module reload +test_expect_success 're-enable job-ingest preprocessing' ' + rm -f 
conf.d/ingest.toml && + flux config reload ' # we make R invalid by overwriting it in the KVS before job-list will diff --git a/t/t2801-top-cmd.t b/t/t2801-top-cmd.t index 576605ec0194..8812ea7e0615 100755 --- a/t/t2801-top-cmd.t +++ b/t/t2801-top-cmd.t @@ -4,7 +4,9 @@ test_description='Test flux top command' . $(dirname $0)/sharness.sh -test_under_flux 4 +mkdir -p conf.d + +test_under_flux 4 full -o,--config-path=$(pwd)/conf.d runpty="${SHARNESS_TEST_SRCDIR}/scripts/runpty.py" waitfile="${SHARNESS_TEST_SRCDIR}/scripts/waitfile.lua" @@ -128,6 +130,12 @@ test_expect_success NO_CHAIN_LINT 'flux-top does not exit on recursive failure' --input=recurse-fail.in flux top && grep -qi "error connecting to Flux" recurse-fail.log ' +test_expect_success 'configure a test queue' ' + cat >conf.d/config.toml <<-EOT && + [queues.testq] + EOT + flux config reload +' test_expect_success 'flux-top displays job queues when present' ' $runpty -f asciicast -o no-queue.log flux top --test-exit && grep -v QUEUE no-queue.log && From f627e1cdf88ab849633372d8c465318d745fbad1 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2022 13:05:14 -0700 Subject: [PATCH 03/13] flux-uptime: fix memory leaks Problem: several futures are created but not destroyed. Destroy futures. --- src/cmd/builtin/uptime.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cmd/builtin/uptime.c b/src/cmd/builtin/uptime.c index 71f8f8da3d99..b81f8349a58d 100644 --- a/src/cmd/builtin/uptime.c +++ b/src/cmd/builtin/uptime.c @@ -59,6 +59,7 @@ static bool sched_disabled (flux_t *h) "reason", "")) || flux_rpc_get_unpack (f, "{s:b}", "enable", &enable) < 0) log_err_exit ("Error fetching alloc status"); + flux_future_destroy (f); return enable ? false : true; } @@ -79,6 +80,7 @@ static bool submit_disabled (flux_t *h) "reason", "")) || flux_rpc_get_unpack (f, "{s:b}", "enable", &enable) < 0) log_err_exit ("Error fetching submit status"); + flux_future_destroy (f); return enable ? 
false : true; } @@ -153,6 +155,7 @@ static double attr_get_starttime (flux_t *h) d = strtod (s, &endptr); if (errno != 0 || *endptr != '\0') log_msg_exit ("Error parsing %s", name); + flux_future_destroy (f); return d; } From 06767c05985dfe1c428115fb9617543b08e67a92 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2022 10:34:41 -0700 Subject: [PATCH 04/13] flux-queue: drop little used short option Problem: several flux-queue subcommands accept -q,--queue=NAME, but others accept -q,--quiet, which could become confusing. Drop -q as a short option for quiet. The short option is not used anywhere. --- src/cmd/flux-queue.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmd/flux-queue.c b/src/cmd/flux-queue.c index 481452495817..4fe57d5126bf 100644 --- a/src/cmd/flux-queue.c +++ b/src/cmd/flux-queue.c @@ -40,7 +40,7 @@ static struct optparse_option stop_opts[] = { { .name = "verbose", .key = 'v', .usage = "Display more detail about internal job manager state", }, - { .name = "quiet", .key = 'q', + { .name = "quiet", .usage = "Display only errors", }, OPTPARSE_TABLE_END @@ -50,7 +50,7 @@ static struct optparse_option start_opts[] = { { .name = "verbose", .key = 'v', .usage = "Display more detail about internal job manager state", }, - { .name = "quiet", .key = 'q', + { .name = "quiet", .usage = "Display only errors", }, OPTPARSE_TABLE_END @@ -74,7 +74,7 @@ static struct optparse_option idle_opts[] = { { .name = "timeout", .key = 't', .has_arg = 1, .arginfo = "DURATION", .usage = "timeout after DURATION", }, - { .name = "quiet", .key = 'q', .has_arg = 0, + { .name = "quiet", .has_arg = 0, .usage = "Only display pending job count if nonzero", }, OPTPARSE_TABLE_END From 0298d21de33f6b2cec1560556c216672d7426372 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 06:27:11 -0700 Subject: [PATCH 05/13] flux-queue: send output to stdout Problem: flux-queue sends non-error output to stderr. 
Stop using log_msg(), which uses stderr, to print non-error output. This means "flux-queue: " is dropped from those lines of output. Update tests. --- src/cmd/flux-queue.c | 27 ++++++------- t/issues/t3920-running-underflow.sh | 4 +- t/t2240-queue-cmd.t | 62 ++++++++++++++--------------- t/t2300-sched-simple.t | 4 +- 4 files changed, 48 insertions(+), 49 deletions(-) diff --git a/src/cmd/flux-queue.c b/src/cmd/flux-queue.c index 4fe57d5126bf..0e780e69e086 100644 --- a/src/cmd/flux-queue.c +++ b/src/cmd/flux-queue.c @@ -234,10 +234,10 @@ void submit_admin (flux_t *h, "reason", &reason) < 0) log_msg_exit ("submit-admin: %s", future_strerror (f, errno)); - log_msg ("Job submission is %s%s%s", - enable ? "enabled" : "disabled", - enable ? "" : ": ", - enable ? "" : reason); + printf ("Job submission is %s%s%s\n", + enable ? "enabled" : "disabled", + enable ? "" : ": ", + enable ? "" : reason); flux_future_destroy (f); } @@ -282,21 +282,20 @@ void alloc_admin (flux_t *h, &running) < 0) log_msg_exit ("alloc-admin: %s", future_strerror (f, errno)); if (!quiet) { - log_msg ("Scheduling is %s%s%s", - enable ? "enabled" : "disabled", - reason && strlen (reason) > 0 ? ": " : "", - reason ? reason : ""); + printf ("Scheduling is %s%s%s\n", + enable ? "enabled" : "disabled", + reason && strlen (reason) > 0 ? ": " : "", + reason ? 
reason : ""); } if (verbose) { - log_msg ("%d alloc requests queued", queue_length); - log_msg ("%d alloc requests pending to scheduler", alloc_pending); - log_msg ("%d free requests pending to scheduler", free_pending); - log_msg ("%d running jobs", running); + printf ("%d alloc requests queued\n", queue_length); + printf ("%d alloc requests pending to scheduler\n", alloc_pending); + printf ("%d free requests pending to scheduler\n", free_pending); + printf ("%d running jobs\n", running); } flux_future_destroy (f); } - int cmd_enable (optparse_t *p, int argc, char **argv) { flux_t *h; @@ -439,7 +438,7 @@ int cmd_idle (optparse_t *p, int argc, char **argv) log_msg_exit ("idle: %s", errno == ETIMEDOUT ? "timeout" : future_strerror (f, errno)); if (!optparse_hasopt (p, "quiet") || pending > 0) - log_msg ("%d pending jobs", pending); + printf ("%d pending jobs\n", pending); flux_future_destroy (f); flux_close (h); return (0); diff --git a/t/issues/t3920-running-underflow.sh b/t/issues/t3920-running-underflow.sh index 407e3b38a070..ebe0f1ed72ab 100755 --- a/t/issues/t3920-running-underflow.sh +++ b/t/issues/t3920-running-underflow.sh @@ -11,8 +11,8 @@ SHELL=/bin/sh flux start '\ && flux job wait-event $jobid depend \ && flux job cancel $jobid \ && flux job attach -vE $jobid \ -; flux queue status -v >t3920.output 2>&1' +; flux queue status -v >t3920.output' cat t3920.output -grep 'flux-queue: 0 running jobs' t3920.output +grep '0 running jobs' t3920.output diff --git a/t/t2240-queue-cmd.t b/t/t2240-queue-cmd.t index 86c367884f2e..1f0674b934ad 100755 --- a/t/t2240-queue-cmd.t +++ b/t/t2240-queue-cmd.t @@ -89,8 +89,8 @@ test_expect_success 'flux-queue: start with bad broker connection fails' ' ' test_expect_success 'flux-queue: start with extra free args fails' ' - test_must_fail flux queue start xyz 2>start_xargs.out && - grep Usage: start_xargs.out + test_must_fail flux queue start xyz 2>start_xargs.err && + grep Usage: start_xargs.err ' test_expect_success 
'flux-queue: stop works' ' @@ -98,10 +98,10 @@ test_expect_success 'flux-queue: stop works' ' ' test_expect_success 'flux-queue: status reports reason for stop' ' - flux queue status 2>status.out && + flux queue status >status.out && cat <<-EOT >status.exp && - flux-queue: Job submission is enabled - flux-queue: Scheduling is disabled: my unique message + Job submission is enabled + Scheduling is disabled: my unique message EOT test_cmp status.exp status.out ' @@ -131,8 +131,8 @@ test_expect_success 'flux-queue: submit a job and make sure alloc sent' ' ' test_expect_success 'flux-queue: stop canceled alloc request' ' - flux queue stop -v 2>stop.err && - grep "flux-queue: 1 alloc requests pending to scheduler" stop.err + flux queue stop -v >stop.out && + grep "1 alloc requests pending to scheduler" stop.out ' test_expect_success 'flux-queue: start scheduling and cancel long job' ' @@ -162,12 +162,12 @@ wait_for_sched_offline() { test_expect_success 'flux-queue: queue says scheduling disabled' ' wait_for_sched_offline 10 && - flux queue status 2>sched_stat.err && + flux queue status >sched_stat.out && cat <<-EOT >sched_stat.exp && - flux-queue: Job submission is enabled - flux-queue: Scheduling is disabled: Scheduler is offline + Job submission is enabled + Scheduling is disabled: Scheduler is offline EOT - test_cmp sched_stat.exp sched_stat.err + test_cmp sched_stat.exp sched_stat.out ' test_expect_success 'flux-queue: queue contains 1 active job' ' @@ -180,12 +180,12 @@ test_expect_success 'flux-queue: load scheduler' ' ' test_expect_success 'flux-queue: queue says scheduling is enabled' ' - flux queue status 2>sched_stat2.err && + flux queue status >sched_stat2.out && cat <<-EOT >sched_stat2.exp && - flux-queue: Job submission is enabled - flux-queue: Scheduling is enabled + Job submission is enabled + Scheduling is enabled EOT - test_cmp sched_stat2.exp sched_stat2.err + test_cmp sched_stat2.exp sched_stat2.out ' test_expect_success 'flux-queue: job in queue ran' 
' @@ -208,16 +208,16 @@ test_expect_success 'flux-queue: there are 3 active jobs' ' ' test_expect_success 'flux-queue: queue status -v shows expected counts' ' - flux queue status -v 2>stat.err && + flux queue status -v >stat.out && cat <<-EOT >stat.exp && - flux-queue: Job submission is enabled - flux-queue: Scheduling is enabled - flux-queue: 1 alloc requests queued - flux-queue: 1 alloc requests pending to scheduler - flux-queue: 0 free requests pending to scheduler - flux-queue: 1 running jobs + Job submission is enabled + Scheduling is enabled + 1 alloc requests queued + 1 alloc requests pending to scheduler + 0 free requests pending to scheduler + 1 running jobs EOT - test_cmp stat.exp stat.err + test_cmp stat.exp stat.out ' test_expect_success 'flux-queue: stop queue and cancel long job' ' @@ -230,16 +230,16 @@ test_expect_success 'flux-queue: queue becomes idle' ' ' test_expect_success 'flux-queue: queue status -v shows expected counts' ' - flux queue status -v 2>stat2.err && + flux queue status -v >stat2.out && cat <<-EOT >stat2.exp && - flux-queue: Job submission is enabled - flux-queue: Scheduling is disabled - flux-queue: 2 alloc requests queued - flux-queue: 0 alloc requests pending to scheduler - flux-queue: 0 free requests pending to scheduler - flux-queue: 0 running jobs + Job submission is enabled + Scheduling is disabled + 2 alloc requests queued + 0 alloc requests pending to scheduler + 0 free requests pending to scheduler + 0 running jobs EOT - test_cmp stat2.exp stat2.err + test_cmp stat2.exp stat2.out ' test_expect_success 'flux-queue: start queue and drain' ' diff --git a/t/t2300-sched-simple.t b/t/t2300-sched-simple.t index 66ce835194a1..245246e5a3fa 100755 --- a/t/t2300-sched-simple.t +++ b/t/t2300-sched-simple.t @@ -190,7 +190,7 @@ test_expect_success 'sched-simple: remove sched-simple and cancel jobs' ' flux job cancelall -f ' test_expect_success 'sched-simple: there are no outstanding sched requests' ' - flux queue status -v 
2>queue_status.out && + flux queue status -v >queue_status.out && grep "0 alloc requests pending to scheduler" queue_status.out && grep "0 free requests pending to scheduler" queue_status.out ' @@ -257,7 +257,7 @@ test_expect_success 'sched-simple: remove sched-simple and cancel jobs' ' flux job cancelall -f ' test_expect_success 'sched-simple: there are no outstanding sched requests' ' - flux queue status -v 2>queue_status.out && + flux queue status -v >queue_status.out && grep "0 alloc requests pending to scheduler" queue_status.out && grep "0 free requests pending to scheduler" queue_status.out ' From a3ac0e114f0c049268c79def9fe1aaec8cc94070 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2022 06:50:19 -0700 Subject: [PATCH 06/13] job-manager: add queue class Problem: there is no way to list queues or enable/disable job submission on a queue basis. Add a new class to the job manager which provides new interfaces for enabling, disabling, listing, and querying status of queues. Note that these queues are not containers for jobs. Jobs are still enqueued in one "alloc queue" even when there are multiple named queues configured. The purpose of this class (at this time) is to capture the queue configuration and the administrative status to be utilized by the job submit logic. A future commit will wire this into the job submit logic. 
--- src/modules/job-manager/Makefile.am | 2 + src/modules/job-manager/job-manager.c | 6 + src/modules/job-manager/job-manager.h | 1 + src/modules/job-manager/queue.c | 426 ++++++++++++++++++++++++++ src/modules/job-manager/queue.h | 28 ++ 5 files changed, 463 insertions(+) create mode 100644 src/modules/job-manager/queue.c create mode 100644 src/modules/job-manager/queue.h diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 273fba650627..2b58a42008f9 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -64,6 +64,8 @@ libjob_manager_la_SOURCES = \ getattr.c \ prioritize.h \ prioritize.c \ + queue.h \ + queue.c \ jobtap-internal.h \ jobtap.h \ jobtap.c \ diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index 60b7b6581f82..e204159088d9 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -30,6 +30,7 @@ #include "drain.h" #include "wait.h" #include "purge.h" +#include "queue.h" #include "annotate.h" #include "journal.h" #include "getattr.h" @@ -171,6 +172,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating purge context"); goto done; } + if (!(ctx.queue = queue_create (&ctx))) { + flux_log_error (h, "error creating queue context"); + goto done; + } if (!(ctx.event = event_ctx_create (&ctx))) { flux_log_error (h, "error creating event batcher"); goto done; @@ -232,6 +237,7 @@ int mod_main (flux_t *h, int argc, char **argv) rc = 0; done: flux_msg_handler_delvec (ctx.handlers); + queue_destroy (ctx.queue); purge_destroy (ctx.purge); journal_ctx_destroy (ctx.journal); annotate_ctx_destroy (ctx.annotate); diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index c9ba508e8683..b1d281e1780b 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -32,6 +32,7 @@ struct job_manager { struct 
annotate *annotate; struct journal *journal; struct purge *purge; + struct queue *queue; struct jobtap *jobtap; }; diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c new file mode 100644 index 000000000000..ae91635712a4 --- /dev/null +++ b/src/modules/job-manager/queue.c @@ -0,0 +1,426 @@ +/************************************************************\ + * Copyright 2022 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* queue.c - job queues + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libutil/errprintf.h" +#include "src/common/libutil/jpath.h" +#include "src/common/libczmqcontainers/czmq_containers.h" + +#include "job-manager.h" +#include "conf.h" +#include "queue.h" + +struct jobq { + char *name; + bool enable; // jobs may be submitted to this queue + char *reason; // reason if disabled +}; + +struct queue { + struct job_manager *ctx; + flux_msg_handler_t **handlers; + union { + struct jobq *anon; + zhashx_t *named; + }; + bool have_named_queues; +}; + +static void jobq_destroy (struct jobq *q) +{ + if (q) { + int saved_errno = errno; + free (q->name); + free (q->reason); + free (q); + errno = saved_errno; + } +} + +// zhashx_destructor_fn signature +static void jobq_destructor (void **item) +{ + if (item) { + jobq_destroy (*item); + *item = NULL; + } +} + +static struct jobq *jobq_create (const char *name) +{ + struct jobq *q; + + if (!(q = calloc (1, sizeof (*q)))) + return NULL; + if (name && !(q->name = strdup (name))) + goto error; + q->enable = true; + return q; +error: + jobq_destroy (q); + return NULL; +} + +static int jobq_enable (struct jobq *q, bool enable, const char *reason) +{ + if (enable) { + q->enable = 
true; + free (q->reason); + q->reason = NULL; + } + else { + char *cpy; + if (!(cpy = strdup (reason))) + return -1; + free (q->reason); + q->reason = cpy; + q->enable = false; + } + return 0; +} + +static int queue_enable_all (struct queue *queue, + bool enable, + const char *reason) +{ + struct jobq *q; + + if (queue->have_named_queues) { + q = zhashx_first (queue->named); + while (q) { + if (jobq_enable (q, enable, reason) < 0) + return -1; + q = zhashx_next (queue->named); + } + } + else { + if (jobq_enable (queue->anon, enable, reason) < 0) + return -1; + } + return 0; +} + +struct jobq *queue_lookup (struct queue *queue, + const char *name, + flux_error_t *error) +{ + if (name) { + struct jobq *q; + + if (!queue->have_named_queues + || !(q = zhashx_lookup (queue->named, name))) { + errprintf (error, "'%s' is not a valid queue", name); + return NULL; + } + return q; + } + else { + if (queue->have_named_queues) { + errprintf (error, "a named queue is required"); + return NULL; + } + return queue->anon; + } +} + +int queue_submit_check (struct queue *queue, + json_t *jobspec, + flux_error_t *error) +{ + struct jobq *q; + json_t *o; + const char *name = NULL; + + if ((o = jpath_get (jobspec, "attributes.system.queue"))) + name = json_string_value (o); + + if (!(q = queue_lookup (queue, name, error))) { + errno = EINVAL; + return -1; + } + if (!q->enable) { + errprintf (error, "job submission%s%s is disabled: %s", + name ? " to " : "", + name ? name : "", + q->reason); + errno = EINVAL; + return -1; + } + return 0; +} + +/* N.B. the broker will have already validated the basic queue configuration so + * we shouldn't need to produce detailed configuration errors for users here. 
+ */ +static int queue_configure (const flux_conf_t *conf, + flux_error_t *error, + void *arg) +{ + struct queue *queue = arg; + json_t *queues; + + if (flux_conf_unpack (conf, NULL, "{s:o}", "queues", &queues) == 0 + && json_object_size (queues) > 0) { + const char *name; + json_t *value; + struct jobq *q; + zlistx_t *keys; + + /* destroy anon queue and create hash if necessary + */ + if (!queue->have_named_queues) { + queue->have_named_queues = true; + jobq_destroy (queue->anon); + if (!(queue->named = zhashx_new ())) + goto nomem; + zhashx_set_destructor (queue->named, jobq_destructor); + } + /* remove any queues that disappeared from config + */ + if (!(keys = zhashx_keys (queue->named))) + goto nomem; + name = zlistx_first (keys); + while (name) { + if (!json_object_get (queues, name)) + zhashx_delete (queue->named, name); + name = zlistx_next (keys); + } + zlistx_destroy (&keys); + /* add any new queues that appeared in config + */ + json_object_foreach (queues, name, value) { + if (!zhashx_lookup (queue->named, name)) { + if (!(q = jobq_create (name)) + || zhashx_insert (queue->named, name, q) < 0) { + jobq_destroy (q); + goto nomem; + } + } + } + } + else { + if (queue->have_named_queues) { + queue->have_named_queues = false; + zhashx_destroy (&queue->named); + if (!(queue->anon = jobq_create (NULL))) + goto nomem; + } + } + return 1; +nomem: + errprintf (error, "out of memory while processing queue configuration"); + errno = ENOMEM; + return -1; +} + +static void queue_list_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct queue *queue = arg; + struct jobq *q; + json_t *a = NULL;; + + if (flux_request_decode (msg, NULL, NULL) < 0) + goto error; + if (!(a = json_array ())) { + errno = ENOMEM; + goto error; + } + if (queue->have_named_queues) { + q = zhashx_first (queue->named); + while (q) { + json_t *o; + if (!(o = json_string (q->name)) + || json_array_append_new (a, o) < 0) { + json_decref (o); + errno = ENOMEM; + 
goto error; + } + q = zhashx_next (queue->named); + } + } + if (flux_respond_pack (h, msg, "{s:o}", "queues", a) < 0) + flux_log_error (h, "error responding to job-manager.queue-list"); + json_decref (a); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to job-manager.queue-list"); + json_decref (a); +} + +static void queue_status_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct queue *queue = arg; + flux_error_t error; + const char *errmsg = NULL; + const char *name = NULL; + struct jobq *q; + int rc; + + if (flux_request_unpack (msg, NULL, "{s?s}", "name", &name) < 0) + goto error; + if (!(q = queue_lookup (queue, name, &error))) { + errmsg = error.text; + errno = EINVAL; + goto error; + } + if (q->enable) + rc = flux_respond_pack (h, msg, "{s:b}", "enable", 1); + else { + rc = flux_respond_pack (h, + msg, + "{s:b s:s}", + "enable", 0, + "reason", q->reason); + } + if (rc < 0) + flux_log_error (h, "error responding to job-manager.queue-status"); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "error responding to job-manager.queue-status"); +} + +static void queue_admin_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct queue *queue = arg; + flux_error_t error; + const char *errmsg = NULL; + const char *name = NULL; + int enable; + const char *reason = NULL; + struct jobq *q; + int all; + + if (flux_request_unpack (msg, + NULL, + "{s?s s:b s?s s:b}", + "name", &name, + "enable", &enable, + "reason", &reason, + "all", &all) < 0) + goto error; + if (!enable && !reason) { + errmsg = "reason is required for disable"; + errno = EINVAL; + goto error; + } + if (!name) { + if (queue->have_named_queues && !all) { + errmsg = "Use --all to apply this command to all queues"; + errno = EINVAL; + goto error; + } + if (queue_enable_all (queue, enable, reason)) + goto error; + } + else { + if 
(!(q = queue_lookup (queue, name, &error))) { + errmsg = error.text; + errno = EINVAL; + goto error; + } + if (jobq_enable (q, enable, reason) < 0) + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to job-manager.queue-admin"); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "error responding to job-manager.queue-admin"); +} + +static const struct flux_msg_handler_spec htab[] = { + { + FLUX_MSGTYPE_REQUEST, + "job-manager.queue-list", + queue_list_cb, + FLUX_ROLE_USER + }, + { + FLUX_MSGTYPE_REQUEST, + "job-manager.queue-status", + queue_status_cb, + FLUX_ROLE_USER + }, + { + FLUX_MSGTYPE_REQUEST, + "job-manager.queue-admin", + queue_admin_cb, + 0, + }, + FLUX_MSGHANDLER_TABLE_END, +}; + +void queue_destroy (struct queue *queue) +{ + if (queue) { + int saved_errno = errno; + conf_unregister_callback (queue->ctx->conf, queue_configure); + flux_msg_handler_delvec (queue->handlers); + if (queue->have_named_queues) + zhashx_destroy (&queue->named); + else + jobq_destroy (queue->anon); + free (queue); + errno = saved_errno; + } +} + +struct queue *queue_create (struct job_manager *ctx) +{ + struct queue *queue; + flux_error_t error; + + if (!(queue = calloc (1, sizeof (*queue)))) + return NULL; + queue->ctx = ctx; + if (!(queue->anon = jobq_create (NULL))) + goto error; + if (flux_msg_handler_addvec (ctx->h, + htab, + queue, + &queue->handlers) < 0) + goto error; + if (conf_register_callback (ctx->conf, + &error, + queue_configure, + queue) < 0) { + flux_log (ctx->h, + LOG_ERR, + "error parsing queue config: %s", + error.text); + goto error; + } + return queue; +error: + queue_destroy (queue); + return NULL; +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/queue.h b/src/modules/job-manager/queue.h new file mode 100644 index 000000000000..39fd93585547 --- /dev/null +++ b/src/modules/job-manager/queue.h @@ -0,0 +1,28 @@ 
+/************************************************************\ + * Copyright 2022 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_QUEUE_H +#define _FLUX_JOB_MANAGER_QUEUE_H + +#include + +#include "job-manager.h" + +struct queue *queue_create (struct job_manager *ctx); +void queue_destroy (struct queue *queue); + +int queue_submit_check (struct queue *queue, + json_t *jobspec, + flux_error_t *error); + + +#endif /* ! _FLUX_JOB_MANAGER_QUEUE_H */ + +// vi:ts=4 sw=4 expandtab From f2d8cb2a5abdd27c49fa9a5a413f7ef0c63fac61 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 07:47:49 -0700 Subject: [PATCH 07/13] job-manager: integrate queue class with submit logic Problem: jobs are accepted when submitted with a named queue when queues are not configured. Call queue_submit_check() on each submitted job. The job is rejected if a requested queue is not configured, or if the queue is disabled. As part of the integration, the job-manager.submit-admin RPC is moved temporarily to queue.c. It will be removed once tools have been updated to use the new RPCs. 
Fixes #4440 --- src/modules/job-manager/queue.c | 76 ++++++++++++++++++++++++++++++++ src/modules/job-manager/submit.c | 70 +++-------------------------- 2 files changed, 82 insertions(+), 64 deletions(-) diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index ae91635712a4..6c6627a1e8c9 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -355,6 +355,76 @@ static void queue_admin_cb (flux_t *h, flux_log_error (h, "error responding to job-manager.queue-admin"); } +static void submit_admin_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct queue *queue = arg; + const char *error_prefix = "job submission is disabled: "; + const char *errmsg = NULL; + int enable; + int query_only; + const char *reason; + + if (flux_request_unpack (msg, + NULL, + "{s:b s:b s:s}", + "query_only", + &query_only, + "enable", + &enable, + "reason", + &reason) < 0) + goto error; + if (!query_only) { + if (flux_msg_authorize (msg, FLUX_USERID_UNKNOWN) < 0) { + errmsg = "Request requires owner credentials"; + goto error; + } + if (!enable) { + char *errmsg; + if (asprintf (&errmsg, "%s%s", error_prefix, reason) < 0) + goto error; + if (queue_enable_all (queue, false, errmsg) < 0) + goto error; + } + else { + if (queue_enable_all (queue, true, NULL) < 0) + goto error; + } + } + int any_enabled = 0; + struct jobq *q; + reason = NULL; + if (queue->have_named_queues) { + q = zhashx_first (queue->named); + while (q) { + if (q->enable) { + any_enabled = 1; + break; + } + if (!reason) + reason = q->reason; + q = zhashx_next (queue->named); + } + } + else { + any_enabled = queue->anon->enable ? 1 : 0; + reason = queue->anon->reason; + } + if (flux_respond_pack (h, + msg, + "{s:b s:s}", + "enable", + any_enabled, + "reason", + reason ? 
reason : "") < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, @@ -374,6 +444,12 @@ static const struct flux_msg_handler_spec htab[] = { queue_admin_cb, 0, }, + { + FLUX_MSGTYPE_REQUEST, + "job-manager.submit-admin", + submit_admin_cb, + FLUX_ROLE_USER, + }, FLUX_MSGHANDLER_TABLE_END, }; diff --git a/src/modules/job-manager/submit.c b/src/modules/job-manager/submit.c index 495592154db5..b26d8c5c8e73 100644 --- a/src/modules/job-manager/submit.c +++ b/src/modules/job-manager/submit.c @@ -24,14 +24,13 @@ #include "alloc.h" #include "event.h" #include "wait.h" +#include "queue.h" #include "jobtap-internal.h" #include "submit.h" struct submit { struct job_manager *ctx; - bool submit_disable; - char *disable_errmsg; flux_msg_handler_t **handlers; }; @@ -80,8 +79,13 @@ static int submit_job (struct job_manager *ctx, struct job *job, json_t *errors) { + flux_error_t e; char *error = NULL; + if (queue_submit_check (ctx->queue, job->jobspec_redacted, &e) < 0) { + set_errorf (errors, job->id, "%s", e.text); /* e.text is data (may hold an admin-supplied reason), never a format */ + return -1; + } if (zhashx_insert (ctx->active_jobs, &job->id, job) < 0) { set_errorf (errors, job->id, "hash insert failed"); return -1; @@ -166,11 +170,6 @@ static void submit_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s", __FUNCTION__); goto error; } - if (ctx->submit->submit_disable) { - errno = EINVAL; - errmsg = ctx->submit->disable_errmsg; - goto error; - } if (!(errors = json_array ())) { errno = ENOMEM; goto error; @@ -204,62 +203,11 @@ static void submit_cb (flux_t *h, flux_msg_handler_t *mh, json_decref (errors); } -static void submit_admin_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) -{ - struct job_manager *ctx = arg; - const char *error_prefix = "job submission is disabled: "; - const char 
*errmsg = NULL; - int enable; - int query_only; - const char *reason; - - if (flux_request_unpack (msg, - NULL, - "{s:b s:b s:s}", - "query_only", - &query_only, - "enable", - &enable, - "reason", - &reason) < 0) - goto error; - if (!query_only) { - if (flux_msg_authorize (msg, FLUX_USERID_UNKNOWN) < 0) { - errmsg = "Request requires owner credentials"; - goto error; - } - if (!enable) { - char *errmsg; - if (asprintf (&errmsg, "%s%s", error_prefix, reason) < 0) - goto error; - free (ctx->submit->disable_errmsg); - ctx->submit->disable_errmsg = errmsg; - } - ctx->submit->submit_disable = enable ? false : true; - } - if (ctx->submit->submit_disable) - reason = ctx->submit->disable_errmsg + strlen (error_prefix); - if (flux_respond_pack (h, - msg, - "{s:b s:s}", - "enable", - ctx->submit->submit_disable ? 0 : 1, - "reason", - ctx->submit->submit_disable ? reason : "") < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, errmsg) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - void submit_ctx_destroy (struct submit *submit) { if (submit) { int saved_errno = errno; flux_msg_handler_delvec (submit->handlers); - free (submit->disable_errmsg); free (submit); errno = saved_errno; } @@ -271,12 +219,6 @@ static const struct flux_msg_handler_spec htab[] = { submit_cb, 0 }, - { - FLUX_MSGTYPE_REQUEST, - "job-manager.submit-admin", - submit_admin_cb, - FLUX_ROLE_USER, - }, FLUX_MSGHANDLER_TABLE_END, }; From 40f609b7c30c395bb87952731a678c6d35d37c35 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 07:03:21 -0700 Subject: [PATCH 08/13] flux-uptime: use new queue-status RPC Problem: flux-uptime uses the deprecated job-manager.submit-admin RPC. Use the job-manager.queue-status RPC instead. 
Since the purpose of this section of code is to add a warning to the output when the queue is disabled, and not to provide comprehensive queue status, the warning is now added only when all queues are disabled. --- src/cmd/builtin/uptime.c | 51 ++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/src/cmd/builtin/uptime.c b/src/cmd/builtin/uptime.c index b81f8349a58d..3f08ab71f8aa 100644 --- a/src/cmd/builtin/uptime.c +++ b/src/cmd/builtin/uptime.c @@ -63,25 +63,52 @@ static bool sched_disabled (flux_t *h) return enable ? false : true; } +static bool queue_is_enabled (flux_t *h, const char *name) +{ + flux_future_t *f; + int enable; + const char *topic = "job-manager.queue-status"; + + if (name) + f = flux_rpc_pack (h, topic, 0, 0, "{s:s}", "name", name); + else + f = flux_rpc_pack (h, topic, 0, 0, "{}"); + if (!f || flux_rpc_get_unpack (f, "{s:b}", "enable", &enable) < 0) + log_err_exit ("Error fetching queue status: %s", + future_strerror (f, errno)); + flux_future_destroy (f); + return enable ? true : false; +} + /* Return true if job submission is disabled. + * If there are multiple queues, return true only if ALL queues are disabled. 
*/ static bool submit_disabled (flux_t *h) { flux_future_t *f; - int enable; + bool disabled = true; + json_t *queues; + size_t index; + json_t *value; - if (!(f = flux_rpc_pack (h, - "job-manager.submit-admin", - 0, - 0, - "{s:b s:b s:s}", - "query_only", 1, - "enable", 0, - "reason", "")) - || flux_rpc_get_unpack (f, "{s:b}", "enable", &enable) < 0) - log_err_exit ("Error fetching submit status"); + f = flux_rpc (h, "job-manager.queue-list", NULL, 0, 0); + if (!f || flux_rpc_get_unpack (f, "{s:o}", "queues", &queues)) + log_msg_exit ("queue-list: %s", future_strerror (f, errno)); + if (json_array_size (queues) > 0) { + json_array_foreach (queues, index, value) { + if (queue_is_enabled (h, json_string_value (value))) { + disabled = false; + break; + } + } + } + else { + if (queue_is_enabled (h, NULL)) + disabled = false; + } flux_future_destroy (f); - return enable ? false : true; + + return disabled; } /* Each key in the drain object is an idset representing a group From 67b4f4afbfabc8c71a3a82dc4f68ca4663110694 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 07:03:52 -0700 Subject: [PATCH 09/13] flux-queue: support multiple queues Problem: there is no way to list available queues, nor enable/disable individual queues. Add a -q,--queue option to the enable, disable, and status subcommands. If the queue option is unspecified and multiple named queues are configured, all queues are targeted (--all confirmation required for enable, disable). This allows the commands to function similarly to the way they did before when there is only the anonymous queue, which is still the common case. Since flux queue status now lists the enable/disable status of all queues when none are specified, this is now a way for a user to find out what queues are available on the system. Update rc1 to use "flux queue disable --all" when putting the instance in safe mode. 
Fixes #4620 --- etc/rc1 | 3 +- src/cmd/flux-queue.c | 162 ++++++++++++++++++++++++++++++++----------- 2 files changed, 125 insertions(+), 40 deletions(-) diff --git a/etc/rc1 b/etc/rc1 index a24e53bc2c3e..3b6853bbf3e1 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -70,7 +70,8 @@ if test $RANK -eq 0; then if test "$(backing_module)" != "none"; then if ! flux startlog --check --quiet; then flux queue stop - flux queue disable "Flux is in safe mode due to an incomplete shutdown." + flux queue disable --all \ + "Flux is in safe mode due to an incomplete shutdown." fi fi fi diff --git a/src/cmd/flux-queue.c b/src/cmd/flux-queue.c index 0e780e69e086..35b83c3b8d35 100644 --- a/src/cmd/flux-queue.c +++ b/src/cmd/flux-queue.c @@ -60,6 +60,29 @@ static struct optparse_option status_opts[] = { { .name = "verbose", .key = 'v', .usage = "Display more detail about internal job manager state", }, + { .name = "queue", .key = 'q', .has_arg = 1, .arginfo = "NAME", + .usage = "Specify queue to show (default all)", + }, + OPTPARSE_TABLE_END +}; + +static struct optparse_option enable_opts[] = { + { .name = "queue", .key = 'q', .has_arg = 1, .arginfo = "NAME", + .usage = "Specify queue to enable (default all)", + }, + { .name = "all", .key = 'a', .has_arg = 0, + .usage = "Force command to apply to all queues if none specified", + }, + OPTPARSE_TABLE_END +}; + +static struct optparse_option disable_opts[] = { + { .name = "queue", .key = 'q', .has_arg = 1, .arginfo = "NAME", + .usage = "Specify queue to disable (default all)", + }, + { .name = "all", .key = 'a', .has_arg = 0, + .usage = "Force command to apply to all queues if none specified", + }, OPTPARSE_TABLE_END }; @@ -86,14 +109,14 @@ static struct optparse_subcommand subcommands[] = { "Enable job submission", cmd_enable, 0, - NULL, + enable_opts, }, { "disable", "[OPTIONS] [message ...]", "Disable job submission", cmd_disable, 0, - NULL, + disable_opts, }, { "start", "[OPTIONS]", @@ -209,38 +232,6 @@ static char *parse_arg_message (char 
**argv, const char *name) return argz; } -void submit_admin (flux_t *h, - int query_only, - int enable, - const char *reason) -{ - flux_future_t *f; - if (!(f = flux_rpc_pack (h, - "job-manager.submit-admin", - FLUX_NODEID_ANY, - 0, - "{s:b s:b s:s}", - "query_only", - query_only, - "enable", - enable, - "reason", - reason ? reason : ""))) - log_err_exit ("error sending submit-admin request"); - if (flux_rpc_get_unpack (f, - "{s:b s:s}", - "enable", - &enable, - "reason", - &reason) < 0) - log_msg_exit ("submit-admin: %s", future_strerror (f, errno)); - printf ("Job submission is %s%s%s\n", - enable ? "enabled" : "disabled", - enable ? "" : ": ", - enable ? "" : reason); - flux_future_destroy (f); -} - void alloc_admin (flux_t *h, bool verbose, bool quiet, @@ -296,10 +287,101 @@ void alloc_admin (flux_t *h, flux_future_destroy (f); } +static void add_string_if_set (json_t *o, const char *key, const char *val) +{ + if (val) { + json_t *str = json_string (val); + if (!str || json_object_set_new (o, key, str) < 0) { + json_decref (str); + log_msg_exit ("out of memory"); + } + } +} + +static void queue_admin (flux_t *h, + const char *name, + bool enable, + const char *reason, + bool all) +{ + json_t *payload; + flux_future_t *f; + + if (!(payload = json_pack ("{s:b s:b}", + "enable", enable ? 1 : 0, + "all", all ? 
1 : 0))) + log_msg_exit ("out of memory"); + add_string_if_set (payload, "name", name); + add_string_if_set (payload, "reason", reason); + f = flux_rpc_pack (h, "job-manager.queue-admin", 0, 0, "O", payload); + if (!f || flux_rpc_get (f, NULL) < 0) + log_msg_exit ("%s", future_strerror (f, errno)); + flux_future_destroy (f); + json_decref (payload); +} + +static void queue_status_one (flux_t *h, const char *name) +{ + json_t *payload; + flux_future_t *f; + int enable; + const char *reason = ""; /* "s?s" below leaves this untouched when the optional key is absent */ + + if (!(payload = json_object ())) + log_msg_exit ("out of memory"); + add_string_if_set (payload, "name", name); + f = flux_rpc_pack (h, "job-manager.queue-status", 0, 0, "O", payload); + if (!f || flux_rpc_get_unpack (f, + "{s:b s?s}", + "enable", &enable, + "reason", &reason)) + log_msg_exit ("%s", future_strerror (f, errno)); + if (enable) { + printf ("%s%sJob submission is enabled\n", + name ? name : "", + name ? ": " : ""); + } + else { + printf ("%s%sJob submission is disabled: %s\n", + name ? name : "", + name ? 
": " : "", reason); + } + flux_future_destroy (f); + json_decref (payload); +} + +static void queue_status (flux_t *h, const char *name) +{ + if (!name) { + json_t *queues; + size_t index; + json_t *value; + flux_future_t *f; + + f = flux_rpc (h, "job-manager.queue-list", NULL, 0, 0); + if (!f || flux_rpc_get_unpack (f, + "{s:o}", + "queues", &queues)) + log_msg_exit ("%s", future_strerror (f, errno)); + if (json_array_size (queues) > 0) { + json_array_foreach (queues, index, value) { + queue_status_one (h, json_string_value (value)); + } + } + else + queue_status_one (h, NULL); + flux_future_destroy (f); + } + else + queue_status_one (h, name); +} + int cmd_enable (optparse_t *p, int argc, char **argv) { flux_t *h; int optindex = optparse_option_index (p); + const char *name = optparse_get_str (p, "queue", NULL); + bool all = optparse_hasopt (p, "all"); if (argc - optindex > 0) { optparse_print_usage (p); @@ -307,7 +389,7 @@ int cmd_enable (optparse_t *p, int argc, char **argv) } if (!(h = flux_open (NULL, 0))) log_err_exit ("flux_open"); - submit_admin (h, 0, 1, NULL); + queue_admin (h, name, true, NULL, all); flux_close (h); return (0); } @@ -316,15 +398,15 @@ int cmd_disable (optparse_t *p, int argc, char **argv) { flux_t *h; int optindex = optparse_option_index (p); + const char *name = optparse_get_str (p, "queue", NULL); + bool all = optparse_hasopt (p, "all"); char *reason = NULL; if (argc - optindex > 0) reason = parse_arg_message (argv + optindex, "reason"); - else - log_msg_exit ("submit: reason is required for disable"); if (!(h = flux_open (NULL, 0))) log_err_exit ("flux_open"); - submit_admin (h, 0, 0, reason); + queue_admin (h, name, false, reason, all); flux_close (h); free (reason); return (0); @@ -376,6 +458,7 @@ int cmd_status (optparse_t *p, int argc, char **argv) { flux_t *h; int optindex = optparse_option_index (p); + const char *name = optparse_get_str (p, "queue", NULL); if (argc - optindex > 0) { optparse_print_usage (p); @@ -383,7 +466,8 
@@ int cmd_status (optparse_t *p, int argc, char **argv) } if (!(h = flux_open (NULL, 0))) log_err_exit ("flux_open"); - submit_admin (h, 1, 0, NULL); + queue_status (h, name); + alloc_admin (h, optparse_hasopt (p, "verbose"), false, From 1490c93efdb092a9ce3ad41758d42cf6ac4e9269 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 2 Oct 2022 08:03:26 -0700 Subject: [PATCH 10/13] job-manager: drop job-manager.submit-admin RPC Problem: the job-manager.submit-admin RPC no longer has any users. Drop deprecated RPC. --- src/modules/job-manager/queue.c | 76 --------------------------------- 1 file changed, 76 deletions(-) diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index 6c6627a1e8c9..ae91635712a4 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -355,76 +355,6 @@ static void queue_admin_cb (flux_t *h, flux_log_error (h, "error responding to job-manager.queue-admin"); } -static void submit_admin_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) -{ - struct queue *queue = arg; - const char *error_prefix = "job submission is disabled: "; - const char *errmsg = NULL; - int enable; - int query_only; - const char *reason; - - if (flux_request_unpack (msg, - NULL, - "{s:b s:b s:s}", - "query_only", - &query_only, - "enable", - &enable, - "reason", - &reason) < 0) - goto error; - if (!query_only) { - if (flux_msg_authorize (msg, FLUX_USERID_UNKNOWN) < 0) { - errmsg = "Request requires owner credentials"; - goto error; - } - if (!enable) { - char *errmsg; - if (asprintf (&errmsg, "%s%s", error_prefix, reason) < 0) - goto error; - if (queue_enable_all (queue, false, errmsg) < 0) - goto error; - } - else { - if (queue_enable_all (queue, true, NULL) < 0) - goto error; - } - } - int any_enabled = 0; - struct jobq *q; - reason = NULL; - if (queue->have_named_queues) { - q = zhashx_first (queue->named); - while (q) { - if (q->enable) { - any_enabled = 1; - break; - } - if (!reason) - 
reason = q->reason; - q = zhashx_next (queue->named); - } - } - else { - any_enabled = queue->anon->enable ? 1 : 0; - reason = queue->anon->reason; - } - if (flux_respond_pack (h, - msg, - "{s:b s:s}", - "enable", - any_enabled, - "reason", - reason ? reason : "") < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, errmsg) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, @@ -444,12 +374,6 @@ static const struct flux_msg_handler_spec htab[] = { queue_admin_cb, 0, }, - { - FLUX_MSGTYPE_REQUEST, - "job-manager.submit-admin", - submit_admin_cb, - FLUX_ROLE_USER, - }, FLUX_MSGHANDLER_TABLE_END, }; From f1f2232aa021c24e0866317f5e37617c6f36e067 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2022 15:21:45 -0700 Subject: [PATCH 11/13] flux-uptime(1): update queue disable blurb Problem: flux-uptime(1) now reports "queue disabled" only if all queues are disabled, but this is not documented. Change the notice description to include this fact. --- doc/man1/flux-uptime.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/man1/flux-uptime.rst b/doc/man1/flux-uptime.rst index 9d0f320814fc..0c97578a2111 100644 --- a/doc/man1/flux-uptime.rst +++ b/doc/man1/flux-uptime.rst @@ -41,7 +41,7 @@ current Flux instance, on one or two lines: - The number of offline nodes, if greater than zero. A node is offline if its broker is not connected to the instance overlay network. -- A notice if job submission is disabled. +- A notice if job submission is disabled on all queues. - A notice if scheduling is disabled. 
From d342034623d7d731175fdd35a712aaf7dff8a346 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2022 15:32:14 -0700 Subject: [PATCH 12/13] flux-queue(1): add options for multiple queues Problem: flux-queue(1) does not document the command's limited support for multiple named queues. Describe the possibility of multiple queues. Document the -q,--queue=NAME option for enable, disable, and status. Reorder subcommands to group the multiqueue-enabled ones together. Document -a,--all. --- doc/man1/flux-queue.rst | 52 +++++++++++++++++++++++++++-------------- doc/test/spell.en.pws | 1 + 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/doc/man1/flux-queue.rst b/doc/man1/flux-queue.rst index b0f061b235ba..524ec621edc2 100644 --- a/doc/man1/flux-queue.rst +++ b/doc/man1/flux-queue.rst @@ -8,50 +8,59 @@ flux-queue(1) SYNOPSIS ======== -**flux** **queue** **disable** *reason...* +**flux** **queue** **disable** [*--queue=NAME*] *reason...* -**flux** **queue** **enable** +**flux** **queue** **enable** [*--queue=NAME*] -**flux** **queue** **stop** [*--verbose*] [*--quiet*] +**flux** **queue** **status** [*--queue=NAME*] -**flux** **queue** **start** [*--verbose*] [*--quiet*] +**flux** **queue** **stop** -**flux** **queue** **status** [*--verbose*] +**flux** **queue** **start** **flux** **queue** **drain** [*--timeout=DURATION*] -**flux** **queue** **idle** [*--quiet*] [*--timeout=DURATION*] +**flux** **queue** **idle** [*--timeout=DURATION*] DESCRIPTION =========== -The ``flux-queue`` command controls the Flux job queue. -It has the following subcommands: +The ``flux-queue`` command controls Flux job queues. + +Normally, Flux has a single anonymous queue, but when queues are configured, +all queues are named. At this time, only the *disable*, *enable*, and +*status* subcommands can be applied to a single, named queue. The rest affect +all queues. 
+ +``flux-queue`` has the following subcommands: disable - Prevent jobs from being submitted to the queue, with `reason` that is - shown to submitting users. + Prevent jobs from being submitted to the queue, with a reason that is + shown to submitting users. If multiple queues are configured, either the + *--queue* or the *--all* option is required. enable - Allow jobs to be submitted to the queue. + Allow jobs to be submitted to the queue. If multiple queues are configured, + either the *--queue* or the *--all* option is required. + +status + Report the current queue status. If multiple queues are configured, + all queues are shown unless one is specified with *--queue*. stop - Stop allocating resources to jobs. Pending jobs remain in the queue, + Stop allocating resources to jobs. Pending jobs remain enqueued, and running jobs continue to run, but no new jobs are allocated resources. start Start allocating resources to jobs. -status - Report the current queue status. - drain - Block until the queue becomes empty. It is sometimes useful to run after + Block until all queues become empty. It is sometimes useful to run after ``flux queue disable``, to wait until the system is quiescent and can be taken down for maintenance. idle - Block until the queue becomes `idle` (no jobs in RUN or CLEANUP state, + Block until all queues become `idle` (no jobs in RUN or CLEANUP state, and no outstanding alloc requests to the scheduler). It may be useful to run after ``flux queue stop`` to wait until the scheduler and execution system are quiescent before maintenance involving them. @@ -62,12 +71,19 @@ OPTIONS **-h, --help** Summarize available options. +**-q, --queue**\ =\ *NAME* + Select a queue by name. + **-v, --verbose** Be chatty. -**-q, --quiet** +**--quiet** Be taciturn. +**-a, --all** + Use with *enable* or *disable* subcommands to signify intent to affect + all queues, when queues are configured but *--queue* is missing. 
+ **--timeout** \ =\ *FSD* Limit the time that ``drain`` or ``idle`` will block. diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index ac8901797ff8..4e650fbebb7c 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -647,3 +647,4 @@ myformat xdg XDG yaml +enqueued From 5594e937cbd31c0d95075c763285ea124619a5b9 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 30 Sep 2022 07:27:44 -0700 Subject: [PATCH 13/13] testsuite: cover flux-queue command changes Problem: the addition of named queue support to flux-queue has no test coverage. Add some new tests. --- t/t2240-queue-cmd.t | 79 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/t/t2240-queue-cmd.t b/t/t2240-queue-cmd.t index 1f0674b934ad..86c4b665cb0a 100755 --- a/t/t2240-queue-cmd.t +++ b/t/t2240-queue-cmd.t @@ -3,7 +3,9 @@ test_description='Test flux queue command' . $(dirname $0)/sharness.sh -test_under_flux 1 +mkdir -p conf.d + +test_under_flux 1 full -o,--config-path=$(pwd)/conf.d flux setattr log-stderr-level 1 @@ -287,4 +289,79 @@ test_expect_success 'flux-queue: idle denied for guest' ' grep "requires owner credentials" guest_idle.err ' +# +# Test support for named queues +# + +test_expect_success 'flux queue status --queue fails with no queues' ' + test_must_fail flux queue status --queue=batch +' +test_expect_success 'flux queue enable --queue fails with no queues' ' + test_must_fail flux queue enable --queue=batch +' +test_expect_success 'ensure instance is drained' ' + flux queue drain && + flux queue status -v +' +test_expect_success 'configure batch,debug queues' ' + cat >conf.d/config.toml <<-EOT && + [queues.batch] + [queues.debug] + EOT + flux config reload +' +test_expect_success 'jobs may be submitted to either queue' ' + flux mini submit -q batch /bin/true && + flux mini submit -q debug /bin/true +' +test_expect_success 'flux-queue status reports all queues' ' + flux queue status >mqstatus.out && + grep batch 
mqstatus.out && + grep debug mqstatus.out +' +test_expect_success 'flux-queue status can show one queue' ' + flux queue status -q debug >mqstatus_debug.out && + test_must_fail grep batch mqstatus_debug.out +' +test_expect_success 'flux-queue disable without --queue or --all fails' ' + test_must_fail flux queue disable test reasons +' +test_expect_success 'flux-queue disable --all affects all queues' ' + flux queue disable --all test reasons && + flux queue status >mqstatus_dis.out && + test $(grep -c "submission is disabled" mqstatus_dis.out) -eq 2 +' +test_expect_success 'jobs may not be submitted to either queue' ' + test_must_fail flux mini submit -q batch /bin/true && + test_must_fail flux mini submit -q debug /bin/true +' +test_expect_success 'flux-queue enable without --queue or --all fails' ' + test_must_fail flux queue enable +' +test_expect_success 'flux-queue enable --all affects all queues' ' + flux queue enable -a && + flux queue status >mqstatus_ena.out && + test $(grep -c "submission is enabled" mqstatus_ena.out) -eq 2 +' +test_expect_success 'flux-queue disable can do one queue' ' + flux queue disable -q batch nobatch && + flux queue status >mqstatus_batchdis.out && + test $(grep -c "submission is enabled" mqstatus_batchdis.out) -eq 1 && + test_must_fail flux mini submit -q batch /bin/true && + flux mini submit -q debug /bin/true +' +test_expect_success 'flux-queue enable can do one queue' ' + flux queue enable -q batch && + flux queue status >mqstatus_batchena.out && + test $(grep -c "submission is enabled" mqstatus_batchena.out) -eq 2 && + flux mini submit -q batch /bin/true && + flux mini submit -q debug /bin/true +' +test_expect_success 'flux-queue enable fails on unknown queue' ' + test_must_fail flux queue enable -q notaqueue +' +test_expect_success 'flux-queue status fails on unknown queue' ' + test_must_fail flux queue status -q notaqueue +' + test_done