From 4ddcef7b9886c0534a5a422fba556242f65ed47e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 21 Sep 2016 16:34:34 -0700 Subject: [PATCH] modules/kvs: rework kvsdir_get_*() functions If a kvsdir_t was created with kvs_get_dirat(), then the convenience functions kvsdir_get_*() operate "at" (relative to) the original treeobj. This means a recursive walk using kvsdir_get_dir() will follow a consistent snapshot, unaffected by changes being made concurrently to the KVS namespace. Fixes #64 --- src/modules/kvs/libkvs.c | 419 ++++++++++++++++----------------------- 1 file changed, 175 insertions(+), 244 deletions(-) diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 9b0ac429a048..ed89c7f1c36f 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -55,7 +55,7 @@ struct kvsdir_struct { flux_t handle; - char *treeobj; + json_object *dirent; char *key; json_object *o; int count; @@ -84,8 +84,6 @@ typedef struct { typedef struct { zhash_t *watchers; /* kvs_watch_t hashed by stringified matchtag */ - char *cwd; - zlist_t *dirstack; flux_msg_handler_t *w; json_object *ops; /* JSON array of put, unlink, etc operations */ zhash_t *fence_ops; @@ -100,12 +98,9 @@ static void freectx (void *arg) kvsctx_t *ctx = arg; if (ctx) { zhash_destroy (&ctx->watchers); - zlist_destroy (&ctx->dirstack); zhash_destroy (&ctx->fence_ops); if (ctx->w) flux_msg_handler_destroy (ctx->w); - if (ctx->cwd) - free (ctx->cwd); Jput (ctx->ops); free (ctx); } @@ -120,10 +115,6 @@ static kvsctx_t *getctx (flux_t h) ctx = xzmalloc (sizeof (*ctx)); if (!(ctx->watchers = zhash_new ())) oom (); - if (!(ctx->dirstack = zlist_new ())) - oom (); - if (!(ctx->cwd = xstrdup ("."))) - oom (); match.topic_glob = "kvs.watch"; if (!(ctx->w = flux_msg_handler_create (h, match, watch_response_cb, ctx))) @@ -133,70 +124,6 @@ static kvsctx_t *getctx (flux_t h) return ctx; } -/** - ** Current working directory implementation is just used internally - ** for now. I'm not sure it is all that useful of an abstractions - ** to expose in the KVS API. - **/ - -/* Create new path from current working directory and relative path. - * Confusing: "." is our path separator, so think of it as POSIX "/", - * and there is no equiv of POSIX "." and "..". - */ -static char *pathcat (const char *cwd, const char *relpath) -{ - char *path; - bool fq = false; - - while (*cwd == '.') - cwd++; - while (*relpath == '.') { - relpath++; - fq = true; - } - if (fq || strlen (cwd) == 0) - path = xstrdup (strlen (relpath) > 0 ? relpath : "."); - else - path = xasprintf ("%s.%s", cwd, relpath); - return path; -} - -const char *kvs_getcwd (flux_t h) -{ - kvsctx_t *ctx = getctx (h); - return ctx->cwd; -} - -#if 0 -static void kvs_chdir (flux_t h, const char *path) -{ - kvsctx_t *ctx = getctx (h); - char *new; - - new = pathcat (ctx->cwd, xstrdup (path ? path : ".")); - free (ctx->cwd); - ctx->cwd = new; -} -#endif -static void kvs_pushd (flux_t h, const char *path) -{ - kvsctx_t *ctx = getctx (h); - - if (zlist_push (ctx->dirstack, ctx->cwd) < 0) - oom (); - ctx->cwd = pathcat (ctx->cwd, path ? path : "."); -} - -static void kvs_popd (flux_t h) -{ - kvsctx_t *ctx = getctx (h); - - if (zlist_size (ctx->dirstack) > 0) { - free (ctx->cwd); - ctx->cwd = zlist_pop (ctx->dirstack); - } -} - /** ** kvsdir_t primary functions. ** A kvsdir_t is analagous to posix (DIR *). @@ -210,22 +137,21 @@ void kvsdir_incref (kvsdir_t *dir) void kvsdir_destroy (kvsdir_t *dir) { if (--dir->usecount == 0) { - if (dir->treeobj) - free (dir->treeobj); + Jput (dir->dirent); free (dir->key); json_object_put (dir->o); free (dir); } } -static kvsdir_t *kvsdir_alloc (flux_t handle, const char *treeobj, +static kvsdir_t *kvsdir_alloc (flux_t handle, json_object *dirent, const char *key, json_object *o) { kvsdir_t *dir = xzmalloc (sizeof (*dir)); dir->handle = handle; - if (treeobj) - dir->treeobj = xstrdup (treeobj); + if (dirent) + dir->dirent = Jget (dirent); // can be (likely) NULL dir->key = xstrdup (key); dir->o = o; json_object_get (dir->o); @@ -327,6 +253,15 @@ char *kvsdir_key_at (kvsdir_t *dir, const char *name) return key; } +/* private for now (used by kvsdir_get_dir()) */ +static json_object *kvsdir_treeobj_at (kvsdir_t *dir, const char *name) +{ + json_object *dirent; + if (json_object_object_get_ex (dir->o, name, &dirent)) + return dirent; + return NULL; +} + /** ** GET **/ @@ -336,7 +271,6 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, { flux_rpc_t *rpc = NULL; const char *json_str; - char *k = NULL; JSON in = NULL; JSON out = NULL; JSON v; @@ -347,8 +281,7 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, errno = EINVAL; goto done; } - k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (rootdir, k, flags))) + if (!(in = kp_tget_enc (rootdir, key, flags))) goto done; if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) goto done; @@ -367,8 +300,6 @@ static int getobj (flux_t h, json_object *rootdir, const char *key, saved_errno = errno; Jput (in); Jput (out); - if (k) - free (k); flux_rpc_destroy (rpc); errno = saved_errno; return rc; @@ -423,7 +354,7 @@ int kvs_get_dirat (flux_t h, const char *treeobj, } if (getobj (h, dirent, key, KVS_PROTO_READDIR, &v) < 0) goto done; - *dir = kvsdir_alloc (h, treeobj, key, v); + *dir = kvsdir_alloc (h, dirent, key, v); rc = 0; done: Jput (v); @@ -468,12 +399,7 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) { va_list ap; char *key = NULL; - char *k = NULL; - const char *json_str; - flux_rpc_t *rpc = NULL; - JSON in = NULL; - JSON out = NULL; - JSON v; + JSON v = NULL; int rc = -1; if (!h || !dir || !fmt) { @@ -483,29 +409,15 @@ int kvs_get_dir (flux_t h, kvsdir_t **dir, const char *fmt, ...) va_start (ap, fmt); key = xvasprintf (fmt, ap); va_end (ap); - k = pathcat (kvs_getcwd (h), key); - if (!(in = kp_tget_enc (NULL, k, KVS_PROTO_READDIR))) - goto done; - if (!(rpc = flux_rpc (h, "kvs.get", Jtostr (in), FLUX_NODEID_ANY, 0))) - goto done; - if (flux_rpc_get (rpc, NULL, &json_str) < 0) - goto done; - if (!(out = Jfromstr (json_str))) { - errno = EPROTO; - goto done; - } - if (kp_rget_dec (out, &v) < 0) + + if (getobj (h, NULL, key, KVS_PROTO_READDIR, &v) < 0) goto done; - *dir = kvsdir_alloc (h, NULL, k, v); + *dir = kvsdir_alloc (h, NULL, key, v); rc = 0; done: - Jput (in); - Jput (out); - if (k) - free (k); + Jput (v); if (key) free (key); - flux_rpc_destroy (rpc); return rc; } @@ -1130,7 +1042,6 @@ int kvs_watch_boolean (flux_t h, const char *key, kvs_set_boolean_f set, static int kvs_put_dirent (flux_t h, const char *key, json_object *dirent) { kvsctx_t *ctx = getctx (h); - char *k = NULL; int rc = -1; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; @@ -1138,11 +1049,9 @@ static int kvs_put_dirent (flux_t h, const char *key, json_object *dirent) errno = EINVAL; goto done; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent); + dirent_append (ops, key, dirent); rc = 0; done: - free (k); return rc; } @@ -1282,7 +1191,6 @@ int kvs_symlink (flux_t h, const char *key, const char *target) { kvsctx_t *ctx = getctx (h); JSON val = NULL; - char *k = NULL; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; if (!h || !key || !target) { @@ -1293,9 +1201,7 @@ int kvs_symlink (flux_t h, const char *key, const char *target) errno = ENOMEM; return -1; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent_create ("LINKVAL", val)); - free (k); + dirent_append (ops, key, dirent_create ("LINKVAL", val)); Jput (val); return 0; } @@ -1304,16 +1210,13 @@ int kvs_mkdir (flux_t h, const char *key) { kvsctx_t *ctx = getctx (h); JSON val = Jnew (); - char *k = NULL; JSON *ops = ctx->fence_context ? &ctx->fence_context : &ctx->ops; if (!h || !key) { errno = EINVAL; return -1; } - k = pathcat (kvs_getcwd (h), key); - dirent_append (ops, k, dirent_create ("DIRVAL", val)); - free (k); + dirent_append (ops, key, dirent_create ("DIRVAL", val)); Jput (val); return 0; } @@ -1498,232 +1401,260 @@ int kvs_dropcache (flux_t h) ** kvsdir_t convenience functions **/ -int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **valp) +static int dirgetobj (kvsdir_t *dir, const char *name, + int flags, json_object **val) { + char *key = kvsdir_key_at (dir, name); int rc = -1; - char *json_str = NULL; - JSON out; - kvs_pushd (dir->handle, dir->key); - if (kvs_get (dir->handle, name, &json_str) < 0) - goto done; - if (!(out = Jfromstr (json_str))) { - errno = EPROTO; + if (getobj (dir->handle, dir->dirent, key, flags, val) < 0) goto done; - } - *valp = out; rc = 0; done: - kvs_popd (dir->handle); - if (json_str) - free (json_str); + free (key); return rc; } -int kvsdir_get (kvsdir_t *dir, const char *name, char **valp) +int kvsdir_get_obj (kvsdir_t *dir, const char *name, json_object **val) { - int rc; + return dirgetobj (dir, name, 0, val); +} - kvs_pushd (dir->handle, dir->key); - rc = kvs_get (dir->handle, name, valp); - kvs_popd (dir->handle); +int kvsdir_get (kvsdir_t *dir, const char *name, char **val) +{ + JSON v; - return rc; + if (dirgetobj (dir, name, 0, &v) < 0) + return -1; + if (val) + *val = xstrdup (Jtostr (v)); + Jput (v); + return 0; } int kvsdir_get_dir (kvsdir_t *dir, kvsdir_t **dirp, const char *fmt, ...) { - int rc; - char *name; va_list ap; + char *name = NULL; + char *key = NULL; + JSON v; + JSON dirent; + int rc = -1; + if (!dir || !dirp || !fmt) { + errno = EINVAL; + goto done; + } va_start (ap, fmt); - if (vasprintf (&name, fmt, ap) < 0) - oom (); + name = xvasprintf (fmt, ap); va_end (ap); - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_dir (dir->handle, dirp, "%s", name); - kvs_popd (dir->handle); - + key = kvsdir_key_at (dir, name); + dirent = kvsdir_treeobj_at (dir, name); + if (getobj (dir->handle, dir->dirent, key, KVS_PROTO_READDIR, &v) < 0) + goto done; + *dirp = kvsdir_alloc (dir->handle, dir->dirent ? dirent : NULL, key, v); + rc = 0; +done: + Jput (v); if (name) free (name); + if (key) + free (key); return rc; } -int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **valp) +int kvsdir_get_symlink (kvsdir_t *dir, const char *name, char **val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_symlink (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, KVS_PROTO_READLINK, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_string) { + errno = EPROTO; + goto done; + } + if (val) + *val = xstrdup (json_object_get_string (v)); + rc = 0; +done: + Jput (v); return rc; } -int kvsdir_get_string (kvsdir_t *dir, const char *name, char **valp) +int kvsdir_get_string (kvsdir_t *dir, const char *name, char **val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_string (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_string) { + errno = EPROTO; + goto done; + } + if (val) + *val = xstrdup (json_object_get_string (v)); + rc = 0; +done: + Jput (v); return rc; } -int kvsdir_get_int (kvsdir_t *dir, const char *name, int *valp) +int kvsdir_get_int (kvsdir_t *dir, const char *name, int *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_int (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_int) { + errno = EPROTO; + goto done; + } + if (val) + *val = json_object_get_int (v); + rc = 0; +done: + Jput (v); return rc; } -int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *valp) +int kvsdir_get_int64 (kvsdir_t *dir, const char *name, int64_t *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_int64 (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_int) { + errno = EPROTO; + goto done; + } + if (val) + *val = json_object_get_int64 (v); + rc = 0; +done: + Jput (v); return rc; } -int kvsdir_get_double (kvsdir_t *dir, const char *name, double *valp) +int kvsdir_get_double (kvsdir_t *dir, const char *name, double *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_double (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_double) { + errno = EPROTO; + goto done; + } + if (val) + *val = json_object_get_double (v); + rc = 0; +done: + Jput (v); return rc; } -int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *valp) +int kvsdir_get_boolean (kvsdir_t *dir, const char *name, bool *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_get_boolean (dir->handle, name, valp); - kvs_popd (dir->handle); + JSON v = NULL; + int rc = -1; + if (dirgetobj (dir, name, 0, &v) < 0) + goto done; + if (json_object_get_type (v) != json_type_boolean) { + errno = EPROTO; + goto done; + } + if (val) + *val = json_object_get_boolean (v); + rc = 0; +done: + Jput (v); return rc; } int kvsdir_put_obj (kvsdir_t *dir, const char *name, json_object *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put (dir->handle, name, Jtostr (val)); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put (dir->handle, key, Jtostr (val)); + free (key); return (rc); } int kvsdir_put (kvsdir_t *dir, const char *name, const char *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put (dir->handle, key, val); + free (key); return (rc); } int kvsdir_put_string (kvsdir_t *dir, const char *name, const char *val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_string (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put_string (dir->handle, key, val); + free (key); return (rc); } int kvsdir_put_int (kvsdir_t *dir, const char *name, int val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_int (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put_int (dir->handle, key, val); + free (key); return (rc); } int kvsdir_put_int64 (kvsdir_t *dir, const char *name, int64_t val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_int64 (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put_int64 (dir->handle, key, val); + free (key); return (rc); } int kvsdir_put_double (kvsdir_t *dir, const char *name, double val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_double (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put_double (dir->handle, key, val); + free (key); return (rc); } int kvsdir_put_boolean (kvsdir_t *dir, const char *name, bool val) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_put_boolean (dir->handle, name, val); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_put_boolean (dir->handle, key, val); + free (key); return (rc); } int kvsdir_mkdir (kvsdir_t *dir, const char *name) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_mkdir (dir->handle, name); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_mkdir (dir->handle, key); + free (key); return (rc); } int kvsdir_symlink (kvsdir_t *dir, const char *name, const char *target) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_symlink (dir->handle, name, target); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_symlink (dir->handle, key, target); + free (key); return (rc); } int kvsdir_unlink (kvsdir_t *dir, const char *name) { - int rc; - - kvs_pushd (dir->handle, dir->key); - rc = kvs_unlink (dir->handle, name); - kvs_popd (dir->handle); - + char *key = kvsdir_key_at (dir, name); + int rc = kvs_unlink (dir->handle, key); + free (key); return (rc); }