From fcf9888768a820c9bec16925fd96ec5043946313 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 21 Jul 2023 00:37:07 +0000 Subject: [PATCH] allocator/plan: place leases violating preferences in purgatory This patch places replicas in the replicate queue purgatory when it has a lease violating the lease preferences and it's unable to find a suitable target. This causes the replica to be retried more often. This will only trigger when replicas are eagerly enqueued (typically when we acquire a new lease that violates preferences), since we otherwise don't attempt to enqueue replicas when they don't have a valid lease transfer target. This patch also enables requeuing replicas after a successful rebalance, when the lease violates preferences. Epic: none Release note: None Co-authored-by: Erik Grinaker Co-authored-by: Austen McClernon --- .../allocator/allocatorimpl/allocator.go | 5 + pkg/kv/kvserver/allocator/base.go | 3 + pkg/kv/kvserver/replica_proposal.go | 5 +- pkg/kv/kvserver/replicate_queue.go | 76 ++++++++++-- pkg/kv/kvserver/replicate_queue_test.go | 117 ++++++++++++++++++ 5 files changed, 193 insertions(+), 13 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 6b529503ca8a..4065e8e49475 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -2232,6 +2232,11 @@ func (a *Allocator) TransferLeaseTarget( forceDecisionWithoutStats bool, opts allocator.TransferLeaseOptions, ) roachpb.ReplicaDescriptor { + if a.knobs != nil { + if blockFn := a.knobs.BlockTransferTarget; blockFn != nil && blockFn() { + return roachpb.ReplicaDescriptor{} + } + } excludeLeaseRepl := opts.ExcludeLeaseRepl if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) || a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) { diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index ef073bf4c0ed..be9e9c991b01 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -95,6 +95,9 @@ type TestingKnobs struct { Desc() *roachpb.RangeDescriptor StoreID() roachpb.StoreID }) *raft.Status + // BlockTransferTarget can be used to block returning any transfer targets + // from TransferLeaseTarget. + BlockTransferTarget func() bool } // QPSRebalanceThreshold is much like rangeRebalanceThreshold, but for diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 981996ee9456..43830e3431ca 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -464,8 +464,9 @@ func (r *Replica) leasePostApplyLocked( // it in the replicate queue. 
if leaseChangingHands && iAmTheLeaseHolder { if LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) { - preferenceStatus := makeLeasePreferenceStatus(st, r.store.StoreID(), r.store.Attrs(), - r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) + preferenceStatus := makeLeasePreferenceStatus(r.leaseStatusAtRLocked(ctx, now), + r.store.StoreID(), r.store.Attrs(), r.store.nodeDesc.Attrs, + r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) switch preferenceStatus { case leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesUnknown: // We could also enqueue the lease when we are a less preferred diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index c7e65372c38b..1f2c2111d50b 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -922,15 +922,20 @@ func (rq *replicateQueue) ShouldRequeue(ctx context.Context, change ReplicateQue // time around. requeue = false - } else if change.Action == allocatorimpl.AllocatorConsiderRebalance { - // Don't requeue after a successful rebalance operation. - requeue = false - } else if change.Op.lhBeingRemoved() { // Don't requeue if the leaseholder was removed as a voter or the range // lease was transferred away. requeue = false + } else if change.Action == allocatorimpl.AllocatorConsiderRebalance && + !change.replica.leaseViolatesPreferences(ctx) { + // Don't requeue after a successful rebalance operation, when the lease + // does not violate any preferences. If the lease does violate preferences, + // the next process attempt will either find a target to transfer the lease + // or place the replica into purgatory if unable. See + // CantTransferLeaseViolatingPreferencesError. + requeue = false + } else { // Otherwise, requeue to see if there is more work to do. As the // operation succeeded and was planned for a repair action i.e. not @@ -1776,7 +1781,8 @@ func (rq *replicateQueue) considerRebalance( if !canTransferLeaseFrom(ctx, repl) { return nil, nil } - return rq.shedLeaseTarget( + var err error + op, err = rq.shedLeaseTarget( ctx, repl, desc, @@ -1786,8 +1792,19 @@ func (rq *replicateQueue) considerRebalance( ExcludeLeaseRepl: false, CheckCandidateFullness: true, }, - ), nil - + ) + if err != nil { + if scatter && errors.Is(err, CantTransferLeaseViolatingPreferencesError{}) { + // When scatter is specified, we ignore lease preference violation + // errors returned from shedLeaseTarget. These errors won't place the + // replica into purgatory because they are called outside the replicate + // queue loop, directly. + log.KvDistribution.VEventf(ctx, 3, "%v", err) + err = nil + } + return nil, err + } + return op, nil } // If we have a valid rebalance action (ok == true) and we haven't @@ -1950,22 +1967,36 @@ func (rq *replicateQueue) shedLeaseTarget( desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig, opts allocator.TransferLeaseOptions, -) (op AllocationOp) { +) (op AllocationOp, _ error) { usage := RangeUsageInfoForRepl(repl) + existingVoters := desc.Replicas().VoterDescriptors() // Learner replicas aren't allowed to become the leaseholder or raft leader, // so only consider the `VoterDescriptors` replicas. 
target := rq.allocator.TransferLeaseTarget( ctx, rq.storePool, conf, - desc.Replicas().VoterDescriptors(), + existingVoters, repl, usage, false, /* forceDecisionWithoutStats */ opts, ) if target == (roachpb.ReplicaDescriptor{}) { - return nil + // If we don't find a suitable target, but we own a lease violating the + // lease preferences, and there is a more suitable target, return an error + // to place the replica in purgatory and retry sooner. This typically + // happens when we've just acquired a violating lease and we eagerly + // enqueue the replica before we've received Raft leadership, which + // prevents us from finding appropriate lease targets since we can't + // determine if any are behind. + liveVoters, _ := rq.storePool.LiveAndDeadReplicas( + existingVoters, false /* includeSuspectAndDrainingStores */) + preferred := rq.allocator.PreferredLeaseholders(rq.storePool, conf, liveVoters) + if len(preferred) > 0 && repl.leaseViolatesPreferences(ctx) { + return nil, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID} + } + return nil, nil } op = AllocationTransferLeaseOp{ @@ -1974,7 +2005,7 @@ func (rq *replicateQueue) shedLeaseTarget( usage: usage, bypassSafetyChecks: false, } - return op + return op, nil } // shedLease takes in a leaseholder replica, looks for a target for transferring @@ -2199,3 +2230,26 @@ func RangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo { }, } } + +// CantTransferLeaseViolatingPreferencesError is an error returned when a lease +// violates the lease preferences, but we couldn't find a valid target to +// transfer the lease to. It indicates that the replica should be sent to +// purgatory, to retry the transfer faster. +type CantTransferLeaseViolatingPreferencesError struct { + RangeID roachpb.RangeID +} + +var _ errors.SafeFormatter = CantTransferLeaseViolatingPreferencesError{} + +func (e CantTransferLeaseViolatingPreferencesError) Error() string { return fmt.Sprint(e) } + +func (e CantTransferLeaseViolatingPreferencesError) Format(s fmt.State, verb rune) { + errors.FormatError(e, s, verb) +} + +func (e CantTransferLeaseViolatingPreferencesError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("can't transfer r%d lease violating preferences, no suitable target", e.RangeID) + return nil +} + +func (CantTransferLeaseViolatingPreferencesError) PurgatoryErrorMarker() {} diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index e0752ab801c2..bd2d9d0ea90a 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" @@ -2487,3 +2488,119 @@ func TestReplicateQueueExpirationLeasesOnly(t *testing.T) { return epochLeases > 0 && expLeases > 0 && expLeases <= initialExpLeases }, 30*time.Second, 500*time.Millisecond) } + +// TestReplicateQueueLeasePreferencePurgatoryError tests that not finding a +// lease transfer target whilst violating lease preferences, will put the +// replica in the replicate queue purgatory. 
+func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + skip.UnderRace(t) // too slow under stressrace + skip.UnderDeadlock(t) + skip.UnderShort(t) + + const initialPreferredNode = 1 + const nextPreferredNode = 2 + const numRanges = 40 + const numNodes = 3 + + var blockTransferTarget atomic.Bool + + blockTransferTargetFn := func() bool { + block := blockTransferTarget.Load() + return block + } + + knobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + AllocatorKnobs: &allocator.TestingKnobs{ + BlockTransferTarget: blockTransferTargetFn, + }, + }, + } + + serverArgs := make(map[int]base.TestServerArgs, numNodes) + for i := 0; i < numNodes; i++ { + serverArgs[i] = base.TestServerArgs{ + Knobs: knobs, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "rack", Value: fmt.Sprintf("%d", i+1)}}, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ServerArgsPerNode: serverArgs, + }) + defer tc.Stopper().Stop(ctx) + + db := tc.Conns[0] + setLeasePreferences := func(node int) { + _, err := db.Exec(fmt.Sprintf(`ALTER TABLE t CONFIGURE ZONE USING + num_replicas=3, num_voters=3, voter_constraints='[]', lease_preferences='[[+rack=%d]]'`, + node)) + require.NoError(t, err) + } + + leaseCount := func(node int) int { + var count int + err := db.QueryRow(fmt.Sprintf( + "SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node), + ).Scan(&count) + require.NoError(t, err) + return count + } + + checkLeaseCount := func(node, expectedLeaseCount int) error { + if count := leaseCount(node); count != expectedLeaseCount { + return errors.Errorf("expected %d leases on node %d, found %d", + expectedLeaseCount, node, count) + } + return nil + } + + // Create a test table with numRanges-1 splits, to end up with numRanges + // ranges. We will use the test table ranges to assert on the purgatory lease + // preference behavior. + _, err := db.Exec("CREATE TABLE t (i int);") + require.NoError(t, err) + _, err = db.Exec( + fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1)) + require.NoError(t, err) + _, err = db.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t;") + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) + + store := tc.GetFirstStoreFromServer(t, 0) + // Set a preference on the initial node, then wait until all the leases for + // the test table are on that node. + setLeasePreferences(initialPreferredNode) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, store.ForceReplicationScanAndProcess()) + return checkLeaseCount(initialPreferredNode, numRanges) + }) + + // Block returning transfer targets from the allocator, then update the + // preferred node. We expect that every range for the test table will end up + // in purgatory on the initially preferred node. + blockTransferTarget.Store(true) + setLeasePreferences(nextPreferredNode) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, store.ForceReplicationScanAndProcess()) + if purgLen := store.ReplicateQueuePurgatoryLength(); purgLen != numRanges { + return errors.Errorf("expected %d in purgatory but got %v", numRanges, purgLen) + } + return nil + }) + + // Lastly, unblock returning transfer targets. Expect that the leases from + // the test table all move to the new preference. Note we don't force a + // replication queue scan, as the purgatory retry should handle the + // transfers. 
+ blockTransferTarget.Store(false) + testutils.SucceedsSoon(t, func() error { + return checkLeaseCount(nextPreferredNode, numRanges) + }) +}
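
The purgatory mechanism that the new error type hooks into is a marker interface: the replicate queue treats a processing error that implements the purgatory marker method as a signal to park the replica for quicker retries rather than dropping it. Below is a standalone Go sketch of that pattern in simplified form; the purgatoryError interface, cantTransferLeaseError type, and classify helper are illustrative stand-ins for this note, not the actual kvserver queue code.

// Illustrative sketch only: a simplified marker-interface pattern in the
// spirit of the replicate queue's purgatory handling. The names below are
// hypothetical and not the actual kvserver implementation.
package main

import (
	"errors"
	"fmt"
)

// purgatoryError is the marker interface: any error that also implements the
// dummy PurgatoryErrorMarker method asks the queue to park the item in
// purgatory for a faster retry.
type purgatoryError interface {
	error
	PurgatoryErrorMarker()
}

// cantTransferLeaseError mirrors the shape of the error added by the patch:
// it carries a range ID and opts into purgatory via the marker method.
type cantTransferLeaseError struct{ rangeID int64 }

func (e cantTransferLeaseError) Error() string {
	return fmt.Sprintf("can't transfer r%d lease violating preferences, no suitable target", e.rangeID)
}

func (cantTransferLeaseError) PurgatoryErrorMarker() {}

// classify shows how a queue-like caller would route a processing error:
// nil means the item is done, a purgatory error means retry from purgatory,
// and anything else falls back to the normal failure path.
func classify(err error) string {
	if err == nil {
		return "done"
	}
	var pe purgatoryError
	if errors.As(err, &pe) {
		return "purgatory"
	}
	return "failed"
}

func main() {
	fmt.Println(classify(nil))                                // done
	fmt.Println(classify(cantTransferLeaseError{rangeID: 7})) // purgatory
	fmt.Println(classify(errors.New("transient RPC error")))  // failed
}

In the patch, CantTransferLeaseViolatingPreferencesError plays the role of cantTransferLeaseError above: implementing the marker method is what routes the replica into the replicate queue purgatory.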
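
The planning change in shedLeaseTarget reduces to a small decision: with no transfer target in hand, escalate to a purgatory error only when the current lease violates the preferences and at least one preferred voter is live, so that a quick retry has a realistic chance of succeeding once Raft leadership and follower progress are known. Below is a hedged paraphrase of that control flow, with plain boolean and integer parameters standing in for the real allocator and store-pool calls; it is not the CockroachDB API.

// Hypothetical paraphrase of the decision added to shedLeaseTarget. The
// parameters stand in for the real allocator and store-pool lookups.
package main

import (
	"errors"
	"fmt"
)

var errPurgatory = errors.New("no suitable target; lease violates preferences")

// decideLeaseShed sketches the control flow: with a target we plan a lease
// transfer; without one we only escalate to a purgatory error when the
// current lease violates the preferences and a preferred voter is live.
// Otherwise the attempt is a no-op.
func decideLeaseShed(hasTarget, violatesPreferences bool, preferredLiveVoters int) (string, error) {
	if hasTarget {
		return "transfer-lease", nil
	}
	if violatesPreferences && preferredLiveVoters > 0 {
		// Corresponds to returning CantTransferLeaseViolatingPreferencesError:
		// the replicate queue then places the replica in purgatory.
		return "", errPurgatory
	}
	return "no-op", nil
}

func main() {
	plan, err := decideLeaseShed(false /* hasTarget */, true /* violatesPreferences */, 2)
	fmt.Println(plan, err) // -> "" no suitable target; lease violates preferences
}

Callers that plan changes outside the replicate queue loop, such as scatter, simply log and swallow this error instead of escalating, as the considerRebalance hunk above shows.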