Skip to content

Commit

Permalink
Merge pull request #1458 from chu11/issue1446
Browse files Browse the repository at this point in the history
kvs: do not generate uuid for commit transactions, use fixed algorithm for 'name'
  • Loading branch information
grondo authored Apr 13, 2018
2 parents 6293cac + 2e4877b commit c2502c4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 42 deletions.
39 changes: 9 additions & 30 deletions src/common/libkvs/kvs_commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,48 +62,27 @@ flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
{
zuuid_t *uuid = NULL;
const char *namespace;
const char *name;
flux_future_t *f = NULL;
json_t *ops;
int saved_errno = 0;

if (!txn) {
errno = EINVAL;
return NULL;
}

if (!(uuid = zuuid_new ())) {
saved_errno = errno;
goto cleanup;
}
name = zuuid_str (uuid);

if (!(namespace = flux_kvs_get_namespace (h))) {
saved_errno = errno;
goto cleanup;
}
if (!(namespace = flux_kvs_get_namespace (h)))
return NULL;

if (!(ops = txn_get_ops (txn))) {
saved_errno = EINVAL;
goto cleanup;
}
if (!(f = flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
"{s:s s:s s:i s:O}",
"name", name,
"namespace", namespace,
"flags", flags,
"ops", ops))) {
saved_errno = errno;
goto cleanup;
errno = EINVAL;
return NULL;
}

cleanup:
zuuid_destroy (&uuid);
if (saved_errno)
errno = saved_errno;
return f;
return flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
"{s:s s:i s:O}",
"namespace", namespace,
"flags", flags,
"ops", ops);
}

/*
Expand Down
13 changes: 6 additions & 7 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ typedef struct {
int transaction_merge;
bool events_init; /* flag */
const char *hash_name;
unsigned int seq; /* for commit transactions */
} kvs_ctx_t;

struct kvs_cb_data {
Expand Down Expand Up @@ -1818,16 +1819,14 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
kvs_ctx_t *ctx = arg;
struct kvsroot *root;
const char *namespace;
const char *name;
int saved_errno, flags;
bool stall = false;
json_t *ops = NULL;
treq_t *tr;
char *alt_ns = NULL;

if (flux_request_unpack (msg, NULL, "{ s:o s:s s:s s:i }",
if (flux_request_unpack (msg, NULL, "{ s:o s:s s:i }",
"ops", &ops,
"name", &name,
"namespace", &namespace,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
Expand All @@ -1847,8 +1846,8 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(tr = treq_create (name, 1, flags))) {
flux_log_error (h, "%s: treq_create", __FUNCTION__);
if (!(tr = treq_create_rank (ctx->rank, ctx->seq++, 1, flags))) {
flux_log_error (h, "%s: treq_create_rank", __FUNCTION__);
goto error;
}
if (treq_mgr_add_transaction (root->trm, tr) < 0) {
Expand All @@ -1874,7 +1873,7 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
treq_set_processed (tr, true);

if (kvstxn_mgr_add_transaction (root->ktm,
name,
treq_get_name (tr),
ops,
flags) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
Expand All @@ -1889,7 +1888,7 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (!(f = flux_rpc_pack (h, "kvs.relaycommit", 0, FLUX_RPC_NORESPONSE,
"{ s:O s:s s:s s:i }",
"ops", ops,
"name", name,
"name", treq_get_name (tr),
"namespace", alt_ns ? alt_ns : namespace,
"flags", flags))) {
flux_log_error (h, "%s: flux_rpc_pack", __FUNCTION__);
Expand Down
15 changes: 15 additions & 0 deletions src/modules/kvs/test/treq.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ void treq_basic_tests (void)
flux_msg_destroy (request);

treq_destroy (tr);

ok (treq_create_rank (1, 2, -1, 0) == NULL,
"treq_create_rank fails on bad input");

ok ((tr = treq_create_rank (214, 3577, 2, 4)) != NULL,
"treq_create_rank works");

ok ((name = treq_get_name (tr)) != NULL,
"treq_get_name works");

ok (strstr (name, "214") != NULL,
"treq_get_name returns name with rank in it");

ok (strstr (name, "3577") != NULL,
"treq_get_name returns name with seq in it");
}

void treq_ops_tests (void)
Expand Down
55 changes: 50 additions & 5 deletions src/modules/kvs/treq.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ void treq_destroy (treq_t *tr)
}
}

treq_t *treq_create (const char *name, int nprocs, int flags)
static treq_t *treq_create_common (int nprocs, int flags)
{
treq_t *tr = NULL;
int saved_errno;

if (!name || nprocs <= 0) {
if (nprocs <= 0) {
saved_errno = EINVAL;
goto error;
}
Expand All @@ -210,13 +210,58 @@ treq_t *treq_create (const char *name, int nprocs, int flags)
saved_errno = ENOMEM;
goto error;
}
tr->nprocs = nprocs;
tr->flags = flags;
tr->processed = false;

return tr;
error:
treq_destroy (tr);
errno = saved_errno;
return NULL;
}

treq_t *treq_create (const char *name, int nprocs, int flags)
{
treq_t *tr = NULL;
int saved_errno;

if (!name) {
saved_errno = EINVAL;
goto error;
}

if (!(tr = treq_create_common (nprocs, flags))) {
saved_errno = EINVAL;
goto error;
}

if (!(tr->name = strdup (name))) {
saved_errno = ENOMEM;
goto error;
}
tr->nprocs = nprocs;
tr->flags = flags;
tr->processed = false;

return tr;
error:
treq_destroy (tr);
errno = saved_errno;
return NULL;
}

treq_t *treq_create_rank (uint32_t rank, unsigned int seq, int nprocs, int flags)
{
treq_t *tr = NULL;
int saved_errno;

if (!(tr = treq_create_common (nprocs, flags))) {
saved_errno = EINVAL;
goto error;
}

if (asprintf (&(tr->name), "treq.%u.%u", rank, seq) < 0) {
saved_errno = ENOMEM;
goto error;
}

return tr;
error:
Expand Down
4 changes: 4 additions & 0 deletions src/modules/kvs/treq.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ int treq_mgr_transactions_count (treq_mgr_t *trm);
* treq_t API
*/

/* treq_create - name is passed in */
treq_t *treq_create (const char *name, int nprocs, int flags);

/* treq_create_rank - internally will create name based on rank & seq */
treq_t *treq_create_rank (uint32_t rank, unsigned int seq, int nprocs, int flags);

void treq_destroy (treq_t *tr);

/* if number of calls to treq_add_request_ops() is == nprocs */
Expand Down

0 comments on commit c2502c4

Please sign in to comment.