Skip to content

Commit

Permalink
Merge #60905
Browse files Browse the repository at this point in the history
60905: kv: reject lease transfers during in-progress range merge r=nvanbenschoten a=nvanbenschoten

Found with kvnemesis's new ability to issue lease transfers.

This commit prevents TransferLease requests from waiting on in-progress
merges, instead rejecting them in `handleMergeInProgressError`. This is
necessary because the merge may need to acquire a range lease in order
to complete if it still needs to perform its Subsume request, which it
likely will if this lease transfer revoked the leaseholder's existing
range lease. Any concurrent lease acquisition attempt will be blocked on
this lease transfer because a replica only performs a single lease
operation at a time, so we reject to prevent a deadlock.

The commit adds a new `TestStoreRangeMergeRHSLeaseTransfers` test which
reliably hit this deadlock before the issue was fixed.

Release note (bug fix): Fixes a rare deadlock where a series of lease
transfers concurrent with a Range merge could block each other from
ever completing.

Release justification: bug fix

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Mar 2, 2021
2 parents aa29488 + 9fbbb1a commit f8b7708
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 38 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (v *validator) processOp(txnID *string, op Operation) {
} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
// A lease transfer that races with a replica removal may find that
// the store it was targeting is no longer part of the range.
} else if resultIsErrorStr(t.Result, `cannot transfer lease while merge in progress`) {
// A lease transfer is not permitted while a range merge is in its
// critical phase.
} else if resultIsError(t.Result, liveness.ErrRecordCacheMiss) {
// If the existing leaseholder has not yet heard about the transfer
// target's liveness record through gossip, it will return an error.
Expand Down
85 changes: 85 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,91 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
}
}

// TestStoreRangeMergeRHSLeaseTransfers verifies that in cases where a lease
// transfer is triggered while a range merge is in progress, it is rejected
// immediately and does not prevent the merge itself from completing by creating
// a deadlock.
//
// The test pauses the merge at its first Subsume request, moves the RHS lease
// to store 0, attempts to move it back to store 1 (expecting a rejection), and
// then releases the Subsume request and checks that the merge succeeds.
func TestStoreRangeMergeRHSLeaseTransfers(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Install a hook to control when the merge transaction subsumes the RHS.
// Put this in a sync.Once to ignore retries.
var once sync.Once
// subsumeReceived is signaled by the hook when the first Subsume request
// arrives. finishSubsume is closed by the test to let that request proceed.
// Both are unbuffered, so the hook blocks until the test receives the signal.
subsumeReceived := make(chan struct{})
finishSubsume := make(chan struct{})
testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ba.IsSingleSubsumeRequest() {
once.Do(func() {
subsumeReceived <- struct{}{}
<-finishSubsume
})
}
// The filter never rejects a request; it only delays the first Subsume.
return nil
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: testingRequestFilter,
AllowLeaseRequestProposalsWhenNotLeader: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)
store := tc.GetFirstStoreFromServer(t, 0)

// Create the ranges to be merged. Put both ranges on both stores, but give
// the second store the lease on the RHS. The LHS is largely irrelevant. What
// matters is that the RHS exists on two stores so we can transfer its lease
// during the merge.
lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeWithExpirationLeaseEx()
require.NoError(t, err)

tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1))
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1))

// Launch the merge. It runs asynchronously because it cannot finish until
// finishSubsume is closed below; its result is reported on mergeErr.
mergeErr := make(chan error)
_ = tc.Stopper().RunAsyncTask(ctx, "merge", func(context.Context) {
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := kv.SendWrapped(ctx, store.TestSender(), args)
mergeErr <- pErr.GoError()
})

// Wait for the merge transaction to send its Subsume request. It won't
// be able to complete just yet, thanks to the hook we installed above.
<-subsumeReceived

// Transfer the lease to store 0. Even though the Subsume request has not
// yet evaluated, the new leaseholder will notice the deletion intent on its
// local range descriptor (see maybeWatchForMergeLocked) and will begin
// blocking most operations.
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(0))

// Attempt to transfer the lease back to store 1. This will cause the
// current leaseholder to revoke its lease (see minLeaseProposedTS), which
// will cause the Subsume request to need to acquire a new range lease.
//
// In the past, this lease transfer would get blocked on the mergeComplete
// channel. While in this state, it would block the lease acquisition
// triggered by the Subsume request because a replica only performs a
// single lease operation at a time. As a result, this would deadlock and
// neither the lease transfer nor the merge would ever complete.
//
// NOTE(review): if the assertion below fails, finishSubsume is never closed,
// so the paused Subsume request presumably stays blocked during teardown —
// confirm whether the deferred Stop can hang in that case.
err = tc.TransferRangeLease(rhsDesc, tc.Target(1))
require.Regexp(t, "cannot transfer lease while merge in progress", err)

// Finally, allow the merge to complete. It should complete successfully.
close(finishSubsume)
require.NoError(t, <-mergeErr)
}

// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following:
// 1. While a range is subsumed, ComputeChecksum requests wait until the merge
// is complete before proceeding.
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,22 @@ func (r *Replica) handleMergeInProgressError(
// Merge no longer in progress. Retry the command.
return nil
}
// Check to see if the request is a lease transfer. If so, reject it
// immediately instead of waiting for the merge to complete. This is
// necessary because the merge may need to acquire a range lease in order to
// complete if it still needs to perform its Subsume request, which it
// likely will if this lease transfer revoked the leaseholder's existing
// range lease. Any concurrent lease acquisition attempt will be blocked on
// this lease transfer because a replica only performs a single lease
// operation at a time, so we reject to prevent a deadlock.
//
// NOTE: it would not be sufficient to check for an in-progress merge in
// AdminTransferLease because the range may notice the in-progress merge
// after the lease transfer is initiated but before the lease transfer
// acquires latches.
if ba.IsSingleTransferLeaseRequest() {
return roachpb.NewErrorf("cannot transfer lease while merge in progress")
}
log.Event(ctx, "waiting on in-progress merge")
select {
case <-mergeCompleteCh:
Expand Down
59 changes: 21 additions & 38 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,84 +184,67 @@ func (ba *BatchRequest) IsSingleSkipLeaseCheckRequest() bool {
return ba.IsSingleRequest() && ba.hasFlag(skipLeaseCheck)
}

// isSingleRequestWithMethod returns true iff the batch contains a single
// request, and that request's method is m.
func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool {
	// Guard first: Requests[0] is only inspected once the batch is known to
	// hold a single request.
	if !ba.IsSingleRequest() {
		return false
	}
	only := ba.Requests[0].GetInner()
	return only.Method() == m
}

// IsSingleTransferLeaseRequest reports whether the batch consists of exactly
// one request and that request is a TransferLease.
func (ba *BatchRequest) IsSingleTransferLeaseRequest() bool {
	isTransfer := ba.isSingleRequestWithMethod(TransferLease)
	return isTransfer
}

// IsSinglePushTxnRequest returns true iff the batch contains a single
// request, and that request is for a PushTxn.
// request, and that request is a PushTxn.
func (ba *BatchRequest) IsSinglePushTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*PushTxnRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(PushTxn)
}

// IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
// request, and that request is a HeartbeatTxn.
func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*HeartbeatTxnRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(HeartbeatTxn)
}

// IsSingleEndTxnRequest returns true iff the batch contains a single request,
// and that request is an EndTxnRequest.
func (ba *BatchRequest) IsSingleEndTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*EndTxnRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(EndTxn)
}

// IsSingleAbortTxnRequest returns true iff the batch contains a single request,
// and that request is an EndTxnRequest(commit=false).
func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
if ba.IsSingleRequest() {
if et, ok := ba.Requests[0].GetInner().(*EndTxnRequest); ok {
return !et.Commit
}
if ba.isSingleRequestWithMethod(EndTxn) {
return !ba.Requests[0].GetInner().(*EndTxnRequest).Commit
}
return false
}

// IsSingleSubsumeRequest returns true iff the batch contains a single request,
// and that request is a SubsumeRequest.
func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*SubsumeRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(Subsume)
}

// IsSingleComputeChecksumRequest returns true iff the batch contains a single
// request, and that request is a ComputeChecksumRequest.
func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*ComputeChecksumRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(ComputeChecksum)
}

// IsSingleCheckConsistencyRequest returns true iff the batch contains a single
// request, and that request is a CheckConsistencyRequest.
func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*CheckConsistencyRequest)
return ok
}
return false
return ba.isSingleRequestWithMethod(CheckConsistency)
}

// IsSingleAddSSTableRequest returns true iff the batch contains a single
// request, and that request is an AddSSTableRequest that will ingest as an SST,
// (i.e. does not have IngestAsWrites set)
func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
if ba.IsSingleRequest() {
req, ok := ba.Requests[0].GetInner().(*AddSSTableRequest)
return ok && !req.IngestAsWrites
if ba.isSingleRequestWithMethod(AddSSTable) {
return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites
}
return false
}
Expand Down

0 comments on commit f8b7708

Please sign in to comment.