diff --git a/src/common/libkvs/kvs.c b/src/common/libkvs/kvs.c index d6d535ce7cfa..14ac86ded92f 100644 --- a/src/common/libkvs/kvs.c +++ b/src/common/libkvs/kvs.c @@ -16,22 +16,53 @@ #include #include +#include "src/common/libutil/blobref.h" + +#include "treeobj.h" #include "kvs_util_private.h" flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *ns, uint32_t owner, int flags) { + flux_future_t *rv = NULL; + const char *hash_name; + json_t *rootdir = NULL; + char rootref[BLOBREF_MAX_STRING_SIZE]; + void *data = NULL; + int len; + if (!ns || flags) { errno = EINVAL; return NULL; } + if (!(hash_name = flux_attr_get (h, "content.hash"))) + goto cleanup; + + if (!(rootdir = treeobj_create_dir ())) + goto cleanup; + + if (!(data = treeobj_encode (rootdir))) + goto cleanup; + len = strlen (data); + + /* N.B. blobref of empty treeobj dir guaranteed to be in content store + * b/c that is how the primary KVS is initialized. + */ + if (blobref_hash (hash_name, data, len, rootref, sizeof (rootref)) < 0) + goto cleanup; + /* N.B. owner cast to int */ - return flux_rpc_pack (h, "kvs.namespace-create", 0, 0, - "{ s:s s:i s:i }", - "namespace", ns, - "owner", owner, - "flags", flags); + rv = flux_rpc_pack (h, "kvs.namespace-create", 0, 0, + "{ s:s s:s s:i s:i }", + "namespace", ns, + "rootref", rootref, + "owner", owner, + "flags", flags); +cleanup: + json_decref (rootdir); + free (data); + return rv; } flux_future_t *flux_kvs_namespace_remove (flux_t *h, const char *ns) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 65d840f34f3a..e79536c6eae5 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2230,15 +2230,12 @@ static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh, } static int namespace_create (struct kvs_ctx *ctx, const char *ns, - uint32_t owner, int flags, const char **errmsg) + const char *rootref, uint32_t owner, + int flags, const char **errmsg) { struct kvsroot *root; - json_t *rootdir = NULL; - char ref[BLOBREF_MAX_STRING_SIZE]; - void *data = NULL; flux_msg_t *msg = NULL; char *topic = NULL; - int len; int rv = -1; /* If namespace already exists, return EEXIST. Doesn't matter if @@ -2262,23 +2259,7 @@ static int namespace_create (struct kvs_ctx *ctx, const char *ns, return -1; } - if (!(rootdir = treeobj_create_dir ())) { - flux_log_error (ctx->h, "%s: treeobj_create_dir", __FUNCTION__); - goto cleanup; - } - - if (!(data = treeobj_encode (rootdir))) { - flux_log_error (ctx->h, "%s: treeobj_encode", __FUNCTION__); - goto cleanup; - } - len = strlen (data); - - if (blobref_hash (ctx->hash_name, data, len, ref, sizeof (ref)) < 0) { - flux_log_error (ctx->h, "%s: blobref_hash", __FUNCTION__); - goto cleanup; - } - - setroot (ctx, root, ref, 0); + setroot (ctx, root, rootref, 0); if (event_subscribe (ctx, ns) < 0) { flux_log_error (ctx->h, "%s: event_subscribe", __FUNCTION__); @@ -2312,8 +2293,6 @@ static int namespace_create (struct kvs_ctx *ctx, const char *ns, cleanup: if (rv < 0) kvsroot_mgr_remove_root (ctx->krm, ns); - free (data); - json_decref (rootdir); free (topic); flux_msg_destroy (msg); return rv; @@ -2325,14 +2304,16 @@ static void namespace_create_request_cb (flux_t *h, flux_msg_handler_t *mh, struct kvs_ctx *ctx = arg; const char *errmsg = NULL; const char *ns; + const char *rootref; uint32_t owner; int flags; assert (ctx->rank == 0); /* N.B. owner read into uint32_t */ - if (flux_request_unpack (msg, NULL, "{ s:s s:i s:i }", + if (flux_request_unpack (msg, NULL, "{ s:s s:s s:i s:i }", "namespace", &ns, + "rootref", &rootref, "owner", &owner, "flags", &flags) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); @@ -2342,7 +2323,7 @@ static void namespace_create_request_cb (flux_t *h, flux_msg_handler_t *mh, if (owner == FLUX_USERID_UNKNOWN) owner = getuid (); - if (namespace_create (ctx, ns, owner, flags, &errmsg) < 0) + if (namespace_create (ctx, ns, rootref, owner, flags, &errmsg) < 0) goto error; if (flux_respond (h, msg, NULL) < 0)