Skip to content

Commit

Permalink
kvs: support rootref in namespace create RPC
Browse files Browse the repository at this point in the history
Problem: In the future we would like to checkpoint KVS guest
namespaces and restart them.  However, there is no way to
"re-initialize" a KVS namespace to a specific root reference.

Solution: Refactor the KVS namespace create RPC to take a root
reference.  Callers must supply the initial root reference.
Update flux_kvs_namespace_create() to supply the root reference
of an empty directory.
  • Loading branch information
chu11 committed Nov 5, 2021
1 parent 0ebdfc4 commit 7314bca
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
41 changes: 36 additions & 5 deletions src/common/libkvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,53 @@
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/blobref.h"

#include "treeobj.h"
#include "kvs_util_private.h"

flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *ns,
uint32_t owner, int flags)
{
flux_future_t *rv = NULL;
const char *hash_name;
json_t *rootdir = NULL;
char rootref[BLOBREF_MAX_STRING_SIZE];
void *data = NULL;
int len;

if (!ns || flags) {
errno = EINVAL;
return NULL;
}

if (!(hash_name = flux_attr_get (h, "content.hash")))
goto cleanup;

if (!(rootdir = treeobj_create_dir ()))
goto cleanup;

if (!(data = treeobj_encode (rootdir)))
goto cleanup;
len = strlen (data);

/* N.B. blobref of empty treeobj dir guaranteed to be in content store
* b/c that is how the primary KVS is initialized.
*/
if (blobref_hash (hash_name, data, len, rootref, sizeof (rootref)) < 0)
goto cleanup;

/* N.B. owner cast to int */
return flux_rpc_pack (h, "kvs.namespace-create", 0, 0,
"{ s:s s:i s:i }",
"namespace", ns,
"owner", owner,
"flags", flags);
rv = flux_rpc_pack (h, "kvs.namespace-create", 0, 0,
"{ s:s s:s s:i s:i }",
"namespace", ns,
"rootref", rootref,
"owner", owner,
"flags", flags);
cleanup:
json_decref (rootdir);
free (data);
return rv;
}

flux_future_t *flux_kvs_namespace_remove (flux_t *h, const char *ns)
Expand Down
33 changes: 7 additions & 26 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2230,15 +2230,12 @@ static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh,
}

static int namespace_create (struct kvs_ctx *ctx, const char *ns,
uint32_t owner, int flags, const char **errmsg)
const char *rootref, uint32_t owner,
int flags, const char **errmsg)
{
struct kvsroot *root;
json_t *rootdir = NULL;
char ref[BLOBREF_MAX_STRING_SIZE];
void *data = NULL;
flux_msg_t *msg = NULL;
char *topic = NULL;
int len;
int rv = -1;

/* If namespace already exists, return EEXIST. Doesn't matter if
Expand All @@ -2262,23 +2259,7 @@ static int namespace_create (struct kvs_ctx *ctx, const char *ns,
return -1;
}

if (!(rootdir = treeobj_create_dir ())) {
flux_log_error (ctx->h, "%s: treeobj_create_dir", __FUNCTION__);
goto cleanup;
}

if (!(data = treeobj_encode (rootdir))) {
flux_log_error (ctx->h, "%s: treeobj_encode", __FUNCTION__);
goto cleanup;
}
len = strlen (data);

if (blobref_hash (ctx->hash_name, data, len, ref, sizeof (ref)) < 0) {
flux_log_error (ctx->h, "%s: blobref_hash", __FUNCTION__);
goto cleanup;
}

setroot (ctx, root, ref, 0);
setroot (ctx, root, rootref, 0);

if (event_subscribe (ctx, ns) < 0) {
flux_log_error (ctx->h, "%s: event_subscribe", __FUNCTION__);
Expand Down Expand Up @@ -2312,8 +2293,6 @@ static int namespace_create (struct kvs_ctx *ctx, const char *ns,
cleanup:
if (rv < 0)
kvsroot_mgr_remove_root (ctx->krm, ns);
free (data);
json_decref (rootdir);
free (topic);
flux_msg_destroy (msg);
return rv;
Expand All @@ -2325,14 +2304,16 @@ static void namespace_create_request_cb (flux_t *h, flux_msg_handler_t *mh,
struct kvs_ctx *ctx = arg;
const char *errmsg = NULL;
const char *ns;
const char *rootref;
uint32_t owner;
int flags;

assert (ctx->rank == 0);

/* N.B. owner read into uint32_t */
if (flux_request_unpack (msg, NULL, "{ s:s s:i s:i }",
if (flux_request_unpack (msg, NULL, "{ s:s s:s s:i s:i }",
"namespace", &ns,
"rootref", &rootref,
"owner", &owner,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
Expand All @@ -2342,7 +2323,7 @@ static void namespace_create_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (owner == FLUX_USERID_UNKNOWN)
owner = getuid ();

if (namespace_create (ctx, ns, owner, flags, &errmsg) < 0)
if (namespace_create (ctx, ns, rootref, owner, flags, &errmsg) < 0)
goto error;

if (flux_respond (h, msg, NULL) < 0)
Expand Down

0 comments on commit 7314bca

Please sign in to comment.