diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 5ad450fd5e7c..62c2088dc862 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -346,6 +346,9 @@ func (v *validator) processOp(txnID *string, op Operation) { } else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) { // A lease transfer that races with a replica removal may find that // the store it was targeting is no longer part of the range. + } else if resultIsErrorStr(t.Result, `cannot transfer lease while merge in progress`) { + // A lease transfer is not permitted while a range merge is in its + // critical phase. } else if resultIsError(t.Result, liveness.ErrRecordCacheMiss) { // If the existing leaseholder has not yet heard about the transfer // target's liveness record through gossip, it will return an error. diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index acb8d232ef23..a56c43423461 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1854,6 +1854,91 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { } } +// TestStoreRangeMergeRHSLeaseTransfers verifies that in cases where a lease +// transfer is triggered while a range merge is in progress, it is rejected +// immediately and does not prevent the merge itself from completing by creating +// a deadlock. +func TestStoreRangeMergeRHSLeaseTransfers(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Install a hook to control when the merge transaction subsumes the RHS. + // Put this in a sync.Once to ignore retries. + var once sync.Once + subsumeReceived := make(chan struct{}) + finishSubsume := make(chan struct{}) + testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if ba.IsSingleSubsumeRequest() { + once.Do(func() { + subsumeReceived <- struct{}{} + <-finishSubsume + }) + } + return nil + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: testingRequestFilter, + AllowLeaseRequestProposalsWhenNotLeader: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + + // Create the ranges to be merged. Put both ranges on both stores, but give + // the second store the lease on the RHS. The LHS is largely irrelevant. What + // matters is that the RHS exists on two stores so we can transfer its lease + // during the merge. + lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeWithExpirationLeaseEx() + require.NoError(t, err) + + tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1)) + + // Launch the merge. + mergeErr := make(chan error) + _ = tc.Stopper().RunAsyncTask(ctx, "merge", func(context.Context) { + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), args) + mergeErr <- pErr.GoError() + }) + + // Wait for the merge transaction to send its Subsume request. It won't + // be able to complete just yet, thanks to the hook we installed above. + <-subsumeReceived + + // Transfer the lease to store 0. Even though the Subsume request has not + // yet evaluated, the new leaseholder will notice the deletion intent on its + // local range descriptor (see maybeWatchForMergeLocked) and will begin + // blocking most operations. + tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(0)) + + // Attempt to transfer the lease back to store 1. This will cause the + // current leaseholder to revoke its lease (see minLeaseProposedTS), which + // will cause the Subsume request to need to acquire a new range lease. + // + // In the past, this lease transfer would get blocked on the mergeComplete + // channel. While in this state, it would block the lease acquisition + // triggerred by the Subsume request because a replica only performs a + // single lease operation at a time. As a result, this would deadlock and + // neither the lease transfer nor the merge would ever complete. + err = tc.TransferRangeLease(rhsDesc, tc.Target(1)) + require.Regexp(t, "cannot transfer lease while merge in progress", err) + + // Finally, allow the merge to complete. It should complete successfully. + close(finishSubsume) + require.NoError(t, <-mergeErr) +} + // TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following: // 1. While a range is subsumed, ComputeChecksum requests wait until the merge // is complete before proceeding. diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index cac114704f5f..689e278479e9 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -489,6 +489,22 @@ func (r *Replica) handleMergeInProgressError( // Merge no longer in progress. Retry the command. return nil } + // Check to see if the request is a lease transfer. If so, reject it + // immediately instead of a waiting for the merge to complete. This is + // necessary because the merge may need to acquire a range lease in order to + // complete if it still needs to perform its Subsume request, which it + // likely will if this lease transfer revoked the leaseholder's existing + // range lease. Any concurrent lease acquisition attempt will be blocked on + // this lease transfer because a replica only performs a single lease + // operation at a time, so we reject to prevent a deadlock. + // + // NOTE: it would not be sufficient to check for an in-progress merge in + // AdminTransferLease because the range may notice the in-progress merge + // after the lease transfer is initiated but before the lease transfer + // acquires latches. + if ba.IsSingleTransferLeaseRequest() { + return roachpb.NewErrorf("cannot transfer lease while merge in progress") + } log.Event(ctx, "waiting on in-progress merge") select { case <-mergeCompleteCh: diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index cbe935d09e93..3061f81c774f 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -184,43 +184,39 @@ func (ba *BatchRequest) IsSingleSkipLeaseCheckRequest() bool { return ba.IsSingleRequest() && ba.hasFlag(skipLeaseCheck) } +func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool { + return ba.IsSingleRequest() && ba.Requests[0].GetInner().Method() == m +} + +// IsSingleTransferLeaseRequest returns true iff the batch contains a single +// request, and that request is a TransferLease. +func (ba *BatchRequest) IsSingleTransferLeaseRequest() bool { + return ba.isSingleRequestWithMethod(TransferLease) +} + // IsSinglePushTxnRequest returns true iff the batch contains a single -// request, and that request is for a PushTxn. +// request, and that request is a PushTxn. func (ba *BatchRequest) IsSinglePushTxnRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*PushTxnRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(PushTxn) } // IsSingleHeartbeatTxnRequest returns true iff the batch contains a single // request, and that request is a HeartbeatTxn. func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*HeartbeatTxnRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(HeartbeatTxn) } // IsSingleEndTxnRequest returns true iff the batch contains a single request, // and that request is an EndTxnRequest. func (ba *BatchRequest) IsSingleEndTxnRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*EndTxnRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(EndTxn) } // IsSingleAbortTxnRequest returns true iff the batch contains a single request, // and that request is an EndTxnRequest(commit=false). func (ba *BatchRequest) IsSingleAbortTxnRequest() bool { - if ba.IsSingleRequest() { - if et, ok := ba.Requests[0].GetInner().(*EndTxnRequest); ok { - return !et.Commit - } + if ba.isSingleRequestWithMethod(EndTxn) { + return !ba.Requests[0].GetInner().(*EndTxnRequest).Commit } return false } @@ -228,40 +224,27 @@ func (ba *BatchRequest) IsSingleAbortTxnRequest() bool { // IsSingleSubsumeRequest returns true iff the batch contains a single request, // and that request is an SubsumeRequest. func (ba *BatchRequest) IsSingleSubsumeRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*SubsumeRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(Subsume) } // IsSingleComputeChecksumRequest returns true iff the batch contains a single // request, and that request is a ComputeChecksumRequest. func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*ComputeChecksumRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(ComputeChecksum) } // IsSingleCheckConsistencyRequest returns true iff the batch contains a single // request, and that request is a CheckConsistencyRequest. func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool { - if ba.IsSingleRequest() { - _, ok := ba.Requests[0].GetInner().(*CheckConsistencyRequest) - return ok - } - return false + return ba.isSingleRequestWithMethod(CheckConsistency) } // IsSingleAddSSTableRequest returns true iff the batch contains a single // request, and that request is an AddSSTableRequest that will ingest as an SST, // (i.e. does not have IngestAsWrites set) func (ba *BatchRequest) IsSingleAddSSTableRequest() bool { - if ba.IsSingleRequest() { - req, ok := ba.Requests[0].GetInner().(*AddSSTableRequest) - return ok && !req.IngestAsWrites + if ba.isSingleRequestWithMethod(AddSSTable) { + return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites } return false }