diff --git a/doc/man1/flux-kvs.adoc b/doc/man1/flux-kvs.adoc index cb1952dd149a..b629eb1cfb5d 100644 --- a/doc/man1/flux-kvs.adoc +++ b/doc/man1/flux-kvs.adoc @@ -42,6 +42,9 @@ arguments are described below. COMMANDS -------- +*namespace-create* 'name' ['name...']:: +Create a namespace in which kvs values will be read/written to. + *get* [-j|-r|-t] [-a treeobj] 'key' ['key...']:: Retrieve the value stored under 'key'. If nothing has been stored under 'key', display an error message. If no options, value is displayed with diff --git a/doc/man3/Makefile.am b/doc/man3/Makefile.am index cfa1e9a61f42..37bf528ca91e 100644 --- a/doc/man3/Makefile.am +++ b/doc/man3/Makefile.am @@ -44,7 +44,8 @@ MAN3_FILES_PRIMARY = \ flux_future_create.3 \ flux_kvs_lookup.3 \ flux_kvs_commit.3 \ - flux_kvs_txn_create.3 + flux_kvs_txn_create.3 \ + flux_kvs_namespace_create.3 # These files are generated as roff .so includes of a primary page. # A2X handles this automatically if mentioned in NAME section diff --git a/doc/man3/flux_kvs_commit.adoc b/doc/man3/flux_kvs_commit.adoc index 9e2f1a92b114..6d93c9680501 100644 --- a/doc/man3/flux_kvs_commit.adoc +++ b/doc/man3/flux_kvs_commit.adoc @@ -48,6 +48,11 @@ block until the response has been received. Both accept an optional timeout. was successful, indicating the entire transaction was committed, or -1 on failure, indicating none of the transaction was committed. +By default, both `flux_kvs_commit()` and `flux_kvs_fence()` operate on +the default KVS namespace. To use a different namespace, set the +environment variable FLUX_KVS_NAMESPACE to the namespace you wish to +use. + FLAGS ----- @@ -83,6 +88,8 @@ A request was malformed. ENOSYS:: The KVS module is not loaded. +ENOTSUP:: +An unknown namespace was requested. AUTHOR ------ diff --git a/doc/man3/flux_kvs_lookup.adoc b/doc/man3/flux_kvs_lookup.adoc index dee2add8374c..e87b40c84b43 100644 --- a/doc/man3/flux_kvs_lookup.adoc +++ b/doc/man3/flux_kvs_lookup.adoc @@ -48,6 +48,11 @@ _treeobj_ is a serialized RFC 11 object that references a particular static set of content within the KVS, effectively a snapshot. See `flux_kvs_lookup_get_treeobj()` below. +By default, both `flux_kvs_lookup()` and `flux_kvs_lookupat()` operate +on the default KVS namespace. To use a different namespace, set the +environment variable FLUX_KVS_NAMESPACE to the namespace you wish to +use. + All the functions below are variations on a common theme. First they complete the lookup RPC by blocking on the response, if not already received. Then they interpret the result in different ways. They may be called more @@ -145,6 +150,8 @@ EFBIG:: ENOSYS:: The KVS module is not loaded. +ENOTSUP:: +An unknown namespace was requested. AUTHOR ------ diff --git a/doc/man3/flux_kvs_namespace_create.adoc b/doc/man3/flux_kvs_namespace_create.adoc new file mode 100644 index 000000000000..d669a6e1c35b --- /dev/null +++ b/doc/man3/flux_kvs_namespace_create.adoc @@ -0,0 +1,74 @@ +flux_kvs_namespace_create(3) +============================ +:doctype: manpage + + +NAME +---- +flux_kvs_namespace_create - create a KVS namespace + + +SYNOPSIS +-------- + #include + + flux_future_t *flux_kvs_namespace_create (flux_t *h, + const char *namespace, + int flags); + +DESCRIPTION +----------- + +`flux_kvs_namespace_create()` creates a KVS namespace. Within a +namespace, users can get/put KVS values completely independent of +other KVS namespaces. + +FLAGS +----- + +The _flags_ mask is currently unused and should be set to 0. + + +RETURN VALUE +------------ + +`flux_kvs_namespace_create()` returns a `flux_future_t` on success, or +NULL on failure with errno set appropriately. + + +ERRORS +------ + +EINVAL:: +One of the arguments was invalid. + +ENOMEM:: +Out of memory. + +EPROTO:: +A request was malformed. + +ENOSYS:: +The KVS module is not loaded. + +EEXIST:: +The namespace already exists. + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + + +SEE ALSO +--------- +flux_kvs_lookup(3), flux_kvs_commit(3) diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index 43490647ef02..cd41679d98e8 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -405,3 +405,5 @@ vpack dirref lookups mh +namespaces +ENOTSUP diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index 471bbb8eb6f0..88350ffd5111 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -37,6 +37,7 @@ #include "src/common/libutil/readall.h" #include "src/common/libkvs/treeobj.h" +int cmd_namespace_create (optparse_t *p, int argc, char **argv); int cmd_get (optparse_t *p, int argc, char **argv); int cmd_put (optparse_t *p, int argc, char **argv); int cmd_unlink (optparse_t *p, int argc, char **argv); @@ -169,6 +170,13 @@ static struct optparse_option unlink_opts[] = { }; static struct optparse_subcommand subcommands[] = { + { "namespace-create", + "name [name...]", + "Create a KVS namespace", + cmd_namespace_create, + 0, + NULL + }, { "get", "[-j|-r|-t] [-a treeobj] key [key...]", "Get value stored under key", @@ -334,6 +342,28 @@ int main (int argc, char *argv[]) return (exitval); } +int cmd_namespace_create (optparse_t *p, int argc, char **argv) +{ + flux_t *h = (flux_t *)optparse_get_data (p, "flux_handle"); + flux_future_t *f; + int optindex, i; + + optindex = optparse_option_index (p); + if ((optindex - argc) == 0) { + optparse_print_usage (p); + exit (1); + } + for (i = optindex; i < argc; i++) { + const char *name = argv[i]; + int flags = 0; + if (!(f = flux_kvs_namespace_create (h, name, flags)) + || flux_future_get (f, NULL) < 0) + log_err_exit ("%s", name); + flux_future_destroy (f); + } + return (0); +} + static void kv_printf (const char *key, int maxcol, const char *fmt, ...) { va_list ap; diff --git a/src/common/libkvs/Makefile.am b/src/common/libkvs/Makefile.am index 4c00a17ec298..0ee2a77779cd 100644 --- a/src/common/libkvs/Makefile.am +++ b/src/common/libkvs/Makefile.am @@ -13,6 +13,7 @@ noinst_LTLIBRARIES = libkvs.la libkvs_la_SOURCES = \ kvs.c \ + kvs_private.h \ kvs_lookup.c \ kvs_dir.c \ kvs_dir_private.h \ @@ -34,6 +35,7 @@ fluxcoreinclude_HEADERS = \ kvs_commit.h TESTS = \ + test_kvs.t \ test_kvs_txn.t \ test_kvs_lookup.t \ test_kvs_dir.t \ @@ -57,6 +59,10 @@ test_cppflags = \ $(AM_CPPFLAGS) \ -I$(top_srcdir)/src/common/libtap +test_kvs_t_SOURCES = test/kvs.c +test_kvs_t_CPPFLAGS = $(test_cppflags) +test_kvs_t_LDADD = $(test_ldadd) $(LIBDL) + test_kvs_txn_t_SOURCES = test/kvs_txn.c test_kvs_txn_t_CPPFLAGS = $(test_cppflags) test_kvs_txn_t_LDADD = $(test_ldadd) $(LIBDL) diff --git a/src/common/libkvs/kvs.c b/src/common/libkvs/kvs.c index c60f98ebc4e2..aa4458c21652 100644 --- a/src/common/libkvs/kvs.c +++ b/src/common/libkvs/kvs.c @@ -27,13 +27,36 @@ #endif #include +const char *get_kvs_namespace (void) +{ + if (getenv ("FLUX_KVS_NAMESPACE")) + return getenv ("FLUX_KVS_NAMESPACE"); + return KVS_PRIMARY_NAMESPACE; +} + +flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *namespace, + int flags) +{ + if (!namespace || flags) { + errno = EINVAL; + return NULL; + } + + return flux_rpc_pack (h, "kvs.namespace.create", 0, 0, + "{ s:s s:i }", + "namespace", namespace, + "flags", flags); +} + int flux_kvs_get_version (flux_t *h, int *versionp) { flux_future_t *f; + const char *namespace = get_kvs_namespace (); int version; int rc = -1; - if (!(f = flux_rpc (h, "kvs.getroot", NULL, FLUX_NODEID_ANY, 0))) + if (!(f = flux_rpc_pack (h, "kvs.getroot", FLUX_NODEID_ANY, 0, "{ s:s }", + "namespace", namespace))) goto done; if (flux_rpc_get_unpack (f, "{ s:i }", "rootseq", &version) < 0) goto done; @@ -48,10 +71,12 @@ int flux_kvs_get_version (flux_t *h, int *versionp) int flux_kvs_wait_version (flux_t *h, int version) { flux_future_t *f; + const char *namespace = get_kvs_namespace (); int ret = -1; - if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i }", - "rootseq", version))) + if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i s:s }", + "rootseq", version, + "namespace", namespace))) goto done; /* N.B. response contains (rootseq, rootref) but we don't need it. */ diff --git a/src/common/libkvs/kvs.h b/src/common/libkvs/kvs.h index 3ed8a40a375d..49ff747f1eab 100644 --- a/src/common/libkvs/kvs.h +++ b/src/common/libkvs/kvs.h @@ -14,6 +14,8 @@ extern "C" { #endif +#define KVS_PRIMARY_NAMESPACE "primary" + enum { FLUX_KVS_READDIR = 1, FLUX_KVS_READLINK = 2, @@ -21,9 +23,14 @@ enum { FLUX_KVS_APPEND = 32, }; +/* Namespace */ +flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *namespace, + int flags); + /* Synchronization: * Process A commits data, then gets the store version V and sends it to B. * Process B waits for the store version to be >= V, then reads data. + * To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE. */ int flux_kvs_get_version (flux_t *h, int *versionp); int flux_kvs_wait_version (flux_t *h, int version); diff --git a/src/common/libkvs/kvs_commit.c b/src/common/libkvs/kvs_commit.c index efe10e6040de..893e0df0bc36 100644 --- a/src/common/libkvs/kvs_commit.c +++ b/src/common/libkvs/kvs_commit.c @@ -29,12 +29,15 @@ #include #include +#include "kvs_private.h" #include "kvs_txn_private.h" #include "src/common/libutil/blobref.h" flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name, int nprocs, flux_kvs_txn_t *txn) { + const char *namespace = get_kvs_namespace (); + if (txn) { json_t *ops; if (!(ops = txn_get_ops (txn))) { @@ -42,16 +45,18 @@ flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name, return NULL; } return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0, - "{s:s s:i s:i s:O}", + "{s:s s:i s:s s:i s:O}", "name", name, "nprocs", nprocs, + "namespace", namespace, "flags", flags, "ops", ops); } else { return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0, - "{s:s s:i s:i s:[]}", + "{s:s s:i s:s s:i s:[]}", "name", name, "nprocs", nprocs, + "namespace", namespace, "flags", flags, "ops"); } diff --git a/src/common/libkvs/kvs_commit.h b/src/common/libkvs/kvs_commit.h index 14369aec5335..90567748dfb8 100644 --- a/src/common/libkvs/kvs_commit.h +++ b/src/common/libkvs/kvs_commit.h @@ -9,6 +9,8 @@ enum kvs_commit_flags { FLUX_KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */ }; +/* To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ + flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn); flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name, diff --git a/src/common/libkvs/kvs_lookup.c b/src/common/libkvs/kvs_lookup.c index 1bb2874cf2d8..c8d724f1e14e 100644 --- a/src/common/libkvs/kvs_lookup.c +++ b/src/common/libkvs/kvs_lookup.c @@ -32,6 +32,7 @@ #include #include +#include "kvs_private.h" #include "kvs_dir_private.h" #include "kvs_lookup.h" #include "treeobj.h" @@ -99,6 +100,7 @@ flux_future_t *flux_kvs_lookup (flux_t *h, int flags, const char *key) { struct lookup_ctx *ctx; flux_future_t *f; + const char *namespace = get_kvs_namespace (); if (!h || !key || strlen (key) == 0 || validate_lookup_flags (flags) < 0) { errno = EINVAL; @@ -106,9 +108,11 @@ flux_future_t *flux_kvs_lookup (flux_t *h, int flags, const char *key) } if (!(ctx = alloc_ctx (h, flags, key))) return NULL; - if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0, "{s:s s:i}", - "key", key, - "flags", flags))) { + if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0, + "{s:s s:s s:i}", + "key", key, + "namespace", namespace, + "flags", flags))) { free_ctx (ctx); return NULL; } @@ -140,6 +144,8 @@ flux_future_t *flux_kvs_lookupat (flux_t *h, int flags, const char *key, } } else { + const char *namespace = get_kvs_namespace (); + if (!(ctx->atref = strdup (treeobj))) return NULL; if (!(obj = json_loads (treeobj, 0, NULL))) { @@ -147,9 +153,11 @@ flux_future_t *flux_kvs_lookupat (flux_t *h, int flags, const char *key, return NULL; } if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0, - "{s:s s:i s:O}", "key", key, - "flags", flags, - "rootdir", obj))) { + "{s:s s:s s:i s:O}", + "key", key, + "namespace", namespace, + "flags", flags, + "rootdir", obj))) { free_ctx (ctx); json_decref (obj); return NULL; diff --git a/src/common/libkvs/kvs_lookup.h b/src/common/libkvs/kvs_lookup.h index 30fd3ef52d44..3c3ca3cb2b43 100644 --- a/src/common/libkvs/kvs_lookup.h +++ b/src/common/libkvs/kvs_lookup.h @@ -5,6 +5,8 @@ extern "C" { #endif +/* To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ + flux_future_t *flux_kvs_lookup (flux_t *h, int flags, const char *key); flux_future_t *flux_kvs_lookupat (flux_t *h, int flags, const char *key, const char *treeobj); diff --git a/src/common/libkvs/kvs_private.h b/src/common/libkvs/kvs_private.h new file mode 100644 index 000000000000..20367166884c --- /dev/null +++ b/src/common/libkvs/kvs_private.h @@ -0,0 +1,10 @@ +#ifndef _KVS_PRIVATE_H +#define _KVS_PRIVATE_H + +const char *get_kvs_namespace (void); + +#endif /* !_KVS_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libkvs/kvs_watch.c b/src/common/libkvs/kvs_watch.c index 23320b9c9da9..0a919d09689a 100644 --- a/src/common/libkvs/kvs_watch.c +++ b/src/common/libkvs/kvs_watch.c @@ -30,6 +30,7 @@ #include #include "treeobj.h" +#include "kvs_private.h" #include "kvs_dir_private.h" typedef enum { @@ -147,10 +148,13 @@ static kvs_watcher_t *lookup_watcher (flux_t *h, uint32_t matchtag) int flux_kvs_unwatch (flux_t *h, const char *key) { flux_future_t *f = NULL; + const char *namespace = get_kvs_namespace (); int rc = -1; if (!(f = flux_rpc_pack (h, "kvs.unwatch", FLUX_NODEID_ANY, 0, - "{s:s}", "key", key))) + "{s:s s:s}", + "key", key, + "namespace", namespace))) goto done; if (flux_future_get (f, NULL) < 0) goto done; @@ -232,6 +236,7 @@ static flux_future_t *kvs_watch_rpc (flux_t *h, const char *key, const char *json_str, int flags) { flux_future_t *f; + const char *namespace = get_kvs_namespace (); json_t *val = NULL; int saved_errno; @@ -242,8 +247,9 @@ static flux_future_t *kvs_watch_rpc (flux_t *h, const char *key, goto error; } if (!(f = flux_rpc_pack (h, "kvs.watch", FLUX_NODEID_ANY, 0, - "{s:s s:i s:o}", + "{s:s s:s s:i s:o}", "key", key, + "namespace", namespace, "flags", flags, "val", val))) { goto error; diff --git a/src/common/libkvs/kvs_watch.h b/src/common/libkvs/kvs_watch.h index a987038e4e54..a9b923fef1f7 100644 --- a/src/common/libkvs/kvs_watch.h +++ b/src/common/libkvs/kvs_watch.h @@ -40,6 +40,7 @@ typedef int (*kvs_set_dir_f)(const char *key, flux_kvsdir_t *dir, void *arg, * Callback is triggered once during registration to get the initial value. * Once the reactor is (re-)entered, it will then be called each time the * key changes. + * To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ int flux_kvs_watch (flux_t *h, const char *key, kvs_set_f set, void *arg); @@ -49,6 +50,7 @@ int flux_kvs_watch (flux_t *h, const char *key, kvs_set_f set, void *arg); * KVS's hash tree namespace organization, this function will be called * whenever any key under this directory changes, since that forces the * hash references to change on parents, all the way to the root. + * To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ int flux_kvs_watch_dir (flux_t *h, kvs_set_dir_f set, void *arg, const char *fmt, ...) @@ -56,6 +58,7 @@ int flux_kvs_watch_dir (flux_t *h, kvs_set_dir_f set, void *arg, /* Cancel a flux_kvs_watch(), freeing server-side state, and unregistering * any callback. Returns 0 on success, or -1 with errno set on error. + * To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ int flux_kvs_unwatch (flux_t *h, const char *key); @@ -70,6 +73,8 @@ int flux_kvs_unwatch (flux_t *h, const char *key); * * If 'key' initially exists, then is removed, the function fails with * ENOENT and the initial value is not freed. + * + * To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */ int flux_kvs_watch_once (flux_t *h, const char *key, char **json_str); diff --git a/src/common/libkvs/test/kvs.c b/src/common/libkvs/test/kvs.c new file mode 100644 index 000000000000..65a7a1998e92 --- /dev/null +++ b/src/common/libkvs/test/kvs.c @@ -0,0 +1,55 @@ +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "src/common/libflux/flux.h" +#include "kvs.h" +#include "kvs_private.h" +#include "src/common/libtap/tap.h" + +void errors (void) +{ + /* check simple error cases */ + + errno = 0; + ok (flux_kvs_namespace_create (NULL, NULL, 5) == NULL && errno == EINVAL, + "flux_kvs_namespace_create fails on bad input"); +} + +void namespace (void) +{ + const char *str; + + ok (setenv ("FLUX_KVS_NAMESPACE", "FOOBAR", 1) == 0, + "setenv FLUX_KVS_NAMESPACE success"); + ok ((str = get_kvs_namespace ()) != NULL, + "get_kvs_namespace returns non-NULL"); + ok (!strcmp (str, "FOOBAR"), + "get_kvs_namespace returns correct non-default namespace"); + ok (unsetenv ("FLUX_KVS_NAMESPACE") == 0, + "unsetenv FLUX_KVS_NAMESPACE success"); + ok ((str = get_kvs_namespace ()) != NULL, + "get_kvs_namespace returns non-NULL"); + ok (!strcmp (str, KVS_PRIMARY_NAMESPACE), + "get_kvs_namespace returns correct default namespace"); +} + +int main (int argc, char *argv[]) +{ + + plan (NO_PLAN); + + errors (); + namespace (); + + done_testing(); + return (0); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 1e495db18331..ee968069053e 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -62,18 +62,10 @@ const int max_lastuse_age = 5; */ const bool event_includes_rootdir = true; -struct kvsroot { - int seq; - blobref_t ref; -}; - typedef struct { int magic; struct cache *cache; /* blobref => cache_entry */ - struct kvsroot root; - commit_mgr_t *cm; - waitqueue_t *watchlist; - int watchlist_lastrun_epoch; + zhash_t *roothash; int faults; /* for kvs.stats.get, etc. */ flux_t *h; uint32_t rank; @@ -82,17 +74,32 @@ typedef struct { flux_watcher_t *idle_w; flux_watcher_t *check_w; int commit_merge; + bool events_init; /* flag */ const char *hash_name; } kvs_ctx_t; +struct kvsroot { + char *namespace; + int seq; + blobref_t ref; + commit_mgr_t *cm; + waitqueue_t *watchlist; + int watchlist_lastrun_epoch; + int flags; + kvs_ctx_t *ctx; +}; + struct kvs_cb_data { kvs_ctx_t *ctx; wait_t *wait; int errnum; }; -static int setroot_event_send (kvs_ctx_t *ctx, json_t *names); -static int error_event_send (kvs_ctx_t *ctx, json_t *names, int errnum); +static struct kvsroot *getroot (kvs_ctx_t *ctx, const char *namespace); +static int setroot_event_send (kvs_ctx_t *ctx, struct kvsroot *root, + json_t *names); +static int error_event_send (kvs_ctx_t *ctx, struct kvsroot *root, + json_t *names, int errnum); static void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); static void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -103,9 +110,7 @@ static void freectx (void *arg) kvs_ctx_t *ctx = arg; if (ctx) { cache_destroy (ctx->cache); - commit_mgr_destroy (ctx->cm); - if (ctx->watchlist) - wait_queue_destroy (ctx->watchlist); + zhash_destroy (&ctx->roothash); flux_watcher_destroy (ctx->prep_w); flux_watcher_destroy (ctx->check_w); flux_watcher_destroy (ctx->idle_w); @@ -135,9 +140,8 @@ static kvs_ctx_t *getctx (flux_t *h) goto error; } ctx->cache = cache_create (); - ctx->watchlist = wait_queue_create (); - ctx->cm = commit_mgr_create (ctx->cache, ctx->hash_name, h, ctx); - if (!ctx->cache || !ctx->watchlist || !ctx->cm) { + ctx->roothash = zhash_new (); + if (!ctx->cache || !ctx->roothash) { saved_errno = ENOMEM; goto error; } @@ -381,18 +385,19 @@ static int content_store_request_send (kvs_ctx_t *ctx, const void *data, return rc; } -static void setroot (kvs_ctx_t *ctx, const char *rootref, int rootseq) +static void setroot (kvs_ctx_t *ctx, struct kvsroot *root, + const char *rootref, int rootseq) { - if (rootseq == 0 || rootseq > ctx->root.seq) { + if (rootseq == 0 || rootseq > root->seq) { assert (strlen (rootref) < sizeof (blobref_t)); - strcpy (ctx->root.ref, rootref); - ctx->root.seq = rootseq; + strcpy (root->ref, rootref); + root->seq = rootseq; /* log error on wait_runqueue(), don't error out. watchers * may miss value change, but will never get older one. * Maintains consistency model */ - if (wait_runqueue (ctx->watchlist) < 0) + if (wait_runqueue (root->watchlist) < 0) flux_log_error (ctx->h, "%s: wait_runqueue", __FUNCTION__); - ctx->watchlist_lastrun_epoch = ctx->epoch; + root->watchlist_lastrun_epoch = ctx->epoch; } } @@ -454,7 +459,8 @@ static int commit_cache_cb (commit_t *c, struct cache_entry *entry, void *data) */ static void commit_apply (commit_t *c) { - kvs_ctx_t *ctx = commit_get_aux (c); + struct kvsroot *root = commit_get_aux (c); + kvs_ctx_t *ctx = root->ctx; wait_t *wait = NULL; int errnum = 0; commit_process_t ret; @@ -464,7 +470,7 @@ static void commit_apply (commit_t *c) if ((ret = commit_process (c, ctx->epoch, - ctx->root.ref)) == COMMIT_PROCESS_ERROR) { + root->ref)) == COMMIT_PROCESS_ERROR) { errnum = commit_get_errnum (c); goto done; } @@ -526,9 +532,8 @@ static void commit_apply (commit_t *c) /* else ret == COMMIT_PROCESS_FINISHED */ /* This is the transaction that finalizes the commit by replacing - * ctx->root.ref with newroot, incrementing ctx->root.seq, - * and sending out the setroot event for "eventual consistency" - * of other nodes. + * root->ref with newroot, incrementing root->seq, and sending out + * the setroot event for "eventual consistency" of other nodes. */ done: if (errnum == 0) { @@ -540,20 +545,20 @@ static void commit_apply (commit_t *c) flux_log (ctx->h, LOG_DEBUG, "aggregated %d commits (%d ops)", count, opcount); } - setroot (ctx, commit_get_newroot_ref (c), ctx->root.seq + 1); - setroot_event_send (ctx, fence_get_json_names (f)); + setroot (ctx, root, commit_get_newroot_ref (c), root->seq + 1); + setroot_event_send (ctx, root, fence_get_json_names (f)); } else { fence_t *f = commit_get_fence (c); flux_log (ctx->h, LOG_ERR, "commit failed: %s", flux_strerror (errnum)); - error_event_send (ctx, fence_get_json_names (f), errnum); + error_event_send (ctx, root, fence_get_json_names (f), errnum); } wait_destroy (wait); /* Completed: remove from 'ready' list. * N.B. fence_t remains in the fences hash until event is received. */ - commit_mgr_remove_commit (ctx->cm, c); + commit_mgr_remove_commit (root->cm, c); return; stall: @@ -564,8 +569,21 @@ static void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + bool ready = false; + + root = zhash_first (ctx->roothash); + while (root) { + + if (commit_mgr_commits_ready (root->cm)) { + ready = true; + break; + } + + root = zhash_next (ctx->roothash); + } - if (commit_mgr_commits_ready (ctx->cm)) + if (ready) flux_watcher_start (ctx->idle_w); } @@ -573,19 +591,25 @@ static void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; commit_t *c; flux_watcher_stop (ctx->idle_w); - if ((c = commit_mgr_get_ready_commit (ctx->cm))) { - if (ctx->commit_merge) { - /* if merge fails, set errnum in commit_t, let - * commit_apply() handle error handling. - */ - if (commit_mgr_merge_ready_commits (ctx->cm) < 0) - commit_set_aux_errnum (c, errno); + root = zhash_first (ctx->roothash); + while (root) { + if ((c = commit_mgr_get_ready_commit (root->cm))) { + if (ctx->commit_merge) { + /* if merge fails, set errnum in commit_t, let + * commit_apply() handle error handling. + */ + if (commit_mgr_merge_ready_commits (root->cm) < 0) + commit_set_aux_errnum (c, errno); + } + commit_apply (c); } - commit_apply (c); + + root = zhash_next (ctx->roothash); } } @@ -596,6 +620,8 @@ static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *mh, int size, expcount = 0; int rc = -1; + /* irrelevant if root not initialized, drop cache entries */ + if (flux_request_decode (msg, NULL, NULL) < 0) goto done; size = cache_count_entries (ctx->cache); @@ -618,6 +644,8 @@ static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *mh, kvs_ctx_t *ctx = arg; int size, expcount = 0; + /* irrelevant if root not initialized, drop cache entries */ + if (flux_event_decode (msg, NULL, NULL) < 0) { flux_log_error (ctx->h, "%s: flux_event_decode", __FUNCTION__); return; @@ -634,22 +662,30 @@ static void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; if (flux_heartbeat_decode (msg, &ctx->epoch) < 0) { flux_log_error (ctx->h, "%s: flux_heartbeat_decode", __FUNCTION__); return; } - /* "touch" objects involved in watched keys */ - if (ctx->epoch - ctx->watchlist_lastrun_epoch > max_lastuse_age) { - /* log error on wait_runqueue(), don't error out. watchers - * may miss value change, but will never get older one. - * Maintains consistency model */ - if (wait_runqueue (ctx->watchlist) < 0) - flux_log_error (h, "%s: wait_runqueue", __FUNCTION__); - ctx->watchlist_lastrun_epoch = ctx->epoch; + + root = zhash_first (ctx->roothash); + while (root) { + + /* "touch" objects involved in watched keys */ + if (ctx->epoch - root->watchlist_lastrun_epoch > max_lastuse_age) { + /* log error on wait_runqueue(), don't error out. watchers + * may miss value change, but will never get older one. + * Maintains consistency model */ + if (wait_runqueue (root->watchlist) < 0) + flux_log_error (h, "%s: wait_runqueue", __FUNCTION__); + root->watchlist_lastrun_epoch = ctx->epoch; + } + /* "touch" root */ + (void)cache_lookup (ctx->cache, root->ref, ctx->epoch); + + root = zhash_next (ctx->roothash); } - /* "touch" root */ - (void)cache_lookup (ctx->cache, ctx->root.ref, ctx->epoch); if (cache_expire_entries (ctx->cache, ctx->epoch, max_lastuse_age) < 0) flux_log_error (ctx->h, "%s: cache_expire_entries", __FUNCTION__); @@ -687,15 +723,22 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *mh, /* if bad lh, then first time rpc and not a replay */ if (lookup_validate (arg) == false) { + const char *namespace; + struct kvsroot *root; + ctx = arg; - if (flux_request_unpack (msg, NULL, "{ s:s s:i }", + if (flux_request_unpack (msg, NULL, "{ s:s s:s s:i }", "key", &key, + "namespace", &namespace, "flags", &flags) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto done; } + if (!(root = getroot (ctx, namespace))) + goto done; + /* rootdir is optional */ (void)flux_request_unpack (msg, NULL, "{ s:o }", "rootdir", &root_dirent); @@ -714,7 +757,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *mh, if (!(lh = lookup_create (ctx->cache, ctx->epoch, - ctx->root.ref, + root->ref, root_ref, key, h, @@ -812,7 +855,9 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh, json_t *oval = NULL; json_t *val = NULL; flux_msg_t *cpy = NULL; + struct kvsroot *root = NULL; const char *key; + const char *namespace; int flags; lookup_t *lh = NULL; wait_t *wait = NULL; @@ -826,17 +871,21 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh, if (lookup_validate (arg) == false) { ctx = arg; - if (flux_request_unpack (msg, NULL, "{ s:s s:o s:i }", + if (flux_request_unpack (msg, NULL, "{ s:s s:s s:o s:i }", "key", &key, + "namespace", &namespace, "val", &oval, "flags", &flags) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto done; } + if (!(root = getroot (ctx, namespace))) + goto done; + if (!(lh = lookup_create (ctx->cache, ctx->epoch, - ctx->root.ref, + root->ref, NULL, key, h, @@ -908,13 +957,17 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh, /* we didn't initialize these values on a replay, get them */ if (isreplay) { - if (flux_request_unpack (msg, NULL, "{ s:s s:o s:i }", + if (flux_request_unpack (msg, NULL, "{ s:s s:s s:o s:i }", "key", &key, + "namespace", &namespace, "val", &oval, "flags", &flags) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto done; } + + if (!(root = getroot (ctx, namespace))) + goto done; } /* Value changed or this is the initial request, so there will be @@ -924,24 +977,26 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh, out = true; /* No reply sent or this is a multi-response watch request. - * Arrange to wait on ctx->watchlist for each new commit. + * Arrange to wait on root->watchlist for each new commit. * Reconstruct the payload with 'first' flag clear, and updated value. */ if (!out || !(flags & KVS_WATCH_ONCE)) { if (!(cpy = flux_msg_copy (msg, false))) goto done; - if (flux_msg_pack (cpy, "{ s:s s:O s:i }", + if (flux_msg_pack (cpy, "{ s:s s:s s:O s:i }", "key", key, + "namespace", namespace, "val", val, "flags", flags & ~KVS_WATCH_FIRST) < 0) { flux_log_error (h, "%s: flux_msg_pack", __FUNCTION__); goto done; } + if (!(watcher = wait_create_msg_handler (h, mh, cpy, watch_request_cb, ctx))) goto done; - if (wait_addqueue (ctx->watchlist, watcher) < 0) { + if (wait_addqueue (root->watchlist, watcher) < 0) { saved_errno = errno; wait_destroy (watcher); errno = saved_errno; @@ -1009,15 +1064,24 @@ static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + const char *namespace; const char *key; unwatch_param_t p = { NULL, NULL }; int errnum = 0; - if (flux_request_unpack (msg, NULL, "{ s:s }", "key", &key) < 0) { + if (flux_request_unpack (msg, NULL, "{ s:s s:s }", + "namespace", &namespace, + "key", &key) < 0) { errnum = errno; flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto done; } + + /* if root not initialized, success automatically */ + if (!(root = zhash_lookup (ctx->roothash, namespace))) + goto done; + if (!(p.key = kvs_util_normalize_key (key, NULL))) { errnum = errno; goto done; @@ -1033,7 +1097,7 @@ static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *mh, * but cache_wait_destroy_msg() fails, it's not that big of a * deal. The current state is still maintained. */ - if (wait_destroy_msg (ctx->watchlist, unwatch_cmp, &p) < 0) { + if (wait_destroy_msg (root->watchlist, unwatch_cmp, &p) < 0) { errnum = errno; flux_log_error (h, "%s: wait_destroy_msg", __FUNCTION__); goto done; @@ -1065,7 +1129,8 @@ static int finalize_fence_req (fence_t *f, const flux_msg_t *req, void *data) return 0; } -static void finalize_fences_bynames (kvs_ctx_t *ctx, json_t *names, int errnum) +static void finalize_fences_bynames (kvs_ctx_t *ctx, struct kvsroot *root, + json_t *names, int errnum) { int i, len; json_t *name; @@ -1081,9 +1146,9 @@ static void finalize_fences_bynames (kvs_ctx_t *ctx, json_t *names, int errnum) flux_log_error (ctx->h, "%s: parsing array[%d]", __FUNCTION__, i); return; } - if ((f = commit_mgr_lookup_fence (ctx->cm, json_string_value (name)))) { + if ((f = commit_mgr_lookup_fence (root->cm, json_string_value (name)))) { fence_iter_request_copies (f, finalize_fence_req, &d); - commit_mgr_remove_fence (ctx->cm, json_string_value (name)); + commit_mgr_remove_fence (root->cm, json_string_value (name)); } } } @@ -1094,28 +1159,39 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + const char *namespace; const char *name; int nprocs, flags; json_t *ops = NULL; fence_t *f; - if (flux_request_unpack (msg, NULL, "{ s:o s:s s:i s:i }", + if (flux_request_unpack (msg, NULL, "{ s:o s:s s:s s:i s:i }", "ops", &ops, "name", &name, + "namespace", &namespace, "flags", &flags, "nprocs", &nprocs) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); return; } + + /* namespace must exist given we are on rank 0 */ + if (!(root = zhash_lookup (ctx->roothash, namespace))) { + flux_log (h, LOG_ERR, "%s: namespace %s not initialized", + __FUNCTION__, namespace); + return; + } + /* FIXME: generate a kvs.fence.abort (or similar) if an error * occurs after we know the fence name */ - if (!(f = commit_mgr_lookup_fence (ctx->cm, name))) { + if (!(f = commit_mgr_lookup_fence (root->cm, name))) { if (!(f = fence_create (name, nprocs, flags))) { flux_log_error (h, "%s: fence_create", __FUNCTION__); return; } - if (commit_mgr_add_fence (ctx->cm, f) < 0) { + if (commit_mgr_add_fence (root->cm, f) < 0) { flux_log_error (h, "%s: commit_mgr_add_fence", __FUNCTION__); fence_destroy (f); return; @@ -1129,7 +1205,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh, return; } - if (commit_mgr_process_fence_request (ctx->cm, f) < 0) { + if (commit_mgr_process_fence_request (root->cm, f) < 0) { flux_log_error (h, "%s: commit_mgr_process_fence_request", __FUNCTION__); return; } @@ -1144,25 +1220,32 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + const char *namespace; const char *name; int saved_errno, nprocs, flags; json_t *ops = NULL; fence_t *f; - if (flux_request_unpack (msg, NULL, "{ s:o s:s s:i s:i }", + if (flux_request_unpack (msg, NULL, "{ s:o s:s s:s s:i s:i }", "ops", &ops, "name", &name, + "namespace", &namespace, "flags", &flags, "nprocs", &nprocs) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto error; } - if (!(f = commit_mgr_lookup_fence (ctx->cm, name))) { + + if (!(root = getroot (ctx, namespace))) + goto error; + + if (!(f = commit_mgr_lookup_fence (root->cm, name))) { if (!(f = fence_create (name, nprocs, flags))) { flux_log_error (h, "%s: fence_create", __FUNCTION__); goto error; } - if (commit_mgr_add_fence (ctx->cm, f) < 0) { + if (commit_mgr_add_fence (root->cm, f) < 0) { saved_errno = errno; flux_log_error (h, "%s: commit_mgr_add_fence", __FUNCTION__); fence_destroy (f); @@ -1181,7 +1264,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } - if (commit_mgr_process_fence_request (ctx->cm, f) < 0) { + if (commit_mgr_process_fence_request (root->cm, f) < 0) { flux_log_error (h, "%s: commit_mgr_process_fence_request", __FUNCTION__); goto error; @@ -1191,9 +1274,10 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_future_t *f; if (!(f = flux_rpc_pack (h, "kvs.relayfence", 0, FLUX_RPC_NORESPONSE, - "{ s:O s:s s:i s:i }", + "{ s:O s:s s:s s:i s:i }", "ops", ops, "name", name, + "namespace", namespace, "flags", flags, "nprocs", nprocs))) { flux_log_error (h, "%s: flux_rpc_pack", __FUNCTION__); @@ -1215,19 +1299,26 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + const char *namespace; + struct kvsroot *root; int saved_errno, rootseq; wait_t *wait = NULL; - if (flux_request_unpack (msg, NULL, "{ s:i }", - "rootseq", &rootseq) < 0) { + if (flux_request_unpack (msg, NULL, "{ s:i s:s }", + "rootseq", &rootseq, + "namespace", &namespace) < 0) { flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto error; } - if (ctx->root.seq < rootseq) { + + if (!(root = getroot (ctx, namespace))) + goto error; + + if (root->seq < rootseq) { if (!(wait = wait_create_msg_handler (h, mh, msg, sync_request_cb, arg))) goto error; - if (wait_addqueue (ctx->watchlist, wait) < 0) { + if (wait_addqueue (root->watchlist, wait) < 0) { saved_errno = errno; wait_destroy (wait); errno = saved_errno; @@ -1236,8 +1327,8 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh, return; /* stall */ } if (flux_respond_pack (h, msg, "{ s:i s:s }", - "rootseq", ctx->root.seq, - "rootref", ctx->root.ref) < 0) { + "rootseq", root->seq, + "rootref", root->ref) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } @@ -1253,12 +1344,34 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + const char *namespace; + struct kvsroot *root; - if (flux_request_decode (msg, NULL, NULL) < 0) + if (flux_request_unpack (msg, NULL, "{ s:s }", + "namespace", &namespace) < 0) { + flux_log_error (ctx->h, "%s: flux_request_unpack", __FUNCTION__); goto error; - if (flux_respond_pack (h, msg, "{ s:i s:s }", - "rootseq", ctx->root.seq, - "rootref", ctx->root.ref) < 0) { + } + + if (ctx->rank == 0) { + if (!(root = zhash_lookup (ctx->roothash, namespace))) { + flux_log (h, LOG_DEBUG, "namespace %s not found", namespace); + errno = ENOTSUP; + goto error; + } + } + else { + /* If root is not initialized, we have to intialize ourselves + * first. + */ + if (!(root = getroot (ctx, namespace))) + goto error; + } + + if (flux_respond_pack (h, msg, "{ s:i s:s s:i }", + "rootseq", root->seq, + "rootref", root->ref, + "flags", root->flags) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } @@ -1269,19 +1382,25 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, blobref_t rootref) +static int getroot_rpc (kvs_ctx_t *ctx, const char *namespace, int *rootseq, + blobref_t rootref, int *flagsp) { flux_future_t *f; const char *ref; + int flags; int saved_errno, rc = -1; - if (!(f = flux_rpc (ctx->h, "kvs.getroot", NULL, FLUX_NODEID_UPSTREAM, 0))) { + /* XXX: future make asynchronous */ + if (!(f = flux_rpc_pack (ctx->h, "kvs.getroot", FLUX_NODEID_UPSTREAM, 0, + "{ s:s }", + "namespace", namespace))) { saved_errno = errno; goto done; } - if (flux_rpc_get_unpack (f, "{ s:i s:s }", + if (flux_rpc_get_unpack (f, "{ s:i s:s s:i }", "rootseq", rootseq, - "rootref", &ref) < 0) { + "rootref", &ref, + "flags", &flags) < 0) { saved_errno = errno; flux_log_error (ctx->h, "%s: flux_rpc_get_unpack", __FUNCTION__); goto done; @@ -1291,6 +1410,8 @@ static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, blobref_t rootref) goto done; } strcpy (rootref, ref); + if (flagsp) + (*flagsp) = flags; rc = 0; done: flux_future_destroy (f); @@ -1299,28 +1420,219 @@ static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, blobref_t rootref) return rc; } +static void destroy_root (void *data) +{ + if (data) { + struct kvsroot *root = data; + if (root->namespace) + free (root->namespace); + if (root->cm) + commit_mgr_destroy (root->cm); + if (root->watchlist) + wait_queue_destroy (root->watchlist); + free (data); + } +} + +static void remove_root (kvs_ctx_t *ctx, const char *namespace) +{ + zhash_delete (ctx->roothash, namespace); +} + +static struct kvsroot *create_root (kvs_ctx_t *ctx, const char *namespace, + int flags) { + struct kvsroot *root; + int save_errnum; + + if (!(root = calloc (1, sizeof (*root)))) { + flux_log_error (ctx->h, "calloc"); + return NULL; + } + + if (!(root->namespace = strdup (namespace))) { + flux_log_error (ctx->h, "strdup"); + goto error; + } + + if (!(root->cm = commit_mgr_create (ctx->cache, ctx->hash_name, + ctx->h, root))) { + flux_log_error (ctx->h, "commit_mgr_create"); + goto error; + } + + if (!(root->watchlist = wait_queue_create ())) { + flux_log_error (ctx->h, "wait_queue_create"); + goto error; + } + + root->flags = flags; + + if (zhash_insert (ctx->roothash, namespace, root) < 0) { + flux_log_error (ctx->h, "zhash_insert"); + goto error; + } + + if (!zhash_freefn (ctx->roothash, namespace, destroy_root)) { + flux_log_error (ctx->h, "zhash_freefn"); + save_errnum = errno; + zhash_delete (ctx->roothash, namespace); + errno = save_errnum; + goto error; + } + + root->ctx = ctx; + + return root; + + error: + save_errnum = errno; + destroy_root (root); + errno = save_errnum; + return NULL; +} + +static int event_subscribe (kvs_ctx_t *ctx, const char *namespace) +{ + char *setroot_topic = NULL; + char *error_topic = NULL; + int rc = -1; + + /* do not want to subscribe to events that are not within our + * namespace, so we subscribe to only specific ones. + */ + + if (!(ctx->events_init)) { + + /* These belong to all namespaces, subscribe once the first + * time we init a namespace */ + + if (flux_event_subscribe (ctx->h, "hb") < 0) { + flux_log_error (ctx->h, "flux_event_subscribe"); + goto cleanup; + } + + if (flux_event_subscribe (ctx->h, "kvs.stats.clear") < 0) { + flux_log_error (ctx->h, "flux_event_subscribe"); + goto cleanup; + } + + if (flux_event_subscribe (ctx->h, "kvs.dropcache") < 0) { + flux_log_error (ctx->h, "flux_event_subscribe"); + goto cleanup; + } + + ctx->events_init = true; + } + + if (asprintf (&setroot_topic, "kvs.setroot.%s", namespace) < 0) { + errno = ENOMEM; + goto cleanup; + } + + if (flux_event_subscribe (ctx->h, setroot_topic) < 0) { + flux_log_error (ctx->h, "flux_event_subscribe"); + goto cleanup; + } + + if (asprintf (&error_topic, "kvs.error.%s", namespace) < 0) { + errno = ENOMEM; + goto cleanup; + } + + if (flux_event_subscribe (ctx->h, error_topic) < 0) { + flux_log_error (ctx->h, "flux_event_subscribe"); + goto cleanup; + } + + rc = 0; +cleanup: + free (setroot_topic); + free (error_topic); + return rc; +} + +static struct kvsroot *getroot (kvs_ctx_t *ctx, const char *namespace) { + struct kvsroot *root; + blobref_t rootref; + int save_errno, rootseq, flags; + + if (!(root = zhash_lookup (ctx->roothash, namespace))) { + if (ctx->rank == 0) { + flux_log (ctx->h, LOG_DEBUG, "namespace %s not found", namespace); + errno = ENOTSUP; + return NULL; + } + else { + if (getroot_rpc (ctx, namespace, &rootseq, rootref, &flags) < 0) { + flux_log_error (ctx->h, "getroot_rpc"); + return NULL; + } + + if (!(root = create_root (ctx, namespace, flags))) { + flux_log_error (ctx->h, "create_root"); + return NULL; + } + + setroot (ctx, root, rootref, rootseq); + + if (event_subscribe (ctx, namespace) < 0) { + save_errno = errno; + remove_root (ctx, namespace); + errno = save_errno; + flux_log_error (ctx->h, "event_subscribe"); + return NULL; + } + } + } + return root; +} + static void error_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + const char *namespace; json_t *names = NULL; int errnum; - if (flux_event_unpack (msg, NULL, "{ s:o s:i }", + if (flux_event_unpack (msg, NULL, "{ s:s s:o s:i }", + "namespace", &namespace, "names", &names, "errnum", &errnum) < 0) { flux_log_error (ctx->h, "%s: flux_event_unpack", __FUNCTION__); return; } - finalize_fences_bynames (ctx, names, errnum); + + /* if root not initialized, nothing to do + * - it is possible to get the event due to a race between a + * remove and the event being sent. But generally speaking this + * should be impossible to hit. + */ + if (!(root = zhash_lookup (ctx->roothash, namespace))) { + flux_log (ctx->h, LOG_ERR, "%s: received unknown namespace %s", + __FUNCTION__, namespace); + return; + } + + finalize_fences_bynames (ctx, root, names, errnum); } -static int error_event_send (kvs_ctx_t *ctx, json_t *names, int errnum) +static int error_event_send (kvs_ctx_t *ctx, struct kvsroot *root, + json_t *names, int errnum) { flux_msg_t *msg = NULL; + char *error_topic = NULL; int saved_errno, rc = -1; - if (!(msg = flux_event_pack ("kvs.error", "{ s:O s:i }", + if (asprintf (&error_topic, "kvs.error.%s", root->namespace) < 0) { + saved_errno = ENOMEM; + flux_log_error (ctx->h, "%s: asprintf", __FUNCTION__); + goto done; + } + + if (!(msg = flux_event_pack (error_topic, "{ s:s s:O s:i }", + "namespace", root->namespace, "names", names, "errnum", errnum))) { saved_errno = errno; @@ -1337,6 +1649,7 @@ static int error_event_send (kvs_ctx_t *ctx, json_t *names, int errnum) } rc = 0; done: + free (error_topic); flux_msg_destroy (msg); if (rc < 0) errno = saved_errno; @@ -1389,12 +1702,15 @@ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; + const char *namespace; int rootseq; const char *rootref; json_t *rootdir = NULL; json_t *names = NULL; - if (flux_event_unpack (msg, NULL, "{ s:i s:s s:o s:o }", + if (flux_event_unpack (msg, NULL, "{ s:s s:i s:s s:o s:o }", + "namespace", &namespace, "rootseq", &rootseq, "rootref", &rootref, "names", &names, @@ -1403,7 +1719,18 @@ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *mh, return; } - finalize_fences_bynames (ctx, names, 0); + /* if root not initialized, nothing to do + * - it is possible to get the event due to a race between a + * remove and the event being sent. But generally speaking this + * should be impossible to hit. + */ + if (!(root = zhash_lookup (ctx->roothash, namespace))) { + flux_log (ctx->h, LOG_ERR, "%s: received unknown namespace %s", + __FUNCTION__, namespace); + return; + } + + finalize_fences_bynames (ctx, root, names, 0); /* Optimization: prime local cache with directory object, if provided * in event message. Ignore failure here - object will be fetched on @@ -1412,23 +1739,26 @@ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *mh, if (!json_is_null (rootdir)) prime_cache_with_rootdir (ctx, rootdir); - setroot (ctx, rootref, rootseq); + setroot (ctx, root, rootref, rootseq); } -static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) +static int setroot_event_send (kvs_ctx_t *ctx, struct kvsroot *root, + json_t *names) { - const json_t *root = NULL; + const json_t *root_dir = NULL; json_t *nullobj = NULL; flux_msg_t *msg = NULL; + char *setroot_topic = NULL; int saved_errno, rc = -1; assert (ctx->rank == 0); if (event_includes_rootdir) { struct cache_entry *entry; - if ((entry = cache_lookup (ctx->cache, ctx->root.ref, ctx->epoch))) - root = cache_entry_get_treeobj (entry); - assert (root != NULL); // root entry is always in cache on rank 0 + + if ((entry = cache_lookup (ctx->cache, root->ref, ctx->epoch))) + root_dir = cache_entry_get_treeobj (entry); + assert (root_dir != NULL); // root entry is always in cache on rank 0 } else { if (!(nullobj = json_null ())) { @@ -1436,13 +1766,21 @@ static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) flux_log_error (ctx->h, "%s: json_null", __FUNCTION__); goto done; } - root = nullobj; + root_dir = nullobj; + } + + if (asprintf (&setroot_topic, "kvs.setroot.%s", root->namespace) < 0) { + saved_errno = ENOMEM; + flux_log_error (ctx->h, "%s: asprintf", __FUNCTION__); + goto done; } - if (!(msg = flux_event_pack ("kvs.setroot", "{ s:i s:s s:O s:O }", - "rootseq", ctx->root.seq, - "rootref", ctx->root.ref, + + if (!(msg = flux_event_pack (setroot_topic, "{ s:s s:i s:s s:O s:O }", + "namespace", root->namespace, + "rootseq", root->seq, + "rootref", root->ref, "names", names, - "rootdir", root))) { + "rootdir", root_dir))) { saved_errno = errno; flux_log_error (ctx->h, "%s: flux_event_pack", __FUNCTION__); goto done; @@ -1457,6 +1795,7 @@ static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) } rc = 0; done: + free (setroot_topic); flux_msg_destroy (msg); json_decref (nullobj); if (rc < 0) @@ -1481,6 +1820,7 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + struct kvsroot *root; char *sender = NULL; if (flux_request_decode (msg, NULL, NULL) < 0) @@ -1494,8 +1834,14 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, * but cache_wait_destroy_msg() fails, it's not that big of a * deal. The current state is still maintained. */ - if (wait_destroy_msg (ctx->watchlist, disconnect_cmp, sender) < 0) - flux_log_error (h, "%s: wait_destroy_msg", __FUNCTION__); + root = zhash_first (ctx->roothash); + while (root) { + + if (wait_destroy_msg (root->watchlist, disconnect_cmp, sender) < 0) + flux_log_error (h, "%s: wait_destroy_msg", __FUNCTION__); + + root = zhash_next (ctx->roothash); + } if (cache_wait_destroy_msg (ctx->cache, disconnect_cmp, sender) < 0) flux_log_error (h, "%s: wait_destroy_msg", __FUNCTION__); free (sender); @@ -1505,39 +1851,88 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; - json_t *t = NULL; - tstat_t ts; - int size, incomplete, dirty; + json_t *tstats = NULL; + json_t *cstats = NULL; + json_t *nsstats = NULL; + tstat_t ts = { .min = 0.0, .max = 0.0, .M = 0.0, .S = 0.0, .newM = 0.0, + .newS = 0.0, .n = 0 }; + int size = 0, incomplete = 0, dirty = 0; int rc = -1; double scale = 1E-3; if (flux_request_decode (msg, NULL, NULL) < 0) goto done; - memset (&ts, 0, sizeof (ts)); - if (cache_get_stats (ctx->cache, &ts, &size, &incomplete, &dirty) < 0) + /* if no roots are initialized, respond with all zeroes as stats */ + if (zhash_size (ctx->roothash)) { + if (cache_get_stats (ctx->cache, &ts, &size, &incomplete, &dirty) < 0) + goto done; + } + + if (!(tstats = json_pack ("{ s:i s:f s:f s:f s:f }", + "count", tstat_count (&ts), + "min", tstat_min (&ts)*scale, + "mean", tstat_mean (&ts)*scale, + "stddev", tstat_stddev (&ts)*scale, + "max", tstat_max (&ts)*scale))) { + errno = ENOMEM; + goto done; + } + + if (!(cstats = json_pack ("{ s:f s:O s:i s:i s:i }", + "obj size total (MiB)", (double)size/1048576, + "obj size (KiB)", tstats, + "#obj dirty", dirty, + "#obj incomplete", incomplete, + "#faults", ctx->faults))) { + errno = ENOMEM; goto done; + } - if (!(t = json_pack ("{ s:i s:f s:f s:f s:f }", - "count", tstat_count (&ts), - "min", tstat_min (&ts)*scale, - "mean", tstat_mean (&ts)*scale, - "stddev", tstat_stddev (&ts)*scale, - "max", tstat_max (&ts)*scale))) { + if (!(nsstats = json_object ())) { errno = ENOMEM; goto done; } + if (zhash_size (ctx->roothash)) { + struct kvsroot *root; + + root = zhash_first (ctx->roothash); + while (root) { + json_t *s; + + if (!(s = json_pack ("{ s:i s:i s:i }", + "#watchers", + wait_queue_length (root->watchlist), + "#no-op stores", + commit_mgr_get_noop_stores (root->cm), + "store revision", root->seq))) { + errno = ENOMEM; + goto done; + } + + json_object_set_new (nsstats, root->namespace, s); + + root = zhash_next (ctx->roothash); + } + } + else { + json_t *s; + + if (!(s = json_pack ("{ s:i s:i }", + "#watchers", 0, + "store revision", 0))) { + errno = ENOMEM; + goto done; + } + + json_object_set_new (nsstats, KVS_PRIMARY_NAMESPACE, s); + } + if (flux_respond_pack (h, msg, - "{ s:f s:O s:i s:i s:i s:i s:i s:i }", - "obj size total (MiB)", (double)size/1048576, - "obj size (KiB)", t, - "#obj dirty", dirty, - "#obj incomplete", incomplete, - "#watchers", wait_queue_length (ctx->watchlist), - "#no-op stores", commit_mgr_get_noop_stores (ctx->cm), - "#faults", ctx->faults, - "store revision", ctx->root.seq) < 0) { + "{ s:O s:O }", + "cache", cstats, + "namespace", nsstats) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto done; } @@ -1548,19 +1943,29 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *mh, if (flux_respond (h, msg, errno, NULL) < 0) flux_log_error (h, "%s: flux_respond", __FUNCTION__); } - json_decref (t); + json_decref (tstats); + json_decref (cstats); + json_decref (nsstats); } static void stats_clear (kvs_ctx_t *ctx) { + struct kvsroot *root; + ctx->faults = 0; - commit_mgr_clear_noop_stores (ctx->cm); + + root = zhash_first (ctx->roothash); + while (root) { + commit_mgr_clear_noop_stores (root->cm); + root = zhash_next (ctx->roothash); + } } static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; + stats_clear (ctx); } @@ -1575,12 +1980,91 @@ static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } +static int namespace_create (kvs_ctx_t *ctx, const char *namespace, int flags) +{ + struct kvsroot *root; + json_t *rootdir = NULL; + blobref_t ref; + void *data = NULL; + int len; + + /* Namespace already exists */ + if (zhash_lookup (ctx->roothash, namespace)) { + errno = EEXIST; + goto cleanup; + } + + if (!(root = create_root (ctx, namespace, flags))) { + flux_log_error (ctx->h, "%s: create_root", __FUNCTION__); + goto cleanup; + } + + if (!(rootdir = treeobj_create_dir ())) { + flux_log_error (ctx->h, "%s: treeobj_create_dir", __FUNCTION__); + goto cleanup_remove_root; + } + + if (!(data = treeobj_encode (rootdir))) { + flux_log_error (ctx->h, "%s: treeobj_encode", __FUNCTION__); + goto cleanup_remove_root; + } + len = strlen (data); + + if (blobref_hash (ctx->hash_name, data, len, ref) < 0) { + flux_log_error (ctx->h, "%s: blobref_hash", __FUNCTION__); + goto cleanup_remove_root; + } + + setroot (ctx, root, ref, 0); + + if (event_subscribe (ctx, namespace) < 0) { + flux_log_error (ctx->h, "%s: event_subscribe", __FUNCTION__); + goto cleanup_remove_root; + } + + return 0; + +cleanup_remove_root: + remove_root (ctx, namespace); +cleanup: + free (data); + json_decref (rootdir); + return -1; +} + +static void namespace_create_request_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + kvs_ctx_t *ctx = arg; + const char *namespace; + int flags; + + assert (ctx->rank == 0); + + if (flux_request_unpack (msg, NULL, "{ s:s s:i }", + "namespace", &namespace, + "flags", &flags) < 0) { + flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); + goto error; + } + + if (namespace_create (ctx, namespace, flags) < 0) { + flux_log_error (h, "%s: namespace_create", __FUNCTION__); + goto error; + } + + errno = 0; +error: + if (flux_respond (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); +} + static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, "kvs.stats.get", stats_get_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "kvs.stats.clear",stats_clear_request_cb, 0 }, { FLUX_MSGTYPE_EVENT, "kvs.stats.clear",stats_clear_event_cb, 0 }, - { FLUX_MSGTYPE_EVENT, "kvs.setroot", setroot_event_cb, 0 }, - { FLUX_MSGTYPE_EVENT, "kvs.error", error_event_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.setroot.*", setroot_event_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.error.*", error_event_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "kvs.getroot", getroot_request_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "kvs.dropcache", dropcache_request_cb, 0 }, { FLUX_MSGTYPE_EVENT, "kvs.dropcache", dropcache_event_cb, 0 }, @@ -1592,6 +2076,8 @@ static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, "kvs.watch", watch_request_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "kvs.fence", fence_request_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "kvs.relayfence", relayfence_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.namespace.create", + namespace_create_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -1695,30 +2181,28 @@ int mod_main (flux_t *h, int argc, char **argv) goto done; } process_args (ctx, argc, argv); - if (flux_event_subscribe (h, "hb") < 0) { - flux_log_error (h, "flux_event_subscribe"); - goto done; - } - if (flux_event_subscribe (h, "kvs.") < 0) { - flux_log_error (h, "flux_event_subscribe"); - goto done; - } if (ctx->rank == 0) { + struct kvsroot *root; blobref_t rootref; if (store_initial_rootdir (ctx, NULL, rootref) < 0) { flux_log_error (h, "storing initial root object"); goto done; } - setroot (ctx, rootref, 0); - } else { - blobref_t rootref; - int rootseq; - if (getroot_rpc (ctx, &rootseq, rootref) < 0) { - flux_log_error (h, "getroot"); + + if (!(root = zhash_lookup (ctx->roothash, KVS_PRIMARY_NAMESPACE))) { + if (!(root = create_root (ctx, KVS_PRIMARY_NAMESPACE, 0))) { + flux_log_error (h, "create_root"); + goto done; + } + } + + setroot (ctx, root, rootref, 0); + + if (event_subscribe (ctx, KVS_PRIMARY_NAMESPACE) < 0) { + flux_log_error (h, "event_subscribe"); goto done; } - setroot (ctx, rootref, rootseq); } if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); diff --git a/t/Makefile.am b/t/Makefile.am index 08898fc8c457..cfba751877eb 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -52,6 +52,7 @@ TESTS = \ t1001-kvs-internals.t \ t1002-kvs-watch.t \ t1003-kvs-stress.t \ + t1004-kvs-namespace.t \ t1101-barrier-basic.t \ t1102-cmddriver.t \ t1103-apidisconnect.t \ @@ -125,6 +126,7 @@ check_SCRIPTS = \ t1001-kvs-internals.t \ t1002-kvs-watch.t \ t1003-kvs-stress.t \ + t1004-kvs-namespace.t \ t1101-barrier-basic.t \ t1102-cmddriver.t \ t1103-apidisconnect.t \ diff --git a/t/kvs/watch.c b/t/kvs/watch.c index e7aa3f3d573c..d8029aa2dde5 100644 --- a/t/kvs/watch.c +++ b/t/kvs/watch.c @@ -473,11 +473,16 @@ static int simulwatch_cb (const char *key, const char *json_str, void *arg, int int get_watch_stats (flux_t *h, int *count) { flux_future_t *f; + json_t *ns, *p; int rc = -1; if (!(f = flux_rpc (h, "kvs.stats.get", NULL, FLUX_NODEID_ANY, 0))) goto done; - if (flux_rpc_get_unpack (f, "{ s:i }", "#watchers", count) < 0) + if (flux_rpc_get_unpack (f, "{ s:o }", "namespace", &ns) < 0) + goto done; + if (json_unpack (ns, "{ s:o }", KVS_PRIMARY_NAMESPACE, &p) < 0) + goto done; + if (json_unpack (p, "{ s:i }", "#watchers", count) < 0) goto done; rc = 0; done: diff --git a/t/kvs/watch_disconnect.c b/t/kvs/watch_disconnect.c index 817a8b86a887..109a6d65da66 100644 --- a/t/kvs/watch_disconnect.c +++ b/t/kvs/watch_disconnect.c @@ -18,8 +18,9 @@ void send_watch_requests (flux_t *h, const char *key) int flags = KVS_WATCH_FIRST; flux_mrpc_t *r; - if (!(r = flux_mrpcf (h, "kvs.watch", "all", 0, "{s:s s:i s:n}", + if (!(r = flux_mrpcf (h, "kvs.watch", "all", 0, "{s:s s:s s:i s:n}", "key", key, + "namespace", KVS_PRIMARY_NAMESPACE, "flags", flags, "val"))) log_err_exit ("flux_mrpc kvs.watch"); @@ -40,7 +41,12 @@ int count_watchers (flux_t *h) if (!(r = flux_mrpc (h, "kvs.stats.get", NULL, "all", 0))) log_err_exit ("flux_mrpc kvs.stats.get"); do { - if (flux_mrpc_getf (r, "{s:i}", "#watchers", &n) < 0) + json_t *ns, *p; + if (flux_mrpc_getf (r, "{ s:o }", "namespace", &ns) < 0) + log_err_exit ("kvs.stats.get namespace"); + if (json_unpack (ns, "{ s:o }", KVS_PRIMARY_NAMESPACE, &p) < 0) + log_err_exit ("kvs.stats.get %s", KVS_PRIMARY_NAMESPACE); + if (json_unpack (p, "{ s:i }", "#watchers", &n) < 0) log_err_exit ("kvs.stats.get #watchers"); count += n; } while (flux_mrpc_next (r) == 0); diff --git a/t/t1004-kvs-namespace.t b/t/t1004-kvs-namespace.t new file mode 100755 index 000000000000..820d6524f6c1 --- /dev/null +++ b/t/t1004-kvs-namespace.t @@ -0,0 +1,485 @@ +#!/bin/sh + +test_description='Test flux-kvs and kvs in flux session + +These are tests for ensuring multiple namespaces work. +' + +. `dirname $0`/sharness.sh + +if test "$TEST_LONG" = "t"; then + test_set_prereq LONGTEST +fi + +# Size the session to one more than the number of cores, minimum of 4 +SIZE=$(test_size_large) +test_under_flux ${SIZE} kvs +echo "# $0: flux session size will be ${SIZE}" + +DIR=test.a.b +KEY=test.a.b.c + +# Just in case its set in the environment +unset FLUX_KVS_NAMESPACE + +PRIMARYNAMESPACE=primary +NAMESPACETEST=namespacetest +NAMESPACERANK1=namespacerank1 + +test_kvs_key() { + flux kvs get --json "$1" >output + echo "$2" >expected + test_cmp expected output +} + +test_kvs_key_namespace() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs get --json "$2" >output + echo "$3" >expected + unset FLUX_KVS_NAMESPACE + test_cmp expected output +} + +put_kvs_key_namespace() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs put --json "$2=$3" + unset FLUX_KVS_NAMESPACE +} + +dir_kvs_namespace() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs dir "$2" | sort > $3 + unset FLUX_KVS_NAMESPACE +} + +unlink_kvs_namespace() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs unlink $2 + unset FLUX_KVS_NAMESPACE +} + +unlink_kvs_dir_namespace() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs unlink -Rf $2 + unset FLUX_KVS_NAMESPACE +} + +version_kvs_namespace() { + export FLUX_KVS_NAMESPACE=$1 + version=`flux kvs version` + eval $2=$version + unset FLUX_KVS_NAMESPACE +} + +get_kvs_namespace_exitvalue() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs get --json "$2" + eval $3="$?" + unset FLUX_KVS_NAMESPACE +} + +dir_kvs_namespace_exitvalue() { + export FLUX_KVS_NAMESPACE=$1 + flux kvs dir "$2" + eval $3="$?" + unset FLUX_KVS_NAMESPACE +} + +wait_watch_namespace_put() { + export FLUX_KVS_NAMESPACE=$1 + i=0 + while [ "$(flux kvs get --json $2 2> /dev/null)" != "$3" ] && [ $i -lt 50 ] + do + sleep 0.1 + i=$((i + 1)) + done + unset FLUX_KVS_NAMESPACE + if [ $i -eq 50 ] + then + return 1 + fi + return 0 +} + +wait_watch_current() { + i=0 + while [ "$(tail -n 1 $1 2> /dev/null)" != "$2" ] && [ $i -lt 50 ] + do + sleep 0.1 + i=$((i + 1)) + done + if [ $i -eq 50 ] + then + return 1 + fi + return 0 +} + +# +# Basic tests in default primary namespace +# + +test_expect_success 'kvs: create primary namespace fails' ' + ! flux kvs namespace-create $PRIMARYNAMESPACE +' + +test_expect_success 'kvs: get with primary namespace works' ' + flux kvs put --json $DIR.test=1 && + test_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 1 +' + +test_expect_success 'kvs: put with primary namespace works' ' + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 2 && + test_kvs_key $DIR.test 2 +' + +test_expect_success 'kvs: put/get with primary namespace works' ' + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 3 && + test_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 3 +' + +test_expect_success 'kvs: unlink with primary namespace works' ' + unlink_kvs_namespace $PRIMARYNAMESPACE $DIR.test && + test_must_fail flux kvs get --json $DIR.test +' + +test_expect_success 'kvs: dir with primary namespace works' ' + flux kvs put --json $DIR.a=1 && + flux kvs put --json $DIR.b=2 && + flux kvs put --json $DIR.c=3 && + dir_kvs_namespace $PRIMARYNAMESPACE $DIR output && + cat >expected <watch_out & + watchpid=$! && + wait_watch_current watch_out "0" + flux kvs put --json $DIR.watch=1 && + wait $watchpid + unset FLUX_KVS_NAMESPACE +cat >expected <<-EOF && +0 +1 +EOF + test_cmp watch_out expected +' + +# +# Basic tests in new namespace +# + +test_expect_success 'kvs: namespace create on rank 0 works' ' + flux kvs namespace-create $NAMESPACETEST +' + +test_expect_success 'kvs: namespace create on rank 1 works' ' + flux exec -r 1 sh -c "flux kvs namespace-create $NAMESPACERANK1" +' + +test_expect_success 'kvs: put/get value in new namespace works' ' + put_kvs_key_namespace $NAMESPACETEST $DIR.test 1 && + test_kvs_key_namespace $NAMESPACETEST $DIR.test 1 +' + +test_expect_success 'kvs: unlink in new namespace works' ' + unlink_kvs_namespace $NAMESPACETEST $DIR.test && + get_kvs_namespace_exitvalue $NAMESPACETEST $DIR.test exitvalue && + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: dir in new namespace works' ' + put_kvs_key_namespace $NAMESPACETEST $DIR.a 4 && + put_kvs_key_namespace $NAMESPACETEST $DIR.b 5 && + put_kvs_key_namespace $NAMESPACETEST $DIR.c 6 && + dir_kvs_namespace $NAMESPACETEST $DIR output && + cat >expected <watch_out & + watchpid=$! && + wait_watch_current watch_out "0" + flux kvs put --json $DIR.watch=1 && + wait $watchpid + unset FLUX_KVS_NAMESPACE +cat >expected <<-EOF && +0 +1 +EOF + test_cmp watch_out expected +' + +# +# Basic tests, data in new namespace available across ranks +# + +test_expect_success 'kvs: put value in new namespace, available on other ranks' ' + unlink_kvs_dir_namespace $NAMESPACETEST $DIR && + put_kvs_key_namespace $NAMESPACETEST $DIR.all 1 && + version_kvs_namespace $NAMESPACETEST VERS && + flux exec sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACETEST; flux kvs wait ${VERS} && flux kvs get $DIR.all" +' + +test_expect_success 'kvs: unlink value in new namespace, does not exist all ranks' ' + unlink_kvs_namespace $NAMESPACETEST $DIR.all && + version_kvs_namespace $NAMESPACETEST VERS && + flux exec sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACETEST; flux kvs wait ${VERS} && ! flux kvs get $DIR.all" +' + +# +# Namespace corner case tests +# + +NAMESPACEBAD=namespacebad + +test_expect_success 'kvs: namespace create on existing namespace fails' ' + ! flux kvs namespace-create $NAMESPACETEST +' + +test_expect_success 'kvs: namespace create on existing namespace fails on rank 1' ' + ! flux exec -r 1 sh -c "flux kvs namespace-create $NAMESPACETEST" +' + +test_expect_success 'kvs: get fails on invalid namespace' ' + get_kvs_namespace_exitvalue $NAMESPACEBAD $DIR.test exitvalue && + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: get fails on invalid namespace on rank 1' ' + ! flux exec -r 1 sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACEBAD ; flux kvs get $DIR.test" +' + +test_expect_success NO_CHAIN_LINT 'kvs: put fails on invalid namespace' ' + export FLUX_KVS_NAMESPACE=$NAMESPACEBAD + flux kvs put --json $DIR.test=1 + exitvalue=$? + unset FLUX_KVS_NAMESPACE + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: put fails on invalid namespace on rank 1' ' + ! flux exec -r 1 sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACEBAD ; flux kvs put $DIR.test=1" +' + +test_expect_success NO_CHAIN_LINT 'kvs: version fails on invalid namespace' ' + export FLUX_KVS_NAMESPACE=$NAMESPACEBAD + flux kvs version + exitvalue=$? + unset FLUX_KVS_NAMESPACE + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: version fails on invalid namespace on rank 1' ' + ! flux exec -r 1 sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACEBAD ; flux kvs version" +' + +test_expect_success NO_CHAIN_LINT 'kvs: wait fails on invalid namespace' ' + export FLUX_KVS_NAMESPACE=$NAMESPACEBAD + flux kvs wait 1 + exitvalue=$? + unset FLUX_KVS_NAMESPACE + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: wait fails on invalid namespace on rank 1' ' + ! flux exec -r 1 sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACEBAD ; flux kvs wait 1" +' + +test_expect_success NO_CHAIN_LINT 'kvs: watch fails on invalid namespace' ' + export FLUX_KVS_NAMESPACE=$NAMESPACEBAD + flux kvs watch -c 1 $DIR.test + exitvalue=$? + unset FLUX_KVS_NAMESPACE + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: watch fails on invalid namespace on rank 1' ' + ! flux exec -r 1 sh -c "export FLUX_KVS_NAMESPACE=$NAMESPACEBAD ; flux kvs watch -c 1 $DIR.test" +' + +# +# Basic tests - no pollution between namespaces +# + +test_expect_success 'kvs: put/get in different namespaces works' ' + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 1 && + put_kvs_key_namespace $NAMESPACETEST $DIR.test 2 && + test_kvs_key_namespace $PRIMARYNAMESPACE $DIR.test 1 && + test_kvs_key_namespace $NAMESPACETEST $DIR.test 2 +' + +test_expect_success 'kvs: unlink in different namespaces works' ' + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.testA 1 && + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.testB 1 && + put_kvs_key_namespace $NAMESPACETEST $DIR.testA 2 && + put_kvs_key_namespace $NAMESPACETEST $DIR.testB 2 && + unlink_kvs_namespace $PRIMARYNAMESPACE $DIR.testA && + unlink_kvs_namespace $NAMESPACETEST $DIR.testB && + test_kvs_key_namespace $PRIMARYNAMESPACE $DIR.testB 1 && + test_kvs_key_namespace $NAMESPACETEST $DIR.testA 2 && + get_kvs_namespace_exitvalue $PRIMARYNAMESPACE $DIR.testA exitvalue && + test $exitvalue -ne 0 && + get_kvs_namespace_exitvalue $NAMESPACETEST $DIR.testB exitvalue && + test $exitvalue -ne 0 +' + +test_expect_success 'kvs: dir in different namespace works' ' + unlink_kvs_dir_namespace $PRIMARYNAMESPACE $DIR && + unlink_kvs_dir_namespace $NAMESPACETEST $DIR && + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.a 10 && + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.b 11 && + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.c 12 && + put_kvs_key_namespace $NAMESPACETEST $DIR.a 13 && + put_kvs_key_namespace $NAMESPACETEST $DIR.b 14 && + put_kvs_key_namespace $NAMESPACETEST $DIR.c 15 && + dir_kvs_namespace $PRIMARYNAMESPACE $DIR primaryoutput && + dir_kvs_namespace $NAMESPACETEST $DIR testoutput && + cat >primaryexpected <testexpected <primary_watch_out & + primarywatchpid=$! && + wait_watch_current primary_watch_out "0" + + export FLUX_KVS_NAMESPACE=$NAMESPACETEST + stdbuf -oL flux kvs watch -o -c 1 $DIR.watch >test_watch_out & + testwatchpid=$! && + wait_watch_current test_watch_out "1" + unset FLUX_KVS_NAMESPACE + + put_kvs_key_namespace $PRIMARYNAMESPACE $DIR.watch 1 && + put_kvs_key_namespace $NAMESPACETEST $DIR.watch 2 && + wait $primarywatchpid && + wait $testwatchpid +cat >primaryexpected <<-EOF && +0 +1 +EOF +cat >testexpected <<-EOF && +1 +2 +EOF + test_cmp primaryexpected primary_watch_out && + test_cmp testexpected test_watch_out +' + +test_done diff --git a/t/t1103-apidisconnect.t b/t/t1103-apidisconnect.t index b50cc850f6f8..6ffef10e001a 100755 --- a/t/t1103-apidisconnect.t +++ b/t/t1103-apidisconnect.t @@ -12,7 +12,7 @@ test_under_flux ${SIZE} kvs check_kvs_watchers() { local i n for i in `seq 1 $2`; do - n=`flux module stats --parse "#watchers" kvs` + n=`flux module stats --parse "namespace.primary.#watchers" kvs` echo "Try $i: $n" test $n -eq $1 && return 0 sleep 1 @@ -22,7 +22,7 @@ check_kvs_watchers() { test_expect_success 'kvs watcher gets disconnected on client exit' ' - before_watchers=`flux module stats --parse "#watchers" kvs` && + before_watchers=`flux module stats --parse "namespace.primary.#watchers" kvs` && echo "waiters before test: $before_watchers" && test_expect_code 142 run_timeout 1 flux kvs watch noexist && check_kvs_watchers $before_watchers 3 diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index cd5088d031d1..526e3b409dbe 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -387,12 +387,12 @@ test_expect_success 'wreck job is linked in lwj-complete after failure' ' test_expect_success 'wreck: no KVS watchers leaked after 10 jobs' ' flux exec -r 1-$(($SIZE-1)) -l \ - flux module stats --parse "#watchers" kvs | sort -n >w.before && + flux module stats --parse "namespace.primary.#watchers" kvs | sort -n >w.before && for i in `seq 1 10`; do flux wreckrun --ntasks $SIZE /bin/true done && flux exec -r 1-$(($SIZE-1)) -l \ - flux module stats --parse "#watchers" kvs | sort -n >w.after && + flux module stats --parse "namespace.primary.#watchers" kvs | sort -n >w.after && test_cmp w.before w.after '