diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go
index 39bb2f845ba3..f2c3d64fc828 100644
--- a/pkg/base/test_server_args.go
+++ b/pkg/base/test_server_args.go
@@ -211,7 +211,7 @@ const (
 	ReplicationAuto TestClusterReplicationMode = iota
 	// ReplicationManual means that the split, merge and replication queues of all
 	// servers are stopped, and the test must manually control splitting, merging
-	// and replication through the TestServer.
+	// and replication through the TestServer.
 	// Note that the server starts with a number of system ranges,
 	// all with a single replica on node 1.
 	ReplicationManual
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index a56c43423461..8feb60467878 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -1096,6 +1096,67 @@ func TestStoreRangeMergeTxnFailure(t *testing.T) {
 	}
 }
 
+// TestStoreRangeMergeTxnRefresh verifies that in cases where the range merge
+// transaction's timestamp is bumped, it is able to refresh even after it has
+// entered the critical phase of the merge and subsumed the RHS.
+func TestStoreRangeMergeTxnRefresh(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	var sawMergeRefresh int32
+	testingResponseFilter := func(
+		ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
+	) *roachpb.Error {
+		switch v := ba.Requests[0].GetInner().(type) {
+		case *roachpb.ConditionalPutRequest:
+			// Detect the range merge's deletion of the local range descriptor
+			// and use it as an opportunity to bump the merge transaction's
+			// write timestamp. This will necessitate a refresh.
+			//
+			// Also mark as synthetic, while we're here, to simulate the
+			// behavior of a range merge across two ranges with the
+			// LEAD_FOR_GLOBAL_READS closed timestamp policy.
+			if !v.Value.IsPresent() && bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) {
+				br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.
+					Add(100*time.Millisecond.Nanoseconds(), 0).
+					WithSynthetic(true)
+			}
+		case *roachpb.RefreshRequest:
+			if bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) {
+				atomic.AddInt32(&sawMergeRefresh, 1)
+			}
+		}
+		return nil
+	}
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					Store: &kvserver.StoreTestingKnobs{
+						TestingResponseFilter: testingResponseFilter,
+					},
+				},
+			},
+		})
+	defer tc.Stopper().Stop(ctx)
+	store := tc.GetFirstStoreFromServer(t, 0)
+
+	// Create the ranges to be merged.
+	lhsDesc, _, err := tc.Servers[0].ScratchRangeEx()
+	require.NoError(t, err)
+
+	// Launch the merge.
+	args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
+	_, pErr := kv.SendWrapped(ctx, store.TestSender(), args)
+	require.Nil(t, pErr)
+
+	// Verify that the range merge refreshed.
+	require.Greater(t, atomic.LoadInt32(&sawMergeRefresh), int32(1))
+}
+
 // TestStoreRangeSplitMergeGeneration verifies that splits and merges both
 // update the range descriptor generations of the involved ranges according to
 // the comment on the RangeDescriptor.Generation field.
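For readers who want background on what the new test exercises: when a CockroachDB transaction's write timestamp is pushed above its read timestamp (as the response filter above does to the merge transaction), the transaction must refresh its reads, i.e. verify that none of the spans it read were written to between the two timestamps, before it can commit at the higher timestamp. The sketch below is a minimal, self-contained model of that idea under those assumptions; the modelTxn and kvStore types and the integer timestamps are invented for illustration and are not CockroachDB's kv.Txn or txnSpanRefresher code.

```go
package main

import "fmt"

// kvStore records, per key, the timestamps of committed writes.
type kvStore struct {
	writes map[string][]int64
}

// writtenInWindow reports whether key saw a committed write in (lo, hi].
func (s *kvStore) writtenInWindow(key string, lo, hi int64) bool {
	for _, ts := range s.writes[key] {
		if ts > lo && ts <= hi {
			return true
		}
	}
	return false
}

// modelTxn tracks the keys a transaction has read so that it can refresh its
// reads if its write timestamp is pushed above its read timestamp.
type modelTxn struct {
	readTS, writeTS int64
	readKeys        []string // refresh "spans", simplified to point keys
}

func (t *modelTxn) read(key string) { t.readKeys = append(t.readKeys, key) }

// refresh tries to advance the read timestamp up to the write timestamp. It
// succeeds only if none of the previously read keys were written to in
// (readTS, writeTS]; otherwise the transaction would have to restart.
func (t *modelTxn) refresh(s *kvStore) bool {
	for _, k := range t.readKeys {
		if s.writtenInWindow(k, t.readTS, t.writeTS) {
			return false
		}
	}
	t.readTS = t.writeTS
	return true
}

func main() {
	s := &kvStore{writes: map[string][]int64{"b": {22}}}

	// A transaction reads the two range descriptors at timestamp 10, then has
	// its write timestamp bumped to 20, much like the merge transaction above.
	txn := &modelTxn{readTS: 10, writeTS: 10}
	txn.read("lhs-descriptor")
	txn.read("rhs-descriptor")
	txn.writeTS = 20
	fmt.Println(txn.refresh(s)) // true: nothing it read was overwritten

	// Reading a key that was written at timestamp 22 makes a subsequent
	// refresh to timestamp 25 fail, forcing a restart.
	txn.read("b")
	txn.writeTS = 25
	fmt.Println(txn.refresh(s)) // false
}
```

The point the test drives at is that the merge transaction must be able to perform exactly this kind of refresh even though the RHS range it originally read is, by that time, frozen.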
@@ -2042,8 +2103,8 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
 	testingResponseFilter := func(
 		ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
 	) *roachpb.Error {
-		del := ba.Requests[0].GetDelete()
-		if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
+		cput := ba.Requests[0].GetConditionalPut()
+		if cput != nil && !cput.Value.IsPresent() && bytes.HasSuffix(cput.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
 			// After every few deletions of the local range descriptor, expire all
 			// range leases. This makes the following sequence of events quite likely:
 			//
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 571048001e3d..1f48e3c67d9b 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -281,6 +281,9 @@ type Replica struct {
 		// requests should be held until the completion of the merge is signaled by
 		// the closing of the channel.
 		mergeComplete chan struct{}
+		// mergeTxnID contains the ID of the in-progress merge transaction, if a
+		// merge is currently in progress. Otherwise, the ID is empty.
+		mergeTxnID uuid.UUID
 		// freezeStart indicates the subsumption time of this range when it is the
 		// right-hand range in an ongoing merge. This range will allow read-only
 		// traffic below this timestamp, while blocking everything else, until the
@@ -1366,10 +1369,6 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
 		return nil
 	}
 
-	if ba.IsSingleSubsumeRequest() {
-		return nil
-	}
-
 	// The replica is being merged into its left-hand neighbor. This request
 	// cannot proceed until the merge completes, signaled by the closing of the
 	// channel.
@@ -1381,11 +1380,15 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
 	// important that this check occur after we have verified that this replica
 	// is the leaseholder. Only the leaseholder will have its merge complete
 	// channel set.
+
+	// However, we do permit exactly two forms of requests when a range is in
+	// the process of being merged into its left-hand neighbor.
 	//
-	// Note that Subsume commands are exempt from waiting on the mergeComplete
-	// channel. This is necessary to avoid deadlock. While normally a Subsume
-	// request will trigger the installation of a mergeComplete channel after it
-	// is executed, it may sometimes execute after the mergeComplete channel has
+	// The first request type that we allow on the RHS of a merge after it has
+	// entered its critical phase is a Subsume request. This sounds backwards,
+	// but it is necessary to avoid deadlock. While normally a Subsume request
+	// will trigger the installation of a mergeComplete channel after it is
+	// executed, it may sometimes execute after the mergeComplete channel has
 	// been installed. Consider the case where the RHS replica acquires a new
 	// lease after the merge transaction deletes its local range descriptor but
 	// before the Subsume command is sent. The lease acquisition request will
@@ -1407,14 +1410,55 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
 	// irrelevant. Subsume is only sent from within a merge transaction, and
 	// merge transactions read the RHS descriptor at the beginning of the
 	// transaction to verify that it has not already been merged away.
+	if ba.IsSingleSubsumeRequest() {
+		return nil
+	}
+	// The second request type that we allow on the RHS of a merge after it has
+	// entered its critical phase is a Refresh request, but only one issued by
+	// the active range merge transaction itself, targeting the RHS's local
+	// range descriptor. This is necessary to allow the merge transaction to
+	// have its write timestamp be bumped and still commit without retrying. In
+	// such cases, the transaction must refresh its reads, including its
+	// original read on the RHS's local range descriptor. If we were to block
+	// this refresh on the frozen RHS range, the merge would deadlock.
+	//
+	// On the surface, it seems unsafe to permit Refresh requests on an already
+	// subsumed RHS range, because the Refresh's effect on the timestamp cache
+	// will never make it to the LHS leaseholder. This risks the future joint
+	// range serving a write that invalidates the Refresh. However, in this
+	// specific situation, we can be sure that such a serializability violation
+	// will not occur because the Range merge also writes to (deletes) this key.
+	// This means that if the Range merge transaction commits, its intent on the
+	// key will be resolved to the timestamp of the refresh and no future write
+	// will ever be able to violate the refresh. Conversely, if the Range merge
+	// transaction does not commit, then the merge will fail and the update to
+	// the RHS's timestamp cache will not be lost (not that this particularly
+	// matters in cases of aborted transactions).
 	//
-	// We can't wait for the merge to complete here, though. The replica might
-	// need to respond to a Subsume request in order for the merge to complete,
-	// and blocking here would force that Subsume request to sit in hold its
-	// latches forever, deadlocking the merge. Instead, we release the latches
-	// we acquired above and return a MergeInProgressError. The store will catch
-	// that error and resubmit the request after mergeCompleteCh closes. See
-	// #27442 for the full context.
+	// The same line of reasoning as the one above has motivated us to explore
+	// removing keys from a transaction's refresh spans when they are written to
+	// by the transaction, as the intents written by the transaction act as a
+	// form of pessimistic lock that obviates the need for the optimistic
+	// refresh. Such an improvement would eliminate the need for this special
+	// case, but until we generalize the mechanism to prune refresh spans based
+	// on intent spans, we're forced to live with this.
+	if ba.Txn != nil && ba.Txn.ID == r.mu.mergeTxnID {
+		if ba.IsSingleRefreshRequest() {
+			desc := r.descRLocked()
+			descKey := keys.RangeDescriptorKey(desc.StartKey)
+			if ba.Requests[0].GetRefresh().Key.Equal(descKey) {
+				return nil
+			}
+		}
+	}
+
+	// Otherwise, the request must wait. We can't wait for the merge to complete
+	// here, though. The replica might need to respond to a Subsume request in
+	// order for the merge to complete, and blocking here would force that
+	// Subsume request to hold its latches forever, deadlocking the merge.
+	// Instead, we release the latches we acquired above and return a
+	// MergeInProgressError. The store will catch that error and resubmit the
+	// request after mergeCompleteCh closes. See #27442 for the full context.
 	return &roachpb.MergeInProgressError{}
 }
 
@@ -1534,6 +1578,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
 		return true, nil
 	}
 	r.mu.mergeComplete = mergeCompleteCh
+	r.mu.mergeTxnID = intent.Txn.ID
 	// The RHS of a merge is not permitted to quiesce while a mergeComplete
 	// channel is installed. (If the RHS is quiescent when the merge commits, any
 	// orphaned followers would fail to queue themselves for GC.) Unquiesce the
@@ -1650,6 +1695,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
 		// notice that the replica has been destroyed and return an appropriate
 		// error. If the merge aborted, the requests will be handled normally.
 		r.mu.mergeComplete = nil
+		r.mu.mergeTxnID = uuid.UUID{}
 		close(mergeCompleteCh)
 		r.mu.Unlock()
 		r.raftMu.Unlock()
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 960dc2997958..0696859fef64 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/build"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -563,6 +564,13 @@ func (r *Replica) AdminMerge(
 		log.Event(ctx, "merge txn begins")
 		txn.SetDebugName(mergeTxnName)
 
+		// If we aren't certain that all possible nodes in the cluster support a
+		// range merge transaction refreshing its reads while the RHS range is
+		// subsumed, observe the commit timestamp to force a client-side retry.
+		if !r.ClusterSettings().Version.IsActive(ctx, clusterversion.PriorReadSummaries) {
+			_ = txn.CommitTimestamp()
+		}
+
 		// Pipelining might send QueryIntent requests to the RHS after the RHS has
 		// noticed the merge and started blocking all traffic. This causes the merge
 		// transaction to deadlock. Just turn pipelining off; the structure of the
@@ -701,18 +709,6 @@ func (r *Replica) AdminMerge(
 			return err
 		}
 
-		// Refresh the transaction so that the transaction won't try to refresh
-		// its reads on the RHS after it is frozen.
-		if err := txn.ManualRefresh(ctx); err != nil {
-			return err
-		}
-
-		// Freeze the commit timestamp of the transaction to prevent future pushes
-		// due to high-priority reads from other transactions. Any attempt to
-		// refresh reads on the RHS would result in a stalled merge because the
-		// RHS will be frozen after the Subsume is sent.
-		_ = txn.CommitTimestamp()
-
 		// Intents have been placed, so the merge is now in its critical phase. Get
 		// a consistent view of the data from the right-hand range. If the merge
 		// commits, we'll write this data to the left-hand range in the merge
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 3061f81c774f..48f98bfc14b5 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -221,6 +221,12 @@ func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
 	return false
 }
 
+// IsSingleRefreshRequest returns true iff the batch contains a single request,
+// and that request is a RefreshRequest.
+func (ba *BatchRequest) IsSingleRefreshRequest() bool {
+	return ba.isSingleRequestWithMethod(Refresh)
+}
+
 // IsSingleSubsumeRequest returns true iff the batch contains a single request,
 // and that request is an SubsumeRequest.
 func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
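Taken together, the replica.go changes amount to a narrow admission policy on a subsumed RHS: while a merge is in progress, only a Subsume request or the merge transaction's own Refresh of the RHS's local range descriptor is allowed through, and everything else receives a MergeInProgressError. The sketch below models just that decision structure; the request, rhsReplica, and string transaction-ID types are simplified stand-ins rather than the real roachpb.BatchRequest and Replica code, which performs this check under Replica.mu.

```go
package main

import (
	"errors"
	"fmt"
)

type method int

const (
	methodGet method = iota
	methodSubsume
	methodRefresh
)

// request is a simplified stand-in for a single-request BatchRequest.
type request struct {
	method method
	key    string // target key, used by Refresh requests
	txnID  string // ID of the issuing transaction, if any
}

// rhsReplica is a simplified stand-in for the RHS replica of an in-progress merge.
type rhsReplica struct {
	mergeInProgress bool
	mergeTxnID      string // ID of the in-progress merge txn, if any
	descriptorKey   string // the RHS's local range descriptor key
}

var errMergeInProgress = errors.New("merge in progress")

// checkPendingMerge mirrors the decision structure of the patched
// shouldWaitForPendingMergeRLocked: while the RHS is subsumed, only a Subsume
// request or the merge transaction's own Refresh of the RHS descriptor may
// proceed; everything else must be retried once the merge completes or aborts.
func (r *rhsReplica) checkPendingMerge(req request) error {
	if !r.mergeInProgress {
		return nil
	}
	if req.method == methodSubsume {
		return nil
	}
	if req.method == methodRefresh && req.txnID == r.mergeTxnID && req.key == r.descriptorKey {
		return nil
	}
	return errMergeInProgress
}

func main() {
	rhs := &rhsReplica{
		mergeInProgress: true,
		mergeTxnID:      "merge-txn-1",
		descriptorKey:   "/Local/Range/rhs/RangeDescriptor",
	}

	reqs := []request{
		{method: methodGet, key: "a"},                                                          // blocked
		{method: methodSubsume},                                                                // allowed
		{method: methodRefresh, txnID: "merge-txn-1", key: "/Local/Range/rhs/RangeDescriptor"}, // allowed
		{method: methodRefresh, txnID: "other-txn", key: "/Local/Range/rhs/RangeDescriptor"},   // blocked
	}
	for _, req := range reqs {
		fmt.Printf("%+v -> %v\n", req, rhs.checkPendingMerge(req))
	}
}
```

Keying the exemption to both the merge transaction's ID and the descriptor key mirrors the safety argument in the new comment: the merge transaction already holds an intent on that key, so the refresh's lost timestamp-cache update cannot lead to a serializability violation.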