From 84037026287fc4a6dc8e94972a08920ed4af3394 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 10 Jun 2024 11:26:18 -0700 Subject: [PATCH 01/12] libsubprocess: incorporate bulk-exec from job-exec Problem: the bulk-exec API in job-exec is useful elsewhere, but it is local to the job-exec module. Move it into the libsubprocess directory where it can be more easily reused within flux-core (it is not added to the public API). Update job-exec. --- src/common/libsubprocess/Makefile.am | 15 +++++++++++-- .../libsubprocess}/bulk-exec.c | 0 .../libsubprocess}/bulk-exec.h | 1 + .../libsubprocess}/test/bulk-exec.c | 0 src/modules/Makefile.am | 1 - src/modules/job-exec/Makefile.am | 21 +------------------ src/modules/job-exec/exec.c | 2 +- 7 files changed, 16 insertions(+), 24 deletions(-) rename src/{modules/job-exec => common/libsubprocess}/bulk-exec.c (100%) rename src/{modules/job-exec => common/libsubprocess}/bulk-exec.h (99%) rename src/{modules/job-exec => common/libsubprocess}/test/bulk-exec.c (100%) diff --git a/src/common/libsubprocess/Makefile.am b/src/common/libsubprocess/Makefile.am index 3e731d3b8962..0b9c1bef35dd 100644 --- a/src/common/libsubprocess/Makefile.am +++ b/src/common/libsubprocess/Makefile.am @@ -41,7 +41,9 @@ libsubprocess_la_SOURCES = \ fbuf.h \ fbuf.c \ fbuf_watcher.h \ - fbuf_watcher.c + fbuf_watcher.c \ + bulk-exec.h \ + bulk-exec.c fluxcoreinclude_HEADERS = \ command.h \ @@ -63,7 +65,8 @@ check_PROGRAMS = \ test_echo \ test_multi_echo \ test_fork_sleep \ - test_fdcopy + test_fdcopy \ + bulk-exec check_LTLIBRARIES = test/libutil.la @@ -91,6 +94,14 @@ test_libutil_la_SOURCES = \ test/rcmdsrv.h \ test/rcmdsrv.c +bulk_exec_SOURCES = test/bulk-exec.c +bulk_exec_CPPFLAGS = $(test_cppflags) +bulk_exec_LDADD = \ + $(test_ldadd) \ + $(top_builddir)/src/common/libflux-optparse.la + +bulk_exec_LDFLAGS = $(test_ldflags) + test_command_t_SOURCES = test/command.c test_command_t_CPPFLAGS = $(test_cppflags) test_command_t_LDADD = $(test_ldadd) diff --git a/src/modules/job-exec/bulk-exec.c b/src/common/libsubprocess/bulk-exec.c similarity index 100% rename from src/modules/job-exec/bulk-exec.c rename to src/common/libsubprocess/bulk-exec.c diff --git a/src/modules/job-exec/bulk-exec.h b/src/common/libsubprocess/bulk-exec.h similarity index 99% rename from src/modules/job-exec/bulk-exec.h rename to src/common/libsubprocess/bulk-exec.h index 0442a789bd3e..63fcfee380f7 100644 --- a/src/modules/job-exec/bulk-exec.h +++ b/src/common/libsubprocess/bulk-exec.h @@ -14,6 +14,7 @@ #define HAVE_JOB_EXEC_BULK_EXEC_H 1 #include +#include struct bulk_exec; diff --git a/src/modules/job-exec/test/bulk-exec.c b/src/common/libsubprocess/test/bulk-exec.c similarity index 100% rename from src/modules/job-exec/test/bulk-exec.c rename to src/common/libsubprocess/test/bulk-exec.c diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 3dd5a3628ceb..eabed0b50a9e 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -152,7 +152,6 @@ job_archive_la_LDFLAGS = $(fluxmod_ldflags) -module job_exec_la_SOURCES = job_exec_la_LIBADD = \ $(builddir)/job-exec/libjob-exec.la \ - $(builddir)/job-exec/libbulk-exec.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ diff --git a/src/modules/job-exec/Makefile.am b/src/modules/job-exec/Makefile.am index 51222e74d684..b07827ebc22b 100644 --- a/src/modules/job-exec/Makefile.am +++ b/src/modules/job-exec/Makefile.am @@ -13,11 +13,7 @@ 
AM_CPPFLAGS = \ $(JANSSON_CFLAGS) noinst_LTLIBRARIES = \ - libjob-exec.la \ - libbulk-exec.la - -noinst_PROGRAMS = \ - bulk-exec + libjob-exec.la libjob_exec_la_SOURCES = \ job-exec.h \ @@ -31,21 +27,6 @@ libjob_exec_la_SOURCES = \ testexec.c \ exec.c -libbulk_exec_la_SOURCES = \ - bulk-exec.h \ - bulk-exec.c - -bulk_exec_SOURCES = \ - test/bulk-exec.c - -bulk_exec_LDADD = \ - libbulk-exec.la \ - $(top_builddir)/src/common/libflux-internal.la \ - $(top_builddir)/src/common/libflux-core.la \ - $(top_builddir)/src/common/libflux-idset.la \ - $(top_builddir)/src/common/libflux-optparse.la \ - $(top_builddir)/src/common/libutil/libutil.la - test_ldadd = \ $(builddir)/libjob-exec.la \ $(top_builddir)/src/common/libtap/libtap.la \ diff --git a/src/modules/job-exec/exec.c b/src/modules/job-exec/exec.c index 53f38e4301ac..afd7d3959d83 100644 --- a/src/modules/job-exec/exec.c +++ b/src/modules/job-exec/exec.c @@ -39,10 +39,10 @@ #include "ccan/str/str.h" #include "src/common/libutil/errprintf.h" #include "src/common/libutil/errno_safe.h" +#include "src/common/libsubprocess/bulk-exec.h" #include "job-exec.h" #include "exec_config.h" -#include "bulk-exec.h" #include "rset.h" /* Numeric severity used for a non-fatal, critical job exception: From 96ba443bc2d4378661f25300073e380bee9ed12a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 19 Mar 2024 10:29:33 -0700 Subject: [PATCH 02/12] testsuite: don't expect debug.free-request event Problem: free requests may not take place in the context of the job once housekeeping is in place, but t2212-job-manager-plugins.t uses the debug.free-request event as an indication that the job debug flag could be set. Use debug.alloc-request in the test instead. --- t/t2212-job-manager-plugins.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/t2212-job-manager-plugins.t b/t/t2212-job-manager-plugins.t index 1d520d9a9f9a..fc073721f97b 100755 --- a/t/t2212-job-manager-plugins.t +++ b/t/t2212-job-manager-plugins.t @@ -253,7 +253,7 @@ test_expect_success 'job-manager: load jobtap_api test plugin' ' test_expect_success 'job-manager: test that job flags can be set' ' id=$(flux submit \ --setattr=system.depend.set_flag=debug hostname) && - flux job wait-event -vt 20 $id debug.free-request && + flux job wait-event -vt 20 $id debug.alloc-request && flux job wait-event -vt 20 $id clean ' From 3361ae25d4562789cffa7b5a0436228cf3b693c7 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 21 Mar 2024 10:39:57 -0700 Subject: [PATCH 03/12] job-manager: add housekeeping subsystem Problem: jobs get stuck in CLEANUP state while long epilog scripts run, causing sadness and idling resources. Introduce a new type of epilog script called "housekeeping" that runs after the job. Instead of freeing resources directly to the scheduler, jobs free resources to housekeeping, post their free event, and may reach INACTIVE state. Meanwhile, housekeeping can run a script on the allocated resources and return the resources to the scheduler when complete. The resources are still allocated to the job as far as the scheduler is concerned while housekeeping runs. However since the job has transitioned to INACTIVE, the flux-accounting plugin will decrement the running job count for the user and stop billing the user for the resources. 'flux resource list' utility shows the resources as allocated. By default, resources are released to the scheduler only after all ranks complete housekeeping, as before. 
However, if configured, resources can be freed to the scheduler immediately as they complete housekeeping on each execution target, or a timer can be started on completion of the first target, and when the timer expires, all the targets that have completed thus far are freed in one go. Following that, resources are freed to the scheduler immediately as they complete. This works with sched-simple without changes, with the exception that the hello protocol does not currently support partial release so, as noted in the code, housekeeping and a new job could overlap when the scheduler is reloaded on a live system. Some RFC 27 work is needed to resolve ths. The Fluxion scheduler does not currently support partial release (flux-framework/flux-sched#1151). But as discussed over there, the combination of receiving an R fragment and a jobid in the free request should be sufficient to get that working. --- src/modules/Makefile.am | 1 + src/modules/job-manager/Makefile.am | 3 + src/modules/job-manager/alloc.c | 24 +- src/modules/job-manager/alloc.h | 6 +- src/modules/job-manager/event.c | 12 +- src/modules/job-manager/housekeeping.c | 860 +++++++++++++++++++++++++ src/modules/job-manager/housekeeping.h | 45 ++ src/modules/job-manager/job-manager.c | 17 +- src/modules/job-manager/job-manager.h | 1 + 9 files changed, 944 insertions(+), 25 deletions(-) create mode 100644 src/modules/job-manager/housekeeping.c create mode 100644 src/modules/job-manager/housekeeping.h diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index eabed0b50a9e..e9b8829fc64f 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -219,6 +219,7 @@ job_manager_la_SOURCES = job_manager_la_LIBADD = \ $(builddir)/job-manager/libjob-manager.la \ $(top_builddir)/src/common/libjob/libjob.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-optparse.la \ diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 535a47e1e9a0..7e8f4e85ecd8 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -44,6 +44,8 @@ libjob_manager_la_SOURCES = \ kill.c \ alloc.h \ alloc.c \ + housekeeping.h \ + housekeeping.c \ start.h \ start.c \ list.h \ @@ -126,6 +128,7 @@ TESTS = \ test_ldadd = \ libjob-manager.la \ $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libflux-core.la \ diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 8c7170fcb854..58cbe0c12706 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -36,6 +36,7 @@ #include "annotate.h" #include "raise.h" #include "queue.h" +#include "housekeeping.h" struct alloc { struct job_manager *ctx; @@ -99,8 +100,7 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum) } } -/* Send sched.free request for job. - * Update flags. +/* Send sched.free request. 
*/ int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R) { @@ -343,6 +343,8 @@ static void hello_cb (flux_t *h, } job = zhashx_next (ctx->active_jobs); } + if (housekeeping_hello_respond (ctx->housekeeping, msg) < 0) + goto error; if (flux_respond_error (h, msg, ENODATA, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); return; @@ -504,20 +506,11 @@ static void check_cb (flux_reactor_t *r, NULL); } -/* called from event_job_action() FLUX_JOB_STATE_CLEANUP */ -int alloc_send_free_request (struct alloc *alloc, struct job *job) +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id) { - if (job->state != FLUX_JOB_STATE_CLEANUP) - return -1; if (alloc->scheduler_is_online) { - if (free_request (alloc, job->id, job->R_redacted) < 0) + if (free_request (alloc, id, R) < 0) return -1; - if ((job->flags & FLUX_JOB_DEBUG)) - (void)event_job_post_pack (alloc->ctx->event, - job, - "debug.free-request", - 0, - NULL); } return 0; } @@ -691,7 +684,8 @@ static void resource_status_cb (flux_t *h, } job = zhashx_first (alloc->ctx->active_jobs); while (job) { - if (job->has_resources && job->R_redacted && !job->alloc_bypass) { + if ((job->has_resources && !job->free_posted) + && job->R_redacted && !job->alloc_bypass) { struct rlist *rl2; json_error_t jerror; @@ -711,6 +705,8 @@ static void resource_status_cb (flux_t *h, } job = zhashx_next (alloc->ctx->active_jobs); } + if (housekeeping_stat_append (ctx->housekeeping, rl, &error) < 0) + goto error; if (!(R = rlist_to_R (rl))) { errprintf (&error, "error converting rlist to JSON"); goto error; diff --git a/src/modules/job-manager/alloc.h b/src/modules/job-manager/alloc.h index 8a086c04cb98..ae35044e69ad 100644 --- a/src/modules/job-manager/alloc.h +++ b/src/modules/job-manager/alloc.h @@ -11,6 +11,7 @@ #ifndef _FLUX_JOB_MANAGER_ALLOC_H #define _FLUX_JOB_MANAGER_ALLOC_H +#include #include #include "job.h" @@ -47,10 +48,9 @@ int alloc_queue_count (struct alloc *alloc); */ int alloc_pending_count (struct alloc *alloc); -/* Call from CLEANUP state to release resources. - * This function is a no-op if job->free_pending is set. +/* Release resources back to the scheduler. */ -int alloc_send_free_request (struct alloc *alloc, struct job *job); +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id); /* List pending jobs */ diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index a0ba2c4dba7f..dbdda81863db 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -48,6 +48,7 @@ #include "ccan/str/str.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "drain.h" #include "journal.h" @@ -320,17 +321,18 @@ int event_job_action (struct event *event, struct job *job) /* N.B. start_pending indicates that the start request is still * expecting responses. The final response is the 'release' * response with final=true. Thus once the flag is clear, - * it is safe to release all resources to the scheduler. + * it is safe for the job to release its resources to housekeeping. 
*/ if (job->has_resources && !job_event_is_queued (job, "epilog-start") && !job->perilog_active && !job->start_pending && !job->free_posted) { - if (!job->alloc_bypass) { - if (alloc_send_free_request (ctx->alloc, job) < 0) - return -1; - } + if (housekeeping_start (ctx->housekeeping, + job->R_redacted, + job->id, + job->userid) < 0) + return -1; if (event_job_post_pack (ctx->event, job, "free", 0, NULL) < 0) return -1; job->free_posted = 1; diff --git a/src/modules/job-manager/housekeeping.c b/src/modules/job-manager/housekeeping.c new file mode 100644 index 000000000000..ef1a248a4bdf --- /dev/null +++ b/src/modules/job-manager/housekeeping.c @@ -0,0 +1,860 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* housekeeping - clean resources prior to release to the scheduler + * + * Purpose: + * Resources are released by jobs to housekeeping. Housekeeping runs + * an epilog-like script, then releases resources to the scheduler. + * Unlike the job manager epilog, housekeeping runs after the job, which + * is allowed to exit CLEANUP when resources are handed over to housekeeping. + * The scheduler still thinks resources are allocated to the job. + * + * Configuration: + * [job-manager.housekeeping] + * #command = ["command", "arg1", "arg2", ...] + * use-systemd-unit = true + * release-after = "FSD" + * + * Partial release: + * The 'release-after' config key enables partial release of resources. + * - If unset, resources for a given job are not released until all exec + * targets have completed housekeeping. + * - If set to "0", resources are released as each exec target completes. + * - If set to a nozero duration, a timer starts when the first exec target + * for a given job completes. When the timer expires, resources for all + * the completed exec targets are released. Following that, resources + * are released as each target completes. + * + * Script credentials: + * The housekeeping script runs as the instance owner (e.g. "flux"). + * On a real system, "command" is configured to "imp run housekeeping", + * and the IMP is configured to launch the flux-housekeeping systemd + * service as root. + * + * Script environment: + * FLUX_JOB_ID - the job whose resources are running housekeeping + * FLUX_JOB_USERID - the UID of the job's owner + * FLUX_URI - the URI of the local flux broker + * The IMP must be configured to explicitly allow FLUX_* to pass through. + * + * Script error handling: + * If housekeeping fails on a node or set of nodes, this is logged to + * the flux circular buffer at LOG_ERR. + * Stdout is logged at LOG_INFO and stderr at LOG_ERR. + * + * Error handling under systemd: + * When using systemd, any output is captured by the systemd journal on + * the remote node, accessed with 'journalctl -u flux-housekeeping@*'. + * + * If the housekeeping script fails, the systemd unit file automatically + * drains the node. + * + * Core scheduled instances: + * Note that housekeeping runs after every job even if the job did not + * allocate the whole node. 
+ * + * Job manager module stats: + * 'flux module stats job-manager | jq .housekeeping' returns the following: + * {"running":o} + * "running" is a dictionary of jobids (f58) for jobs currently + * running housekeeping. Each job object consists of: + * {"pending":s "allocated":s, "t_start":f} + * where + * pending: set of ranks on which housekeeping is needed/active + * allocated: set of ranks still allocated by housekeeping + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#ifdef HAVE_ARGZ_ADD +#include +#else +#include "src/common/libmissing/argz.h" +#endif + +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/librlist/rlist.h" +#include "src/common/libhostlist/hostlist.h" +#include "src/common/libutil/fsd.h" +#include "src/common/libutil/errprintf.h" +#include "src/common/libjob/idf58.h" +#include "src/common/libsubprocess/bulk-exec.h" +#include "src/common/libsubprocess/command.h" +#include "ccan/str/str.h" + +#include "job.h" +#include "alloc.h" +#include "job-manager.h" +#include "conf.h" + +#include "housekeeping.h" + +// -1 = never, 0 = immediate, >0 = time in seconds +static const double default_release_after = -1; + +struct allocation { + flux_jobid_t id; + struct rlist *rl; // R, diminished each time a subset is released + struct idset *pending; // ranks in need of housekeeping + struct housekeeping *hk; + flux_watcher_t *timer; + bool timer_armed; + bool timer_expired; + int free_count; // number of releases + double t_start; + struct bulk_exec *bulk_exec; +}; + +struct housekeeping { + struct job_manager *ctx; + flux_cmd_t *cmd; // NULL if not configured + double release_after; + char *imp_path; + zlistx_t *allocations; + flux_msg_handler_t **handlers; +}; + +static struct bulk_exec_ops bulk_ops; + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg); + +static void allocation_destroy (struct allocation *a) +{ + if (a) { + int saved_errno = errno; + rlist_destroy (a->rl); + idset_destroy (a->pending); + flux_watcher_destroy (a->timer); + bulk_exec_destroy (a->bulk_exec); + free (a); + errno = saved_errno; + } +} + +// zlistx_destructor_fn footprint +static void allocation_destructor (void **item) +{ + if (item) { + allocation_destroy (*item); + *item = NULL; + } + +} + +static int update_cmd_env (flux_cmd_t *cmd, flux_jobid_t id, uint32_t userid) +{ + if (flux_cmd_setenvf (cmd, 1, "FLUX_JOB_ID", "%ju", (uintmax_t)id) < 0 + || flux_cmd_setenvf (cmd, 1, "FLUX_JOB_USERID", "%u", userid) < 0) + return -1; + return 0; +} + +static struct allocation *allocation_create (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid) +{ + struct allocation *a; + flux_reactor_t *r = flux_get_reactor (hk->ctx->h); + + if (!(a = calloc (1, sizeof (*a)))) + return NULL; + a->hk = hk; + a->id = id; + a->t_start = flux_reactor_now (flux_get_reactor (hk->ctx->h)); + if (!(a->rl = rlist_from_json (R, NULL)) + || !(a->pending = rlist_ranks (a->rl)) + || !(a->timer = flux_timer_watcher_create (r, + 0, + 0., + allocation_timeout, + a)) + || !(a->bulk_exec = bulk_exec_create (&bulk_ops, + "rexec", + id, + "housekeeping", + a)) + || update_cmd_env (hk->cmd, id, userid) < 0 + || bulk_exec_push_cmd (a->bulk_exec, a->pending, hk->cmd, 0) < 0) { + allocation_destroy (a); + return NULL; + } + return a; +} + +/* Return the set of ranks in the remaining resource set (a->rl) which are + * not still pending housekeeping (a->pending). 
That is: + * + * ranks (a->rl) -= a->pending + * + */ +static struct idset *get_housekept_ranks (struct allocation *a) +{ + struct idset *ranks; + + if (!(ranks = rlist_ranks (a->rl))) + goto error; + if (idset_subtract (ranks, a->pending) < 0) + goto error; + return ranks; +error: + idset_destroy (ranks); + return NULL; +} + +/* Release any resources in a->rl associated with ranks that are no longer + * pending for housekeeping. Then remove them from a->rl. + */ +static void allocation_release (struct allocation *a) +{ + struct job_manager *ctx = a->hk->ctx; + struct idset *ranks = NULL; + struct rlist *rl = NULL; + json_t *R = NULL; + + if ((ranks = get_housekept_ranks (a)) && idset_count (ranks) == 0) { + idset_destroy (ranks); + return; // nothing to do + } + + if (!ranks + || !(rl = rlist_copy_ranks (a->rl, ranks)) + || !(R = rlist_to_R (rl)) + || alloc_send_free_request (ctx->alloc, R, a->id) < 0 + || rlist_remove_ranks (a->rl, ranks) < 0) { + char *s = idset_encode (ranks, IDSET_FLAG_RANGE); + flux_log (ctx->h, + LOG_ERR, + "housekeeping error releasing resources for job %s ranks %s", + idf58 (a->id), + s ? s : "NULL"); + free (s); + } + else + a->free_count++; + json_decref (R); + rlist_destroy (rl); + idset_destroy (ranks); +} + +static void allocation_remove (struct allocation *a) +{ + void *cursor; + flux_log (a->hk->ctx->h, + LOG_DEBUG, + "housekeeping: all resources of %s have been released", + idf58 (a->id)); + if (!(cursor = zlistx_find (a->hk->allocations, a))) { + flux_log (a->hk->ctx->h, + LOG_ERR, + "housekeeping: internal error removing allocation for %s", + idf58 (a->id)); + return; + } + zlistx_delete (a->hk->allocations, cursor); +} + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + struct allocation *a = arg; + + a->timer_expired = true; + + // release the ranks that have completed housekeeping so far + allocation_release (a); + + /* Note: All resources will never be released under the timeout + * because completion of housekeeping on the final rank will + * always release all resources immediately instead of waiting + * for the timer. Therefore, there is no need to check if + * rlist_rnodes (a->rl) is zero here (it never will be). + */ +} + +/* 'rank' has completed housekeeping. 
+ */ +static bool housekeeping_finish_one (struct allocation *a, int rank) +{ + if (!idset_test (a->pending, rank)) + return false; + idset_clear (a->pending, rank); + + if (idset_count (a->pending) == 0 + || a->hk->release_after == 0 + || a->timer_expired) { + allocation_release (a); + } + if (!a->timer_armed && a->hk->release_after > 0) { + flux_timer_watcher_reset (a->timer, a->hk->release_after, 0.); + flux_watcher_start (a->timer); + a->timer_armed = true; + } + return true; +} + +static void bulk_start (struct bulk_exec *bulk_exec, void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + + flux_log (h, LOG_DEBUG, "housekeeping: %s started", idf58 (a->id)); +} + +static void set_failed_reason (const char **s, const char *reason) +{ + if (!*s) + *s = reason; + else if (!streq (*s, reason)) + *s = "multiple failure modes"; +} + +static void bulk_exit (struct bulk_exec *bulk_exec, + void *arg, + const struct idset *ids) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + unsigned int rank; + struct idset *failed_ranks = NULL; + char *failed_ranks_str = NULL; + char *failed_hosts = NULL; + const char *failed_reason = NULL; + + rank = idset_first (ids); + while (rank != IDSET_INVALID_ID) { + if (housekeeping_finish_one (a, rank)) { + flux_subprocess_t *p = bulk_exec_get_subprocess (bulk_exec, rank); + bool fail = false; + int n; + if ((n = flux_subprocess_signaled (p)) > 0) { + fail = true; + set_failed_reason (&failed_reason, strsignal (n)); + } + else { + n = flux_subprocess_exit_code (p); + if (n != 0) { + fail = true; + set_failed_reason (&failed_reason, "nonzero exit code"); + } + } + if (fail) { + if (!failed_ranks) + failed_ranks = idset_create (0, IDSET_FLAG_AUTOGROW); + idset_set (failed_ranks, rank); + } + } + rank = idset_next (ids, rank); + } + // log a consolidated error message for potentially multiple ranks + if (failed_ranks + && (failed_ranks_str = idset_encode (failed_ranks, IDSET_FLAG_RANGE)) + && (failed_hosts = flux_hostmap_lookup (h, failed_ranks_str, NULL)) + && failed_reason) { + flux_log (h, + LOG_ERR, + "housekeeping: %s (rank %s) %s: %s", + failed_hosts, + failed_ranks_str, + idf58 (a->id), + failed_reason); + + } + idset_destroy (failed_ranks); + free (failed_ranks_str); + free (failed_hosts); +} + +static void bulk_complete (struct bulk_exec *bulk_exec, void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + + flux_log (h, LOG_DEBUG, "housekeeping: %s complete", idf58 (a->id)); + allocation_remove (a); +} + +static void bulk_output (struct bulk_exec *bulk_exec, + flux_subprocess_t *p, + const char *stream, + const char *data, + int data_len, + void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + int rank = flux_subprocess_rank (p); + + flux_log (h, + streq (stream, "stderr") ? 
LOG_ERR : LOG_INFO, + "housekeeping: %s (rank %d) %s: %.*s", + flux_get_hostbyrank (h, rank), + rank, + idf58 (a->id), + data_len, + data); +} + +static void bulk_error (struct bulk_exec *bulk_exec, + flux_subprocess_t *p, + void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + int rank = flux_subprocess_rank (p); + const char *hostname = flux_get_hostbyrank (h, rank); + const char *error = flux_subprocess_fail_error (p); + + flux_log (h, + LOG_ERR, + "housekeeping: %s (rank %d) %s: %s", + hostname, + rank, + idf58 (a->id), + error); + + housekeeping_finish_one (a, rank); +} + +int housekeeping_start (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid) +{ + flux_t *h = hk->ctx->h; + struct allocation *a; + void *list_handle; + + /* Housekeeping is not configured + */ + if (!hk->cmd) + goto skip; + + /* Create the 'allocation' and put it in our list. + */ + if (!(a = allocation_create (hk, R, id, userid)) + || !(list_handle = zlistx_insert (hk->allocations, a, false))) { + flux_log (h, + LOG_ERR, + "housekeeping: %s error saving alloc object (skipping)", + idf58 (id)); + allocation_destroy (a); + goto skip; + } + /* Start bulk execution. + */ + if (bulk_exec_start (h, a->bulk_exec) < 0) { + flux_log (h, + LOG_ERR, + "housekeeping: %s error starting housekeeping tasks", + idf58 (id)); + zlistx_delete (hk->allocations, list_handle); + goto skip; + } + return 0; +skip: + return alloc_send_free_request (hk->ctx->alloc, R, id); +} + +static int housekeeping_hello_respond_one (struct housekeeping *hk, + const flux_msg_t *msg, + struct allocation *a, + flux_error_t *error) +{ + struct job *job; + + if (a->free_count > 0) { + errprintf (error, "partial release is not supported by RFC 27 hello"); + goto error; + } + if (!(job = zhashx_lookup (hk->ctx->inactive_jobs, &a->id)) + && !(job = zhashx_lookup (hk->ctx->active_jobs, &a->id))) { + errprintf (error, "the job could not be looked up during RFC 27 hello"); + goto error; + } + if (flux_respond_pack (hk->ctx->h, + msg, + "{s:I s:I s:I s:f}", + "id", job->id, + "priority", job->priority, + "userid", (json_int_t)job->userid, + "t_submit", job->t_submit) < 0) { + errprintf (error, + "the RFC 27 hello response could not be sent: %s", + strerror (errno)); + goto error; + } + return 0; +error: + return -1; +} + +static void kill_continuation (flux_future_t *f, void *arg) +{ + struct housekeeping *hk = arg; + + if (flux_future_get (f, NULL) < 0) + flux_log (hk->ctx->h, LOG_ERR, "kill: %s", future_strerror (f, errno)); + flux_future_destroy (f); +} + +/* Participate in the scheduler hello protocol, where the scheduler is informed + * of resources that are already allocated. Since partial release is not yet + * supported in the hello protocol, for now, we must let go of any partial + * allocations. Send remaining housekeeping tasks a SIGTERM, log an error, + * and delete the allocation. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg) +{ + struct allocation *a; + flux_error_t error; + + a = zlistx_first (hk->allocations); + while (a) { + if (housekeeping_hello_respond_one (hk, msg, a, &error) < 0) { + char *ranks; + char *hosts = NULL; + flux_future_t *f; + + if ((ranks = idset_encode (a->pending, IDSET_FLAG_RANGE))) + hosts = flux_hostmap_lookup (hk->ctx->h, ranks, NULL); + flux_log (hk->ctx->h, + LOG_ERR, + "housekeeping: %s (rank %s) from %s will be terminated" + " because %s", + hosts ? hosts : "?", + ranks ? 
ranks : "?", + idf58 (a->id), + error.text); + free (hosts); + free (ranks); + + if (hk->imp_path) { + f = bulk_exec_imp_kill (a->bulk_exec, + hk->imp_path, + NULL, + SIGTERM); + } + else + f = bulk_exec_kill (a->bulk_exec, NULL, SIGTERM); + if (flux_future_then (f, -1, kill_continuation, hk) < 0) + flux_future_destroy (f); + + // delete the allocation to avoid sending frees later + zlistx_delete (hk->allocations, zlistx_cursor (hk->allocations)); + } + a = zlistx_next (hk->allocations); + } + return 0; +} + +static json_t *housekeeping_get_stats_job (struct allocation *a) +{ + struct idset *ranks; + char *s = NULL; + char *p = NULL; + json_t *job = NULL; + + if (!(ranks = rlist_ranks (a->rl)) + || !(p = idset_encode (ranks, IDSET_FLAG_RANGE)) + || !(s = idset_encode (a->pending, IDSET_FLAG_RANGE))) + goto out; + job = json_pack ("{s:f s:s s:s}", + "t_start", a->t_start, + "pending", s, + "allocated", p); +out: + idset_destroy (ranks); + free (s); + free (p); + return job; +} + +/* Support adding a housekeeping object to the the 'job-manager.stats-get' + * response in job-manager.c. + */ +json_t *housekeeping_get_stats (struct housekeeping *hk) +{ + json_t *running; + json_t *stats = NULL; + struct allocation *a; + + if (!(running = json_object ())) + goto nomem; + a = zlistx_first (hk->allocations); + while (a) { + json_t *job; + if (!(job = housekeeping_get_stats_job (a)) + || json_object_set_new (running, idf58 (a->id), job) < 0) { + json_decref (job); + goto nomem; + } + a = zlistx_next (hk->allocations); + } + if (!(stats = json_pack ("{s:O}", "running", running))) + goto nomem; + json_decref (running); + return stats; +nomem: + json_decref (running); + errno = ENOMEM; + return NULL; +} + +/* Support accounting for resources stuck in housekeeping when preparing the + * 'job-manager.resource-status' response in alloc.c. 
+ */ +int housekeeping_stat_append (struct housekeeping *hk, + struct rlist *rl, + flux_error_t *error) +{ + struct allocation *a; + a = zlistx_first (hk->allocations); + while (a) { + if (rlist_append (rl, a->rl) < 0) { + errprintf (error, + "%s: duplicate housekeeping allocation", + idf58 (a->id)); + return -1; + } + a = zlistx_next (hk->allocations); + } + return 0; +} + +static void housekeeping_kill_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct housekeeping *hk = arg; + int signum; + flux_jobid_t jobid = FLUX_JOBID_ANY; + const char *ranks = NULL; + struct idset *ids = NULL; + idset_error_t error; + const char *errmsg = NULL; + struct allocation *a; + flux_future_t *f; + + if (flux_request_unpack (msg, + NULL, + "{s:i s?I s?s}", + "signum", &signum, + "jobid", &jobid, + "ranks", &ranks) < 0) + goto error; + if (ranks) { + if (!(ids = idset_decode_ex (ranks, -1, -1, 0, &error))) { + errmsg = error.text; + goto error; + } + } + a = zlistx_first (hk->allocations); + while (a) { + if (a->id == jobid || jobid == FLUX_JOBID_ANY) { + if (a->bulk_exec) { + if (hk->imp_path) { + f = bulk_exec_imp_kill (a->bulk_exec, + hk->imp_path, + ids, + signum); + } + else + f = bulk_exec_kill (a->bulk_exec, ids, signum); + if (flux_future_then (f, -1, kill_continuation, hk) < 0) + flux_future_destroy (f); + } + } + a = zlistx_next (hk->allocations); + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to housekeeping-kill"); + idset_destroy (ids); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "error responding to housekeeping-kill"); + idset_destroy (ids); +} + +static flux_cmd_t *create_cmd (json_t *cmdline) +{ + size_t index; + json_t *value; + char *argz = NULL; + size_t argz_len = 0; + int argc; + char **argv = NULL; + flux_cmd_t *cmd = NULL; + + json_array_foreach (cmdline, index, value) { + if (!json_is_string (value) + || argz_add (&argz, &argz_len, json_string_value (value)) != 0) + goto done; + } + if ((argc = argz_count (argz, argz_len)) == 0 + || !(argv = calloc (argc + 1, sizeof (argv[0])))) + goto done; + argz_extract (argz, argz_len, argv); + if (!(cmd = flux_cmd_create (argc, argv, environ))) + goto done; +done: + free (argz); + free (argv); + return cmd; +} + +static int housekeeping_parse_config (const flux_conf_t *conf, + flux_error_t *error, + void *arg) +{ + struct housekeeping *hk = arg; + flux_error_t e; + json_t *cmdline = NULL; + const char *release_after = NULL; + flux_cmd_t *cmd = NULL; + int use_systemd_unit = false; + const char *imp_path = NULL; + char *imp_path_cpy = NULL; + + if (flux_conf_unpack (conf, + &e, + "{s?{s?{s?o s?s s?b !}}}", + "job-manager", + "housekeeping", + "command", &cmdline, + "release-after", &release_after, + "use-systemd-unit", &use_systemd_unit) < 0) { + return errprintf (error, + "job-manager.housekeeping.command: %s", + e.text); + } + + // let job-exec handle exec errors + (void)flux_conf_unpack (conf, NULL, "{s?{s?s}}", "exec", "imp", &imp_path); + + if (release_after) { + if (fsd_parse_duration (release_after, &hk->release_after) < 0) + return errprintf (error, + "job-manager.housekeeping.release-after" + " FSD parse error"); + } + if (use_systemd_unit) { + if (!imp_path) { + return errprintf (error, + "job-manager.housekeeeping.use-systemd-unit " + " requires that exec.imp also be defined"); + } + if (cmdline) { + return errprintf (error, + "job-manager.housekeeeping.use-systemd-unit " + " means housekeeping.command must not be 
defined"); + } + json_t *o; + if ((o = json_pack ("[sss]", imp_path, "run", "housekeeping"))) + cmd = create_cmd (o); + json_decref (o); + if (!cmd) + return errprintf (error, "error creating housekeeping command"); + if (!(imp_path_cpy = strdup (imp_path))) { + flux_cmd_destroy (cmd); + return errprintf (error, "error duplicating IMP path"); + } + } + else if (cmdline) { + if (!(cmd = create_cmd (cmdline))) + return errprintf (error, "error creating housekeeping command"); + } + flux_cmd_destroy (hk->cmd); + hk->cmd = cmd; + free (hk->imp_path); + hk->imp_path = imp_path_cpy; + flux_log (hk->ctx->h, + LOG_DEBUG, + "housekeeping is %sconfigured%s", + hk->cmd ? "" : "not ", + hk->imp_path ? " with IMP" : ""); + return 1; // allow dynamic changes +} + +static const struct flux_msg_handler_spec htab[] = { + { + .typemask = FLUX_MSGTYPE_REQUEST, + .topic_glob = "job-manager.housekeeping-kill", + .cb = housekeeping_kill_cb, + .rolemask = 0 + }, + FLUX_MSGHANDLER_TABLE_END, +}; + +void housekeeping_ctx_destroy (struct housekeeping *hk) +{ + if (hk) { + int saved_errno = errno; + conf_unregister_callback (hk->ctx->conf, housekeeping_parse_config); + flux_cmd_destroy (hk->cmd); + zlistx_destroy (&hk->allocations); + flux_msg_handler_delvec (hk->handlers); + free (hk->imp_path); + free (hk); + errno = saved_errno; + } +} + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx) +{ + struct housekeeping *hk; + flux_error_t error; + + if (!(hk = calloc (1, sizeof (*hk)))) + return NULL; + hk->ctx = ctx; + hk->release_after = default_release_after; + if (!(hk->allocations = zlistx_new ())) { + errno = ENOMEM; + goto error; + } + zlistx_set_destructor (hk->allocations, allocation_destructor); + if (conf_register_callback (ctx->conf, + &error, + housekeeping_parse_config, + hk) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + goto error; + } + if (flux_msg_handler_addvec (ctx->h, htab, hk, &hk->handlers) < 0) + goto error; + return hk; +error: + housekeeping_ctx_destroy (hk); + return NULL; +} + +static struct bulk_exec_ops bulk_ops = { + .on_start = bulk_start, + .on_exit = bulk_exit, + .on_complete = bulk_complete, + .on_output = bulk_output, + .on_error = bulk_error, +}; + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/housekeeping.h b/src/modules/job-manager/housekeeping.h new file mode 100644 index 000000000000..a468d3967632 --- /dev/null +++ b/src/modules/job-manager/housekeeping.h @@ -0,0 +1,45 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_HOUSEKEEPING_H +#define _FLUX_JOB_MANAGER_HOUSEKEEPING_H + +#include +#include "src/common/librlist/rlist.h" +#include "job-manager.h" + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx); +void housekeeping_ctx_destroy (struct housekeeping *hk); + +/* Call this to transfer a job's R to the housekeeping subsystem. The job + * may treat R as freed, but R will remain allocated from the scheduler's + * perspective until the housekeeping script is run on each execution target. 
+ */ +int housekeeping_start (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid); + +/* Call this to add responses to the scheduler's hello request at startup. + * It should inform the scheduler about resources that are still allocated, + * but no longer directly held by jobs. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg); + +json_t *housekeeping_get_stats (struct housekeeping *hk); + +int housekeeping_stat_append (struct housekeeping *hk, + struct rlist *rl, + flux_error_t *error); + + +#endif /* ! _FLUX_JOB_MANAGER_HOUSEKEEPING_H */ + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index f3ff97ec49a1..7c991740880f 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -27,6 +27,7 @@ #include "list.h" #include "urgency.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "event.h" #include "drain.h" @@ -79,22 +80,27 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh, { struct job_manager *ctx = arg; int journal_listeners = journal_listeners_count (ctx->journal); + json_t *housekeeping = housekeeping_get_stats (ctx->housekeeping); + if (!housekeeping) + goto error; if (flux_respond_pack (h, msg, - "{s:{s:i} s:i s:i s:I}", + "{s:{s:i} s:i s:i s:I s:O}", "journal", "listeners", journal_listeners, "active_jobs", zhashx_size (ctx->active_jobs), "inactive_jobs", zhashx_size (ctx->inactive_jobs), - "max_jobid", ctx->max_jobid) < 0) { + "max_jobid", ctx->max_jobid, + "housekeeping", housekeeping) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } - + json_decref (housekeeping); return; error: if (flux_respond_error (h, msg, errno, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + json_decref (housekeeping); } static const struct flux_msg_handler_spec htab[] = { @@ -198,6 +204,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating scheduler interface"); goto done; } + if (!(ctx.housekeeping = housekeeping_ctx_create (&ctx))) { + flux_log_error (h, "error creating resource housekeeping interface"); + goto done; + } if (!(ctx.start = start_ctx_create (&ctx))) { flux_log_error (h, "error creating exec interface"); goto done; @@ -256,6 +266,7 @@ int mod_main (flux_t *h, int argc, char **argv) wait_ctx_destroy (ctx.wait); drain_ctx_destroy (ctx.drain); start_ctx_destroy (ctx.start); + housekeeping_ctx_destroy (ctx.housekeeping); alloc_ctx_destroy (ctx.alloc); submit_ctx_destroy (ctx.submit); event_ctx_destroy (ctx.event); diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index f765b81d145e..7d0f19d58f1b 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -24,6 +24,7 @@ struct job_manager { struct conf *conf; struct start *start; struct alloc *alloc; + struct housekeeping *housekeeping; struct event *event; struct submit *submit; struct drain *drain; From 173ce43dd4c6bdc2b17d7ca750a938a9d96bb416 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 17 Jun 2024 11:22:14 -0700 Subject: [PATCH 04/12] systemd: add housekeeping service unit Problem: housekeeping scripts should be run in a systemd cgroup for reliable termination, logging to the local systemd journal, and debugging using well known systemd tools. 
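For example, once the unit added below is in place, a housekeeping run for
a given job can be inspected on a node with the usual systemd tools (the
f58plain jobid shown here is hypothetical):

    systemctl status flux-housekeeping@fABCDE12345
    journalctl -u 'flux-housekeeping@*'
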
Add a systemd "oneshot" service unit for housekeeping, templated by jobid and corresponding script that can be configured as an imp run target. This is similar to what was proposed for prolog/epilog except if the script fails, the node is drained. --- configure.ac | 2 ++ etc/Makefile.am | 4 +++- etc/flux-housekeeping@.service.in | 21 +++++++++++++++++++++ src/cmd/Makefile.am | 3 ++- src/cmd/flux-run-housekeeping.in | 21 +++++++++++++++++++++ 5 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 etc/flux-housekeeping@.service.in create mode 100755 src/cmd/flux-run-housekeeping.in diff --git a/configure.ac b/configure.ac index ed62589af0e3..d353e0620738 100644 --- a/configure.ac +++ b/configure.ac @@ -599,6 +599,8 @@ AC_CONFIG_FILES( \ etc/flux-hostlist.pc \ etc/flux-taskmap.pc \ etc/flux.service \ + etc/flux-housekeeping@.service \ + src/cmd/flux-run-housekeeping \ doc/Makefile \ doc/test/Makefile \ t/Makefile \ diff --git a/etc/Makefile.am b/etc/Makefile.am index 38015b321f94..9e529c7160df 100644 --- a/etc/Makefile.am +++ b/etc/Makefile.am @@ -1,5 +1,7 @@ #if HAVE_SYSTEMD -systemdsystemunit_DATA = flux.service +systemdsystemunit_DATA = \ + flux.service \ + flux-housekeeping@.service #endif tmpfilesdir = $(prefix)/lib/tmpfiles.d diff --git a/etc/flux-housekeeping@.service.in b/etc/flux-housekeeping@.service.in new file mode 100644 index 000000000000..c474a8aa6f8b --- /dev/null +++ b/etc/flux-housekeeping@.service.in @@ -0,0 +1,21 @@ +[Unit] +Description=Housekeeping for Flux job %I +CollectMode=inactive-or-failed + +[Service] +Type=oneshot +EnvironmentFile=-@X_RUNSTATEDIR@/flux-housekeeping@%I.env +ExecStart=@X_SYSCONFDIR@/flux/system/housekeeping +ExecStopPost=-rm -f @X_RUNSTATEDIR@/flux-housekeeping@%I.env +ExecStopPost=-sh -c '\ + if test "$SERVICE_RESULT" != "success"; then \ + if test "$EXIT_CODE" = "killed" -o "$EXIT_CODE" = "dumped"; then \ + message="killed by SIG${EXIT_STATUS}"; \ + elif test "$EXIT_CODE" = "exited"; then \ + message="exited with exit code $EXIT_CODE"; \ + else \ + message="code=$EXIT_CODE status=$EXIT_STATUS"; \ + fi; \ + flux resource drain $(flux getattr rank) "housekeeping $message"; \ + fi \ +' diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 7e4d5046a2e9..832b099cf6aa 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -122,7 +122,8 @@ dist_fluxcmd_SCRIPTS = \ flux-imp-exec-helper \ py-runner.py \ flux-hostlist.py \ - flux-post-job-event.py + flux-post-job-event.py \ + flux-run-housekeeping fluxcmd_PROGRAMS = \ flux-terminus \ diff --git a/src/cmd/flux-run-housekeeping.in b/src/cmd/flux-run-housekeeping.in new file mode 100755 index 000000000000..e19926389d7b --- /dev/null +++ b/src/cmd/flux-run-housekeeping.in @@ -0,0 +1,21 @@ +#!/bin/sh + +if test $FLUX_JOB_ID; then + FLUX_JOB_ID=$(flux job id --to=f58plain $FLUX_JOB_ID) +fi +unitname=flux-housekeeping@${FLUX_JOB_ID:-unknown} + +terminate() { + systemctl stop $unitname + exit 1 +} + +trap terminate INT TERM + +umask 022 +printenv >@X_RUNSTATEDIR@/${unitname}.env + +# Run systemctl start in background and `wait` for it so that the trap +# will run immediately when signal is received: +systemctl start $unitname --quiet & +wait From 4ddfca2c04897e40a1a1e75c9a67515c5bcdbaec Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 20 Jun 2024 12:41:01 -0700 Subject: [PATCH 05/12] libsubprocess: add ranks parameter to bulk kill Problem: bulk_exec does not offer an interface to kill a subset of the running subprocesses. 
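For illustration, a minimal sketch (not part of this patch) of a helper that
signals only a subset of ranks using the parameter added below.  Here
kill_continuation is an assumed flux_continuation_f that checks the result
and destroys the future:

    /* Send signum to the subprocesses on 'ranks' only; passing ranks=NULL
     * to bulk_exec_kill() keeps the old signal-everything behavior.
     */
    static int kill_some (struct bulk_exec *exec,
                          const struct idset *ranks,
                          int signum)
    {
        flux_future_t *f;

        if (!(f = bulk_exec_kill (exec, ranks, signum)))
            return -1;
        if (flux_future_then (f, -1., kill_continuation, NULL) < 0) {
            flux_future_destroy (f);
            return -1;
        }
        return 0; // kill_continuation (assumed) destroys f when fulfilled
    }
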
Add a ranks parameter to bulk_exec_kill() and bulk_exec_imp_kill(). Set it to NULL to target all subprocesses like before. Update test and users. --- src/common/libsubprocess/bulk-exec.c | 13 +++++++++---- src/common/libsubprocess/bulk-exec.h | 7 ++++++- src/common/libsubprocess/test/bulk-exec.c | 4 ++-- src/modules/job-exec/exec.c | 4 ++-- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/common/libsubprocess/bulk-exec.c b/src/common/libsubprocess/bulk-exec.c index b5741d2d9dcd..f8affb70d292 100644 --- a/src/common/libsubprocess/bulk-exec.c +++ b/src/common/libsubprocess/bulk-exec.c @@ -564,7 +564,9 @@ void bulk_exec_kill_log_error (flux_future_t *f, flux_jobid_t id) } } -flux_future_t *bulk_exec_kill (struct bulk_exec *exec, int signum) +flux_future_t *bulk_exec_kill (struct bulk_exec *exec, + const struct idset *ranks, + int signum) { flux_subprocess_t *p = zlist_first (exec->processes); flux_future_t *cf = NULL; @@ -574,8 +576,9 @@ flux_future_t *bulk_exec_kill (struct bulk_exec *exec, int signum) flux_future_set_flux (cf, exec->h); while (p) { - if (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING - || flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT) { + if ((!ranks || idset_test (ranks, flux_subprocess_rank (p))) + && (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING + || flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT)) { flux_future_t *f = NULL; char s[64]; if (!(f = flux_subprocess_kill (p, signum))) { @@ -672,6 +675,7 @@ static int bulk_exec_push_one (struct bulk_exec *exec, */ flux_future_t *bulk_exec_imp_kill (struct bulk_exec *exec, const char *imp_path, + const struct idset *ranks, int signum) { struct bulk_exec *killcmd = NULL; @@ -699,7 +703,8 @@ flux_future_t *bulk_exec_imp_kill (struct bulk_exec *exec, p = zlist_first (exec->processes); while (p) { - if ((flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING + if ((!ranks || idset_test (ranks, flux_subprocess_rank (p))) + && (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING || flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT)) { pid_t pid = flux_subprocess_pid (p); diff --git a/src/common/libsubprocess/bulk-exec.h b/src/common/libsubprocess/bulk-exec.h index 63fcfee380f7..314410496abc 100644 --- a/src/common/libsubprocess/bulk-exec.h +++ b/src/common/libsubprocess/bulk-exec.h @@ -69,7 +69,11 @@ int bulk_exec_push_cmd (struct bulk_exec *exec, int bulk_exec_start (flux_t *h, struct bulk_exec *exec); -flux_future_t * bulk_exec_kill (struct bulk_exec *exec, int signal); +/* Set ranks=NULL for all + */ +flux_future_t * bulk_exec_kill (struct bulk_exec *exec, + const struct idset *ranks, + int signal); /* Log per-rank kill errors for a failed bulk_exec_kill() RPC. 
*/ @@ -77,6 +81,7 @@ void bulk_exec_kill_log_error (flux_future_t *f, flux_jobid_t id); flux_future_t *bulk_exec_imp_kill (struct bulk_exec *exec, const char *imp_path, + const struct idset *ranks, int signal); int bulk_exec_cancel (struct bulk_exec *exec); diff --git a/src/common/libsubprocess/test/bulk-exec.c b/src/common/libsubprocess/test/bulk-exec.c index 31adeb919abf..9f96e1c149f2 100644 --- a/src/common/libsubprocess/test/bulk-exec.c +++ b/src/common/libsubprocess/test/bulk-exec.c @@ -57,7 +57,7 @@ void on_error (struct bulk_exec *exec, flux_subprocess_t *p, void *arg) (uintmax_t) flux_subprocess_pid (p), flux_subprocess_state_string (state)); } - flux_future_t *f = bulk_exec_kill (exec, 9); + flux_future_t *f = bulk_exec_kill (exec, NULL, 9); if (flux_future_get (f, NULL) < 0) log_err_exit ("bulk_exec_kill"); } @@ -85,7 +85,7 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w, int signum = flux_signal_watcher_get_signum (w); log_msg ("sending signal %d to all tasks\n", signum); - flux_future_t *f = bulk_exec_kill (exec, signum); + flux_future_t *f = bulk_exec_kill (exec, NULL, signum); if (!f || (flux_future_then (f, -1., kill_cb, exec) < 0)) log_err ("SIGINT: failed to forward signal %d", signum); flux_watcher_stop (w); diff --git a/src/modules/job-exec/exec.c b/src/modules/job-exec/exec.c index afd7d3959d83..39a447a1ea8f 100644 --- a/src/modules/job-exec/exec.c +++ b/src/modules/job-exec/exec.c @@ -575,9 +575,9 @@ static int exec_kill (struct jobinfo *job, int signum) flux_future_t *f; if (job->multiuser) - f = bulk_exec_imp_kill (exec, config_get_imp_path (), signum); + f = bulk_exec_imp_kill (exec, config_get_imp_path (), NULL, signum); else - f = bulk_exec_kill (exec, signum); + f = bulk_exec_kill (exec, NULL, signum); if (!f) { if (errno != ENOENT) flux_log_error (job->h, "%s: bulk_exec_kill", idf58 (job->id)); From 59857ab5a0bdac6b3d7a9a2a9e5a33d080d734c9 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Tue, 25 Jun 2024 23:14:03 +0000 Subject: [PATCH 06/12] cmd: add flux-housekeeping(1) Problem: There is interface to query information about the housekeeping subsystem. Add a new flux-housekeeping(1) command, which can be used to query or terminate housekeeping tasks. --- src/cmd/Makefile.am | 3 +- src/cmd/flux-housekeeping.py | 227 +++++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 1 deletion(-) create mode 100755 src/cmd/flux-housekeeping.py diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 832b099cf6aa..f67edf8b6568 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -123,7 +123,8 @@ dist_fluxcmd_SCRIPTS = \ py-runner.py \ flux-hostlist.py \ flux-post-job-event.py \ - flux-run-housekeeping + flux-run-housekeeping \ + flux-housekeeping.py fluxcmd_PROGRAMS = \ flux-terminus \ diff --git a/src/cmd/flux-housekeeping.py b/src/cmd/flux-housekeeping.py new file mode 100755 index 000000000000..69012dcce56f --- /dev/null +++ b/src/cmd/flux-housekeeping.py @@ -0,0 +1,227 @@ +############################################################# +# Copyright 2024 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. 
+# +# SPDX-License-Identifier: LGPL-3.0 +############################################################## + +import argparse +import logging +import sys +import time + +import flux +import flux.util +from flux.hostlist import Hostlist +from flux.idset import IDset +from flux.job import JobID +from flux.util import UtilConfig + +LOGGER = logging.getLogger("flux-housekeeping") + + +class FluxHousekeepingConfig(UtilConfig): + """flux-housekeeping specific configuration""" + + builtin_formats = { + "default": { + "description": "Default flux-housekeeping format string", + "format": ( + "{id.f58:>12} {nnodes:>6} {pending.nnodes:>7} " + "{runtime!F:>8} {nodelist}" + ), + }, + "pending": { + "description": "flux-housekeeping format string including active nodes only", + "format": ( + "{id.f58:>12} {pending.nnodes:>7} {runtime!F:>8} {pending.nodelist}" + ), + }, + } + + def __init__(self): + initial_dict = {"formats": dict(self.builtin_formats)} + super().__init__(name="flux-housekeeping", initial_dict=initial_dict) + + +class HKFormat(flux.util.OutputFormat): + + headings = { + "id": "JOBID", + "id.dec": "JOBID", + "id.hex": "JOBID", + "id.f58": "JOBID", + "id.f58plain": "JOBID", + "id.emoji": "JOBID", + "id.kvs": "JOBID", + "id.words": "JOBID", + "id.dothex": "JOBID", + "t_start": "T_START", + "runtime": "RUNTIME", + "ranks": "RANKS", + "nodelist": "NODELIST", + "nnodes": "NNODES", + "allocated.ranks": "RANKS", + "allocated.nodelist": "NODELIST", + "allocated.nnodes": "NNODES", + "pending.ranks": "ACTIVE_RANKS", + "pending.nodelist": "ACTIVE_NODES", + "pending.nnodes": "#ACTIVE", + } + + +class HousekeepingSet: + """Container for a set of ranks with ranks, nnodes, nodelist properties""" + + def __init__(self, ranks, hostlist): + self.ranks = IDset(ranks) + self.nnodes = len(self.ranks) + self.nodelist = hostlist[self.ranks] + + +class HousekeepingJob: + def __init__(self, jobid, stats_info, hostlist): + self.id = JobID(jobid) + self.t_start = stats_info["t_start"] + self.runtime = time.time() - self.t_start + self.pending = HousekeepingSet(stats_info["pending"], hostlist) + self.allocated = HousekeepingSet(stats_info["allocated"], hostlist) + + @property + def ranks(self): + return self.allocated.ranks + + @property + def nnodes(self): + return self.allocated.nnodes + + @property + def nodelist(self): + return self.allocated.nodelist + + +def housekeeping_list(args): + handle = flux.Flux() + + hostlist = Hostlist(handle.attr_get("hostlist")) + stats = handle.rpc("job-manager.stats-get", {}).get() + + jobs = [] + for jobid, info in stats["housekeeping"]["running"].items(): + jobs.append(HousekeepingJob(jobid, info, hostlist)) + + fmt = FluxHousekeepingConfig().load().get_format_string(args.format) + try: + formatter = HKFormat(fmt) + except ValueError as err: + raise ValueError(f"Error in user format: {err}") + + formatter.print_items(jobs, no_header=args.no_header) + + +def housekeeping_kill(args): + handle = flux.Flux() + payload = {"signum": args.signal} + + # Require one selection option (do not default to --all) + if args.jobid is None and args.targets is None and not args.all: + raise ValueError("specify at least one of --targets, --jobid, or --all") + if args.all and args.jobid is not None: + raise ValueError("do not specify --jobid with --all") + if args.jobid: + payload["jobid"] = args.jobid + if args.targets: + try: + ranks = IDset(args.targets) + except ValueError: + try: + hosts = Hostlist(args.targets) + except ValueError: + raise ValueError("--targets must be a valid Idset or 
Hostlist") + hostlist = Hostlist(handle.attr_get("hostlist")) + ranks = IDset() + for host in hosts: + try: + ranks.set(hostlist.find(host)) + except OSError: + raise ValueError(f"didn't find {host} in instance hostlist") + payload["ranks"] = str(ranks) + flux.Flux().rpc("job-manager.housekeeping-kill", payload).get() + + +def parse_args(): + parser = argparse.ArgumentParser(prog="flux-housekeeping") + subparsers = parser.add_subparsers( + title="subcommands", description="", dest="subcommand" + ) + subparsers.required = True + + list_parser = subparsers.add_parser( + "list", formatter_class=flux.util.help_formatter() + ) + list_parser.add_argument( + "-n", + "--no-header", + action="store_true", + help="Suppress printing of header line", + ) + list_parser.add_argument( + "-o", + "--format", + type=str, + default="default", + metavar="FORMAT", + help="Specify output format using Python's string format syntax " + + " or a defined format by name (use 'help' to get a list of names)", + ) + list_parser.set_defaults(func=housekeeping_list) + + kill_parser = subparsers.add_parser( + "kill", formatter_class=flux.util.help_formatter() + ) + kill_parser.add_argument( + "-s", + "--signal", + metavar="SIGNUM", + type=int, + default=15, + help="Specify signal number to send to housekeeping task", + ) + kill_parser.add_argument( + "-t", + "--targets", + metavar="RANKS|HOSTS", + type=str, + help="Only target specific ranks or hostnames", + ) + kill_parser.add_argument( + "-j", + "--jobid", + type=JobID, + help='target housekeeping tasks for this jobid or "all" for all jobs', + ) + kill_parser.add_argument( + "--all", action="store_true", help="kill all active housekeeping tasks" + ) + kill_parser.set_defaults(func=housekeeping_kill) + + return parser.parse_args() + + +@flux.util.CLIMain(LOGGER) +def main(): + sys.stdout = open( + sys.stdout.fileno(), "w", encoding="utf8", errors="surrogateescape" + ) + sys.stderr = open( + sys.stderr.fileno(), "w", encoding="utf8", errors="surrogateescape" + ) + args = parse_args() + args.func(args) + + +if __name__ == "__main__": + main() From 875fde877c294c05955258f65c456f0568f28976 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 26 Jun 2024 01:55:52 +0000 Subject: [PATCH 07/12] etc: add flux-housekeeping(1) bash completions Problem: The flux-housekeeping(1) command doesn't have any bash tab completions. Add them. 
--- etc/completions/flux.pre | 43 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/etc/completions/flux.pre b/etc/completions/flux.pre index fea5e0d63ab5..0cbf7e93e80b 100644 --- a/etc/completions/flux.pre +++ b/etc/completions/flux.pre @@ -2014,6 +2014,46 @@ _flux_broker() return 0 } +# flux-housekeeping(1) completions +_flux_housekeeping() +{ + local cmd=$1 + local subcmds="list kill" + local split=false + list_OPTS="\ + -o --format= \ + -n --no-header \ + " + kill_OPTS="\ + -s --signal= \ + -t --targets= \ + " + _flux_split_longopt && split=true + case $prev in + --format | -!(-*)o) + _flux_complete_format_name flux housekeeping list + return + ;; + esac + if [[ $cmd != "housekeeping" ]]; then + if [[ $cur != -* ]]; then + if _flux_contains_word ${cmd} "kill"; then + hk_active=$(flux housekeeping list -no $(_flux_id_fmt $cur)) + COMPREPLY=( $(compgen -W "${hk_active}" -- "$cur") ) + return 0 + fi + fi + var="${cmd//-/_}_OPTS" + COMPREPLY=( $(compgen -W "${!var}" -- "$cur") ) + if [[ "${COMPREPLY[@]}" == *= ]]; then + # no space if suggestions ends with '=' + compopt -o nospace + fi + else + COMPREPLY=( $(compgen -W "${subcmds}" -- "$cur") ) + fi +} + _flux_core() { local cur prev cmd subcmd matched @@ -2136,6 +2176,9 @@ _flux_core() compopt -o default COMPREPLY=() ;; + housekeeping) + _flux_housekeeping $subcmd + ;; -*) COMPREPLY=( $(compgen -W "${FLUX_OPTS}" -- "$cur") ) ;; From 201cf87c8e851096f5cf6485bd14ee84c15821bb Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 21 Mar 2024 11:49:24 -0700 Subject: [PATCH 08/12] testsuite: cover job-manager housekeeping Problem: there is no test coverage for job-manager housekeeping. Add a sharness script. --- t/Makefile.am | 1 + t/t2226-housekeeping.t | 325 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100755 t/t2226-housekeeping.t diff --git a/t/Makefile.am b/t/Makefile.am index bb3c31e81c02..87d1509f7a07 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -144,6 +144,7 @@ TESTSCRIPTS = \ t2222-job-manager-limit-job-size.t \ t2223-job-manager-queue-priority-order-limited.t \ t2224-job-manager-queue-priority-order-unlimited.t \ + t2226-housekeeping.t \ t2230-job-info-lookup.t \ t2231-job-info-eventlog-watch.t \ t2232-job-info-security.t \ diff --git a/t/t2226-housekeeping.t b/t/t2226-housekeeping.t new file mode 100755 index 000000000000..4ac6e23cd04e --- /dev/null +++ b/t/t2226-housekeeping.t @@ -0,0 +1,325 @@ +#!/bin/sh +test_description='Test job manager housekeeping' + +. $(dirname $0)/sharness.sh + +test_under_flux 4 + +flux setattr log-stderr-level 1 + +# Usage: list_jobs +list_jobs () { + flux housekeeping list -no {id} +} + +# Usage: kill_all signum +kill_all () { + flux housekeeping kill --all --signal=$1 +} + +# Usage: kill_job jobid signum +kill_job () { + flux housekeeping kill --jobid=$1 --signal=$2 +} + +# Usage: kill_ranks idset signum +kill_ranks () { + flux housekeeping kill --targets=$1 --signal=$2 +} + +# Note: the hand off of resources to housekeeping occurs just before the job +# becomes inactive, therefore it is safe to assume that housekeeping has run +# for the job if it is enclosed between successful 'wait_for_running 0' calls. +# This pattern is used repeatedly in the tests below. 
+ +# Usage: wait_for_running count +wait_for_running () { + count=0 + while test $(list_jobs | wc -l) -ne $1; do + count=$(($count+1)); + test $count -eq 300 && return 1 # max 300 * 0.1s sleep = 30s + sleep 0.1 + done +} + +test_expect_success 'flux-housekeeping utility exists' ' + flux housekeeping list --help && + flux housekeeping kill --help +' +test_expect_success 'flux-housekeeping kill fails without proper args' ' + test_must_fail flux housekeeping kill && + test_must_fail flux housekeeping kill --all --jobid=f1 +' +test_expect_success 'dump housekeeping stats, presumed empty' ' + flux module stats job-manager | jq .housekeeping +' + +# Note: the broker runs housekeeping in its cwd rather than the test script's +# (the trash dir) so $(pwd) is expanded at script creation time, but +# \$(flux getattr rank) is expanded at runtime. + +test_expect_success 'create housekeeping script' ' + cat >housekeeping.sh <<-EOT && + #!/bin/sh + touch $(pwd)/hkflag.\$(flux getattr rank) + EOT + chmod +x housekeeping.sh && + test_debug "cat housekeeping.sh" +' +test_expect_success 'configure housekeeping without partial release' ' + flux config load <<-EOT && + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping.sh" ] + EOT + test_debug "flux config get job-manager.housekeeping" +' +test_expect_success 'run a job on broker ranks 1-2 and wait for housekeeping' ' + test_debug "flux dmesg -C" && + rm -f hkflag.* && + wait_for_running 0 && + flux run -N2 --requires=ranks:1-2 true && + wait_for_running 0 +' +test_expect_success 'housekeeping script ran on ranks 1-2' ' + test_debug "flux dmesg -H" && + test_debug "echo $(pwd)/hkflag.*" && + test -f hkflag.1 -a -f hkflag.2 +' +test_expect_success 'configure housekeeping with immediate release' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping.sh" ] + release-after = "0" + EOT +' +test_expect_success 'run a job on all four ranks and wait for housekeeping' ' + rm -f hkflag.* && + flux dmesg -C && + wait_for_running 0 && + flux run -n4 -N4 true && + wait_for_running 0 +' +test_expect_success 'housekeeping script ran on ranks 0-3' ' + test_debug "flux dmesg -H" && + test_debug "echo $(pwd)/hkflag.*" && + test -f hkflag.0 -a -f hkflag.1 -a -f hkflag.2 -a -f hkflag.3 +' +test_expect_success 'nodes were returned to scheduler separately' ' + flux dmesg -H | grep sched-simple >sched.log && + grep "free: rank0" sched.log && + grep "free: rank1" sched.log && + grep "free: rank2" sched.log && + grep "free: rank3" sched.log +' +test_expect_success 'create housekeeping script with one 10s straggler' ' + cat >housekeeping2.sh <<-EOT && + #!/bin/sh + rank=\$(flux getattr rank) + test \$rank -eq 3 && sleep 10 + touch $(pwd)/hkflag.\$rank + EOT + chmod +x housekeeping2.sh +' +test_expect_success 'configure housekeeping with release after 5s' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping2.sh" ] + release-after = "5s" + EOT +' +test_expect_success 'run a job on all four ranks and wait for housekeeping' ' + rm -f hkflag.* && + flux dmesg -C && + wait_for_running 0 && + flux run -n4 -N4 true && + sleep 1 && + flux housekeeping list && + wait_for_running 0 +' +test_expect_success 'dump stats' ' + flux module stats job-manager | jq .housekeeping +' +test_expect_success 'housekeeping script ran on ranks 0-3' ' + test_debug "flux dmesg -H" && + test_debug "echo $(pwd)/hkflag.*" && + test -f hkflag.0 -a -f hkflag.1 -a -f hkflag.2 -a -f hkflag.3 +' +test_expect_success 'there was one alloc 
and two frees to the scheduler' ' + flux dmesg -H | grep sched-simple >sched2.log && + grep "free: rank\[0-2\]" sched2.log && + grep "free: rank3" sched2.log +' +test_expect_success 'configuring housekeeping with bad key fails' ' + test_must_fail flux config load 2>load.err <<-EOT && + [job-manager.housekeeping] + xyz = 42 + EOT + grep "left unpacked" load.err +' +test_expect_success 'configuring housekeeping with bad fsd fails' ' + test_must_fail flux config load 2>load2.err <<-EOT && + [job-manager.housekeeping] + command = [ "/bin/true" ] + release-after = "foo" + EOT + grep "FSD parse error" load2.err +' +test_expect_success 'configure housekeeping with wrong path' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "/noexist" ] + EOT +' +test_expect_success 'run a job and ensure error was logged' ' + flux dmesg -C && + wait_for_running 0 && + flux run true && + wait_for_running 0 && + flux dmesg | grep "error launching process" +' +test_expect_success 'create housekeeping script with one failing rank (3)' ' + cat >housekeeping3.sh <<-EOT && + #!/bin/sh + test \$(flux getattr rank) -ne 3 + EOT + chmod +x housekeeping3.sh +' +test_expect_success 'configure housekeeping with one failing rank' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping3.sh" ] + EOT +' +test_expect_success 'run a job across all ranks and wait for housekeeping' ' + flux dmesg -C && + wait_for_running 0 && + flux run -N4 true && + wait_for_running 0 && + flux dmesg | grep "nonzero exit code" +' +test_expect_success 'create housekeeping script that creates output' ' + cat >housekeeping4.sh <<-EOT && + #!/bin/sh + echo housekeeping-output + echo housekeeping-output >&2 + EOT + chmod +x housekeeping4.sh +' +test_expect_success 'configure housekeeping to print to stdout' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping4.sh" ] + EOT +' +test_expect_success 'run a job and ensure script output was logged' ' + flux dmesg -C && + wait_for_running 0 && + flux run true && + wait_for_running 0 && + flux dmesg | grep housekeeping-output >output && + test $(wc -l housekeeping5.sh <<-EOT && + #!/bin/sh + printenv >$(pwd)/env.out + EOT + chmod +x housekeeping5.sh +' +test_expect_success 'configure housekeeping to dump environment' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping5.sh" ] + EOT +' +test_expect_success 'run a job on rank 3, wait for hk, and check environment' ' + wait_for_running 0 && + flux run --requires=rank:3 true && + wait_for_running 0 && + grep "^FLUX_JOB_ID=$(flux job last | flux job id --to=dec)$" env.out && + grep "^FLUX_JOB_USERID=$(id -u)$" env.out && + grep "^FLUX_URI=$(flux exec -r 3 flux getattr local-uri)$" env.out +' +test_expect_success 'configure housekeeping to sleep forever' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "sleep", "inf" ] + EOT +' +test_expect_success 'run two jobs that trigger housekeeping' ' + wait_for_running 0 && + flux submit --cc=0-1 -N2 --wait true +' +test_expect_success 'housekeeping is running for 2 jobs' ' + wait_for_running 2 +' +test_expect_success 'send SIGTERM to all jobs' ' + kill_all 15 +' +test_expect_success 'wait for housekeeping to finish' ' + wait_for_running 0 +' +test_expect_success 'run a job that trigger housekeeping' ' + wait_for_running 0 && + flux run -N4 true +' +test_expect_success 'housekeeping is running for 1 job' ' + wait_for_running 1 +' +test_expect_success 'send SIGTERM to the 
job by id' ' + kill_job $(flux job last | flux job id --to=dec) 15 +' +test_expect_success 'wait for housekeeping to finish' ' + wait_for_running 0 +' +test_expect_success 'run 4 jobs that trigger housekeeping' ' + wait_for_running 0 && + flux submit --cc=0-3 -N1 true +' +test_expect_success 'housekeeping is running for 4 jobs' ' + wait_for_running 4 +' +test_expect_success 'flux resource list shows 4 nodes allocated' ' + test $(flux resource list -s allocated -no {nnodes}) -eq 4 +' +test_expect_success 'flux housekeeping list shows 4 jobs' ' + test_debug "flux housekeeping list" && + test $(flux housekeeping list -n | wc -l) -eq 4 +' +test_expect_success 'send SIGTERM to the nodes by rank' ' + kill_ranks 0-3 15 +' +test_expect_success 'wait for housekeeping to finish' ' + wait_for_running 0 +' +test_expect_success 'flux resource list shows 0 nodes allocated' ' + test $(flux resource list -s allocated -no {nnodes}) -eq 0 +' +# The following tests exercise recovery from RFC 27 hello protocol +# with partial release. Once partial release is added to RFC 27, these +# tests should be removed or changed. +test_expect_success 'configure housekeeping with immediate release' ' + flux config load <<-EOT + [job-manager.housekeeping] + command = [ "$(pwd)/housekeeping2.sh" ] + release-after = "0" + EOT +' +test_expect_success 'run job that uses 4 nodes to trigger housekeeping' ' + flux run -N4 true +' +test_expect_success 'housekeeping is running for 1 job' ' + wait_for_running 1 +' +test_expect_success 'reload scheduler' ' + flux dmesg -C && + flux module reload -f sched-simple && + flux dmesg -H +' +test_expect_success 'wait for housekeeping to finish' ' + wait_for_running 0 +' +test_expect_success 'housekeeping jobs were terminated due to sched reload' ' + flux dmesg | grep "housekeeping:.*will be terminated" +' +test_done From b30db050f67df75e89596fa727820ffb5d75e183 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 28 Jun 2024 15:44:20 +0000 Subject: [PATCH 09/12] doc: add flux-housekeeping(1) Problem: There is no manual for flux-housekeeping(1). Add a simple man page for flux-housekeeping(1). --- doc/Makefile.am | 3 +- doc/man1/flux-housekeeping.rst | 137 +++++++++++++++++++++++++++++++++ doc/manpages.py | 1 + 3 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 doc/man1/flux-housekeeping.rst diff --git a/doc/Makefile.am b/doc/Makefile.am index c0f16c088ca5..f7cb422d601a 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -55,7 +55,8 @@ MAN1_FILES_PRIMARY = \ man1/flux-cancel.1 \ man1/flux-watch.1 \ man1/flux-update.1 \ - man1/flux-hostlist.1 + man1/flux-hostlist.1 \ + man1/flux-housekeeping.1 # These files are generated as clones of a primary page. # Sphinx handles this automatically if declared in the conf.py diff --git a/doc/man1/flux-housekeeping.rst b/doc/man1/flux-housekeeping.rst new file mode 100644 index 000000000000..d64cfea98d1b --- /dev/null +++ b/doc/man1/flux-housekeeping.rst @@ -0,0 +1,137 @@ +==================== +flux-housekeeping(1) +==================== + + +SYNOPSIS +======== + +| **flux** **housekeeping** **list** [*-n*] [*-o FORMAT*] +| **flux** **housekeeping** **kill** [*--all*] [*-j JOBID*] [*-t HOSTS|RANKS*] [*-s SIGNUM*] + + +DESCRIPTION +=========== + +.. program:: flux housekeeping + +The `EXPERIMENTAL`_ housekeeping service provides similar functionality to +a job epilog, with a few advantages + + - Housekeeping runs after the job, which is then allowed to exit CLEANUP + state and become inactive once resources are released. 
+ - While housekeeping is running, the scheduler still thinks resources are + allocated to the job, and will not allocate resources to other jobs. + - Housekeeping supports partial release of resources back to the scheduler, + such that a subset of stuck nodes do not hold up other nodes from + being returned to service. + +The :program:`flux housekeeping` command is used to interact with the +housekeeping service. It supports listing the resources currently executing +housekeeping actions and a command to forcibly terminate actions on a per-job +or per-node basis. + + +COMMANDS +======== + +list +---- + +.. program:: flux housekeeping list + +:program:`flux housekeeping list` lists active housekeeping tasks by jobid. + + +.. option:: -o, --format=FORMAT + + Customize the output format (See the `OUTPUT FORMAT`_ section below). + +.. option:: -n, --no-header + + Suppress header from output. + +kill +---- + +.. program:: flux housekeeping kill + +:program:`flux housekeeping kill` can be used to terminate active housekeeping +tasks. Housekeeping may be terminated by jobid, by a set of targets such as +broker ranks or hostnames, or for all jobs via the +:option:`--all` option. + +.. option:: -s, --signal=SIGNUM + + Send signal SIGNUM instead of SIGTERM. + +.. option:: -t, --targets=RANKS|HOSTS + + Target a specific set of ranks or hosts. + +.. option:: -j, --jobid=JOBID + + Target a specific job by JOBID. Without ``--targets``, this kills all + housekeeping tasks for the specified job. + +.. option:: --all + + Target all housekeeping tasks for all jobs. + +OUTPUT FORMAT +============= + +The :option:`--format` option can be used to specify an output format using +Python's string format syntax or a defined format by name. For a list of +built-in and configured formats use :option:`-o help`. + +The following field names can be specified for +:command:`flux housekeeping list`: + +**id** + The jobid that triggered housekeeping. + +**runtime** + The time since this housekeeping task started. + +**nnodes** + A synonym for **allocated.nnodes**. + +**ranks** + A synonym for **allocated.ranks**. + +**nodelist** + A synonym for **allocated.nodelist**. + +**allocated.nnodes** + The number of nodes still allocated to this housekeeping task. + +**allocated.ranks** + The list of broker ranks still allocated to this housekeeping task. + +**allocated.nodelist** + The list of nodes still allocated to this housekeeping task. + +**pending.nnodes** + The number of nodes that still need to complete housekeeping. + +**pending.ranks** + The list of broker ranks that still need to complete housekeeping. + +**pending.nodelist** + The list of nodes that still need to complete housekeeping. + +EXPERIMENTAL +============ + +.. include:: common/experimental.rst + +RESOURCES +========= + +..
include:: common/resources.rst + +SEE ALSO +======== + +:man5:`flux-config-job-manager` diff --git a/doc/manpages.py b/doc/manpages.py index 1509742ee120..24baf7cc0957 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -70,6 +70,7 @@ ('man1/flux-watch', 'flux-watch', 'monitor one or more Flux jobs', [author], 1), ('man1/flux-update', 'flux-update', 'update active Flux jobs', [author], 1), ('man1/flux-hostlist', 'flux-hostlist', 'fetch, combine, and manipulate Flux hostlists', [author], 1), + ('man1/flux-housekeeping', 'flux-housekeeping', 'list and terminate housekeeping tasks', [author], 1), ('man3/flux_attr_get', 'flux_attr_set', 'get/set Flux broker attributes', [author], 3), ('man3/flux_attr_get', 'flux_attr_get', 'get/set Flux broker attributes', [author], 3), ('man3/flux_aux_set', 'flux_aux_get', 'get/set auxiliary handle data', [author], 3), From 18c70b4a57513c9300f38094d67ec08ae6823871 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 28 Jun 2024 15:44:57 +0000 Subject: [PATCH 10/12] doc: add housekeeping documentation to flux-config-job-manager(5) Problem: The job-manager.housekeeping config table is not documented. Document it in flux-config-job-manager(5). --- doc/man5/flux-config-job-manager.rst | 45 ++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/doc/man5/flux-config-job-manager.rst b/doc/man5/flux-config-job-manager.rst index ea0a75e8f643..52f9d32c0c4a 100644 --- a/doc/man5/flux-config-job-manager.rst +++ b/doc/man5/flux-config-job-manager.rst @@ -28,6 +28,20 @@ plugins Each directive follows the format defined in the :ref:`plugin_directive` section. +housekeeping + (optional) Table of configuration for the job-manager housekeeping + service. The housekeeping service is an `EXPERIMENTAL`_ alternative for + handling administrative job epilog workloads. If enabled, resources are + released by jobs to housekeeping, which runs a command or a systemd unit + and releases resources to the scheduler on completion. See configuration + details in the :ref:`housekeeping` section. + + Note: The housekeeping script runs as the instance owner (e.g. "flux"). + On a real system, "command" is configured to "imp run housekeeping", + and the IMP is configured to launch the flux-housekeeping systemd + service as root. (See :man5:`flux-config-security-imp` for details + on configuring :command:`flux imp run`). + .. _plugin_directive: @@ -50,6 +64,28 @@ conf (optional) An object, valid with ``load`` only, that defines a configuration table to pass to the loaded plugin. +.. _housekeeping: + +HOUSEKEEPING +============ + +command + (optional) An array of strings specifying the housekeeping command. Either + ``command`` or ``use-systemd-unit`` must be specified. + +use-systemd-unit + (optional) A boolean value indicating whether to run the flux-housekeeping + systemd unit to handle housekeeping, rather than a specific command. + Either ``use-systemd-unit`` or ``command`` must be specified. + +release-after + (optional) A string specified in Flux Standard Duration (FSD). If unset, + resources for a given job are not released until all execution targets for + a given job have completed housekeeping. If set to ``0``, resources are + released as each target completes. Otherwise, a timer is started when the + first execution target for a given job completes, and all resources that + have completed housekeeping when the timer fires are released. Following + that, resources are released as each execution target completes. 
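+
+For example, a command-based setup might be activated at runtime with
+:command:`flux config load` (a sketch; the script path and the ``30s``
+value are illustrative, not defaults)::
+
+  flux config load <<-EOT
+  [job-manager.housekeeping]
+  command = [ "/etc/flux/system/housekeeping.sh" ]
+  release-after = "30s"
+  EOT
+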
EXAMPLE ======= @@ -73,6 +109,15 @@ EXAMPLE } ] + [job-manager.housekeeping] + use-systemd-unit = true + release-after = "1m" + + +EXPERIMENTAL +============ + +.. include:: common/experimental.rst RESOURCES ========= From 52f18aa73183f7cc60bfda1dda74bb66d6b35abb Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 1 Jul 2024 07:33:02 -0700 Subject: [PATCH 11/12] man: create include file for EXPERIMENTAL Problem: the text to explain expectations for experimental features must be repeated in documentation for each one which is extra work for both the author and the reader. Add common/experimental.rst which can be included from man pages that discuss experimental features. Like common/resource.rst, the idea is to make it a standalone section which can then be referenced from the feature documentation. This should improve consistency and make it easier to document these features and interfaces. Fixes #6066 --- doc/Makefile.am | 10 +++++++++- doc/man1/common/experimental.rst | 4 ++++ doc/man3/common/experimental.rst | 4 ++++ doc/man5/common/experimental.rst | 4 ++++ doc/man7/common/experimental.rst | 4 ++++ 5 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 doc/man1/common/experimental.rst create mode 100644 doc/man3/common/experimental.rst create mode 100644 doc/man5/common/experimental.rst create mode 100644 doc/man7/common/experimental.rst diff --git a/doc/Makefile.am b/doc/Makefile.am index f7cb422d601a..7faa80a8ab10 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -384,6 +384,7 @@ if ENABLE_DOCS man_MANS = $(MAN1_FILES) $(MAN3_FILES) $(MAN5_FILES) $(MAN7_FILES) $(RST_FILES): \ man1/common/resources.rst \ + man1/common/experimental.rst \ man1/common/job-param-additional.rst \ man1/common/job-param-batch.rst \ man1/common/job-param-common.rst \ @@ -401,10 +402,13 @@ $(RST_FILES): \ man1/common/job-other-run.rst \ man1/common/job-shell-options.rst \ man3/common/resources.rst \ + man3/common/experimental.rst \ man3/common/json_pack.rst \ man3/common/json_unpack.rst \ man5/common/resources.rst \ - man7/common/resources.rst + man5/common/experimental.rst \ + man7/common/resources.rst \ + man7/common/experimental.rst endif SUFFIXES = .rst .1 .3 .5 .7 @@ -473,6 +477,7 @@ EXTRA_DIST = \ $(RST_FILES) \ man1/index.rst \ man1/common/resources.rst \ + man1/common/experimental.rst \ man1/common/job-param-additional.rst \ man1/common/job-param-batch.rst \ man1/common/job-param-common.rst \ @@ -490,13 +495,16 @@ EXTRA_DIST = \ man1/common/job-other-run.rst \ man1/common/job-shell-options.rst \ man3/common/resources.rst \ + man3/common/experimental.rst \ man3/index.rst \ man3/common/json_pack.rst \ man3/common/json_unpack.rst \ man5/common/resources.rst \ + man5/common/experimental.rst \ man5/index.rst \ man7/index.rst \ man7/common/resources.rst \ + man7/common/experimental.rst \ man7/flux-undocumented.rst CLEANFILES = \ diff --git a/doc/man1/common/experimental.rst b/doc/man1/common/experimental.rst new file mode 100644 index 000000000000..00be899898a2 --- /dev/null +++ b/doc/man1/common/experimental.rst @@ -0,0 +1,4 @@ +Experimental Flux features and interfaces are made available for evaluation +only and may change or be removed without notice. + +Feedback is welcome. Please use the flux-core project Github issue tracker. 
diff --git a/doc/man3/common/experimental.rst b/doc/man3/common/experimental.rst new file mode 100644 index 000000000000..00be899898a2 --- /dev/null +++ b/doc/man3/common/experimental.rst @@ -0,0 +1,4 @@ +Experimental Flux features and interfaces are made available for evaluation +only and may change or be removed without notice. + +Feedback is welcome. Please use the flux-core project Github issue tracker. diff --git a/doc/man5/common/experimental.rst b/doc/man5/common/experimental.rst new file mode 100644 index 000000000000..00be899898a2 --- /dev/null +++ b/doc/man5/common/experimental.rst @@ -0,0 +1,4 @@ +Experimental Flux features and interfaces are made available for evaluation +only and may change or be removed without notice. + +Feedback is welcome. Please use the flux-core project Github issue tracker. diff --git a/doc/man7/common/experimental.rst b/doc/man7/common/experimental.rst new file mode 100644 index 000000000000..00be899898a2 --- /dev/null +++ b/doc/man7/common/experimental.rst @@ -0,0 +1,4 @@ +Experimental Flux features and interfaces are made available for evaluation +only and may change or be removed without notice. + +Feedback is welcome. Please use the flux-core project Github issue tracker. From d82c35604f794408d73b2171246e1c9ea5e610ac Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 1 Jul 2024 07:56:49 -0700 Subject: [PATCH 12/12] man: add github issue tracker to resources Problem: the EXPERIMENTAL section refers to the project issue tracker but man pages do not contain the URL. Add it. --- doc/man1/common/resources.rst | 2 ++ doc/man3/common/resources.rst | 2 ++ doc/man5/common/resources.rst | 2 ++ doc/man7/common/resources.rst | 2 ++ 4 files changed, 8 insertions(+) diff --git a/doc/man1/common/resources.rst b/doc/man1/common/resources.rst index 79cf95706790..dcc16e8954b0 100644 --- a/doc/man1/common/resources.rst +++ b/doc/man1/common/resources.rst @@ -1,3 +1,5 @@ Flux: http://flux-framework.org Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc + +Issue Tracker: https://github.com/flux-framework/flux-core/issues diff --git a/doc/man3/common/resources.rst b/doc/man3/common/resources.rst index 79cf95706790..dcc16e8954b0 100644 --- a/doc/man3/common/resources.rst +++ b/doc/man3/common/resources.rst @@ -1,3 +1,5 @@ Flux: http://flux-framework.org Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc + +Issue Tracker: https://github.com/flux-framework/flux-core/issues diff --git a/doc/man5/common/resources.rst b/doc/man5/common/resources.rst index 79cf95706790..dcc16e8954b0 100644 --- a/doc/man5/common/resources.rst +++ b/doc/man5/common/resources.rst @@ -1,3 +1,5 @@ Flux: http://flux-framework.org Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc + +Issue Tracker: https://github.com/flux-framework/flux-core/issues diff --git a/doc/man7/common/resources.rst b/doc/man7/common/resources.rst index 79cf95706790..dcc16e8954b0 100644 --- a/doc/man7/common/resources.rst +++ b/doc/man7/common/resources.rst @@ -1,3 +1,5 @@ Flux: http://flux-framework.org Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc + +Issue Tracker: https://github.com/flux-framework/flux-core/issues