From fae7684aae5a37dee1534b76765833c87690dadc Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 21 Mar 2024 10:39:57 -0700 Subject: [PATCH] job-manager: add housekeeping subsystem Problem: jobs get stuck in CLEANUP state while long epilog scripts run, causing sadness and idling resources. Introduce a new type of epilog script called "housekeeping" that is ostensibly job independent. Instead of freeing resources directly to the scheduler, jobs free resources to housekeeping, post their free event, and may reach INACTIVE state. Meanwhile, housekeeping can run a script on the allocated resources and return the resources to the scheduler when complete. The resources are still allocated to the job as far as the scheduler is concerned while housekeeping runs. However since the job has transitioned to INACTIVE, the flux-accounting plugin will decrement the running job count for the user and stop billing the user for the resources. 'flux resource list' utility shows the resources as allocated. By default, resources are released all at once to the scheduler, as before. However, if configured, resources can be freed to the scheduler immediately as they complete housekeeping on each execution target, or a timer can be started on completion of the first target, and when the timer expires, all the targets that have completed thus far are freed in one go. Following that, resources are freed to the scheduler immediately as they complete. This works with sched-simple without changes, with the exception that the hello protocol does not currently support partial release so, as noted in the code, housekeeping and a new job could overlap when the scheduler is reloaded on a live system. Some RFC 27 work is needed to resolve ths. The Fluxion scheduler does not currently support partial release (flux-framework/flux-sched#1151). But as discussed over there, the combination of receiving an R fragment and a jobid in the free request should be sufficient to get that working. --- src/modules/Makefile.am | 1 + src/modules/job-manager/Makefile.am | 3 + src/modules/job-manager/alloc.c | 37 +- src/modules/job-manager/alloc.h | 6 +- src/modules/job-manager/event.c | 7 +- src/modules/job-manager/housekeeping.c | 576 +++++++++++++++++++++++++ src/modules/job-manager/housekeeping.h | 34 ++ src/modules/job-manager/job-manager.c | 6 + src/modules/job-manager/job-manager.h | 1 + 9 files changed, 644 insertions(+), 27 deletions(-) create mode 100644 src/modules/job-manager/housekeeping.c create mode 100644 src/modules/job-manager/housekeeping.h diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 3dd5a3628ceb..28a305803ba1 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -220,6 +220,7 @@ job_manager_la_SOURCES = job_manager_la_LIBADD = \ $(builddir)/job-manager/libjob-manager.la \ $(top_builddir)/src/common/libjob/libjob.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-optparse.la \ diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 991c7260c17d..b9e4d7a38c5d 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -44,6 +44,8 @@ libjob_manager_la_SOURCES = \ kill.c \ alloc.h \ alloc.c \ + housekeeping.h \ + housekeeping.c \ start.h \ start.c \ list.h \ @@ -125,6 +127,7 @@ TESTS = \ test_ldadd = \ libjob-manager.la \ $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libflux-core.la \ diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index d936299115f1..94bc1fd686e6 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -37,6 +37,7 @@ #include "annotate.h" #include "raise.h" #include "queue.h" +#include "housekeeping.h" struct res_acct { struct rlist *allocated; @@ -209,10 +210,9 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum) } } -/* Send sched.free request for job. - * Update flags. +/* Send sched.free request. */ -int free_request (struct alloc *alloc, struct job *job) +int free_request (struct alloc *alloc, json_t *R, flux_jobid_t id) { flux_msg_t *msg; @@ -220,8 +220,8 @@ int free_request (struct alloc *alloc, struct job *job) return -1; if (flux_msg_pack (msg, "{s:I s:O}", - "id", job->id, - "R", job->R_redacted) < 0) + "id", id, + "R", R) < 0) goto error; if (flux_send (alloc->ctx->h, msg, 0) < 0) goto error; @@ -490,6 +490,8 @@ static void hello_cb (flux_t *h, } job = zhashx_next (ctx->active_jobs); } + if (housekeeping_hello_respond (ctx->housekeeping, msg) < 0) + goto error; if (flux_respond_error (h, msg, ENODATA, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); return; @@ -653,32 +655,23 @@ static void check_cb (flux_reactor_t *r, NULL); } -/* called from event_job_action() FLUX_JOB_STATE_CLEANUP */ -int alloc_send_free_request (struct alloc *alloc, struct job *job) +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id) { flux_error_t error; - assert (job->state == FLUX_JOB_STATE_CLEANUP); if (alloc->ready) { - if (free_request (alloc, job) < 0) + if (free_request (alloc, R, id) < 0) return -1; - if ((job->flags & FLUX_JOB_DEBUG)) - (void)event_job_post_pack (alloc->ctx->event, - job, - "debug.free-request", - 0, - NULL); - } - /* event_job_action() posts the "free" event after calling this function, - * so despite a no-op above if the scheduler isn't loaded, we account for - * the resources anyway. Since "free" clears job->has_resources, the job - * will not be presented in hello responses to the scheduler at reload. + } + /* Account for resource release even if the scheduler was offline above. + * When it comes back online, these resources will not be presented as + * allocated via the hello protocol, so they are effectively free. */ - if (acct_free (alloc, job->R_redacted, &error) < 0) { + if (acct_free (alloc, R, &error) < 0) { flux_log (alloc->ctx->h, LOG_ERR, "%s: %s", - idf58 (job->id), + idf58 (id), error.text); } return 0; diff --git a/src/modules/job-manager/alloc.h b/src/modules/job-manager/alloc.h index 03d8da714096..d77f5b25ce3f 100644 --- a/src/modules/job-manager/alloc.h +++ b/src/modules/job-manager/alloc.h @@ -11,6 +11,7 @@ #ifndef _FLUX_JOB_MANAGER_ALLOC_H #define _FLUX_JOB_MANAGER_ALLOC_H +#include #include #include "job.h" @@ -42,10 +43,9 @@ int alloc_queue_count (struct alloc *alloc); */ int alloc_pending_count (struct alloc *alloc); -/* Call from CLEANUP state to release resources. - * This function is a no-op if job->free_pending is set. +/* Release resources back to the scheduler. */ -int alloc_send_free_request (struct alloc *alloc, struct job *job); +int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id); /* List pending jobs */ diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index 313ba6ad6adb..4758d3b4fcf0 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -48,6 +48,7 @@ #include "ccan/str/str.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "drain.h" #include "journal.h" @@ -320,7 +321,7 @@ int event_job_action (struct event *event, struct job *job) /* N.B. start_pending indicates that the start request is still * expecting responses. The final response is the 'release' * response with final=true. Thus once the flag is clear, - * it is safe to release all resources to the scheduler. + * it is safe for the job to release its resources to housekeeping. */ if (job->has_resources && !job_event_is_queued (job, "epilog-start") @@ -328,7 +329,9 @@ int event_job_action (struct event *event, struct job *job) && !job->alloc_bypass && !job->start_pending && !job->free_posted) { - if (alloc_send_free_request (ctx->alloc, job) < 0) + if (housekeeping_start (ctx->housekeeping, + job->R_redacted, + job->id) < 0) return -1; if (event_job_post_pack (ctx->event, job, "free", 0, NULL) < 0) return -1; diff --git a/src/modules/job-manager/housekeeping.c b/src/modules/job-manager/housekeeping.c new file mode 100644 index 000000000000..6a565ec2dba8 --- /dev/null +++ b/src/modules/job-manager/housekeeping.c @@ -0,0 +1,576 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* housekeeping - clean resources prior to release to the scheduler + * + * Purpose: + * Resources are released by jobs to housekeeping. Housekeeping runs + * an epilog-like script, then releases resources to the scheduler. + * Unlike the epilog, housekeeping is intended to be divorced from the + * job, used for admin tasks like configuration management updates. + * The job does not remain in CLEANUP state while housekeeping runs, + * although the scheduler still thinks resources are allocated to the job. + * + * Configuration: + * [job-manager.housekeeping] + * command = "command arg1 arg2 ..." + * release-after = "FSD" + * + * Partial release: + * The 'release-after' config key enables partial release of resources. + * - If unset, resources for a given job are not released until all exec + * targets have completed housekeeping. + * - If set to "0", resources are released as each exec target completes. + * - If set to a nozero duration, a timer starts when the first exec target + * for a given job completes. When the timer expires, resources for all + * the completed exec targets are released. Following that, resources + * are released as each target completes. + * + * Script credentials: + * The housekeeping script runs as the instance owner (e.g. "flux"). + * + * Script environment: + * The environment is derived from the rank 0 broker's environment. + * Job-related environment variables are unset. + * FLUX_URI points to the local broker. + * + * script error handling: + * The script wait status is logged at LOG_ERR if it did not exit 0. + * Other script errors must be managed by the script itself: + * - Standard I/O is discarded. Use flux-logger(1) if needed. + * - The script can run forever. Use timeout(1) or equivalent as needed. + * - No drain on failure. Use flux-resource(1) to drain nodes if needed. + * + * Core scheduled instances: + * Note that housekeeping runs after every job even if the job did not + * allocate the whole node. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#ifdef HAVE_ARGZ_ADD +#include +#else +#include "src/common/libmissing/argz.h" +#endif + +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/librlist/rlist.h" +#include "src/common/libhostlist/hostlist.h" +#include "src/common/libutil/fsd.h" +#include "src/common/libutil/errprintf.h" +#include "src/common/libjob/idf58.h" +#include "src/common/libsubprocess/client.h" +#include "src/common/libsubprocess/command.h" +#include "ccan/str/str.h" + +#include "job.h" +#include "alloc.h" +#include "job-manager.h" +#include "conf.h" + +#include "housekeeping.h" + +// -1 = never, 0 = immediate, >0 = time in seconds +static const double default_release_after = -1; + +struct allocation { + flux_jobid_t id; + struct rlist *rl; // R, diminished each time a subset is released + struct idset *pending; // ranks in need of housekeeping + struct housekeeping *hk; + flux_watcher_t *timer; + bool timer_armed; + bool timer_expired; + int free_count; // number of releases + double t_start; +}; + +struct exec_target { + flux_future_t *f; +}; + +struct housekeeping { + struct job_manager *ctx; + flux_cmd_t *cmd; // NULL if not configured + double release_after; + zlistx_t *allocations; + uint32_t size; + struct exec_target *targets; // array of size entries (indexed by rank) +}; + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg); + +static const char *env_blocklist[] = { + "FLUX_JOB_ID", + "FLUX_JOB_SIZE", + "FLUX_JOB_NNODES", + "FLUX_JOB_TMPDIR", + "FLUX_TASK_RANK", + "FLUX_TASK_LOCAL_ID", + "FLUX_URI", + "FLUX_KVS_NAMESPACE", + "FLUX_PROXY_REMOTE", + NULL, +}; + +static void allocation_destroy (struct allocation *a) +{ + if (a) { + int saved_errno = errno; + rlist_destroy (a->rl); + idset_destroy (a->pending); + flux_watcher_destroy (a->timer); + free (a); + errno = saved_errno; + } +} + +// zlistx_destructor_fn footprint +static void allocation_destructor (void **item) +{ + if (item) { + allocation_destroy (*item); + *item = NULL; + } + +} + +static struct allocation *allocation_create (struct housekeeping *hk, + json_t *R, + flux_jobid_t id) +{ + struct allocation *a; + flux_reactor_t *r = flux_get_reactor (hk->ctx->h); + + if (!(a = calloc (1, sizeof (*a)))) + return NULL; + a->hk = hk; + a->id = id; + a->t_start = flux_reactor_now (flux_get_reactor (hk->ctx->h)); + if (!(a->rl = rlist_from_json (R, NULL)) + || !(a->pending = rlist_ranks (a->rl)) + || !(a->timer = flux_timer_watcher_create (r, + 0, + 0., + allocation_timeout, + a))) { + allocation_destroy (a); + return NULL; + } + return a; +} + +static struct idset *get_housekept_ranks (struct allocation *a) +{ + struct idset *ranks; + unsigned int id; + + if (!(ranks = rlist_ranks (a->rl))) + goto error; + id = idset_first (ranks); + while (id != IDSET_INVALID_ID) { + if (idset_test (a->pending, id)) + if (idset_clear (ranks, id) < 0) + goto error; + id = idset_next (ranks, id); + } + return ranks; +error: + idset_destroy (ranks); + return NULL; +} + +/* Release any resources in a->rl associated with ranks that are no longer + * pending for housekeeping. Then remove them from a->rl. + */ +static void allocation_release (struct allocation *a) +{ + struct job_manager *ctx = a->hk->ctx; + struct idset *ranks = NULL; + struct rlist *rl = NULL; + json_t *R = NULL; + + if ((ranks = get_housekept_ranks (a)) && idset_count (ranks) == 0) { + idset_destroy (ranks); + return; // nothing to do + } + + if (!ranks + || !(rl = rlist_copy_ranks (a->rl, ranks)) + || !(R = rlist_to_R (rl)) + || alloc_send_free_request (ctx->alloc, R, a->id) < 0 + || rlist_remove_ranks (a->rl, ranks) < 0) { + char *s = idset_encode (ranks, IDSET_FLAG_RANGE); + flux_log (ctx->h, + LOG_ERR, + "housekeeping error releasing resources for job %s ranks %s", + idf58 (a->id), + s ? s : "NULL"); + free (s); + } + else + a->free_count++; + json_decref (R); + rlist_destroy (rl); + idset_destroy (ranks); +} + +static void allocation_remove (struct allocation *a, void *cursor) +{ + flux_log (a->hk->ctx->h, + LOG_DEBUG, + "housekeeping: all resources of %s have been released", + idf58 (a->id)); + if (!cursor) + cursor = zlistx_find (a->hk->allocations, a); + if (!cursor) { + flux_log (a->hk->ctx->h, + LOG_ERR, + "housekeeping: internal error removing allocation for %s", + idf58 (a->id)); + return; + } + zlistx_delete (a->hk->allocations, cursor); +} + +static void allocation_timeout (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + struct allocation *a = arg; + + a->timer_expired = true; + + // release the ranks that have completed housekeeping so far + allocation_release (a); + + // the allocation has been completely released so retire it + if (rlist_nnodes (a->rl) == 0) + allocation_remove (a, NULL); +} + +/* 'rank' has completed housekeeping. + */ +static void housekeeping_finish_one (struct housekeeping *hk, int rank) +{ + struct allocation *a; + + a = zlistx_first (hk->allocations); + while (a) { + if (idset_test (a->pending, rank)) { + idset_clear (a->pending, rank); + + if (idset_count (a->pending) == 0 + || hk->release_after == 0 + || a->timer_expired) { + allocation_release (a); + } + if (!a->timer_armed && hk->release_after > 0) { + flux_timer_watcher_reset (a->timer, hk->release_after, 0.); + flux_watcher_start (a->timer); + a->timer_armed = true; + } + + // allocation has been completely released + if (rlist_nnodes (a->rl) == 0) + allocation_remove (a, zlistx_cursor (hk->allocations)); + } + a = zlistx_next (hk->allocations); + } +} + +static void housekeeping_continuation (flux_future_t *f, void *arg) +{ + struct housekeeping *hk = arg; + flux_t *h = flux_future_get_flux (f); + int rank = flux_rpc_get_nodeid (f); + const char *hostname = flux_get_hostbyrank (h, rank); + int status; + + if (subprocess_rexec_get (f) < 0) { + if (errno != ENODATA) { + flux_log (h, + LOG_ERR, + "housekeeping %s (rank %d): %s", + hostname, + rank, + future_strerror (f, errno)); + } + flux_future_destroy (f); + hk->targets[rank].f = NULL; + housekeeping_finish_one (hk, rank); + return; + } + if (subprocess_rexec_is_finished (f, &status)) { + if (WIFEXITED (status)) { + int n = WEXITSTATUS (status); + flux_log (h, + n == 0 ? LOG_INFO : LOG_ERR, + "housekeeping %s (rank %d): exit %d", + hostname, + rank, + n); + } + else if (WIFSIGNALED (status)) { + int n = WTERMSIG (status); + flux_log (h, + LOG_ERR, + "housekeeping %s (rank %d): %s", + hostname, + rank, + strsignal (n)); + } + } + flux_future_reset (f); +} + +static int housekeeping_start_one (struct housekeeping *hk, int rank) +{ + flux_future_t *f; + int flags = 0; + + if (rank >= hk->size) + return -1; + if (hk->targets[rank].f != NULL) // in progress already + return 0; + if (!(f = subprocess_rexec (hk->ctx->h, + "rexec", + rank, + hk->cmd, + flags)) + || flux_future_then (f, -1., housekeeping_continuation, hk) < 0) { + flux_future_destroy (f); + return -1; + } + hk->targets[rank].f = f; + return 0; +} + +int housekeeping_start (struct housekeeping *hk, + json_t *R, + flux_jobid_t id) +{ + flux_t *h = hk->ctx->h; + struct allocation *a; + unsigned int rank; + void *list_handle; + + /* Housekeeping is not configured + */ + if (!hk->cmd) + goto skip; + + /* Create the 'allocation' and put it in our list. + */ + if (!(a = allocation_create (hk, R, id)) + || !(list_handle = zlistx_insert (hk->allocations, a, false))) { + flux_log (h, + LOG_ERR, + "housekeeping: %s error saving alloc object (skipping)", + idf58 (id)); + allocation_destroy (a); + goto skip; + } + + /* Iterate over the ranks in the allocation and start housekeeping on + * each rank, unless already running. Continuations for the remote + * execution will find allocations remove rank from a->pending and return + * resources to the scheduler. + */ + rank = idset_first (a->pending); + while (rank != IDSET_INVALID_ID) { + if (housekeeping_start_one (hk, rank) < 0) { + flux_log_error (h, "error starting housekeeping on rank %d", rank); + idset_clear (a->pending, rank); + } + rank = idset_next (a->pending, rank); + } + if (idset_count (a->pending) == 0) { + zlistx_delete (hk->allocations, list_handle); + goto skip; + } + return 0; +skip: + return alloc_send_free_request (hk->ctx->alloc, R, id); +} + +/* We need a revision to RFC 27 to support partial allocations in the + * hello response payload. For now, just destroy any allocation record + * that has been partially released and let the scheduler assume any resources + * currently running housekeeping are "free". Same deal if the job has + * been purged or if we drop the response message. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg) +{ + struct allocation *a; + struct job *job; + + a = zlistx_first (hk->allocations); + while (a) { + if (a->free_count > 0 + || (!(job = zhashx_lookup (hk->ctx->inactive_jobs, &a->id)) + && !(job = zhashx_lookup (hk->ctx->active_jobs, &a->id))) + || flux_respond_pack (hk->ctx->h, + msg, + "{s:I s:I s:I s:f}", + "id", job->id, + "priority", job->priority, + "userid", (json_int_t) job->userid, + "t_submit", job->t_submit) < 0) { + struct hostlist *hl = rlist_nodelist (a->rl); + char *hosts = hostlist_encode (hl); + json_t *R = rlist_to_R (a->rl); + + flux_log (hk->ctx->h, + LOG_ERR, + "housekeeping: WARNING still running on %s of %s" + " at scheduler restart. Jobs may be allowed to run" + " there before housekeeping is complete.", + hosts ? hosts : "some nodes", + idf58 (a->id)); + + // avoid double booking error from alloc accounting. + alloc_acct_fudge_free (hk->ctx->alloc, R, "housekeeping"); + + // delete the allocation to avoid sending frees later + zlistx_delete (hk->allocations, zlistx_cursor (hk->allocations)); + + json_decref (R); + free (hosts); + hostlist_destroy (hl); + } + a = zlistx_next (hk->allocations); + } + return 0; +} + +static flux_cmd_t *create_cmd (const char *cmdline, + const char **blocklist) +{ + char *argz = NULL; + size_t argz_len = 0; + int argc; + char **argv = NULL; + flux_cmd_t *cmd = NULL; + + if (argz_create_sep (cmdline, ' ', &argz, &argz_len) < 0 + || (argc = argz_count (argz, argz_len)) == 0 + || !(argv = calloc (argc + 1, sizeof (argv[0])))) + goto done; + argz_extract (argz, argz_len, argv); + if (!(cmd = flux_cmd_create (argc, argv, environ))) + goto done; + if (blocklist) { + for (int i = 0; blocklist[i] != NULL; i++) + flux_cmd_unsetenv (cmd, blocklist[i]); + } + +done: + free (argz); + free (argv); + return cmd; +} + +static int housekeeping_parse_config (const flux_conf_t *conf, + flux_error_t *error, + void *arg) +{ + struct housekeeping *hk = arg; + flux_error_t e; + const char *cmdline = NULL; + const char *release_after = NULL; + flux_cmd_t *cmd = NULL; + + if (flux_conf_unpack (conf, + &e, + "{s?{s?{s?s s?s !}}}", + "job-manager", + "housekeeping", + "command", &cmdline, + "release-after", &release_after) < 0) { + return errprintf (error, + "job-manager.housekeeping.command: %s", + e.text); + } + if (release_after) { + if (fsd_parse_duration (release_after, &hk->release_after) < 0) + return errprintf (error, + "job-manager.housekeeping.release-after" + " FSD parse error"); + } + if (cmdline && !(cmd = create_cmd (cmdline, env_blocklist))) + return errprintf (error, "error creating housekeeping command object"); + flux_cmd_destroy (hk->cmd); + hk->cmd = cmd; + flux_log (hk->ctx->h, + LOG_DEBUG, + "housekeeping is %sconfigured", + hk->cmd ? "" : "not "); + return 1; // allow dynamic changes +} + +void housekeeping_ctx_destroy (struct housekeeping *hk) +{ + if (hk) { + int saved_errno = errno; + conf_unregister_callback (hk->ctx->conf, housekeeping_parse_config); + flux_cmd_destroy (hk->cmd); + zlistx_destroy (&hk->allocations); + if (hk->targets) { + for (int i = 0; i < hk->size; i++) + flux_future_destroy (hk->targets[i].f); + free (hk->targets); + } + free (hk); + errno = saved_errno; + } +} + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx) +{ + struct housekeeping *hk; + flux_error_t error; + + if (!(hk = calloc (1, sizeof (*hk)))) + return NULL; + hk->ctx = ctx; + hk->release_after = default_release_after; + if (flux_get_size (ctx->h, &hk->size) < 0) + goto error; + if (!(hk->targets = calloc (hk->size, sizeof (hk->targets[0])))) + goto error; + if (!(hk->allocations = zlistx_new ())) { + errno = ENOMEM; + goto error; + } + zlistx_set_destructor (hk->allocations, allocation_destructor); + if (conf_register_callback (ctx->conf, + &error, + housekeeping_parse_config, + hk) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + goto error; + } + return hk; +error: + housekeeping_ctx_destroy (hk); + return NULL; +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/housekeeping.h b/src/modules/job-manager/housekeeping.h new file mode 100644 index 000000000000..c7e0dda6ecfa --- /dev/null +++ b/src/modules/job-manager/housekeeping.h @@ -0,0 +1,34 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_HOUSEKEEPING_H +#define _FLUX_JOB_MANAGER_HOUSEKEEPING_H + +#include +#include "job-manager.h" + +struct housekeeping *housekeeping_ctx_create (struct job_manager *ctx); +void housekeeping_ctx_destroy (struct housekeeping *hk); + +/* Call this to transfer a job's R to the housekeeping subsystem. The job + * may treat R as freed, but R will remain allocated from the scheduler's + * perspective until the housekeeping script is run on each execution target. + */ +int housekeeping_start (struct housekeeping *hk, json_t *R, flux_jobid_t id); + +/* Call this to add responses to the scheduler's hello request at startup. + * It should inform the scheduler about resources that are still allocated, + * but no longer directly held by jobs. + */ +int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg); + +#endif /* ! _FLUX_JOB_MANAGER_HOUSEKEEPING_H */ + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index f3ff97ec49a1..21dc9ae9a92e 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -27,6 +27,7 @@ #include "list.h" #include "urgency.h" #include "alloc.h" +#include "housekeeping.h" #include "start.h" #include "event.h" #include "drain.h" @@ -198,6 +199,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating scheduler interface"); goto done; } + if (!(ctx.housekeeping = housekeeping_ctx_create (&ctx))) { + flux_log_error (h, "error creating resource housekeeping interface"); + goto done; + } if (!(ctx.start = start_ctx_create (&ctx))) { flux_log_error (h, "error creating exec interface"); goto done; @@ -256,6 +261,7 @@ int mod_main (flux_t *h, int argc, char **argv) wait_ctx_destroy (ctx.wait); drain_ctx_destroy (ctx.drain); start_ctx_destroy (ctx.start); + housekeeping_ctx_destroy (ctx.housekeeping); alloc_ctx_destroy (ctx.alloc); submit_ctx_destroy (ctx.submit); event_ctx_destroy (ctx.event); diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index f765b81d145e..7d0f19d58f1b 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -24,6 +24,7 @@ struct job_manager { struct conf *conf; struct start *start; struct alloc *alloc; + struct housekeeping *housekeeping; struct event *event; struct submit *submit; struct drain *drain;