From 8c9b242af99513e4b95ab7aa963ff1d034a7be4a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 10:52:19 -0700 Subject: [PATCH 01/25] modules/kvs: kvs.get codec: treeobj in/out Allow a treeobj to be sent as an optional argument to a kvs.get request, so that the key can be looked up relative to a snapshot. Echo this back in the response, or if it was not provided, include a treeobj that identifies the current root that was used for the lookup. Update codec test Adjust callers to compile with the new codec signature. New functionality based on the codec change will be added in a subsequent commit. --- src/modules/kvs/kvs.c | 4 ++-- src/modules/kvs/libkvs.c | 8 ++++---- src/modules/kvs/proto.c | 26 +++++++++++++++----------- src/modules/kvs/proto.h | 10 ++++++---- src/modules/kvs/test/proto.c | 31 ++++++++++++++++++++++++++----- 5 files changed, 53 insertions(+), 26 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index a872c3bc9988..172637c74c92 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -884,7 +884,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, errno = EPROTO; goto done; } - if (kp_tget_dec (in, &key, &flags) < 0) + if (kp_tget_dec (in, NULL, &key, &flags) < 0) goto done; if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, arg))) goto done; @@ -898,7 +898,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, errno = ENOENT; goto done; } - if (!(out = kp_rget_enc (val))) + if (!(out = kp_rget_enc (NULL, val))) goto done; rc = 0; done: diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 77d361b94e9c..4921da773119 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -341,7 +341,7 @@ static int getobj (flux_t h, const char *key, int flags, json_object **val) goto done; } k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (k, flags))) + if (!(in = kp_tget_enc (NULL, k, flags))) goto done; if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) goto done; @@ -351,7 +351,7 @@ static int getobj (flux_t h, const char *key, int flags, json_object **val) errno = EPROTO; goto done; } - if (kp_rget_dec (out, &v) < 0) + if (kp_rget_dec (out, NULL, &v) < 0) goto done; if (val) *val = Jget (v); @@ -405,7 +405,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) key = xvasprintf (fmt, ap); va_end (ap); k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (k, KVS_PROTO_READDIR))) + if (!(in = kp_tget_enc (NULL, k, KVS_PROTO_READDIR))) goto done; if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) goto done; @@ -415,7 +415,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) errno = EPROTO; goto done; } - if (kp_rget_dec (out, &v) < 0) + if (kp_rget_dec (out, NULL, &v) < 0) goto done; *dir = kvsdir_alloc (h, k, v); rc = 0; diff --git a/src/modules/kvs/proto.c b/src/modules/kvs/proto.c index 4bc9d1e69e57..cd2ceb645078 100644 --- a/src/modules/kvs/proto.c +++ b/src/modules/kvs/proto.c @@ -44,7 +44,7 @@ /* kvs.get */ -JSON kp_tget_enc (const char *key, int flags) +JSON kp_tget_enc (JSON rootdir, const char *key, int flags) { JSON o = NULL; @@ -53,13 +53,15 @@ JSON kp_tget_enc (const char *key, int flags) goto done; } o = Jnew (); + if (rootdir) + Jadd_obj (o, "rootdir", rootdir); /* takes a ref on rootdir */ Jadd_str (o, "key", key); Jadd_int (o, "flags", flags); done: return o; } -int kp_tget_dec (JSON o, const char **key, int *flags) +int kp_tget_dec (JSON o, JSON *rootdir, const char **key, int *flags) { int rc = -1; @@ -71,34 +73,36 @@ int kp_tget_dec (JSON o, const char **key, int *flags) errno = EPROTO; goto done; } + if (rootdir) + *rootdir = Jobj_get (o, "rootdir"); rc = 0; done: return rc; } -JSON kp_rget_enc (JSON val) +JSON kp_rget_enc (JSON rootdir, JSON val) { JSON o = NULL; o = Jnew (); + json_object_object_add (o, "rootdir", rootdir); json_object_object_add (o, "val", val); return o; } -int kp_rget_dec (JSON o, JSON *val) +int kp_rget_dec (JSON o, JSON *rootdir, JSON *val) { int rc = -1; - JSON v = NULL; + JSON v; - if (!o || !val) { + if (!o || !(v = Jobj_get (o, "val"))) { errno = EINVAL; goto done; } - if (!(v = Jobj_get (o, "val"))) { - errno = EPROTO; - goto done; - } - *val = v; + if (val) + *val = v; + if (rootdir) + *rootdir = Jobj_get (o, "rootdir"); rc = 0; done: return rc; diff --git a/src/modules/kvs/proto.h b/src/modules/kvs/proto.h index 03ab7f001d5e..8122ba68cdf9 100644 --- a/src/modules/kvs/proto.h +++ b/src/modules/kvs/proto.h @@ -12,11 +12,13 @@ enum { /* kvs.get */ -json_object *kp_tget_enc (const char *key, int flags); -int kp_tget_dec (json_object *o, const char **key, int *flags); +json_object *kp_tget_enc (json_object *rootdir, + const char *key, int flags); +int kp_tget_dec (json_object *o, json_object **rootdir, + const char **key, int *flags); -json_object *kp_rget_enc (json_object *val); -int kp_rget_dec (json_object *o, json_object **val); +json_object *kp_rget_enc (json_object *rootdir, json_object *val); +int kp_rget_dec (json_object *o, json_object **rootdir, json_object **val); /* kvs.watch diff --git a/src/modules/kvs/test/proto.c b/src/modules/kvs/test/proto.c index e6c3acadcf41..7ba8d7840837 100644 --- a/src/modules/kvs/test/proto.c +++ b/src/modules/kvs/test/proto.c @@ -1,43 +1,64 @@ #include #include "src/common/libutil/shortjson.h" -#include "src/modules/kvs/proto.h" #include "src/common/libtap/tap.h" #include "src/common/libutil/log.h" #include "src/common/libutil/shortjson.h" #include "src/common/libutil/xzmalloc.h" +#include "src/modules/kvs/proto.h" +#include "src/modules/kvs/json_dirent.h" + void test_get (void) { JSON o; const char *key = NULL; JSON val = NULL; + JSON dirent = NULL; + JSON dirent2 = NULL; int i, flags; - o = kp_tget_enc ("foo", 42); + o = kp_tget_enc (NULL, "foo", 42); ok (o != NULL, "kp_tget_enc works"); diag ("get request: %s", Jtostr (o)); flags = 0; - ok (kp_tget_dec (o, &key, &flags) == 0 && flags == 42, + ok (kp_tget_dec (o, NULL, &key, &flags) == 0 && flags == 42, "kp_tget_dec works"); like (key, "^foo$", "kp_tget_dec returned encoded key"); Jput (o); + dirent = dirent_create ("DIRREF", "sha1-abcdefabcdef00000"); + o = kp_tget_enc (dirent, "foo", 42); + ok (o != NULL, + "kp_tget_enc with optional dirent arg works"); + diag ("get request: %s", Jtostr (o)); + flags = 0; + ok (kp_tget_dec (o, &dirent2, &key, &flags) == 0 && flags == 42, + "kp_tget_dec works"); + ok (dirent_validate (dirent2) && dirent_match (dirent, dirent2), + "kp_tget_dec returned dirent"); + Jput (dirent); + Jput (o); + + val = Jnew (); Jadd_int (val, "i", 42); - o = kp_rget_enc (val); + dirent = dirent_create ("DIRREF", "sha1-abcdefabcdef00000"); + o = kp_rget_enc (dirent, val); val = NULL; /* val now owned by o */ ok (o != NULL, "kp_rget_enc works"); diag ("get response: %s", Jtostr (o)); - ok (kp_rget_dec (o, &val) == 0, + ok (kp_rget_dec (o, &dirent2, &val) == 0, "kp_rget_dec works"); // get response: { "val": { "i": 42 } } i = 0; ok (val && Jget_int (val, "i", &i) && i == 42, "kp_rget_dec returned encoded object"); + ok (dirent_validate (dirent2) && dirent_match (dirent, dirent2), + "kp_rget_dec returned rootref"); Jput (o); /* owns val */ } From 0c969c92e4d3558f96d7eb88cc8913d63b9dd52c Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 11:38:39 -0700 Subject: [PATCH 02/25] modules/kvs: handle treeobj args in kvs.get request If a treeobj is provided in a kvs.get request, look up the key relative to this "snapshot". Return the treeobj used to look up the key in the response, whether it be the one provided in the request, or one representing the current root. This may be useful in a future API that allows the current version of a directory to be fetched and then walked as a snapshot. --- src/modules/kvs/kvs.c | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 172637c74c92..78bf0f2c4fe6 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -775,10 +775,8 @@ static bool lookup (ctx_t *ctx, json_object *root, wait_t *wait, json_object *vp, *dirent, *val = NULL; int errnum = 0; - if (root == NULL) { - if (!load (ctx, ctx->rootdir, wait, &root)) - goto stall; - } + assert (root != NULL); + if (!strcmp (name, ".")) { /* special case root */ if ((flags & KVS_PROTO_TREEOBJ)) { val = dirent_create ("DIRREF", ctx->rootdir); @@ -874,6 +872,10 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, int flags; const char *key; JSON val; + JSON root = NULL; + JSON root_dirent = NULL; + JSON tmp_dirent = NULL; + const char *root_ref = ctx->rootdir; wait_t *wait = NULL; int lookup_errnum = 0; int rc = -1; @@ -884,11 +886,24 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, errno = EPROTO; goto done; } - if (kp_tget_dec (in, NULL, &key, &flags) < 0) + if (kp_tget_dec (in, &root_dirent, &key, &flags) < 0) goto done; if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, arg))) goto done; - if (!lookup (ctx, NULL, wait, flags, key, &val, &lookup_errnum)) + /* If root dirent was specified, lookup corresponding 'root' directory. + * Otherwise, use the current root. + */ + if (root_dirent) { + if (!Jget_str (root_dirent, "DIRREF", &root_ref)) { + errno = EINVAL; + goto done; + } + } else { + root_dirent = tmp_dirent = dirent_create ("DIRREF", ctx->rootdir); + } + if (!load (ctx, root_ref, wait, &root)) + goto stall; + if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum)) goto stall; if (lookup_errnum != 0) { errno = lookup_errnum; @@ -898,7 +913,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, errno = ENOENT; goto done; } - if (!(out = kp_rget_enc (NULL, val))) + if (!(out = kp_rget_enc (Jget (root_dirent), val))) goto done; rc = 0; done: @@ -909,6 +924,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w, stall: Jput (in); Jput (out); + Jput (tmp_dirent); } static bool compare_json (json_object *o1, json_object *o2) @@ -928,6 +944,7 @@ static void watch_request_cb (flux_t h, flux_msg_handler_t *w, JSON in2 = NULL; JSON out = NULL; JSON oval, val = NULL; + JSON root = NULL; flux_msg_t *cpy = NULL; const char *key; int flags; @@ -946,7 +963,9 @@ static void watch_request_cb (flux_t h, flux_msg_handler_t *w, goto done; if (!(wait = wait_create_msg_handler (h, w, msg, watch_request_cb, arg))) goto done; - if (!lookup (ctx, NULL, wait, flags, key, &val, &lookup_errnum)) + if (!load (ctx, ctx->rootdir, wait, &root)) + goto stall; + if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum)) goto stall; if (lookup_errnum) { errno = lookup_errnum; From c36453cb7963071ae780b0722065911f323954cd Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 11:41:30 -0700 Subject: [PATCH 03/25] modules/kvs: add treeobj param to internal getobj Add treeobj argument to getobj(), an internal client function, so that we can implement kvs_getat() using it. --- src/modules/kvs/libkvs.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 4921da773119..cc2ca679cced 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -325,7 +325,8 @@ char *kvsdir_key_at (kvsdir_t *dir, const char *name) ** GET **/ -static int getobj (flux_t h, const char *key, int flags, json_object **val) +static int getobj (flux_t h, json_object *rootdir, const char *key, + int flags, json_object **val) { flux_rpc_t *rpc = NULL; const char *json_str; @@ -341,7 +342,7 @@ static int getobj (flux_t h, const char *key, int flags, json_object **val) goto done; } k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (NULL, k, flags))) + if (!(in = kp_tget_enc (rootdir, k, flags))) goto done; if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) goto done; @@ -371,7 +372,7 @@ int kvs_get (flux_t h, const char *key, char **val) { JSON v; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) return -1; if (val) *val = xstrdup (Jtostr (v)); @@ -382,7 +383,7 @@ int kvs_get (flux_t h, const char *key, char **val) /* deprecated */ int kvs_get_obj (flux_t h, const char *key, JSON *val) { - return getobj (h, key, 0, val); + return getobj (h, NULL, key, 0, val); } int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) @@ -435,7 +436,7 @@ int kvs_get_symlink (flux_t h, const char *key, char **val) JSON v = NULL; int rc = -1; - if (getobj (h, key, KVS_PROTO_READLINK, &v) < 0) + if (getobj (h, NULL, key, KVS_PROTO_READLINK, &v) < 0) goto done; if (json_object_get_type (v) != json_type_string) { errno = EPROTO; @@ -455,7 +456,7 @@ int kvs_get_treeobj (flux_t h, const char *key, char **val) const char *s; int rc = -1; - if (getobj (h, key, KVS_PROTO_TREEOBJ, &v) < 0) + if (getobj (h, NULL, key, KVS_PROTO_TREEOBJ, &v) < 0) goto done; if (val) { s = json_object_to_json_string_ext (v, JSON_C_TO_STRING_PLAIN); @@ -472,7 +473,7 @@ int kvs_get_string (flux_t h, const char *key, char **val) JSON v = NULL; int rc = -1; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) goto done; if (json_object_get_type (v) != json_type_string) { errno = EPROTO; @@ -491,7 +492,7 @@ int kvs_get_int (flux_t h, const char *key, int *val) JSON v = NULL; int rc = -1; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) goto done; if (json_object_get_type (v) != json_type_int) { errno = EPROTO; @@ -510,7 +511,7 @@ int kvs_get_int64 (flux_t h, const char *key, int64_t *val) JSON v = NULL; int rc = -1; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) goto done; if (json_object_get_type (v) != json_type_int) { errno = EPROTO; @@ -529,7 +530,7 @@ int kvs_get_double (flux_t h, const char *key, double *val) JSON v = NULL; int rc = -1; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) goto done; if (json_object_get_type (v) != json_type_double) { errno = EPROTO; @@ -548,7 +549,7 @@ int kvs_get_boolean (flux_t h, const char *key, bool *val) JSON v = NULL; int rc = -1; - if (getobj (h, key, 0, &v) < 0) + if (getobj (h, NULL, key, 0, &v) < 0) goto done; if (json_object_get_type (v) != json_type_boolean) { errno = EPROTO; @@ -1651,7 +1652,7 @@ int kvsdir_unlink (kvsdir_t *dir, const char *name) int kvs_copy (flux_t h, const char *from, const char *to) { JSON dirent; - if (getobj (h, from, KVS_PROTO_TREEOBJ, &dirent) < 0) + if (getobj (h, NULL, from, KVS_PROTO_TREEOBJ, &dirent) < 0) return -1; if (kvs_put_dirent (h, to, dirent) < 0) { Jput (dirent); From 166dc00f4d043d5b4ccef34b1cd5657c346d728a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 15 Sep 2016 00:10:46 +0000 Subject: [PATCH 04/25] modules/kvs: add kvs_getat() Add a new function kvs_getat() which is like kvs_get() but has a new "treeobj" argument that specifies the specific root (or other directory) "snapshot" that the key will be looked up in. Partially fixes #64 --- src/modules/kvs/kvs.h | 5 +++++ src/modules/kvs/libkvs.c | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/modules/kvs/kvs.h b/src/modules/kvs/kvs.h index 30c83dff663b..aca622e5994a 100644 --- a/src/modules/kvs/kvs.h +++ b/src/modules/kvs/kvs.h @@ -48,6 +48,11 @@ int kvs_get_symlink (flux_t h, const char *key, char **valp); */ int kvs_get_treeobj (flux_t h, const char *key, char **treeobj); +/* Like kvs_get() but lookup is relative to 'treeobj'. + */ +int kvs_getat (flux_t h, const char *treeobj, + const char *key, char **json_str); + /* kvs_watch* is like kvs_get* except the registered callback is called * to set the value. It will be called immediately to set the initial * value and again each time the value changes. diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index cc2ca679cced..f36d6b8a699f 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -380,6 +380,29 @@ int kvs_get (flux_t h, const char *key, char **val) return 0; } +int kvs_getat (flux_t h, const char *treeobj, + const char *key, char **val) +{ + JSON v = NULL; + JSON dirent = NULL; + + if (!treeobj || !key || !(dirent = Jfromstr (treeobj)) + || dirent_validate (dirent) < 0) { + errno = EINVAL; + goto error; + } + if (getobj (h, dirent, key, 0, &v) < 0) + goto error; + if (val) + *val = xstrdup (Jtostr (v)); + Jput (dirent); + return 0; +error: + Jput (v); + Jput (dirent); + return -1; +} + /* deprecated */ int kvs_get_obj (flux_t h, const char *key, JSON *val) { From f95f44a033c2d6352751012f135011c11cda16f8 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 15 Sep 2016 00:18:10 +0000 Subject: [PATCH 05/25] cmd/flux-kvs: add getat subcommand --- src/cmd/flux-kvs.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index f67276e72b90..166beb209ef3 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -66,6 +66,7 @@ void cmd_dir (flux_t h, int argc, char **argv); void cmd_dirsize (flux_t h, int argc, char **argv); void cmd_get_treeobj (flux_t h, int argc, char **argv); void cmd_put_treeobj (flux_t h, int argc, char **argv); +void cmd_getat (flux_t h, int argc, char **argv); void usage (void) @@ -93,6 +94,7 @@ void usage (void) " flux-kvs dropcache-all\n" " flux-kvs get-treeobj key\n" " flux-kvs put-treeobj key=treeobj\n" +" flux-kvs getat treeobj key\n" ); exit (1); } @@ -166,6 +168,8 @@ int main (int argc, char *argv[]) cmd_get_treeobj (h, argc - optind, argv + optind); else if (!strcmp (cmd, "put-treeobj")) cmd_put_treeobj (h, argc - optind, argv + optind); + else if (!strcmp (cmd, "getat")) + cmd_getat (h, argc - optind, argv + optind); else usage (); @@ -647,6 +651,17 @@ void cmd_get_treeobj (flux_t h, int argc, char **argv) free (treeobj); } +void cmd_getat (flux_t h, int argc, char **argv) +{ + char *val = NULL; + if (argc != 2) + log_msg_exit ("getat: specify treeobj and key"); + if (kvs_getat (h, argv[0], argv[1], &val) < 0) + log_err_exit ("kvs_getat %s %s", argv[0], argv[1]); + printf ("%s\n", val); + free (val); +} + void cmd_put_treeobj (flux_t h, int argc, char **argv) { if (argc != 1) From 523567248f3dc936d47a23f71fe669d0047a59ce Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 15 Sep 2016 00:39:39 +0000 Subject: [PATCH 06/25] test/kvs: add getat tests Exercise flux-kvs getat via t1000-kvs-basic.t sharness test. --- t/t1000-kvs-basic.t | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 152675c17671..9a68ea3e6906 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -355,6 +355,34 @@ test_expect_success 'kvs: put-treeobj: fails bad dirent: bad blobref' ' test_must_fail flux kvs put-treeobj $TEST.a="{\"DIRREF\":\"sha1-bbb\"}" ' +test_expect_success 'kvs: getat: fails bad on dirent' ' + flux kvs unlink $TEST && + test_must_fail flux kvs getat 42 $TEST.a && + test_must_fail flux kvs getat "{\"DIRREF\":\"sha2-aaa\"}" $TEST.a && + test_must_fail flux kvs getat "{\"DIRREF\":\"sha1-bbb\"}" $TEST.a && + test_must_fail flux kvs getat "{\"DIRVAL\":{}}" $TEST.a +' + +test_expect_success 'kvs: getat: works on root from get-treeobj' ' + flux kvs unlink $TEST && + flux kvs put $TEST.a.b.c=42 && + test $(flux kvs getat $(flux kvs get-treeobj .) $TEST.a.b.c) = 42 +' + +test_expect_success 'kvs: getat: works on subdir from get-treeobj' ' + flux kvs unlink $TEST && + flux kvs put $TEST.a.b.c=42 && + test $(flux kvs getat $(flux kvs get-treeobj $TEST.a.b) c) = 42 +' + +test_expect_success 'kvs: getat: works on outdated root' ' + flux kvs unlink $TEST && + flux kvs put $TEST.a.b.c=42 && + ROOTREF=$(flux kvs get-treeobj .) && + flux kvs put $TEST.a.b.c=43 && + test $(flux kvs getat $ROOTREF $TEST.a.b.c) = 42 +' + test_expect_success 'kvs: kvsdir_get_size works' ' flux kvs mkdir $TEST.dirsize && flux kvs put $TEST.dirsize.a=1 && From 656c47d465525a76bfaab0cb4a4cc7fbb2dff014 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 21 Sep 2016 13:07:44 -0700 Subject: [PATCH 07/25] doc/flux-kvs(1): describe gatat subcommand --- doc/man1/flux-kvs.adoc | 5 +++++ doc/test/spell.en.pws | 2 ++ 2 files changed, 7 insertions(+) diff --git a/doc/man1/flux-kvs.adoc b/doc/man1/flux-kvs.adoc index 9dda815211ae..9f870b4e5b62 100644 --- a/doc/man1/flux-kvs.adoc +++ b/doc/man1/flux-kvs.adoc @@ -128,6 +128,11 @@ Associate a new treeobj with 'key', overwriting the previous treeobj, if any. A treeobj should be treated as an opaque string value, which may contain spaces. +*getat* 'treeobj' 'key':: +Retrieve the value stored under 'key', starting the lookup at 'treeobj'. +If nothing has been stored under 'key', display an error message. + + AUTHOR ------ This page is maintained by the Flux community. diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index 650355aee407..cb40692dec7e 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -325,3 +325,5 @@ hwm recurse treeobj unlinked +getat +lookup From 0cdf008767fa27d78b19e8aad6484bffb0e331c9 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 14:58:39 -0700 Subject: [PATCH 08/25] modules/kvs: drop internal working directory abstraction The "cwd" abstraction in the KVS user facing API code was only used to store a directory prefix for the kvsdir_*() functions. Get rid of it and just use kvsdir_key_at() which is much clearer. --- src/modules/kvs/libkvs.c | 264 +++++++++++++-------------------------- 1 file changed, 90 insertions(+), 174 deletions(-) diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index f36d6b8a699f..1b2fd7108340 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -83,8 +83,6 @@ typedef struct { typedef struct { zhash_t *watchers; /* kvs_watch_t hashed by stringified matchtag */ - char *cwd; - zlist_t *dirstack; flux_msg_handler_t *w; json_object *ops; /* JSON array of put, unlink, etc operations */ zhash_t *fence_ops; @@ -99,12 +97,9 @@ static void freectx (void *arg) kvsctx_t *ctx = arg; if (ctx) { zhash_destroy (&ctx->watchers); - zlist_destroy (&ctx->dirstack); zhash_destroy (&ctx->fence_ops); if (ctx->w) flux_msg_handler_destroy (ctx->w); - if (ctx->cwd) - free (ctx->cwd); Jput (ctx->ops); free (ctx); } @@ -119,10 +114,6 @@ static kvsctx_t *getctx (flux_t h) ctx = xzmalloc (sizeof (*ctx)); if (!(ctx->watchers = zhash_new ())) oom (); - if (!(ctx->dirstack = zlist_new ())) - oom (); - if (!(ctx->cwd = xstrdup ("."))) - oom (); match.topic_glob = "kvs.watch"; if (!(ctx->w = flux_msg_handler_create (h, match, watch_response_cb, ctx))) @@ -132,70 +123,6 @@ static kvsctx_t *getctx (flux_t h) return ctx; } -/** - ** Current working directory implementation is just used internally - ** for now. I'm not sure it is all that useful of an abstractions - ** to expose in the KVS API. - **/ - -/* Create new path from current working directory and relative path. - * Confusing: "." is our path separator, so think of it as POSIX "/", - * and there is no equiv of POSIX "." and "..". - */ -static char *pathcat (const char *cwd, const char *relpath) -{ - char *path; - bool fq = false; - - while (*cwd == '.') - cwd++; - while (*relpath == '.') { - relpath++; - fq = true; - } - if (fq || strlen (cwd) == 0) - path = xstrdup (strlen (relpath) > 0 ? relpath : "."); - else - path = xasprintf ("%s.%s", cwd, relpath); - return path; -} - -const char *kvs_getcwd (flux_t h) -{ - kvsctx_t *ctx = getctx (h); - return ctx->cwd; -} - -#if 0 -static void kvs_chdir (flux_t h, const char *path) -{ - kvsctx_t *ctx = getctx (h); - char *new; - - new = pathcat (ctx->cwd, xstrdup (path ? path : ".")); - free (ctx->cwd); - ctx->cwd = new; -} -#endif -static void kvs_pushd (flux_t h, const char *path) -{ - kvsctx_t *ctx = getctx (h); - - if (zlist_push (ctx->dirstack, ctx->cwd) < 0) - oom (); - ctx->cwd = pathcat (ctx->cwd, path ? path : "."); -} - -static void kvs_popd (flux_t h) -{ - kvsctx_t *ctx = getctx (h); - - if (zlist_size (ctx->dirstack) > 0) { - free (ctx->cwd); - ctx->cwd = zlist_pop (ctx->dirstack); - } -} - /** ** kvsdir_t primary functions. ** A kvsdir_t is analagous to posix (DIR *). @@ -270,7 +197,7 @@ kvsitr_t *kvsitr_create (kvsdir_t *dir) void kvsitr_destroy (kvsitr_t *itr) { - free (itr); + free (itr); } const char *kvsitr_next (kvsitr_t *itr) @@ -330,10 +257,9 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, { flux_rpc_t *rpc = NULL; const char *json_str; - char *k = NULL; JSON in = NULL; JSON out = NULL; - JSON v; + JSON v = NULL; int saved_errno; int rc = -1; @@ -341,8 +267,7 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, errno = EINVAL; goto done; } - k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (rootdir, k, flags))) + if (!(in = kp_tget_enc (rootdir, key, flags))) goto done; if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) goto done; @@ -361,8 +286,6 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, saved_errno = errno; Jput (in); Jput (out); - if (k) - free (k); flux_rpc_destroy (rpc); errno = saved_errno; return rc; @@ -370,7 +293,7 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, int kvs_get (flux_t h, const char *key, char **val) { - JSON v; + JSON v = NULL; if (getobj (h, NULL, key, 0, &v) < 0) return -1; @@ -413,12 +336,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) { va_list ap; char *key = NULL; - char *k = NULL; - const char *json_str; - flux_rpc_t *rpc = NULL; - JSON in = NULL; - JSON out = NULL; - JSON v; + JSON v = NULL; int rc = -1; if (!h || !dir || !fmt) { @@ -428,29 +346,19 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) va_start (ap, fmt); key = xvasprintf (fmt, ap); va_end (ap); - k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (NULL, k, KVS_PROTO_READDIR))) - goto done; - if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) - goto done; - if (flux_rpc_get (rpc, NULL, &json_str) < 0) - goto done; - if (!(out = Jfromstr (json_str))) { - errno = EPROTO; - goto done; - } - if (kp_rget_dec (out, NULL, &v) < 0) + + /* N.B. python kvs tests use empty string key for some reason. + * Don't break them for now. + */ + const char *k = strlen (key) > 0 ? key : "."; + if (getobj (h, NULL, k, KVS_PROTO_READDIR, &v) < 0) goto done; *dir = kvsdir_alloc (h, k, v); rc = 0; done: - Jput (in); - Jput (out); - if (k) - free (k); + Jput (v); if (key) free (key); - flux_rpc_destroy (rpc); return rc; } @@ -1075,7 +983,6 @@ int kvs_watch_boolean (flux_t h, const char *key, kvs_set_boolean_f set, static int kvs_put_dirent (flux_t h, const char *key, json_object *dirent) { kvsctx_t *ctx = getctx (h); - char *k = NULL; int rc = -1; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; @@ -1083,11 +990,9 @@ static int kvs_put_dirent (flux_t h, const char *key, json_object *dirent) errno = EINVAL; goto done; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent); + dirent_append (ops, key, dirent); rc = 0; done: - free (k); return rc; } @@ -1227,7 +1132,6 @@ int kvs_symlink (flux_t h, const char *key, const char *target) { kvsctx_t *ctx = getctx (h); JSON val = NULL; - char *k = NULL; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; if (!h || !key || !target) { @@ -1238,9 +1142,7 @@ int kvs_symlink (flux_t h, const char *key, const char *target) errno = ENOMEM; return -1; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent_create ("LINKVAL", val)); - free (k); + dirent_append (ops, key, dirent_create ("LINKVAL", val)); Jput (val); return 0; } @@ -1249,16 +1151,13 @@ int kvs_mkdir (flux_t h, const char *key) { kvsctx_t *ctx = getctx (h); JSON val = Jnew (); - char *k = NULL; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; if (!h || !key) { errno = EINVAL; return -1; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent_create ("DIRVAL", val)); - free (k); + dirent_append (ops, key, dirent_create ("DIRVAL", val)); Jput (val); return 0; } @@ -1445,12 +1344,12 @@ int kvs_dropcache (flux_t h) int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) { + char *key = kvsdir_key_at (dir, name); int rc = -1; char *json_str = NULL; JSON out; - kvs_pushd (dir->handle, dir->key); - if (kvs_get (dir->handle, name, &json_str) < 0) + if (kvs_get (dir->handle, key, &json_str) < 0) goto done; if (!(out = Jfromstr (json_str))) { errno = EPROTO; @@ -1459,7 +1358,7 @@ int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) *valp = out; rc = 0; done: - kvs_popd (dir->handle); + free (key); if (json_str) free (json_str); return rc; @@ -1467,11 +1366,12 @@ int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) int kvsdir_get (kvsdir_t *dir, const char *name, char **valp) { + char *key; int rc; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get (dir->handle, key, valp); + free (key); return rc; } @@ -1479,7 +1379,7 @@ int kvsdir_get (kvsdir_t *dir, const char *name, char **valp) int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) { int rc; - char *name; + char *name, *key; va_list ap; va_start (ap, fmt); @@ -1487,9 +1387,9 @@ int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) oom (); va_end (ap); - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_dir (dir->handle, dirp, "%s", name); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_dir (dir->handle, dirp, "%s", key); + free (key); if (name) free (name); @@ -1499,10 +1399,11 @@ int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_symlink (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_symlink (dir->handle, key, valp); + free (key); return rc; } @@ -1510,10 +1411,11 @@ int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **valp) int kvsdir_get_string (kvsdir_t *dir, const char *name, char **valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_string (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_string (dir->handle, key, valp); + free (key); return rc; } @@ -1521,10 +1423,11 @@ int kvsdir_get_string (kvsdir_t *dir, const char *name, char **valp) int kvsdir_get_int (kvsdir_t *dir, const char *name, int *valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_int (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_int (dir->handle, key, valp); + free (key); return rc; } @@ -1532,10 +1435,11 @@ int kvsdir_get_int (kvsdir_t *dir, const char *name, int *valp) int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_int64 (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_int64 (dir->handle, key, valp); + free (key); return rc; } @@ -1543,10 +1447,11 @@ int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *valp) int kvsdir_get_double (kvsdir_t *dir, const char *name, double *valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_double (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_double (dir->handle, key, valp); + free (key); return rc; } @@ -1554,10 +1459,11 @@ int kvsdir_get_double (kvsdir_t *dir, const char *name, double *valp) int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *valp) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_boolean (dir->handle, name, valp); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_get_boolean (dir->handle, key, valp); + free (key); return rc; } @@ -1565,10 +1471,11 @@ int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *valp) int kvsdir_put_obj (kvsdir_t *dir, const char *name, json_object *val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put (dir->handle, name, Jtostr (val)); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put (dir->handle, key, Jtostr (val)); + free (key); return (rc); } @@ -1576,10 +1483,11 @@ int kvsdir_put_obj (kvsdir_t *dir, const char *name, json_object *val) int kvsdir_put (kvsdir_t *dir, const char *name, const char *val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put (dir->handle, key, val); + free (key); return (rc); } @@ -1587,10 +1495,11 @@ int kvsdir_put (kvsdir_t *dir, const char *name, const char *val) int kvsdir_put_string (kvsdir_t *dir, const char *name, const char *val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_string (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put_string (dir->handle, key, val); + free (key); return (rc); } @@ -1598,10 +1507,11 @@ int kvsdir_put_string (kvsdir_t *dir, const char *name, const char *val) int kvsdir_put_int (kvsdir_t *dir, const char *name, int val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_int (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put_int (dir->handle, key, val); + free (key); return (rc); } @@ -1609,10 +1519,11 @@ int kvsdir_put_int (kvsdir_t *dir, const char *name, int val) int kvsdir_put_int64 (kvsdir_t *dir, const char *name, int64_t val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_int64 (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put_int64 (dir->handle, key, val); + free (key); return (rc); } @@ -1620,10 +1531,11 @@ int kvsdir_put_int64 (kvsdir_t *dir, const char *name, int64_t val) int kvsdir_put_double (kvsdir_t *dir, const char *name, double val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_double (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put_double (dir->handle, key, val); + free (key); return (rc); } @@ -1631,10 +1543,11 @@ int kvsdir_put_double (kvsdir_t *dir, const char *name, double val) int kvsdir_put_boolean (kvsdir_t *dir, const char *name, bool val) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_boolean (dir->handle, name, val); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_put_boolean (dir->handle, key, val); + free (key); return (rc); } @@ -1642,10 +1555,11 @@ int kvsdir_put_boolean (kvsdir_t *dir, const char *name, bool val) int kvsdir_mkdir (kvsdir_t *dir, const char *name) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_mkdir (dir->handle, name); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_mkdir (dir->handle, key); + free (key); return (rc); } @@ -1653,10 +1567,11 @@ int kvsdir_mkdir (kvsdir_t *dir, const char *name) int kvsdir_symlink (kvsdir_t *dir, const char *name, const char *target) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_symlink (dir->handle, name, target); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_symlink (dir->handle, key, target); + free (key); return (rc); } @@ -1664,10 +1579,11 @@ int kvsdir_symlink (kvsdir_t *dir, const char *name, const char *target) int kvsdir_unlink (kvsdir_t *dir, const char *name) { int rc; + char *key; - kvs_pushd (dir->handle, dir->key); - rc = kvs_unlink (dir->handle, name); - kvs_popd (dir->handle); + key = kvsdir_key_at (dir, name); + rc = kvs_unlink (dir->handle, key); + free (key); return (rc); } From fc905eedb4b5ffca25bfba308a994037cc7814f6 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 15:23:39 -0700 Subject: [PATCH 09/25] modules/kvs: add kvs_get_dirat() Add kvs_get_dirat(), which is like kvs_get_dir() but looks up key relative to a snapshot reference. The snapshot reference is stored in the returned kvsdir_t object, and is then used in subsequent kvsdir_get_*() operations on that object. It is thus possible to recursively walk a snapshot of a section of the namespace by starting with kvs_get_dirat() and looking up subdirectories with kvsdir_get_dir(). Fixes #64. --- src/modules/kvs/kvs.h | 3 + src/modules/kvs/libkvs.c | 204 ++++++++++++++++++++++++++------------- 2 files changed, 138 insertions(+), 69 deletions(-) diff --git a/src/modules/kvs/kvs.h b/src/modules/kvs/kvs.h index aca622e5994a..983df75b9d90 100644 --- a/src/modules/kvs/kvs.h +++ b/src/modules/kvs/kvs.h @@ -52,6 +52,9 @@ int kvs_get_treeobj (flux_t h, const char *key, char **treeobj); */ int kvs_getat (flux_t h, const char *treeobj, const char *key, char **json_str); +int kvs_get_dirat (flux_t h, const char *treeobj, + const char *key, kvsdir_t **dirp); + /* kvs_watch* is like kvs_get* except the registered callback is called * to set the value. It will be called immediately to set the initial diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 1b2fd7108340..2b0289f6066b 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -55,6 +55,7 @@ struct kvsdir_struct { flux_t handle; + json_object *rootref; /* optional snapshot reference */ char *key; json_object *o; int count; @@ -136,17 +137,20 @@ void kvsdir_incref (kvsdir_t *dir) void kvsdir_destroy (kvsdir_t *dir) { if (--dir->usecount == 0) { + Jput (dir->rootref); free (dir->key); json_object_put (dir->o); free (dir); } } -static kvsdir_t *kvsdir_alloc (flux_t handle, const char *key, json_object *o) +static kvsdir_t *kvsdir_alloc (flux_t handle, json_object *rootref, + const char *key, json_object *o) { kvsdir_t *dir = xzmalloc (sizeof (*dir)); dir->handle = handle; + dir->rootref = Jget (rootref); /* may be NULL */ dir->key = xstrdup (key); dir->o = o; json_object_get (dir->o); @@ -326,6 +330,28 @@ int kvs_getat (flux_t h, const char *treeobj, return -1; } +int kvs_get_dirat (flux_t h, const char *treeobj, + const char *key, kvsdir_t **dir) +{ + JSON v = NULL; + JSON rootref = NULL; + int rc = -1; + + if (!treeobj || !key || !dir || !(rootref = Jfromstr (treeobj)) + || dirent_validate (rootref) < 0) { + errno = EINVAL; + goto done; + } + if (getobj (h, rootref, key, KVS_PROTO_READDIR, &v) < 0) + goto done; + *dir = kvsdir_alloc (h, rootref, key, v); + rc = 0; +done: + Jput (v); + Jput (rootref); + return rc; +} + /* deprecated */ int kvs_get_obj (flux_t h, const char *key, JSON *val) { @@ -353,7 +379,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) const char *k = strlen (key) > 0 ? key : "."; if (getobj (h, NULL, k, KVS_PROTO_READDIR, &v) < 0) goto done; - *dir = kvsdir_alloc (h, k, v); + *dir = kvsdir_alloc (h, NULL, k, v); rc = 0; done: Jput (v); @@ -611,7 +637,7 @@ static int dispatch_watch (flux_t h, kvs_watcher_t *wp, json_object *val) } case WATCH_DIR: { kvs_set_dir_f set = wp->set; - kvsdir_t *dir = val ? kvsdir_alloc (h, wp->key, val) : NULL; + kvsdir_t *dir = val ? kvsdir_alloc (h, NULL, wp->key, val) : NULL; rc = set (wp->key, dir, wp->arg, errnum); if (dir) kvsdir_destroy (dir); @@ -810,7 +836,7 @@ int kvs_watch_once_dir (flux_t h, kvsdir_t **dirp, const char *fmt, ...) } if (*dirp) kvsdir_destroy (*dirp); - *dirp = kvsdir_alloc (h, key, val); + *dirp = kvsdir_alloc (h, NULL, key, val); rc = 0; done: if (val) @@ -1342,45 +1368,40 @@ int kvs_dropcache (flux_t h) ** kvsdir_t convenience functions **/ -int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) +static int dirgetobj (kvsdir_t *dir, const char *name, + int flags, json_object **val) { - char *key = kvsdir_key_at (dir, name); - int rc = -1; - char *json_str = NULL; - JSON out; + char *key; + int rc; - if (kvs_get (dir->handle, key, &json_str) < 0) - goto done; - if (!(out = Jfromstr (json_str))) { - errno = EPROTO; - goto done; - } - *valp = out; - rc = 0; -done: + key = kvsdir_key_at (dir, name); + rc = getobj (dir->handle, dir->rootref, key, flags, val); free (key); - if (json_str) - free (json_str); return rc; } -int kvsdir_get (kvsdir_t *dir, const char *name, char **valp) +int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) { - char *key; - int rc; - - key = kvsdir_key_at (dir, name); - rc = kvs_get (dir->handle, key, valp); - free (key); + return dirgetobj (dir, name, 0, valp); +} - return rc; +int kvsdir_get (kvsdir_t *dir, const char *name, char **valp) +{ + JSON v = NULL; + if (dirgetobj (dir, name, 0, &v) < 0) + return -1; + if (valp) + *valp = xstrdup (Jtostr (v)); + Jput (v); + return 0; } int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) { - int rc; + int rc = -1; char *name, *key; va_list ap; + JSON v = NULL; va_start (ap, fmt); if (vasprintf (&name, fmt, ap) < 0) @@ -1388,83 +1409,128 @@ int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) va_end (ap); key = kvsdir_key_at (dir, name); - rc = kvs_get_dir (dir->handle, dirp, "%s", key); + if (getobj (dir->handle, dir->rootref, key, KVS_PROTO_READDIR, &v) < 0) + goto done; + *dirp = kvsdir_alloc (dir->handle, dir->rootref, key, v); + rc = 0; +done: + Jput (v); free (key); - - if (name) - free (name); + free (name); return rc; } int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_symlink (dir->handle, key, valp); - free (key); + int rc = -1; + JSON v = NULL; + if (dirgetobj (dir, name, KVS_PROTO_READLINK, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_string) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = xstrdup (json_object_get_string (v)); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_get_string (kvsdir_t *dir, const char *name, char **valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_string (dir->handle, key, valp); - free (key); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_string) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = xstrdup (json_object_get_string (v)); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_get_int (kvsdir_t *dir, const char *name, int *valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_int (dir->handle, key, valp); - free (key); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_int) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = json_object_get_int (v); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_int64 (dir->handle, key, valp); - free (key); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_int) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = json_object_get_int64 (v); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_get_double (kvsdir_t *dir, const char *name, double *valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_double (dir->handle, key, valp); - free (key); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_double) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = json_object_get_double (v); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *valp) { - int rc; - char *key; - - key = kvsdir_key_at (dir, name); - rc = kvs_get_boolean (dir->handle, key, valp); - free (key); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_boolean) { + errno = EPROTO; + goto done; + } + if (valp) + *valp = json_object_get_boolean (v); + rc = 0; +done: + Jput (v); return rc; } From a4a611ece8a3de60f7cb5d6a4cb9f92a33101b66 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 15:28:16 -0700 Subject: [PATCH 10/25] modules/kvs: kvsdir_put_*() fail on snapshot If kvsdir_put_*() is used on a kvsdir_t created by kvsdir_get_at(), or kvsdir_get_dir() if the original directory was created with kvsdir_get_at(), then immediatley fail with errno == EROFS. Although allowing the put would have no effect on snapshot integrity, this at least discourages users from assuming that kvsdir_get() tracks kvsdir_put() due to read-your-writes consistency. Since a kvsdir_t's internal root reference does not change once fixed, kvsdir_get_*() will never see new data. --- src/modules/kvs/libkvs.c | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 2b0289f6066b..4d2290fa0602 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -1539,6 +1539,10 @@ int kvsdir_put_obj (kvsdir_t *dir, const char *name, json_object *val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put (dir->handle, key, Jtostr (val)); free (key); @@ -1551,6 +1555,10 @@ int kvsdir_put (kvsdir_t *dir, const char *name, const char *val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put (dir->handle, key, val); free (key); @@ -1563,6 +1571,10 @@ int kvsdir_put_string (kvsdir_t *dir, const char *name, const char *val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put_string (dir->handle, key, val); free (key); @@ -1575,6 +1587,10 @@ int kvsdir_put_int (kvsdir_t *dir, const char *name, int val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put_int (dir->handle, key, val); free (key); @@ -1587,6 +1603,10 @@ int kvsdir_put_int64 (kvsdir_t *dir, const char *name, int64_t val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put_int64 (dir->handle, key, val); free (key); @@ -1599,6 +1619,10 @@ int kvsdir_put_double (kvsdir_t *dir, const char *name, double val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put_double (dir->handle, key, val); free (key); @@ -1611,6 +1635,10 @@ int kvsdir_put_boolean (kvsdir_t *dir, const char *name, bool val) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_put_boolean (dir->handle, key, val); free (key); @@ -1623,6 +1651,10 @@ int kvsdir_mkdir (kvsdir_t *dir, const char *name) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_mkdir (dir->handle, key); free (key); @@ -1635,6 +1667,10 @@ int kvsdir_symlink (kvsdir_t *dir, const char *name, const char *target) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_symlink (dir->handle, key, target); free (key); @@ -1647,6 +1683,10 @@ int kvsdir_unlink (kvsdir_t *dir, const char *name) int rc; char *key; + if (dir->rootref) { + errno = EROFS; + return -1; + } key = kvsdir_key_at (dir, name); rc = kvs_unlink (dir->handle, key); free (key); From 90e984283b62075fa51f9b1f25f3db7a2e5717aa Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 21 Sep 2016 14:50:33 -0700 Subject: [PATCH 11/25] cmd/flux-kvs: add dirat subcommand Add dirat subcommand: Usage: flux-kvs dirat [-r] treeobj key This command demonstrates walking a snapshot. --- src/cmd/flux-kvs.c | 61 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index 166beb209ef3..930edb6bf1da 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -67,6 +67,7 @@ void cmd_dirsize (flux_t h, int argc, char **argv); void cmd_get_treeobj (flux_t h, int argc, char **argv); void cmd_put_treeobj (flux_t h, int argc, char **argv); void cmd_getat (flux_t h, int argc, char **argv); +void cmd_dirat (flux_t h, int argc, char **argv); void usage (void) @@ -95,6 +96,7 @@ void usage (void) " flux-kvs get-treeobj key\n" " flux-kvs put-treeobj key=treeobj\n" " flux-kvs getat treeobj key\n" +" flux-kvs dirat [-r] treeobj [key]\n" ); exit (1); } @@ -170,6 +172,8 @@ int main (int argc, char *argv[]) cmd_put_treeobj (h, argc - optind, argv + optind); else if (!strcmp (cmd, "getat")) cmd_getat (h, argc - optind, argv + optind); + else if (!strcmp (cmd, "dirat")) + cmd_dirat (h, argc - optind, argv + optind); else usage (); @@ -521,34 +525,34 @@ static void dump_kvs_val (const char *key, const char *json_str) Jput (o); } -static void dump_kvs_dir (flux_t h, bool ropt, const char *path) +static void dump_kvs_dir (kvsdir_t *dir, bool ropt) { - kvsdir_t *dir; kvsitr_t *itr; const char *name; char *key; - if (kvs_get_dir (h, &dir, "%s", path) < 0) - log_err_exit ("%s", path); - itr = kvsitr_create (dir); while ((name = kvsitr_next (itr))) { key = kvsdir_key_at (dir, name); if (kvsdir_issymlink (dir, name)) { char *link; - if (kvs_get_symlink (h, key, &link) < 0) + if (kvsdir_get_symlink (dir, name, &link) < 0) log_err_exit ("%s", key); printf ("%s -> %s\n", key, link); free (link); } else if (kvsdir_isdir (dir, name)) { - if (ropt) - dump_kvs_dir (h, ropt, key); - else + if (ropt) { + kvsdir_t *ndir; + if (kvsdir_get_dir (dir, &ndir, "%s", name) < 0) + log_err_exit ("%s", key); + dump_kvs_dir (ndir, ropt); + kvsdir_destroy (ndir); + } else printf ("%s.\n", key); } else { char *json_str; - if (kvs_get (h, key, &json_str) < 0) + if (kvsdir_get (dir, name, &json_str) < 0) log_err_exit ("%s", key); dump_kvs_val (key, json_str); free (json_str); @@ -556,7 +560,6 @@ static void dump_kvs_dir (flux_t h, bool ropt, const char *path) free (key); } kvsitr_destroy (itr); - kvsdir_destroy (dir); } void cmd_watch_dir (flux_t h, int argc, char **argv) @@ -583,7 +586,8 @@ void cmd_watch_dir (flux_t h, int argc, char **argv) kvsdir_destroy (dir); dir = NULL; } else { - dump_kvs_dir (h, ropt, key); + dump_kvs_dir (dir, ropt); + kvsdir_destroy (dir); printf ("======================\n"); } rc = kvs_watch_once_dir (h, &dir, "%s", key); @@ -595,6 +599,8 @@ void cmd_watch_dir (flux_t h, int argc, char **argv) void cmd_dir (flux_t h, int argc, char **argv) { bool ropt = false; + char *key; + kvsdir_t *dir; if (argc > 0 && !strcmp (argv[0], "-r")) { ropt = true; @@ -602,11 +608,38 @@ void cmd_dir (flux_t h, int argc, char **argv) argv++; } if (argc == 0) - dump_kvs_dir (h, ropt, "."); + key = "."; else if (argc == 1) - dump_kvs_dir (h, ropt, argv[0]); + key = argv[0]; else log_msg_exit ("dir: specify zero or one directory"); + if (kvs_get_dir (h, &dir, "%s", key) < 0) + log_err_exit ("%s", key); + dump_kvs_dir (dir, ropt); + kvsdir_destroy (dir); +} + +void cmd_dirat (flux_t h, int argc, char **argv) +{ + bool ropt = false; + char *key; + kvsdir_t *dir; + + if (argc > 0 && !strcmp (argv[0], "-r")) { + ropt = true; + argc--; + argv++; + } + if (argc == 1) + key = "."; + else if (argc == 2) + key = argv[1]; + else + log_msg_exit ("dir: specify treeobj and zero or one directory"); + if (kvs_get_dirat (h, argv[0], key, &dir) < 0) + log_err_exit ("%s", key); + dump_kvs_dir (dir, ropt); + kvsdir_destroy (dir); } void cmd_dirsize (flux_t h, int argc, char **argv) From 586a46c661763141ad30997cd708ebf52de0069d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 23 Sep 2016 10:34:42 -0700 Subject: [PATCH 12/25] doc/flux-kvs(1): describe dirat subcommand --- doc/man1/flux-kvs.adoc | 6 ++++++ doc/test/spell.en.pws | 1 + 2 files changed, 7 insertions(+) diff --git a/doc/man1/flux-kvs.adoc b/doc/man1/flux-kvs.adoc index 9f870b4e5b62..3946ae15ff5e 100644 --- a/doc/man1/flux-kvs.adoc +++ b/doc/man1/flux-kvs.adoc @@ -132,6 +132,12 @@ may contain spaces. Retrieve the value stored under 'key', starting the lookup at 'treeobj'. If nothing has been stored under 'key', display an error message. +*dirat* 'treeobj' ['key']:: +Display all keys and their values under the directory 'key', starting +lookup at 'treeobj'. If 'key' does not exist or is not a directory, +display an error message. If 'key' is not provided, "." (root of +the namespace) is assumed. + AUTHOR ------ diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index cb40692dec7e..58a18510e97e 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -327,3 +327,4 @@ treeobj unlinked getat lookup +dirat From 57aab34b449e00fa7dc4b500048d5b99640c7c5d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 28 Sep 2016 13:57:24 -0700 Subject: [PATCH 13/25] modules/kvs: add kvs_get_symlinkat() --- src/modules/kvs/kvs.h | 2 ++ src/modules/kvs/libkvs.c | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/modules/kvs/kvs.h b/src/modules/kvs/kvs.h index 983df75b9d90..d511e201e133 100644 --- a/src/modules/kvs/kvs.h +++ b/src/modules/kvs/kvs.h @@ -54,6 +54,8 @@ int kvs_getat (flux_t h, const char *treeobj, const char *key, char **json_str); int kvs_get_dirat (flux_t h, const char *treeobj, const char *key, kvsdir_t **dirp); +int kvs_get_symlinkat (flux_t h, const char *treeobj, + const char *key, char **val); /* kvs_watch* is like kvs_get* except the registered callback is called diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 4d2290fa0602..e8ef7547dd4f 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -352,6 +352,33 @@ int kvs_get_dirat (flux_t h, const char *treeobj, return rc; } +int kvs_get_symlinkat (flux_t h, const char *treeobj, + const char *key, char **val) +{ + JSON v = NULL; + JSON dirent = NULL; + int rc = -1; + + if (!treeobj || !key || !(dirent = Jfromstr (treeobj)) + || dirent_validate (dirent) < 0) { + errno = EINVAL; + goto done; + } + if (getobj (h, dirent, key, KVS_PROTO_READLINK, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_string) { + errno = EPROTO; + goto done; + } + if (val) + *val = xstrdup (json_object_get_string (v)); + rc = 0; +done: + Jput (v); + Jput (dirent); + return rc; +} + /* deprecated */ int kvs_get_obj (flux_t h, const char *key, JSON *val) { From 459355d72175b2f53c080a08e971d1c6dc964eee Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 23 Sep 2016 11:20:44 -0700 Subject: [PATCH 14/25] cmd/flux-kvs: add readlinkat subcommand --- src/cmd/flux-kvs.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index 930edb6bf1da..26ed02df9777 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -68,6 +68,7 @@ void cmd_get_treeobj (flux_t h, int argc, char **argv); void cmd_put_treeobj (flux_t h, int argc, char **argv); void cmd_getat (flux_t h, int argc, char **argv); void cmd_dirat (flux_t h, int argc, char **argv); +void cmd_readlinkat (flux_t h, int argc, char **argv); void usage (void) @@ -97,6 +98,7 @@ void usage (void) " flux-kvs put-treeobj key=treeobj\n" " flux-kvs getat treeobj key\n" " flux-kvs dirat [-r] treeobj [key]\n" +" flux-kvs readlinkat treeobj key\n" ); exit (1); } @@ -174,6 +176,8 @@ int main (int argc, char *argv[]) cmd_getat (h, argc - optind, argv + optind); else if (!strcmp (cmd, "dirat")) cmd_dirat (h, argc - optind, argv + optind); + else if (!strcmp (cmd, "readlinkat")) + cmd_readlinkat (h, argc - optind, argv + optind); else usage (); @@ -711,6 +715,22 @@ void cmd_put_treeobj (flux_t h, int argc, char **argv) } +void cmd_readlinkat (flux_t h, int argc, char **argv) +{ + int i; + char *target; + + if (argc < 2) + log_msg_exit ("readlink: specify treeobj and one or more keys"); + for (i = 1; i < argc; i++) { + if (kvs_get_symlinkat (h, argv[0], argv[i], &target) < 0) + log_err_exit ("%s", argv[i]); + else + printf ("%s\n", target); + free (target); + } +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ From 41ff8009fdda442d33b085337ba28efb1cca9e00 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 23 Sep 2016 11:38:15 -0700 Subject: [PATCH 15/25] doc/flux-kvs(1): describe readlinkat subcommand --- doc/man1/flux-kvs.adoc | 4 ++++ doc/test/spell.en.pws | 1 + 2 files changed, 5 insertions(+) diff --git a/doc/man1/flux-kvs.adoc b/doc/man1/flux-kvs.adoc index 3946ae15ff5e..51c2294f106a 100644 --- a/doc/man1/flux-kvs.adoc +++ b/doc/man1/flux-kvs.adoc @@ -138,6 +138,10 @@ lookup at 'treeobj'. If 'key' does not exist or is not a directory, display an error message. If 'key' is not provided, "." (root of the namespace) is assumed. +*readlinkat* 'treeobj' 'key':: +Retrieve the key a link refers to rather than its value, as would be +returned by *get*, starting lookup at 'treeobj'. + AUTHOR ------ diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index 58a18510e97e..252139d6f571 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -328,3 +328,4 @@ unlinked getat lookup dirat +readlinkat From 8f5d983a6a0d982fe1b37cd9356274793a03bbd0 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 23 Sep 2016 11:29:15 -0700 Subject: [PATCH 16/25] test/kvs: add coverage for kvs_get_symlinkat --- t/t1000-kvs-basic.t | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 9a68ea3e6906..54e3346bc2c2 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -282,6 +282,15 @@ test_expect_success 'kvs: symlink: kvs_copy does not follow symlinks (bottom)' ' test "$LINKVAL" = "$TEST.a.b.X" ' +test_expect_success 'kvs: get_symlinkat works after symlink unlinked' ' + flux kvs unlink $TEST && + flux kvs link $TEST.a.b.X $TEST.a.b.link && + ROOTREF=$(flux kvs get-treeobj .) && + flux kvs unlink $TEST && + LINKVAL=$(flux kvs readlinkat $ROOTREF $TEST.a.b.link) && + test "$LINKVAL" = "$TEST.a.b.X" +' + test_expect_success 'kvs: get-treeobj: returns directory reference for root' ' flux kvs unlink $TEST && flux kvs get-treeobj . | grep -q "DIRREF" From 03074b2f1e49724a5a4a1adfd00221c42c1be330 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2016 15:50:44 -0700 Subject: [PATCH 17/25] libpmi/single: use getpid for appnum not -1 Problem: a single rank session sets the "session id" to -1, and when this is used to create the scratch directory, breaks flux list-instances. Use getpid() for the appnum instead of -1 in the libpmi "single" (standalone) personality. Fixes #797 --- src/common/libpmi/single.c | 2 +- src/common/libpmi/test/single.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/libpmi/single.c b/src/common/libpmi/single.c index d89b00f6bd4d..1762af74f38e 100644 --- a/src/common/libpmi/single.c +++ b/src/common/libpmi/single.c @@ -79,7 +79,7 @@ int pmi_single_get_rank (struct pmi_single *pmi, int *rank) int pmi_single_get_appnum (struct pmi_single *pmi, int *appnum) { - *appnum = -1; + *appnum = getpid (); return PMI_SUCCESS; } diff --git a/src/common/libpmi/test/single.c b/src/common/libpmi/test/single.c index ce282c77649c..12de5d357693 100644 --- a/src/common/libpmi/test/single.c +++ b/src/common/libpmi/test/single.c @@ -34,8 +34,8 @@ int main (int argc, char *argv[]) "pmi_single_get_rank works, rank == 0"); appnum = -2; rc = pmi_single_get_appnum (pmi, &appnum); - ok (rc == PMI_SUCCESS && appnum == -1, - "pmi_single_get_appnum works, appnum == -1"); + ok (rc == PMI_SUCCESS && appnum >= 0, + "pmi_single_get_appnum works, appnum positive number"); size = -1; rc = pmi_single_get_universe_size (pmi, &size); ok (rc == PMI_SUCCESS && size == 1, From 0af9e015af2ba1a4ae41698907b00d98484c48b5 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2016 16:04:36 -0700 Subject: [PATCH 18/25] modules/kvs: fix memory leak Add a missing free() found by grondo. Fixes #826. --- src/modules/kvs/kvs.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 78bf0f2c4fe6..83e6d31867a9 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -1089,6 +1089,7 @@ static void fence_destroy (fence_t *f) /* FIXME: respond with error here? */ zlist_destroy (&f->requests); } + free (f); } } From e64d7ac6f8c7e303bf4f7fe605c86fcb0e7bade8 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 29 Sep 2016 16:14:24 -0700 Subject: [PATCH 19/25] broker/hello: fix memory leak Add missing json free found by grondo. Fixes #827. --- src/broker/hello.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/broker/hello.c b/src/broker/hello.c index 9d1ab3f1395c..b29132509891 100644 --- a/src/broker/hello.c +++ b/src/broker/hello.c @@ -229,6 +229,7 @@ static void join_request (flux_t h, flux_msg_handler_t *w, log_msg_exit ("hello: error decoding join request"); if (flux_reduce_append (hello->reduce, (void *)(uintptr_t)count, batch) < 0) log_err_exit ("hello: flux_reduce_append"); + Jput (in); } /* Reduction ops From 1dc799d0e606f34bb45d3791c226f977a474f4a1 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 30 Sep 2016 17:30:25 +0000 Subject: [PATCH 20/25] modules/kvs: fix memory leak when commit fails If a commit fails before the working copy of the root directory is passed to store(), it needs to be freed when the fence struct is freed. Fixes #825 --- src/modules/kvs/kvs.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 83e6d31867a9..3984448d62cc 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -506,7 +506,7 @@ static void commit_apply_fence (fence_t *f) /* Make a copy of the root directory. */ - if (!f->rootcpy) { + if (!f->rootcpy_stored && !f->rootcpy) { json_object *rootdir; if (!load (ctx, ctx->rootdir, wait, &rootdir)) goto stall; @@ -552,6 +552,7 @@ static void commit_apply_fence (fence_t *f) else if (store (ctx, f->rootcpy, f->newroot, wait) < 0) f->errnum = errno; f->rootcpy_stored = true; /* cache takes ownership of rootcpy */ + f->rootcpy = NULL; if (wait_get_usecount (wait) > 0) goto stall; } @@ -1082,6 +1083,7 @@ static void fence_destroy (fence_t *f) if (f) { Jput (f->names); Jput (f->ops); + Jput (f->rootcpy); if (f->requests) { flux_msg_t *msg; while ((msg = zlist_pop (f->requests))) From 5ae0b7242f0417d7f49bf26c608bd905970af65b Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Oct 2016 09:28:55 -0700 Subject: [PATCH 21/25] test/kvs: test flux kvs dirat Use the 'flux kvs dirat' command to walk a directory by reference after it has been unlinked. This adds minimal test coverage for kvs_get_dirat(). --- t/t1000-kvs-basic.t | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 54e3346bc2c2..14dd724babf5 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -409,6 +409,12 @@ test_expect_success 'kvs: walk 16x3 directory tree' ' test $(flux kvs dir -r $TEST.dtree | wc -l) = 4096 ' +test_expect_success 'kvs: unlink, walk 16x3 directory tree with dirat' ' + DIRREF=$(flux kvs get-treeobj $TEST.dtree) && + flux kvs unlink $TEST.dtree && + test $(flux kvs dirat -r $DIRREF | wc -l) = 4096 +' + test_expect_success 'kvs: put key of . fails' ' test_must_fail flux kvs put .=1 ' From 9d570df71498bb33faf48629ce600af7f2bc9b8e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Oct 2016 09:47:46 -0700 Subject: [PATCH 22/25] test/kvs: test kvsdir accessors Walk a directory with "flux kvs dir" that includes a symlink, booelan, and double to increase coverage for kvsdir_get_() functions. --- t/t1000-kvs-basic.t | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 14dd724babf5..25215d719872 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -415,6 +415,19 @@ test_expect_success 'kvs: unlink, walk 16x3 directory tree with dirat' ' test $(flux kvs dirat -r $DIRREF | wc -l) = 4096 ' +test_expect_success 'kvs: store 2x4 directory tree and walk' ' + ${FLUX_BUILD_DIR}/t/kvs/dtree -h4 -w2 --prefix $TEST.dtree + test $(flux kvs dir -r $TEST.dtree | wc -l) = 16 +' + +# exercise kvsdir_get_symlink, _double, _boolean, +test_expect_success 'kvs: add other types to 2x4 directory and walk' ' + flux kvs link $TEST.dtree $TEST.dtree.link && + flux kvs put $TEST.dtree.double=3.14 && + flux kvs put $TEST.dtree.booelan=true && + test $(flux kvs dir -r $TEST.dtree | wc -l) = 19 +' + test_expect_success 'kvs: put key of . fails' ' test_must_fail flux kvs put .=1 ' From d6e98d4c8d187eefaede0cb6e47b5029b41de2ce Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Oct 2016 10:09:02 -0700 Subject: [PATCH 23/25] test/kvs: add coverage for kvsdir_put_*() functions Add a --mkdir option to t/kvs/dtree which creates the directory tree recursively using kvsdir_put() functions, with the intention of increasing coverage for hte kvsidr_put_*() functions. Drive dtree --mkdir via the kvs-basic sharness test. --- t/kvs/dtree.c | 55 ++++++++++++++++++++++++++++++++++++++++++--- t/t1000-kvs-basic.t | 6 +++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/t/kvs/dtree.c b/t/kvs/dtree.c index 5ee13372bcc9..6178672b03ae 100644 --- a/t/kvs/dtree.c +++ b/t/kvs/dtree.c @@ -34,20 +34,22 @@ #include "src/common/libutil/log.h" -#define OPTIONS "p:w:h:" +#define OPTIONS "p:w:h:D" static const struct option longopts[] = { {"prefix", required_argument, 0, 'p'}, {"width", required_argument, 0, 'w'}, {"height", required_argument, 0, 'h'}, + {"mkdir", no_argument, 0, 'D'}, { 0, 0, 0, 0 }, }; void dtree (flux_t h, const char *prefix, int width, int height); +void dtree_mkdir (flux_t h, kvsdir_t *dir, int width, int height); void usage (void) { fprintf (stderr, -"Usage: dtree [--prefix NAME] [--width N] [--height N]\n" +"Usage: dtree [--mkdir] [--prefix NAME] [--width N] [--height N]\n" ); exit (1); } @@ -58,6 +60,7 @@ int main (int argc, char *argv[]) int width = 1; int height = 1; char *prefix = "dtree"; + int Dopt = 0; flux_t h; log_init ("dtree"); @@ -73,6 +76,9 @@ int main (int argc, char *argv[]) case 'p': /* --prefix NAME */ prefix = optarg; break; + case 'D': /* --mkdir */ + Dopt++; + break; default: usage (); break; @@ -84,12 +90,27 @@ int main (int argc, char *argv[]) usage (); if (!(h = flux_open (NULL, 0))) log_err_exit ("flux_open"); - dtree (h, prefix, width, height); + if (Dopt) { + kvsdir_t *dir; + if (kvs_mkdir (h, prefix) < 0) + log_err_exit ("kvs_mkdir %s", prefix); + if (kvs_commit (h) < 0) + log_err_exit ("kvs_commit"); + if (kvs_get_dir (h, &dir, "%s", prefix) < 0) + log_err_exit ("kvs_get_dir %s", prefix); + dtree_mkdir (h, dir, width, height); + kvsdir_destroy (dir); + } else { + dtree (h, prefix, width, height); + } if (kvs_commit (h) < 0) log_err_exit ("kvs_commit"); flux_close (h); } +/* This version simply puts keys and values, creating intermediate + * directories as a side effect. + */ void dtree (flux_t h, const char *prefix, int width, int height) { int i; @@ -106,6 +127,34 @@ void dtree (flux_t h, const char *prefix, int width, int height) } } +/* This version creates intermediate directories and references them + * using kvsdir_t objects. This is a less efficient method but provides + * alternate code coverage. + */ +void dtree_mkdir (flux_t h, kvsdir_t *dir, int width, int height) +{ + int i; + char key[16]; + kvsdir_t *ndir; + + for (i = 0; i < width; i++) { + snprintf (key, sizeof (key), "%.4x", i); + if (height == 1) { + if (kvsdir_put_int (dir, key, 1) < 0) + log_err_exit ("kvsdir_put_int %s", key); + } else { + if (kvsdir_mkdir (dir, key) < 0) + log_err_exit ("kvsdir_mkdir %s", key); + if (kvs_commit (h) < 0) + log_err_exit ("kvs_commit"); + if (kvsdir_get_dir (dir, &ndir, "%s", key) < 0) + log_err_exit ("kvsdir_get_dir"); + dtree_mkdir (h, ndir, width, height - 1); + kvsdir_destroy (ndir); + } + } +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 25215d719872..92c9ecfe6617 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -428,6 +428,12 @@ test_expect_success 'kvs: add other types to 2x4 directory and walk' ' test $(flux kvs dir -r $TEST.dtree | wc -l) = 19 ' +test_expect_success 'kvs: store 3x4 directory tree using kvsdir_put functions' ' + flux kvs unlink $TEST.dtree && + ${FLUX_BUILD_DIR}/t/kvs/dtree --mkdir -h4 -w3 --prefix $TEST.dtree && + test $(flux kvs dir -r $TEST.dtree | wc -l) = 81 +' + test_expect_success 'kvs: put key of . fails' ' test_must_fail flux kvs put .=1 ' From fc7f2291df5edb08524922356d549da20ad5db86 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Oct 2016 13:38:20 -0700 Subject: [PATCH 24/25] modules/kvs: fix memory leak in commit_merge_all() An extra JSON reference was being taken in the commit merging code on rank 0. Drop the extra Jget(). Fixes #825 --- src/modules/kvs/kvs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 3984448d62cc..5eab1861805e 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -626,7 +626,7 @@ void commit_merge_all (ctx_t *ctx) for (i = 0; i < len; i++) { json_object *op; if (Jget_ar_obj (nf->ops, i, &op)) - Jadd_ar_obj (f->ops, Jget (op)); + Jadd_ar_obj (f->ops, op); } } } From fe5c0a0f7508e6bb80ca6fb28f917d15844342a4 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Oct 2016 16:00:56 -0700 Subject: [PATCH 25/25] test/kvs: add coverage for kvs_watch_once_dir() Add a count argument to flux-kvs watch-dir, then drive it from t1000-kvs-basic.t. This is one of the areas that coveralls noted had no test coverage. --- src/cmd/flux-kvs.c | 15 ++++++++++++--- t/t1000-kvs-basic.t | 8 ++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index 26ed02df9777..2e44ac584397 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -83,7 +83,7 @@ void usage (void) " flux-kvs mkdir key [key...]\n" " flux-kvs exists key\n" " flux-kvs watch key\n" -" flux-kvs watch-dir [-r] key\n" +" flux-kvs watch-dir [-r] [count] key\n" " flux-kvs copy-tokvs key file\n" " flux-kvs copy-fromkvs key file\n" " flux-kvs copy srckey dstkey\n" @@ -572,12 +572,18 @@ void cmd_watch_dir (flux_t h, int argc, char **argv) char *key; kvsdir_t *dir = NULL; int rc; + int count = -1; if (argc > 0 && !strcmp (argv[0], "-r")) { ropt = true; argc--; argv++; } + if (argc == 2) { + count = strtoul (argv[0], NULL, 10); + argc--; + argv++; + } if (argc != 1) log_msg_exit ("watchdir: specify one directory"); key = argv[0]; @@ -591,13 +597,16 @@ void cmd_watch_dir (flux_t h, int argc, char **argv) dir = NULL; } else { dump_kvs_dir (dir, ropt); - kvsdir_destroy (dir); printf ("======================\n"); + fflush (stdout); } + if (--count == 0) + goto done; rc = kvs_watch_once_dir (h, &dir, "%s", key); } log_err_exit ("%s", key); - +done: + kvsdir_destroy (dir); } void cmd_dir (flux_t h, int argc, char **argv) diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 92c9ecfe6617..e3960ab1eae2 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -488,6 +488,14 @@ test_expect_success 'kvs: 8 threads/rank each doing 100 put,fence in a loop' ' # watch tests +test_expect_success 'kvs: watch 5 versions of directory' ' + flux kvs unlink $TEST.foo && + flux kvs watch-dir -r 5 $TEST.foo >watch_out & + while $(grep -s '===============' watch_out | wc -l) -lt 5; do + flux kvs put $TEST.foo.a=$(date +%N); \ + done +' + test_expect_success 'kvs: watch-mt: multi-threaded kvs watch program' ' ${FLUX_BUILD_DIR}/t/kvs/watch mt 100 100 $TEST.a && flux kvs unlink $TEST.a