storage: fix a nasty merge deadlock
Fix a nasty edge case which could cause a concurrent merge and split to
deadlock. See the comment on TestStoreRangeMergeConcurrentSplit for
details.

Release note: None
benesch committed Sep 1, 2018
1 parent 9d69acb commit 90df1f1
Showing 2 changed files with 112 additions and 26 deletions.
73 changes: 73 additions & 0 deletions pkg/storage/client_merge_test.go
@@ -980,6 +980,79 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
})
}

// TestStoreRangeMergeConcurrentSplit (occasionally) reproduces a race where a
// concurrent merge and split could deadlock.
//
// The bug works like this. A merge of adjacent ranges P and Q and a split of Q
// execute concurrently, though the merge executes with an earlier timestamp.
// The merge updates Q's meta2 range descriptor. The split updates Q's local
// range descriptor, then tries to update Q's meta2 range descriptor, but runs
// into the merge's intent and attempts to push the merge. Under our current
// concurrency control strategy, this results in the split waiting for the merge
// to complete. The merge then tries to update Q's local range descriptor but
// runs into the split's intent. While pushing the split, the merge realizes
// that waiting for the split to complete would cause deadlock, so it aborts the
// split instead.
//
// But before the split can clean up its transaction record and intents, the
// merge locks Q and launches a goroutine to unlock Q when the merge commits.
// Then the merge completes, which has a weird side effect: the split's push of
// the merge will succeed! How is this possible? The split's push request is not
// guaranteed to notice that the split has been aborted before it notices that
// the merge has completed. So the aborted split winds up resolving the merge's
// intent on Q's meta2 range descriptor and leaving its own intent in its place.
//
// In the past, the merge watcher goroutine would perform a range lookup for Q;
// this would indirectly wait for the merge to complete by waiting for its
// intent in meta2 to be resolved. In this case, however, it's the *split*'s
// intent that the watcher goroutine sees. This intent can't be resolved because
// the split's transaction record is located on the locked range Q! And so Q can
// never be unlocked.
//
// This bug was fixed by teaching the watcher goroutine to push the merge
// transaction directly instead of doing so indirectly by querying meta2.
//
// Attempting a foolproof reproduction of the bug proved challenging and would
// have required a mess of store filters. This test takes a simpler approach of
// running the necessary split and a merge concurrently and allowing the race
// scheduler to occasionally strike the right interleaving. At the time of
// writing, the test would reliably reproduce the bug in about 50 runs (about
// ten seconds of stress on an eight core laptop).
func TestStoreRangeMergeConcurrentSplit(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	storeCfg := storage.TestStoreConfig(nil)
	storeCfg.TestingKnobs.DisableReplicateQueue = true
	mtc := &multiTestContext{storeConfig: &storeCfg}
	mtc.Start(t, 1)
	defer mtc.Stop()
	distSender := mtc.distSenders[0]

	lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.Store(0))
	if err != nil {
		t.Fatal(err)
	}

	splitErrCh := make(chan error)
	go func() {
		time.Sleep(10 * time.Millisecond)
		splitArgs := adminSplitArgs(rhsDesc.StartKey.AsRawKey().Next())
		_, pErr := client.SendWrapped(ctx, distSender, splitArgs)
		splitErrCh <- pErr.GoError()
	}()

	mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
	_, pErr := client.SendWrapped(ctx, distSender, mergeArgs)
	if pErr != nil && !testutils.IsPError(pErr, "range changed during merge") {
		t.Fatal(pErr)
	}

	if err := <-splitErrCh; err != nil {
		t.Fatal(err)
	}
}
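Editor's note: to make the locking scheme described in the comment above concrete, here is a minimal standalone sketch (hypothetical names and types, not CockroachDB code) of the gating pattern the watcher relies on: requests against the right-hand range block on a channel while a merge may be in progress, and a watcher goroutine closes that channel exactly once after it has confirmed the merge transaction's fate. The deadlock arose because the old watcher could itself block forever, so the channel was never closed and the range stayed locked.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// replicaGate is a toy stand-in for the mergeComplete channel on a replica:
// while mergeCh is non-nil, incoming requests wait for it to be closed.
type replicaGate struct {
	mu      sync.Mutex
	mergeCh chan struct{}
}

// beginMerge installs the gate and starts a watcher that repeatedly checks the
// merge transaction's outcome (faked here by a callback), then unblocks
// waiters exactly once.
func (g *replicaGate) beginMerge(pushMergeTxn func() error) {
	g.mu.Lock()
	ch := make(chan struct{})
	g.mergeCh = ch
	g.mu.Unlock()

	go func() {
		for {
			// In the real fix this is a PushTxn RPC that blocks until the
			// merge transaction commits or aborts; here it is a callback.
			if err := pushMergeTxn(); err != nil {
				time.Sleep(10 * time.Millisecond) // back off and retry
				continue
			}
			g.mu.Lock()
			g.mergeCh = nil
			close(ch) // wake every waiting request
			g.mu.Unlock()
			return
		}
	}()
}

// serveRequest blocks while a merge is in progress, mimicking how requests on
// the right-hand range are held until the merge outcome is known.
func (g *replicaGate) serveRequest(name string) {
	g.mu.Lock()
	ch := g.mergeCh
	g.mu.Unlock()
	if ch != nil {
		<-ch
	}
	fmt.Println(name, "served")
}

func main() {
	var g replicaGate
	attempts := 0
	g.beginMerge(func() error {
		if attempts++; attempts < 3 {
			return errors.New("transient push failure")
		}
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.serveRequest(fmt.Sprintf("request-%d", i))
		}(i)
	}
	wg.Wait()
}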

// TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range
// in a merge loses its lease while a merge is in progress, the new leaseholder
// does not incorrectly serve traffic before the merge completes.
65 changes: 39 additions & 26 deletions pkg/storage/replica.go
@@ -27,7 +27,6 @@ import (
 	"time"
 	"unsafe"
 
-	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/google/btree"
@@ -51,6 +50,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
+	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
@@ -61,6 +61,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -2824,33 +2825,45 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
 
 	taskCtx := r.AnnotateCtx(context.Background())
 	err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
-		rs, _, err := client.RangeLookup(ctx, r.DB().NonTransactionalSender(), desc.StartKey.AsRawKey(),
-			roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */)
-		if err != nil {
-			select {
-			case <-r.store.stopper.ShouldQuiesce():
-				// The server is shutting down. The error while fetching the range
-				// descriptor was probably caused by the shutdown, so ignore it.
-				return
-			default:
-				// Otherwise, this replica is good and truly hosed because we couldn't
-				// determine its true range descriptor.
-				//
-				// TODO(benesch): a retry loop would be better than fataling, but we
-				// want to smoke out any unexpected errors at first.
-				log.Fatal(ctx, err)
+		for retry := retry.Start(base.DefaultRetryOptions()); retry.Next(); {
+			// Wait for the merge transaction to complete by attempting to push it. We
+			// don't want to accidentally abort the merge transaction, so we use the
+			// minimum transaction priority. Note that a push type of
+			// roachpb.PUSH_TOUCH, though it might appear more semantically correct,
+			// returns immediately and causes us to spin hot, whereas
+			// roachpb.PUSH_ABORT efficiently blocks until the transaction completes.
+			_, pErr := client.SendWrapped(ctx, r.DB().NonTransactionalSender(), &roachpb.PushTxnRequest{
+				RequestHeader: roachpb.RequestHeader{Key: intents[0].Txn.Key},
+				PusherTxn: roachpb.Transaction{
+					TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
+				},
+				PusheeTxn: intents[0].Txn,
+				Now:       r.Clock().Now(),
+				PushType:  roachpb.PUSH_ABORT,
+			})
+			if pErr != nil {
+				select {
+				case <-r.store.stopper.ShouldQuiesce():
+					// The server is shutting down. The error while fetching the range
+					// descriptor was probably caused by the shutdown, so ignore it.
+					return
+				default:
+					log.Warningf(ctx, "error while watching for merge to complete: %s", pErr)
+					// We can't safely unblock traffic until we can prove that the merge
+					// transaction is committed or aborted. Nothing to do but try again.
+					continue
+				}
+			}
+			// Unblock pending requests. If the merge committed, the requests will
+			// notice that the replica has been destroyed and return an appropriate
+			// error. If the merge aborted, the requests will be handled normally.
+			r.mu.Lock()
+			r.mu.mergeComplete = nil
+			close(mergeCompleteCh)
+			r.mu.Unlock()
+			return
+		}
-		if len(rs) != 1 {
-			log.Fatalf(ctx, "expected 1 range descriptor, got %d", len(rs))
-		}
-		// Unblock pending requests. If the merge committed, the requests will
-		// notice that the replica has been destroyed and return an appropriate
-		// error. If the merge aborted, the requests will be handled normally.
-		r.mu.Lock()
-		r.mu.mergeComplete = nil
-		close(mergeCompleteCh)
-		r.mu.Unlock()
+		log.Fatal(ctx, "unreachable")
 	})
 	if err == stop.ErrUnavailable {
 		// We weren't able to launch a goroutine to watch for the merge's completion
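Editor's note: the new watcher body follows the standard loop pattern of the util/retry package added to the imports above. Below is a minimal sketch of that pattern under the assumption that retry.Start/Next behave as in that package (exponential backoff between attempts, no retry limit with the default options); pushMerge is a hypothetical stand-in for the PushTxn request in the diff, not a real API.

package main

import (
	"errors"
	"log"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// pushMerge is a made-up stand-in for the PushTxn attempt above; it fails a
// couple of times before reporting that the merge transaction has finished.
func pushMerge(attempt int) error {
	if attempt < 2 {
		return errors.New("transient error while pushing merge txn")
	}
	return nil
}

func main() {
	attempt := 0
	// retry.Next sleeps with exponential backoff between iterations, so the
	// watcher never spins hot even if pushes keep failing.
	for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
		if err := pushMerge(attempt); err != nil {
			attempt++
			log.Printf("push failed, will retry: %v", err)
			continue
		}
		log.Print("merge transaction is complete; safe to unblock requests")
		return
	}
	// With the default options there is no retry limit, so the loop only
	// exits via the return above.
	log.Fatal("unreachable")
}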
