From e73ccfca08da5a073ac4d3c5419497434a568665 Mon Sep 17 00:00:00 2001
From: Jim Garlick
Date: Thu, 21 Mar 2024 10:39:57 -0700
Subject: [PATCH] job-manager: add housekeeping subsystem

Problem: jobs get stuck in CLEANUP state while long epilog scripts run,
causing sadness and idling resources.

Introduce a new type of epilog script called "housekeeping" that runs
after the job. Instead of freeing resources directly to the scheduler,
jobs free resources to housekeeping, post their free event, and may reach
INACTIVE state. Meanwhile, housekeeping can run a script on the allocated
resources and return them to the scheduler when complete.

The resources are still allocated to the job as far as the scheduler is
concerned while housekeeping runs. However, since the job has transitioned
to INACTIVE, the flux-accounting plugin will decrement the running job
count for the user and stop billing the user for the resources. The
'flux resource list' utility shows the resources as allocated.

By default, resources are released to the scheduler only after all ranks
complete housekeeping, as before. However, if configured, resources can be
freed to the scheduler immediately as they complete housekeeping on each
execution target, or a timer can be started on completion of the first
target; when the timer expires, all targets that have completed thus far
are freed in one go. Following that, resources are freed to the scheduler
immediately as each remaining target completes.

This works with sched-simple without changes, except that the hello
protocol does not currently support partial release, so, as noted in the
code, housekeeping and a new job could overlap when the scheduler is
reloaded on a live system. Some RFC 27 work is needed to resolve this.

The Fluxion scheduler does not currently support partial release
(flux-framework/flux-sched#1151). But as discussed over there, the
combination of receiving an R fragment and a jobid in the free request
should be sufficient to get that working.
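For illustration, a configuration sketch (values are hypothetical; the key
names are those documented in the housekeeping.c header comment below)
that runs housekeeping via the IMP/systemd unit and enables partial
release 30 seconds after the first execution target finishes:

    [job-manager.housekeeping]
    # Launch the flux-housekeeping systemd unit via the IMP
    # (requires exec.imp to be configured).
    use-systemd-unit = true
    # Arm a timer when the first exec target completes; on expiry, free
    # everything completed so far, then free per-target thereafter.
    release-after = "30s"

Setting release-after = "0" frees resources as each target completes, and
leaving it unset preserves the previous all-at-once behavior. A plain
command = ["/path/to/script"] (path hypothetical) may be configured
instead of the systemd unit.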
--- src/modules/Makefile.am | 1 + src/modules/job-manager/Makefile.am | 3 + src/modules/job-manager/alloc.c | 24 +- src/modules/job-manager/alloc.h | 6 +- src/modules/job-manager/event.c | 12 +- src/modules/job-manager/housekeeping.c | 860 +++++++++++++++++++++++++ src/modules/job-manager/housekeeping.h | 45 ++ src/modules/job-manager/job-manager.c | 17 +- src/modules/job-manager/job-manager.h | 1 + 9 files changed, 944 insertions(+), 25 deletions(-) create mode 100644 src/modules/job-manager/housekeeping.c create mode 100644 src/modules/job-manager/housekeeping.h diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index eabed0b50a9e..e9b8829fc64f 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -219,6 +219,7 @@ job_manager_la_SOURCES = job_manager_la_LIBADD = \ $(builddir)/job-manager/libjob-manager.la \ $(top_builddir)/src/common/libjob/libjob.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-optparse.la \ diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 535a47e1e9a0..7e8f4e85ecd8 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -44,6 +44,8 @@ libjob_manager_la_SOURCES = \ kill.c \ alloc.h \ alloc.c \ + housekeeping.h \ + housekeeping.c \ start.h \ start.c \ list.h \ @@ -126,6 +128,7 @@ TESTS = \ test_ldadd = \ libjob-manager.la \ $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libflux-core.la \ diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 8c7170fcb854..58cbe0c12706 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -36,6 +36,7 @@ #include "annotate.h" #include "raise.h" #include "queue.h" +#include "housekeeping.h" struct alloc { struct job_manager *ctx; @@ -99,8 +100,7 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum) } } -/* Send sched.free request for job. - * Update flags. +/* Send sched.free request. 
*/ int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R) { @@ -343,6 +343,8 @@ static void hello_cb (flux_t *h, } job = zhashx_next (ctx->active_jobs); } + if (housekeeping_hello_respond (ctx->housekeeping, msg) < 0) + goto error; if (flux_respond_error (h, msg, ENODATA, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); return; @@ -504,20 +506,11 @@ static void check_cb (flux_reactor_t *r, NULL); } -/* called from event_job_action() FLUX_JOB_STATE_CLEANUP */ -int alloc_send_free_request (struct alloc *alloc, struct job *job) +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id) { - if (job->state != FLUX_JOB_STATE_CLEANUP) - return -1; if (alloc->scheduler_is_online) { - if (free_request (alloc, job->id, job->R_redacted) < 0) + if (free_request (alloc, id, R) < 0) return -1; - if ((job->flags & FLUX_JOB_DEBUG)) - (void)event_job_post_pack (alloc->ctx->event, - job, - "debug.free-request", - 0, - NULL); } return 0; } @@ -691,7 +684,8 @@ static void resource_status_cb (flux_t *h, } job = zhashx_first (alloc->ctx->active_jobs); while (job) { - if (job->has_resources && job->R_redacted && !job->alloc_bypass) { + if ((job->has_resources && !job->free_posted) + && job->R_redacted && !job->alloc_bypass) { struct rlist *rl2; json_error_t jerror; @@ -711,6 +705,8 @@ static void resource_status_cb (flux_t *h, } job = zhashx_next (alloc->ctx->active_jobs); } + if (housekeeping_stat_append (ctx->housekeeping, rl, &error) < 0) + goto error; if (!(R = rlist_to_R (rl))) { errprintf (&error, "error converting rlist to JSON"); goto error; diff --git a/src/modules/job-manager/alloc.h b/src/modules/job-manager/alloc.h index 8a086c04cb98..ae35044e69ad 100644 --- a/src/modules/job-manager/alloc.h +++ b/src/modules/job-manager/alloc.h @@ -11,6 +11,7 @@ #ifndef _FLUX_JOB_MANAGER_ALLOC_H #define _FLUX_JOB_MANAGER_ALLOC_H +#include #include #include "job.h" @@ -47,10 +48,9 @@ int alloc_queue_count (struct alloc *alloc); */ int alloc_pending_count (struct alloc *alloc); -/* Call from CLEANUP state to release resources. - * This function is a no-op if job->free_pending is set. +/* Release resources back to the scheduler. */ -int alloc_send_free_request (struct alloc *alloc, struct job *job); +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id); /* List pending jobs */ diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index a0ba2c4dba7f..dbdda81863db 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -48,6 +48,7 @@ #include "ccan/str/str.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "drain.h" #include "journal.h" @@ -320,17 +321,18 @@ int event_job_action (struct event *event, struct job *job) /* N.B. start_pending indicates that the start request is still * expecting responses. The final response is the 'release' * response with final=true. Thus once the flag is clear, - * it is safe to release all resources to the scheduler. + * it is safe for the job to release its resources to housekeeping. 
*/ if (job->has_resources && !job_event_is_queued (job, "epilog-start") && !job->perilog_active && !job->start_pending && !job->free_posted) { - if (!job->alloc_bypass) { - if (alloc_send_free_request (ctx->alloc, job) < 0) - return -1; - } + if (housekeeping_start (ctx->housekeeping, + job->R_redacted, + job->id, + job->userid) < 0) + return -1; if (event_job_post_pack (ctx->event, job, "free", 0, NULL) < 0) return -1; job->free_posted = 1; diff --git a/src/modules/job-manager/housekeeping.c b/src/modules/job-manager/housekeeping.c new file mode 100644 index 000000000000..ef1a248a4bdf --- /dev/null +++ b/src/modules/job-manager/housekeeping.c @@ -0,0 +1,860 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* housekeeping - clean resources prior to release to the scheduler + * + * Purpose: + * Resources are released by jobs to housekeeping. Housekeeping runs + * an epilog-like script, then releases resources to the scheduler. + * Unlike the job manager epilog, housekeeping runs after the job, which + * is allowed to exit CLEANUP when resources are handed over to housekeeping. + * The scheduler still thinks resources are allocated to the job. + * + * Configuration: + * [job-manager.housekeeping] + * #command = ["command", "arg1", "arg2", ...] + * use-systemd-unit = true + * release-after = "FSD" + * + * Partial release: + * The 'release-after' config key enables partial release of resources. + * - If unset, resources for a given job are not released until all exec + * targets have completed housekeeping. + * - If set to "0", resources are released as each exec target completes. + * - If set to a nozero duration, a timer starts when the first exec target + * for a given job completes. When the timer expires, resources for all + * the completed exec targets are released. Following that, resources + * are released as each target completes. + * + * Script credentials: + * The housekeeping script runs as the instance owner (e.g. "flux"). + * On a real system, "command" is configured to "imp run housekeeping", + * and the IMP is configured to launch the flux-housekeeping systemd + * service as root. + * + * Script environment: + * FLUX_JOB_ID - the job whose resources are running housekeeping + * FLUX_JOB_USERID - the UID of the job's owner + * FLUX_URI - the URI of the local flux broker + * The IMP must be configured to explicitly allow FLUX_* to pass through. + * + * Script error handling: + * If housekeeping fails on a node or set of nodes, this is logged to + * the flux circular buffer at LOG_ERR. + * Stdout is logged at LOG_INFO and stderr at LOG_ERR. + * + * Error handling under systemd: + * When using systemd, any output is captured by the systemd journal on + * the remote node, accessed with 'journalctl -u flux-housekeeping@*'. + * + * If the housekeeping script fails, the systemd unit file automatically + * drains the node. + * + * Core scheduled instances: + * Note that housekeeping runs after every job even if the job did not + * allocate the whole node. 
+ * + * Job manager module stats: + * 'flux module stats job-manager | jq .housekeeping' returns the following: + * {"running":o} + * "running" is a dictionary of jobids (f58) for jobs currently + * running housekeeping. Each job object consists of: + * {"pending":s "allocated":s, "t_start":f} + * where + * pending: set of ranks on which housekeeping is needed/active + * allocated: set of ranks still allocated by housekeeping + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#ifdef HAVE_ARGZ_ADD +#include +#else +#include "src/common/libmissing/argz.h" +#endif + +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/librlist/rlist.h" +#include "src/common/libhostlist/hostlist.h" +#include "src/common/libutil/fsd.h" +#include "src/common/libutil/errprintf.h" +#include "src/common/libjob/idf58.h" +#include "src/common/libsubprocess/bulk-exec.h" +#include "src/common/libsubprocess/command.h" +#include "ccan/str/str.h" + +#include "job.h" +#include "alloc.h" +#include "job-manager.h" +#include "conf.h" + +#include "housekeeping.h" + +// -1 = never, 0 = immediate, >0 = time in seconds +static const double default_release_after = -1; + +struct allocation { + flux_jobid_t id; + struct rlist *rl; // R, diminished each time a subset is released + struct idset *pending; // ranks in need of housekeeping + struct housekeeping *hk; + flux_watcher_t *timer; + bool timer_armed; + bool timer_expired; + int free_count; // number of releases + double t_start; + struct bulk_exec *bulk_exec; +}; + +struct housekeeping { + struct job_manager *ctx; + flux_cmd_t *cmd; // NULL if not configured + double release_after; + char *imp_path; + zlistx_t *allocations; + flux_msg_handler_t **handlers; +}; + +static struct bulk_exec_ops bulk_ops; + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg); + +static void allocation_destroy (struct allocation *a) +{ + if (a) { + int saved_errno = errno; + rlist_destroy (a->rl); + idset_destroy (a->pending); + flux_watcher_destroy (a->timer); + bulk_exec_destroy (a->bulk_exec); + free (a); + errno = saved_errno; + } +} + +// zlistx_destructor_fn footprint +static void allocation_destructor (void **item) +{ + if (item) { + allocation_destroy (*item); + *item = NULL; + } + +} + +static int update_cmd_env (flux_cmd_t *cmd, flux_jobid_t id, uint32_t userid) +{ + if (flux_cmd_setenvf (cmd, 1, "FLUX_JOB_ID", "%ju", (uintmax_t)id) < 0 + || flux_cmd_setenvf (cmd, 1, "FLUX_JOB_USERID", "%u", userid) < 0) + return -1; + return 0; +} + +static struct allocation *allocation_create (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid) +{ + struct allocation *a; + flux_reactor_t *r = flux_get_reactor (hk->ctx->h); + + if (!(a = calloc (1, sizeof (*a)))) + return NULL; + a->hk = hk; + a->id = id; + a->t_start = flux_reactor_now (flux_get_reactor (hk->ctx->h)); + if (!(a->rl = rlist_from_json (R, NULL)) + || !(a->pending = rlist_ranks (a->rl)) + || !(a->timer = flux_timer_watcher_create (r, + 0, + 0., + allocation_timeout, + a)) + || !(a->bulk_exec = bulk_exec_create (&bulk_ops, + "rexec", + id, + "housekeeping", + a)) + || update_cmd_env (hk->cmd, id, userid) < 0 + || bulk_exec_push_cmd (a->bulk_exec, a->pending, hk->cmd, 0) < 0) { + allocation_destroy (a); + return NULL; + } + return a; +} + +/* Return the set of ranks in the remaining resource set (a->rl) which are + * not still pending housekeeping (a->pending). 
That is: + * + * ranks (a->rl) -= a->pending + * + */ +static struct idset *get_housekept_ranks (struct allocation *a) +{ + struct idset *ranks; + + if (!(ranks = rlist_ranks (a->rl))) + goto error; + if (idset_subtract (ranks, a->pending) < 0) + goto error; + return ranks; +error: + idset_destroy (ranks); + return NULL; +} + +/* Release any resources in a->rl associated with ranks that are no longer + * pending for housekeeping. Then remove them from a->rl. + */ +static void allocation_release (struct allocation *a) +{ + struct job_manager *ctx = a->hk->ctx; + struct idset *ranks = NULL; + struct rlist *rl = NULL; + json_t *R = NULL; + + if ((ranks = get_housekept_ranks (a)) && idset_count (ranks) == 0) { + idset_destroy (ranks); + return; // nothing to do + } + + if (!ranks + || !(rl = rlist_copy_ranks (a->rl, ranks)) + || !(R = rlist_to_R (rl)) + || alloc_send_free_request (ctx->alloc, R, a->id) < 0 + || rlist_remove_ranks (a->rl, ranks) < 0) { + char *s = idset_encode (ranks, IDSET_FLAG_RANGE); + flux_log (ctx->h, + LOG_ERR, + "housekeeping error releasing resources for job %s ranks %s", + idf58 (a->id), + s ? s : "NULL"); + free (s); + } + else + a->free_count++; + json_decref (R); + rlist_destroy (rl); + idset_destroy (ranks); +} + +static void allocation_remove (struct allocation *a) +{ + void *cursor; + flux_log (a->hk->ctx->h, + LOG_DEBUG, + "housekeeping: all resources of %s have been released", + idf58 (a->id)); + if (!(cursor = zlistx_find (a->hk->allocations, a))) { + flux_log (a->hk->ctx->h, + LOG_ERR, + "housekeeping: internal error removing allocation for %s", + idf58 (a->id)); + return; + } + zlistx_delete (a->hk->allocations, cursor); +} + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + struct allocation *a = arg; + + a->timer_expired = true; + + // release the ranks that have completed housekeeping so far + allocation_release (a); + + /* Note: All resources will never be released under the timeout + * because completion of housekeeping on the final rank will + * always release all resources immediately instead of waiting + * for the timer. Therefore, there is no need to check if + * rlist_rnodes (a->rl) is zero here (it never will be). + */ +} + +/* 'rank' has completed housekeeping. 
+ */ +static bool housekeeping_finish_one (struct allocation *a, int rank) +{ + if (!idset_test (a->pending, rank)) + return false; + idset_clear (a->pending, rank); + + if (idset_count (a->pending) == 0 + || a->hk->release_after == 0 + || a->timer_expired) { + allocation_release (a); + } + if (!a->timer_armed && a->hk->release_after > 0) { + flux_timer_watcher_reset (a->timer, a->hk->release_after, 0.); + flux_watcher_start (a->timer); + a->timer_armed = true; + } + return true; +} + +static void bulk_start (struct bulk_exec *bulk_exec, void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + + flux_log (h, LOG_DEBUG, "housekeeping: %s started", idf58 (a->id)); +} + +static void set_failed_reason (const char **s, const char *reason) +{ + if (!*s) + *s = reason; + else if (!streq (*s, reason)) + *s = "multiple failure modes"; +} + +static void bulk_exit (struct bulk_exec *bulk_exec, + void *arg, + const struct idset *ids) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + unsigned int rank; + struct idset *failed_ranks = NULL; + char *failed_ranks_str = NULL; + char *failed_hosts = NULL; + const char *failed_reason = NULL; + + rank = idset_first (ids); + while (rank != IDSET_INVALID_ID) { + if (housekeeping_finish_one (a, rank)) { + flux_subprocess_t *p = bulk_exec_get_subprocess (bulk_exec, rank); + bool fail = false; + int n; + if ((n = flux_subprocess_signaled (p)) > 0) { + fail = true; + set_failed_reason (&failed_reason, strsignal (n)); + } + else { + n = flux_subprocess_exit_code (p); + if (n != 0) { + fail = true; + set_failed_reason (&failed_reason, "nonzero exit code"); + } + } + if (fail) { + if (!failed_ranks) + failed_ranks = idset_create (0, IDSET_FLAG_AUTOGROW); + idset_set (failed_ranks, rank); + } + } + rank = idset_next (ids, rank); + } + // log a consolidated error message for potentially multiple ranks + if (failed_ranks + && (failed_ranks_str = idset_encode (failed_ranks, IDSET_FLAG_RANGE)) + && (failed_hosts = flux_hostmap_lookup (h, failed_ranks_str, NULL)) + && failed_reason) { + flux_log (h, + LOG_ERR, + "housekeeping: %s (rank %s) %s: %s", + failed_hosts, + failed_ranks_str, + idf58 (a->id), + failed_reason); + + } + idset_destroy (failed_ranks); + free (failed_ranks_str); + free (failed_hosts); +} + +static void bulk_complete (struct bulk_exec *bulk_exec, void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + + flux_log (h, LOG_DEBUG, "housekeeping: %s complete", idf58 (a->id)); + allocation_remove (a); +} + +static void bulk_output (struct bulk_exec *bulk_exec, + flux_subprocess_t *p, + const char *stream, + const char *data, + int data_len, + void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + int rank = flux_subprocess_rank (p); + + flux_log (h, + streq (stream, "stderr") ? 
LOG_ERR : LOG_INFO, + "housekeeping: %s (rank %d) %s: %.*s", + flux_get_hostbyrank (h, rank), + rank, + idf58 (a->id), + data_len, + data); +} + +static void bulk_error (struct bulk_exec *bulk_exec, + flux_subprocess_t *p, + void *arg) +{ + struct allocation *a = arg; + flux_t *h = a->hk->ctx->h; + int rank = flux_subprocess_rank (p); + const char *hostname = flux_get_hostbyrank (h, rank); + const char *error = flux_subprocess_fail_error (p); + + flux_log (h, + LOG_ERR, + "housekeeping: %s (rank %d) %s: %s", + hostname, + rank, + idf58 (a->id), + error); + + housekeeping_finish_one (a, rank); +} + +int housekeeping_start (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid) +{ + flux_t *h = hk->ctx->h; + struct allocation *a; + void *list_handle; + + /* Housekeeping is not configured + */ + if (!hk->cmd) + goto skip; + + /* Create the 'allocation' and put it in our list. + */ + if (!(a = allocation_create (hk, R, id, userid)) + || !(list_handle = zlistx_insert (hk->allocations, a, false))) { + flux_log (h, + LOG_ERR, + "housekeeping: %s error saving alloc object (skipping)", + idf58 (id)); + allocation_destroy (a); + goto skip; + } + /* Start bulk execution. + */ + if (bulk_exec_start (h, a->bulk_exec) < 0) { + flux_log (h, + LOG_ERR, + "housekeeping: %s error starting housekeeping tasks", + idf58 (id)); + zlistx_delete (hk->allocations, list_handle); + goto skip; + } + return 0; +skip: + return alloc_send_free_request (hk->ctx->alloc, R, id); +} + +static int housekeeping_hello_respond_one (struct housekeeping *hk, + const flux_msg_t *msg, + struct allocation *a, + flux_error_t *error) +{ + struct job *job; + + if (a->free_count > 0) { + errprintf (error, "partial release is not supported by RFC 27 hello"); + goto error; + } + if (!(job = zhashx_lookup (hk->ctx->inactive_jobs, &a->id)) + && !(job = zhashx_lookup (hk->ctx->active_jobs, &a->id))) { + errprintf (error, "the job could not be looked up during RFC 27 hello"); + goto error; + } + if (flux_respond_pack (hk->ctx->h, + msg, + "{s:I s:I s:I s:f}", + "id", job->id, + "priority", job->priority, + "userid", (json_int_t)job->userid, + "t_submit", job->t_submit) < 0) { + errprintf (error, + "the RFC 27 hello response could not be sent: %s", + strerror (errno)); + goto error; + } + return 0; +error: + return -1; +} + +static void kill_continuation (flux_future_t *f, void *arg) +{ + struct housekeeping *hk = arg; + + if (flux_future_get (f, NULL) < 0) + flux_log (hk->ctx->h, LOG_ERR, "kill: %s", future_strerror (f, errno)); + flux_future_destroy (f); +} + +/* Participate in the scheduler hello protocol, where the scheduler is informed + * of resources that are already allocated. Since partial release is not yet + * supported in the hello protocol, for now, we must let go of any partial + * allocations. Send remaining housekeeping tasks a SIGTERM, log an error, + * and delete the allocation. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg) +{ + struct allocation *a; + flux_error_t error; + + a = zlistx_first (hk->allocations); + while (a) { + if (housekeeping_hello_respond_one (hk, msg, a, &error) < 0) { + char *ranks; + char *hosts = NULL; + flux_future_t *f; + + if ((ranks = idset_encode (a->pending, IDSET_FLAG_RANGE))) + hosts = flux_hostmap_lookup (hk->ctx->h, ranks, NULL); + flux_log (hk->ctx->h, + LOG_ERR, + "housekeeping: %s (rank %s) from %s will be terminated" + " because %s", + hosts ? hosts : "?", + ranks ? 
ranks : "?", + idf58 (a->id), + error.text); + free (hosts); + free (ranks); + + if (hk->imp_path) { + f = bulk_exec_imp_kill (a->bulk_exec, + hk->imp_path, + NULL, + SIGTERM); + } + else + f = bulk_exec_kill (a->bulk_exec, NULL, SIGTERM); + if (flux_future_then (f, -1, kill_continuation, hk) < 0) + flux_future_destroy (f); + + // delete the allocation to avoid sending frees later + zlistx_delete (hk->allocations, zlistx_cursor (hk->allocations)); + } + a = zlistx_next (hk->allocations); + } + return 0; +} + +static json_t *housekeeping_get_stats_job (struct allocation *a) +{ + struct idset *ranks; + char *s = NULL; + char *p = NULL; + json_t *job = NULL; + + if (!(ranks = rlist_ranks (a->rl)) + || !(p = idset_encode (ranks, IDSET_FLAG_RANGE)) + || !(s = idset_encode (a->pending, IDSET_FLAG_RANGE))) + goto out; + job = json_pack ("{s:f s:s s:s}", + "t_start", a->t_start, + "pending", s, + "allocated", p); +out: + idset_destroy (ranks); + free (s); + free (p); + return job; +} + +/* Support adding a housekeeping object to the the 'job-manager.stats-get' + * response in job-manager.c. + */ +json_t *housekeeping_get_stats (struct housekeeping *hk) +{ + json_t *running; + json_t *stats = NULL; + struct allocation *a; + + if (!(running = json_object ())) + goto nomem; + a = zlistx_first (hk->allocations); + while (a) { + json_t *job; + if (!(job = housekeeping_get_stats_job (a)) + || json_object_set_new (running, idf58 (a->id), job) < 0) { + json_decref (job); + goto nomem; + } + a = zlistx_next (hk->allocations); + } + if (!(stats = json_pack ("{s:O}", "running", running))) + goto nomem; + json_decref (running); + return stats; +nomem: + json_decref (running); + errno = ENOMEM; + return NULL; +} + +/* Support accounting for resources stuck in housekeeping when preparing the + * 'job-manager.resource-status' response in alloc.c. 
+ */ +int housekeeping_stat_append (struct housekeeping *hk, + struct rlist *rl, + flux_error_t *error) +{ + struct allocation *a; + a = zlistx_first (hk->allocations); + while (a) { + if (rlist_append (rl, a->rl) < 0) { + errprintf (error, + "%s: duplicate housekeeping allocation", + idf58 (a->id)); + return -1; + } + a = zlistx_next (hk->allocations); + } + return 0; +} + +static void housekeeping_kill_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct housekeeping *hk = arg; + int signum; + flux_jobid_t jobid = FLUX_JOBID_ANY; + const char *ranks = NULL; + struct idset *ids = NULL; + idset_error_t error; + const char *errmsg = NULL; + struct allocation *a; + flux_future_t *f; + + if (flux_request_unpack (msg, + NULL, + "{s:i s?I s?s}", + "signum", &signum, + "jobid", &jobid, + "ranks", &ranks) < 0) + goto error; + if (ranks) { + if (!(ids = idset_decode_ex (ranks, -1, -1, 0, &error))) { + errmsg = error.text; + goto error; + } + } + a = zlistx_first (hk->allocations); + while (a) { + if (a->id == jobid || jobid == FLUX_JOBID_ANY) { + if (a->bulk_exec) { + if (hk->imp_path) { + f = bulk_exec_imp_kill (a->bulk_exec, + hk->imp_path, + ids, + signum); + } + else + f = bulk_exec_kill (a->bulk_exec, ids, signum); + if (flux_future_then (f, -1, kill_continuation, hk) < 0) + flux_future_destroy (f); + } + } + a = zlistx_next (hk->allocations); + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to housekeeping-kill"); + idset_destroy (ids); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "error responding to housekeeping-kill"); + idset_destroy (ids); +} + +static flux_cmd_t *create_cmd (json_t *cmdline) +{ + size_t index; + json_t *value; + char *argz = NULL; + size_t argz_len = 0; + int argc; + char **argv = NULL; + flux_cmd_t *cmd = NULL; + + json_array_foreach (cmdline, index, value) { + if (!json_is_string (value) + || argz_add (&argz, &argz_len, json_string_value (value)) != 0) + goto done; + } + if ((argc = argz_count (argz, argz_len)) == 0 + || !(argv = calloc (argc + 1, sizeof (argv[0])))) + goto done; + argz_extract (argz, argz_len, argv); + if (!(cmd = flux_cmd_create (argc, argv, environ))) + goto done; +done: + free (argz); + free (argv); + return cmd; +} + +static int housekeeping_parse_config (const flux_conf_t *conf, + flux_error_t *error, + void *arg) +{ + struct housekeeping *hk = arg; + flux_error_t e; + json_t *cmdline = NULL; + const char *release_after = NULL; + flux_cmd_t *cmd = NULL; + int use_systemd_unit = false; + const char *imp_path = NULL; + char *imp_path_cpy = NULL; + + if (flux_conf_unpack (conf, + &e, + "{s?{s?{s?o s?s s?b !}}}", + "job-manager", + "housekeeping", + "command", &cmdline, + "release-after", &release_after, + "use-systemd-unit", &use_systemd_unit) < 0) { + return errprintf (error, + "job-manager.housekeeping.command: %s", + e.text); + } + + // let job-exec handle exec errors + (void)flux_conf_unpack (conf, NULL, "{s?{s?s}}", "exec", "imp", &imp_path); + + if (release_after) { + if (fsd_parse_duration (release_after, &hk->release_after) < 0) + return errprintf (error, + "job-manager.housekeeping.release-after" + " FSD parse error"); + } + if (use_systemd_unit) { + if (!imp_path) { + return errprintf (error, + "job-manager.housekeeeping.use-systemd-unit " + " requires that exec.imp also be defined"); + } + if (cmdline) { + return errprintf (error, + "job-manager.housekeeeping.use-systemd-unit " + " means housekeeping.command must not be 
defined"); + } + json_t *o; + if ((o = json_pack ("[sss]", imp_path, "run", "housekeeping"))) + cmd = create_cmd (o); + json_decref (o); + if (!cmd) + return errprintf (error, "error creating housekeeping command"); + if (!(imp_path_cpy = strdup (imp_path))) { + flux_cmd_destroy (cmd); + return errprintf (error, "error duplicating IMP path"); + } + } + else if (cmdline) { + if (!(cmd = create_cmd (cmdline))) + return errprintf (error, "error creating housekeeping command"); + } + flux_cmd_destroy (hk->cmd); + hk->cmd = cmd; + free (hk->imp_path); + hk->imp_path = imp_path_cpy; + flux_log (hk->ctx->h, + LOG_DEBUG, + "housekeeping is %sconfigured%s", + hk->cmd ? "" : "not ", + hk->imp_path ? " with IMP" : ""); + return 1; // allow dynamic changes +} + +static const struct flux_msg_handler_spec htab[] = { + { + .typemask = FLUX_MSGTYPE_REQUEST, + .topic_glob = "job-manager.housekeeping-kill", + .cb = housekeeping_kill_cb, + .rolemask = 0 + }, + FLUX_MSGHANDLER_TABLE_END, +}; + +void housekeeping_ctx_destroy (struct housekeeping *hk) +{ + if (hk) { + int saved_errno = errno; + conf_unregister_callback (hk->ctx->conf, housekeeping_parse_config); + flux_cmd_destroy (hk->cmd); + zlistx_destroy (&hk->allocations); + flux_msg_handler_delvec (hk->handlers); + free (hk->imp_path); + free (hk); + errno = saved_errno; + } +} + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx) +{ + struct housekeeping *hk; + flux_error_t error; + + if (!(hk = calloc (1, sizeof (*hk)))) + return NULL; + hk->ctx = ctx; + hk->release_after = default_release_after; + if (!(hk->allocations = zlistx_new ())) { + errno = ENOMEM; + goto error; + } + zlistx_set_destructor (hk->allocations, allocation_destructor); + if (conf_register_callback (ctx->conf, + &error, + housekeeping_parse_config, + hk) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + goto error; + } + if (flux_msg_handler_addvec (ctx->h, htab, hk, &hk->handlers) < 0) + goto error; + return hk; +error: + housekeeping_ctx_destroy (hk); + return NULL; +} + +static struct bulk_exec_ops bulk_ops = { + .on_start = bulk_start, + .on_exit = bulk_exit, + .on_complete = bulk_complete, + .on_output = bulk_output, + .on_error = bulk_error, +}; + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/housekeeping.h b/src/modules/job-manager/housekeeping.h new file mode 100644 index 000000000000..a468d3967632 --- /dev/null +++ b/src/modules/job-manager/housekeeping.h @@ -0,0 +1,45 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_HOUSEKEEPING_H +#define _FLUX_JOB_MANAGER_HOUSEKEEPING_H + +#include +#include "src/common/librlist/rlist.h" +#include "job-manager.h" + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx); +void housekeeping_ctx_destroy (struct housekeeping *hk); + +/* Call this to transfer a job's R to the housekeeping subsystem. The job + * may treat R as freed, but R will remain allocated from the scheduler's + * perspective until the housekeeping script is run on each execution target. 
+ */ +int housekeeping_start (struct housekeeping *hk, + json_t *R, + flux_jobid_t id, + uint32_t userid); + +/* Call this to add responses to the scheduler's hello request at startup. + * It should inform the scheduler about resources that are still allocated, + * but no longer directly held by jobs. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg); + +json_t *housekeeping_get_stats (struct housekeeping *hk); + +int housekeeping_stat_append (struct housekeeping *hk, + struct rlist *rl, + flux_error_t *error); + + +#endif /* ! _FLUX_JOB_MANAGER_HOUSEKEEPING_H */ + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index f3ff97ec49a1..7c991740880f 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -27,6 +27,7 @@ #include "list.h" #include "urgency.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "event.h" #include "drain.h" @@ -79,22 +80,27 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh, { struct job_manager *ctx = arg; int journal_listeners = journal_listeners_count (ctx->journal); + json_t *housekeeping = housekeeping_get_stats (ctx->housekeeping); + if (!housekeeping) + goto error; if (flux_respond_pack (h, msg, - "{s:{s:i} s:i s:i s:I}", + "{s:{s:i} s:i s:i s:I s:O}", "journal", "listeners", journal_listeners, "active_jobs", zhashx_size (ctx->active_jobs), "inactive_jobs", zhashx_size (ctx->inactive_jobs), - "max_jobid", ctx->max_jobid) < 0) { + "max_jobid", ctx->max_jobid, + "housekeeping", housekeeping) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } - + json_decref (housekeeping); return; error: if (flux_respond_error (h, msg, errno, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + json_decref (housekeeping); } static const struct flux_msg_handler_spec htab[] = { @@ -198,6 +204,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating scheduler interface"); goto done; } + if (!(ctx.housekeeping = housekeeping_ctx_create (&ctx))) { + flux_log_error (h, "error creating resource housekeeping interface"); + goto done; + } if (!(ctx.start = start_ctx_create (&ctx))) { flux_log_error (h, "error creating exec interface"); goto done; @@ -256,6 +266,7 @@ int mod_main (flux_t *h, int argc, char **argv) wait_ctx_destroy (ctx.wait); drain_ctx_destroy (ctx.drain); start_ctx_destroy (ctx.start); + housekeeping_ctx_destroy (ctx.housekeeping); alloc_ctx_destroy (ctx.alloc); submit_ctx_destroy (ctx.submit); event_ctx_destroy (ctx.event); diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index f765b81d145e..7d0f19d58f1b 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -24,6 +24,7 @@ struct job_manager { struct conf *conf; struct start *start; struct alloc *alloc; + struct housekeeping *housekeeping; struct event *event; struct submit *submit; struct drain *drain;