Skip to content

Commit

Permalink
common/libkvs: Add ns params to flux_kvs_copy/move
Browse files Browse the repository at this point in the history
Have flux_kvs_copy() and flux_kvs_move() take namespace parameters
for the source & destination keys.  This allows copying and moving
between different namespaces without need for using the "ns:" prefix.
  • Loading branch information
chu11 committed Jan 22, 2019
1 parent 4c7956b commit ad467bd
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ int cmd_copy (optparse_t *p, int argc, char **argv)
srckey = argv[optindex];
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_copy (h, srckey, dstkey, 0))
if (!(f = flux_kvs_copy (h, NULL, srckey, NULL, dstkey, 0))
|| flux_future_get (f, NULL) < 0)
log_err_exit ("flux_kvs_copy");
flux_future_destroy (f);
Expand All @@ -1649,7 +1649,7 @@ int cmd_move (optparse_t *p, int argc, char **argv)
srckey = argv[optindex];
dstkey = argv[optindex + 1];

if (!(f = flux_kvs_move (h, srckey, dstkey, 0))
if (!(f = flux_kvs_move (h, NULL, srckey, NULL, dstkey, 0))
|| flux_future_get (f, NULL) < 0)
log_err_exit ("flux_kvs_move");
flux_future_destroy (f);
Expand Down
77 changes: 60 additions & 17 deletions src/common/libkvs/kvs_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,44 @@
#include <flux/core.h>

#include "kvs_copy.h"
#include "kvs_commit_private.h"
#include "kvs_lookup_private.h"

struct copy_context {
int commit_flags;
char *srcns;
char *srckey;
char *dstns;
char *dstkey;
};

static void copy_context_destroy (struct copy_context *ctx)
{
if (ctx) {
int saved_errno = errno;
free (ctx->srcns);
free (ctx->srckey);
free (ctx->dstns);
free (ctx->dstkey);
free (ctx);
errno = saved_errno;
}
}

static struct copy_context *copy_context_create (const char *srckey,
static struct copy_context *copy_context_create (const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;

if (!(ctx = calloc (1, sizeof (*ctx))))
return NULL;
if (!(ctx->srckey = strdup (srckey)) || !(ctx->dstkey = strdup (dstkey))) {
if ((srcns && !(ctx->srcns = strdup (srcns)))
|| !(ctx->srckey = strdup (srckey))
|| (dstns && !(ctx->dstns = strdup (dstns)))
|| !(ctx->dstkey = strdup (dstkey))) {
copy_context_destroy (ctx);
return NULL;
}
Expand Down Expand Up @@ -76,8 +87,14 @@ static void copy_continuation (flux_future_t *f, void *arg)
goto error;
if (flux_kvs_txn_unlink (txn, 0, ctx->srckey) < 0)
goto error;
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
if (ctx->srcns) {
if (!(f2 = flux_kvs_commit_ns (h, ctx->srcns, ctx->commit_flags, txn)))
goto error;
}
else {
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
}
if (flux_future_continue (f, f2) < 0) {
flux_future_destroy (f2);
goto error;
Expand Down Expand Up @@ -110,8 +127,14 @@ static void lookup_continuation (flux_future_t *f, void *arg)
goto error;
if (flux_kvs_txn_put_treeobj (txn, 0, ctx->dstkey, val) < 0)
goto error;
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
if (ctx->dstns) {
if (!(f2 = flux_kvs_commit_ns (h, ctx->dstns, ctx->commit_flags, txn)))
goto error;
}
else {
if (!(f2 = flux_kvs_commit (h, ctx->commit_flags, txn)))
goto error;
}
if (flux_future_continue (f, f2) < 0) {
flux_future_destroy (f2);
goto error;
Expand All @@ -124,9 +147,12 @@ static void lookup_continuation (flux_future_t *f, void *arg)
flux_kvs_txn_destroy (txn);
}

flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags)
flux_future_t *flux_kvs_copy (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;
flux_future_t *f1;
Expand All @@ -136,9 +162,19 @@ flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
errno = EINVAL;
return NULL;
}
if (!(f1 = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
if (!(ctx = copy_context_create (srckey, dstkey, commit_flags)))
if (srcns) {
if (!(f1 = flux_kvs_lookup_ns (h, srcns, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
}
else {
if (!(f1 = flux_kvs_lookup (h, FLUX_KVS_TREEOBJ, srckey)))
return NULL;
}
if (!(ctx = copy_context_create (srcns,
srckey,
dstns,
dstkey,
commit_flags)))
goto error;
if (flux_aux_set (h, NULL, ctx, (flux_free_f)copy_context_destroy) < 0) {
copy_context_destroy (ctx);
Expand All @@ -152,9 +188,12 @@ flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
return NULL;
}

flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags)
flux_future_t *flux_kvs_move (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags)
{
struct copy_context *ctx;
flux_future_t *f1;
Expand All @@ -164,9 +203,13 @@ flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
errno = EINVAL;
return NULL;
}
if (!(f1 = flux_kvs_copy (h, srckey, dstkey, commit_flags)))
if (!(f1 = flux_kvs_copy (h, srcns, srckey, dstns, dstkey, commit_flags)))
return NULL;
if (!(ctx = copy_context_create (srckey, dstkey, commit_flags)))
if (!(ctx = copy_context_create (srcns,
srckey,
dstns,
dstkey,
commit_flags)))
goto error;
if (flux_aux_set (h, NULL, ctx, (flux_free_f)copy_context_destroy) < 0) {
copy_context_destroy (ctx);
Expand Down
37 changes: 25 additions & 12 deletions src/common/libkvs/kvs_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,39 @@
extern "C" {
#endif

/* Create a copy of 'srckey' at 'dstkey'.
/* Create a copy of 'srckey' at 'dstkey'. Read from / write to the
* specified namespaces. If a namespace is not specified (i.e. NULL),
* the namespace from flux_kvs_get_namespace() will be used.
*
* Due to the hash-tree design of the KVS, dstkey is by definition a
* "deep copy" (or writable snapshot) of all content below srckey.
* The copy operation has a low overhead since it only copies a single
* directory entry. 'srckey' and 'dstkey' may be in different namespaces.
* directory entry.
*
* Returns future on success, NULL on failure with errno set.
*/
flux_future_t *flux_kvs_copy (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags);
flux_future_t *flux_kvs_copy (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags);

/* Move 'srckey' to 'dstkey'.
* This is a copy followed by an unlink on 'srckey'.
* 'srckey' and 'dstkey' may be in different namespaces.
* The copy and unlink are not atomic.
/* Move 'srckey' to 'dstkey'. Read from / write to the
* specified namespaces. If a namespace is not specified (i.e. NULL),
* the namespace from flux_kvs_get_namespace() will be used.
*
* This is a copy followed by an unlink on 'srckey'. The copy and
* unlink are not atomic.
*
* Returns future on success, NULL on failure with errno set.
*/
flux_future_t *flux_kvs_move (flux_t *h, const char *srckey,
const char *dstkey,
int commit_flags);
flux_future_t *flux_kvs_move (flux_t *h,
const char *srcns,
const char *srckey,
const char *dstns,
const char *dstkey,
int commit_flags);

#ifdef __cplusplus
}
Expand Down
18 changes: 12 additions & 6 deletions src/common/libkvs/test/kvs_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,29 @@ int main (int argc, char *argv[])
plan (NO_PLAN);

errno = 0;
ok (flux_kvs_copy (NULL, "a", "b", 0) == NULL && errno == EINVAL,
ok (flux_kvs_copy (NULL, NULL, "a", NULL, "b", 0) == NULL
&& errno == EINVAL,
"flux_kvs_copy h=NULL fails with EINVAL");
errno = 0;
ok (flux_kvs_copy (h, NULL, "b", 0) == NULL && errno == EINVAL,
ok (flux_kvs_copy (h, NULL, NULL, NULL, "b", 0) == NULL
&& errno == EINVAL,
"flux_kvs_copy srckey=NULL fails with EINVAL");
errno = 0;
ok (flux_kvs_copy (h, "a", NULL, 0) == NULL && errno == EINVAL,
ok (flux_kvs_copy (h, NULL, "a", NULL, NULL, 0) == NULL
&& errno == EINVAL,
"flux_kvs_copy srckey=NULL fails with EINVAL");

errno = 0;
ok (flux_kvs_move (NULL, "a", "b", 0) == NULL && errno == EINVAL,
ok (flux_kvs_move (NULL, NULL, "a", NULL, "b", 0) == NULL
&& errno == EINVAL,
"flux_kvs_move h=NULL fails with EINVAL");
errno = 0;
ok (flux_kvs_move (h, NULL, "b", 0) == NULL && errno == EINVAL,
ok (flux_kvs_move (h, NULL, NULL, NULL, "b", 0) == NULL
&& errno == EINVAL,
"flux_kvs_move srckey=NULL fails with EINVAL");
errno = 0;
ok (flux_kvs_move (h, "a", NULL, 0) == NULL && errno == EINVAL,
ok (flux_kvs_move (h, NULL, "a", NULL, NULL, 0) == NULL
&& errno == EINVAL,
"flux_kvs_move srckey=NULL fails with EINVAL");

done_testing();
Expand Down

0 comments on commit ad467bd

Please sign in to comment.