Skip to content

Commit

Permalink
Merge pull request #90202 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.2.0-90106

release-22.2.0: kv: reacquire proscribed leases on drain, then transfer
  • Loading branch information
nvanbenschoten authored Oct 21, 2022
2 parents 6e67dfb + 06bbac9 commit 0ac7e4e
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 20 deletions.
6 changes: 4 additions & 2 deletions pkg/cmd/roachtest/tests/quit.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ func (q *quitTest) checkNoLeases(ctx context.Context, nodeID int) {
invLeaseMap[i] = invalidLeases
}
}
// (1): is there a range with no replica outside of nodeID?
// (1): is there a range where every replica thinks the lease is held by
// nodeID? If so, the value in knownRanges will be set to 0.
var leftOver []string
for r, n := range knownRanges {
if n == 0 {
Expand All @@ -322,7 +323,8 @@ func (q *quitTest) checkNoLeases(ctx context.Context, nodeID int) {
if len(leftOver) > 0 {
q.Fatalf("(1) ranges with no lease outside of node %d: %# v", nodeID, pretty.Formatter(leftOver))
}
// (2): is there a range with left over replicas on nodeID?
// (2): is there a range where any replica thinks the lease is held by
// nodeID?
//
// TODO(knz): Eventually we want this condition to be always
// true, i.e. fail the test immediately if found to be false
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func TransferLease(
prevLease, _ := cArgs.EvalCtx.GetLease()

newLease := args.Lease
args.Lease = roachpb.Lease{} // prevent accidental use below

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
Expand Down
95 changes: 88 additions & 7 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -1372,7 +1373,8 @@ type leaseTransferTest struct {
replica0Desc, replica1Desc roachpb.ReplicaDescriptor
leftKey roachpb.Key
filterMu syncutil.Mutex
filter func(filterArgs kvserverbase.FilterArgs) *roachpb.Error
evalFilter kvserverbase.ReplicaCommandFilter
propFilter kvserverbase.ReplicaProposalFilter
waitForTransferBlocked atomic.Value
transferBlocked chan struct{}
manualClock *hlc.HybridManualClock
Expand All @@ -1384,12 +1386,22 @@ func setupLeaseTransferTest(t *testing.T) *leaseTransferTest {
manualClock: hlc.NewHybridManualClock(),
}

testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
l.filterMu.Lock()
filterCopy := l.evalFilter
l.filterMu.Unlock()
if filterCopy != nil {
return filterCopy(args)
}
return nil
}

testingProposalFilter := func(args kvserverbase.ProposalFilterArgs) *roachpb.Error {
l.filterMu.Lock()
filterCopy := l.filter
filterCopy := l.propFilter
l.filterMu.Unlock()
if filterCopy != nil {
return filterCopy(filterArgs)
return filterCopy(args)
}
return nil
}
Expand All @@ -1413,6 +1425,7 @@ func setupLeaseTransferTest(t *testing.T) *leaseTransferTest {
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: testingEvalFilter,
},
TestingProposalFilter: testingProposalFilter,
LeaseTransferBlockedOnExtensionEvent: leaseTransferBlockedOnExtensionEvent,
},
Server: &server.TestingKnobs{
Expand Down Expand Up @@ -1487,10 +1500,10 @@ func (l *leaseTransferTest) setFilter(setTo bool, extensionSem chan struct{}) {
l.filterMu.Lock()
defer l.filterMu.Unlock()
if !setTo {
l.filter = nil
l.evalFilter = nil
return
}
l.filter = func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
l.evalFilter = func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
if filterArgs.Sid != l.tc.Target(1).StoreID {
return nil
}
Expand All @@ -1502,7 +1515,7 @@ func (l *leaseTransferTest) setFilter(setTo bool, extensionSem chan struct{}) {
// Notify the main thread that the extension is in progress and wait for
// the signal to proceed.
l.filterMu.Lock()
l.filter = nil
l.evalFilter = nil
l.filterMu.Unlock()
extensionSem <- struct{}{}
log.Infof(filterArgs.Ctx, "filter blocking request: %s", llReq)
Expand Down Expand Up @@ -1774,6 +1787,74 @@ func TestLeaseExpirationBasedDrainTransferWithExtension(t *testing.T) {
}
}

// TestLeaseExpirationBasedDrainTransferWithProscribed verifies that a draining
// store reacquires proscribed leases for ranges before transferring those
// leases away.
func TestLeaseExpirationBasedDrainTransferWithProscribed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
l := setupLeaseTransferTest(t)
defer l.tc.Stopper().Stop(ctx)
// Ensure that replica1 has the lease.
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
l.checkHasLease(t, 1)

var failedOnce sync.Once
failedCh := make(chan struct{})
failLeaseTransfers := func(fail bool) {
l.filterMu.Lock()
defer l.filterMu.Unlock()
if !fail {
l.propFilter = nil
return
}
l.propFilter = func(filterArgs kvserverbase.ProposalFilterArgs) *roachpb.Error {
if filterArgs.Req.IsSingleTransferLeaseRequest() {
target := filterArgs.Req.Requests[0].GetTransferLease().Lease.Replica
if target == l.replica0Desc {
failedOnce.Do(func() { close(failedCh) })
return roachpb.NewError(kvserver.NewLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(
target, raftutil.ReplicaStateProbe))
}
}
return nil
}
}

// Fail lease transfers on the target range after the previous lease has been
// revoked (after evaluation, during raft proposal). In doing so, we leave the
// range with a PROSCRIBED lease.
failLeaseTransfers(true /* fail */)

// Drain node 1.
drainedCh := make(chan struct{})
go func() {
l.tc.GetFirstStoreFromServer(t, 1).SetDraining(true, nil /* reporter */, false /* verbose */)
close(drainedCh)
}()

// Wait until the lease transfer has failed at least once.
<-failedCh

// The drain should be unable to succeed.
select {
case <-drainedCh:
t.Fatalf("drain unexpectedly succeeded")
case <-time.After(10 * time.Millisecond):
}

// Stop failing lease transfers.
failLeaseTransfers(false /* fail */)

// The drain should succeed.
<-drainedCh
l.checkHasLease(t, 0)
}

// TestLeaseExpirationBelowFutureTimeRequest tests two cases where a
// request is sent to a range with a future-time timestamp that is past
// the current expiration time of the range's lease. In the first case,
Expand Down
61 changes: 51 additions & 10 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,24 +1542,65 @@ func (s *Store) SetDraining(drain bool, reporter func(int, redact.SafeString), v
break
}

// Is the lease owned by this store?
leaseLocallyOwned := drainingLeaseStatus.OwnedBy(s.StoreID())

// Is there some other replica that we can transfer the lease to?
// Learner replicas aren't allowed to become the leaseholder or raft
// leader, so only consider the `Voters` replicas.
needsLeaseTransfer := len(r.Desc().Replicas().VoterDescriptors()) > 1 &&
drainingLeaseStatus.IsValid() &&
drainingLeaseStatus.OwnedBy(s.StoreID())
// leader, so only consider the Voters replicas.
transferTargetAvailable := len(r.Desc().Replicas().VoterDescriptors()) > 1

// Note that this code doesn't deal with transferring the Raft
// leadership. Leadership tries to follow the lease, so when leases
// are transferred, leadership will be transferred too. For ranges
// without leases we probably should try to move the leadership
// manually to a non-draining replica.
// If so, and the lease is proscribed, we have to reacquire it so that
// we can transfer it away. Other replicas won't know that this lease
// is proscribed and not usable by this replica, so failing to
// transfer the lease away could cause temporary unavailability.
needsLeaseReacquisition := leaseLocallyOwned && transferTargetAvailable &&
drainingLeaseStatus.State == kvserverpb.LeaseState_PROSCRIBED

if !needsLeaseTransfer {
// Otherwise, if the lease is locally owned and valid, transfer it.
needsLeaseTransfer := leaseLocallyOwned && transferTargetAvailable &&
drainingLeaseStatus.State == kvserverpb.LeaseState_VALID

if !needsLeaseTransfer && !needsLeaseReacquisition {
// Skip this replica.
atomic.AddInt32(&numTransfersAttempted, -1)
return
}

if needsLeaseReacquisition {
// Re-acquire the proscribed lease for this replica so that we can
// transfer it away during a later iteration.
desc := r.Desc()
if verbose || log.V(1) {
// This logging is useful to troubleshoot incomplete drains.
log.Infof(ctx, "attempting to acquire proscribed lease %v for range %s",
drainingLeaseStatus.Lease, desc)
}

_, pErr := r.redirectOnOrAcquireLease(ctx)
if pErr != nil {
const failFormat = "failed to acquire proscribed lease %s for range %s when draining: %v"
infoArgs := []interface{}{drainingLeaseStatus.Lease, desc, pErr}
if verbose {
log.Dev.Infof(ctx, failFormat, infoArgs...)
} else {
log.VErrEventf(ctx, 1 /* level */, failFormat, infoArgs...)
}
// The lease reacquisition failed. Either we no longer hold the
// lease or we will need to attempt to reacquire it again. Either
// way, handle this on a future iteration.
return
}

// The lease reacquisition succeeded. Proceed to the lease transfer.
}

// Note that this code doesn't deal with transferring the Raft
// leadership. Leadership tries to follow the lease, so when leases
// are transferred, leadership will be transferred too. For ranges
// without leases we probably should try to move the leadership
// manually to a non-draining replica.

desc, conf := r.DescAndSpanConfig()

if verbose || log.V(1) {
Expand Down

0 comments on commit 0ac7e4e

Please sign in to comment.