diff --git a/doc/man1/flux-kvs.adoc b/doc/man1/flux-kvs.adoc index 0a366dfeb0d2..59cc304ea24e 100644 --- a/doc/man1/flux-kvs.adoc +++ b/doc/man1/flux-kvs.adoc @@ -68,7 +68,7 @@ Remove a kvs namespace. *namespace-list*:: List all current namespaces and info on each namespace. -*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-f] [-c count] 'key' ['key...']:: +*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-u] [-f] [-c count] 'key' ['key...']:: Retrieve the value stored under 'key'. If nothing has been stored under 'key', display an error message. If no options, value is displayed with a newline appended (if value length is nonzero). If '-l', a 'key=' prefix is @@ -79,10 +79,11 @@ snapshot reference. If '-w', after the initial value, display the new value each time the key is written to until interrupted, or if '-c count' is specified, until 'count' values have been displayed. If the initial value does not yet exist, `-W` can be specified to wait for it -to be created. By default, only a direct write to a key is monitored, -which may miss several unique situations, such as the replacement of -an entire parent directory. The '-f' option can be specified to -monitor for many of these unique situations. +to be created. If '-u' is specified, only writes that change the key +value will be displayed. By default, only a direct write to a key is +monitored, which may miss several unique situations, such as the +replacement of an entire parent directory. The '-f' option can be +specified to monitor for many of these special situations. *put* [-j|-r|-t] [-n] [-A] 'key=value' ['key=value...']:: Store 'value' under 'key' and commit it. If it already has a value, diff --git a/src/cmd/flux-kvs.c b/src/cmd/flux-kvs.c index c02804b71926..debe4a8df952 100644 --- a/src/cmd/flux-kvs.c +++ b/src/cmd/flux-kvs.c @@ -102,11 +102,14 @@ static struct optparse_option get_opts[] = { .usage = "Print key= before value", }, { .name = "watch", .key = 'w', .has_arg = 0, - .usage = "Monitor key changes", + .usage = "Monitor key writes", }, { .name = "waitcreate", .key = 'W', .has_arg = 0, .usage = "Wait for creation to occur on watch", }, + { .name = "uniq", .key = 'u', .has_arg = 0, + .usage = "Only monitor key writes if values have changed", + }, { .name = "full", .key = 'f', .has_arg = 0, .usage = "Monitor key changes with more complete accuracy", }, @@ -692,6 +695,8 @@ void cmd_get_one (flux_t *h, const char *key, struct lookup_ctx *ctx) flags |= FLUX_KVS_WATCH_WAITCREATE; if (optparse_hasopt (ctx->p, "full")) flags |= FLUX_KVS_WATCH_FULL; + if (optparse_hasopt (ctx->p, "uniq")) + flags |= FLUX_KVS_WATCH_UNIQ; } if (optparse_hasopt (ctx->p, "at")) { const char *reference = optparse_get_str (ctx->p, "at", ""); diff --git a/src/common/libkvs/kvs.h b/src/common/libkvs/kvs.h index c57483cbc7c8..8c9423466bb1 100644 --- a/src/common/libkvs/kvs.h +++ b/src/common/libkvs/kvs.h @@ -50,7 +50,8 @@ enum kvs_op { FLUX_KVS_WATCH_WAITCREATE = 8, FLUX_KVS_TREEOBJ = 16, FLUX_KVS_APPEND = 32, - FLUX_KVS_WATCH_FULL = 64 + FLUX_KVS_WATCH_FULL = 64, + FLUX_KVS_WATCH_UNIQ = 128 }; typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t; diff --git a/src/common/libkvs/kvs_lookup.c b/src/common/libkvs/kvs_lookup.c index 4b6869e2d1a9..1beb7cde1fce 100644 --- a/src/common/libkvs/kvs_lookup.c +++ b/src/common/libkvs/kvs_lookup.c @@ -53,6 +53,10 @@ struct lookup_ctx { static const char *auxkey = "flux::lookup_ctx"; +#define FLUX_KVS_WATCH_FLAGS (FLUX_KVS_WATCH_WAITCREATE \ + | FLUX_KVS_WATCH_FULL \ + | FLUX_KVS_WATCH_UNIQ) + static void free_ctx (struct lookup_ctx *ctx) { if (ctx) { @@ -88,16 +92,14 @@ static int validate_lookup_flags (int flags, bool watch_ok) { if ((flags & FLUX_KVS_WATCH) && !watch_ok) return -1; - if (flags & FLUX_KVS_WATCH_WAITCREATE - && !(flags & FLUX_KVS_WATCH)) - return -1; - if (flags & FLUX_KVS_WATCH_FULL + if (flags & FLUX_KVS_WATCH_FLAGS && !(flags & FLUX_KVS_WATCH)) return -1; flags &= ~FLUX_KVS_WATCH; flags &= ~FLUX_KVS_WATCH_WAITCREATE; flags &= ~FLUX_KVS_WATCH_FULL; + flags &= ~FLUX_KVS_WATCH_UNIQ; switch (flags) { case 0: case FLUX_KVS_TREEOBJ: diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c index 70ab336a735b..c173d16c37a0 100644 --- a/src/modules/kvs-watch/kvs-watch.c +++ b/src/modules/kvs-watch/kvs-watch.c @@ -255,10 +255,10 @@ static bool array_match (json_t *a, const char *key) return false; } -static int handle_full_response (flux_t *h, - struct watcher *w, - const void *data, - int len) +static int handle_compare_response (flux_t *h, + struct watcher *w, + const void *data, + int len) { if (!w->responded) { /* this is the first response case, simply store the first @@ -334,8 +334,9 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w) goto error; } if (!w->mute) { - if (w->flags & FLUX_KVS_WATCH_FULL) { - if (handle_full_response (h, w, data, len) < 0) + if (w->flags & FLUX_KVS_WATCH_FULL + || w->flags & FLUX_KVS_WATCH_UNIQ) { + if (handle_compare_response (h, w, data, len) < 0) goto error; } else { diff --git a/t/t1007-kvs-lookup-watch.t b/t/t1007-kvs-lookup-watch.t index 49b2098d85c1..8210b890749a 100755 --- a/t/t1007-kvs-lookup-watch.t +++ b/t/t1007-kvs-lookup-watch.t @@ -100,6 +100,31 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get --watch sees duplicate commited wait $pid ' +test_expect_success NO_CHAIN_LINT 'flux kvs get --watch and --uniq do not see duplicate commited values' ' + flux kvs put test.f=1 && + + flux kvs get --count=3 --watch --uniq test.f >seq4.out & + pid=$! && + $waitfile --count=1 --timeout=10 \ + --pattern="[0-9]+" seq4.out >/dev/null && + for i in $(seq 2 20); \ + do flux kvs put --no-merge test.f=1; \ + done && + for i in $(seq 2 20); \ + do flux kvs put --no-merge test.f=2; \ + done && + for i in $(seq 2 20); \ + do flux kvs put --no-merge test.f=3; \ + done && + cat >expected <<-EOF && +1 +2 +3 + EOF + wait $pid && + test_cmp expected seq4.out +' + # in --watch & --waitcreate tests, call wait_watcherscount_nonzero to # ensure background watcher has started, otherwise test can be racy