From 3ff8b3597d9fa13788f5ac18d5f9afdab8133559 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Feb 2021 01:27:37 -0500 Subject: [PATCH] kv/kvserver: allow the merge transaction to be pushed Historically we have not allowed the merge transaction to be pushed. The reason we disabled this was because of the hazard due to attempting to refresh reads on the RHS of the merge after the SubsumeRequest has been sent. The `SubsumeRequest` effectively freezes the RHS until the merge commits or aborts. In order to side-step this hazard, this change ensures that nothing should prevent the merge transaction from either committing or aborting. Release note: None --- pkg/kv/kvserver/client_merge_test.go | 69 +++++++++++++--------------- pkg/kv/kvserver/replica_command.go | 24 ++++++---- 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index a80926110b14..488048d39bed 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) { // transaction's only intent so far is on P's local range descriptor, and so the // split transaction can happily commit. // -// The merge transaction then continues, writing an intent on Q's local -// descriptor. Since the merge transaction is executing at an earlier timestamp -// than the split transaction, the intent is written "under" the updated -// descriptor written by the split transaction. -// -// In the past, the merge transaction would simply push its commit timestamp -// forward and proceed, even though, upon committing, it would discover that it -// was forbidden from committing with a pushed timestamp and abort instead. (For -// why merge transactions cannot forward their commit timestamps, see the -// discussion on the retry loop within AdminMerge.) This was problematic. Before -// the doomed merge transaction attempted to commit, it would send a Subsume -// request, launching a merge watcher goroutine on Q. This watcher goroutine -// could incorrectly think that the merge transaction committed. Why? To -// determine whether a merge has truly aborted, the watcher goroutine sends a -// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the -// Get request returns either nil or a descriptor for a different range, the -// merge is assumed to have committed. In this case, unfortunately, QEndKey is -// the Q's end key post-split. After all, the split has committed and updated -// Q's in-memory descriptor. The split transactions intents are cleaned up -// asynchronously, however, and since the watcher goroutine is not performing a -// consistent read it will not wait for the intents to be cleaned up. So -// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine -// will incorrectly infer that the merge committed. (Note that the watcher -// goroutine can't perform a consistent read, as that would look up the -// transaction record on Q and deadlock, since Q is blocked for merging.) -// -// The bug was fixed by updating Q's local descriptor with a conditional put -// instead of a put. This forces the merge transaction to fail early if writing -// the intent would require forwarding the commit timestamp. In other words, -// this ensures that the merge watcher goroutine is never launched if the RHS -// local descriptor is updated while the merge transaction is executing. +// The merge transaction then continues, reading and writing an intent on Q's +// local descriptor. The locking nature of the read request to Q's local +// descriptor ensures that the merge transaction will observe the post-split +// value for Q. func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1280,8 +1253,11 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { var distSender *kvcoord.DistSender var lhsDescKey atomic.Value + var lhsStartKey atomic.Value var launchSplit int64 - var mergeRetries int64 + var mergePreSplit atomic.Value + var splitCommit atomic.Value + var mergeEndTxnTimestamp atomic.Value testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { for _, req := range ba.Requests { if get := req.GetGet(); get != nil && get.KeyLocking != lock.None { @@ -1289,11 +1265,22 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { // If this is the first merge attempt, launch the split // before the merge's first locking read succeeds. if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) { + mergePreSplit.Store(ba.Txn.ReadTimestamp) _, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c"))) return pErr } - // Otherwise, record that the merge retried and proceed. - atomic.AddInt64(&mergeRetries, 1) + // Otherwise, proceed. + } + } + if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) { + splitCommit.Store(ba.Timestamp) + } + if endTxn := req.GetEndTxn(); endTxn != nil { + ct := endTxn.InternalCommitTrigger + startKey, _ := lhsStartKey.Load().(roachpb.RKey) + if ct != nil && ct.MergeTrigger != nil && startKey != nil && + startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) { + mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp) } } } @@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { } lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey)) atomic.StoreInt64(&launchSplit, 1) + lhsStartKey.Store(lhsDesc.StartKey) mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) - if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil { + _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs) + if pErr != nil { t.Fatal(pErr) } - if atomic.LoadInt64(&mergeRetries) == 0 { - t.Fatal("expected merge to retry at least once due to concurrent split") + mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp) + splitTS := splitCommit.Load().(hlc.Timestamp) + mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp) + if splitTS.LessEq(mergePreSplitTS) { + t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS) + } + if mergePostSplitTS.LessEq(splitTS) { + t.Fatalf("expected merge to finish after concurrent split, %v <= %v", mergePostSplitTS, splitTS) } } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b3e2870fd6f0..053186a5faec 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -564,12 +564,6 @@ func (r *Replica) AdminMerge( log.Event(ctx, "merge txn begins") txn.SetDebugName(mergeTxnName) - // Observe the commit timestamp to force a client-side retry. See the - // comment on the retry loop after this closure for details. - // - // TODO(benesch): expose a proper API for preventing the fast path. - _ = txn.CommitTimestamp() - // Pipelining might send QueryIntent requests to the RHS after the RHS has // noticed the merge and started blocking all traffic. This causes the merge // transaction to deadlock. Just turn pipelining off; the structure of the @@ -708,6 +702,18 @@ func (r *Replica) AdminMerge( return err } + // Refresh the transaction so that the transaction won't try to refresh + // its reads on the RHS after it is frozen. + if err := txn.ManualRefresh(ctx); err != nil { + return err + } + + // Freeze the commit timestamp of the transaction to prevent future pushes + // due to high-priority reads from other transactions. Any attempt to + // refresh reads on the RHS would result in a stalled merge because the + // RHS will be frozen after the Subsume is sent. + _ = txn.CommitTimestamp() + // Intents have been placed, so the merge is now in its critical phase. Get // a consistent view of the data from the right-hand range. If the merge // commits, we'll write this data to the left-hand range in the merge @@ -761,8 +767,10 @@ func (r *Replica) AdminMerge( // we'll unlock the right-hand range, giving the next, fresh transaction a // chance to succeed. // - // Note that client.DB.Txn performs retries using the same transaction, so we - // have to use our own retry loop. + // A second reason to eschew kv.DB.Txn() is that the API to disable pipelining + // is finicky and only allows disabling pipelining before any operations have + // been sent, even in prior epochs. Calling DisablePipelining() on a restarted + // transaction yields an error. for { txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID()) err := runMergeTxn(txn)