Skip to content

Commit

Permalink
modules/kvs: send keys[] with setroot event
Browse files Browse the repository at this point in the history
Include an array of (de-duplicated) keys that were modified
by the commit in the kvs.setroot event.  This prepares the
way for more efficient kvs_watch() handling as described in flux-framework#803.

Update test for kvs.setroot codec.
  • Loading branch information
garlick committed Sep 19, 2016
1 parent 481a70c commit a7da268
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 16 deletions.
52 changes: 47 additions & 5 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ typedef struct {
href_t newroot;
} fence_t;

static int setroot_event_send (ctx_t *ctx, json_object *names);
static int setroot_event_send (ctx_t *ctx, json_object *names,
json_object *ops);
static int error_event_send (ctx_t *ctx, json_object *names, int errnum);
void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
Expand Down Expand Up @@ -604,7 +605,7 @@ static void commit_apply_fence (fence_t *f)
count, opcount);
}
setroot (ctx, f->newroot, ctx->rootseq + 1);
setroot_event_send (ctx, f->names);
setroot_event_send (ctx, f->names, f->ops);
} else {
flux_log (ctx->h, LOG_ERR, "commit failed: %s",
flux_strerror (f->errnum));
Expand Down Expand Up @@ -1513,6 +1514,7 @@ static void setroot_event_cb (flux_t h, flux_msg_handler_t *w,
const char *rootdir;
JSON root = NULL;
JSON names = NULL;
JSON keys = NULL;

if (flux_event_decode (msg, NULL, &json_str) < 0) {
flux_log_error (ctx->h, "%s: flux_event_decode", __FUNCTION__);
Expand All @@ -1523,7 +1525,7 @@ static void setroot_event_cb (flux_t h, flux_msg_handler_t *w,
flux_log_error (ctx->h, "%s: json decode", __FUNCTION__);
goto done;
}
if (kp_tsetroot_dec (out, &rootseq, &rootdir, &root, &names) < 0) {
if (kp_tsetroot_dec (out, &rootseq, &rootdir, &root, &names, &keys) < 0) {
flux_log_error (ctx->h, "%s: kp_tsetroot_dec", __FUNCTION__);
goto done;
}
Expand Down Expand Up @@ -1551,18 +1553,57 @@ static void setroot_event_cb (flux_t h, flux_msg_handler_t *w,
Jput (out);
}

static int setroot_event_send (ctx_t *ctx, json_object *names)
static bool key_is_duplicate (json_object *keys, const char *key)
{
int i, len;
const char *k;
if (Jget_ar_len (keys, &len)) {
for (i = 0; i < len; i++) {
if (Jget_ar_str (keys, i, &k) && !strcmp (k, key))
return true;
}
}
return false;
}

static json_object *ops_to_keys (json_object *ops)
{
int i, len;
JSON keys = Jnew_ar();

if (!Jget_ar_len (ops, &len))
goto error;
for (i = 0; i < len; i++) {
JSON op;
const char *key;
if (!Jget_ar_obj (ops, i, &op) || !Jget_str (op, "key", &key))
goto error;
if (!key_is_duplicate (keys, key))
Jadd_ar_str (keys, key);
}
return keys;
error:
Jput (keys);
return NULL;
}

static int setroot_event_send (ctx_t *ctx, json_object *names, json_object *ops)
{
JSON in = NULL;
JSON root = NULL;
JSON keys = NULL;
flux_msg_t *msg = NULL;
int rc = -1;

if (!(keys = ops_to_keys (ops))) {
errno = EINVAL;
goto done;
}
if (event_includes_rootdir) {
bool stall = !load (ctx, ctx->rootdir, NULL, &root);
FASSERT (ctx->h, stall == false);
}
if (!(in = kp_tsetroot_enc (ctx->rootseq, ctx->rootdir, root, names)))
if (!(in = kp_tsetroot_enc (ctx->rootseq, ctx->rootdir, root, names, keys)))
goto done;
if (!(msg = flux_event_encode ("kvs.setroot", Jtostr (in))))
goto done;
Expand All @@ -1571,6 +1612,7 @@ static int setroot_event_send (ctx_t *ctx, json_object *names)
rc = 0;
done:
Jput (in);
Jput (keys);
flux_msg_destroy (msg);
return rc;
}
Expand Down
12 changes: 7 additions & 5 deletions src/modules/kvs/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,36 +271,38 @@ int kp_rgetroot_dec (JSON o, int *rootseq, const char **rootdir)
*/

JSON kp_tsetroot_enc (int rootseq, const char *rootdir, JSON root,
JSON names)
JSON names, JSON keys)
{
JSON o = NULL;
int n;

if (!rootdir || !names || !Jget_ar_len (names, &n) || n < 1) {
if (!rootdir || !names || !keys
|| !Jget_ar_len (names, &n) || n < 1) {
errno = EINVAL;
goto done;
}
o = Jnew ();
Jadd_int (o, "rootseq", rootseq);
Jadd_str (o, "rootdir", rootdir);
Jadd_obj (o, "names", names); /* takes a ref */
Jadd_obj (o, "keys", keys); /* takes a ref */
if (root)
Jadd_obj (o, "rootdirval", root); /* takes a ref */
done:
return o;
}

int kp_tsetroot_dec (JSON o, int *rootseq, const char **rootdir,
JSON *root, JSON *names)
JSON *root, JSON *names, JSON *keys)
{
int rc = -1;

if (!o || !rootseq || !rootdir || !root || !names) {
if (!o || !rootseq || !rootdir || !root || !names | !keys) {
errno = EINVAL;
goto done;
}
if (!Jget_int (o, "rootseq", rootseq) || !Jget_str (o, "rootdir", rootdir)
|| !Jget_obj (o, "names", names)) {
|| !Jget_obj (o, "names", names) || !Jget_obj (o, "keys", keys)) {
errno = EPROTO;
goto done;
}
Expand Down
6 changes: 4 additions & 2 deletions src/modules/kvs/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ int kp_rgetroot_dec (json_object *o, int *rootseq, const char **rootdir);
/* kvs.setroot (event)
*/
json_object *kp_tsetroot_enc (int rootseq, const char *rootdir,
json_object *root, json_object *names);
json_object *root, json_object *names,
json_object *keys);
int kp_tsetroot_dec (json_object *o, int *rootseq, const char **rootdir,
json_object **root, json_object **names);
json_object **root, json_object **names,
json_object **keys);

/* kvs.error (event)
*/
Expand Down
13 changes: 9 additions & 4 deletions src/modules/kvs/test/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,25 @@ void test_setroot (void)
JSON o;
const char *rootdir, *name;
int rootseq;
JSON root, names;
const char *key;
JSON root, names, keys;

names = Jnew_ar ();
Jadd_ar_str (names, "foo");
ok ((o = kp_tsetroot_enc (42, "abc", NULL, names)) != NULL,
keys = Jnew_ar ();
Jadd_ar_str (keys, "a.b.c");
ok ((o = kp_tsetroot_enc (42, "abc", NULL, names, keys)) != NULL,
"kp_tsetroot_enc works");
Jput (names);
Jput (keys);

diag ("setroot: %s", Jtostr (o));

ok (kp_tsetroot_dec (o, &rootseq, &rootdir, &root, &names) == 0
ok (kp_tsetroot_dec (o, &rootseq, &rootdir, &root, &names, &keys) == 0
&& rootseq == 42 && rootdir != NULL && !strcmp (rootdir, "abc")
&& root == NULL && names != NULL && Jget_ar_str (names, 0, &name)
&& !strcmp (name, "foo"),
&& keys != NULL && Jget_ar_str (keys, 0, &key)
&& !strcmp (key, "a.b.c"),
"kp_tsetroot_dec works");
Jput (o);
}
Expand Down

0 comments on commit a7da268

Please sign in to comment.