-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
libkvs: cleanup, deprecate old functions, update users #1233
Changes from all commits
2a473c9
97163c5
5d00af2
a6ff79f
0acbac2
0f2e38a
e33cd6a
eb832d9
690a1e8
d09234a
a94e0d5
bbc01d3
7ea51fa
42e52a8
0f8bd26
0a898c7
83a8505
d1dcef8
fef0392
81e771b
abbfd2c
291b888
f512c07
914ab8b
eb6d7d7
d1ec8b5
be3fa50
7a2a21e
84085b2
eea640a
e475343
a377466
7eb5542
9465b92
3a3a1d8
4a24b8c
9e23b76
04bd4e0
8101399
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -400,3 +400,5 @@ symlink | |
nprocs | ||
procs | ||
txn | ||
kvsdir | ||
vpack |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -254,8 +254,11 @@ static int l_flux_new (lua_State *L) | |
static int l_flux_kvsdir_new (lua_State *L) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. commit seems to do more than just update to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh yeah, this specific line is not relevant, just didn't know how to comment on a commit message. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes that was pretty vague. Changed to
|
||
{ | ||
const char *path = "."; | ||
kvsdir_t *dir; | ||
const flux_kvsdir_t *dir; | ||
flux_kvsdir_t *cpy; | ||
flux_t *f = lua_get_flux (L, 1); | ||
flux_future_t *fut = NULL; | ||
int rc; | ||
|
||
if (lua_isstring (L, 2)) { | ||
/* | ||
|
@@ -265,10 +268,16 @@ static int l_flux_kvsdir_new (lua_State *L) | |
return (2); | ||
path = lua_tostring (L, 2); | ||
} | ||
|
||
if (kvs_get_dir (f, &dir, "%s", path) < 0) | ||
return lua_pusherror (L, (char *)flux_strerror (errno)); | ||
return lua_push_kvsdir (L, dir); | ||
if (!(fut = flux_kvs_lookup (f, FLUX_KVS_READDIR, path)) | ||
|| flux_kvs_lookup_get_dir (fut, &dir) < 0 | ||
|| !(cpy = flux_kvsdir_copy (dir))) { | ||
rc = lua_pusherror (L, (char *)flux_strerror (errno)); | ||
goto done; | ||
} | ||
rc = lua_push_kvsdir (L, cpy); | ||
done: | ||
flux_future_destroy (fut); | ||
return rc; | ||
} | ||
|
||
static int l_flux_kvs_symlink (lua_State *L) | ||
|
@@ -284,7 +293,7 @@ static int l_flux_kvs_symlink (lua_State *L) | |
if (!(target = lua_tostring (L, 3))) | ||
return lua_pusherror (L, "target expected in arg #3"); | ||
|
||
if (kvs_symlink (f, key, target) < 0) | ||
if (flux_kvs_symlink (f, key, target) < 0) | ||
return lua_pusherror (L, (char *)flux_strerror (errno)); | ||
lua_pushboolean (L, true); | ||
return (1); | ||
|
@@ -299,7 +308,7 @@ static int l_flux_kvs_unlink (lua_State *L) | |
if (!(key = lua_tostring (L, 2))) | ||
return lua_pusherror (L, "key expected in arg #2"); | ||
|
||
if (kvs_unlink (f, key) < 0) | ||
if (flux_kvs_unlink (f, key) < 0) | ||
return lua_pusherror (L, (char *)flux_strerror (errno)); | ||
lua_pushboolean (L, true); | ||
return (1); | ||
|
@@ -309,8 +318,9 @@ static int l_flux_kvs_type (lua_State *L) | |
{ | ||
flux_t *f; | ||
const char *key; | ||
char *val; | ||
kvsdir_t *d; | ||
const flux_kvsdir_t *dir; | ||
const char *json_str; | ||
flux_kvsdir_t *cpy; | ||
flux_future_t *future; | ||
const char *target; | ||
|
||
|
@@ -327,29 +337,36 @@ static int l_flux_kvs_type (lua_State *L) | |
return (2); | ||
} | ||
flux_future_destroy (future); | ||
if (kvs_get_dir (f, &d, "%s", key) == 0) { | ||
if ((future = flux_kvs_lookup (f, FLUX_KVS_READDIR, key)) | ||
&& flux_kvs_lookup_get_dir (future, &dir) == 0 | ||
&& (cpy = flux_kvsdir_copy (dir))) { | ||
lua_pushstring (L, "dir"); | ||
lua_push_kvsdir (L, d); | ||
lua_push_kvsdir (L, cpy); | ||
flux_future_destroy (future); | ||
return (2); | ||
} | ||
if (kvs_get (f, key, &val) == 0) { | ||
flux_future_destroy (future); | ||
if ((future = flux_kvs_lookup (f, 0, key)) | ||
&& flux_kvs_lookup_get (future, &json_str) == 0) { | ||
json_object *o; | ||
lua_pushstring (L, "file"); | ||
if (val && (o = json_tokener_parse (val))) { | ||
if (json_str && (o = json_tokener_parse (json_str))) { | ||
json_object_to_lua (L, o); | ||
json_object_put (o); | ||
} | ||
else | ||
lua_pushnil (L); | ||
flux_future_destroy (future); | ||
return (2); | ||
} | ||
flux_future_destroy (future); | ||
return lua_pusherror (L, "key does not exist"); | ||
} | ||
|
||
int l_flux_kvs_commit (lua_State *L) | ||
{ | ||
flux_t *f = lua_get_flux (L, 1); | ||
if (kvs_commit (f, 0) < 0) | ||
if (flux_kvs_commit_anon (f, 0) < 0) | ||
return lua_pusherror (L, (char *)flux_strerror (errno)); | ||
lua_pushboolean (L, true); | ||
return (1); | ||
|
@@ -364,16 +381,16 @@ int l_flux_kvs_put (lua_State *L) | |
return lua_pusherror (L, "key required"); | ||
|
||
if (lua_isnil (L, 3)) | ||
rc = kvs_put (f, key, NULL); | ||
rc = flux_kvs_put (f, key, NULL); | ||
else { | ||
json_object *o; | ||
if (lua_value_to_json (L, 3, &o) < 0) | ||
return lua_pusherror (L, "Unable to convert to json"); | ||
rc = kvs_put (f, key, json_object_to_json_string (o)); | ||
rc = flux_kvs_put (f, key, json_object_to_json_string (o)); | ||
json_object_put (o); | ||
} | ||
if (rc < 0) | ||
return lua_pusherror (L, "kvs_put (%s): %s", | ||
return lua_pusherror (L, "flux_kvs_put (%s): %s", | ||
key, (char *)flux_strerror (errno)); | ||
|
||
lua_pushboolean (L, true); | ||
|
@@ -382,24 +399,34 @@ int l_flux_kvs_put (lua_State *L) | |
|
||
int l_flux_kvs_get (lua_State *L) | ||
{ | ||
char *json_str; | ||
json_object *o; | ||
flux_future_t *fut = NULL; | ||
const char *json_str; | ||
json_object *o = NULL; | ||
flux_t *f = lua_get_flux (L, 1); | ||
const char *key = lua_tostring (L, 2); | ||
int rc; | ||
|
||
if (key == NULL) | ||
return lua_pusherror (L, "key required"); | ||
if (kvs_get (f, key, &json_str) < 0) | ||
return lua_pusherror (L, "kvs_get: %s", (char *)flux_strerror (errno)); | ||
if (key == NULL) { | ||
rc = lua_pusherror (L, "key required"); | ||
goto done; | ||
} | ||
if (!(fut = flux_kvs_lookup (f, 0, key)) | ||
|| flux_kvs_lookup_get (fut, &json_str) < 0) { | ||
rc = lua_pusherror (L, "flux_kvs_lookup: %s", | ||
(char *)flux_strerror (errno)); | ||
goto done; | ||
} | ||
if (!(o = json_tokener_parse (json_str)) | ||
|| (json_object_to_lua (L, o) < 0)) { | ||
free (json_str); | ||
return lua_pusherror (L, "json_tokener_parse: %s", | ||
rc = lua_pusherror (L, "json_tokener_parse: %s", | ||
(char *)flux_strerror (errno)); | ||
goto done; | ||
} | ||
free (json_str); | ||
rc = 1; | ||
done: | ||
json_object_put (o); | ||
return (1); | ||
flux_future_destroy (fut); | ||
return (rc); | ||
} | ||
|
||
static int l_flux_barrier (lua_State *L) | ||
|
@@ -1058,7 +1085,7 @@ static int l_msghandler_newindex (lua_State *L) | |
return (0); | ||
} | ||
|
||
static int kvswatch_cb_common (const char *key, kvsdir_t *dir, | ||
static int kvswatch_cb_common (const char *key, flux_kvsdir_t *dir, | ||
json_object *val, void *arg, int errnum) | ||
{ | ||
int rc; | ||
|
@@ -1102,7 +1129,8 @@ static int kvswatch_cb_common (const char *key, kvsdir_t *dir, | |
return rc; | ||
} | ||
|
||
static int l_kvsdir_watcher (const char *key, kvsdir_t *dir, void *arg, int errnum) | ||
static int l_kvsdir_watcher (const char *key, flux_kvsdir_t *dir, | ||
void *arg, int errnum) | ||
{ | ||
return kvswatch_cb_common (key, dir, NULL, arg, errnum); | ||
} | ||
|
@@ -1123,7 +1151,7 @@ static int l_kvswatcher_remove (lua_State *L) | |
struct l_flux_ref *kw = luaL_checkudata (L, 1, "FLUX.kvswatcher"); | ||
l_flux_ref_gettable (kw, "kvswatcher"); | ||
lua_getfield (L, -1, "key"); | ||
if (kvs_unwatch (kw->flux, lua_tostring (L, -1)) < 0) | ||
if (flux_kvs_unwatch (kw->flux, lua_tostring (L, -1)) < 0) | ||
return (lua_pusherror (L, "kvs_unwatch: %s", | ||
(char *)flux_strerror (errno))); | ||
/* | ||
|
@@ -1159,9 +1187,9 @@ static int l_kvswatcher_add (lua_State *L) | |
kw = l_flux_ref_create (L, f, 2, "kvswatcher"); | ||
lua_getfield (L, 2, "isdir"); | ||
if (lua_toboolean (L, -1)) | ||
rc = kvs_watch_dir (f, l_kvsdir_watcher, (void *) kw, "%s", key); | ||
rc = flux_kvs_watch_dir (f, l_kvsdir_watcher, (void *) kw, "%s", key); | ||
else | ||
rc = kvs_watch (f, key, l_kvswatcher, (void *) kw); | ||
rc = flux_kvs_watch (f, key, l_kvswatcher, (void *) kw); | ||
if (rc < 0) { | ||
l_flux_ref_destroy (kw, "kvswatcher"); | ||
return lua_pusherror (L, (char *)flux_strerror (errno)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you are considering removal of the
kvsdir
abstraction, but in the context of the addition offlux_kvs_lookup_get_dir
it might be useful to consider addition of a refcount to kvsdir objects. This would avoid the necessary copies in the common case where we want the kvsdir to persist, but we're done with the future.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do have
flux_kvsdir_incref()
but since the object returned from the lookup is const (indicating that it still belongs to the future), it seems a bit wrong to allow its reference count to be modified? This is why I addedflux_kvsdir_copy()
, but I intended that to be short lived until the various places whereflux_kvsdir_t
was used for purposes other than "list of names in directory" were converted to something else TBD.As we just discussed offline, possibly we need to come up with a new object that serves as a handle to the KVS, and contains things like a "current working directory", a snapshot reference, or a namespace handle, as well as the broker handle, and then all KVS functions would take one of those instead of the broker handle... I'll open an issue on that idea.