Skip to content

Commit

Permalink
modules/kvs: add kvs_get_dirat()
Browse files Browse the repository at this point in the history
Add kvs_get_dirat(), which is like kvs_get_dir() but looks
up key relative to a snapshot reference.

The snapshot reference is stored in the returned kvsdir_t object,
and is then used in subsequent kvsdir_get_*() operations on that
object.  It is thus possible to recursively walk a snapshot of a
section of the namespace by starting with kvs_get_dirat() and
looking up subdirectories with kvsdir_get_dir().

Fixes flux-framework#64.
  • Loading branch information
garlick committed Sep 29, 2016
1 parent 0cdf008 commit fc905ee
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 69 deletions.
3 changes: 3 additions & 0 deletions src/modules/kvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ int kvs_get_treeobj (flux_t h, const char *key, char **treeobj);
*/
int kvs_getat (flux_t h, const char *treeobj,
const char *key, char **json_str);
int kvs_get_dirat (flux_t h, const char *treeobj,
const char *key, kvsdir_t **dirp);


/* kvs_watch* is like kvs_get* except the registered callback is called
* to set the value. It will be called immediately to set the initial
Expand Down
204 changes: 135 additions & 69 deletions src/modules/kvs/libkvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

struct kvsdir_struct {
flux_t handle;
json_object *rootref; /* optional snapshot reference */
char *key;
json_object *o;
int count;
Expand Down Expand Up @@ -136,17 +137,20 @@ void kvsdir_incref (kvsdir_t *dir)
void kvsdir_destroy (kvsdir_t *dir)
{
if (--dir->usecount == 0) {
Jput (dir->rootref);
free (dir->key);
json_object_put (dir->o);
free (dir);
}
}

static kvsdir_t *kvsdir_alloc (flux_t handle, const char *key, json_object *o)
static kvsdir_t *kvsdir_alloc (flux_t handle, json_object *rootref,
const char *key, json_object *o)
{
kvsdir_t *dir = xzmalloc (sizeof (*dir));

dir->handle = handle;
dir->rootref = Jget (rootref); /* may be NULL */
dir->key = xstrdup (key);
dir->o = o;
json_object_get (dir->o);
Expand Down Expand Up @@ -326,6 +330,28 @@ int kvs_getat (flux_t h, const char *treeobj,
return -1;
}

int kvs_get_dirat (flux_t h, const char *treeobj,
const char *key, kvsdir_t **dir)
{
JSON v = NULL;
JSON rootref = NULL;
int rc = -1;

if (!treeobj || !key || !dir || !(rootref = Jfromstr (treeobj))
|| dirent_validate (rootref) < 0) {
errno = EINVAL;
goto done;
}
if (getobj (h, rootref, key, KVS_PROTO_READDIR, &v) < 0)
goto done;
*dir = kvsdir_alloc (h, rootref, key, v);
rc = 0;
done:
Jput (v);
Jput (rootref);
return rc;
}

/* deprecated */
int kvs_get_obj (flux_t h, const char *key, JSON *val)
{
Expand Down Expand Up @@ -353,7 +379,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...)
const char *k = strlen (key) > 0 ? key : ".";
if (getobj (h, NULL, k, KVS_PROTO_READDIR, &v) < 0)
goto done;
*dir = kvsdir_alloc (h, k, v);
*dir = kvsdir_alloc (h, NULL, k, v);
rc = 0;
done:
Jput (v);
Expand Down Expand Up @@ -611,7 +637,7 @@ static int dispatch_watch (flux_t h, kvs_watcher_t *wp, json_object *val)
}
case WATCH_DIR: {
kvs_set_dir_f set = wp->set;
kvsdir_t *dir = val ? kvsdir_alloc (h, wp->key, val) : NULL;
kvsdir_t *dir = val ? kvsdir_alloc (h, NULL, wp->key, val) : NULL;
rc = set (wp->key, dir, wp->arg, errnum);
if (dir)
kvsdir_destroy (dir);
Expand Down Expand Up @@ -810,7 +836,7 @@ int kvs_watch_once_dir (flux_t h, kvsdir_t **dirp, const char *fmt, ...)
}
if (*dirp)
kvsdir_destroy (*dirp);
*dirp = kvsdir_alloc (h, key, val);
*dirp = kvsdir_alloc (h, NULL, key, val);
rc = 0;
done:
if (val)
Expand Down Expand Up @@ -1342,129 +1368,169 @@ int kvs_dropcache (flux_t h)
** kvsdir_t convenience functions
**/

int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp)
static int dirgetobj (kvsdir_t *dir, const char *name,
int flags, json_object **val)
{
char *key = kvsdir_key_at (dir, name);
int rc = -1;
char *json_str = NULL;
JSON out;
char *key;
int rc;

if (kvs_get (dir->handle, key, &json_str) < 0)
goto done;
if (!(out = Jfromstr (json_str))) {
errno = EPROTO;
goto done;
}
*valp = out;
rc = 0;
done:
key = kvsdir_key_at (dir, name);
rc = getobj (dir->handle, dir->rootref, key, flags, val);
free (key);
if (json_str)
free (json_str);
return rc;
}

int kvsdir_get (kvsdir_t *dir, const char *name, char **valp)
int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp)
{
char *key;
int rc;

key = kvsdir_key_at (dir, name);
rc = kvs_get (dir->handle, key, valp);
free (key);
return dirgetobj (dir, name, 0, valp);
}

return rc;
int kvsdir_get (kvsdir_t *dir, const char *name, char **valp)
{
JSON v = NULL;
if (dirgetobj (dir, name, 0, &v) < 0)
return -1;
if (valp)
*valp = xstrdup (Jtostr (v));
Jput (v);
return 0;
}

int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...)
{
int rc;
int rc = -1;
char *name, *key;
va_list ap;
JSON v = NULL;

va_start (ap, fmt);
if (vasprintf (&name, fmt, ap) < 0)
oom ();
va_end (ap);

key = kvsdir_key_at (dir, name);
rc = kvs_get_dir (dir->handle, dirp, "%s", key);
if (getobj (dir->handle, dir->rootref, key, KVS_PROTO_READDIR, &v) < 0)
goto done;
*dirp = kvsdir_alloc (dir->handle, dir->rootref, key, v);
rc = 0;
done:
Jput (v);
free (key);

if (name)
free (name);
free (name);
return rc;
}

int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_symlink (dir->handle, key, valp);
free (key);
int rc = -1;
JSON v = NULL;

if (dirgetobj (dir, name, KVS_PROTO_READLINK, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_string) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = xstrdup (json_object_get_string (v));
rc = 0;
done:
Jput (v);
return rc;
}

int kvsdir_get_string (kvsdir_t *dir, const char *name, char **valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_string (dir->handle, key, valp);
free (key);
JSON v = NULL;
int rc = -1;

if (dirgetobj (dir, name, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_string) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = xstrdup (json_object_get_string (v));
rc = 0;
done:
Jput (v);
return rc;
}

int kvsdir_get_int (kvsdir_t *dir, const char *name, int *valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_int (dir->handle, key, valp);
free (key);
JSON v = NULL;
int rc = -1;

if (dirgetobj (dir, name, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_int) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = json_object_get_int (v);
rc = 0;
done:
Jput (v);
return rc;
}

int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_int64 (dir->handle, key, valp);
free (key);
JSON v = NULL;
int rc = -1;

if (dirgetobj (dir, name, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_int) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = json_object_get_int64 (v);
rc = 0;
done:
Jput (v);
return rc;
}

int kvsdir_get_double (kvsdir_t *dir, const char *name, double *valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_double (dir->handle, key, valp);
free (key);
JSON v = NULL;
int rc = -1;

if (dirgetobj (dir, name, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_double) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = json_object_get_double (v);
rc = 0;
done:
Jput (v);
return rc;
}

int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *valp)
{
int rc;
char *key;

key = kvsdir_key_at (dir, name);
rc = kvs_get_boolean (dir->handle, key, valp);
free (key);
JSON v = NULL;
int rc = -1;

if (dirgetobj (dir, name, 0, &v) < 0)
goto done;
if (json_object_get_type (v) != json_type_boolean) {
errno = EPROTO;
goto done;
}
if (valp)
*valp = json_object_get_boolean (v);
rc = 0;
done:
Jput (v);
return rc;
}

Expand Down

0 comments on commit fc905ee

Please sign in to comment.