From f3ea2f7b8ab37f72c1999506ddceca200004892b Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 23 Jun 2017 14:14:59 -0700 Subject: [PATCH] libkvs/txn: add KVS transaction object Rework the "write" side of the KVS API in terms of an explicit transaction object as discussed in #1094. The interface is essentially - create txn - append ops to txn: put, pack, mkdir, unlink, symlink - commit/fence txn - destroy txn Also: rename the KVS_NOMERGE flag to FLUX_KVS_NOMERGE. Provide a private interface so that unit tests can examine internal contents of a commit. --- src/common/libkvs/Makefile.am | 7 +- src/common/libkvs/kvs.h | 7 +- src/common/libkvs/kvs_txn.c | 325 ++++++++++++++++++++++++++++ src/common/libkvs/kvs_txn.h | 39 ++++ src/common/libkvs/kvs_txn_private.h | 12 + src/modules/kvs/commit.c | 2 +- src/modules/kvs/fence.c | 3 +- src/modules/kvs/test/commit.c | 4 +- src/modules/kvs/test/fence.c | 4 +- t/kvs/basic.c | 2 +- t/kvs/commit.c | 2 +- t/kvs/commitmerge.c | 2 +- 12 files changed, 391 insertions(+), 18 deletions(-) create mode 100644 src/common/libkvs/kvs_txn.c create mode 100644 src/common/libkvs/kvs_txn.h create mode 100644 src/common/libkvs/kvs_txn_private.h diff --git a/src/common/libkvs/Makefile.am b/src/common/libkvs/Makefile.am index 13425862929c..0ef7322e7b88 100644 --- a/src/common/libkvs/Makefile.am +++ b/src/common/libkvs/Makefile.am @@ -22,14 +22,17 @@ libkvs_la_SOURCES = \ kvs_classic.c \ kvs_watch.c \ jansson_dirent.c \ - jansson_dirent.h + jansson_dirent.h \ + kvs_txn.c \ + kvs_txn_private.h fluxcoreinclude_HEADERS = \ kvs.h \ kvs_lookup.h \ kvs_dir.h \ kvs_watch.h \ - kvs_classic.h + kvs_classic.h \ + kvs_txn.h TESTS = \ test_proto.t \ diff --git a/src/common/libkvs/kvs.h b/src/common/libkvs/kvs.h index b35b414590b1..d3a108ebef6a 100644 --- a/src/common/libkvs/kvs.h +++ b/src/common/libkvs/kvs.h @@ -9,12 +9,7 @@ #include "kvs_dir.h" #include "kvs_classic.h" #include "kvs_watch.h" - -/* Flags for commit and fence operations - */ -enum flux_kvs_flags { - KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */ -}; +#include "kvs_txn.h" /* kvs_put() and kvs_put_string() both make copies of the value argument * The caller retains ownership of the original. diff --git a/src/common/libkvs/kvs_txn.c b/src/common/libkvs/kvs_txn.c new file mode 100644 index 000000000000..49e96edeb727 --- /dev/null +++ b/src/common/libkvs/kvs_txn.c @@ -0,0 +1,325 @@ +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include + +#include "kvs_txn_private.h" +#include "jansson_dirent.h" + +#include "src/common/libutil/blobref.h" + +struct flux_kvs_txn { + json_t *ops; + int cursor; +}; + +void flux_kvs_txn_destroy (flux_kvs_txn_t *txn) +{ + if (txn) { + int saved_errno = errno; + json_decref (txn->ops); + free (txn); + errno = saved_errno; + } +} + +flux_kvs_txn_t *flux_kvs_txn_create (void) +{ + flux_kvs_txn_t *txn = calloc (1, sizeof (*txn)); + if (!txn) { + errno = ENOMEM; + goto error; + } + if (!(txn->ops = json_array ())) { + errno = ENOMEM; + goto error; + } + return txn; +error: + flux_kvs_txn_destroy (txn); + return NULL; +} + +static int validate_flags (int flags, int allowed) +{ + if ((flags & allowed) != flags) { + errno = EINVAL; + return -1; + } + return 0; +} + +static int validate_op (json_t *op) +{ + const char *key; + json_t *dirent = NULL; + + if (json_unpack (op, "{s:s}", "key", &key) < 0) + goto error; + if (strlen (key) == 0) + goto error; + if (json_unpack (op, "{s:n}", "dirent") == 0) + ; // unlink sets dirent NULL + else if (json_unpack (op, "{s:o}", "dirent", &dirent) == 0) { + if (j_dirent_validate (dirent) < 0) + goto error; + } else + goto error; + return 0; +error: + errno = EINVAL; + return -1; +} + +int flux_kvs_txn_put (flux_kvs_txn_t *txn, int flags, + const char *key, const char *json_str) +{ + json_t *val; + json_t *op = NULL; + + if (!txn || !key) { + errno = EINVAL; + goto error; + } + if (validate_flags (flags, FLUX_KVS_TREEOBJ) < 0) + goto error; + if (!json_str) + return flux_kvs_txn_unlink (txn, flags, key); + if (!(val = json_loads (json_str, JSON_DECODE_ANY, NULL))) { + errno = EINVAL; + goto error; + } + if ((flags & FLUX_KVS_TREEOBJ)) + op = json_pack ("{s:s s:o}", "key", key, + "dirent", val); + else + op = json_pack ("{s:s s:{s:o}}", "key", key, + "dirent", "FILEVAL", val); + if (!op) { + json_decref (val); + errno = ENOMEM; + goto error; + } + if (validate_op (op) < 0) { + json_decref (op); + goto error; + } + if (json_array_append_new (txn->ops, op) < 0) { + json_decref (op); + errno = ENOMEM; + goto error; + } + return 0; +error: + return -1; +} + +int flux_kvs_txn_pack (flux_kvs_txn_t *txn, int flags, + const char *key, const char *fmt, ...) +{ + va_list ap; + json_t *val; + json_t *op = NULL; + + if (!txn || !key | !fmt) { + errno = EINVAL; + goto error; + } + if (validate_flags (flags, FLUX_KVS_TREEOBJ) < 0) + goto error; + va_start (ap, fmt); + val = json_vpack_ex (NULL, 0, fmt, ap); + va_end (ap); + if (!val) { + errno = EINVAL; + goto error; + } + if ((flags & FLUX_KVS_TREEOBJ)) + op = json_pack ("{s:s s:o}", "key", key, + "dirent", val); + else + op = json_pack ("{s:s s:{s:o}}", "key", key, + "dirent", "FILEVAL", val); + if (!op) { + json_decref (val); + errno = ENOMEM; + goto error; + } + if (validate_op (op) < 0) { + json_decref (op); + goto error; + } + if (json_array_append_new (txn->ops, op) < 0) { + json_decref (op); + errno = ENOMEM; + goto error; + } + return 0; +error: + return -1; +} + +int flux_kvs_txn_mkdir (flux_kvs_txn_t *txn, int flags, + const char *key) +{ + json_t *op; + + if (!txn || !key) { + errno = EINVAL; + goto error; + } + if (validate_flags (flags, 0) < 0) + goto error; + if (!(op = json_pack ("{s:s s:{s:{}}}", "key", key, + "dirent", "DIRVAL"))) { + errno = ENOMEM; + goto error; + } + if (validate_op (op) < 0) { + json_decref (op); + goto error; + } + if (json_array_append_new (txn->ops, op) < 0) { + json_decref (op); + errno = ENOMEM; + goto error; + } + return 0; +error: + return -1; +} + +int flux_kvs_txn_unlink (flux_kvs_txn_t *txn, int flags, + const char *key) +{ + json_t *op; + + if (!txn || !key) { + errno = EINVAL; + goto error; + } + if (validate_flags (flags, 0) < 0) + goto error; + if (!(op = json_pack ("{s:s s:n}", "key", key, + "dirent"))) { + errno = ENOMEM; + goto error; + } + if (validate_op (op) < 0) { + json_decref (op); + goto error; + } + if (json_array_append_new (txn->ops, op) < 0) { + json_decref (op); + errno = ENOMEM; + goto error; + } + return 0; +error: + return -1; +} + +int flux_kvs_txn_symlink (flux_kvs_txn_t *txn, int flags, + const char *key, const char *target) +{ + json_t *op; + + if (!txn || !key | !target) { + errno = EINVAL; + goto error; + } + if (validate_flags (flags, 0) < 0) + goto error; + if (!(op = json_pack ("{s:s s:{s:s}}", "key", key, + "dirent", "LINKVAL", target))) { + errno = EINVAL; + goto error; + } + if (validate_op (op) < 0) { + json_decref (op); + goto error; + } + if (json_array_append_new (txn->ops, op) < 0) { + json_decref (op); + errno = ENOMEM; + goto error; + } + return 0; +error: + return -1; +} + +/* for unit tests + */ +int txn_get (flux_kvs_txn_t *txn, int request, void *arg) +{ + switch (request) { + case TXN_GET_FIRST: + txn->cursor = 0; + if (arg) + *(json_t **)arg = json_array_get (txn->ops, txn->cursor); + txn->cursor++; + break; + case TXN_GET_NEXT: + if (arg) + *(json_t **)arg = json_array_get (txn->ops, txn->cursor); + txn->cursor++; + break; + case TXN_GET_ALL: + if (arg) + *(json_t **)arg = txn->ops; + break; + default: + errno = EINVAL; + return -1; + } + return 0; +} + +flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name, + int nprocs, flux_kvs_txn_t *txn) +{ + flux_kvs_txn_t *empty_txn = NULL; + flux_future_t *f = NULL; + int saved_errno; + + if (!txn && !(txn = empty_txn = flux_kvs_txn_create ())) + goto done; + if (!(f = flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0, + "{s:s s:i s:i s:O}", + "name", name, + "nprocs", nprocs, + "flags", flags, + "ops", txn->ops))) + goto done; +done: + saved_errno = errno; + flux_kvs_txn_destroy (empty_txn); + errno = saved_errno; + return f; +} + +flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn) +{ + zuuid_t *uuid; + flux_future_t *f = NULL; + int saved_errno; + + if (!(uuid = zuuid_new ())) { + errno = ENOMEM; + goto done; + } + if (!(f = flux_kvs_fence (h, flags, zuuid_str (uuid), 1, txn))) + goto done; +done: + saved_errno = errno; + zuuid_destroy (&uuid); + errno = saved_errno; + return f; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libkvs/kvs_txn.h b/src/common/libkvs/kvs_txn.h new file mode 100644 index 000000000000..872723e5d91d --- /dev/null +++ b/src/common/libkvs/kvs_txn.h @@ -0,0 +1,39 @@ +#ifndef _FLUX_CORE_KVS_TXN_H +#define _FLUX_CORE_KVS_TXN_H + +#include + +enum kvs_commit_flags { + FLUX_KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */ +}; + +typedef struct flux_kvs_txn flux_kvs_txn_t; + +flux_kvs_txn_t *flux_kvs_txn_create (void); +void flux_kvs_txn_destroy (flux_kvs_txn_t *txn); + +int flux_kvs_txn_put (flux_kvs_txn_t *txn, int flags, + const char *key, const char *json_str); + +int flux_kvs_txn_pack (flux_kvs_txn_t *txn, int flags, + const char *key, const char *fmt, ...); + +int flux_kvs_txn_mkdir (flux_kvs_txn_t *txn, int flags, + const char *key); + +int flux_kvs_txn_unlink (flux_kvs_txn_t *txn, int flags, + const char *key); + +int flux_kvs_txn_symlink (flux_kvs_txn_t *txn, int flags, + const char *key, const char *target); + +flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn); + +flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name, + int nprocs, flux_kvs_txn_t *txn); + +#endif /* !_FLUX_CORE_KVS_TXN_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libkvs/kvs_txn_private.h b/src/common/libkvs/kvs_txn_private.h new file mode 100644 index 000000000000..2bad5f8b65aa --- /dev/null +++ b/src/common/libkvs/kvs_txn_private.h @@ -0,0 +1,12 @@ +#ifndef _KVS_TXN_PRIVATE_H +#define _KVS_TXN_PRIVATE_H + +/* interface for unit tests only */ +enum { TXN_GET_FIRST, TXN_GET_NEXT, TXN_GET_ALL }; +int txn_get (flux_kvs_txn_t *txn, int request, void *arg); + +#endif /* !_KVS_TXN_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/kvs/commit.c b/src/modules/kvs/commit.c index af3da4994a31..0d9a3f8e2e5f 100644 --- a/src/modules/kvs/commit.c +++ b/src/modules/kvs/commit.c @@ -619,7 +619,7 @@ void commit_mgr_merge_ready_commits (commit_mgr_t *cm) if (c && c->errnum == 0 && c->state <= COMMIT_STATE_APPLY_OPS - && !(fence_get_flags (c->f) & KVS_NO_MERGE)) { + && !(fence_get_flags (c->f) & FLUX_KVS_NO_MERGE)) { commit_t *nc; nc = zlist_pop (cm->ready); assert (nc == c); diff --git a/src/modules/kvs/fence.c b/src/modules/kvs/fence.c index 3f3df74e3258..9bd7428e0749 100644 --- a/src/modules/kvs/fence.c +++ b/src/modules/kvs/fence.c @@ -154,8 +154,7 @@ int fence_merge (fence_t *dest, fence_t *src) { int i, len; - if (dest->flags & KVS_NO_MERGE - || src->flags & KVS_NO_MERGE) + if ((dest->flags & FLUX_KVS_NO_MERGE) || (src->flags & FLUX_KVS_NO_MERGE)) return 0; if (Jget_ar_len (src->names, &len)) { diff --git a/src/modules/kvs/test/commit.c b/src/modules/kvs/test/commit.c index 202ede91a4a7..900644f738b6 100644 --- a/src/modules/kvs/test/commit.c +++ b/src/modules/kvs/test/commit.c @@ -232,7 +232,7 @@ void commit_mgr_merge_tests (void) /* test unsuccessful merge */ - create_ready_commit (cm, "fence1", "key1", "1", KVS_NO_MERGE); + create_ready_commit (cm, "fence1", "key1", "1", FLUX_KVS_NO_MERGE); create_ready_commit (cm, "fence2", "key2", "2", 0); commit_mgr_merge_ready_commits (cm); @@ -258,7 +258,7 @@ void commit_mgr_merge_tests (void) /* test unsuccessful merge */ create_ready_commit (cm, "fence1", "key1", "1", 0); - create_ready_commit (cm, "fence2", "key2", "2", KVS_NO_MERGE); + create_ready_commit (cm, "fence2", "key2", "2", FLUX_KVS_NO_MERGE); commit_mgr_merge_ready_commits (cm); diff --git a/src/modules/kvs/test/fence.c b/src/modules/kvs/test/fence.c index f171478c440c..d4efaaf14afe 100644 --- a/src/modules/kvs/test/fence.c +++ b/src/modules/kvs/test/fence.c @@ -259,7 +259,7 @@ void merge_tests (void) fence_destroy (f1); fence_destroy (f2); - f1 = create_fence ("foo", "A", KVS_NO_MERGE); + f1 = create_fence ("foo", "A", FLUX_KVS_NO_MERGE); f2 = create_fence ("bar", "B", 0); ok (fence_merge (f1, f2) == 0, @@ -269,7 +269,7 @@ void merge_tests (void) fence_destroy (f2); f1 = create_fence ("foo", "A", 0); - f2 = create_fence ("bar", "B", KVS_NO_MERGE); + f2 = create_fence ("bar", "B", FLUX_KVS_NO_MERGE); ok (fence_merge (f1, f2) == 0, "fence_merge no merge"); diff --git a/t/kvs/basic.c b/t/kvs/basic.c index 3933a7e49789..29e5383c49b8 100644 --- a/t/kvs/basic.c +++ b/t/kvs/basic.c @@ -301,7 +301,7 @@ void cmd_put_common (flux_t *h, int argc, char **argv, bool mergeable) log_err_exit ("%s", key); } free (key); - if (kvs_commit (h, mergeable ? 0 : KVS_NO_MERGE) < 0) + if (kvs_commit (h, mergeable ? 0 : FLUX_KVS_NO_MERGE) < 0) log_err_exit ("kvs_commit"); } diff --git a/t/kvs/commit.c b/t/kvs/commit.c index cd9b2471965d..b9429f3bbf05 100644 --- a/t/kvs/commit.c +++ b/t/kvs/commit.c @@ -104,7 +104,7 @@ void *thread (void *arg) if (kvs_put_int (t->h, key, 42) < 0) log_err_exit ("%s", key); if (nopt && (i % nopt_divisor) == 0) - flags |= KVS_NO_MERGE; + flags |= FLUX_KVS_NO_MERGE; else flags = 0; if (fopt) { diff --git a/t/kvs/commitmerge.c b/t/kvs/commitmerge.c index fd2995a917b0..e353720f2b5f 100644 --- a/t/kvs/commitmerge.c +++ b/t/kvs/commitmerge.c @@ -200,7 +200,7 @@ void *committhread (void *arg) if (kvs_put_int (t->h, key, t->n) < 0) log_err_exit ("%s", key); - if (kvs_commit (t->h, nopt ? KVS_NO_MERGE : 0) < 0) + if (kvs_commit (t->h, nopt ? FLUX_KVS_NO_MERGE : 0) < 0) log_err_exit ("kvs_commit"); flux_close (t->h);