diff --git a/etc/rc1 b/etc/rc1
index ab0d8811bffe..56474eb5f00b 100755
--- a/etc/rc1
+++ b/etc/rc1
@@ -54,7 +54,8 @@ if test $RANK -eq 0 -a "${FLUX_SCHED_MODULE}" != "none" \
     flux module load ${FLUX_SCHED_MODULE:-sched-simple}
 fi
 
-test $RANK -ne 0 || flux admin cleanup-push <<-EOT
+test $RANK -ne 0 -o "${FLUX_INSTANCE_RESTART}" = "t" \
+	|| flux admin cleanup-push <<-EOT
 	flux queue stop --quiet
 	flux job cancelall --user=all --quiet -f --states RUN
 	flux queue idle --quiet
diff --git a/src/common/libkvs/kvs_getroot.h b/src/common/libkvs/kvs_getroot.h
index 5cdfe995e88c..94c370ba40db 100644
--- a/src/common/libkvs/kvs_getroot.h
+++ b/src/common/libkvs/kvs_getroot.h
@@ -21,7 +21,7 @@ flux_future_t *flux_kvs_getroot (flux_t *h, const char *ns, int flags);
 /* Decode KVS root hash response.
  *
  * treeobj - get the hash as an RFC 11 "dirref" object.
- * blobref - get the raw hash as a n RFC 10 "blobref".
+ * blobref - get the raw hash as an RFC 10 "blobref".
  * sequence - get the commit sequence number
  * owner - get the userid of the namespace owner
 */
diff --git a/src/modules/job-exec/Makefile.am b/src/modules/job-exec/Makefile.am
index 791262832a6c..618dcf47daf8 100644
--- a/src/modules/job-exec/Makefile.am
+++ b/src/modules/job-exec/Makefile.am
@@ -27,6 +27,8 @@ libbulk_exec_la_SOURCES = \
 job_exec_la_SOURCES = \
 	job-exec.h \
 	job-exec.c \
+	checkpoint.h \
+	checkpoint.c \
 	rset.c \
 	rset.h \
 	testexec.c \
diff --git a/src/modules/job-exec/checkpoint.c b/src/modules/job-exec/checkpoint.c
new file mode 100644
index 000000000000..055cb64396e8
--- /dev/null
+++ b/src/modules/job-exec/checkpoint.c
@@ -0,0 +1,253 @@
+/************************************************************\
+ * Copyright 2021 Lawrence Livermore National Security, LLC
+ * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * SPDX-License-Identifier: LGPL-3.0
+\************************************************************/
+
+/* Prototype checkpoint of running jobs' KVS root refs
+ *
+ * DESCRIPTION
+ *
+ * Handle checkpoint of running jobs' guest KVS namespaces into the
+ * primary KVS.  This allows the namespaces to be recreated if a
+ * job manager is brought down and then brought back up.
+ *
+ * Also provide helper functions to get rootrefs from the checkpointed
+ * object.
+ *
+ * OPERATION
+ *
+ * Get the KVS rootrefs for all running jobs and commit to
+ * "job-exec.kvs-namespaces".
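+ *
+ * For reference, each entry in the committed JSON array has the form
+ * produced by get_nsroots () below (values here are illustrative):
+ *
+ *   {"id":1234, "owner":1000, "kvsroot":"sha1-aaf3..."}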
+ *
+ */
+
+#if HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <errno.h>
+#include <string.h>
+#include <jansson.h>
+#include <flux/core.h>
+
+#include "src/common/libczmqcontainers/czmq_containers.h"
+
+#include "job-exec.h"
+#include "checkpoint.h"
+
+flux_future_t *checkpoint_get_rootrefs (flux_t *h)
+{
+    if (!h) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    return flux_kvs_lookup (h,
+                            NULL,
+                            0,
+                            "job-exec.kvs-namespaces");
+}
+
+char *checkpoint_find_rootref (flux_future_t *f,
+                               flux_jobid_t id,
+                               uint32_t owner)
+{
+    int saved_errno;
+    flux_t *h = flux_future_get_flux (f);
+    char *rv = NULL;
+    const char *rootrefs;
+    json_error_t error;
+    json_t *o = NULL;
+    size_t index;
+    json_t *value;
+
+    if (flux_kvs_lookup_get (f, &rootrefs) < 0) {
+        flux_log_error (h, "checkpoint_find_rootref: flux_kvs_lookup_get");
+        goto cleanup;
+    }
+
+    if (!(o = json_loads (rootrefs, 0, &error))) {
+        flux_log (h, LOG_ERR, "json_loads rootrefs: %s", error.text);
+        goto cleanup;
+    }
+
+    json_array_foreach (o, index, value) {
+        flux_jobid_t l_id;
+        uint32_t l_owner;
+        const char *rootref;
+
+        if (json_unpack_ex (value,
+                            &error,
+                            0,
+                            "{s:I s:i s:s}",
+                            "id", &l_id,
+                            "owner", &l_owner,
+                            "kvsroot", &rootref) < 0) {
+            flux_log (h, LOG_ERR, "json_unpack rootref: %s", error.text);
+            goto cleanup;
+        }
+        if (l_id == id && l_owner == owner) {
+            if (!(rv = strdup (rootref)))
+                goto cleanup;
+            break;
+        }
+    }
+
+cleanup:
+    saved_errno = errno;
+    json_decref (o);
+    errno = saved_errno;
+    return rv;
+}
+
+static int lookup_nsroots (flux_t *h, zhashx_t *jobs, flux_future_t **fp)
+{
+    struct jobinfo *job = zhashx_first (jobs);
+    flux_future_t *fall = NULL;
+    flux_future_t *f = NULL;
+
+    while (job) {
+        if (job->running) {
+            if (!fall) {
+                if (!(fall = flux_future_wait_all_create ()))
+                    goto cleanup;
+                flux_future_set_flux (fall, h);
+            }
+            if (!(f = flux_kvs_getroot (h, job->ns, 0)))
+                goto cleanup;
+            if (flux_future_aux_set (f, "jobinfo", job, NULL) < 0)
+                goto cleanup;
+            if (flux_future_push (fall, job->ns, f) < 0)
+                goto cleanup;
+            f = NULL;
+        }
+        job = zhashx_next (jobs);
+    }
+
+    (*fp) = fall;
+    return 0;
+
+cleanup:
+    flux_future_destroy (f);
+    flux_future_destroy (fall);
+    return -1;
+}
+
+static json_t *get_nsroots (flux_t *h, flux_future_t *fall)
+{
+    const char *child;
+    json_t *nsdata = NULL;
+    int saved_errno;
+
+    if (!(nsdata = json_array ())) {
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    child = flux_future_first_child (fall);
+    while (child) {
+        flux_future_t *f = flux_future_get_child (fall, child);
+        struct jobinfo *job;
+        const char *blobref = NULL;
+        json_t *o = NULL;
+        if (!f)
+            goto cleanup;
+        if (!(job = flux_future_aux_get (f, "jobinfo")))
+            goto cleanup;
+        if (flux_kvs_getroot_get_blobref (f, &blobref) < 0)
+            goto cleanup;
+        if (!(o = json_pack ("{s:I s:i s:s}",
+                             "id", job->id,
+                             "owner", job->userid,
+                             "kvsroot", blobref))) {
+            errno = ENOMEM;
+            goto cleanup;
+        }
+        if (json_array_append (nsdata, o) < 0) {
+            json_decref (o);
+            errno = ENOMEM;
+            goto cleanup;
+        }
+        json_decref (o);
+        child = flux_future_next_child (fall);
+    }
+
+    return nsdata;
+cleanup:
+    saved_errno = errno;
+    json_decref (nsdata);
+    errno = saved_errno;
+    return NULL;
+}
+
+static int checkpoint_commit (flux_t *h, json_t *nsdata)
+{
+    flux_future_t *f = NULL;
+    flux_kvs_txn_t *txn = NULL;
+    char *s = NULL;
+    int rv = -1;
+
+    if (!(s = json_dumps (nsdata, JSON_COMPACT))) {
+        errno = ENOMEM;
+        goto cleanup;
+    }
+
+    if (!(txn = flux_kvs_txn_create ()))
+        goto cleanup;
+
+    if (flux_kvs_txn_put (txn,
+                          0,
+                          "job-exec.kvs-namespaces",
+                          s) < 0)
+        goto cleanup;
+
+    if (!(f = flux_kvs_commit (h,
+                               NULL, 0, txn)))
+        goto cleanup;
+
+    if (flux_future_get (f, NULL) < 0)
+        goto cleanup;
+
+    rv = 0;
+cleanup:
+    flux_future_destroy (f);
+    flux_kvs_txn_destroy (txn);
+    free (s);
+    return rv;
+}
+
+void checkpoint_running (flux_t *h, zhashx_t *jobs)
+{
+    flux_future_t *lookupf = NULL;
+    json_t *nsdata = NULL;
+
+    if (!h || !jobs)
+        return;
+
+    if (lookup_nsroots (h, jobs, &lookupf) < 0) {
+        flux_log_error (h, "failed to lookup ns root refs");
+        goto cleanup;
+    }
+
+    if (!lookupf)
+        return;
+
+    if (!(nsdata = get_nsroots (h, lookupf))) {
+        flux_log_error (h, "failure getting ns root refs");
+        goto cleanup;
+    }
+
+    if (checkpoint_commit (h, nsdata) < 0) {
+        flux_log_error (h, "failure committing ns checkpoint data");
+        goto cleanup;
+    }
+
+cleanup:
+    json_decref (nsdata);
+    flux_future_destroy (lookupf);
+}
+
+/*
+ * vi: tabstop=4 shiftwidth=4 expandtab
+ */
diff --git a/src/modules/job-exec/checkpoint.h b/src/modules/job-exec/checkpoint.h
new file mode 100644
index 000000000000..c4d036f5faac
--- /dev/null
+++ b/src/modules/job-exec/checkpoint.h
@@ -0,0 +1,30 @@
+/************************************************************\
+ * Copyright 2021 Lawrence Livermore National Security, LLC
+ * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * SPDX-License-Identifier: LGPL-3.0
+\************************************************************/
+
+#ifndef HAVE_JOB_EXEC_CHECKPOINT_H
+#define HAVE_JOB_EXEC_CHECKPOINT_H 1
+
+#include <flux/core.h>
+
+#include "src/common/libczmqcontainers/czmq_containers.h"
+#include "job-exec.h"
+
+flux_future_t *checkpoint_get_rootrefs (flux_t *h);
+
+char *checkpoint_find_rootref (flux_future_t *f,
+                               flux_jobid_t id,
+                               uint32_t owner);
+
+void checkpoint_running (flux_t *h, zhashx_t *jobs);
+
+#endif /* !HAVE_JOB_EXEC_CHECKPOINT_H */
+
+/* vi: ts=4 sw=4 expandtab
+ */
diff --git a/src/modules/job-exec/exec.c b/src/modules/job-exec/exec.c
index 3bb837a9e749..eec88ad3e24f 100644
--- a/src/modules/job-exec/exec.c
+++ b/src/modules/job-exec/exec.c
@@ -111,7 +111,7 @@ static const char *job_get_cwd (struct jobinfo *job)
 static void start_cb (struct bulk_exec *exec, void *arg)
 {
     struct jobinfo *job = arg;
-    jobinfo_started (job, NULL);
+    jobinfo_started (job);
 
     /* This is going to be really slow. However, it should at least
      * work for now. We wait for all imp's to start, then send input
diff --git a/src/modules/job-exec/job-exec.c b/src/modules/job-exec/job-exec.c
index 5e95cd3aeaec..7ece63eb4beb 100644
--- a/src/modules/job-exec/job-exec.c
+++ b/src/modules/job-exec/job-exec.c
@@ -26,7 +26,7 @@
  *
  * JOB INIT:
  *
- * On reciept of a start request, the exec service enters initialization
+ * On receipt of a start request, the exec service enters initialization
  * phase of the job, where the jobspec and R are fetched from the KVS,
  * and the guest namespace is created and linked from the primary
  * namespace.  A guest.exec.eventlog is created with an initial "init"
@@ -99,6 +99,7 @@
 
 #include "src/common/libutil/errno_safe.h"
 
 #include "job-exec.h"
+#include "checkpoint.h"
 
 static double kill_timeout=5.0;
@@ -138,6 +139,7 @@ void jobinfo_decref (struct jobinfo *job)
         free (job->J);
         resource_set_destroy (job->R);
         json_decref (job->jobspec);
+        free (job->rootref);
         free (job);
         errno = saved_errno;
     }
@@ -290,7 +292,7 @@ static int jobinfo_send_release (struct jobinfo *job,
 }
 
 static int jobinfo_respond (flux_t *h, struct jobinfo *job,
-                            const char *event, int status)
+                            const char *event)
 {
     return flux_respond_pack (h, job->req, "{s:I s:s s:{}}",
                               "id", job->id,
@@ -319,19 +321,26 @@ static void jobinfo_complete (struct jobinfo *job, const struct idset *ranks)
     }
 }
 
-void jobinfo_started (struct jobinfo *job, const char *fmt, ...)
+void jobinfo_started (struct jobinfo *job)
 {
     flux_t *h = job->ctx->h;
     if (h && job->req) {
-        va_list ap;
-        va_start (ap, fmt);
         job->running = 1;
-        va_end (ap);
-        if (jobinfo_respond (h, job, "start", 0) < 0)
+        if (jobinfo_respond (h, job, "start") < 0)
             flux_log_error (h, "jobinfo_started: flux_respond");
     }
 }
 
+void jobinfo_reattached (struct jobinfo *job)
+{
+    flux_t *h = job->ctx->h;
+    if (h && job->req) {
+        job->running = 1;
+        if (jobinfo_respond (h, job, "reattached") < 0)
+            flux_log_error (h, "jobinfo_reattached: flux_respond");
+    }
+}
+
 static void kill_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
                            int revents, void *arg)
@@ -594,7 +603,10 @@ static int jobinfo_finalize (struct jobinfo *job)
 
 static int jobinfo_start_execution (struct jobinfo *job)
 {
-    jobinfo_emit_event_pack_nowait (job, "starting", NULL);
+    if (job->reattach)
+        jobinfo_emit_event_pack_nowait (job, "re-starting", NULL);
+    else
+        jobinfo_emit_event_pack_nowait (job, "starting", NULL);
     /* Set started flag before calling 'start' method because we want to
      *  be sure to clean up properly if an exception occurs
      */
@@ -731,6 +743,7 @@ static void jobinfo_start_continue (flux_future_t *f, void *arg)
         }
         job->has_namespace = 1;
+
         /* If an exception was received during startup, no need to continue
          * with startup
          */
@@ -808,7 +821,9 @@ static void namespace_link (flux_future_t *fprev, void *arg)
         goto error;
     }
     flux_future_set_flux (cf, h);
-    if (!(f = jobinfo_emit_event_pack (job, "init", NULL))
+    if (!(f = jobinfo_emit_event_pack (job,
+                                       job->reattach ? "reattach" : "init",
"reattach" : "init", + NULL)) || flux_future_push (cf, "emit event", f) < 0) goto error; @@ -832,8 +847,16 @@ static flux_future_t *ns_create_and_link (flux_t *h, flux_future_t *f = NULL; flux_future_t *f2 = NULL; - if (!(f = flux_kvs_namespace_create (h, job->ns, job->userid, flags)) - || !(f2 = flux_future_and_then (f, namespace_link, job))) { + if (job->reattach && job->rootref) + f = flux_kvs_namespace_create_with (h, + job->ns, + job->rootref, + job->userid, + flags); + else + f = flux_kvs_namespace_create (h, job->ns, job->userid, flags); + + if (!f || !(f2 = flux_future_and_then (f, namespace_link, job))) { flux_log_error (h, "namespace_move: flux_future_and_then"); flux_future_destroy (f); return NULL; @@ -841,6 +864,54 @@ static flux_future_t *ns_create_and_link (flux_t *h, return f2; } +static void get_rootref_cb (flux_future_t *fprev, void *arg) +{ + int saved_errno; + flux_t *h = flux_future_get_flux (fprev); + struct jobinfo *job = arg; + flux_future_t *f = NULL; + + if (!(job->rootref = checkpoint_find_rootref (fprev, + job->id, + job->userid))) + flux_log (job->h, + LOG_DEBUG, + "checkpoint rootref not found: %ju", + (uintmax_t)job->id); + + /* if rootref not found, still create namespace */ + if (!(f = ns_create_and_link (h, job, 0))) + goto error; + + flux_future_continue (fprev, f); + flux_future_destroy (fprev); + return; +error: + saved_errno = errno; + flux_future_destroy (f); + flux_future_continue_error (fprev, saved_errno, NULL); + flux_future_destroy (fprev); +} + +static flux_future_t *ns_get_rootref (flux_t *h, + struct jobinfo *job, + int flags) +{ + flux_future_t *f = NULL; + flux_future_t *f2 = NULL; + + if (!(f = checkpoint_get_rootrefs (h))) { + flux_log_error (h, "ns_get_rootref: checkpoint_get_rootrefs"); + return NULL; + } + if (!f || !(f2 = flux_future_and_then (f, get_rootref_cb, job))) { + flux_log_error (h, "ns_get_rootref: flux_future_and_then"); + flux_future_destroy (f); + return NULL; + } + return f2; +} + /* Asynchronously fetch job data from KVS and create namespace. 
  */
 static flux_future_t *jobinfo_start_init (struct jobinfo *job)
@@ -858,8 +929,12 @@
             || flux_future_push (f, "J", f_kvs) < 0)) {
         goto err;
     }
-    if (!(f_kvs = ns_create_and_link (h, job, 0))
-        || flux_future_push (f, "ns", f_kvs))
+    if (job->reattach)
+        f_kvs = ns_get_rootref (h, job, 0);
+    else
+        f_kvs = ns_create_and_link (h, job, 0);
+
+    if (!f_kvs || flux_future_push (f, "ns", f_kvs) < 0)
         goto err;
 
     return f;
@@ -916,10 +991,11 @@ static int job_start (struct job_exec_ctx *ctx, const flux_msg_t *msg)
 
     job->ctx = ctx;
 
-    if (flux_request_unpack (job->req, NULL, "{s:I s:i s:O}",
+    if (flux_request_unpack (job->req, NULL, "{s:I s:i s:O s:b}",
                              "id", &job->id,
                              "userid", &job->userid,
-                             "jobspec", &job->jobspec) < 0) {
+                             "jobspec", &job->jobspec,
+                             "reattach", &job->reattach) < 0) {
         flux_log_error (ctx->h, "start: flux_request_unpack");
         jobinfo_decref (job);
         return -1;
     }
@@ -1103,10 +1179,50 @@ static int configure_implementations (flux_t *h, int argc, char **argv)
     return 0;
 }
 
-static int unload_implementations (void)
+/* Remove the guest namespaces of still-running jobs.  They can be
+ * recreated from the checkpointed rootrefs on restart.
+ */
+static int remove_running_ns (struct job_exec_ctx *ctx)
+{
+    struct jobinfo *job = zhashx_first (ctx->jobs);
+    flux_future_t *fall = NULL;
+    flux_future_t *f = NULL;
+    int rv = -1;
+
+    while (job) {
+        if (job->running) {
+            if (!fall) {
+                if (!(fall = flux_future_wait_all_create ()))
+                    goto cleanup;
+                flux_future_set_flux (fall, ctx->h);
+            }
+            if (!(f = flux_kvs_namespace_remove (ctx->h, job->ns)))
+                goto cleanup;
+            if (flux_future_push (fall, job->ns, f) < 0)
+                goto cleanup;
+            f = NULL;
+        }
+        job = zhashx_next (ctx->jobs);
+    }
+
+    if (fall) {
+        if (flux_future_wait_for (fall, -1.) < 0)
+            goto cleanup;
+    }
+
+    rv = 0;
+cleanup:
+    flux_future_destroy (f);
+    flux_future_destroy (fall);
+    return rv;
+}
+
+static int unload_implementations (struct job_exec_ctx *ctx)
 {
     struct exec_implementation *impl;
     int i = 0;
 
+    if (ctx && ctx->jobs) {
+        checkpoint_running (ctx->h, ctx->jobs);
+        if (remove_running_ns (ctx) < 0)
+            flux_log_error (ctx->h, "failed to remove guest namespaces");
+    }
+
     while ((impl = implementations[i]) && impl->name) {
         if (impl->unload)
             (*impl->unload) ();
@@ -1146,7 +1262,7 @@ int mod_main (flux_t *h, int argc, char **argv)
     rc = flux_reactor_run (flux_get_reactor (h), 0);
 out:
-    unload_implementations ();
+    unload_implementations (ctx);
     saved_errno = errno;
 
     if (flux_event_unsubscribe (h, "job-exception") < 0)
diff --git a/src/modules/job-exec/job-exec.h b/src/modules/job-exec/job-exec.h
index 2cbed57e9b66..3cf6a25a83a4 100644
--- a/src/modules/job-exec/job-exec.h
+++ b/src/modules/job-exec/job-exec.h
@@ -61,6 +61,7 @@ struct jobinfo {
     flux_t *            h;
     flux_jobid_t        id;
     char                ns [64];      /* namespace string */
+    char *              rootref;      /* ns rootref if restart */
     const flux_msg_t *  req;          /* initial request */
     uint32_t            userid;       /* requesting userid */
     int                 flags;        /* job flags */
@@ -77,6 +78,7 @@ struct jobinfo {
     uint8_t             running:1;    /* all shells are running */
     uint8_t             finalizing:1; /* in process of cleanup */
 
+    int                 reattach;     /* job-manager reattach attempt */
     int                 wait_status;
 
     struct eventlogger *ev;           /* event batcher */
@@ -102,8 +104,11 @@ int jobinfo_emit_event_pack_nowait (struct jobinfo *job,
                                     const char *name,
                                     const char *fmt, ...);
 
-/* Emit start event with optional note in jansson pack format */
-void jobinfo_started (struct jobinfo *job, const char *fmt, ...);
+/* Emit start event */
+void jobinfo_started (struct jobinfo *job);
+
+/* Emit reattached event */
+void jobinfo_reattached (struct jobinfo *job);
 
 /* Notify job-exec that ranks in idset `ranks` have completed
  * with the given wait status
diff --git a/src/modules/job-exec/testexec.c b/src/modules/job-exec/testexec.c
index c8f6e7e71f2d..3033f527febd 100644
--- a/src/modules/job-exec/testexec.c
+++ b/src/modules/job-exec/testexec.c
@@ -32,6 +32,9 @@
  *                        event a testexec job. If job has unlimited
  *                        duration, then also wait for finish RPC or
  *                        job exception for job finish event.
+ * "reattach_finish":i  - if reattached, assume job has finished
+ *                        and no longer has remaining time to run.
+ *                        Useful for testing job reattach.
  * }
  *
 */
@@ -40,8 +43,12 @@
 # include "config.h"
 #endif
 
+#include <jansson.h>
+#include <time.h>
+
 #include "src/common/libutil/fsd.h"
 #include "src/common/libjob/job_hash.h"
+#include "src/common/libeventlog/eventlog.h"
 #include "src/common/libczmqcontainers/czmq_containers.h"
 
 #include "job-exec.h"
@@ -55,6 +62,7 @@ struct testexec_ctx {
 struct testconf {
     bool                  enabled;          /* test execution enabled */
     int                   override;         /* wait for RPC for start event */
+    int                   reattach_finish;  /* if reattached, just finish job */
     double                run_duration;     /* duration of fake job in sec */
     int                   wait_status;      /* reported status for "finish" */
     const char *          mock_exception;   /* fake exception at this site */
@@ -108,6 +116,7 @@ static int init_testconf (flux_t *h, struct testconf *conf, json_t *jobspec)
     /* get/set defaults */
     conf->run_duration = jobspec_duration (h, jobspec);
     conf->override = 0;
+    conf->reattach_finish = 0;
     conf->wait_status = 0;
     conf->mock_exception = NULL;
     conf->enabled = false;
@@ -119,9 +128,10 @@ static int init_testconf (flux_t *h, struct testconf *conf, json_t *jobspec)
         return 0;
     conf->enabled = true;
     if (json_unpack_ex (test, &err, 0,
-                        "{s?s s?i s?i s?s}",
+                        "{s?s s?i s?i s?i s?s}",
                         "run_duration", &trun,
                         "override", &conf->override,
+                        "reattach_finish", &conf->reattach_finish,
                         "wait_status", &conf->wait_status,
                         "mock_exception", &conf->mock_exception) < 0) {
         flux_log (h, LOG_ERR, "init_testconf: %s", err.text);
@@ -160,10 +170,9 @@ static void timer_cb (flux_reactor_t *r,
  * is sent when the timer fires (simulating the exit of the final
  * job shell.)
  */
-static int start_timer (flux_t *h, struct testexec *te)
+static int start_timer (flux_t *h, struct testexec *te, double t)
 {
     flux_reactor_t *r = flux_get_reactor (h);
-    double t = te->conf.run_duration;
 
     /* For now, if a job duration wasn't found, complete job almost
      * immediately.
@@ -171,7 +180,6 @@ static int start_timer (flux_t *h, struct testexec *te)
     if (t < 0.)
         t = 1.e-5;
     if (t >= 0.) {
-        char timebuf[256];
         if (t > 0.) {
             te->timer = flux_timer_watcher_create (r, t, 0., timer_cb, te);
             if (!te->timer) {
@@ -179,10 +187,11 @@
                 return -1;
             }
             flux_watcher_start (te->timer);
-            snprintf (timebuf, sizeof (timebuf), "%.6fs", t);
-        } else
-            strncpy (timebuf, "inf", sizeof (timebuf));
-        jobinfo_started (te->job, "{ s:s }", "timer", timebuf);
+        }
+        if (te->job->reattach)
+            jobinfo_reattached (te->job);
+        else
+            jobinfo_started (te->job);
     } else
         return -1;
@@ -219,17 +228,104 @@
     return 1;
 }
 
+static int testexec_reattach_starttime (struct jobinfo *job,
+                                        const char *eventlog,
+                                        time_t *startp)
+{
+    json_t *o = NULL;
+    size_t index;
+    json_t *value;
+    int rv = -1;
+
+    if (!(o = eventlog_decode (eventlog))) {
+        jobinfo_fatal_error (job, errno, "eventlog_decode");
+        goto cleanup;
+    }
+
+    json_array_foreach (o, index, value) {
+        double timestamp;
+        const char *name;
+
+        if (eventlog_entry_parse (value, &timestamp, &name, NULL) < 0) {
+            jobinfo_fatal_error (job, errno, "eventlog_entry_parse");
+            goto cleanup;
+        }
+        if (!strcmp (name, "start")) {
+            (*startp) = (time_t)timestamp;
+            break;
+        }
+    }
+
+    rv = 0;
+cleanup:
+    json_decref (o);
+    return rv;
+}
+
+static int testexec_reattach (struct testexec *te)
+{
+    const char *value;
+    flux_future_t *f = NULL;
+    time_t start = 0;
+    struct timespec now;
+    time_t runtimeleft = -1;
+    int rv = -1;
+    char ekey[256];
+
+    if (flux_job_kvs_key (ekey, sizeof (ekey), te->job->id, "eventlog") < 0) {
+        jobinfo_fatal_error (te->job, errno, "flux_job_kvs_key");
+        goto cleanup;
+    }
+    if (!(f = flux_kvs_lookup (te->job->h,
+                               NULL,
+                               0,
+                               ekey))) {
+        jobinfo_fatal_error (te->job, errno, "flux_kvs_lookup");
+        goto cleanup;
+    }
+    if (flux_kvs_lookup_get (f, &value) < 0) {
+        jobinfo_fatal_error (te->job, errno, "flux_kvs_lookup_get eventlog");
+        goto cleanup;
+    }
+    if (testexec_reattach_starttime (te->job, value, &start) < 0)
+        goto cleanup;
+    if (!te->conf.reattach_finish) {
+        /* use second granularity to approximate the remaining runtime */
+        clock_gettime (CLOCK_REALTIME, &now);
+        if ((now.tv_sec - start) <= te->conf.run_duration)
+            runtimeleft = (start + te->conf.run_duration) - now.tv_sec;
+    }
+    if (start_timer (te->job->h,
+                     te,
+                     runtimeleft) < 0) {
+        jobinfo_fatal_error (te->job, errno, "unable to restart timer");
+        goto cleanup;
+    }
+    rv = 0;
+cleanup:
+    flux_future_destroy (f);
+    return rv;
+}
+
 static int testexec_start (struct jobinfo *job)
 {
     struct testexec *te = job->data;
 
-    if (!te->conf.override && start_timer (job->h, te) < 0) {
-        jobinfo_fatal_error (job, errno, "unable to start test exec timer");
-        return -1;
+    if (job->reattach) {
+        if (testexec_reattach (te) < 0)
+            return -1;
     }
-    if (testconf_mock_exception (&te->conf, "run")) {
-        jobinfo_fatal_error (job, 0, "mock run exception generated");
-        return -1;
+    else {
+        if (!te->conf.override && start_timer (job->h,
+                                               te,
+                                               te->conf.run_duration) < 0) {
+            jobinfo_fatal_error (job, errno, "unable to start test exec timer");
+            return -1;
+        }
+        if (testconf_mock_exception (&te->conf, "run")) {
+            jobinfo_fatal_error (job, 0, "mock run exception generated");
+            return -1;
+        }
     }
     return 0;
 }
@@ -297,7 +393,7 @@ static void testexec_request_cb (flux_t *h,
             errno = EINVAL;
             goto error;
         }
-        if (start_timer (h, te) < 0)
+        if (start_timer (h, te, te->conf.run_duration) < 0)
             goto error;
     }
     else if (strcmp (event, "finish") == 0) {
diff --git a/src/modules/job-manager/job.h b/src/modules/job-manager/job.h
index d3d4a14332ee..a3d86ee1abfb 100644
--- a/src/modules/job-manager/job.h
+++ b/src/modules/job-manager/job.h
@@ -40,6 +40,7 @@ struct job {
     uint8_t free_pending:1;     // free request sent to sched
     uint8_t has_resources:1;
     uint8_t start_pending:1;    // start request sent to job-exec
+    uint8_t reattach:1;
 
     uint8_t perilog_active;     // if nonzero, prolog/epilog active
diff --git a/src/modules/job-manager/restart.c b/src/modules/job-manager/restart.c
index b932a6b3d7d7..ec613189d877 100644
--- a/src/modules/job-manager/restart.c
+++ b/src/modules/job-manager/restart.c
@@ -266,8 +266,20 @@ int restart_from_kvs (struct job_manager *ctx)
                           __FUNCTION__, (uintmax_t)job->id);
             }
         }
-        else if ((job->state & FLUX_JOB_STATE_RUNNING) != 0)
+        else if ((job->state & FLUX_JOB_STATE_RUNNING) != 0) {
             ctx->running_jobs++;
+            job->reattach = 1;
+            if ((job->flags & FLUX_JOB_DEBUG)) {
+                if (event_job_post_pack (ctx->event,
+                                         job,
+                                         "debug.exec-reattach-start",
+                                         0,
+                                         "{s:I}",
+                                         "id", (uintmax_t)job->id) < 0)
+                    flux_log_error (ctx->h, "%s: event_job_post_pack id=%ju",
+                                    __FUNCTION__, (uintmax_t)job->id);
+            }
+        }
         job = zhashx_next (ctx->active_jobs);
     }
     flux_log (ctx->h, LOG_INFO, "restart: %d running jobs", ctx->running_jobs);
diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c
index bc7a11c8b35b..cd0084a43d33 100644
--- a/src/modules/job-manager/start.c
+++ b/src/modules/job-manager/start.c
@@ -200,8 +200,25 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh,
         goto error;
     }
     if (!strcmp (type, "start")) {
-        if (event_job_post_pack (ctx->event, job, "start", 0, NULL) < 0)
-            goto error_post;
+        if (job->reattach)
+            flux_log (h,
+                      LOG_ERR,
+                      "start response: id=%ju should not get start event",
+                      (uintmax_t)id);
+        else {
+            if (event_job_post_pack (ctx->event, job, "start", 0, NULL) < 0)
+                goto error_post;
+        }
+    }
+    else if (!strcmp (type, "reattached")) {
+        if ((job->flags & FLUX_JOB_DEBUG)) {
+            if (event_job_post_pack (ctx->event,
+                                     job,
+                                     "debug.exec-reattach-finish",
+                                     0,
+                                     NULL) < 0)
+                goto error_post;
+        }
     }
     else if (!strcmp (type, "release")) {
         const char *idset;
@@ -275,10 +292,11 @@ int start_send_request (struct start *start, struct job *job)
     if (!job->start_pending && start->topic != NULL) {
         if (!(msg = flux_request_encode (start->topic, NULL)))
             return -1;
-        if (flux_msg_pack (msg, "{s:I s:i s:O}",
+        if (flux_msg_pack (msg, "{s:I s:i s:O s:b}",
                            "id", job->id,
                            "userid", job->userid,
-                           "jobspec", job->jobspec_redacted) < 0)
+                           "jobspec", job->jobspec_redacted,
+                           "reattach", job->reattach) < 0)
             goto error;
         if (flux_send (ctx->h, msg, 0) < 0)
             goto error;
diff --git a/t/Makefile.am b/t/Makefile.am
index b317cd8cf56e..225704516186 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -47,7 +47,8 @@ clean-local:
 LONGTESTSCRIPTS = \
 	t5000-valgrind.t \
 	t3100-flux-in-flux.t \
-	t3200-instance-restart.t
+	t3200-instance-restart.t \
+	t3202-instance-restart-testexec.t
 
 # This list is included in both TESTS and dist_check_SCRIPTS.
 TESTSCRIPTS = \
diff --git a/t/t3202-instance-restart-testexec.t b/t/t3202-instance-restart-testexec.t
new file mode 100755
index 000000000000..f5e1a8dadf25
--- /dev/null
+++ b/t/t3202-instance-restart-testexec.t
@@ -0,0 +1,57 @@
+#!/bin/sh
+
+test_description='Test instance restart and still-running jobs with testexec'
+
+# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment:
+test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
+. `dirname $0`/sharness.sh
+
+export FLUX_INSTANCE_RESTART=t
+
+test_expect_success 'run a testexec job in persistent instance (long run)' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+	flux mini submit \
+	--flags=debug \
+	--setattr=system.exec.test.run_duration=100s \
+	hostname >id1.out
+'
+
+test_expect_success 'restart instance, reattach to running job, cancel it (long run)' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+	sh -c "flux job eventlog $(cat id1.out) > eventlog_long1.out; \
+	flux jobs -n > jobs_long1.out; \
+	flux job cancel $(cat id1.out)" &&
+	grep "reattach-start" eventlog_long1.out &&
+	grep "reattach-finish" eventlog_long1.out &&
+	grep $(cat id1.out) jobs_long1.out
+'
+
+test_expect_success 'restart instance, job completed (long run)' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+	sh -c "flux job eventlog $(cat id1.out) > eventlog_long2.out; \
+	flux jobs -n > jobs_long2.out" &&
+	grep "finish" eventlog_long2.out | grep status &&
+	test_must_fail grep $(cat id1.out) jobs_long2.out
+'
+
+# reattach_finish indicates to testexec that the job finished right
+# after reattach, emulating a job that finished before the instance
+# restarted
+test_expect_success 'run a testexec job in persistent instance (exit run)' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+	flux mini submit \
+	--flags=debug \
+	--setattr=system.exec.test.reattach_finish=1 \
+	--setattr=system.exec.test.run_duration=100s \
+	hostname >id2.out
+'
+
+test_expect_success 'restart instance, reattach to running job, it has finished (exit run)' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+	sh -c "flux job eventlog $(cat id2.out) > eventlog_exit1.out" &&
+	grep "reattach-start" eventlog_exit1.out &&
+	grep "reattach-finish" eventlog_exit1.out &&
+	grep "finish" eventlog_exit1.out | grep status
+'
+
+test_done
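+
+# Note: one way to manually inspect the checkpointed rootrefs (an
+# illustrative sketch, assuming the same content.sqlite produced above):
+#
+#   flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+#       flux kvs get job-exec.kvs-namespaces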