From df1c620dc4d95a9e598ff852753d3a7e61d01cb6 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 2 Mar 2021 02:37:45 -0500 Subject: [PATCH] kv: permit merge transactions to refresh after subsumption Fixes #59308. This commit adds support for range merge transactions to refresh. This is necessary to allow the merge transaction to have its write timestamp be bumped and still commit without retrying. In such cases, the transaction must refresh its reads, including its original read on the RHS's local range descriptor. If we were to block this refresh on the frozen RHS range, the merge would deadlock. On the surface, it seems unsafe to permit Refresh requests on an already subsumed RHS range, because the refresh's effect on the timestamp cache will never make it to the LHS leaseholder. This risks the future joint range serving a write that invalidates the Refresh. However, in this specific situation, we can be sure that such a serializability violation will not occur because the Range merge also writes to (deletes) this key. This means that if the Range merge transaction commits, its intent on the key will be resolved to the timestamp of the refresh and no future write will ever be able to violate the refresh. Conversely, if the Range merge transaction does not commit, then the merge will fail and the update to the RHS's timestamp cache will not be lost (not that this particularly matters in cases of aborted transactions). The same line of reasoning as the one above has motivated us to explore removing keys from a transaction's refresh spans when they are written to by the transaction, as the intents written by the transaction act as a form of pessimistic lock that obviates the need for the optimistic refresh. Such an improvement would eliminate the need for this special case, but until we generalize the mechanism to prune refresh spans based on intent spans, we're forced to live with this.
See https://github.com/cockroachdb/cockroach/issues/59308#issuecomment-786162869 for why the original fix, which attempted to manually refresh the range merge transaction before it entered its critical phase, was not sufficient. Release justification: needed for new functionality. --- pkg/base/test_server_args.go | 2 +- pkg/kv/kvserver/client_merge_test.go | 65 +++++++++++++++++++++++- pkg/kv/kvserver/replica.go | 76 ++++++++++++++++++++++------ pkg/kv/kvserver/replica_command.go | 20 +++----- pkg/roachpb/batch.go | 6 +++ 5 files changed, 139 insertions(+), 30 deletions(-) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 39bb2f845ba3..f2c3d64fc828 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -211,7 +211,7 @@ const ( ReplicationAuto TestClusterReplicationMode = iota // ReplicationManual means that the split, merge and replication queues of all // servers are stopped, and the test must manually control splitting, merging - // and replication through the TestServer. + // and replication through the TestServer. // Note that the server starts with a number of system ranges, // all with a single replica on node 1. ReplicationManual diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index a56c43423461..8feb60467878 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1096,6 +1096,67 @@ func TestStoreRangeMergeTxnFailure(t *testing.T) { } } +// TestStoreRangeMergeTxnRefresh verifies that in cases where the range merge +// transaction's timestamp is bumped, it is able to refresh even after it has +// entered the critical phase of the merge and subsumed the RHS. 
+func TestStoreRangeMergeTxnRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var sawMergeRefresh int32 + testingResponseFilter := func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { + switch v := ba.Requests[0].GetInner().(type) { + case *roachpb.ConditionalPutRequest: + // Detect the range merge's deletion of the local range descriptor + // and use it as an opportunity to bump the merge transaction's + // write timestamp. This will necessitate a refresh. + // + // Also mark as synthetic, while we're here, to simulate the + // behavior of a range merge across two ranges with the + // LEAD_FOR_GLOBAL_READS closed timestamp policy. + if !v.Value.IsPresent() && bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) { + br.Txn.WriteTimestamp = br.Txn.WriteTimestamp. + Add(100*time.Millisecond.Nanoseconds(), 0). + WithSynthetic(true) + } + case *roachpb.RefreshRequest: + if bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) { + atomic.AddInt32(&sawMergeRefresh, 1) + } + } + return nil + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingResponseFilter: testingResponseFilter, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + + // Create the ranges to be merged. + lhsDesc, _, err := tc.Servers[0].ScratchRangeEx() + require.NoError(t, err) + + // Launch the merge. + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), args) + require.Nil(t, pErr) + + // Verify that the range merge refreshed. 
+ require.Greater(t, atomic.LoadInt32(&sawMergeRefresh), int32(1)) +} + // TestStoreRangeSplitMergeGeneration verifies that splits and merges both // update the range descriptor generations of the involved ranges according to // the comment on the RangeDescriptor.Generation field. @@ -2042,8 +2103,8 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) { testingResponseFilter := func( ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse, ) *roachpb.Error { - del := ba.Requests[0].GetDelete() - if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 { + cput := ba.Requests[0].GetConditionalPut() + if cput != nil && !cput.Value.IsPresent() && bytes.HasSuffix(cput.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 { // After every few deletions of the local range descriptor, expire all // range leases. This makes the following sequence of events quite likely: // diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 571048001e3d..1f48e3c67d9b 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -281,6 +281,9 @@ type Replica struct { // requests should be held until the completion of the merge is signaled by // the closing of the channel. mergeComplete chan struct{} + // mergeTxnID contains the ID of the in-progress merge transaction, if a + // merge is currently in progress. Otherwise, the ID is empty. + mergeTxnID uuid.UUID // freezeStart indicates the subsumption time of this range when it is the // right-hand range in an ongoing merge. This range will allow read-only // traffic below this timestamp, while blocking everything else, until the @@ -1366,10 +1369,6 @@ func (r *Replica) shouldWaitForPendingMergeRLocked( return nil } - if ba.IsSingleSubsumeRequest() { - return nil - } - // The replica is being merged into its left-hand neighbor. This request // cannot proceed until the merge completes, signaled by the closing of the // channel. 
@@ -1381,11 +1380,15 @@ func (r *Replica) shouldWaitForPendingMergeRLocked( // important that this check occur after we have verified that this replica // is the leaseholder. Only the leaseholder will have its merge complete // channel set. + + // However, we do permit exactly two forms of requests when a range is in + // the process of being merged into its left-hand neighbor. // - // Note that Subsume commands are exempt from waiting on the mergeComplete - // channel. This is necessary to avoid deadlock. While normally a Subsume - // request will trigger the installation of a mergeComplete channel after it - // is executed, it may sometimes execute after the mergeComplete channel has + // The first request type that we allow on the RHS of a merge after it has + // entered its critical phase is a Subsume request. This sounds backwards, + // but it is necessary to avoid deadlock. While normally a Subsume request + // will trigger the installation of a mergeComplete channel after it is + // executed, it may sometimes execute after the mergeComplete channel has // been installed. Consider the case where the RHS replica acquires a new // lease after the merge transaction deletes its local range descriptor but // before the Subsume command is sent. The lease acquisition request will @@ -1407,14 +1410,55 @@ func (r *Replica) shouldWaitForPendingMergeRLocked( // irrelevant. Subsume is only sent from within a merge transaction, and // merge transactions read the RHS descriptor at the beginning of the // transaction to verify that it has not already been merged away. + if ba.IsSingleSubsumeRequest() { + return nil + } + // The second request type that we allow on the RHS of a merge after it has + // entered its critical phase is a Refresh request, but only one issued by + // the active range merge transaction itself, targeting the RHS's local + // range descriptor. 
This is necessary to allow the merge transaction to + // have its write timestamp be bumped and still commit without retrying. In + // such cases, the transaction must refresh its reads, including its + // original read on the RHS's local range descriptor. If we were to block + // this refresh on the frozen RHS range, the merge would deadlock. + // + // On the surface, it seems unsafe to permit Refresh requests on an already + // subsumed RHS range, because the Refresh's effect on the timestamp cache + // will never make it to the LHS leaseholder. This risks the future joint + // range serving a write that invalidates the Refresh. However, in this + // specific situation, we can be sure that such a serializability violation + // will not occur because the Range merge also writes to (deletes) this key. + // This means that if the Range merge transaction commits, its intent on the + // key will be resolved to the timestamp of the refresh and no future write + // will ever be able to violate the refresh. Conversely, if the Range merge + // transaction does not commit, then the merge will fail and the update to + // the RHS's timestamp cache will not be lost (not that this particularly + // matters in cases of aborted transactions). // - // We can't wait for the merge to complete here, though. The replica might - // need to respond to a Subsume request in order for the merge to complete, - // and blocking here would force that Subsume request to sit in hold its - // latches forever, deadlocking the merge. Instead, we release the latches - // we acquired above and return a MergeInProgressError. The store will catch - // that error and resubmit the request after mergeCompleteCh closes. See - // #27442 for the full context. 
+ // The same line of reasoning as the one above has motivated us to explore + // removing keys from a transaction's refresh spans when they are written to + // by the transaction, as the intents written by the transaction act as a + // form of pessimistic lock that obviates the need for the optimistic + // refresh. Such an improvement would eliminate the need for this special + // case, but until we generalize the mechanism to prune refresh spans based + // on intent spans, we're forced to live with this. + if ba.Txn != nil && ba.Txn.ID == r.mu.mergeTxnID { + if ba.IsSingleRefreshRequest() { + desc := r.descRLocked() + descKey := keys.RangeDescriptorKey(desc.StartKey) + if ba.Requests[0].GetRefresh().Key.Equal(descKey) { + return nil + } + } + } + + // Otherwise, the request must wait. We can't wait for the merge to complete + // here, though. The replica might need to respond to a Subsume request in + // order for the merge to complete, and blocking here would force that + // Subsume request to sit and hold its latches forever, deadlocking the + // merge. Instead, we release the latches we acquired above and return a + // MergeInProgressError. The store will catch that error and resubmit the + // request after mergeCompleteCh closes. See #27442 for the full context. return &roachpb.MergeInProgressError{} } @@ -1534,6 +1578,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { return true, nil } r.mu.mergeComplete = mergeCompleteCh + r.mu.mergeTxnID = intent.Txn.ID // The RHS of a merge is not permitted to quiesce while a mergeComplete // channel is installed. (If the RHS is quiescent when the merge commits, any // orphaned followers would fail to queue themselves for GC.) Unquiesce the @@ -1650,6 +1695,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { // notice that the replica has been destroyed and return an appropriate // error. If the merge aborted, the requests will be handled normally.
r.mu.mergeComplete = nil + r.mu.mergeTxnID = uuid.UUID{} close(mergeCompleteCh) r.mu.Unlock() r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 960dc2997958..0696859fef64 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -563,6 +564,13 @@ func (r *Replica) AdminMerge( log.Event(ctx, "merge txn begins") txn.SetDebugName(mergeTxnName) + // If we aren't certain that all possible nodes in the cluster support a + // range merge transaction refreshing its reads while the RHS range is + // subsumed, observe the commit timestamp to force a client-side retry. + if !r.ClusterSettings().Version.IsActive(ctx, clusterversion.PriorReadSummaries) { + _ = txn.CommitTimestamp() + } + // Pipelining might send QueryIntent requests to the RHS after the RHS has // noticed the merge and started blocking all traffic. This causes the merge // transaction to deadlock. Just turn pipelining off; the structure of the @@ -701,18 +709,6 @@ func (r *Replica) AdminMerge( return err } - // Refresh the transaction so that the transaction won't try to refresh - // its reads on the RHS after it is frozen. - if err := txn.ManualRefresh(ctx); err != nil { - return err - } - - // Freeze the commit timestamp of the transaction to prevent future pushes - // due to high-priority reads from other transactions. Any attempt to - // refresh reads on the RHS would result in a stalled merge because the - // RHS will be frozen after the Subsume is sent. - _ = txn.CommitTimestamp() - // Intents have been placed, so the merge is now in its critical phase. 
Get // a consistent view of the data from the right-hand range. If the merge // commits, we'll write this data to the left-hand range in the merge diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 3061f81c774f..48f98bfc14b5 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -221,6 +221,12 @@ func (ba *BatchRequest) IsSingleAbortTxnRequest() bool { return false } +// IsSingleRefreshRequest returns true iff the batch contains a single request, +// and that request is a RefreshRequest. +func (ba *BatchRequest) IsSingleRefreshRequest() bool { + return ba.isSingleRequestWithMethod(Refresh) +} + // IsSingleSubsumeRequest returns true iff the batch contains a single request, // and that request is an SubsumeRequest. func (ba *BatchRequest) IsSingleSubsumeRequest() bool {