Skip to content

Commit

Permalink
modules/kvs: Support ability to revert merged commits
Browse files Browse the repository at this point in the history
In commit_mgr_remove_commit() support flag for user to revert
a merged commit if the option is available.  By doing so, the
user need not send an error to all fences merged into the current
merged commit.  The revert can play all the original commits
back on the ready queue.  They can instead be played back one
by one, and the failing fences can be responded to individually.

Support commit_revert_mergeable() so user knows if a commit is
revertable.

In core KVS file, take advantage of this by not sending an error
when a commit's merging can be reverted.

Add internal commit API unit tests.

Fixes #1337
  • Loading branch information
chu11 committed Feb 22, 2018
1 parent be0f53a commit f680b56
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 21 deletions.
16 changes: 14 additions & 2 deletions src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ int commit_set_aux_errnum (commit_t *c, int errnum)
return c->aux_errnum;
}

bool commit_revert_mergeable (commit_t *c)
{
if (c->internal_flags & COMMIT_MERGED)
return true;
return false;
}

json_t *commit_get_ops (commit_t *c)
{
return c->ops;
Expand Down Expand Up @@ -1126,7 +1133,7 @@ commit_t *commit_mgr_get_ready_commit (commit_mgr_t *cm)
return NULL;
}

void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c)
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c, bool revert)
{
if (c->internal_flags & COMMIT_PROCESSING) {
bool commit_is_merged = false;
Expand All @@ -1139,7 +1146,12 @@ void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c)
if (commit_is_merged) {
commit_t *ctmp = zlist_first (cm->ready);
while (ctmp && (ctmp->internal_flags & COMMIT_MERGE_COMPONENT)) {
zlist_remove (cm->ready, ctmp);
if (revert) {
ctmp->internal_flags &= ~COMMIT_MERGE_COMPONENT;
ctmp->flags |= FLUX_KVS_NO_MERGE;
}
else
zlist_remove (cm->ready, ctmp);
ctmp = zlist_next (cm->ready);
}
}
Expand Down
23 changes: 22 additions & 1 deletion src/modules/kvs/commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ int commit_get_errnum (commit_t *c);
int commit_get_aux_errnum (commit_t *c);
int commit_set_aux_errnum (commit_t *c, int errnum);

/* Returns true if a commit was merged and it can be reverted. This
* function should be used when a merged set of commits has failed.
* Instead of failing all fences in this merged commit, the commit
* manager can be told to revert the merge. Then each ready commit
* can be played one by one and only the specific fence can be sent an
* error. See commit_mgr_remove_commit() below for more details.
*/
bool commit_revert_mergeable (commit_t *c);

json_t *commit_get_ops (commit_t *c);
json_t *commit_get_names (commit_t *c);
int commit_get_flags (commit_t *c);
Expand Down Expand Up @@ -145,8 +154,17 @@ bool commit_mgr_commits_ready (commit_mgr_t *cm);
commit_t *commit_mgr_get_ready_commit (commit_mgr_t *cm);

/* remove a commit from the commit manager after it is done processing
*
* If the commit was merged, and the caller would like the merge to be
* reverted, set `revert` to true. This will put the original commits
* on the ready queue, but will make it so they cannot be merged in
* the future (e.g. FLUX_KVS_NO_MERGE) will be set on them.
*
* Be careful with the 'revert' option. If a commit was successful,
* you can still revert the merge into its individual components.
* 'revert' should only be set on an error path.
*/
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c);
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c, bool revert);

/* remove a fence from the commit manager */
int commit_mgr_remove_fence (commit_mgr_t *cm, const char *name);
Expand All @@ -168,6 +186,9 @@ int commit_mgr_ready_commit_count (commit_mgr_t *cm);
* commit_mgr_get_ready_commit() for the present head commit as the
* prior one of been removed.
*
* A merged commit can be backed out if an error occurs. See
* commit_unmergeable() and commit_mgr_remove_commit() above.
*
* Returns -1 on error, 0 on success.
*/
int commit_mgr_merge_ready_commits (commit_mgr_t *cm);
Expand Down
22 changes: 17 additions & 5 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ static void commit_apply (commit_t *c)
wait_t *wait = NULL;
int errnum = 0;
commit_process_t ret;
bool revert = false;

namespace = commit_get_namespace (c);
assert (namespace);
Expand Down Expand Up @@ -985,17 +986,28 @@ static void commit_apply (commit_t *c)
setroot (ctx, root, commit_get_newroot_ref (c), root->seq + 1);
setroot_event_send (ctx, root, names);
} else {
flux_log (ctx->h, LOG_ERR, "commit failed: %s",
flux_strerror (errnum));
error_event_send (ctx, root->namespace, commit_get_names (c),
errnum);
revert = commit_revert_mergeable (c);

flux_log (ctx->h, LOG_ERR, "commit failed: %s%s",
flux_strerror (errnum),
revert ? " (is revertable)" : "");

/* even if merged commit is revertable, we don't care if it's
* a death-like error
*/
if (errnum == ENOMEM || errnum == ENOTSUP)
revert = false;

if (!revert)
error_event_send (ctx, root->namespace, commit_get_names (c),
errnum);
}
wait_destroy (wait);

/* Completed: remove from 'ready' list.
* N.B. fence_t remains in the fences hash until event is received.
*/
commit_mgr_remove_commit (root->cm, c);
commit_mgr_remove_commit (root->cm, c, revert);
return;

stall:
Expand Down
175 changes: 162 additions & 13 deletions src/modules/kvs/test/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void commit_mgr_basic_tests (void)
ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns != NULL for ready commits");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok (commit_mgr_commits_ready (cm) == false,
"commit_mgr_commits_ready says no fences are ready");
Expand Down Expand Up @@ -308,7 +308,7 @@ void clear_ready_commits (commit_mgr_t *cm)
commit_t *c;

while ((c = commit_mgr_get_ready_commit (cm)))
commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);
}

void commit_mgr_merge_tests (void)
Expand Down Expand Up @@ -600,7 +600,7 @@ void commit_basic_commit_process_test (void)

verify_value (cache, newroot, "key1", "1");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -651,7 +651,7 @@ void commit_basic_commit_process_test_multiple_fences (void)
strcpy (rootref, newroot);

/* get rid of the this commit, we're done */
commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");
Expand All @@ -677,7 +677,7 @@ void commit_basic_commit_process_test_multiple_fences (void)
verify_value (cache, newroot, "key1", "1");
verify_value (cache, newroot, "dir.key2", "2");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -736,7 +736,7 @@ void commit_basic_commit_process_test_multiple_fences_merge (void)
verify_value (cache, newroot, "foo.key1", "1");
verify_value (cache, newroot, "bar.key2", "2");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -1954,7 +1954,7 @@ void commit_process_big_fileval (void)

verify_value (cache, newroot, "val", "smallstr");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/* next commit a big value, to make sure it is not json in the
* cache */
Expand Down Expand Up @@ -2110,7 +2110,7 @@ void commit_process_giant_dir (void)
verify_value (cache, newroot, "dir.val0090", "bar");
verify_value (cache, newroot, "dir.val00D0", NULL);

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");
Expand Down Expand Up @@ -2191,7 +2191,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "val", "abcdefgh");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* second test, append to a treeobj valref
Expand Down Expand Up @@ -2222,7 +2222,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "valref", "ABCDEFGH");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* third test, append to a non-existent value, it's like an insert
Expand Down Expand Up @@ -2252,7 +2252,7 @@ void commit_process_append (void)

verify_value (cache, newroot, "newval", "foobar");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

commit_mgr_destroy (cm);
cache_destroy (cache);
Expand Down Expand Up @@ -2307,7 +2307,7 @@ void commit_process_append_errors (void)
ok (commit_get_errnum (c) == EISDIR,
"commit_get_errnum return EISDIR");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

/*
* append to a symlink, should get EOPNOTSUPP
Expand All @@ -2324,7 +2324,155 @@ void commit_process_append_errors (void)
ok (commit_get_errnum (c) == EOPNOTSUPP,
"commit_get_errnum return EOPNOTSUPP");

commit_mgr_remove_commit (cm, c);
commit_mgr_remove_commit (cm, c, false);

commit_mgr_destroy (cm);
cache_destroy (cache);
}

void commit_process_revert_merge (void)
{
struct cache *cache;
int count = 0;
commit_mgr_t *cm;
commit_t *c;
blobref_t rootref;
const char *newroot;

cache = create_cache_with_empty_rootdir (rootref);

ok ((cm = commit_mgr_create (cache,
KVS_PRIMARY_NAMESPACE,
"sha1",
NULL,
&test_global)) != NULL,
"commit_mgr_create works");

/*
* This makes sure the basic "merge" works as we expect
*/

create_ready_commit (cm, "fence1", "key1", "42", 0, 0);
create_ready_commit (cm, "fence2", "key2", "43", 0, 0);

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_revert_mergeable (c) == false,
"commit_revert_mergeable returns false on unmerged commit");

ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_revert_mergeable (c) == true,
"commit_revert_mergeable returns true on merged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES,
"commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES");

ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0,
"commit_iter_dirty_cache_entries works for dirty cache entries");

ok (count == 1,
"correct number of cache entries were dirty");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_FINISHED,
"commit_process returns COMMIT_PROCESS_FINISHED");

ok ((newroot = commit_get_newroot_ref (c)) != NULL,
"commit_get_newroot_ref returns != NULL when processing complete");

verify_value (cache, newroot, "key1", "42");
verify_value (cache, newroot, "key2", "43");

commit_mgr_remove_commit (cm, c, false);

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");

memcpy (rootref, newroot, sizeof (blobref_t));

/*
* Now we create an error in a merge by writing to "."
*/

create_ready_commit (cm, "fence3", "key3", "44", 0, 0);
create_ready_commit (cm, "fence4", ".", "45", 0, 0);

ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_ERROR,
"commit_process returns COMMIT_PROCESS_ERROR");

ok (commit_get_errnum (c) == EINVAL,
"commit_get_errnum returns EINVAL");

ok (commit_revert_mergeable (c) == true,
"commit_revert_mergeable returns true on merged commit");

commit_mgr_remove_commit (cm, c, true);

/* now the original commits should be back in the ready queue */

/* This should succeed, but shouldn't actually merge anything */
ok (commit_mgr_merge_ready_commits (cm) == 0,
"commit_mgr_merge_ready_commits works");

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_revert_mergeable (c) == false,
"commit_revert_mergeable returns false on unmerged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES,
"commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES");

count = 0;
ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0,
"commit_iter_dirty_cache_entries works for dirty cache entries");

ok (count == 1,
"correct number of cache entries were dirty");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_FINISHED,
"commit_process returns COMMIT_PROCESS_FINISHED");

ok ((newroot = commit_get_newroot_ref (c)) != NULL,
"commit_get_newroot_ref returns != NULL when processing complete");

verify_value (cache, newroot, "key3", "44");

commit_mgr_remove_commit (cm, c, false);

memcpy (rootref, newroot, sizeof (blobref_t));

/* now we try and commit the next fence, which should be the bad one */

ok ((c = commit_mgr_get_ready_commit (cm)) != NULL,
"commit_mgr_get_ready_commit returns ready commit");

ok (commit_revert_mergeable (c) == false,
"commit_revert_mergeable returns false on unmerged commit");

ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_ERROR,
"commit_process returns COMMIT_PROCESS_ERROR");

ok (commit_get_errnum (c) == EINVAL,
"commit_get_errnum returns EINVAL");

commit_mgr_remove_commit (cm, c, false);

/* now make sure the ready queue is back to empty */

ok ((c = commit_mgr_get_ready_commit (cm)) == NULL,
"commit_mgr_get_ready_commit returns NULL, no more commits");

commit_mgr_destroy (cm);
cache_destroy (cache);
Expand Down Expand Up @@ -2363,6 +2511,7 @@ int main (int argc, char *argv[])
commit_process_giant_dir ();
commit_process_append ();
commit_process_append_errors ();
commit_process_revert_merge ();

done_testing ();
return (0);
Expand Down

0 comments on commit f680b56

Please sign in to comment.