Skip to content

Commit

Permalink
modules/kvs-watch: Support FLUX_KVS_WATCH_PEDANTIC
Browse files Browse the repository at this point in the history
Support new flag that supports the more classic "kvs.watch" semantics.
In particular, instead of watching for specific changes of a key,
lookup a key on every setroot change to verify if the value has
changed.

Fixes flux-framework#1653
  • Loading branch information
chu11 committed Nov 19, 2018
1 parent 4233ed2 commit 85875e0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum kvs_op {
FLUX_KVS_WATCH_WAITCREATE = 8,
FLUX_KVS_TREEOBJ = 16,
FLUX_KVS_APPEND = 32,
FLUX_KVS_WATCH_PEDANTIC = 64
};

typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t;
Expand Down
4 changes: 4 additions & 0 deletions src/common/libkvs/kvs_lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ static int validate_lookup_flags (int flags, bool watch_ok)
if (flags & FLUX_KVS_WATCH_WAITCREATE
&& !(flags & FLUX_KVS_WATCH))
return -1;
if (flags & FLUX_KVS_WATCH_PEDANTIC
&& !(flags & FLUX_KVS_WATCH))
return -1;

flags &= ~FLUX_KVS_WATCH;
flags &= ~FLUX_KVS_WATCH_WAITCREATE;
flags &= ~FLUX_KVS_WATCH_PEDANTIC;
switch (flags) {
case 0:
case FLUX_KVS_TREEOBJ:
Expand Down
82 changes: 78 additions & 4 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct watcher {
zlist_t *lookups; // list of futures, in commit order

struct namespace *ns; // back pointer for removal
void *prev; // previous watch for KVS_WATCH_PEDANTIC
int prev_len; // previous watch len for KVS_WATCH_PEDANTIC
};

/* Current KVS root.
Expand Down Expand Up @@ -103,6 +105,7 @@ static void watcher_destroy (struct watcher *w)
flux_future_destroy (f);
zlist_destroy (&w->lookups);
}
free (w->prev);
free (w);
errno = saved_errno;
}
Expand Down Expand Up @@ -252,9 +255,70 @@ static bool array_match (json_t *a, const char *key)
return false;
}

static int handle_pedantic_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (!w->responded) {
/* this is the first response case, simply store the first
* data */
if (!(w->prev = malloc (len))) {
errno = ENOMEM;
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}
else {
/* not first response case, compare to previous to see if
* respond should be done, update data if necessary */
if (w->prev_len == len
&& !memcmp (w->prev, data, len))
return 0;

if (len > w->prev_len) {
if (!(w->prev = realloc (w->prev, len)))
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}

w->responded = true;
return 0;
}

static int handle_normal_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}

w->responded = true;
return 0;
}

/* New value of key is available in future 'f' container.
* Send response to watcher using raw payload from lookup response.
* Return 0 on success, -1 on error (caller should destroy watcher).
*
* Exception for FLUX_KVS_WATCH_PEDANTIC, must check if value is
* different than old value.
*/
static int handle_lookup_response (flux_future_t *f, struct watcher *w)
{
Expand All @@ -270,9 +334,14 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w)
goto error;
}
if (!w->mute) {
if (flux_respond_raw (h, w->request, data, len) < 0)
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
w->responded = true;
if (w->flags & FLUX_KVS_WATCH_PEDANTIC) {
if (handle_pedantic_response (h, w, data, len) < 0)
goto error;
}
else {
if (handle_normal_response (h, w, data, len) < 0)
goto error;
}
}
return 0;
error:
Expand Down Expand Up @@ -416,8 +485,13 @@ static void watcher_respond (struct namespace *ns, struct watcher *w)
* kvs.lookupat request with the requestor's creds, in case the key lookup
* traverses to a new namespace. Leave it up to the KVS module to ensure
* the requestor is permitted to access *that* namespace.
*
* Note on FLUX_KVS_WATCH_PEDANTIC: A lookup / comparison is done on every
* change.
*/
else if (w->rootseq == -1 || array_match (ns->commit->keys, w->key)) {
else if (w->rootseq == -1
|| (w->flags & FLUX_KVS_WATCH_PEDANTIC)
|| array_match (ns->commit->keys, w->key)) {
flux_future_t *f;
if (!(f = lookupat (ns->ctx->h,
w->flags,
Expand Down

0 comments on commit 85875e0

Please sign in to comment.