Skip to content

Commit

Permalink
modules/kvs: kvs.get allows send/recv of treeobj
Browse files Browse the repository at this point in the history
Allow an optional root dirent to accompany a kvs.get
request.  If present, key will be looked up relative to
this directory.  It is an EINVAL error if the dirent
is invalid or not a directory reference.

In addition, unconditionally return the same root dirent,
if set, or the root dirent if not, in the kvs.get response.

This allows API's to be developed that allow a static
"version" or snapshot of the KVS namespace or a portion of it
to be traversed.

Update kvs.get codec and its unit test.
  • Loading branch information
garlick committed Sep 28, 2016
1 parent 927ac17 commit cdb339f
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 42 deletions.
35 changes: 27 additions & 8 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,8 @@ static bool lookup (ctx_t *ctx, json_object *root, wait_t *wait,
json_object *vp, *dirent, *val = NULL;
int errnum = 0;

if (root == NULL) {
if (!load (ctx, ctx->rootdir, wait, &root))
goto stall;
}
assert (root != NULL);

if (!strcmp (name, ".")) { /* special case root */
if ((flags & KVS_PROTO_TREEOBJ)) {
val = dirent_create ("DIRREF", ctx->rootdir);
Expand Down Expand Up @@ -874,6 +872,10 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w,
int flags;
const char *key;
JSON val;
JSON root = NULL;
JSON root_dirent = NULL;
JSON tmp_dirent = NULL;
const char *root_ref = ctx->rootdir;
wait_t *wait = NULL;
int lookup_errnum = 0;
int rc = -1;
Expand All @@ -884,11 +886,24 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w,
errno = EPROTO;
goto done;
}
if (kp_tget_dec (in, &key, &flags) < 0)
if (kp_tget_dec (in, &root_dirent, &key, &flags) < 0)
goto done;
if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, arg)))
goto done;
if (!lookup (ctx, NULL, wait, flags, key, &val, &lookup_errnum))
/* If root dirent was specified, lookup corresponding 'root' directory.
* Otherwise, use the current root.
*/
if (root_dirent) {
if (!Jget_str (root_dirent, "DIRREF", &root_ref)) {
errno = EINVAL;
goto done;
}
} else {
root_dirent = tmp_dirent = dirent_create ("DIRREF", ctx->rootdir);
}
if (!load (ctx, root_ref, wait, &root))
goto stall;
if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum))
goto stall;
if (lookup_errnum != 0) {
errno = lookup_errnum;
Expand All @@ -898,7 +913,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w,
errno = ENOENT;
goto done;
}
if (!(out = kp_rget_enc (val)))
if (!(out = kp_rget_enc (root_dirent, val)))
goto done;
rc = 0;
done:
Expand All @@ -909,6 +924,7 @@ static void get_request_cb (flux_t h, flux_msg_handler_t *w,
stall:
Jput (in);
Jput (out);
Jput (tmp_dirent);
}

static bool compare_json (json_object *o1, json_object *o2)
Expand All @@ -928,6 +944,7 @@ static void watch_request_cb (flux_t h, flux_msg_handler_t *w,
JSON in2 = NULL;
JSON out = NULL;
JSON oval, val = NULL;
JSON root = NULL;
flux_msg_t *cpy = NULL;
const char *key;
int flags;
Expand All @@ -946,7 +963,9 @@ static void watch_request_cb (flux_t h, flux_msg_handler_t *w,
goto done;
if (!(wait = wait_create_msg_handler (h, w, msg, watch_request_cb, arg)))
goto done;
if (!lookup (ctx, NULL, wait, flags, key, &val, &lookup_errnum))
if (!load (ctx, ctx->rootdir, wait, &root))
goto stall;
if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum))
goto stall;
if (lookup_errnum) {
errno = lookup_errnum;
Expand Down
29 changes: 15 additions & 14 deletions src/modules/kvs/libkvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ char *kvsdir_key_at (kvsdir_t *dir, const char *name)
** GET
**/

static int getobj (flux_t h, const char *key, int flags, json_object **val)
static int getobj (flux_t h, json_object *rootdir, const char *key,
int flags, json_object **val)
{
flux_rpc_t *rpc = NULL;
const char *json_str;
Expand All @@ -341,7 +342,7 @@ static int getobj (flux_t h, const char *key, int flags, json_object **val)
goto done;
}
k = pathcat (kvs_getcwd (h), key);
if (!(in = kp_tget_enc (k, flags)))
if (!(in = kp_tget_enc (rootdir, k, flags)))
goto done;
if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0)))
goto done;
Expand All @@ -351,7 +352,7 @@ static int getobj (flux_t h, const char *key, int flags, json_object **val)
errno = EPROTO;
goto done;
}
if (kp_rget_dec (out, &v) < 0)
if (kp_rget_dec (out, NULL, &v) < 0)
goto done;
if (val)
*val = Jget (v);
Expand All @@ -371,7 +372,7 @@ int kvs_get (flux_t h, const char *key, char **val)
{
JSON v;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
return -1;
if (val)
*val = xstrdup (Jtostr (v));
Expand All @@ -382,7 +383,7 @@ int kvs_get (flux_t h, const char *key, char **val)
/* deprecated */
int kvs_get_obj (flux_t h, const char *key, JSON *val)
{
return getobj (h, key, 0, val);
return getobj (h, NULL, key, 0, val);
}

int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...)
Expand All @@ -405,7 +406,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...)
key = xvasprintf (fmt, ap);
va_end (ap);
k = pathcat (kvs_getcwd (h), key);
if (!(in = kp_tget_enc (k, KVS_PROTO_READDIR)))
if (!(in = kp_tget_enc (NULL, k, KVS_PROTO_READDIR)))
goto done;
if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0)))
goto done;
Expand Down Expand Up @@ -435,7 +436,7 @@ int kvs_get_symlink (flux_t h, const char *key, char **val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, KVS_PROTO_READLINK, &v) < 0)
if (getobj (h, NULL, key, KVS_PROTO_READLINK, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_string) {
errno = EPROTO;
Expand All @@ -455,7 +456,7 @@ int kvs_get_treeobj (flux_t h, const char *key, char **val)
const char *s;
int rc = -1;

if (getobj (h, key, KVS_PROTO_TREEOBJ, &v) < 0)
if (getobj (h, NULL, key, KVS_PROTO_TREEOBJ, &v) < 0)
goto done;
if (val) {
s = json_object_to_json_string_ext (v, JSON_C_TO_STRING_PLAIN);
Expand All @@ -472,7 +473,7 @@ int kvs_get_string (flux_t h, const char *key, char **val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_string) {
errno = EPROTO;
Expand All @@ -491,7 +492,7 @@ int kvs_get_int (flux_t h, const char *key, int *val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_int) {
errno = EPROTO;
Expand All @@ -510,7 +511,7 @@ int kvs_get_int64 (flux_t h, const char *key, int64_t *val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_int) {
errno = EPROTO;
Expand All @@ -529,7 +530,7 @@ int kvs_get_double (flux_t h, const char *key, double *val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_double) {
errno = EPROTO;
Expand All @@ -548,7 +549,7 @@ int kvs_get_boolean (flux_t h, const char *key, bool *val)
JSON v = NULL;
int rc = -1;

if (getobj (h, key, 0, &v) < 0)
if (getobj (h, NULL, key, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_boolean) {
errno = EPROTO;
Expand Down Expand Up @@ -1651,7 +1652,7 @@ int kvsdir_unlink (kvsdir_t *dir, const char *name)
int kvs_copy (flux_t h, const char *from, const char *to)
{
JSON dirent;
if (getobj (h, from, KVS_PROTO_TREEOBJ, &dirent) < 0)
if (getobj (h, NULL, from, KVS_PROTO_TREEOBJ, &dirent) < 0)
return -1;
if (kvs_put_dirent (h, to, dirent) < 0) {
Jput (dirent);
Expand Down
26 changes: 15 additions & 11 deletions src/modules/kvs/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
/* kvs.get
*/

JSON kp_tget_enc (const char *key, int flags)
JSON kp_tget_enc (JSON rootdir, const char *key, int flags)
{
JSON o = NULL;

Expand All @@ -53,13 +53,15 @@ JSON kp_tget_enc (const char *key, int flags)
goto done;
}
o = Jnew ();
if (rootdir)
Jadd_obj (o, "rootdir", rootdir); /* takes a ref on rootdir */
Jadd_str (o, "key", key);
Jadd_int (o, "flags", flags);
done:
return o;
}

int kp_tget_dec (JSON o, const char **key, int *flags)
int kp_tget_dec (JSON o, JSON *rootdir, const char **key, int *flags)
{
int rc = -1;

Expand All @@ -71,34 +73,36 @@ int kp_tget_dec (JSON o, const char **key, int *flags)
errno = EPROTO;
goto done;
}
if (rootdir)
*rootdir = Jobj_get (o, "rootdir");
rc = 0;
done:
return rc;
}

JSON kp_rget_enc (JSON val)
JSON kp_rget_enc (JSON rootdir, JSON val)
{
JSON o = NULL;

o = Jnew ();
json_object_object_add (o, "rootdir", rootdir);
json_object_object_add (o, "val", val);
return o;
}

int kp_rget_dec (JSON o, JSON *val)
int kp_rget_dec (JSON o, JSON *rootdir, JSON *val)
{
int rc = -1;
JSON v = NULL;
JSON v, r;

if (!o || !val) {
if (!o || !(v = Jobj_get (o, "val")) || !(r = Jobj_get (o, "rootdir"))) {
errno = EINVAL;
goto done;
}
if (!(v = Jobj_get (o, "val"))) {
errno = EPROTO;
goto done;
}
*val = v;
if (val)
*val = v;
if (rootdir)
*rootdir = r;
rc = 0;
done:
return rc;
Expand Down
10 changes: 6 additions & 4 deletions src/modules/kvs/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ enum {

/* kvs.get
*/
json_object *kp_tget_enc (const char *key, int flags);
int kp_tget_dec (json_object *o, const char **key, int *flags);
json_object *kp_tget_enc (json_object *rootdir,
const char *key, int flags);
int kp_tget_dec (json_object *o, json_object **rootdir,
const char **key, int *flags);

json_object *kp_rget_enc (json_object *val);
int kp_rget_dec (json_object *o, json_object **val);
json_object *kp_rget_enc (json_object *rootdir, json_object *val);
int kp_rget_dec (json_object *o, json_object **rootdir, json_object **val);


/* kvs.watch
Expand Down
31 changes: 26 additions & 5 deletions src/modules/kvs/test/proto.c
Original file line number Diff line number Diff line change
@@ -1,43 +1,64 @@
#include <string.h>

#include "src/common/libutil/shortjson.h"
#include "src/modules/kvs/proto.h"
#include "src/common/libtap/tap.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/xzmalloc.h"

#include "src/modules/kvs/proto.h"
#include "src/modules/kvs/json_dirent.h"

void test_get (void)
{
JSON o;
const char *key = NULL;
JSON val = NULL;
JSON dirent = NULL;
JSON dirent2 = NULL;
int i, flags;

o = kp_tget_enc ("foo", 42);
o = kp_tget_enc (NULL, "foo", 42);
ok (o != NULL,
"kp_tget_enc works");
diag ("get request: %s", Jtostr (o));
flags = 0;
ok (kp_tget_dec (o, &key, &flags) == 0 && flags == 42,
ok (kp_tget_dec (o, NULL, &key, &flags) == 0 && flags == 42,
"kp_tget_dec works");
like (key, "^foo$",
"kp_tget_dec returned encoded key");
Jput (o);

dirent = dirent_create ("DIRREF", "sha1-abcdefabcdef00000");
o = kp_tget_enc (dirent, "foo", 42);
ok (o != NULL,
"kp_tget_enc with optional dirent arg works");
diag ("get request: %s", Jtostr (o));
flags = 0;
ok (kp_tget_dec (o, &dirent2, &key, &flags) == 0 && flags == 42,
"kp_tget_dec works");
ok (dirent_validate (dirent2) && dirent_match (dirent, dirent2),
"kp_tget_dec returned dirent");
Jput (dirent);
Jput (o);


val = Jnew ();
Jadd_int (val, "i", 42);
o = kp_rget_enc (val);
dirent = dirent_create ("DIRREF", "sha1-abcdefabcdef00000");
o = kp_rget_enc (dirent, val);
val = NULL; /* val now owned by o */
ok (o != NULL,
"kp_rget_enc works");
diag ("get response: %s", Jtostr (o));
ok (kp_rget_dec (o, &val) == 0,
ok (kp_rget_dec (o, &dirent2, &val) == 0,
"kp_rget_dec works");
// get response: { "val": { "i": 42 } }
i = 0;
ok (val && Jget_int (val, "i", &i) && i == 42,
"kp_rget_dec returned encoded object");
ok (dirent_validate (dirent2) && dirent_match (dirent, dirent2),
"kp_rget_dec returned rootref");
Jput (o); /* owns val */
}

Expand Down

0 comments on commit cdb339f

Please sign in to comment.