Skip to content

Commit

Permalink
modules/kvs: Support ability to fallback merged commits
Browse files Browse the repository at this point in the history
In commit_mgr_remove_commit() support flag for user to fallback
a merged commit to the original commits that made up the merge.

By doing so, the user need not send an error to all fences merged
into that merged commit.  Instead, each of the original commits
can be replayed individually, and an error will only be sent to
the offending commit/fence.

Support commit_fallback_mergeable() so user knows if a commit can
be falled back on.

In core KVS file, take advantage of this by not sending an error
when a commit's merging can be falled back on.  As an exception,
do not fallback if it's a "death"-like error (e.g. ENOMEM).

Add internal commit API unit tests.

Fixes #1337
  • Loading branch information
chu11 committed Feb 23, 2018
1 parent be0f53a commit 3fb1fd7
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 21 deletions.
16 changes: 14 additions & 2 deletions src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ int commit_set_aux_errnum (commit_t *c, int errnum)
return c->aux_errnum;
}

bool commit_fallback_mergeable (commit_t *c)
{
if (c->internal_flags & COMMIT_MERGED)
return true;
return false;
}

json_t *commit_get_ops (commit_t *c)
{
return c->ops;
Expand Down Expand Up @@ -1126,7 +1133,7 @@ commit_t *commit_mgr_get_ready_commit (commit_mgr_t *cm)
return NULL;
}

void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c)
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c, bool fallback)
{
if (c->internal_flags & COMMIT_PROCESSING) {
bool commit_is_merged = false;
Expand All @@ -1139,7 +1146,12 @@ void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c)
if (commit_is_merged) {
commit_t *ctmp = zlist_first (cm->ready);
while (ctmp && (ctmp->internal_flags & COMMIT_MERGE_COMPONENT)) {
zlist_remove (cm->ready, ctmp);
if (fallback) {
ctmp->internal_flags &= ~COMMIT_MERGE_COMPONENT;
ctmp->flags |= FLUX_KVS_NO_MERGE;
}
else
zlist_remove (cm->ready, ctmp);
ctmp = zlist_next (cm->ready);
}
}
Expand Down
26 changes: 25 additions & 1 deletion src/modules/kvs/commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ int commit_get_errnum (commit_t *c);
int commit_get_aux_errnum (commit_t *c);
int commit_set_aux_errnum (commit_t *c, int errnum);

/* Returns true if a commit was merged and the user can fallback to
* the original commits. This function should be used when a merged
* set of commits has failed. Instead of failing all fences in this
* merged commit, the commit manager can be told to fallback to the
* original commits via a flag in commit_mgr_remove_commit(). By
* falling back to the original commits, each ready commit can be
* played one by one and only the specific failing fence can be sent
* an error. See commit_mgr_remove_commit() below for more details.
*/
bool commit_fallback_mergeable (commit_t *c);

json_t *commit_get_ops (commit_t *c);
json_t *commit_get_names (commit_t *c);
int commit_get_flags (commit_t *c);
Expand Down Expand Up @@ -145,8 +156,18 @@ bool commit_mgr_commits_ready (commit_mgr_t *cm);
commit_t *commit_mgr_get_ready_commit (commit_mgr_t *cm);

/* remove a commit from the commit manager after it is done processing
*
* If the commit was merged, and the caller would like to fallback to
* the original individual commits (so they can be retried
* individually), set `fallback` to true. This will put the original
* commits on the ready queue, but will make it so they cannot be
* merged in the future (e.g. setting FLUX_KVS_NO_MERGE on them).
*
* Be careful with the 'fallback' option. If a commit was successful,
* you can still fallback the merged commit into its individual
* components. 'fallback' should only be set when you get an error.
*/
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c);
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c, bool fallback);

/* remove a fence from the commit manager */
int commit_mgr_remove_fence (commit_mgr_t *cm, const char *name);
Expand All @@ -168,6 +189,9 @@ int commit_mgr_ready_commit_count (commit_mgr_t *cm);
* commit_mgr_get_ready_commit() for the present head commit as the
* prior one of been removed.
*
* A merged commit can be backed out if an error occurs. See
* commit_unmergeable() and commit_mgr_remove_commit() above.
*
* Returns -1 on error, 0 on success.
*/
int commit_mgr_merge_ready_commits (commit_mgr_t *cm);
Expand Down
22 changes: 17 additions & 5 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ static void commit_apply (commit_t *c)
wait_t *wait = NULL;
int errnum = 0;
commit_process_t ret;
bool fallback = false;

namespace = commit_get_namespace (c);
assert (namespace);
Expand Down Expand Up @@ -985,17 +986,28 @@ static void commit_apply (commit_t *c)
setroot (ctx, root, commit_get_newroot_ref (c), root->seq + 1);
setroot_event_send (ctx, root, names);
} else {
flux_log (ctx->h, LOG_ERR, "commit failed: %s",
flux_strerror (errnum));
error_event_send (ctx, root->namespace, commit_get_names (c),
errnum);
fallback = commit_fallback_mergeable (c);

flux_log (ctx->h, LOG_ERR, "commit failed: %s%s",
flux_strerror (errnum),
fallback ? " (is fallbackable)" : "");

/* if merged commit is fallbackable, ignore the fallback option
* if it's an extreme "death" error.
*/
if (errnum == ENOMEM || errnum == ENOTSUP)
fallback = false;

if (!fallback)
error_event_send (ctx, root->namespace, commit_get_names (c),
errnum);
}
wait_destroy (wait);

/* Completed: remove from 'ready' list.
* N.B. fence_t remains in the fences hash until event is received.
*/
commit_mgr_remove_commit (root->cm, c);
commit_mgr_remove_commit (root->cm, c, fallback);
return;

stall:
Expand Down
175 changes: 162 additions & 13 deletions src/modules/kvs/test/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void commit_mgr_basic_tests (void)
ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns != NULL for ready commits");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok (commit_mgr_commits_ready (cm) == false,
"commit_mgr_commits_ready says no fences are ready");
Expand Down Expand Up @@ -308,7 +308,7 @@ void clear_ready_commits (commit_mgr_t *cm)
commit_t *c;

while ((c = commit_mgr_get_ready_commit (cm)))
commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);
}

void commit_mgr_merge_tests (void)
Expand Down Expand Up @@ -600,7 +600,7 @@ void commit_basic_commit_process_test (void)

verify_value (cache, newroot, "key1", "1");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -651,7 +651,7 @@ void commit_basic_commit_process_test_multiple_fences (void)
strcpy (rootref, newroot);

/* get rid of the this commit, we're done */
commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");
Expand All @@ -677,7 +677,7 @@ void commit_basic_commit_process_test_multiple_fences (void)
verify_value (cache, newroot, "key1", "1");
verify_value (cache, newroot, "dir.key2", "2");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -736,7 +736,7 @@ void commit_basic_commit_process_test_multiple_fences_merge (void)
verify_value (cache, newroot, "foo.key1", "1");
verify_value (cache, newroot, "bar.key2", "2");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -1954,7 +1954,7 @@ void commit_process_big_fileval (void)

verify_value (cache, newroot, "val", "smallstr");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/* next commit a big value, to make sure it is not json in the
* cache */
Expand Down Expand Up @@ -2110,7 +2110,7 @@ void commit_process_giant_dir (void)
verify_value (cache, newroot, "dir.val0090", "bar");
verify_value (cache, newroot, "dir.val00D0", NULL);

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -2191,7 +2191,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "val", "abcdefgh");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* second test, append to a treeobj valref
Expand Down Expand Up @@ -2222,7 +2222,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "valref", "ABCDEFGH");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* third test, append to a non-existent value, it's like an insert
Expand Down Expand Up @@ -2252,7 +2252,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "newval", "foobar");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

commit_mgr_destroy (cm);
cache_destroy (cache);
Expand Down Expand Up @@ -2307,7 +2307,7 @@ void commit_process_append_errors (void)
ok (commit_get_errnum (c) == EISDIR,
"commit_get_errnum return EISDIR");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* append to a symlink, should get EOPNOTSUPP
Expand All @@ -2324,7 +2324,155 @@ void commit_process_append_errors (void)
ok (commit_get_errnum (c) == EOPNOTSUPP,
"commit_get_errnum return EOPNOTSUPP");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

commit_mgr_destroy (cm);
cache_destroy (cache);
}

void commit_process_fallback_merge (void)
{
struct cache *cache;
int count = 0;
commit_mgr_t *cm;
commit_t *c;
blobref_t rootref;
const char *newroot;

cache = create_cache_with_empty_rootdir (rootref);

ok ((cm = commit_mgr_create (cache,
KVS_PRIMARY_NAMESPACE,
"sha1",
NULL,
&test_global)) != NULL,
"commit_mgr_create works");

/*
* This makes sure the basic "merge" works as we expect
*/

create_ready_commit (cm, "fence1", "key1", "42", 0, 0);
create_ready_commit (cm, "fence2", "key2", "43", 0, 0);

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_fallback_mergeable (c) == false,
"commit_fallback_mergeable returns false on unmerged commit");

ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_fallback_mergeable (c) == true,
"commit_fallback_mergeable returns true on merged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES,
"commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES");

ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0,
"commit_iter_dirty_cache_entries works for dirty cache entries");

ok (count == 1,
"correct number of cache entries were dirty");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_FINISHED,
"commit_process returns COMMIT_PROCESS_FINISHED");

ok ((newroot = commit_get_newroot_ref (c)) != NULL,
"commit_get_newroot_ref returns != NULL when processing complete");

verify_value (cache, newroot, "key1", "42");
verify_value (cache, newroot, "key2", "43");

commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");

memcpy (rootref, newroot, sizeof (blobref_t));

/*
* Now we create an error in a merge by writing to "."
*/

create_ready_commit (cm, "fence3", "key3", "44", 0, 0);
create_ready_commit (cm, "fence4", ".", "45", 0, 0);

ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_ERROR,
"commit_process returns COMMIT_PROCESS_ERROR");

ok (commit_get_errnum (c) == EINVAL,
"commit_get_errnum returns EINVAL");

ok (commit_fallback_mergeable (c) == true,
"commit_fallback_mergeable returns true on merged commit");

commit_mgr_remove_commit (cm, c, true);

/* now the original commits should be back in the ready queue */

/* This should succeed, but shouldn't actually merge anything */
ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_fallback_mergeable (c) == false,
"commit_fallback_mergeable returns false on unmerged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES,
"commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES");

count = 0;
ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0,
"commit_iter_dirty_cache_entries works for dirty cache entries");

ok (count == 1,
"correct number of cache entries were dirty");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_FINISHED,
"commit_process returns COMMIT_PROCESS_FINISHED");

ok ((newroot = commit_get_newroot_ref (c)) != NULL,
"commit_get_newroot_ref returns != NULL when processing complete");

verify_value (cache, newroot, "key3", "44");

commit_mgr_remove_commit (cm, c, false);

memcpy (rootref, newroot, sizeof (blobref_t));

/* now we try and commit the next fence, which should be the bad one */

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_fallback_mergeable (c) == false,
"commit_fallback_mergeable returns false on unmerged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_ERROR,
"commit_process returns COMMIT_PROCESS_ERROR");

ok (commit_get_errnum (c) == EINVAL,
"commit_get_errnum returns EINVAL");

commit_mgr_remove_commit (cm, c, false);

/* now make sure the ready queue is back to empty */

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");

commit_mgr_destroy (cm);
cache_destroy (cache);
Expand Down Expand Up @@ -2363,6 +2511,7 @@ int main (int argc, char *argv[])
commit_process_giant_dir ();
commit_process_append ();
commit_process_append_errors ();
commit_process_fallback_merge ();

done_testing ();
return (0);
Expand Down

0 comments on commit 3fb1fd7

Please sign in to comment.