From 0ce30db48fc20b450fe4e2695ca049cf260bff29 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 26 Oct 2021 16:21:13 -0700 Subject: [PATCH 01/13] libkvs: fix comment typo --- src/common/libkvs/kvs_getroot.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/libkvs/kvs_getroot.h b/src/common/libkvs/kvs_getroot.h index 5cdfe995e88c..94c370ba40db 100644 --- a/src/common/libkvs/kvs_getroot.h +++ b/src/common/libkvs/kvs_getroot.h @@ -21,7 +21,7 @@ flux_future_t *flux_kvs_getroot (flux_t *h, const char *ns, int flags); /* Decode KVS root hash response. * * treeobj - get the hash as an RFC 11 "dirref" object. - * blobref - get the raw hash as a n RFC 10 "blobref". + * blobref - get the raw hash as an RFC 10 "blobref". * sequence - get the commit sequence number * owner - get the userid of the namespace owner */ From bc3848af49d06506a0203ade117862615a1e404f Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 18 Nov 2021 14:19:52 -0800 Subject: [PATCH 02/13] job-exec: fix comment typo --- src/modules/job-exec/job-exec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index 5e95cd3aeaec..9d3e53629b82 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -26,7 +26,7 @@ * * JOB INIT: * - * On reciept of a start request, the exec service enters initialization + * On receipt of a start request, the exec service enters initialization * phase of the job, where the jobspec and R are fetched from the KVS, * and the guest namespace is created and linked from the primary * namespace. 
A guest.exec.eventlog is created with an initial "init" From 25208d9fe1e86921a16a0de592501cdcddc46dde Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 27 Oct 2021 16:44:45 -0700 Subject: [PATCH 03/13] job-exec: remove unused function param --- src/modules/job-exec/job-exec.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index 9d3e53629b82..2b6149511ce8 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -290,7 +290,7 @@ static int jobinfo_send_release (struct jobinfo *job, } static int jobinfo_respond (flux_t *h, struct jobinfo *job, - const char *event, int status) + const char *event) { return flux_respond_pack (h, job->req, "{s:I s:s s:{}}", "id", job->id, @@ -327,7 +327,7 @@ void jobinfo_started (struct jobinfo *job, const char *fmt, ...) va_start (ap, fmt); job->running = 1; va_end (ap); - if (jobinfo_respond (h, job, "start", 0) < 0) + if (jobinfo_respond (h, job, "start") < 0) flux_log_error (h, "jobinfo_started: flux_respond"); } } From 322868f195e5d09af7f189416543a789b270af2d Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 27 Oct 2021 16:47:20 -0700 Subject: [PATCH 04/13] job-exec: remove unused var args Problem: The jobinfo_started() function took a fmt parameter and variable args, but never used them. Solution: Remove the variable arguments options to jobinfo_started(). Adjust callers accordingly. 
--- src/modules/job-exec/exec.c | 2 +- src/modules/job-exec/job-exec.c | 5 +---- src/modules/job-exec/job-exec.h | 4 ++-- src/modules/job-exec/testexec.c | 7 ++----- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/modules/job-exec/exec.c b/src/modules/job-exec/exec.c index 3bb837a9e749..eec88ad3e24f 100644 --- a/src/modules/job-exec/exec.c +++ b/src/modules/job-exec/exec.c @@ -111,7 +111,7 @@ static const char *job_get_cwd (struct jobinfo *job) static void start_cb (struct bulk_exec *exec, void *arg) { struct jobinfo *job = arg; - jobinfo_started (job, NULL); + jobinfo_started (job); /* This is going to be really slow. However, it should at least * work for now. We wait for all imp's to start, then send input */ diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index 2b6149511ce8..fe05677a1661 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -319,14 +319,11 @@ static void jobinfo_complete (struct jobinfo *job, const struct idset *ranks) } } -void jobinfo_started (struct jobinfo *job, const char *fmt, ...) 
+void jobinfo_started (struct jobinfo *job) { flux_t *h = job->ctx->h; if (h && job->req) { - va_list ap; - va_start (ap, fmt); job->running = 1; - va_end (ap); if (jobinfo_respond (h, job, "start") < 0) flux_log_error (h, "jobinfo_started: flux_respond"); } diff --git a/src/modules/job-exec/job-exec.h b/src/modules/job-exec/job-exec.h index 2cbed57e9b66..0c3f4f462e43 100644 --- a/src/modules/job-exec/job-exec.h +++ b/src/modules/job-exec/job-exec.h @@ -102,8 +102,8 @@ int jobinfo_emit_event_pack_nowait (struct jobinfo *job, const char *name, const char *fmt, ...); -/* Emit start event with optional note in jansson pack format */ -void jobinfo_started (struct jobinfo *job, const char *fmt, ...); +/* Emit start event */ +void jobinfo_started (struct jobinfo *job); /* Notify job-exec that ranks in idset `ranks` have completed * with the given wait status diff --git a/src/modules/job-exec/testexec.c b/src/modules/job-exec/testexec.c index c8f6e7e71f2d..ee78fa0cc57f 100644 --- a/src/modules/job-exec/testexec.c +++ b/src/modules/job-exec/testexec.c @@ -171,7 +171,6 @@ static int start_timer (flux_t *h, struct testexec *te) if (t < 0.) t = 1.e-5; if (t >= 0.) { - char timebuf[256]; if (t > 0.) { te->timer = flux_timer_watcher_create (r, t, 0., timer_cb, te); if (!te->timer) { @@ -179,10 +178,8 @@ static int start_timer (flux_t *h, struct testexec *te) return -1; } flux_watcher_start (te->timer); - snprintf (timebuf, sizeof (timebuf), "%.6fs", t); - } else - strncpy (timebuf, "inf", sizeof (timebuf)); - jobinfo_started (te->job, "{ s:s }", "timer", timebuf); + } + jobinfo_started (te->job); } else return -1; From d653339cb6b31eac6193cdbfce97bcbf701d5840 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 8 Nov 2021 10:48:48 -0800 Subject: [PATCH 05/13] job-exec: checkpoint running namespaces on unload Problem: If the job-exec module is unloaded with running jobs, we have no way to recreate the KVS namespaces for those jobs if we wish to re-attach to them later. 
Solution: Checkpoint the KVS root references for any KVS namespaces of running jobs. --- src/modules/job-exec/Makefile.am | 2 + src/modules/job-exec/checkpoint.c | 186 ++++++++++++++++++++++++++++++ src/modules/job-exec/checkpoint.h | 22 ++++ src/modules/job-exec/job-exec.c | 7 +- 4 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 src/modules/job-exec/checkpoint.c create mode 100644 src/modules/job-exec/checkpoint.h diff --git a/src/modules/job-exec/Makefile.am b/src/modules/job-exec/Makefile.am index 791262832a6c..618dcf47daf8 100644 --- a/src/modules/job-exec/Makefile.am +++ b/src/modules/job-exec/Makefile.am @@ -27,6 +27,8 @@ libbulk_exec_la_SOURCES = \ job_exec_la_SOURCES = \ job-exec.h \ job-exec.c \ + checkpoint.h \ + checkpoint.c \ rset.c \ rset.h \ testexec.c \ diff --git a/src/modules/job-exec/checkpoint.c b/src/modules/job-exec/checkpoint.c new file mode 100644 index 000000000000..94c716380afe --- /dev/null +++ b/src/modules/job-exec/checkpoint.c @@ -0,0 +1,186 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* Prototype checkpoint of running jobs KVS root refs + * + * DESCRIPTION + * + * Handle checkpoint of running job's guest KVS namespaces into the + * primary KVS. This will allow the namespaces to be recreated if + * a job manager is brought down than back up. + * + * OPERATION + * + * Get the KVS rootrefs for all running jobs and commit to + * "job-exec.kvs-namespaces". 
+ * + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include + +#include "src/common/libczmqcontainers/czmq_containers.h" + +#include "job-exec.h" +#include "checkpoint.h" + +static int lookup_nsroots (flux_t *h, zhashx_t *jobs, flux_future_t **fp) +{ + struct jobinfo *job = zhashx_first (jobs); + flux_future_t *fall = NULL; + flux_future_t *f = NULL; + + while (job) { + if (job->running) { + if (!fall) { + if (!(fall = flux_future_wait_all_create ())) + goto cleanup; + flux_future_set_flux (fall, h); + } + if (!(f = flux_kvs_getroot (h, job->ns, 0))) + goto cleanup; + if (flux_future_aux_set (f, "jobinfo", job, NULL) < 0) + goto cleanup; + if (flux_future_push (fall, job->ns, f) < 0) + goto cleanup; + f = NULL; + } + job = zhashx_next (jobs); + } + + (*fp) = fall; + return 0; + +cleanup: + flux_future_destroy (f); + flux_future_destroy (fall); + return -1; +} + +static json_t *get_nsroots (flux_t *h, flux_future_t *fall) +{ + const char *child; + json_t *nsdata = NULL; + int saved_errno; + + if (!(nsdata = json_array ())) { + errno = ENOMEM; + return NULL; + } + + child = flux_future_first_child (fall); + while (child) { + flux_future_t *f = flux_future_get_child (fall, child); + struct jobinfo *job; + const char *blobref = NULL; + json_t *o = NULL; + if (!f) + goto cleanup; + if (!(job = flux_future_aux_get (f, "jobinfo"))) + goto cleanup; + if (flux_kvs_getroot_get_blobref (f, &blobref) < 0) + goto cleanup; + if (!(o = json_pack ("{s:I s:i s:s}", + "id", job->id, + "owner", job->userid, + "kvsroot", blobref))) { + errno = ENOMEM; + goto cleanup; + } + if (json_array_append (nsdata, o) < 0) { + json_decref (o); + errno = ENOMEM; + goto cleanup; + } + json_decref (o); + child = flux_future_next_child (fall); + } + + return nsdata; +cleanup: + saved_errno = errno; + json_decref (nsdata); + errno = saved_errno; + return NULL; +} + +static int checkpoint_commit (flux_t *h, json_t *nsdata) +{ + flux_future_t *f = NULL; + flux_kvs_txn_t *txn 
= NULL; + char *s = NULL; + int rv = -1; + + if (!(s = json_dumps (nsdata, JSON_COMPACT))) { + errno = ENOMEM; + goto cleanup; + } + + if (!(txn = flux_kvs_txn_create ())) + goto cleanup; + + if (flux_kvs_txn_put (txn, + 0, + "job-exec.kvs-namespaces", + s) < 0) + goto cleanup; + + if (!(f = flux_kvs_commit (h, NULL, 0, txn))) + goto cleanup; + + if (flux_future_get (f, NULL) < 0) + goto cleanup; + + rv = 0; +cleanup: + flux_future_destroy (f); + flux_kvs_txn_destroy (txn); + free (s); + return rv; +} + +void checkpoint_running (flux_t *h, zhashx_t *jobs) +{ + flux_future_t *lookupf = NULL; + json_t *nsdata = NULL; + + if (!h || !jobs) + return; + + if (lookup_nsroots (h, jobs, &lookupf) < 0) { + flux_log_error (h, "failed to lookup ns root refs"); + goto cleanup; + } + + if (!lookupf) + return; + + if (!(nsdata = get_nsroots (h, lookupf))) { + flux_log_error (h, "failure getting ns root refs"); + goto cleanup; + } + + if (checkpoint_commit (h, nsdata) < 0) { + flux_log_error (h, "failure committing ns checkpoint data"); + goto cleanup; + } + +cleanup: + json_decref (nsdata); + flux_future_destroy (lookupf); +} + +/* + * vi: tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/job-exec/checkpoint.h b/src/modules/job-exec/checkpoint.h new file mode 100644 index 000000000000..0109dd9ae777 --- /dev/null +++ b/src/modules/job-exec/checkpoint.h @@ -0,0 +1,22 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef HAVE_JOB_EXEC_CHECKPOINT_H +#define HAVE_JOB_EXEC_CHECKPOINT_H 1 + +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "job-exec.h" + +void checkpoint_running (flux_t *h, zhashx_t *jobs); + +#endif /* !HAVE_JOB_EXEC_CHECKPOINT_EXEC_H */ + +/* vi: ts=4 sw=4 expandtab + */ diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index fe05677a1661..3422e4da4d47 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -99,6 +99,7 @@ #include "src/common/libutil/errno_safe.h" #include "job-exec.h" +#include "checkpoint.h" static double kill_timeout=5.0; @@ -1100,10 +1101,12 @@ static int configure_implementations (flux_t *h, int argc, char **argv) return 0; } -static int unload_implementations (void) +static int unload_implementations (struct job_exec_ctx *ctx) { struct exec_implementation *impl; int i = 0; + if (ctx && ctx->jobs) + checkpoint_running (ctx->h, ctx->jobs); while ((impl = implementations[i]) && impl->name) { if (impl->unload) (*impl->unload) (); @@ -1143,7 +1146,7 @@ int mod_main (flux_t *h, int argc, char **argv) rc = flux_reactor_run (flux_get_reactor (h), 0); out: - unload_implementations (); + unload_implementations (ctx); saved_errno = errno; if (flux_event_unsubscribe (h, "job-exception") < 0) From 5161df07a7c774a34fde73fcfeb6b812cb032cf4 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 19 Nov 2021 10:43:23 -0800 Subject: [PATCH 06/13] job-exec: remove namespaces on unload Problem: After checkpoint of running namespaces, we do not want running jobs to continue to write to that guest namespace. Solution: On job-exec unload, remove the namespaces we just checkpointed. 
--- src/modules/job-exec/job-exec.c | 40 ++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index 3422e4da4d47..a1143b8c6856 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -1101,12 +1101,50 @@ static int configure_implementations (flux_t *h, int argc, char **argv) return 0; } +static int remove_running_ns (struct job_exec_ctx *ctx) +{ + struct jobinfo *job = zhashx_first (ctx->jobs); + flux_future_t *fall = NULL; + flux_future_t *f = NULL; + int rv = -1; + + while (job) { + if (job->running) { + if (!fall) { + if (!(fall = flux_future_wait_all_create ())) + goto cleanup; + flux_future_set_flux (fall, ctx->h); + } + if (!(f = flux_kvs_namespace_remove (ctx->h, job->ns))) + goto cleanup; + if (flux_future_push (fall, job->ns, f) < 0) + goto cleanup; + f = NULL; + } + job = zhashx_next (ctx->jobs); + } + + if (fall) { + if (flux_future_wait_for (fall, -1.) < 0) + goto cleanup; + } + + rv = 0; +cleanup: + flux_future_destroy (f); + flux_future_destroy (fall); + return rv; +} + static int unload_implementations (struct job_exec_ctx *ctx) { struct exec_implementation *impl; int i = 0; - if (ctx && ctx->jobs) + if (ctx && ctx->jobs) { checkpoint_running (ctx->h, ctx->jobs); + if (remove_running_ns (ctx) < 0) + flux_log_error (ctx->h, "failed to remove guest namespaces"); + } while ((impl = implementations[i]) && impl->name) { if (impl->unload) (*impl->unload) (); From 91a032d43153f11e8c360613d99499499013885c Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 27 Oct 2021 10:03:24 -0700 Subject: [PATCH 07/13] job-manager: send reattach flag to job-exec Problem: When the job-manager is re-loaded and discovers a job that's in the RUN state, there is no way to inform the job exec module that the job should still be running. Solution: Add a "reattach" flag to the job-exec.start RPC. 
This flag informs the job-exec module that the job should still be running. --- src/modules/job-manager/job.h | 1 + src/modules/job-manager/restart.c | 4 +++- src/modules/job-manager/start.c | 16 ++++++++++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/modules/job-manager/job.h b/src/modules/job-manager/job.h index d3d4a14332ee..a3d86ee1abfb 100644 --- a/src/modules/job-manager/job.h +++ b/src/modules/job-manager/job.h @@ -40,6 +40,7 @@ struct job { uint8_t free_pending:1; // free request sent to sched uint8_t has_resources:1; uint8_t start_pending:1;// start request sent to job-exec + uint8_t reattach:1; uint8_t perilog_active; // if nonzero, prolog/epilog active diff --git a/src/modules/job-manager/restart.c b/src/modules/job-manager/restart.c index b932a6b3d7d7..8cff91779306 100644 --- a/src/modules/job-manager/restart.c +++ b/src/modules/job-manager/restart.c @@ -266,8 +266,10 @@ int restart_from_kvs (struct job_manager *ctx) __FUNCTION__, (uintmax_t)job->id); } } - else if ((job->state & FLUX_JOB_STATE_RUNNING) != 0) + else if ((job->state & FLUX_JOB_STATE_RUNNING) != 0) { ctx->running_jobs++; + job->reattach = 1; + } job = zhashx_next (ctx->active_jobs); } flux_log (ctx->h, LOG_INFO, "restart: %d running jobs", ctx->running_jobs); diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index bc7a11c8b35b..cee6e555d663 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -200,8 +200,15 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } if (!strcmp (type, "start")) { - if (event_job_post_pack (ctx->event, job, "start", 0, NULL) < 0) - goto error_post; + if (job->reattach) + flux_log (h, + LOG_ERR, + "start response: id=%ju should not get start event", + (uintmax_t)id); + else { + if (event_job_post_pack (ctx->event, job, "start", 0, NULL) < 0) + goto error_post; + } } else if (!strcmp (type, "release")) { const char *idset; @@ -275,10 +282,11 @@ 
int start_send_request (struct start *start, struct job *job) if (!job->start_pending && start->topic != NULL) { if (!(msg = flux_request_encode (start->topic, NULL))) return -1; - if (flux_msg_pack (msg, "{s:I s:i s:O}", + if (flux_msg_pack (msg, "{s:I s:i s:O s:b}", "id", job->id, "userid", job->userid, - "jobspec", job->jobspec_redacted) < 0) + "jobspec", job->jobspec_redacted, + "reattach", job->reattach) < 0) goto error; if (flux_send (ctx->h, msg, 0) < 0) goto error; From 9dde20193abde140a74549d9f2d2f3fc7079c188 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 1 Nov 2021 11:38:54 -0700 Subject: [PATCH 08/13] job-exec/testexec: emulate job reattachment Problem: The testexec exec implementation does not parse and handle the "reattach" flag from the job-manager. Solution: If the testexec implementation sees the "reattach" flag from the job-manager, emulate that job is still running by running the job for the remaining time it should given the job's start time. Emit a "re-starting" event indicating this restart and notify the job-manager via a "reattach" event. 
--- src/modules/job-exec/job-exec.c | 24 +++++-- src/modules/job-exec/job-exec.h | 4 ++ src/modules/job-exec/testexec.c | 111 +++++++++++++++++++++++++++++--- src/modules/job-manager/start.c | 3 + 4 files changed, 128 insertions(+), 14 deletions(-) diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index a1143b8c6856..ec4c1c65b72b 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -330,6 +330,16 @@ void jobinfo_started (struct jobinfo *job) } } +void jobinfo_reattached (struct jobinfo *job) +{ + flux_t *h = job->ctx->h; + if (h && job->req) { + job->running = 1; + if (jobinfo_respond (h, job, "reattached") < 0) + flux_log_error (h, "jobinfo_reattach: flux_respond"); + } +} + static void kill_timer_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { @@ -592,7 +602,10 @@ static int jobinfo_finalize (struct jobinfo *job) static int jobinfo_start_execution (struct jobinfo *job) { - jobinfo_emit_event_pack_nowait (job, "starting", NULL); + if (job->reattach) + jobinfo_emit_event_pack_nowait (job, "re-starting", NULL); + else + jobinfo_emit_event_pack_nowait (job, "starting", NULL); /* Set started flag before calling 'start' method because we want to * be sure to clean up properly if an exception occurs */ @@ -806,7 +819,9 @@ static void namespace_link (flux_future_t *fprev, void *arg) goto error; } flux_future_set_flux (cf, h); - if (!(f = jobinfo_emit_event_pack (job, "init", NULL)) + if (!(f = jobinfo_emit_event_pack (job, + job->reattach ? 
"reattach" : "init", + NULL)) || flux_future_push (cf, "emit event", f) < 0) goto error; @@ -914,10 +929,11 @@ static int job_start (struct job_exec_ctx *ctx, const flux_msg_t *msg) job->ctx = ctx; - if (flux_request_unpack (job->req, NULL, "{s:I s:i s:O}", + if (flux_request_unpack (job->req, NULL, "{s:I s:i s:O s:b}", "id", &job->id, "userid", &job->userid, - "jobspec", &job->jobspec) < 0) { + "jobspec", &job->jobspec, + "reattach", &job->reattach) < 0) { flux_log_error (ctx->h, "start: flux_request_unpack"); jobinfo_decref (job); return -1; diff --git a/src/modules/job-exec/job-exec.h b/src/modules/job-exec/job-exec.h index 0c3f4f462e43..efebde89a441 100644 --- a/src/modules/job-exec/job-exec.h +++ b/src/modules/job-exec/job-exec.h @@ -77,6 +77,7 @@ struct jobinfo { uint8_t running:1; /* all shells are running */ uint8_t finalizing:1; /* in process of cleanup */ + int reattach; /* job-manager reattach attempt */ int wait_status; struct eventlogger * ev; /* event batcher */ @@ -105,6 +106,9 @@ int jobinfo_emit_event_pack_nowait (struct jobinfo *job, /* Emit start event */ void jobinfo_started (struct jobinfo *job); +/* Emit reattached event */ +void jobinfo_reattached (struct jobinfo *job); + /* Notify job-exec that ranks in idset `ranks` have completed * with the given wait status */ diff --git a/src/modules/job-exec/testexec.c b/src/modules/job-exec/testexec.c index ee78fa0cc57f..fa3e6c36d94f 100644 --- a/src/modules/job-exec/testexec.c +++ b/src/modules/job-exec/testexec.c @@ -40,8 +40,12 @@ # include "config.h" #endif +#include +#include + #include "src/common/libutil/fsd.h" #include "src/common/libjob/job_hash.h" +#include "src/common/libeventlog/eventlog.h" #include "src/common/libczmqcontainers/czmq_containers.h" #include "job-exec.h" @@ -160,10 +164,9 @@ static void timer_cb (flux_reactor_t *r, * is sent when the timer fires (simulating the exit of the final * job shell.) 
*/ -static int start_timer (flux_t *h, struct testexec *te) +static int start_timer (flux_t *h, struct testexec *te, double t) { flux_reactor_t *r = flux_get_reactor (h); - double t = te->conf.run_duration; /* For now, if a job duration wasn't found, complete job almost * immediately. @@ -179,7 +182,10 @@ static int start_timer (flux_t *h, struct testexec *te) } flux_watcher_start (te->timer); } - jobinfo_started (te->job); + if (te->job->reattach) + jobinfo_reattached (te->job); + else + jobinfo_started (te->job); } else return -1; @@ -216,17 +222,102 @@ static int testexec_init (struct jobinfo *job) return 1; } +static int testexec_reattach_starttime (struct jobinfo *job, + const char *eventlog, + time_t *startp) +{ + json_t *o = NULL; + size_t index; + json_t *value; + int rv = -1; + + if (!(o = eventlog_decode (eventlog))) { + jobinfo_fatal_error (job, errno, "eventlog_decode"); + goto cleanup; + } + + json_array_foreach (o, index, value) { + double timestamp; + const char *name; + + if (eventlog_entry_parse (value, ×tamp, &name, NULL) < 0) { + jobinfo_fatal_error (job, errno, "eventlog_entry_parse"); + goto cleanup; + } + if (!strcmp (name, "start")) { + (*startp) = (time_t)timestamp; + break; + } + } + + rv = 0; +cleanup: + json_decref (o); + return rv; +} + +static int testexec_reattach (struct testexec *te) +{ + const char *value; + flux_future_t *f = NULL; + time_t start = 0; + struct timespec now; + time_t runtimeleft = -1; + int rv = -1; + char ekey[256]; + + if (flux_job_kvs_key (ekey, sizeof (ekey), te->job->id, "eventlog") < 0) { + jobinfo_fatal_error (te->job, errno, "flux_job_kvs_key"); + goto cleanup; + } + if (!(f = flux_kvs_lookup (te->job->h, + NULL, + 0, + ekey))) { + jobinfo_fatal_error (te->job, errno, "flux_kvs_lookup"); + goto cleanup; + } + if (flux_kvs_lookup_get (f, &value) < 0) { + jobinfo_fatal_error (te->job, errno, "flux_kvs_lookup_get starttimes"); + goto cleanup; + } + if (testexec_reattach_starttime (te->job, value, &start) < 0) + 
goto cleanup; + /* just use seconds, we approximate runtime left */ + clock_gettime (CLOCK_REALTIME, &now); + if ((now.tv_sec - start) <= te->conf.run_duration) + runtimeleft = (start + te->conf.run_duration) - now.tv_sec; + if (start_timer (te->job->h, + te, + runtimeleft) < 0) { + jobinfo_fatal_error (te->job, errno, "unable to restart timer"); + goto cleanup; + } + rv = 0; +cleanup: + flux_future_destroy (f); + return rv; +} + static int testexec_start (struct jobinfo *job) { struct testexec *te = job->data; - if (!te->conf.override && start_timer (job->h, te) < 0) { - jobinfo_fatal_error (job, errno, "unable to start test exec timer"); - return -1; + if (job->reattach) { + if (testexec_reattach (te) < 0) + return -1; } - if (testconf_mock_exception (&te->conf, "run")) { - jobinfo_fatal_error (job, 0, "mock run exception generated"); - return -1; + else { + if (!te->conf.override && start_timer (job->h, + te, + te->conf.run_duration) < 0) { + jobinfo_fatal_error (job, errno, "unable to start test exec timer"); + return -1; + } + if (testconf_mock_exception (&te->conf, "run")) { + jobinfo_fatal_error (job, 0, "mock run exception generated"); + return -1; + } } return 0; } @@ -294,7 +385,7 @@ static void testexec_request_cb (flux_t *h, errno = EINVAL; goto error; } - if (start_timer (h, te) < 0) + if (start_timer (h, te, te->conf.run_duration) < 0) goto error; } else if (strcmp (event, "finish") == 0) { diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index cee6e555d663..f508fb341c82 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -210,6 +210,9 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh, goto error_post; } } + else if (!strcmp (type, "reattached")) { + /* nothing to do yet */ + } else if (!strcmp (type, "release")) { const char *idset; int final; From 960308210d6d78ee45332a4454d6a4c7faab8b3d Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 8 Nov 2021 14:19:59 -0800 
Subject: [PATCH 09/13] job-exec: restart KVS namespace with checkpoint Problem: If re-attaching to an already running job, re-create the KVS namespace based on the previously checkpointed root reference for the job. --- src/modules/job-exec/checkpoint.c | 67 ++++++++++++++++++++++++++++++ src/modules/job-exec/checkpoint.h | 8 ++++ src/modules/job-exec/job-exec.c | 69 +++++++++++++++++++++++++++++-- src/modules/job-exec/job-exec.h | 1 + 4 files changed, 141 insertions(+), 4 deletions(-) diff --git a/src/modules/job-exec/checkpoint.c b/src/modules/job-exec/checkpoint.c index 94c716380afe..055cb64396e8 100644 --- a/src/modules/job-exec/checkpoint.c +++ b/src/modules/job-exec/checkpoint.c @@ -16,6 +16,9 @@ * primary KVS. This will allow the namespaces to be recreated if * a job manager is brought down than back up. * + * Also provide helper functions to get rootrefs from the checkpointed + * object. + * * OPERATION * * Get the KVS rootrefs for all running jobs and commit to @@ -28,6 +31,7 @@ #endif #include #include +#include #include #include "src/common/libczmqcontainers/czmq_containers.h" @@ -35,6 +39,69 @@ #include "job-exec.h" #include "checkpoint.h" +flux_future_t *checkpoint_get_rootrefs (flux_t *h) +{ + if (!h) { + errno = EINVAL; + return NULL; + } + + return flux_kvs_lookup (h, + NULL, + 0, + "job-exec.kvs-namespaces"); +} + +char *checkpoint_find_rootref (flux_future_t *f, + flux_jobid_t id, + uint32_t owner) +{ + int saved_errno; + flux_t *h = flux_future_get_flux (f); + char *rv = NULL; + const char *rootrefs; + json_error_t error; + json_t *o = NULL; + size_t index; + json_t *value; + + if (flux_kvs_lookup_get (f, &rootrefs) < 0) { + flux_log_error (h, "checkpoint_get_rootref: flux_kvs_lookup_get"); + goto cleanup; + } + + if (!(o = json_loads (rootrefs, 0, &error))) { + flux_log (h, LOG_ERR, "json_loads rootrefs: %s", error.text); + goto cleanup; + } + + json_array_foreach (o, index, value) { + flux_jobid_t l_id; + uint32_t l_owner; + const char 
*rootref; + + if (json_unpack (value, + "{s:I s:i s:s}", + "id", &l_id, + "owner", &l_owner, + "kvsroot", &rootref) < 0) { + flux_log (h, LOG_ERR, "json_unpack rootref: %s", error.text); + goto cleanup; + } + if (l_id == id && l_owner == owner) { + if (!(rv = strdup (rootref))) + goto cleanup; + break; + } + } + +cleanup: + saved_errno = errno; + json_decref (o); + errno = saved_errno; + return rv; +} + static int lookup_nsroots (flux_t *h, zhashx_t *jobs, flux_future_t **fp) { struct jobinfo *job = zhashx_first (jobs); diff --git a/src/modules/job-exec/checkpoint.h b/src/modules/job-exec/checkpoint.h index 0109dd9ae777..c4d036f5faac 100644 --- a/src/modules/job-exec/checkpoint.h +++ b/src/modules/job-exec/checkpoint.h @@ -11,9 +11,17 @@ #ifndef HAVE_JOB_EXEC_CHECKPOINT_H #define HAVE_JOB_EXEC_CHECKPOINT_H 1 +#include + #include "src/common/libczmqcontainers/czmq_containers.h" #include "job-exec.h" +flux_future_t *checkpoint_get_rootrefs (flux_t *h); + +char *checkpoint_find_rootref (flux_future_t *f, + flux_jobid_t id, + uint32_t owner); + void checkpoint_running (flux_t *h, zhashx_t *jobs); #endif /* !HAVE_JOB_EXEC_CHECKPOINT_EXEC_H */ diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index ec4c1c65b72b..f426f443a442 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -139,6 +139,7 @@ void jobinfo_decref (struct jobinfo *job) free (job->J); resource_set_destroy (job->R); json_decref (job->jobspec); + free (job->rootref); free (job); errno = saved_errno; } @@ -845,8 +846,16 @@ static flux_future_t *ns_create_and_link (flux_t *h, flux_future_t *f = NULL; flux_future_t *f2 = NULL; - if (!(f = flux_kvs_namespace_create (h, job->ns, job->userid, flags)) - || !(f2 = flux_future_and_then (f, namespace_link, job))) { + if (job->reattach && job->rootref) + f = flux_kvs_namespace_create_with (h, + job->ns, + job->rootref, + job->userid, + flags); + else + f = flux_kvs_namespace_create (h, job->ns, job->userid, 
flags); + + if (!f || !(f2 = flux_future_and_then (f, namespace_link, job))) { flux_log_error (h, "namespace_move: flux_future_and_then"); flux_future_destroy (f); return NULL; @@ -854,6 +863,54 @@ static flux_future_t *ns_create_and_link (flux_t *h, return f2; } +static void get_rootref_cb (flux_future_t *fprev, void *arg) +{ + int saved_errno; + flux_t *h = flux_future_get_flux (fprev); + struct jobinfo *job = arg; + flux_future_t *f = NULL; + + if (!(job->rootref = checkpoint_find_rootref (fprev, + job->id, + job->userid))) + flux_log (job->h, + LOG_DEBUG, + "checkpoint rootref not found: %ju", + (uintmax_t)job->id); + + /* if rootref not found, still create namespace */ + if (!(f = ns_create_and_link (h, job, 0))) + goto error; + + flux_future_continue (fprev, f); + flux_future_destroy (fprev); + return; +error: + saved_errno = errno; + flux_future_destroy (f); + flux_future_continue_error (fprev, saved_errno, NULL); + flux_future_destroy (fprev); +} + +static flux_future_t *ns_get_rootref (flux_t *h, + struct jobinfo *job, + int flags) +{ + flux_future_t *f = NULL; + flux_future_t *f2 = NULL; + + if (!(f = checkpoint_get_rootrefs (h))) { + flux_log_error (h, "ns_get_rootref: checkpoint_get_rootrefs"); + return NULL; + } + if (!f || !(f2 = flux_future_and_then (f, get_rootref_cb, job))) { + flux_log_error (h, "ns_get_rootref: flux_future_and_then"); + flux_future_destroy (f); + return NULL; + } + return f2; +} + /* Asynchronously fetch job data from KVS and create namespace. 
*/ static flux_future_t *jobinfo_start_init (struct jobinfo *job) @@ -871,8 +928,12 @@ static flux_future_t *jobinfo_start_init (struct jobinfo *job) || flux_future_push (f, "J", f_kvs) < 0)) { goto err; } - if (!(f_kvs = ns_create_and_link (h, job, 0)) - || flux_future_push (f, "ns", f_kvs)) + if (job->reattach) + f_kvs = ns_get_rootref (h, job, 0); + else + f_kvs = ns_create_and_link (h, job, 0); + + if (flux_future_push (f, "ns", f_kvs) < 0) goto err; return f; diff --git a/src/modules/job-exec/job-exec.h b/src/modules/job-exec/job-exec.h index efebde89a441..3cf6a25a83a4 100644 --- a/src/modules/job-exec/job-exec.h +++ b/src/modules/job-exec/job-exec.h @@ -61,6 +61,7 @@ struct jobinfo { flux_t * h; flux_jobid_t id; char ns [64]; /* namespace string */ + char * rootref; /* ns rootref if restart */ const flux_msg_t * req; /* initial request */ uint32_t userid; /* requesting userid */ int flags; /* job flags */ From 59909846baece214930640d775c9ad23e31ab8a6 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 17 Nov 2021 16:59:10 -0800 Subject: [PATCH 10/13] job-manager: add reattach debug events Problem: Job "reattach" is difficult to test at the moment. Solution: Add reattach start and finish events to aid in testing. 
--- src/modules/job-manager/restart.c | 10 ++++++++++ src/modules/job-manager/start.c | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/modules/job-manager/restart.c b/src/modules/job-manager/restart.c index 8cff91779306..ec613189d877 100644 --- a/src/modules/job-manager/restart.c +++ b/src/modules/job-manager/restart.c @@ -269,6 +269,16 @@ int restart_from_kvs (struct job_manager *ctx) else if ((job->state & FLUX_JOB_STATE_RUNNING) != 0) { ctx->running_jobs++; job->reattach = 1; + if ((job->flags & FLUX_JOB_DEBUG)) { + if (event_job_post_pack (ctx->event, + job, + "debug.exec-reattach-start", + 0, + "{s:I}", + "id", (uintmax_t)job->id) < 0) + flux_log_error (ctx->h, "%s: event_job_post_pack id=%ju", + __FUNCTION__, (uintmax_t)job->id); + } } job = zhashx_next (ctx->active_jobs); } diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index f508fb341c82..cd0084a43d33 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -211,7 +211,14 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh, } } else if (!strcmp (type, "reattached")) { - /* nothing to do yet */ + if ((job->flags & FLUX_JOB_DEBUG)) { + if (event_job_post_pack (ctx->event, + job, + "debug.exec-reattach-finish", + 0, + NULL) < 0) + goto error_post; + } } else if (!strcmp (type, "release")) { const char *idset; From db509a39f5373faa9580aedfa4f938aec013e2b7 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 11 Nov 2021 21:49:48 -0800 Subject: [PATCH 11/13] etc/rc1: Support FLUX_INSTANCE_RESTART Problem: rc1 cancels all jobs when an instance is exited, but that may not be desirable all of the time, such as some testing scenarios. Solution: Support an environment variable FLUX_INSTANCE_RESTART to notify rc1 that an instance restart is occurring and to not cancel jobs upon instance shutdown. 
--- etc/rc1 | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etc/rc1 b/etc/rc1 index ab0d8811bffe..56474eb5f00b 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -54,7 +54,8 @@ if test $RANK -eq 0 -a "${FLUX_SCHED_MODULE}" != "none" \ flux module load ${FLUX_SCHED_MODULE:-sched-simple} fi -test $RANK -ne 0 || flux admin cleanup-push <<-EOT +test $RANK -ne 0 -o "${FLUX_INSTANCE_RESTART}" = "t" \ + || flux admin cleanup-push <<-EOT flux queue stop --quiet flux job cancelall --user=all --quiet -f --states RUN flux queue idle --quiet From f8be5ae398f839642a35e8a6aa5963277d8c0110 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 12 Nov 2021 11:11:53 -0800 Subject: [PATCH 12/13] job-exec/testexec: Support reattach_finish test flag Problem: Under test scenarios, it may be difficult to reattach to a job that "already ended". Solution: Support a flag that will assume a reattached job has already finished and will go through the normal process for an already completed job. --- src/modules/job-exec/job-exec.c | 1 + src/modules/job-exec/testexec.c | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c index f426f443a442..7ece63eb4beb 100644 --- a/src/modules/job-exec/job-exec.c +++ b/src/modules/job-exec/job-exec.c @@ -743,6 +743,7 @@ static void jobinfo_start_continue (flux_future_t *f, void *arg) } job->has_namespace = 1; + /* If an exception was received during startup, no need to continue * with startup */ diff --git a/src/modules/job-exec/testexec.c b/src/modules/job-exec/testexec.c index fa3e6c36d94f..3033f527febd 100644 --- a/src/modules/job-exec/testexec.c +++ b/src/modules/job-exec/testexec.c @@ -32,6 +32,9 @@ * event a testexec job. If job has unlimited * duration, then also wait for finish RPC or * job exception for job finish event. + * "reattach_finish":i - if reattached, assume job has finished + * and no longer has remaining time to run. 
+ * Useful for testing job reattach. * } * */ @@ -59,6 +62,7 @@ struct testexec_ctx { struct testconf { bool enabled; /* test execution enabled */ int override; /* wait for RPC for start event */ + int reattach_finish; /* if reattached, just finish job */ double run_duration; /* duration of fake job in sec */ int wait_status; /* reported status for "finish" */ const char * mock_exception; /* fake excetion at this site */ @@ -112,6 +116,7 @@ static int init_testconf (flux_t *h, struct testconf *conf, json_t *jobspec) /* get/set defaults */ conf->run_duration = jobspec_duration (h, jobspec); conf->override = 0; + conf->reattach_finish = 0; conf->wait_status = 0; conf->mock_exception = NULL; conf->enabled = false; @@ -123,9 +128,10 @@ static int init_testconf (flux_t *h, struct testconf *conf, json_t *jobspec) return 0; conf->enabled = true; if (json_unpack_ex (test, &err, 0, - "{s?s s?i s?i s?s}", + "{s?s s?i s?i s?i s?s}", "run_duration", &trun, "override", &conf->override, + "reattach_finish", &conf->reattach_finish, "wait_status", &conf->wait_status, "mock_exception", &conf->mock_exception) < 0) { flux_log (h, LOG_ERR, "init_testconf: %s", err.text); @@ -283,10 +289,12 @@ static int testexec_reattach (struct testexec *te) } if (testexec_reattach_starttime (te->job, value, &start) < 0) goto cleanup; - /* just use seconds, we approximate runtime left */ - clock_gettime (CLOCK_REALTIME, &now); - if ((now.tv_sec - start) <= te->conf.run_duration) - runtimeleft = (start + te->conf.run_duration) - now.tv_sec; + if (!te->conf.reattach_finish) { + /* just use seconds, we approximate runtime left */ + clock_gettime (CLOCK_REALTIME, &now); + if ((now.tv_sec - start) <= te->conf.run_duration) + runtimeleft = (start + te->conf.run_duration) - now.tv_sec; + } if (start_timer (te->job->h, te, runtimeleft) < 0) { From 61df254629be1e9290a5111fb384054688491780 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 11 Nov 2021 22:00:36 -0800 Subject: [PATCH 13/13] testsuite: Add job 
instance restart tests Add initial tests to see that jobs can survive instance restarts using the job-exec testexec execution plugin. --- t/Makefile.am | 3 +- t/t3202-instance-restart-testexec.t | 57 +++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100755 t/t3202-instance-restart-testexec.t diff --git a/t/Makefile.am b/t/Makefile.am index b317cd8cf56e..225704516186 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -47,7 +47,8 @@ clean-local: LONGTESTSCRIPTS = \ t5000-valgrind.t \ t3100-flux-in-flux.t \ - t3200-instance-restart.t + t3200-instance-restart.t \ + t3202-instance-restart-testexec.t # This list is included in both TESTS and dist_check_SCRIPTS. TESTSCRIPTS = \ diff --git a/t/t3202-instance-restart-testexec.t b/t/t3202-instance-restart-testexec.t new file mode 100755 index 000000000000..f5e1a8dadf25 --- /dev/null +++ b/t/t3202-instance-restart-testexec.t @@ -0,0 +1,57 @@ +#!/bin/sh + +test_description='Test instance restart and still running jobs with testexec' + +# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment: +test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile +. 
`dirname $0`/sharness.sh + +export FLUX_INSTANCE_RESTART=t + +test_expect_success 'run a testexec job in persistent instance (long run)' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux mini submit \ + --flags=debug \ + --setattr=system.exec.test.run_duration=100s \ + hostname >id1.out +' + +test_expect_success 'restart instance, reattach to running job, cancel it (long run)' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + sh -c "flux job eventlog $(cat id1.out) > eventlog_long1.out; \ + flux jobs -n > jobs_long1.out; \ + flux job cancel $(cat id1.out)" && + grep "reattach-start" eventlog_long1.out && + grep "reattach-finish" eventlog_long1.out && + grep $(cat id1.out) jobs_long1.out +' + +test_expect_success 'restart instance, job completed (long run)' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + sh -c "flux job eventlog $(cat id1.out) > eventlog_long2.out; \ + flux jobs -n > jobs_long2.out" && + grep "finish" eventlog_long2.out | grep status && + test_must_fail grep $(cat id1.out) jobs_long2.out +' + +# reattach_finish will indicate to testexec that the job finished +# right after reattach, emulating a job that finished before the +# instance restarted +test_expect_success 'run a testexec job in persistent instance (exit run)' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux mini submit \ + --flags=debug \ + --setattr=system.exec.test.reattach_finish=1 \ + --setattr=system.exec.test.run_duration=100s \ + hostname >id2.out +' + +test_expect_success 'restart instance, reattach to running job, its finished (exit run)' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + sh -c "flux job eventlog $(cat id2.out) > eventlog_exit1.out" && + grep "reattach-start" eventlog_exit1.out && + grep "reattach-finish" eventlog_exit1.out && + grep "finish" eventlog_exit1.out | grep status +' + +test_done