Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: support ability to "revert" / "back out" merged commits #1346

Merged
merged 8 commits into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ static void kvstxn_apply (kvstxn_t *kt)
wait_t *wait = NULL;
int errnum = 0;
kvstxn_process_t ret;
bool fallback = false;

namespace = kvstxn_get_namespace (kt);
assert (namespace);
Expand Down Expand Up @@ -986,17 +987,28 @@ static void kvstxn_apply (kvstxn_t *kt)
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names);
} else {
flux_log (ctx->h, LOG_ERR, "transaction failed: %s",
flux_strerror (errnum));
error_event_send (ctx, root->namespace, kvstxn_get_names (kt),
errnum);
fallback = kvstxn_fallback_mergeable (kt);

flux_log (ctx->h, LOG_ERR, "kvstxn failed: %s%s",
flux_strerror (errnum),
fallback ? " (is fallbackable)" : "");

/* if merged transaction is fallbackable, ignore the fallback option
* if it's an extreme "death" like error.
*/
if (errnum == ENOMEM || errnum == ENOTSUP)
fallback = false;

if (!fallback)
error_event_send (ctx, root->namespace, kvstxn_get_names (kt),
errnum);
}
wait_destroy (wait);

/* Completed: remove from 'ready' list.
* N.B. treq_t remains in the treq_mgr_t hash until event is received.
*/
kvstxn_mgr_remove_transaction (root->ktm, kt);
kvstxn_mgr_remove_transaction (root->ktm, kt, fallback);
return;

stall:
Expand Down Expand Up @@ -1046,6 +1058,12 @@ static int kvstxn_check_root_cb (struct kvsroot *root, void *arg)
*/
if (kvstxn_mgr_merge_ready_transactions (root->ktm) < 0)
kvstxn_set_aux_errnum (kt, errno);
else {
/* grab new head ready commit, if above succeeds, this
* must succeed */
kt = kvstxn_mgr_get_ready_transaction (root->ktm);
assert (kt);
}
}

/* It does not matter if root has been marked for removal,
Expand Down
187 changes: 131 additions & 56 deletions src/modules/kvs/kvstxn.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
#include "kvstxn.h"
#include "kvs_util.h"

#define KVSTXN_PROCESSING 0x01
#define KVSTXN_MERGED 0x02 /* kvstxn is a merger of transactions */
#define KVSTXN_MERGE_COMPONENT 0x04 /* kvstxn is member of a merger */

struct kvstxn_mgr {
struct cache *cache;
const char *namespace;
Expand All @@ -64,6 +68,7 @@ struct kvstxn {
blobref_t newroot;
zlist_t *missing_refs_list;
zlist_t *dirty_cache_entries_list;
int internal_flags;
kvstxn_mgr_t *ktm;
enum {
KVSTXN_STATE_INIT = 1,
Expand Down Expand Up @@ -155,6 +160,13 @@ int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum)
return kt->aux_errnum;
}

bool kvstxn_fallback_mergeable (kvstxn_t *kt)
{
if (kt->internal_flags & KVSTXN_MERGED)
return true;
return false;
}

json_t *kvstxn_get_ops (kvstxn_t *kt)
{
return kt->ops;
Expand Down Expand Up @@ -717,6 +729,11 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt,
if (kt->errnum)
return KVSTXN_PROCESS_ERROR;

if (!(kt->internal_flags & KVSTXN_PROCESSING)) {
kt->errnum = EINVAL;
return KVSTXN_PROCESS_ERROR;
}

switch (kt->state) {
case KVSTXN_STATE_INIT:
case KVSTXN_STATE_LOAD_ROOT:
Expand Down Expand Up @@ -1022,14 +1039,39 @@ bool kvstxn_mgr_transaction_ready (kvstxn_mgr_t *ktm)

kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm)
{
if (kvstxn_mgr_transaction_ready (ktm))
return zlist_first (ktm->ready);
if (kvstxn_mgr_transaction_ready (ktm)) {
kvstxn_t *kt = zlist_first (ktm->ready);
kt->internal_flags |= KVSTXN_PROCESSING;
return kt;
}
return NULL;
}

void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt)
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt,
bool fallback)
{
zlist_remove (ktm->ready, kt);
if (kt->internal_flags & KVSTXN_PROCESSING) {
bool kvstxn_is_merged = false;

if (kt->internal_flags & KVSTXN_MERGED)
kvstxn_is_merged = true;

zlist_remove (ktm->ready, kt);

if (kvstxn_is_merged) {
kvstxn_t *kt_tmp = zlist_first (ktm->ready);
while (kt_tmp && (kt_tmp->internal_flags & KVSTXN_MERGE_COMPONENT)) {
if (fallback) {
kt_tmp->internal_flags &= ~KVSTXN_MERGE_COMPONENT;
kt_tmp->flags |= FLUX_KVS_NO_MERGE;
}
else
zlist_remove (ktm->ready, kt_tmp);

kt_tmp = zlist_next (ktm->ready);
}
}
}
}

int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm)
Expand All @@ -1049,66 +1091,69 @@ int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm)

static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src)
{
json_t *names = NULL;
json_t *ops = NULL;
int i, len, saved_errno;
int i, len;

if ((dest->flags & FLUX_KVS_NO_MERGE) || (src->flags & FLUX_KVS_NO_MERGE))
if (src->flags & FLUX_KVS_NO_MERGE
|| dest->flags != src->flags)
return 0;

if ((len = json_array_size (src->names))) {
if (!(names = json_copy (dest->names))) {
saved_errno = ENOMEM;
goto error;
}
for (i = 0; i < len; i++) {
json_t *name;
if ((name = json_array_get (src->names, i))) {
if (json_array_append (names, name) < 0) {
saved_errno = ENOMEM;
goto error;
if (json_array_append (dest->names, name) < 0) {
errno = ENOMEM;
return -1;
}
}
}
}
if ((len = json_array_size (src->ops))) {
if (!(ops = json_copy (dest->ops))) {
saved_errno = ENOMEM;
goto error;
}
for (i = 0; i < len; i++) {
json_t *op;
if ((op = json_array_get (src->ops, i))) {
if (json_array_append (ops, op) < 0) {
saved_errno = ENOMEM;
goto error;
if (json_array_append (dest->ops, op) < 0) {
errno = ENOMEM;
return -1;
}
}
}
}

if (names) {
json_decref (dest->names);
dest->names = names;
}
if (ops) {
json_decref (dest->ops);
dest->ops = ops;
}
return 1;
}

error:
json_decref (names);
json_decref (ops);
errno = saved_errno;
return -1;
static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm, int flags)
{
kvstxn_t *ktnew;

if (!(ktnew = calloc (1, sizeof (*ktnew))))
goto error_enomem;
if (!(ktnew->ops = json_array ()))
goto error_enomem;
if (!(ktnew->names = json_array ()))
goto error_enomem;
if (!(ktnew->missing_refs_list = zlist_new ()))
goto error_enomem;
if (!(ktnew->dirty_cache_entries_list = zlist_new ()))
goto error_enomem;
ktnew->flags = flags;
ktnew->ktm = ktm;
ktnew->state = KVSTXN_STATE_INIT;
return ktnew;

error_enomem:
kvstxn_destroy (ktnew);
errno = ENOMEM;
return NULL;
}

/* Merge ready transactions that are mergeable, where merging consists
* of popping the "donor" transaction off the ready list, and
* appending its ops to the top transaction. The top transaction can
* be appended to if it hasn't started, or is still building the
* rootcpy, e.g. stalled walking the namespace.
* creating a new kvstxn_t, and merging the other transactions in the
* ready queue and appending their ops/names to the new transaction.
* After merging, push the new kvstxn_t onto the head of the ready
* queue. Merging can occur if the top transaction hasn't started, or
* is still building the rootcpy, e.g. stalled walking the namespace.
*
* Break when an unmergeable transaction is discovered. We do not
* wish to merge non-adjacent transactions, as it can create
Expand All @@ -1124,30 +1169,60 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src)

int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm)
{
kvstxn_t *kt = zlist_first (ktm->ready);
kvstxn_t *first, *second, *new;
kvstxn_t *nextkt;
int count = 0;

/* transaction must still be in state where merged in ops can be
* applied */
if (kt
&& kt->errnum == 0
&& kt->state <= KVSTXN_STATE_APPLY_OPS
&& !(kt->flags & FLUX_KVS_NO_MERGE)) {
kvstxn_t *nkt;
while ((nkt = zlist_next (ktm->ready))) {
int ret;
first = zlist_first (ktm->ready);
if (!first
|| first->errnum != 0
|| first->aux_errnum != 0
|| first->state > KVSTXN_STATE_APPLY_OPS
|| (first->flags & FLUX_KVS_NO_MERGE)
|| first->internal_flags & KVSTXN_MERGED)
return 0;

if ((ret = kvstxn_merge (kt, nkt)) < 0)
return -1;
second = zlist_next (ktm->ready);
if (!second
|| (second->flags & FLUX_KVS_NO_MERGE)
|| (first->flags != second->flags))
return 0;

/* if return == 0, we've merged as many as we currently
* can */
if (!ret)
break;
if (!(new = kvstxn_create_empty (ktm, first->flags)))
return -1;
new->internal_flags |= KVSTXN_MERGED;

nextkt = zlist_first (ktm->ready);
do {
int ret;

/* Merged kvstxn, remove off ready list */
zlist_remove (ktm->ready, nkt);
if ((ret = kvstxn_merge (new, nextkt)) < 0) {
kvstxn_destroy (new);
return -1;
}
}

if (!ret)
break;

count++;
} while ((nextkt = zlist_next (ktm->ready)));

/* if count is zero, checks at beginning of function are invalid */
assert (count);

nextkt = zlist_first (ktm->ready);
do {
/* Wipe out KVSTXN_PROCESSING flag if user previously got
* the kvstxn_t
*/
nextkt->internal_flags &= ~KVSTXN_PROCESSING;
nextkt->internal_flags |= KVSTXN_MERGE_COMPONENT;
} while (--count && (nextkt = zlist_next (ktm->ready)));

zlist_push (ktm->ready, new);
zlist_freefn (ktm->ready, new, (zlist_free_fn *)kvstxn_destroy, false);
return 0;
}

Expand Down
45 changes: 39 additions & 6 deletions src/modules/kvs/kvstxn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ int kvstxn_get_errnum (kvstxn_t *kt);
int kvstxn_get_aux_errnum (kvstxn_t *kt);
int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum);

/* Returns true if a kvstxn was merged and the user can fallback to
* the original transactions that it was made up of. This function
* should be used when a merged kvstxn has failed. Instead of failing
* all transactions in this merged kvstxn, the kvstxn manager can be
* told to fallback to the original transactions via a flag in
* kvstxn_mgr_remove_transaction(). By falling back to the original
* transactions, each can be played one by one and only the specific
* failing transaction can be sent an error. See
* kvstxn_mgr_remove_kvstxn() below for more details.
*/
bool kvstxn_fallback_mergeable (kvstxn_t *kt);

json_t *kvstxn_get_ops (kvstxn_t *kt);
json_t *kvstxn_get_names (kvstxn_t *kt);
int kvstxn_get_flags (kvstxn_t *kt);
Expand Down Expand Up @@ -133,8 +145,22 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm);

/* remove a transaction from the kvstxn manager after it is done
* processing
*
* If the kvstxn was merged, and the caller would like to fallback to
* the original individual transactions (so they can be retried
* individually), set `fallback` to true. This will put the original
* transactions back on the ready queue, but will make it so they
* cannot be merged in the future (e.g. setting FLUX_KVS_NO_MERGE on
* them).
*
* Be careful with the 'fallback' option. If a transaction was
* successful, you can still fallback the merged kvstxn into its
* individual components. 'fallback' should only be set when you get
* an error (i.e. you don't use kvstxn_get_newroot_ref to get a new
* root).
*/
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt);
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt,
bool fallback);

int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm);
void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm);
Expand All @@ -143,11 +169,18 @@ void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm);
int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm);

/* In internally stored ready transactions (moved to ready status via
* kvstxn_mgr_process_transaction_request()), merge them if they are
* capable of being merged. Returns -1 on error, 0 on success. On
* error, it is possible that the ready transaction has been modified
* with different transaction names and operations. The caller is
* responsible for sending errors to all appropriately.
* kvstxn_mgr_add_transaction()), merge them into a new ready transaction
* if they are capable of being merged.
*
* Callers should be cautioned to re-call
* kvstxn_mgr_get_ready_transaction() for the new head commit as the
* prior one has been removed.
*
* A merged kvstxn can be backed out if an error occurs. See
* kvstxn_fallback_mergeable() and kvstxn_mgr_remove_transaction()
* above.
*
* Returns -1 on error, 0 on success.
*/
int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm);

Expand Down
Loading