diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index a80926110b14..488048d39bed 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) {
 // transaction's only intent so far is on P's local range descriptor, and so the
 // split transaction can happily commit.
 //
-// The merge transaction then continues, writing an intent on Q's local
-// descriptor. Since the merge transaction is executing at an earlier timestamp
-// than the split transaction, the intent is written "under" the updated
-// descriptor written by the split transaction.
-//
-// In the past, the merge transaction would simply push its commit timestamp
-// forward and proceed, even though, upon committing, it would discover that it
-// was forbidden from committing with a pushed timestamp and abort instead. (For
-// why merge transactions cannot forward their commit timestamps, see the
-// discussion on the retry loop within AdminMerge.) This was problematic. Before
-// the doomed merge transaction attempted to commit, it would send a Subsume
-// request, launching a merge watcher goroutine on Q. This watcher goroutine
-// could incorrectly think that the merge transaction committed. Why? To
-// determine whether a merge has truly aborted, the watcher goroutine sends a
-// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the
-// Get request returns either nil or a descriptor for a different range, the
-// merge is assumed to have committed. In this case, unfortunately, QEndKey is
-// the Q's end key post-split. After all, the split has committed and updated
-// Q's in-memory descriptor. The split transactions intents are cleaned up
-// asynchronously, however, and since the watcher goroutine is not performing a
-// consistent read it will not wait for the intents to be cleaned up. So
-// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine
-// will incorrectly infer that the merge committed. (Note that the watcher
-// goroutine can't perform a consistent read, as that would look up the
-// transaction record on Q and deadlock, since Q is blocked for merging.)
-//
-// The bug was fixed by updating Q's local descriptor with a conditional put
-// instead of a put. This forces the merge transaction to fail early if writing
-// the intent would require forwarding the commit timestamp. In other words,
-// this ensures that the merge watcher goroutine is never launched if the RHS
-// local descriptor is updated while the merge transaction is executing.
+// The merge transaction then continues, reading and writing an intent on Q's
+// local descriptor. The locking nature of the read request to Q's local
+// descriptor ensures that the merge transaction will observe the post-split
+// value for Q.
 func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
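Note: the new comment above relies on the merge transaction's read of Q's local descriptor being a locking read, so it serializes with the concurrent split instead of reading an older descriptor version that sits below the split's timestamp. A minimal sketch of that pattern, assuming the kv.Txn.GetForUpdate API of this era and a hypothetical helper name (the real lookup happens inside AdminMerge's descriptor read, not through this function):

	// readRHSDescForUpdate is a hypothetical helper: a locking (for-update) read
	// of the RHS range-local descriptor inside the merge transaction. Because
	// the read acquires a lock, it observes the post-split descriptor rather
	// than the pre-split value a plain MVCC read at the txn's timestamp could
	// return. Imports mirror those already used by this test file.
	func readRHSDescForUpdate(
		ctx context.Context, txn *kv.Txn, rhsStartKey roachpb.RKey,
	) (*roachpb.RangeDescriptor, error) {
		res, err := txn.GetForUpdate(ctx, keys.RangeDescriptorKey(rhsStartKey))
		if err != nil {
			return nil, err
		}
		if !res.Exists() {
			return nil, errors.Errorf("descriptor for %s not found", rhsStartKey)
		}
		desc := &roachpb.RangeDescriptor{}
		if err := res.ValueProto(desc); err != nil {
			return nil, err
		}
		return desc, nil
	}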
@@ -1280,8 +1253,11 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
 
 	var distSender *kvcoord.DistSender
 	var lhsDescKey atomic.Value
+	var lhsStartKey atomic.Value
 	var launchSplit int64
-	var mergeRetries int64
+	var mergePreSplit atomic.Value
+	var splitCommit atomic.Value
+	var mergeEndTxnTimestamp atomic.Value
 	testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
 		for _, req := range ba.Requests {
 			if get := req.GetGet(); get != nil && get.KeyLocking != lock.None {
@@ -1289,11 +1265,22 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
 					// If this is the first merge attempt, launch the split
 					// before the merge's first locking read succeeds.
 					if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) {
+						mergePreSplit.Store(ba.Txn.ReadTimestamp)
 						_, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c")))
 						return pErr
 					}
-					// Otherwise, record that the merge retried and proceed.
-					atomic.AddInt64(&mergeRetries, 1)
+					// Otherwise, proceed.
+				}
+			}
+			if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) {
+				splitCommit.Store(ba.Timestamp)
+			}
+			if endTxn := req.GetEndTxn(); endTxn != nil {
+				ct := endTxn.InternalCommitTrigger
+				startKey, _ := lhsStartKey.Load().(roachpb.RKey)
+				if ct != nil && ct.MergeTrigger != nil && startKey != nil &&
+					startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) {
+					mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp)
 				}
 			}
 		}
@@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
 	}
 	lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey))
 	atomic.StoreInt64(&launchSplit, 1)
+	lhsStartKey.Store(lhsDesc.StartKey)
 
 	mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
-	if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil {
+	_, pErr := kv.SendWrapped(ctx, distSender, mergeArgs)
+	if pErr != nil {
 		t.Fatal(pErr)
 	}
-	if atomic.LoadInt64(&mergeRetries) == 0 {
-		t.Fatal("expected merge to retry at least once due to concurrent split")
+	mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp)
+	splitTS := splitCommit.Load().(hlc.Timestamp)
+	mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp)
+	if splitTS.LessEq(mergePreSplitTS) {
+		t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS)
+	}
+	if mergePostSplitTS.LessEq(splitTS) {
+		t.Fatalf("expected merge to finish after concurrent split, %v <= %v", mergePostSplitTS, splitTS)
 	}
 }
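Note: the two t.Fatalf checks above replace the old retry-count assertion. Instead of requiring the merge to retry, the test now requires the split to commit after the merge's initial read timestamp and the merge to commit strictly above the split. A hypothetical helper restating that ordering, purely for illustration, assuming only the testing and pkg/util/hlc imports already present in this file:

	// requireMergeSpansSplit restates the new assertions: splitCommit must be
	// strictly above the merge's pre-split read timestamp, and the merge's
	// final (EndTxn) read timestamp must be strictly above splitCommit.
	func requireMergeSpansSplit(t *testing.T, mergePreSplit, splitCommit, mergeEndTxn hlc.Timestamp) {
		t.Helper()
		if splitCommit.LessEq(mergePreSplit) {
			t.Fatalf("split %s did not commit after merge began at %s", splitCommit, mergePreSplit)
		}
		if mergeEndTxn.LessEq(splitCommit) {
			t.Fatalf("merge %s did not commit above split %s", mergeEndTxn, splitCommit)
		}
	}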
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index b3e2870fd6f0..053186a5faec 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -564,12 +564,6 @@ func (r *Replica) AdminMerge(
 		log.Event(ctx, "merge txn begins")
 		txn.SetDebugName(mergeTxnName)
 
-		// Observe the commit timestamp to force a client-side retry. See the
-		// comment on the retry loop after this closure for details.
-		//
-		// TODO(benesch): expose a proper API for preventing the fast path.
-		_ = txn.CommitTimestamp()
-
 		// Pipelining might send QueryIntent requests to the RHS after the RHS has
 		// noticed the merge and started blocking all traffic. This causes the merge
 		// transaction to deadlock. Just turn pipelining off; the structure of the
@@ -708,6 +702,18 @@ func (r *Replica) AdminMerge(
 			return err
 		}
 
+		// Refresh the transaction now so that it won't need to refresh its reads
+		// on the RHS after the RHS is frozen.
+		if err := txn.ManualRefresh(ctx); err != nil {
+			return err
+		}
+
+		// Freeze the commit timestamp of the transaction to prevent future pushes
+		// due to high-priority reads from other transactions. Any attempt to
+		// refresh reads on the RHS would result in a stalled merge because the
+		// RHS will be frozen after the Subsume is sent.
+		_ = txn.CommitTimestamp()
+
 		// Intents have been placed, so the merge is now in its critical phase. Get
 		// a consistent view of the data from the right-hand range. If the merge
 		// commits, we'll write this data to the left-hand range in the merge
@@ -761,8 +767,10 @@ func (r *Replica) AdminMerge(
 	// we'll unlock the right-hand range, giving the next, fresh transaction a
 	// chance to succeed.
 	//
-	// Note that client.DB.Txn performs retries using the same transaction, so we
-	// have to use our own retry loop.
+	// A second reason to eschew kv.DB.Txn() is that the API to disable pipelining
+	// is finicky and only allows disabling pipelining before any operations have
+	// been sent, even in prior epochs. Calling DisablePipelining() on a restarted
+	// transaction yields an error.
 	for {
 		txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID())
 		err := runMergeTxn(txn)
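Note: taken together, the hunks above give AdminMerge roughly the following shape. This is a condensed, illustrative restatement under stated assumptions, not the real implementation: the merge transaction body is reduced to comments, error wrapping and the reply are elided, and the function and closure names are hypothetical:

	// adminMergeShape sketches the control flow AdminMerge ends up with after
	// this diff: one fresh txn per attempt, pipelining disabled up front, a
	// manual refresh followed by freezing the commit timestamp before Subsume.
	func adminMergeShape(ctx context.Context, r *Replica) error {
		runAttempt := func(txn *kv.Txn) error {
			// Must run before the txn sends anything; it errors on a restarted
			// txn, which is one reason AdminMerge keeps its own retry loop below.
			if err := txn.DisablePipelining(); err != nil {
				return err
			}
			// ... write intents updating the range descriptors and meta entries ...

			// Refresh now so that no read on the RHS needs refreshing once Subsume
			// freezes the RHS, then pin the commit timestamp so later pushes cannot
			// force another refresh.
			if err := txn.ManualRefresh(ctx); err != nil {
				return err
			}
			_ = txn.CommitTimestamp()

			// ... send Subsume to the RHS, then commit with a merge trigger ...
			return nil
		}
		for {
			txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID())
			err := runAttempt(txn)
			if err != nil {
				txn.CleanupOnError(ctx, err)
			}
			// Retry only on retryable errors, and always with a brand new txn.
			if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) {
				return err
			}
		}
	}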