Skip to content

Commit

Permalink
Merge pull request #3947 from chu11/issue3811_kvscheckpoint
Browse files Browse the repository at this point in the history
job-manager / job-exec: checkpoint and restore guest KVS namespaces
  • Loading branch information
mergify[bot] authored Nov 20, 2021
2 parents 90cc9c8 + 61df254 commit 562d29b
Show file tree
Hide file tree
Showing 14 changed files with 635 additions and 43 deletions.
3 changes: 2 additions & 1 deletion etc/rc1
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ if test $RANK -eq 0 -a "${FLUX_SCHED_MODULE}" != "none" \
flux module load ${FLUX_SCHED_MODULE:-sched-simple}
fi

test $RANK -ne 0 || flux admin cleanup-push <<-EOT
test $RANK -ne 0 -o "${FLUX_INSTANCE_RESTART}" = "t" \
|| flux admin cleanup-push <<-EOT
flux queue stop --quiet
flux job cancelall --user=all --quiet -f --states RUN
flux queue idle --quiet
Expand Down
2 changes: 1 addition & 1 deletion src/common/libkvs/kvs_getroot.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flux_future_t *flux_kvs_getroot (flux_t *h, const char *ns, int flags);
/* Decode KVS root hash response.
*
* treeobj - get the hash as an RFC 11 "dirref" object.
* blobref - get the raw hash as a n RFC 10 "blobref".
* blobref - get the raw hash as an RFC 10 "blobref".
* sequence - get the commit sequence number
* owner - get the userid of the namespace owner
*/
Expand Down
2 changes: 2 additions & 0 deletions src/modules/job-exec/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ libbulk_exec_la_SOURCES = \
job_exec_la_SOURCES = \
job-exec.h \
job-exec.c \
checkpoint.h \
checkpoint.c \
rset.c \
rset.h \
testexec.c \
Expand Down
253 changes: 253 additions & 0 deletions src/modules/job-exec/checkpoint.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* Prototype checkpoint of running jobs KVS root refs
*
* DESCRIPTION
*
* Handle checkpoint of running job's guest KVS namescapes into the
* primary KVS. This will allow the namespaces to be recreated if
* a job manager is brought down than back up.
*
* Also provide helper functions to get rootrefs from the checkpointed
* object.
*
* OPERATION
*
* Get the KVS rootrefs for all running jobs and commit to
* "job-exec.kvs-namespaces".
*
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"

#include "job-exec.h"
#include "checkpoint.h"

flux_future_t *checkpoint_get_rootrefs (flux_t *h)
{
if (!h) {
errno = EINVAL;
return NULL;
}

return flux_kvs_lookup (h,
NULL,
0,
"job-exec.kvs-namespaces");
}

char *checkpoint_find_rootref (flux_future_t *f,
flux_jobid_t id,
uint32_t owner)
{
int saved_errno;
flux_t *h = flux_future_get_flux (f);
char *rv = NULL;
const char *rootrefs;
json_error_t error;
json_t *o = NULL;
size_t index;
json_t *value;

if (flux_kvs_lookup_get (f, &rootrefs) < 0) {
flux_log_error (h, "checkpoint_get_rootref: flux_kvs_lookup_get");
goto cleanup;
}

if (!(o = json_loads (rootrefs, 0, &error))) {
flux_log (h, LOG_ERR, "json_loads rootrefs: %s", error.text);
goto cleanup;
}

json_array_foreach (o, index, value) {
flux_jobid_t l_id;
uint32_t l_owner;
const char *rootref;

if (json_unpack (value,
"{s:I s:i s:s}",
"id", &l_id,
"owner", &l_owner,
"kvsroot", &rootref) < 0) {
flux_log (h, LOG_ERR, "json_unpack rootref: %s", error.text);
goto cleanup;
}
if (l_id == id && l_owner == owner) {
if (!(rv = strdup (rootref)))
goto cleanup;
break;
}
}

cleanup:
saved_errno = errno;
json_decref (o);
errno = saved_errno;
return rv;
}

static int lookup_nsroots (flux_t *h, zhashx_t *jobs, flux_future_t **fp)
{
struct jobinfo *job = zhashx_first (jobs);
flux_future_t *fall = NULL;
flux_future_t *f = NULL;

while (job) {
if (job->running) {
if (!fall) {
if (!(fall = flux_future_wait_all_create ()))
goto cleanup;
flux_future_set_flux (fall, h);
}
if (!(f = flux_kvs_getroot (h, job->ns, 0)))
goto cleanup;
if (flux_future_aux_set (f, "jobinfo", job, NULL) < 0)
goto cleanup;
if (flux_future_push (fall, job->ns, f) < 0)
goto cleanup;
f = NULL;
}
job = zhashx_next (jobs);
}

(*fp) = fall;
return 0;

cleanup:
flux_future_destroy (f);
flux_future_destroy (fall);
return -1;
}

static json_t *get_nsroots (flux_t *h, flux_future_t *fall)
{
const char *child;
json_t *nsdata = NULL;
int saved_errno;

if (!(nsdata = json_array ())) {
errno = ENOMEM;
return NULL;
}

child = flux_future_first_child (fall);
while (child) {
flux_future_t *f = flux_future_get_child (fall, child);
struct jobinfo *job;
const char *blobref = NULL;
json_t *o = NULL;
if (!f)
goto cleanup;
if (!(job = flux_future_aux_get (f, "jobinfo")))
goto cleanup;
if (flux_kvs_getroot_get_blobref (f, &blobref) < 0)
goto cleanup;
if (!(o = json_pack ("{s:I s:i s:s}",
"id", job->id,
"owner", job->userid,
"kvsroot", blobref))) {
errno = ENOMEM;
goto cleanup;
}
if (json_array_append (nsdata, o) < 0) {
json_decref (o);
errno = ENOMEM;
goto cleanup;
}
json_decref (o);
child = flux_future_next_child (fall);
}

return nsdata;
cleanup:
saved_errno = errno;
json_decref (nsdata);
errno = saved_errno;
return NULL;
}

static int checkpoint_commit (flux_t *h, json_t *nsdata)
{
flux_future_t *f = NULL;
flux_kvs_txn_t *txn = NULL;
char *s = NULL;
int rv = -1;

if (!(s = json_dumps (nsdata, JSON_COMPACT))) {
errno = ENOMEM;
goto cleanup;
}

if (!(txn = flux_kvs_txn_create ()))
goto cleanup;

if (flux_kvs_txn_put (txn,
0,
"job-exec.kvs-namespaces",
s) < 0)
goto cleanup;

if (!(f = flux_kvs_commit (h, NULL, 0, txn)))
goto cleanup;

if (flux_future_get (f, NULL) < 0)
goto cleanup;

rv = 0;
cleanup:
flux_future_destroy (f);
flux_kvs_txn_destroy (txn);
free (s);
return rv;
}

void checkpoint_running (flux_t *h, zhashx_t *jobs)
{
flux_future_t *lookupf = NULL;
json_t *nsdata = NULL;

if (!h || !jobs)
return;

if (lookup_nsroots (h, jobs, &lookupf) < 0) {
flux_log_error (h, "failed to lookup ns root refs");
goto cleanup;
}

if (!lookupf)
return;

if (!(nsdata = get_nsroots (h, lookupf))) {
flux_log_error (h, "failure getting ns root refs");
goto cleanup;
}

if (checkpoint_commit (h, nsdata) < 0) {
flux_log_error (h, "failure committing ns checkpoint data");
goto cleanup;
}

cleanup:
json_decref (nsdata);
flux_future_destroy (lookupf);
}

/*
* vi: tabstop=4 shiftwidth=4 expandtab
*/
30 changes: 30 additions & 0 deletions src/modules/job-exec/checkpoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef HAVE_JOB_EXEC_CHECKPOINT_H
#define HAVE_JOB_EXEC_CHECKPOINT_H 1

#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "job-exec.h"

flux_future_t *checkpoint_get_rootrefs (flux_t *h);

char *checkpoint_find_rootref (flux_future_t *f,
flux_jobid_t id,
uint32_t owner);

void checkpoint_running (flux_t *h, zhashx_t *jobs);

#endif /* !HAVE_JOB_EXEC_CHECKPOINT_EXEC_H */

/* vi: ts=4 sw=4 expandtab
*/
2 changes: 1 addition & 1 deletion src/modules/job-exec/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static const char *job_get_cwd (struct jobinfo *job)
static void start_cb (struct bulk_exec *exec, void *arg)
{
struct jobinfo *job = arg;
jobinfo_started (job, NULL);
jobinfo_started (job);
/* This is going to be really slow. However, it should at least
* work for now. We wait for all imp's to start, then send input
*/
Expand Down
Loading

0 comments on commit 562d29b

Please sign in to comment.