kv: permit merge transactions to refresh after subsumption
Fixes #59308.

This commit adds support for range merge transactions to refresh. This
is necessary to allow the merge transaction to have its write timestamp
be bumped and still commit without retrying. In such cases, the
transaction must refresh its reads, including its original read on the
RHS's local range descriptor. If we were to block this refresh on the
frozen RHS range, the merge would deadlock.
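
As a concrete illustration of the gate this commit adds, the sketch below distills the check (see the replica.go diff further down for the real code): while the RHS is frozen, the only request let through is the merge transaction's own single Refresh of the RHS's local range descriptor. The types, function names, and key string here are hypothetical stand-ins, not CockroachDB APIs.

```go
package main

import (
	"bytes"
	"fmt"
)

// batch is a stand-in for the incoming request batch; the real check lives in
// Replica.shouldWaitForPendingMergeRLocked (see the replica.go diff below).
type batch struct {
	txnID           string // ID of the issuing transaction, if any
	isSingleRefresh bool   // batch contains exactly one Refresh request
	refreshKey      []byte // key targeted by that Refresh
}

// allowDuringSubsumption returns true only for the merge transaction's own
// refresh of the RHS's local range descriptor; every other request keeps
// waiting (and would receive a MergeInProgressError).
func allowDuringSubsumption(b batch, mergeTxnID string, rhsDescKey []byte) bool {
	return b.txnID != "" &&
		b.txnID == mergeTxnID &&
		b.isSingleRefresh &&
		bytes.Equal(b.refreshKey, rhsDescKey)
}

func main() {
	rhsDescKey := []byte("/Local/Range/b/RangeDescriptor") // illustrative key
	mergeTxn := "merge-txn-1"

	fmt.Println(allowDuringSubsumption(batch{
		txnID:           mergeTxn,
		isSingleRefresh: true,
		refreshKey:      rhsDescKey,
	}, mergeTxn, rhsDescKey)) // true: the merge txn may refresh its descriptor read

	fmt.Println(allowDuringSubsumption(batch{
		txnID:           "other-txn",
		isSingleRefresh: true,
		refreshKey:      rhsDescKey,
	}, mergeTxn, rhsDescKey)) // false: everything else waits for the merge
}
```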

On the surface, it seems unsafe to permit Refresh requests on an already
subsumed RHS range, because the refresh's effect on the timestamp cache
will never make it to the LHS leaseholder. This risks the future joint
range serving a write that invalidates the Refresh. However, in this
specific situation, we can be sure that such a serializability violation
will not occur because the Range merge also writes to (deletes) this
key. This means that if the Range merge transaction commits, its intent
on the key will be resolved to the timestamp of the refresh and no
future write will ever be able to violate the refresh. Conversely, if the
Range merge transaction does not commit, then the merge will fail and
the update to the RHS's timestamp cache will not be lost (not that this
particularly matters in cases of aborted transactions).
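
To make that argument concrete, here is a toy numeric model of the reasoning above (all timestamps, names, and types are invented for illustration; this is not CockroachDB code):

```go
package main

import "fmt"

type value struct {
	ts    int
	owner string
}

// refreshInvalidated reports whether some other transaction committed a value
// inside the refresh window (readTS, refreshTS], which is what would make the
// refresh unsafe.
func refreshInvalidated(history []value, readTS, refreshTS int, me string) bool {
	for _, v := range history {
		if v.owner != me && v.ts > readTS && v.ts <= refreshTS {
			return true
		}
	}
	return false
}

func main() {
	const readTS, refreshTS = 10, 20
	// The merge txn read the RHS descriptor at ts=10 and was pushed to ts=20.
	// Its own committed delete of the key sits at ts=20; any later writer must
	// write above that committed value, so nothing foreign lands in (10, 20].
	history := []value{
		{ts: refreshTS, owner: "merge-txn"},
		{ts: 21, owner: "later-writer"},
	}
	fmt.Println(refreshInvalidated(history, readTS, refreshTS, "merge-txn")) // false
}
```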

The same line of reasoning as the one above has motivated us to explore
removing keys from a transaction's refresh spans when they are written
to by the transaction, as the intents written by the transaction act as
a form of pessimistic lock that obviates the need for the optimistic
refresh. Such an improvement would eliminate the need for this special
case, but until we generalize the mechanism to prune refresh spans based
on intent spans, we're forced to live with this.
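
As a purely illustrative sketch of that future direction (hypothetical types and helpers, not code from this change or from CockroachDB), pruning refresh spans against the transaction's intent spans could look roughly like this:

```go
package main

import "fmt"

// span is a simplified single-key span; real refresh and intent spans are key ranges.
type span struct{ key string }

// pruneRefreshSpans drops any refresh span whose key the transaction has also
// written, since the intent already protects that read pessimistically.
func pruneRefreshSpans(refreshes, intents []span) []span {
	written := make(map[string]bool, len(intents))
	for _, in := range intents {
		written[in.key] = true
	}
	var pruned []span
	for _, rs := range refreshes {
		if !written[rs.key] {
			pruned = append(pruned, rs)
		}
	}
	return pruned
}

func main() {
	refreshes := []span{{"/Meta2/b"}, {"/Local/Range/b/RangeDescriptor"}}
	intents := []span{{"/Local/Range/b/RangeDescriptor"}} // the merge txn deletes this key
	fmt.Println(pruneRefreshSpans(refreshes, intents))    // only /Meta2/b still needs refreshing
}
```

Under such pruning, the merge transaction's delete of the RHS descriptor would drop that key from its refresh spans, which is exactly the property the special case above leans on.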

See #59308 (comment)
for why the original fix, which attempted to manually refresh the range
merge transaction before it entered its critical phase, was not sufficient.

Release justification: needed for new functionality.
nvanbenschoten committed Mar 3, 2021
1 parent 4ab29d4 commit df1c620
Showing 5 changed files with 139 additions and 30 deletions.
pkg/base/test_server_args.go: 1 addition & 1 deletion
@@ -211,7 +211,7 @@ const (
ReplicationAuto TestClusterReplicationMode = iota
// ReplicationManual means that the split, merge and replication queues of all
// servers are stopped, and the test must manually control splitting, merging
// and replication through the TestServer.
// and replication through the TestServer.
// Note that the server starts with a number of system ranges,
// all with a single replica on node 1.
ReplicationManual
pkg/kv/kvserver/client_merge_test.go: 63 additions & 2 deletions
@@ -1096,6 +1096,67 @@ func TestStoreRangeMergeTxnFailure(t *testing.T) {
}
}

// TestStoreRangeMergeTxnRefresh verifies that in cases where the range merge
// transaction's timestamp is bumped, it is able to refresh even after it has
// entered the critical phase of the merge and subsumed the RHS.
func TestStoreRangeMergeTxnRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var sawMergeRefresh int32
testingResponseFilter := func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
switch v := ba.Requests[0].GetInner().(type) {
case *roachpb.ConditionalPutRequest:
// Detect the range merge's deletion of the local range descriptor
// and use it as an opportunity to bump the merge transaction's
// write timestamp. This will necessitate a refresh.
//
// Also mark as synthetic, while we're here, to simulate the
// behavior of a range merge across two ranges with the
// LEAD_FOR_GLOBAL_READS closed timestamp policy.
if !v.Value.IsPresent() && bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) {
br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.
Add(100*time.Millisecond.Nanoseconds(), 0).
WithSynthetic(true)
}
case *roachpb.RefreshRequest:
if bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) {
atomic.AddInt32(&sawMergeRefresh, 1)
}
}
return nil
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: testingResponseFilter,
},
},
},
})
defer tc.Stopper().Stop(ctx)
store := tc.GetFirstStoreFromServer(t, 0)

// Create the ranges to be merged.
lhsDesc, _, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)

// Launch the merge.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := kv.SendWrapped(ctx, store.TestSender(), args)
require.Nil(t, pErr)

// Verify that the range merge refreshed.
require.Greater(t, atomic.LoadInt32(&sawMergeRefresh), int32(1))
}

// TestStoreRangeSplitMergeGeneration verifies that splits and merges both
// update the range descriptor generations of the involved ranges according to
// the comment on the RangeDescriptor.Generation field.
@@ -2042,8 +2103,8 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
testingResponseFilter := func(
ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
) *roachpb.Error {
-del := ba.Requests[0].GetDelete()
-if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
cput := ba.Requests[0].GetConditionalPut()
if cput != nil && !cput.Value.IsPresent() && bytes.HasSuffix(cput.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
// After every few deletions of the local range descriptor, expire all
// range leases. This makes the following sequence of events quite likely:
//
pkg/kv/kvserver/replica.go: 61 additions & 15 deletions
@@ -281,6 +281,9 @@ type Replica struct {
// requests should be held until the completion of the merge is signaled by
// the closing of the channel.
mergeComplete chan struct{}
// mergeTxnID contains the ID of the in-progress merge transaction, if a
// merge is currently in progress. Otherwise, the ID is empty.
mergeTxnID uuid.UUID
// freezeStart indicates the subsumption time of this range when it is the
// right-hand range in an ongoing merge. This range will allow read-only
// traffic below this timestamp, while blocking everything else, until the
@@ -1366,10 +1369,6 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
return nil
}

-if ba.IsSingleSubsumeRequest() {
-return nil
-}

// The replica is being merged into its left-hand neighbor. This request
// cannot proceed until the merge completes, signaled by the closing of the
// channel.
@@ -1381,11 +1380,15 @@
// important that this check occur after we have verified that this replica
// is the leaseholder. Only the leaseholder will have its merge complete
// channel set.

// However, we do permit exactly two forms of requests when a range is in
// the process of being merged into its left-hand neighbor.
//
-// Note that Subsume commands are exempt from waiting on the mergeComplete
-// channel. This is necessary to avoid deadlock. While normally a Subsume
-// request will trigger the installation of a mergeComplete channel after it
-// is executed, it may sometimes execute after the mergeComplete channel has
// The first request type that we allow on the RHS of a merge after it has
// entered its critical phase is a Subsume request. This sounds backwards,
// but it is necessary to avoid deadlock. While normally a Subsume request
// will trigger the installation of a mergeComplete channel after it is
// executed, it may sometimes execute after the mergeComplete channel has
// been installed. Consider the case where the RHS replica acquires a new
// lease after the merge transaction deletes its local range descriptor but
// before the Subsume command is sent. The lease acquisition request will
@@ -1407,14 +1410,55 @@
// irrelevant. Subsume is only sent from within a merge transaction, and
// merge transactions read the RHS descriptor at the beginning of the
// transaction to verify that it has not already been merged away.
if ba.IsSingleSubsumeRequest() {
return nil
}
// The second request type that we allow on the RHS of a merge after it has
// entered its critical phase is a Refresh request, but only one issued by
// the active range merge transaction itself, targeting the RHS's local
// range descriptor. This is necessary to allow the merge transaction to
// have its write timestamp be bumped and still commit without retrying. In
// such cases, the transaction must refresh its reads, including its
// original read on the RHS's local range descriptor. If we were to block
// this refresh on the frozen RHS range, the merge would deadlock.
//
// On the surface, it seems unsafe to permit Refresh requests on an already
// subsumed RHS range, because the Refresh's effect on the timestamp cache
// will never make it to the LHS leaseholder. This risks the future joint
// range serving a write that invalidates the Refresh. However, in this
// specific situation, we can be sure that such a serializability violation
// will not occur because the Range merge also writes to (deletes) this key.
// This means that if the Range merge transaction commits, its intent on the
// key will be resolved to the timestamp of the refresh and no future write
// will ever be able to violate the refresh. Conversely, if the Range merge
// transaction does not commit, then the merge will fail and the update to
// the RHS's timestamp cache will not be lost (not that this particularly
// matters in cases of aborted transactions).
//
-// We can't wait for the merge to complete here, though. The replica might
-// need to respond to a Subsume request in order for the merge to complete,
-// and blocking here would force that Subsume request to sit in hold its
-// latches forever, deadlocking the merge. Instead, we release the latches
-// we acquired above and return a MergeInProgressError. The store will catch
-// that error and resubmit the request after mergeCompleteCh closes. See
-// #27442 for the full context.
// The same line of reasoning as the one above has motivated us to explore
// removing keys from a transaction's refresh spans when they are written to
// by the transaction, as the intents written by the transaction act as a
// form of pessimistic lock that obviates the need for the optimistic
// refresh. Such an improvement would eliminate the need for this special
// case, but until we generalize the mechanism to prune refresh spans based
// on intent spans, we're forced to live with this.
if ba.Txn != nil && ba.Txn.ID == r.mu.mergeTxnID {
if ba.IsSingleRefreshRequest() {
desc := r.descRLocked()
descKey := keys.RangeDescriptorKey(desc.StartKey)
if ba.Requests[0].GetRefresh().Key.Equal(descKey) {
return nil
}
}
}

// Otherwise, the request must wait. We can't wait for the merge to complete
// here, though. The replica might need to respond to a Subsume request in
// order for the merge to complete, and blocking here would force that
// Subsume request to hold its latches forever, deadlocking the
// merge. Instead, we release the latches we acquired above and return a
// MergeInProgressError. The store will catch that error and resubmit the
// request after mergeCompleteCh closes. See #27442 for the full context.
return &roachpb.MergeInProgressError{}
}

@@ -1534,6 +1578,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
return true, nil
}
r.mu.mergeComplete = mergeCompleteCh
r.mu.mergeTxnID = intent.Txn.ID
// The RHS of a merge is not permitted to quiesce while a mergeComplete
// channel is installed. (If the RHS is quiescent when the merge commits, any
// orphaned followers would fail to queue themselves for GC.) Unquiesce the
@@ -1650,6 +1695,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// notice that the replica has been destroyed and return an appropriate
// error. If the merge aborted, the requests will be handled normally.
r.mu.mergeComplete = nil
r.mu.mergeTxnID = uuid.UUID{}
close(mergeCompleteCh)
r.mu.Unlock()
r.raftMu.Unlock()
pkg/kv/kvserver/replica_command.go: 8 additions & 12 deletions
@@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -563,6 +564,13 @@ func (r *Replica) AdminMerge(
log.Event(ctx, "merge txn begins")
txn.SetDebugName(mergeTxnName)

// If we aren't certain that all possible nodes in the cluster support a
// range merge transaction refreshing its reads while the RHS range is
// subsumed, observe the commit timestamp to force a client-side retry.
if !r.ClusterSettings().Version.IsActive(ctx, clusterversion.PriorReadSummaries) {
_ = txn.CommitTimestamp()
}

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
@@ -701,18 +709,6 @@ func (r *Replica) AdminMerge(
return err
}

-// Refresh the transaction so that the transaction won't try to refresh
-// its reads on the RHS after it is frozen.
-if err := txn.ManualRefresh(ctx); err != nil {
-return err
-}
-
-// Freeze the commit timestamp of the transaction to prevent future pushes
-// due to high-priority reads from other transactions. Any attempt to
-// refresh reads on the RHS would result in a stalled merge because the
-// RHS will be frozen after the Subsume is sent.
-_ = txn.CommitTimestamp()

// Intents have been placed, so the merge is now in its critical phase. Get
// a consistent view of the data from the right-hand range. If the merge
// commits, we'll write this data to the left-hand range in the merge
pkg/roachpb/batch.go: 6 additions & 0 deletions
@@ -221,6 +221,12 @@ func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
return false
}

// IsSingleRefreshRequest returns true iff the batch contains a single request,
// and that request is a RefreshRequest.
func (ba *BatchRequest) IsSingleRefreshRequest() bool {
return ba.isSingleRequestWithMethod(Refresh)
}

// IsSingleSubsumeRequest returns true iff the batch contains a single request,
// and that request is a SubsumeRequest.
func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
