Skip to content

Commit

Permalink
Merge pull request #1286 from chu11/issue1197-part1
Browse files Browse the repository at this point in the history
kvs: Refactor in preparation for private namespaces
  • Loading branch information
garlick authored Dec 6, 2017
2 parents 54205c6 + 26e6dc7 commit 57e24fa
Show file tree
Hide file tree
Showing 25 changed files with 1,403 additions and 166 deletions.
3 changes: 3 additions & 0 deletions doc/man1/flux-kvs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ arguments are described below.

COMMANDS
--------
*namespace-create* 'name' ['name...']::
Create a namespace in which kvs values will be read/written to.

*get* [-j|-r|-t] [-a treeobj] 'key' ['key...']::
Retrieve the value stored under 'key'. If nothing has been stored under
'key', display an error message. If no options, value is displayed with
Expand Down
3 changes: 2 additions & 1 deletion doc/man3/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ MAN3_FILES_PRIMARY = \
flux_future_create.3 \
flux_kvs_lookup.3 \
flux_kvs_commit.3 \
flux_kvs_txn_create.3
flux_kvs_txn_create.3 \
flux_kvs_namespace_create.3

# These files are generated as roff .so includes of a primary page.
# A2X handles this automatically if mentioned in NAME section
Expand Down
7 changes: 7 additions & 0 deletions doc/man3/flux_kvs_commit.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ block until the response has been received. Both accept an optional timeout.
was successful, indicating the entire transaction was committed, or -1
on failure, indicating none of the transaction was committed.
By default, both `flux_kvs_commit()` and `flux_kvs_fence()` operate on
the default KVS namespace. To use a different namespace, set the
environment variable FLUX_KVS_NAMESPACE to the namespace you wish to
use.
FLAGS
-----

Expand Down Expand Up @@ -83,6 +88,8 @@ A request was malformed.
ENOSYS::
The KVS module is not loaded.

ENOTSUP::
An unknown namespace was requested.

AUTHOR
------
Expand Down
7 changes: 7 additions & 0 deletions doc/man3/flux_kvs_lookup.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ _treeobj_ is a serialized RFC 11 object that references a particular
static set of content within the KVS, effectively a snapshot.
See `flux_kvs_lookup_get_treeobj()` below.
By default, both `flux_kvs_lookup()` and `flux_kvs_lookupat()` operate
on the default KVS namespace. To use a different namespace, set the
environment variable FLUX_KVS_NAMESPACE to the namespace you wish to
use.
All the functions below are variations on a common theme. First they
complete the lookup RPC by blocking on the response, if not already received.
Then they interpret the result in different ways. They may be called more
Expand Down Expand Up @@ -145,6 +150,8 @@ EFBIG::
ENOSYS::
The KVS module is not loaded.

ENOTSUP::
An unknown namespace was requested.

AUTHOR
------
Expand Down
74 changes: 74 additions & 0 deletions doc/man3/flux_kvs_namespace_create.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
flux_kvs_namespace_create(3)
============================
:doctype: manpage


NAME
----
flux_kvs_namespace_create - create a KVS namespace


SYNOPSIS
--------
#include <flux/core.h>

flux_future_t *flux_kvs_namespace_create (flux_t *h,
const char *namespace,
int flags);

DESCRIPTION
-----------
`flux_kvs_namespace_create()` creates a KVS namespace. Within a
namespace, users can get/put KVS values completely independent of
other KVS namespaces.
FLAGS
-----

The _flags_ mask is currently unused and should be set to 0.


RETURN VALUE
------------
`flux_kvs_namespace_create()` returns a `flux_future_t` on success, or
NULL on failure with errno set appropriately.
ERRORS
------

EINVAL::
One of the arguments was invalid.

ENOMEM::
Out of memory.

EPROTO::
A request was malformed.

ENOSYS::
The KVS module is not loaded.

EEXIST::
The namespace already exists.

AUTHOR
------
This page is maintained by the Flux community.
RESOURCES
---------
Github: <http://github.com/flux-framework>


COPYRIGHT
---------
include::COPYRIGHT.adoc[]
SEE ALSO
---------
flux_kvs_lookup(3), flux_kvs_commit(3)
2 changes: 2 additions & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,5 @@ vpack
dirref
lookups
mh
namespaces
ENOTSUP
30 changes: 30 additions & 0 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "src/common/libutil/readall.h"
#include "src/common/libkvs/treeobj.h"

int cmd_namespace_create (optparse_t *p, int argc, char **argv);
int cmd_get (optparse_t *p, int argc, char **argv);
int cmd_put (optparse_t *p, int argc, char **argv);
int cmd_unlink (optparse_t *p, int argc, char **argv);
Expand Down Expand Up @@ -169,6 +170,13 @@ static struct optparse_option unlink_opts[] = {
};

static struct optparse_subcommand subcommands[] = {
{ "namespace-create",
"name [name...]",
"Create a KVS namespace",
cmd_namespace_create,
0,
NULL
},
{ "get",
"[-j|-r|-t] [-a treeobj] key [key...]",
"Get value stored under key",
Expand Down Expand Up @@ -334,6 +342,28 @@ int main (int argc, char *argv[])
return (exitval);
}

int cmd_namespace_create (optparse_t *p, int argc, char **argv)
{
flux_t *h = (flux_t *)optparse_get_data (p, "flux_handle");
flux_future_t *f;
int optindex, i;

optindex = optparse_option_index (p);
if ((optindex - argc) == 0) {
optparse_print_usage (p);
exit (1);
}
for (i = optindex; i < argc; i++) {
const char *name = argv[i];
int flags = 0;
if (!(f = flux_kvs_namespace_create (h, name, flags))
|| flux_future_get (f, NULL) < 0)
log_err_exit ("%s", name);
flux_future_destroy (f);
}
return (0);
}

static void kv_printf (const char *key, int maxcol, const char *fmt, ...)
{
va_list ap;
Expand Down
6 changes: 6 additions & 0 deletions src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ noinst_LTLIBRARIES = libkvs.la

libkvs_la_SOURCES = \
kvs.c \
kvs_private.h \
kvs_lookup.c \
kvs_dir.c \
kvs_dir_private.h \
Expand All @@ -34,6 +35,7 @@ fluxcoreinclude_HEADERS = \
kvs_commit.h

TESTS = \
test_kvs.t \
test_kvs_txn.t \
test_kvs_lookup.t \
test_kvs_dir.t \
Expand All @@ -57,6 +59,10 @@ test_cppflags = \
$(AM_CPPFLAGS) \
-I$(top_srcdir)/src/common/libtap

test_kvs_t_SOURCES = test/kvs.c
test_kvs_t_CPPFLAGS = $(test_cppflags)
test_kvs_t_LDADD = $(test_ldadd) $(LIBDL)

test_kvs_txn_t_SOURCES = test/kvs_txn.c
test_kvs_txn_t_CPPFLAGS = $(test_cppflags)
test_kvs_txn_t_LDADD = $(test_ldadd) $(LIBDL)
Expand Down
31 changes: 28 additions & 3 deletions src/common/libkvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,36 @@
#endif
#include <flux/core.h>

const char *get_kvs_namespace (void)
{
if (getenv ("FLUX_KVS_NAMESPACE"))
return getenv ("FLUX_KVS_NAMESPACE");
return KVS_PRIMARY_NAMESPACE;
}

flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *namespace,
int flags)
{
if (!namespace || flags) {
errno = EINVAL;
return NULL;
}

return flux_rpc_pack (h, "kvs.namespace.create", 0, 0,
"{ s:s s:i }",
"namespace", namespace,
"flags", flags);
}

int flux_kvs_get_version (flux_t *h, int *versionp)
{
flux_future_t *f;
const char *namespace = get_kvs_namespace ();
int version;
int rc = -1;

if (!(f = flux_rpc (h, "kvs.getroot", NULL, FLUX_NODEID_ANY, 0)))
if (!(f = flux_rpc_pack (h, "kvs.getroot", FLUX_NODEID_ANY, 0, "{ s:s }",
"namespace", namespace)))
goto done;
if (flux_rpc_get_unpack (f, "{ s:i }", "rootseq", &version) < 0)
goto done;
Expand All @@ -48,10 +71,12 @@ int flux_kvs_get_version (flux_t *h, int *versionp)
int flux_kvs_wait_version (flux_t *h, int version)
{
flux_future_t *f;
const char *namespace = get_kvs_namespace ();
int ret = -1;

if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i }",
"rootseq", version)))
if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i s:s }",
"rootseq", version,
"namespace", namespace)))
goto done;
/* N.B. response contains (rootseq, rootref) but we don't need it.
*/
Expand Down
7 changes: 7 additions & 0 deletions src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@
extern "C" {
#endif

#define KVS_PRIMARY_NAMESPACE "primary"

enum {
FLUX_KVS_READDIR = 1,
FLUX_KVS_READLINK = 2,
FLUX_KVS_TREEOBJ = 16,
FLUX_KVS_APPEND = 32,
};

/* Namespace */
flux_future_t *flux_kvs_namespace_create (flux_t *h, const char *namespace,
int flags);

/* Synchronization:
* Process A commits data, then gets the store version V and sends it to B.
* Process B waits for the store version to be >= V, then reads data.
* To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE.
*/
int flux_kvs_get_version (flux_t *h, int *versionp);
int flux_kvs_wait_version (flux_t *h, int version);
Expand Down
9 changes: 7 additions & 2 deletions src/common/libkvs/kvs_commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,34 @@
#include <czmq.h>
#include <flux/core.h>

#include "kvs_private.h"
#include "kvs_txn_private.h"
#include "src/common/libutil/blobref.h"

flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
int nprocs, flux_kvs_txn_t *txn)
{
const char *namespace = get_kvs_namespace ();

if (txn) {
json_t *ops;
if (!(ops = txn_get_ops (txn))) {
errno = EINVAL;
return NULL;
}
return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0,
"{s:s s:i s:i s:O}",
"{s:s s:i s:s s:i s:O}",
"name", name,
"nprocs", nprocs,
"namespace", namespace,
"flags", flags,
"ops", ops);
} else {
return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0,
"{s:s s:i s:i s:[]}",
"{s:s s:i s:s s:i s:[]}",
"name", name,
"nprocs", nprocs,
"namespace", namespace,
"flags", flags,
"ops");
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/libkvs/kvs_commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ enum kvs_commit_flags {
FLUX_KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */
};

/* To use an alternate namespace, set environment variable FLUX_KVS_NAMESPACE */

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn);

flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
Expand Down
20 changes: 14 additions & 6 deletions src/common/libkvs/kvs_lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <czmq.h>
#include <flux/core.h>

#include "kvs_private.h"
#include "kvs_dir_private.h"
#include "kvs_lookup.h"
#include "treeobj.h"
Expand Down Expand Up @@ -99,16 +100,19 @@ flux_future_t *flux_kvs_lookup (flux_t *h, int flags, const char *key)
{
struct lookup_ctx *ctx;
flux_future_t *f;
const char *namespace = get_kvs_namespace ();

if (!h || !key || strlen (key) == 0 || validate_lookup_flags (flags) < 0) {
errno = EINVAL;
return NULL;
}
if (!(ctx = alloc_ctx (h, flags, key)))
return NULL;
if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0, "{s:s s:i}",
"key", key,
"flags", flags))) {
if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0,
"{s:s s:s s:i}",
"key", key,
"namespace", namespace,
"flags", flags))) {
free_ctx (ctx);
return NULL;
}
Expand Down Expand Up @@ -140,16 +144,20 @@ flux_future_t *flux_kvs_lookupat (flux_t *h, int flags, const char *key,
}
}
else {
const char *namespace = get_kvs_namespace ();

if (!(ctx->atref = strdup (treeobj)))
return NULL;
if (!(obj = json_loads (treeobj, 0, NULL))) {
errno = EINVAL;
return NULL;
}
if (!(f = flux_rpc_pack (h, "kvs.get", FLUX_NODEID_ANY, 0,
"{s:s s:i s:O}", "key", key,
"flags", flags,
"rootdir", obj))) {
"{s:s s:s s:i s:O}",
"key", key,
"namespace", namespace,
"flags", flags,
"rootdir", obj))) {
free_ctx (ctx);
json_decref (obj);
return NULL;
Expand Down
Loading

0 comments on commit 57e24fa

Please sign in to comment.