From ba7c24953e004c70af7f387b857c895442eb3303 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 16:45:08 -0800 Subject: [PATCH 1/8] modules/kvs: Update kvstxn comments Fix forgotten change to function name. --- src/modules/kvs/kvstxn.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index 33f315fc90b5..e640e8e3e0cc 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -143,10 +143,10 @@ void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm); int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm); /* In internally stored ready transactions (moved to ready status via - * kvstxn_mgr_process_transaction_request()), merge them if they are - * capable of being merged. Returns -1 on error, 0 on success. On - * error, it is possible that the ready transaction has been modified - * with different transaction names and operations. The caller is + * kvstxn_mgr_add_transaction()), merge them if they are capable of + * being merged. Returns -1 on error, 0 on success. On error, it is + * possible that the ready transaction has been modified with + * different transaction names and operations. The caller is * responsible for sending errors to all appropriately. */ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm); From 1166c4cf7c733ddcd501bb11fb12223ecc0b2eb2 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 16:56:24 -0800 Subject: [PATCH 2/8] modules/kvs: Merge to new empty kvstxn_t In kvstxn_mgr_merge_ready_transactions(), instead of merging transactions into the current head ready transaction, create a new empty transaction and merge contents into it. Then push that new transaction onto the head of the ready list. Requires users to call kvstxn_mgr_get_ready_commit() after the merge to get the new head. --- src/modules/kvs/kvs.c | 6 +++ src/modules/kvs/kvstxn.c | 81 +++++++++++++++++++++++++++++----------- src/modules/kvs/kvstxn.h | 13 ++++--- 3 files changed, 74 insertions(+), 26 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index e6b2031d2176..8e0da069d98c 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -1046,6 +1046,12 @@ static int kvstxn_check_root_cb (struct kvsroot *root, void *arg) */ if (kvstxn_mgr_merge_ready_transactions (root->ktm) < 0) kvstxn_set_aux_errnum (kt, errno); + else { + /* grab new head ready commit, if above succeeds, this + * must succeed */ + kt = kvstxn_mgr_get_ready_transaction (root->ktm); + assert (kt); + } } /* It does not matter if root has been marked for removal, diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 56d668acf487..9a0760affa9f 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -1104,11 +1104,36 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src) return -1; } +static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm) +{ + kvstxn_t *ktnew; + + if (!(ktnew = calloc (1, sizeof (*ktnew)))) + goto error_enomem; + if (!(ktnew->ops = json_array ())) + goto error_enomem; + if (!(ktnew->names = json_array ())) + goto error_enomem; + if (!(ktnew->missing_refs_list = zlist_new ())) + goto error_enomem; + if (!(ktnew->dirty_cache_entries_list = zlist_new ())) + goto error_enomem; + ktnew->ktm = ktm; + ktnew->state = KVSTXN_STATE_INIT; + return ktnew; + +error_enomem: + kvstxn_destroy (ktnew); + errno = ENOMEM; + return NULL; +} + /* Merge ready transactions that are mergeable, where merging consists - * of popping the "donor" transaction off the ready list, and - * appending its ops to the top transaction. The top transaction can - * be appended to if it hasn't started, or is still building the - * rootcpy, e.g. stalled walking the namespace. + * creating a new kvstxn_t, and merging the other transactions in the + * ready queue and appending their ops/names to the new transaction. + * After merging, push the new kvstxn_t onto the head of the ready + * queue. Merging can occur if the top transaction hasn't started, or + * is still building the rootcpy, e.g. stalled walking the namespace. * * Break when an unmergeable transaction is discovered. We do not * wish to merge non-adjacent transactions, as it can create @@ -1124,30 +1149,44 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src) int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) { - kvstxn_t *kt = zlist_first (ktm->ready); + kvstxn_t *first, *second, *new; + kvstxn_t *nextkt; /* transaction must still be in state where merged in ops can be * applied */ - if (kt - && kt->errnum == 0 - && kt->state <= KVSTXN_STATE_APPLY_OPS - && !(kt->flags & FLUX_KVS_NO_MERGE)) { - kvstxn_t *nkt; - while ((nkt = zlist_next (ktm->ready))) { - int ret; + first = zlist_first (ktm->ready); + if (!first + || first->errnum != 0 + || first->aux_errnum != 0 + || first->state > KVSTXN_STATE_APPLY_OPS + || (first->flags & FLUX_KVS_NO_MERGE)) + return 0; - if ((ret = kvstxn_merge (kt, nkt)) < 0) - return -1; + second = zlist_next (ktm->ready); + if (!second + || (second->flags & FLUX_KVS_NO_MERGE)) + return 0; - /* if return == 0, we've merged as many as we currently - * can */ - if (!ret) - break; + if (!(new = kvstxn_create_empty (ktm))) + return -1; + + nextkt = zlist_first (ktm->ready); + do { + int ret; - /* Merged kvstxn, remove off ready list */ - zlist_remove (ktm->ready, nkt); + if ((ret = kvstxn_merge (new, nextkt)) < 0) { + kvstxn_destroy (new); + return -1; } - } + + if (!ret) + break; + + zlist_remove (ktm->ready, nextkt); + } while ((nextkt = zlist_next (ktm->ready))); + + zlist_push (ktm->ready, new); + zlist_freefn (ktm->ready, new, (zlist_free_fn *)kvstxn_destroy, false); return 0; } diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index e640e8e3e0cc..c0982a2281c9 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -143,11 +143,14 @@ void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm); int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm); /* In internally stored ready transactions (moved to ready status via - * kvstxn_mgr_add_transaction()), merge them if they are capable of - * being merged. Returns -1 on error, 0 on success. On error, it is - * possible that the ready transaction has been modified with - * different transaction names and operations. The caller is - * responsible for sending errors to all appropriately. + * kvstxn_mgr_add_transaction()), merge them into a new ready transaction + * if they are capable of being merged. + * + * Callers should be cautioned to re-call + * kvstxn_mgr_get_ready_transaction() for the new head commit as the + * prior one has been removed. + * + * Returns -1 on error, 0 on success. */ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm); From c2d030d0c0e91f0142158e8b73ab90afb295bab1 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 16:59:50 -0800 Subject: [PATCH 3/8] modules/kvs: Refactor internal kvstxn_merge() With recent changes, kvstxn_merge() no longer needs to be fully cleaned up on error. An error code can be returned to the caller kvstxn_mgr_merge_ready_commits(), which will handle full cleanup. --- src/modules/kvs/kvstxn.c | 40 ++++++++-------------------------------- 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 9a0760affa9f..e79249b1c7bb 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -1049,59 +1049,35 @@ int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm) static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src) { - json_t *names = NULL; - json_t *ops = NULL; - int i, len, saved_errno; + int i, len; - if ((dest->flags & FLUX_KVS_NO_MERGE) || (src->flags & FLUX_KVS_NO_MERGE)) + if (src->flags & FLUX_KVS_NO_MERGE) return 0; if ((len = json_array_size (src->names))) { - if (!(names = json_copy (dest->names))) { - saved_errno = ENOMEM; - goto error; - } for (i = 0; i < len; i++) { json_t *name; if ((name = json_array_get (src->names, i))) { - if (json_array_append (names, name) < 0) { - saved_errno = ENOMEM; - goto error; + if (json_array_append (dest->names, name) < 0) { + errno = ENOMEM; + return -1; } } } } if ((len = json_array_size (src->ops))) { - if (!(ops = json_copy (dest->ops))) { - saved_errno = ENOMEM; - goto error; - } for (i = 0; i < len; i++) { json_t *op; if ((op = json_array_get (src->ops, i))) { - if (json_array_append (ops, op) < 0) { - saved_errno = ENOMEM; - goto error; + if (json_array_append (dest->ops, op) < 0) { + errno = ENOMEM; + return -1; } } } } - if (names) { - json_decref (dest->names); - dest->names = names; - } - if (ops) { - json_decref (dest->ops); - dest->ops = ops; - } return 1; - - error: - json_decref (names); - json_decref (ops); - errno = saved_errno; - return -1; } static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm) From 3f0b313cd856c800bc504d20b8667ef975d18d5a Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 17:06:02 -0800 Subject: [PATCH 4/8] modules/kvs: Check flags on kvstxn merge When merging transactions, also ensure flags are identical. --- src/modules/kvs/kvstxn.c | 11 +++++++---- src/modules/kvs/test/kvstxn.c | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index e79249b1c7bb..31771f3dd81b 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -1051,7 +1051,8 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src) { int i, len; - if (src->flags & FLUX_KVS_NO_MERGE) + if (src->flags & FLUX_KVS_NO_MERGE + || dest->flags != src->flags) return 0; if ((len = json_array_size (src->names))) { @@ -1080,7 +1081,7 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src) return 1; } -static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm) +static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm, int flags) { kvstxn_t *ktnew; @@ -1094,6 +1095,7 @@ static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm) goto error_enomem; if (!(ktnew->dirty_cache_entries_list = zlist_new ())) goto error_enomem; + ktnew->flags = flags; ktnew->ktm = ktm; ktnew->state = KVSTXN_STATE_INIT; return ktnew; @@ -1140,10 +1142,11 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) second = zlist_next (ktm->ready); if (!second - || (second->flags & FLUX_KVS_NO_MERGE)) + || (second->flags & FLUX_KVS_NO_MERGE) + || (first->flags != second->flags)) return 0; - if (!(new = kvstxn_create_empty (ktm))) + if (!(new = kvstxn_create_empty (ktm, first->flags))) return -1; nextkt = zlist_first (ktm->ready); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index 3c67fdfb4c2d..1065efd6eb9a 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -348,6 +348,28 @@ void kvstxn_mgr_merge_tests (void) clear_ready_kvstxns (ktm); + /* test unsuccessful merge - different flags */ + + create_ready_kvstxn (ktm, "transaction1", "key1", "1", 0, 0); + create_ready_kvstxn (ktm, "transaction2", "key2", "2", 0, 0x5); + + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions success"); + + names = json_array (); + json_array_append (names, json_string ("transaction1")); + + ops = json_array (); + ops_append (ops, "key1", "1", 0); + + verify_ready_kvstxn (ktm, names, ops, 0, "unmerged fence"); + + json_decref (names); + json_decref (ops); + ops = NULL; + + clear_ready_kvstxns (ktm); + kvstxn_mgr_destroy (ktm); cache_destroy (cache); } From 9dd5fb70080168302c4ae2e652b15e5a5f4f2fe3 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 17:15:29 -0800 Subject: [PATCH 5/8] modules/kvs: don't modify ready queue on error Alter logic in kvstxn_mgr_merge_ready_transactions(), so that on error, no modifications to the kvstxn ready queue occur. --- src/modules/kvs/kvstxn.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 31771f3dd81b..6dfab9ae8218 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -1129,6 +1129,7 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) { kvstxn_t *first, *second, *new; kvstxn_t *nextkt; + int count = 0; /* transaction must still be in state where merged in ops can be * applied */ @@ -1161,9 +1162,16 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) if (!ret) break; - zlist_remove (ktm->ready, nextkt); + count++; } while ((nextkt = zlist_next (ktm->ready))); + /* We use the count variable, so that we only modify the ready + * queue if no errors occur above */ + while (count--) { + nextkt = zlist_pop (ktm->ready); + kvstxn_destroy (nextkt); + } + zlist_push (ktm->ready, new); zlist_freefn (ktm->ready, new, (zlist_free_fn *)kvstxn_destroy, false); return 0; From 79c176a740f5750068299900724639c6429bbfab Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 6 Mar 2018 17:30:12 -0800 Subject: [PATCH 6/8] modules/kvs: Add check in internal kvstxn API Add internal checks that ensures only kvstxn's that are ready for processing are passed to processing functions. Add unit tests appropriately. --- src/modules/kvs/kvstxn.c | 18 ++++++++++++++--- src/modules/kvs/test/kvstxn.c | 37 +++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 6dfab9ae8218..12d7cce57142 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -43,6 +43,8 @@ #include "kvstxn.h" #include "kvs_util.h" +#define KVSTXN_PROCESSING 0x01 + struct kvstxn_mgr { struct cache *cache; const char *namespace; @@ -64,6 +66,7 @@ struct kvstxn { blobref_t newroot; zlist_t *missing_refs_list; zlist_t *dirty_cache_entries_list; + int internal_flags; kvstxn_mgr_t *ktm; enum { KVSTXN_STATE_INIT = 1, @@ -717,6 +720,11 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt, if (kt->errnum) return KVSTXN_PROCESS_ERROR; + if (!(kt->internal_flags & KVSTXN_PROCESSING)) { + kt->errnum = EINVAL; + return KVSTXN_PROCESS_ERROR; + } + switch (kt->state) { case KVSTXN_STATE_INIT: case KVSTXN_STATE_LOAD_ROOT: @@ -1022,14 +1030,18 @@ bool kvstxn_mgr_transaction_ready (kvstxn_mgr_t *ktm) kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm) { - if (kvstxn_mgr_transaction_ready (ktm)) - return zlist_first (ktm->ready); + if (kvstxn_mgr_transaction_ready (ktm)) { + kvstxn_t *kt = zlist_first (ktm->ready); + kt->internal_flags |= KVSTXN_PROCESSING; + return kt; + } return NULL; } void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt) { - zlist_remove (ktm->ready, kt); + if (kt->internal_flags & KVSTXN_PROCESSING) + zlist_remove (ktm->ready, kt); } int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm) diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index 1065efd6eb9a..ccaf8c3e3665 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -686,6 +686,42 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (void) cache_destroy (cache); } +void kvstxn_basic_kvstxn_process_test_invalid_transaction (void) +{ + struct cache *cache; + kvstxn_mgr_t *ktm; + kvstxn_t *ktbad, *kt; + blobref_t rootref; + + cache = create_cache_with_empty_rootdir (rootref); + + ok ((ktm = kvstxn_mgr_create (cache, + KVS_PRIMARY_NAMESPACE, + "sha1", + NULL, + &test_global)) != NULL, + "kvstxn_mgr_create works"); + + create_ready_kvstxn (ktm, "transaction1", "key1", "1", 0, 0); + create_ready_kvstxn (ktm, "transaction2", "key2", "2", 0, 0); + + ok ((ktbad = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions success"); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_process (ktbad, 1, rootref) == KVSTXN_PROCESS_ERROR + && kvstxn_get_errnum (ktbad) == EINVAL, + "kvstxn_process fails on bad kvstxn"); + + kvstxn_mgr_destroy (ktm); + cache_destroy (cache); +} + void kvstxn_basic_root_not_dir (void) { struct cache *cache; @@ -2114,6 +2150,7 @@ int main (int argc, char *argv[]) kvstxn_basic_kvstxn_process_test (); kvstxn_basic_kvstxn_process_test_multiple_transactions (); kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (); + kvstxn_basic_kvstxn_process_test_invalid_transaction (); kvstxn_basic_root_not_dir (); kvstxn_process_root_missing (); kvstxn_process_missing_ref (); From 06c7390c78ed181806f0d9c8724f9615b3bedf0d Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 7 Mar 2018 11:01:26 -0800 Subject: [PATCH 7/8] modules/kvs: Preserve orig kvstxns during merge Do not destroy transactions after they have been merged. Instead flag them as components of a larger merge. When the kvstxn of a set of merged transactions completes/is removed, at that point in time remove all of the components of the larger merge. As a consequence of this change and for optimization purposes, once a merger of transactions has occured, there can no longer be any more mergers until the head merged transaction has completed. If this were not done, the ready queue would constantly be iterted through and new head merged transactions would be created. This can be optimized at a later time. Add unit tests. --- src/modules/kvs/kvstxn.c | 41 +++++++++++++++++++++++++++-------- src/modules/kvs/test/kvstxn.c | 33 +++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 12d7cce57142..a922d69b5d72 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -43,7 +43,9 @@ #include "kvstxn.h" #include "kvs_util.h" -#define KVSTXN_PROCESSING 0x01 +#define KVSTXN_PROCESSING 0x01 +#define KVSTXN_MERGED 0x02 /* kvstxn is a merger of transactions */ +#define KVSTXN_MERGE_COMPONENT 0x04 /* kvstxn is member of a merger */ struct kvstxn_mgr { struct cache *cache; @@ -1040,8 +1042,22 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm) void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt) { - if (kt->internal_flags & KVSTXN_PROCESSING) + if (kt->internal_flags & KVSTXN_PROCESSING) { + bool kvstxn_is_merged = false; + + if (kt->internal_flags & KVSTXN_MERGED) + kvstxn_is_merged = true; + zlist_remove (ktm->ready, kt); + + if (kvstxn_is_merged) { + kvstxn_t *kt_tmp = zlist_first (ktm->ready); + while (kt_tmp && (kt_tmp->internal_flags & KVSTXN_MERGE_COMPONENT)) { + zlist_remove (ktm->ready, kt_tmp); + kt_tmp = zlist_next (ktm->ready); + } + } + } } int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm) @@ -1150,7 +1166,8 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) || first->errnum != 0 || first->aux_errnum != 0 || first->state > KVSTXN_STATE_APPLY_OPS - || (first->flags & FLUX_KVS_NO_MERGE)) + || (first->flags & FLUX_KVS_NO_MERGE) + || first->internal_flags & KVSTXN_MERGED) return 0; second = zlist_next (ktm->ready); @@ -1161,6 +1178,7 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) if (!(new = kvstxn_create_empty (ktm, first->flags))) return -1; + new->internal_flags |= KVSTXN_MERGED; nextkt = zlist_first (ktm->ready); do { @@ -1177,12 +1195,17 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) count++; } while ((nextkt = zlist_next (ktm->ready))); - /* We use the count variable, so that we only modify the ready - * queue if no errors occur above */ - while (count--) { - nextkt = zlist_pop (ktm->ready); - kvstxn_destroy (nextkt); - } + /* if count is zero, checks at beginning of function are invalid */ + assert (count); + + nextkt = zlist_first (ktm->ready); + do { + /* Wipe out KVSTXN_PROCESSING flag if user previously got + * the kvstxn_t + */ + nextkt->internal_flags &= ~KVSTXN_PROCESSING; + nextkt->internal_flags |= KVSTXN_MERGE_COMPONENT; + } while (--count && (nextkt = zlist_next (ktm->ready))); zlist_push (ktm->ready, new); zlist_freefn (ktm->ready, new, (zlist_free_fn *)kvstxn_destroy, false); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index ccaf8c3e3665..57531a089242 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -652,6 +652,16 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (void) ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, "kvstxn_mgr_merge_ready_transactions success"); + /* call merge again to ensure nothing happens */ + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions success"); + + create_ready_kvstxn (ktm, "transaction3", "baz.key3", "3", 0, 0); + + /* call merge again to ensure last transaction not merged */ + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions success"); + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, "kvstxn_mgr_get_ready_transaction returns ready kvstxn"); @@ -662,7 +672,7 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (void) "kvstxn_iter_dirty_cache_entries works for dirty cache entries"); /* why three? 1 for root, 1 for foo.key1 (a new dir), and 1 for - * bar.key2 (a new dir) + * bar.key2 (a new dir), "baz.key3" is not committed. */ ok (count == 3, @@ -679,6 +689,27 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (void) kvstxn_mgr_remove_transaction (ktm, kt); + /* process the lingering transaction */ + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); + + ok (kvstxn_process (kt, 1, newroot) == KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES, + "kvstxn_process returns KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES"); + + ok (kvstxn_iter_dirty_cache_entries (kt, cache_count_dirty_cb, &count) == 0, + "kvstxn_iter_dirty_cache_entries works for dirty cache entries"); + + ok (kvstxn_process (kt, 1, newroot) == KVSTXN_PROCESS_FINISHED, + "kvstxn_process returns KVSTXN_PROCESS_FINISHED"); + + ok ((newroot = kvstxn_get_newroot_ref (kt)) != NULL, + "kvstxn_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "baz.key3", "3"); + + /* now the ready queue should be empty */ + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); From 96b46c030f9188af98699f9313037399ef7d64e2 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 7 Mar 2018 11:21:29 -0800 Subject: [PATCH 8/8] modules/kvs: try orig transactions on kvstxn error In kvstxn_mgr_remove_transaction() support flag for user to fallback a merged kvstxn to the original transactions that made up the merge. By doing so, the user need not send an error to all transactions merged into that kvstxn. Instead, each of the original transactions can be replayed individually, and an error will only be sent to the offending commit/fence transaction. Support kvstxn_fallback_mergeable() so user knows if a kvstxn can be falled back on. In kvstxn_apply(), take advantage of this by not sending an error when a kvstxn's merging can be falled back on. As an exception, do not fallback if it's a "death"-like error (e.g. ENOMEM). Add internal kvstxn API unit tests. Fixes #1337 --- src/modules/kvs/kvs.c | 22 ++++- src/modules/kvs/kvstxn.c | 18 +++- src/modules/kvs/kvstxn.h | 32 ++++++- src/modules/kvs/test/kvstxn.c | 175 +++++++++++++++++++++++++++++++--- 4 files changed, 226 insertions(+), 21 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 8e0da069d98c..2dab300b189e 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -881,6 +881,7 @@ static void kvstxn_apply (kvstxn_t *kt) wait_t *wait = NULL; int errnum = 0; kvstxn_process_t ret; + bool fallback = false; namespace = kvstxn_get_namespace (kt); assert (namespace); @@ -986,17 +987,28 @@ static void kvstxn_apply (kvstxn_t *kt) setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1); setroot_event_send (ctx, root, names); } else { - flux_log (ctx->h, LOG_ERR, "transaction failed: %s", - flux_strerror (errnum)); - error_event_send (ctx, root->namespace, kvstxn_get_names (kt), - errnum); + fallback = kvstxn_fallback_mergeable (kt); + + flux_log (ctx->h, LOG_ERR, "kvstxn failed: %s%s", + flux_strerror (errnum), + fallback ? " (is fallbackable)" : ""); + + /* if merged transaction is fallbackable, ignore the fallback option + * if it's an extreme "death" like error. + */ + if (errnum == ENOMEM || errnum == ENOTSUP) + fallback = false; + + if (!fallback) + error_event_send (ctx, root->namespace, kvstxn_get_names (kt), + errnum); } wait_destroy (wait); /* Completed: remove from 'ready' list. * N.B. treq_t remains in the treq_mgr_t hash until event is received. */ - kvstxn_mgr_remove_transaction (root->ktm, kt); + kvstxn_mgr_remove_transaction (root->ktm, kt, fallback); return; stall: diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index a922d69b5d72..f0d26c6c5bc8 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -160,6 +160,13 @@ int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum) return kt->aux_errnum; } +bool kvstxn_fallback_mergeable (kvstxn_t *kt) +{ + if (kt->internal_flags & KVSTXN_MERGED) + return true; + return false; +} + json_t *kvstxn_get_ops (kvstxn_t *kt) { return kt->ops; @@ -1040,7 +1047,8 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm) return NULL; } -void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt) +void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt, + bool fallback) { if (kt->internal_flags & KVSTXN_PROCESSING) { bool kvstxn_is_merged = false; @@ -1053,7 +1061,13 @@ void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt) if (kvstxn_is_merged) { kvstxn_t *kt_tmp = zlist_first (ktm->ready); while (kt_tmp && (kt_tmp->internal_flags & KVSTXN_MERGE_COMPONENT)) { - zlist_remove (ktm->ready, kt_tmp); + if (fallback) { + kt_tmp->internal_flags &= ~KVSTXN_MERGE_COMPONENT; + kt_tmp->flags |= FLUX_KVS_NO_MERGE; + } + else + zlist_remove (ktm->ready, kt_tmp); + kt_tmp = zlist_next (ktm->ready); } } diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index c0982a2281c9..8070508ff6c9 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -35,6 +35,18 @@ int kvstxn_get_errnum (kvstxn_t *kt); int kvstxn_get_aux_errnum (kvstxn_t *kt); int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum); +/* Returns true if a kvstxn was merged and the user can fallback to + * the original transactions that it was made up of. This function + * should be used when a merged kvstxn has failed. Instead of failing + * all transactions in this merged kvstxn, the kvstxn manager can be + * told to fallback to the original transactions via a flag in + * kvstxn_mgr_remove_transaction(). By falling back to the original + * transactions, each can be played one by one and only the specific + * failing transaction can be sent an error. See + * kvstxn_mgr_remove_kvstxn() below for more details. + */ +bool kvstxn_fallback_mergeable (kvstxn_t *kt); + json_t *kvstxn_get_ops (kvstxn_t *kt); json_t *kvstxn_get_names (kvstxn_t *kt); int kvstxn_get_flags (kvstxn_t *kt); @@ -133,8 +145,22 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm); /* remove a transaction from the kvstxn manager after it is done * processing + * + * If the kvstxn was merged, and the caller would like to fallback to + * the original individual transactions (so they can be retried + * individually), set `fallback` to true. This will put the original + * transactions back on the ready queue, but will make it so they + * cannot be merged in the future (e.g. setting FLUX_KVS_NO_MERGE on + * them). + * + * Be careful with the 'fallback' option. If a transaction was + * successful, you can still fallback the merged kvstxn into its + * individual components. 'fallback' should only be set when you get + * an error (i.e. you don't use kvstxn_get_newroot_ref to get a new + * root). */ -void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt); +void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt, + bool fallback); int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm); void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm); @@ -150,6 +176,10 @@ int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm); * kvstxn_mgr_get_ready_transaction() for the new head commit as the * prior one has been removed. * + * A merged kvstxn can be backed out if an error occurs. See + * kvstxn_fallback_mergeable() and kvstxn_mgr_remove_transaction() + * above. + * * Returns -1 on error, 0 on success. */ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index 57531a089242..6947aea1cd22 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -192,7 +192,7 @@ void kvstxn_mgr_basic_tests (void) ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, "kvstxn_mgr_get_ready_transaction returns != NULL for ready kvstxns"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); ok (kvstxn_mgr_transaction_ready (ktm) == false, "kvstxn_mgr_transaction_ready says no transactions are ready"); @@ -261,7 +261,7 @@ void clear_ready_kvstxns (kvstxn_mgr_t *ktm) kvstxn_t *kt; while ((kt = kvstxn_mgr_get_ready_transaction (ktm))) - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); } void kvstxn_mgr_merge_tests (void) @@ -541,7 +541,7 @@ void kvstxn_basic_kvstxn_process_test (void) verify_value (cache, newroot, "key1", "1"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); @@ -592,7 +592,7 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions (void) strcpy (rootref, newroot); /* get rid of the this kvstxn, we're done */ - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, "kvstxn_mgr_get_ready_transaction returns ready kvstxn"); @@ -618,7 +618,7 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions (void) verify_value (cache, newroot, "key1", "1"); verify_value (cache, newroot, "dir.key2", "2"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); @@ -687,7 +687,7 @@ void kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (void) verify_value (cache, newroot, "foo.key1", "1"); verify_value (cache, newroot, "bar.key2", "2"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); /* process the lingering transaction */ @@ -1795,7 +1795,7 @@ void kvstxn_process_big_fileval (void) verify_value (cache, newroot, "val", "smallstr"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); /* next kvstxn a big value, to make sure it is not json in the * cache */ @@ -1951,7 +1951,7 @@ void kvstxn_process_giant_dir (void) verify_value (cache, newroot, "dir.val0090", "bar"); verify_value (cache, newroot, "dir.val00D0", NULL); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); @@ -2032,7 +2032,7 @@ void kvstxn_process_append (void) verify_value (cache, newroot, "val", "abcdefgh"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); /* * second test, append to a treeobj valref @@ -2063,7 +2063,7 @@ void kvstxn_process_append (void) verify_value (cache, newroot, "valref", "ABCDEFGH"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); /* * third test, append to a non-existent value, it's like an insert @@ -2093,7 +2093,7 @@ void kvstxn_process_append (void) verify_value (cache, newroot, "newval", "foobar"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); kvstxn_mgr_destroy (ktm); cache_destroy (cache); @@ -2148,7 +2148,7 @@ void kvstxn_process_append_errors (void) ok (kvstxn_get_errnum (kt) == EISDIR, "kvstxn_get_errnum return EISDIR"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); /* * append to a symlink, should get EOPNOTSUPP @@ -2165,7 +2165,155 @@ void kvstxn_process_append_errors (void) ok (kvstxn_get_errnum (kt) == EOPNOTSUPP, "kvstxn_get_errnum return EOPNOTSUPP"); - kvstxn_mgr_remove_transaction (ktm, kt); + kvstxn_mgr_remove_transaction (ktm, kt, false); + + kvstxn_mgr_destroy (ktm); + cache_destroy (cache); +} + +void kvstxn_process_fallback_merge (void) +{ + struct cache *cache; + int count = 0; + kvstxn_mgr_t *ktm; + kvstxn_t *kt; + blobref_t rootref; + const char *newroot; + + cache = create_cache_with_empty_rootdir (rootref); + + ok ((ktm = kvstxn_mgr_create (cache, + KVS_PRIMARY_NAMESPACE, + "sha1", + NULL, + &test_global)) != NULL, + "kvstxn_mgr_create works"); + + /* + * This makes sure the basic "merge" works as we expect + */ + + create_ready_kvstxn (ktm, "transaction1", "key1", "42", 0, 0); + create_ready_kvstxn (ktm, "transaction2", "key2", "43", 0, 0); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_fallback_mergeable (kt) == false, + "kvstxn_fallback_mergeable returns false on unmerged transaction"); + + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions works"); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_fallback_mergeable (kt) == true, + "kvstxn_fallback_mergeable returns true on merged transaction"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES, + "kvstxn_process returns KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES"); + + ok (kvstxn_iter_dirty_cache_entries (kt, cache_count_dirty_cb, &count) == 0, + "kvstxn_iter_dirty_cache_entries works for dirty cache entries"); + + ok (count == 1, + "correct number of cache entries were dirty"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_FINISHED, + "kvstxn_process returns KVSTXN_PROCESS_FINISHED"); + + ok ((newroot = kvstxn_get_newroot_ref (kt)) != NULL, + "kvstxn_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "key1", "42"); + verify_value (cache, newroot, "key2", "43"); + + kvstxn_mgr_remove_transaction (ktm, kt, false); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, + "kvstxn_mgr_get_ready_transaction returns NULL, no more transactions"); + + memcpy (rootref, newroot, sizeof (blobref_t)); + + /* + * Now we create an error in a merge by writing to "." + */ + + create_ready_kvstxn (ktm, "transaction3", "key3", "44", 0, 0); + create_ready_kvstxn (ktm, "transaction4", ".", "45", 0, 0); + + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions works"); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_ERROR, + "kvstxn_process returns KVSTXN_PROCESS_ERROR"); + + ok (kvstxn_get_errnum (kt) == EINVAL, + "kvstxn_get_errnum returns EINVAL"); + + ok (kvstxn_fallback_mergeable (kt) == true, + "kvstxn_fallback_mergeable returns true on merged transaction"); + + kvstxn_mgr_remove_transaction (ktm, kt, true); + + /* now the original transactions should be back in the ready queue */ + + /* This should succeed, but shouldn't actually merge anything */ + ok (kvstxn_mgr_merge_ready_transactions (ktm) == 0, + "kvstxn_mgr_merge_ready_transactions works"); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_fallback_mergeable (kt) == false, + "kvstxn_fallback_mergeable returns false on unmerged transaction"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES, + "kvstxn_process returns KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES"); + + count = 0; + ok (kvstxn_iter_dirty_cache_entries (kt, cache_count_dirty_cb, &count) == 0, + "kvstxn_iter_dirty_cache_entries works for dirty cache entries"); + + ok (count == 1, + "correct number of cache entries were dirty"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_FINISHED, + "kvstxn_process returns KVSTXN_PROCESS_FINISHED"); + + ok ((newroot = kvstxn_get_newroot_ref (kt)) != NULL, + "kvstxn_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "key3", "44"); + + kvstxn_mgr_remove_transaction (ktm, kt, false); + + memcpy (rootref, newroot, sizeof (blobref_t)); + + /* now we try and transaction the next fence, which should be the bad one */ + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready transaction"); + + ok (kvstxn_fallback_mergeable (kt) == false, + "kvstxn_fallback_mergeable returns false on unmerged transaction"); + + ok (kvstxn_process (kt, 1, rootref) == KVSTXN_PROCESS_ERROR, + "kvstxn_process returns KVSTXN_PROCESS_ERROR"); + + ok (kvstxn_get_errnum (kt) == EINVAL, + "kvstxn_get_errnum returns EINVAL"); + + kvstxn_mgr_remove_transaction (ktm, kt, false); + + /* now make sure the ready queue is back to empty */ + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, + "kvstxn_mgr_get_ready_transaction returns NULL, no more transactions"); kvstxn_mgr_destroy (ktm); cache_destroy (cache); @@ -2203,6 +2351,7 @@ int main (int argc, char *argv[]) kvstxn_process_giant_dir (); kvstxn_process_append (); kvstxn_process_append_errors (); + kvstxn_process_fallback_merge (); done_testing (); return (0);