From 95df254b623bb48053b65307d262e6b80009633c Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 16 Nov 2018 12:53:49 -0800 Subject: [PATCH] modules/kvs-watch: Support FLUX_KVS_WATCH_FULL Support new flag that supports "kvs.watch" semantics. In particular, instead of watching for specific writes to a key, lookup a key on every setroot change and see if the value has changed. Fixes #1653 --- src/common/libkvs/kvs.h | 1 + src/common/libkvs/kvs_lookup.c | 4 ++ src/modules/kvs-watch/kvs-watch.c | 82 +++++++++++++++++++++++++++++-- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/common/libkvs/kvs.h b/src/common/libkvs/kvs.h index dc3f4b952356..c57483cbc7c8 100644 --- a/src/common/libkvs/kvs.h +++ b/src/common/libkvs/kvs.h @@ -50,6 +50,7 @@ enum kvs_op { FLUX_KVS_WATCH_WAITCREATE = 8, FLUX_KVS_TREEOBJ = 16, FLUX_KVS_APPEND = 32, + FLUX_KVS_WATCH_FULL = 64 }; typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t; diff --git a/src/common/libkvs/kvs_lookup.c b/src/common/libkvs/kvs_lookup.c index 69d30dfac3d7..4b6869e2d1a9 100644 --- a/src/common/libkvs/kvs_lookup.c +++ b/src/common/libkvs/kvs_lookup.c @@ -91,9 +91,13 @@ static int validate_lookup_flags (int flags, bool watch_ok) if (flags & FLUX_KVS_WATCH_WAITCREATE && !(flags & FLUX_KVS_WATCH)) return -1; + if (flags & FLUX_KVS_WATCH_FULL + && !(flags & FLUX_KVS_WATCH)) + return -1; flags &= ~FLUX_KVS_WATCH; flags &= ~FLUX_KVS_WATCH_WAITCREATE; + flags &= ~FLUX_KVS_WATCH_FULL; switch (flags) { case 0: case FLUX_KVS_TREEOBJ: diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c index 187dab91b56e..70ab336a735b 100644 --- a/src/modules/kvs-watch/kvs-watch.c +++ b/src/modules/kvs-watch/kvs-watch.c @@ -52,6 +52,8 @@ struct watcher { zlist_t *lookups; // list of futures, in commit order struct namespace *ns; // back pointer for removal + void *prev; // previous watch for KVS_WATCH_FULL + int prev_len; // previous watch len for KVS_WATCH_FULL }; /* Current KVS root. @@ -103,6 +105,7 @@ static void watcher_destroy (struct watcher *w) flux_future_destroy (f); zlist_destroy (&w->lookups); } + free (w->prev); free (w); errno = saved_errno; } @@ -252,9 +255,70 @@ static bool array_match (json_t *a, const char *key) return false; } +static int handle_full_response (flux_t *h, + struct watcher *w, + const void *data, + int len) +{ + if (!w->responded) { + /* this is the first response case, simply store the first + * data */ + if (!(w->prev = malloc (len))) { + errno = ENOMEM; + return -1; + } + memcpy (w->prev, data, len); + w->prev_len = len; + + if (flux_respond_raw (h, w->request, data, len) < 0) { + flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); + return -1; + } + } + else { + /* not first response case, compare to previous to see if + * respond should be done, update data if necessary */ + if (w->prev_len == len + && !memcmp (w->prev, data, len)) + return 0; + + if (len > w->prev_len) { + if (!(w->prev = realloc (w->prev, len))) + return -1; + } + memcpy (w->prev, data, len); + w->prev_len = len; + + if (flux_respond_raw (h, w->request, data, len) < 0) { + flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); + return -1; + } + } + + w->responded = true; + return 0; +} + +static int handle_normal_response (flux_t *h, + struct watcher *w, + const void *data, + int len) +{ + if (flux_respond_raw (h, w->request, data, len) < 0) { + flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); + return -1; + } + + w->responded = true; + return 0; +} + /* New value of key is available in future 'f' container. * Send response to watcher using raw payload from lookup response. * Return 0 on success, -1 on error (caller should destroy watcher). + * + * Exception for FLUX_KVS_WATCH_FULL, must check if value is + * different than old value. */ static int handle_lookup_response (flux_future_t *f, struct watcher *w) { @@ -270,9 +334,14 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w) goto error; } if (!w->mute) { - if (flux_respond_raw (h, w->request, data, len) < 0) - flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); - w->responded = true; + if (w->flags & FLUX_KVS_WATCH_FULL) { + if (handle_full_response (h, w, data, len) < 0) + goto error; + } + else { + if (handle_normal_response (h, w, data, len) < 0) + goto error; + } } return 0; error: @@ -416,8 +485,13 @@ static void watcher_respond (struct namespace *ns, struct watcher *w) * kvs.lookupat request with the requestor's creds, in case the key lookup * traverses to a new namespace. Leave it up to the KVS module to ensure * the requestor is permitted to access *that* namespace. + * + * Note on FLUX_KVS_WATCH_FULL: A lookup / comparison is done on every + * change. */ - else if (w->rootseq == -1 || array_match (ns->commit->keys, w->key)) { + else if (w->rootseq == -1 + || (w->flags & FLUX_KVS_WATCH_FULL) + || array_match (ns->commit->keys, w->key)) { flux_future_t *f; if (!(f = lookupat (ns->ctx->h, w->flags,