Skip to content

Commit

Permalink
Merge pull request #1253 from garlick/kvs_api_cleanup
Browse files Browse the repository at this point in the history
KVS: clean up lookup and txn API's
  • Loading branch information
chu11 authored Oct 25, 2017
2 parents 0f0f5eb + 29e8f6e commit 2694116
Show file tree
Hide file tree
Showing 20 changed files with 423 additions and 295 deletions.
8 changes: 7 additions & 1 deletion doc/man3/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ MAN3_FILES_SECONDARY = \
flux_kvs_lookup_get_unpack.3 \
flux_kvs_lookup_get_raw.3 \
flux_kvs_lookup_get_dir.3 \
flux_kvs_lookup_get_treeobj.3 \
flux_kvs_lookup_get_symlink.3 \
flux_kvs_fence.3 \
flux_kvs_txn_destroy.3 \
flux_kvs_txn_put.3 \
Expand All @@ -136,7 +138,8 @@ MAN3_FILES_SECONDARY = \
flux_kvs_txn_mkdir.3 \
flux_kvs_txn_unlink.3 \
flux_kvs_txn_symlink.3 \
flux_kvs_txn_put_raw.3
flux_kvs_txn_put_raw.3 \
flux_kvs_txn_put_treeobj.3

ADOC_FILES = $(MAN3_FILES_PRIMARY:%.3=%.adoc)
XML_FILES = $(MAN3_FILES_PRIMARY:%.3=%.xml)
Expand Down Expand Up @@ -239,6 +242,8 @@ flux_kvs_lookup_get.3: flux_kvs_lookup.3
flux_kvs_lookup_get_unpack.3: flux_kvs_lookup.3
flux_kvs_lookup_get_raw.3: flux_kvs_lookup.3
flux_kvs_lookup_get_dir.3: flux_kvs_lookup.3
flux_kvs_lookup_treeobj.3: flux_kvs_lookup.3
flux_kvs_lookup_symlink.3: flux_kvs_lookup.3
flux_kvs_fence.3: flux_kvs_commit.3
flux_kvs_txn_destroy.3: flux_kvs_txn_create.3
flux_kvs_txn_put.3: flux_kvs_txn_create.3
Expand All @@ -248,6 +253,7 @@ flux_kvs_txn_mkdir.3: flux_kvs_txn_create.3
flux_kvs_txn_unlink.3: flux_kvs_txn_create.3
flux_kvs_txn_symlink.3: flux_kvs_txn_create.3
flux_kvs_txn_put_raw.3: flux_kvs_txn_create.3
flux_kvs_txn_put_treeobj.3: flux_kvs_txn_create.3

flux_open.3: topen.c
flux_send.3: tsend.c
Expand Down
96 changes: 63 additions & 33 deletions doc/man3/flux_kvs_lookup.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ flux_kvs_lookup(3)

NAME
----
flux_kvs_lookup, flux_kvs_lookupat, flux_kvs_lookup_get, flux_kvs_lookup_get_unpack, flux_kvs_lookup_get_raw, flux_kvs_lookup_get_dir - look up KVS key
flux_kvs_lookup, flux_kvs_lookupat, flux_kvs_lookup_get, flux_kvs_lookup_get_unpack, flux_kvs_lookup_get_raw, flux_kvs_lookup_get_dir, flux_kvs_lookup_get_treeobj, flux_kvs_lookup_get_symlink - look up KVS key


SYNOPSIS
Expand All @@ -17,49 +17,72 @@ SYNOPSIS
flux_future_t *flux_kvs_lookupat (flux_t *h, int flags,
const char *key, const char *treeobj);

int flux_kvs_lookup_get (flux_future_t *f, const char **json_str);
int flux_kvs_lookup_get (flux_future_t *f, const char **value);

int flux_kvs_lookup_get_unpack (flux_future_t *f, const char *fmt, ...);

int flux_kvs_lookup_get_raw (flux_future_t *f, const void **data, int *len);
int flux_kvs_lookup_get_raw (flux_future_t *f,
const void **data, int *len);

int flux_kvs_lookup_get_dir (flux_future_t *f, const flux_kvsdir_t **dir);
int flux_kvs_lookup_get_dir (flux_future_t *f,
const flux_kvsdir_t **dir);

int flux_kvs_lookup_get_treeobj (flux_future_t *f, const char **treeobj);

int flux_kvs_lookup_get_symlink (flux_future_t *f, const char **target);


DESCRIPTION
-----------
The Flux Key Value Store is a general purpose distributed storage
service used by Flux services. It is (currently) a single namespace
per Flux instance and is available globally, with "loosely consistent"
semantics.
service used by Flux services.
`flux_kvs_lookup()` sends a request to the KVS service to translate
_key_ to its current value. It returns a `flux_future_t` object which
acts as handle for synchronization and container for the response.
_flags_ modifies the request as described below.
`flux_kvs_lookup()` sends a request to the KVS service to look up _key_.
It returns a `flux_future_t` object which acts as handle for synchronization
and container for the result. _flags_ modifies the request as described
below.
`flux_kvs_lookupat()` is identical to `flux_kvs_lookup()` except
_treeobj_ is a serialized JSON treeobj object that references a
particular snapshot within the KVS.
_treeobj_ is a serialized RFC 11 object that references a particular
static set of content within the KVS, effectively a snapshot.
See `flux_kvs_lookup_get_treeobj()` below.
All the functions below are variations on a common theme. First they
complete the lookup RPC by blocking on the response, if not already received.
Then they interpret the result in different ways. They may be called more
than once on the same future, and they may be intermixed to probe a result
or interpret it in different ways. Results remain valid until
`flux_future_destroy()` is called.
`flux_kvs_lookup_get()` interprets the result as a NULL-terminated string
value. The value is assigned to _value_. If the value is empty, NULL
is assigned.
`flux_kvs_lookup_get_unpack()` interprets the result as a NULL-terminated
string value, then as encoded JSON (not necessarily enclosed in an object).
The value is parsed according to variable arguments in Jansson `json_unpack()`
format.
`flux_kvs_lookup_get ()` completes a lookup operation, blocking on
response(s) if needed, parsing the result, and returning the requested
value in _json_str_. _buf_ is valid until `flux_future_destroy()` is called.
_json_str_ may be a JSON object, array, or bare value.
`flux_kvs_lookup_get_raw()` interprets the result as a raw, opaque value,
which it assigns to _buf_ and its length to _len_. The value may be any
byte sequence. If the value has zero length, NULL is assigned to _buf_.
`flux_kvs_lookup_get_unpack()` is identical to `flux_kvs_lookup_get()` except
the returned JSON is parsed according to variable arguments in Jansson
`json_unpack()` format.
`flux_kvs_lookup_get_dir()` interprets the result as a directory,
e.g. in response to a lookup with the FLUX_KVS_READDIR flag set.
The directory object is assigned to _dir_.
`flux_kvs_lookup_get_raw()` is identical to `flux_kvs_lookup_get()` except
the raw value is returned without decoding.
`flux_kvs_lookup_get_treeobj()` interprets the result as any RFC 11 object.
The object in JSON-encoded form is assigned to _treeobj_. Since all
lookup requests return an RFC 11 object of one type or another, this
function should work on all.
`flux_kvs_lookup_get_dir()` is identical to `flux_kvs_lookup_get()` except
a directory object is returned.
`flux_kvs_lookup_get_symlink()` interprets the result as a symlink target,
e.g. in response to a lookup with the FLUX_KVS_READLINK flag set.
The result is parsed and symlink target is assigned to _target_.
These functions may be used asynchronously.
See `flux_future_then(3)` for details.
These functions may be used asynchronously. See `flux_future_then(3)` for
details.
FLAGS
Expand All @@ -73,10 +96,15 @@ Look up a directory, not a value. The lookup fails if the key does
not refer to a directory object.

FLUX_KVS_READLINK::
If key is a symlink, read the link value, what it refers to.
If key is a symlink, read the link value. The lookup fails if the key
does not refer to a symlink object.

FLUX_KVS_TREEOBJ::
Return the object representation.
All KVS lookups return an RFC 11 tree object. This flag requests that
they be returned without conversion. That is, a "valref" will not
be converted to a "val" object, and a "dirref" will not be converted
to a "dir" object. This is useful for obtaining a snapshot reference
that can be passed to `flux_kvs_lookupat()`.


RETURN VALUE
Expand All @@ -85,16 +113,18 @@ RETURN VALUE
`flux_kvs_lookup()` and `flux_kvs_lookupat()` return a
`flux_future_t` on success, or NULL on failure with errno set appropriately.
`flux_kvs_lookup_get()`, `flux_kvs_lookup_get_unpack()`, and
`flux_kvs_lookup_get_raw()` return 0 on success, or -1 on failure with
errno set appropriately.
`flux_kvs_lookup_get()`, `flux_kvs_lookup_get_unpack()`,
`flux_kvs_lookup_get_raw()`, `flux_kvs_lookup_get_dir()`,
`flux_kvs_lookup_get_treeobj()`, and `flux_kvs_lookup_get_symlink()`
return 0 on success, or -1 on failure with errno set appropriately.
ERRORS
------

EINVAL::
One of the arguments was invalid.
One of the arguments was invalid, or FLUX_KVS_READLINK was used but
the key does not refer to a symlink.

ENOMEM::
Out of memory.
Expand All @@ -109,7 +139,7 @@ EISDIR::
FLUX_KVS_READDIR flag was NOT set and key points to a directory.

EPROTO::
A request was malformed.
A request or response was malformed.

EFBIG::

Expand Down
49 changes: 22 additions & 27 deletions doc/man3/flux_kvs_txn_create.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ flux_kvs_txn_create(3)

NAME
----
flux_kvs_txn_create, flux_kvs_txn_destroy, flux_kvs_txn_put, flux_kvs_txn_pack, flux_kvs_txn_vpack, flux_kvs_txn_mkdir, flux_kvs_txn_unlink, flux_kvs_txn_symlink, flux_kvs_txn_put_raw - operate on a KVS transaction object
flux_kvs_txn_create, flux_kvs_txn_destroy, flux_kvs_txn_put, flux_kvs_txn_pack, flux_kvs_txn_vpack, flux_kvs_txn_mkdir, flux_kvs_txn_unlink, flux_kvs_txn_symlink, flux_kvs_txn_put_raw, flux_kvs_txn_put_treeobj - operate on a KVS transaction object


SYNOPSIS
Expand All @@ -17,7 +17,7 @@ SYNOPSIS
void flux_kvs_txn_destroy (flux_kvs_txn_t *txn);

int flux_kvs_txn_put (flux_kvs_txn_t *txn, int flags,
const char *key, const char *json_str);
const char *key, const char *value);

int flux_kvs_txn_pack (flux_kvs_txn_t *txn, int flags,
const char *key, const char *fmt, ...);
Expand All @@ -37,33 +37,33 @@ SYNOPSIS
int flux_kvs_txn_put_raw (flux_kvs_txn_t *txn, int flags,
const char *key, const void *data, int len);

int flux_kvs_txn_put_treeobj (flux_kvs_txn_t *txn, int flags,
const char *key, const char *treeobj);


DESCRIPTION
-----------
The Flux Key Value Store is a general purpose distributed storage
service used by Flux services.
`flux_kvs_txn_create()` creates a KVS transaction object that may be
passed to `flux_kvs_commit(3)` or `flux_kvs_fence(3)`. The transaction
consists of a list of operations that are applied to the KVS together,
in order. The entire transaction either succeeds or fails. After commit
or fence, the object must be destroyed with `flux_kvs_txn_destroy()`.
`flux_kvs_txn_put()` sets _key_ to a value represented by _json_str_.
If _key_ does not exist it is created. _key_ is hierarchical, with period
(".") used as a path separator. "." represents the root of the namespace
and is optional at the beginning of _key_. Any path components in _key_
that do not exist is created. Any path components in _key_ that must be
converted to directories are overwritten. The value _json_str_ may be be
any bare JSON value (except null), a JSON array, or a JSON object, encoded
as a string. Alternatively, the FLUX_KVS_TREEOBJ flag may be specified
indicating that the _json_str_ value is to be interpreted as an RFC 11
tree object, as described in FLAGS below. A NULL _json_str_ value is
equivalent to calling `flux_kvs_txn_unlink()` on _key_.
`flux_kvs_txn_pack()` is identical to `flux_kvs_txn_put()`, except
`json_pack()` style arguments (see below) are used to construct the
value. `flux_kvs_txn_vpack()` is a variant that accepts a _va_list_
argument.
Each function below adds a single operation to _txn_. _key_ is a
hierarchical path name with period (".") used as path separator.
When the transaction is committed, any existing keys or path components
that are in conflict with the requested operation are overwritten.
`flux_kvs_txn_put()` sets _key_ to a NULL terminated string _value_.
_value_ may be NULL indicating that an empty value should be stored.
`flux_kvs_txn_pack()` sets _key_ to a NULL terminated string encoded
from a JSON object built with `json_pack()` style arguments (see below).
`flux_kvs_txn_vpack()` is a variant that accepts a _va_list_ argument.
`flux_kvs_txn_mkdir()` sets _key_ to an empty directory.
Expand All @@ -76,19 +76,14 @@ another key. _target_ need not exist.
`flux_kvs_txn_put_raw()` sets _key_ to a value containing raw data
referred to by _data_ of length _len_.
`flux_kvs_txn_put_treeobj()` sets _key_ to an RFC 11 object, encoded
as a JSON string.
FLAGS
-----

The _flags_ argument may be zero, or a bitmask of one or more of the
following flags:

FLUX_KVS_TREEOBJ::
The specified value is interpreted as an RFC 11 tree object (KVS meta data)
rather than the actual value. Currently the only way to obtain a valid
tree object is with `flux_kvs_lookup(3)` or `flux_kvs_lookupat(3)`. In
the future, other methods may be available. Note: this flag is only
valid for `flux_kvs_txn_put()` and `flux_kvs_txn_pack()`.
The _flags_ argument is currently unused and must be zero.


include::JSON_PACK.adoc[]
Expand Down
2 changes: 2 additions & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,5 @@ procs
txn
kvsdir
vpack
dirref
lookups
2 changes: 1 addition & 1 deletion src/bindings/lua/flux-lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ static int l_flux_kvs_type (lua_State *L)
return lua_pusherror (L, "key expected in arg #2");

if ((future = flux_kvs_lookup (f, FLUX_KVS_READLINK, key))
&& flux_kvs_lookup_get (future, &target) == 0) {
&& flux_kvs_lookup_get_symlink (future, &target) == 0) {
lua_pushstring (L, "symlink");
lua_pushstring (L, target);
flux_future_destroy (future);
Expand Down
26 changes: 15 additions & 11 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,13 @@ int cmd_put (optparse_t *p, int argc, char **argv)
log_msg_exit ("put: you must specify a value as key=value");
*val++ = '\0';

if (flux_kvs_txn_put (txn, 0, key, val) < 0) {
if (errno != EINVAL)
json_t *obj;
if ((obj = json_loads (val, JSON_DECODE_ANY, NULL))) {
if (flux_kvs_txn_put (txn, 0, key, val) < 0)
log_err_exit ("%s", key);
json_decref (obj);
}
else { // encode as JSON string if not already valid encoded JSON
if (flux_kvs_txn_pack (txn, 0, key, "s", val) < 0)
log_err_exit ("%s", key);
}
Expand Down Expand Up @@ -506,7 +510,7 @@ int cmd_readlink (optparse_t *p, int argc, char **argv)

for (i = optindex; i < argc; i++) {
if (!(f = flux_kvs_lookup (h, FLUX_KVS_READLINK, argv[i]))
|| flux_kvs_lookup_get (f, &target) < 0)
|| flux_kvs_lookup_get_symlink (f, &target) < 0)
log_err_exit ("%s", argv[i]);
else
printf ("%s\n", target);
Expand Down Expand Up @@ -781,7 +785,7 @@ static void dump_kvs_dir (const flux_kvsdir_t *dir, bool Ropt, bool dopt)
if (flux_kvsdir_issymlink (dir, name)) {
const char *link;
if (!(f = flux_kvs_lookupat (h, FLUX_KVS_READLINK, key, rootref))
|| flux_kvs_lookup_get (f, &link) < 0)
|| flux_kvs_lookup_get_symlink (f, &link) < 0)
log_err_exit ("%s", key);
printf ("%s -> %s\n", key, link);
flux_future_destroy (f);
Expand Down Expand Up @@ -1056,7 +1060,7 @@ static int categorize_key (optparse_t *p, const char *key,
}
if (!(f = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, nkey)))
log_err_exit ("flux_kvs_lookup");
if (flux_kvs_lookup_get (f, &json_str) < 0) {
if (flux_kvs_lookup_get_treeobj (f, &json_str) < 0) {
fprintf (stderr, "%s: %s\n", nkey, flux_strerror (errno));
goto error;
}
Expand Down Expand Up @@ -1166,12 +1170,12 @@ int cmd_copy (optparse_t *p, int argc, char **argv)
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey))
|| flux_kvs_lookup_get (f, &json_str) < 0)
|| flux_kvs_lookup_get_treeobj (f, &json_str) < 0)
log_err_exit ("flux_kvs_lookup %s", srckey);
if (!(txn = flux_kvs_txn_create ()))
log_err_exit ("flux_kvs_txn_create");
if (flux_kvs_txn_put (txn, FLUX_KVS_TREEOBJ, dstkey, json_str) < 0)
log_err_exit( "flux_kvs_txn_put");
if (flux_kvs_txn_put_treeobj (txn, 0, dstkey, json_str) < 0)
log_err_exit( "flux_kvs_txn_put_treeobj");
flux_future_destroy (f);

if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0)
Expand All @@ -1198,12 +1202,12 @@ int cmd_move (optparse_t *p, int argc, char **argv)
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey))
|| flux_kvs_lookup_get (f, &json_str) < 0)
|| flux_kvs_lookup_get_treeobj (f, &json_str) < 0)
log_err_exit ("flux_kvs_lookup %s", srckey);
if (!(txn = flux_kvs_txn_create ()))
log_err_exit ("flux_kvs_txn_create");
if (flux_kvs_txn_put (txn, FLUX_KVS_TREEOBJ, dstkey, json_str) < 0)
log_err_exit( "flux_kvs_txn_put");
if (flux_kvs_txn_put_treeobj (txn, 0, dstkey, json_str) < 0)
log_err_exit( "flux_kvs_txn_put_treeobj");
if (flux_kvs_txn_unlink (txn, 0, srckey) < 0)
log_err_exit( "flux_kvs_txn_unlink");
flux_future_destroy (f);
Expand Down
2 changes: 1 addition & 1 deletion src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ static int jobid_exist (flux_t *h, int64_t j)
if (path == NULL)
goto done;
if (!(f = flux_kvs_lookup (h, FLUX_KVS_READDIR, path))
|| flux_kvs_lookup_get (f, NULL) < 0) {
|| flux_kvs_lookup_get_dir (f, NULL) < 0) {
flux_log (h, LOG_DEBUG, "flux_kvs_lookup(%s): %s",
path, flux_strerror (errno));
goto done;
Expand Down
1 change: 1 addition & 0 deletions src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ libkvs_la_SOURCES = \
kvs.c \
kvs_lookup.c \
kvs_dir.c \
kvs_dir_private.h \
kvs_classic.c \
kvs_watch.c \
kvs_commit.c \
Expand Down
Loading

0 comments on commit 2694116

Please sign in to comment.