From 90df1f1b265ec72f9d23c33f6bd21e0cc9992f76 Mon Sep 17 00:00:00 2001
From: Nikhil Benesch
Date: Thu, 30 Aug 2018 02:23:20 -0400
Subject: [PATCH] storage: fix a nasty merge deadlock

Fix a nasty edge case which could cause a concurrent merge and split to
deadlock. See the comment on TestStoreRangeMergeConcurrentSplit for
details.

Release note: None
---
 pkg/storage/client_merge_test.go | 73 ++++++++++++++++++++++++++++++++
 pkg/storage/replica.go           | 65 ++++++++++++++++------------
 2 files changed, 112 insertions(+), 26 deletions(-)

diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go
index d535cc0701da..2eccedb96f23 100644
--- a/pkg/storage/client_merge_test.go
+++ b/pkg/storage/client_merge_test.go
@@ -980,6 +980,79 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
 	})
 }
 
+// TestStoreRangeMergeConcurrentSplit (occasionally) reproduces a race where a
+// concurrent merge and split could deadlock.
+//
+// The bug works like this. A merge of adjacent ranges P and Q and a split of
+// Q execute concurrently, though the merge executes with an earlier timestamp.
+// The merge updates Q's meta2 range descriptor. The split updates Q's local
+// range descriptor, then tries to update Q's meta2 range descriptor, but runs
+// into the merge's intent and attempts to push the merge. Under our current
+// concurrency control strategy, this results in the split waiting for the
+// merge to complete. The merge then tries to update Q's local range
+// descriptor but runs into the split's intent. While pushing the split, the
+// merge realizes that waiting for the split to complete would cause deadlock,
+// so it aborts the split instead.
+//
+// But before the split can clean up its transaction record and intents, the
+// merge locks Q and launches a goroutine to unlock Q when the merge commits.
+// Then the merge completes, which has a weird side effect: the split's push
+// of the merge will succeed! How is this possible? The split's push request
+// is not guaranteed to notice that the split has been aborted before it
+// notices that the merge has completed. So the aborted split winds up
+// resolving the merge's intent on Q's meta2 range descriptor and leaving its
+// own intent in its place.
+//
+// In the past, the merge watcher goroutine would perform a range lookup for
+// Q; this would indirectly wait for the merge to complete by waiting for its
+// intent in meta2 to be resolved. In this case, however, it's the *split*'s
+// intent that the watcher goroutine sees. This intent can't be resolved
+// because the split's transaction record is located on the locked range Q!
+// And so Q can never be unlocked.
+//
+// This bug was fixed by teaching the watcher goroutine to push the merge
+// transaction directly instead of doing so indirectly by querying meta2.
+//
+// Attempting a foolproof reproduction of the bug proved challenging and would
+// have required a mess of store filters. This test takes a simpler approach
+// of running the necessary split and a merge concurrently and allowing the
+// race scheduler to occasionally strike the right interleaving. At the time
+// of writing, the test would reliably reproduce the bug in about 50 runs
+// (about ten seconds of stress on an eight-core laptop).
+func TestStoreRangeMergeConcurrentSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	storeCfg := storage.TestStoreConfig(nil)
+	storeCfg.TestingKnobs.DisableReplicateQueue = true
+	mtc := &multiTestContext{storeConfig: &storeCfg}
+	mtc.Start(t, 1)
+	defer mtc.Stop()
+	distSender := mtc.distSenders[0]
+
+	lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.Store(0))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	splitErrCh := make(chan error)
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		splitArgs := adminSplitArgs(rhsDesc.StartKey.AsRawKey().Next())
+		_, pErr := client.SendWrapped(ctx, distSender, splitArgs)
+		splitErrCh <- pErr.GoError()
+	}()
+
+	mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
+	_, pErr := client.SendWrapped(ctx, distSender, mergeArgs)
+	if pErr != nil && !testutils.IsPError(pErr, "range changed during merge") {
+		t.Fatal(pErr)
+	}
+
+	if err := <-splitErrCh; err != nil {
+		t.Fatal(err)
+	}
+}
+
 // TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range
 // in a merge loses its lease while a merge is in progress, the new leaseholder
 // does not incorrectly serve traffic before the merge completes.
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index d998c6557760..bb4d7eaa2109 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -27,7 +27,6 @@ import (
 	"time"
 	"unsafe"
 
-	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/google/btree"
@@ -51,6 +50,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
+	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
@@ -61,6 +61,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -2824,33 +2825,45 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
 
 	taskCtx := r.AnnotateCtx(context.Background())
 	err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
-		rs, _, err := client.RangeLookup(ctx, r.DB().NonTransactionalSender(), desc.StartKey.AsRawKey(),
-			roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */)
-		if err != nil {
-			select {
-			case <-r.store.stopper.ShouldQuiesce():
-				// The server is shutting down. The error while fetching the range
-				// descriptor was probably caused by the shutdown, so ignore it.
-				return
-			default:
-				// Otherwise, this replica is good and truly hosed because we couldn't
-				// determine its true range descriptor.
-				//
-				// TODO(benesch): a retry loop would be better than fataling, but we
-				// want to smoke out any unexpected errors at first.
-				log.Fatal(ctx, err)
+		for retry := retry.Start(base.DefaultRetryOptions()); retry.Next(); {
+			// Wait for the merge transaction to complete by attempting to push it. We
+			// don't want to accidentally abort the merge transaction, so we use the
+			// minimum transaction priority. Note that a push type of
+			// roachpb.PUSH_TOUCH, though it might appear more semantically correct,
+			// returns immediately and causes us to spin hot, whereas
+			// roachpb.PUSH_ABORT efficiently blocks until the transaction completes.
+			_, pErr := client.SendWrapped(ctx, r.DB().NonTransactionalSender(), &roachpb.PushTxnRequest{
+				RequestHeader: roachpb.RequestHeader{Key: intents[0].Txn.Key},
+				PusherTxn: roachpb.Transaction{
+					TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
+				},
+				PusheeTxn: intents[0].Txn,
+				Now:       r.Clock().Now(),
+				PushType:  roachpb.PUSH_ABORT,
+			})
+			if pErr != nil {
+				select {
+				case <-r.store.stopper.ShouldQuiesce():
+					// The server is shutting down. The error while pushing the merge
+					// transaction was probably caused by the shutdown, so ignore it.
+					return
+				default:
+					log.Warningf(ctx, "error while watching for merge to complete: %s", pErr)
+					// We can't safely unblock traffic until we can prove that the merge
+					// transaction is committed or aborted. Nothing to do but try again.
+					continue
+				}
 			}
+			// Unblock pending requests. If the merge committed, the requests will
+			// notice that the replica has been destroyed and return an appropriate
+			// error. If the merge aborted, the requests will be handled normally.
+			r.mu.Lock()
+			r.mu.mergeComplete = nil
+			close(mergeCompleteCh)
+			r.mu.Unlock()
+			return
 		}
-		if len(rs) != 1 {
-			log.Fatalf(ctx, "expected 1 range descriptor, got %d", len(rs))
-		}
-		// Unblock pending requests. If the merge committed, the requests will
-		// notice that the replica has been destroyed and return an appropriate
-		// error. If the merge aborted, the requests will be handled normally.
-		r.mu.Lock()
-		r.mu.mergeComplete = nil
-		close(mergeCompleteCh)
-		r.mu.Unlock()
+		log.Fatal(ctx, "unreachable")
 	})
 	if err == stop.ErrUnavailable {
 		// We weren't able to launch a goroutine to watch for the merge's completion
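
For readers tracing the fix, the sketch below pulls the push-based wait out of
the new watcher goroutine into a standalone helper. This is illustrative only
and not part of the patch: the helper name waitForTxnCompletion is
hypothetical, the client import path (pkg/internal/client) is assumed to match
the rest of pkg/storage at this revision, and the caller is assumed to already
hold a *client.DB, an *hlc.Clock, and the pushee's enginepb.TxnMeta (what the
patch obtains from intents[0].Txn).

package storage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// waitForTxnCompletion (hypothetical; not in this patch) blocks until the
// transaction identified by pushee has committed or aborted. It pushes with
// the minimum priority so it cannot abort the pushee, and with PUSH_ABORT so
// the request queues behind the pushee rather than returning immediately the
// way PUSH_TOUCH would.
func waitForTxnCompletion(
	ctx context.Context, db *client.DB, clock *hlc.Clock, pushee enginepb.TxnMeta,
) error {
	for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
		_, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), &roachpb.PushTxnRequest{
			// The push is addressed to the key where the pushee's transaction
			// record lives.
			RequestHeader: roachpb.RequestHeader{Key: pushee.Key},
			PusherTxn: roachpb.Transaction{
				TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
			},
			PusheeTxn: pushee,
			Now:       clock.Now(),
			PushType:  roachpb.PUSH_ABORT,
		})
		if pErr == nil {
			// The pushee finished; the caller may proceed.
			return nil
		}
		// Transient failure; r.Next() backs off before the next attempt.
	}
	// With the default retry options the loop never gives up on its own, so
	// this is effectively unreachable; it only satisfies the compiler.
	return ctx.Err()
}

The design point the patch relies on is visible here: pushing the merge
transaction waits on its transaction record directly, so the watcher no longer
depends on meta2 intent resolution, which is exactly the step that deadlocked
when the aborted split's intent was left sitting in meta2.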