Skip to content

Commit

Permalink
libkvs/txn: add KVS transaction object
Browse files Browse the repository at this point in the history
Rework the "write" side of the KVS API in terms of an
explicit transaction object as discussed in flux-framework#1094.
The interface is essentially
- create txn
- append ops to txn:  put, pack, link, mkdir, unlink, symlink
- commit/fence txn
- destroy txn

Also: rename the KVS_NOMERGE flag to FLUX_KVS_NOMERGE.

Provide a private interface so that unit tests can
examine internal contents of a commit.
  • Loading branch information
garlick committed Jul 17, 2017
1 parent 014ad21 commit cca8b4f
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 18 deletions.
7 changes: 5 additions & 2 deletions src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ libkvs_la_SOURCES = \
kvs_classic.c \
kvs_watch.c \
jansson_dirent.c \
jansson_dirent.h
jansson_dirent.h \
kvs_txn.c \
kvs_txn_private.h

fluxcoreinclude_HEADERS = \
kvs.h \
kvs_lookup.h \
kvs_dir.h \
kvs_watch.h \
kvs_classic.h
kvs_classic.h \
kvs_txn.h

TESTS = \
test_proto.t \
Expand Down
7 changes: 1 addition & 6 deletions src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@
#include "kvs_dir.h"
#include "kvs_classic.h"
#include "kvs_watch.h"

/* Flags for commit and fence operations
*/
enum flux_kvs_flags {
KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */
};
#include "kvs_txn.h"

/* kvs_put() and kvs_put_string() both make copies of the value argument
* The caller retains ownership of the original.
Expand Down
289 changes: 289 additions & 0 deletions src/common/libkvs/kvs_txn.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <jansson.h>
#include <czmq.h>
#include <flux/core.h>

#include "kvs_txn_private.h"
#include "jansson_dirent.h"

#include "src/common/libutil/blobref.h"

struct flux_kvs_txn {
json_t *ops;
int cursor;
};

void flux_kvs_txn_destroy (flux_kvs_txn_t *txn)
{
if (txn) {
int saved_errno = errno;
json_decref (txn->ops);
free (txn);
errno = saved_errno;
}
}

flux_kvs_txn_t *flux_kvs_txn_create (void)
{
flux_kvs_txn_t *txn = calloc (1, sizeof (*txn));
if (!txn) {
errno = ENOMEM;
goto error;
}
if (!(txn->ops = json_array ())) {
errno = ENOMEM;
goto error;
}
return txn;
error:
flux_kvs_txn_destroy (txn);
return NULL;
}

int flux_kvs_txn_put (flux_kvs_txn_t *txn, int flags,
const char *key, const char *json_str)
{
json_t *val;
json_t *op = NULL;

if (!txn || !key) {
errno = EINVAL;
goto error;
}
if (!json_str)
return flux_kvs_txn_unlink (txn, flags, key);
if (!(val = json_loads (json_str, JSON_DECODE_ANY, NULL))) {
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:{s:o}}", "key", key,
"dirent", "FILEVAL", val))) {
json_decref (val);
errno = ENOMEM;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

int flux_kvs_txn_pack (flux_kvs_txn_t *txn, int flags,
const char *key, const char *fmt, ...)
{
va_list ap;
json_t *val;
json_t *op = NULL;

if (!txn || !key | !fmt) {
errno = EINVAL;
goto error;
}
va_start (ap, fmt);
val = json_vpack_ex (NULL, 0, fmt, ap);
va_end (ap);
if (!val) {
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:{s:o}}", "key", key,
"dirent", "FILEVAL", val))) {
json_decref (val);
errno = ENOMEM;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

int flux_kvs_txn_link (flux_kvs_txn_t *txn, int flags,
const char *key, const char *json_str)
{
json_t *val;
json_t *op = NULL;

if (!txn || !key | !json_str) {
errno = EINVAL;
goto error;
}
if (!(val = json_loads (json_str, JSON_DECODE_ANY, NULL))) {
errno = EINVAL;
goto error;
}
if (j_dirent_validate (val) < 0) {
json_decref (val);
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:o}", "key", key,
"dirent", val))) {
json_decref (val);
errno = ENOMEM;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

int flux_kvs_txn_mkdir (flux_kvs_txn_t *txn, int flags,
const char *key)
{
json_t *op;

if (!txn || !key) {
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:{s:{}}}", "key", key,
"dirent", "DIRVAL"))) {
errno = ENOMEM;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

int flux_kvs_txn_unlink (flux_kvs_txn_t *txn, int flags,
const char *key)
{
json_t *op;

if (!txn || !key) {
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:n}", "key", key,
"dirent"))) {
errno = ENOMEM;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

int flux_kvs_txn_symlink (flux_kvs_txn_t *txn, int flags,
const char *key, const char *target)
{
json_t *op;

if (!txn || !key | !target) {
errno = EINVAL;
goto error;
}
if (!(op = json_pack ("{s:s s:{s:s}}", "key", key,
"dirent", "LINKVAL", target))) {
errno = EINVAL;
goto error;
}
if (json_array_append_new (txn->ops, op) < 0) {
json_decref (op);
errno = ENOMEM;
goto error;
}
return 0;
error:
return -1;
}

/* for unit tests
*/
int txn_get (flux_kvs_txn_t *txn, int request, void *arg)
{
switch (request) {
case TXN_GET_FIRST:
txn->cursor = 0;
if (arg)
*(json_t **)arg = json_array_get (txn->ops, txn->cursor);
txn->cursor++;
break;
case TXN_GET_NEXT:
if (arg)
*(json_t **)arg = json_array_get (txn->ops, txn->cursor);
txn->cursor++;
break;
case TXN_GET_ALL:
if (arg)
*(json_t **)arg = txn->ops;
break;
default:
errno = EINVAL;
return -1;
}
return 0;
}

flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
int nprocs, flux_kvs_txn_t *txn)
{
flux_kvs_txn_t *empty_txn = NULL;
flux_future_t *f = NULL;
int saved_errno;

if (!txn && !(txn = empty_txn = flux_kvs_txn_create ()))
goto done;
if (!(f = flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0,
"{s:s s:i s:i s:O}",
"name", name,
"nprocs", nprocs,
"flags", flags,
"ops", txn->ops)))
goto done;
done:
saved_errno = errno;
flux_kvs_txn_destroy (empty_txn);
errno = saved_errno;
return f;
}

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
{
zuuid_t *uuid;
flux_future_t *f = NULL;
int saved_errno;

if (!(uuid = zuuid_new ())) {
errno = ENOMEM;
goto done;
}
if (!(f = flux_kvs_fence (h, flags, zuuid_str (uuid), 1, txn)))
goto done;
done:
saved_errno = errno;
zuuid_destroy (&uuid);
errno = saved_errno;
return f;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
42 changes: 42 additions & 0 deletions src/common/libkvs/kvs_txn.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef _FLUX_CORE_KVS_TXN_H
#define _FLUX_CORE_KVS_TXN_H

#include <flux/core.h>

enum kvs_commit_flags {
FLUX_KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */
};

typedef struct flux_kvs_txn flux_kvs_txn_t;

flux_kvs_txn_t *flux_kvs_txn_create (void);
void flux_kvs_txn_destroy (flux_kvs_txn_t *txn);

int flux_kvs_txn_put (flux_kvs_txn_t *txn, int flags,
const char *key, const char *json_str);

int flux_kvs_txn_pack (flux_kvs_txn_t *txn, int flags,
const char *key, const char *fmt, ...);

int flux_kvs_txn_link (flux_kvs_txn_t *txn, int flags,
const char *key, const char *json_str);

int flux_kvs_txn_mkdir (flux_kvs_txn_t *txn, int flags,
const char *key);

int flux_kvs_txn_unlink (flux_kvs_txn_t *txn, int flags,
const char *key);

int flux_kvs_txn_symlink (flux_kvs_txn_t *txn, int flags,
const char *key, const char *target);

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn);

flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
int nprocs, flux_kvs_txn_t *txn);

#endif /* !_FLUX_CORE_KVS_TXN_H */

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
12 changes: 12 additions & 0 deletions src/common/libkvs/kvs_txn_private.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#ifndef _KVS_TXN_PRIVATE_H
#define _KVS_TXN_PRIVATE_H

/* interface for unit tests only */
enum { TXN_GET_FIRST, TXN_GET_NEXT, TXN_GET_ALL };
int txn_get (flux_kvs_txn_t *txn, int request, void *arg);

#endif /* !_KVS_TXN_PRIVATE_H */

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
2 changes: 1 addition & 1 deletion src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ void commit_mgr_merge_ready_commits (commit_mgr_t *cm)
if (c
&& c->errnum == 0
&& c->state <= COMMIT_STATE_APPLY_OPS
&& !(fence_get_flags (c->f) & KVS_NO_MERGE)) {
&& !(fence_get_flags (c->f) & FLUX_KVS_NO_MERGE)) {
commit_t *nc;
nc = zlist_pop (cm->ready);
assert (nc == c);
Expand Down
3 changes: 1 addition & 2 deletions src/modules/kvs/fence.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ int fence_merge (fence_t *dest, fence_t *src)
{
int i, len;

if (dest->flags & KVS_NO_MERGE
|| src->flags & KVS_NO_MERGE)
if ((dest->flags & FLUX_KVS_NO_MERGE) || (src->flags & FLUX_KVS_NO_MERGE))
return 0;

if (Jget_ar_len (src->names, &len)) {
Expand Down
Loading

0 comments on commit cca8b4f

Please sign in to comment.