Skip to content

Commit

Permalink
modules/kvs-watch: Support FLUX_KVS_WATCH_FULL
Browse files Browse the repository at this point in the history
Support new flag that supports "kvs.watch" semantics.  In particular,
instead of watching for specific writes to a key, lookup a key on every
setroot change and see if the value has changed.

Fixes flux-framework#1653
  • Loading branch information
chu11 committed Nov 20, 2018
1 parent 4233ed2 commit 95df254
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum kvs_op {
FLUX_KVS_WATCH_WAITCREATE = 8,
FLUX_KVS_TREEOBJ = 16,
FLUX_KVS_APPEND = 32,
FLUX_KVS_WATCH_FULL = 64
};

typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t;
Expand Down
4 changes: 4 additions & 0 deletions src/common/libkvs/kvs_lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ static int validate_lookup_flags (int flags, bool watch_ok)
if (flags & FLUX_KVS_WATCH_WAITCREATE
&& !(flags & FLUX_KVS_WATCH))
return -1;
if (flags & FLUX_KVS_WATCH_FULL
&& !(flags & FLUX_KVS_WATCH))
return -1;

flags &= ~FLUX_KVS_WATCH;
flags &= ~FLUX_KVS_WATCH_WAITCREATE;
flags &= ~FLUX_KVS_WATCH_FULL;
switch (flags) {
case 0:
case FLUX_KVS_TREEOBJ:
Expand Down
82 changes: 78 additions & 4 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct watcher {
zlist_t *lookups; // list of futures, in commit order

struct namespace *ns; // back pointer for removal
void *prev; // previous watch for KVS_WATCH_FULL
int prev_len; // previous watch len for KVS_WATCH_FULL
};

/* Current KVS root.
Expand Down Expand Up @@ -103,6 +105,7 @@ static void watcher_destroy (struct watcher *w)
flux_future_destroy (f);
zlist_destroy (&w->lookups);
}
free (w->prev);
free (w);
errno = saved_errno;
}
Expand Down Expand Up @@ -252,9 +255,70 @@ static bool array_match (json_t *a, const char *key)
return false;
}

static int handle_full_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (!w->responded) {
/* this is the first response case, simply store the first
* data */
if (!(w->prev = malloc (len))) {
errno = ENOMEM;
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}
else {
/* not first response case, compare to previous to see if
* respond should be done, update data if necessary */
if (w->prev_len == len
&& !memcmp (w->prev, data, len))
return 0;

if (len > w->prev_len) {
if (!(w->prev = realloc (w->prev, len)))
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}

w->responded = true;
return 0;
}

static int handle_normal_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}

w->responded = true;
return 0;
}

/* New value of key is available in future 'f' container.
* Send response to watcher using raw payload from lookup response.
* Return 0 on success, -1 on error (caller should destroy watcher).
*
* Exception for FLUX_KVS_WATCH_FULL, must check if value is
* different than old value.
*/
static int handle_lookup_response (flux_future_t *f, struct watcher *w)
{
Expand All @@ -270,9 +334,14 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w)
goto error;
}
if (!w->mute) {
if (flux_respond_raw (h, w->request, data, len) < 0)
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
w->responded = true;
if (w->flags & FLUX_KVS_WATCH_FULL) {
if (handle_full_response (h, w, data, len) < 0)
goto error;
}
else {
if (handle_normal_response (h, w, data, len) < 0)
goto error;
}
}
return 0;
error:
Expand Down Expand Up @@ -416,8 +485,13 @@ static void watcher_respond (struct namespace *ns, struct watcher *w)
* kvs.lookupat request with the requestor's creds, in case the key lookup
* traverses to a new namespace. Leave it up to the KVS module to ensure
* the requestor is permitted to access *that* namespace.
*
* Note on FLUX_KVS_WATCH_FULL: A lookup / comparison is done on every
* change.
*/
else if (w->rootseq == -1 || array_match (ns->commit->keys, w->key)) {
else if (w->rootseq == -1
|| (w->flags & FLUX_KVS_WATCH_FULL)
|| array_match (ns->commit->keys, w->key)) {
flux_future_t *f;
if (!(f = lookupat (ns->ctx->h,
w->flags,
Expand Down

0 comments on commit 95df254

Please sign in to comment.