diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index fb5e07729b4e..9604f224422f 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -470,3 +471,95 @@ func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) { err = txn.Commit(ctx) require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err) } + +// This test verifies that when a lease is moved to a node that does not match the +// lease preferences the replication queue moves it eagerly back, without considering the +// kv.allocator.min_lease_transfer_interval. +func TestLeasePreferencesRebalance(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + sv := &settings.SV + // set min lease transfer high, so we know it does affect the lease movement. + kvserver.MinLeaseTransferInterval.Override(sv, 24*time.Hour) + // Place all the leases in us-west. + zcfg := zonepb.DefaultZoneConfig() + zcfg.LeasePreferences = []zonepb.LeasePreference{ + { + Constraints: []zonepb.Constraint{ + {Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "us-west"}, + }, + }, + } + numNodes := 3 + serverArgs := make(map[int]base.TestServerArgs) + locality := func(region string) roachpb.Locality { + return roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: region}, + }, + } + } + localities := []roachpb.Locality{ + locality("us-west"), + locality("us-east"), + locality("eu"), + } + for i := 0; i < numNodes; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: localities[i], + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DefaultZoneConfigOverride: &zcfg, + }, + }, + Settings: settings, + } + } + tc := testcluster.StartTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + }) + defer tc.Stopper().Stop(ctx) + + key := keys.UserTableDataMin + tc.SplitRangeOrFatal(t, key) + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...)) + desc := tc.LookupRangeOrFatal(t, key) + leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil) + require.NoError(t, err) + require.Equal(t, tc.Target(0), leaseHolder) + + // Manually move lease out of preference. + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + + testutils.SucceedsSoon(t, func() error { + lh, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + if !lh.Equal(tc.Target(1)) { + return errors.Errorf("Expected leaseholder to be %s but was %s", tc.Target(1), lh) + } + return nil + }) + + tc.GetFirstStoreFromServer(t, 1).SetReplicateQueueActive(true) + require.NoError(t, tc.GetFirstStoreFromServer(t, 1).ForceReplicationScanAndProcess()) + + // The lease should be moved back by the rebalance queue to us-west. + testutils.SucceedsSoon(t, func() error { + lh, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + if !lh.Equal(tc.Target(0)) { + return errors.Errorf("Expected leaseholder to be %s but was %s", tc.Target(0), lh) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 9ff023f390da..41d8aec566a7 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3150,7 +3150,7 @@ func (r *Replica) adminScatter( // range. Note that we disable lease transfers until the final step as // transferring the lease prevents any further action on this node. var allowLeaseTransfer bool - canTransferLease := func() bool { return allowLeaseTransfer } + canTransferLease := func(ctx context.Context, repl *Replica) bool { return allowLeaseTransfer } for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); { requeue, err := rq.processOneChange(ctx, r, canTransferLease, false /* dryRun */) if err != nil { diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 8557f23eed74..cac1cc1b19d0 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -49,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1275,3 +1276,26 @@ func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.Lease // We explicitly ignore the returned handle as we won't block on it. _ = r.requestLeaseLocked(ctx, st) } + +// checkLeaseRespectsPreferences checks if current replica owns the lease and +// if it respects the lease preferences defined in the zone config. If there are no +// preferences defined then it will return true and consider that to be in-conformance. +func (r *Replica) checkLeaseRespectsPreferences(ctx context.Context) (bool, error) { + if !r.OwnsValidLease(ctx, r.store.cfg.Clock.NowAsClockTimestamp()) { + return false, errors.Errorf("replica %s is not the leaseholder, cannot check lease preferences", r) + } + _, zone := r.DescAndZone() + if len(zone.LeasePreferences) == 0 { + return true, nil + } + storeDesc, err := r.store.Descriptor(ctx, false /* useCached */) + if err != nil { + return false, err + } + for _, preference := range zone.LeasePreferences { + if constraint.ConjunctionsCheck(*storeDesc, preference.Constraints) { + return true, nil + } + } + return false, nil +} diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 887b557022c6..0f3239a01a72 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -12821,7 +12821,7 @@ func TestReplicateQueueProcessOne(t *testing.T) { tc.repl.mu.destroyStatus.Set(errBoom, destroyReasonMergePending) tc.repl.mu.Unlock() - requeue, err := tc.store.replicateQueue.processOneChange(ctx, tc.repl, func() bool { return false }, true /* dryRun */) + requeue, err := tc.store.replicateQueue.processOneChange(ctx, tc.repl, func(ctx context.Context, repl *Replica) bool { return false }, true /* dryRun */) require.Equal(t, errBoom, err) require.False(t, requeue) } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 66086d94f187..b4a2ce180e69 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -49,10 +49,10 @@ const ( newReplicaGracePeriod = 5 * time.Minute ) -// minLeaseTransferInterval controls how frequently leases can be transferred +// MinLeaseTransferInterval controls how frequently leases can be transferred // for rebalancing. It does not prevent transferring leases in order to allow // a replica to be removed from a range. -var minLeaseTransferInterval = settings.RegisterDurationSetting( +var MinLeaseTransferInterval = settings.RegisterDurationSetting( "kv.allocator.min_lease_transfer_interval", "controls how frequently leases can be transferred for rebalancing. "+ "It does not prevent transferring leases in order to allow a "+ @@ -285,7 +285,7 @@ func (rq *replicateQueue) shouldQueue( // If the lease is valid, check to see if we should transfer it. status := repl.LeaseStatusAt(ctx, now) if status.IsValid() && - rq.canTransferLease() && + rq.canTransferLeaseFrom(ctx, repl) && rq.allocator.ShouldTransferLease(ctx, zone, voterReplicas, status.Lease.Replica.StoreID, repl.leaseholderStats) { log.VEventf(ctx, 2, "lease transfer needed, enqueuing") @@ -310,7 +310,7 @@ func (rq *replicateQueue) process( // selected target. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { for { - requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */) + requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLeaseFrom, false /* dryRun */) if isSnapshotError(err) { // If ChangeReplicas failed because the snapshot failed, we log the // error but then return success indicating we should retry the @@ -344,7 +344,10 @@ func (rq *replicateQueue) process( } func (rq *replicateQueue) processOneChange( - ctx context.Context, repl *Replica, canTransferLease func() bool, dryRun bool, + ctx context.Context, + repl *Replica, + canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, + dryRun bool, ) (requeue bool, _ error) { // Check lease and destroy status here. The queue does this higher up already, but // adminScatter (and potential other future callers) also call this method and don't @@ -481,7 +484,7 @@ func (rq *replicateQueue) processOneChange( case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) case AllocatorConsiderRebalance: - return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLease, dryRun) + return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLeaseFrom, dryRun) case AllocatorFinalizeAtomicReplicationChange: _, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.store, repl.Desc()) // Requeue because either we failed to transition out of a joint state @@ -805,12 +808,12 @@ func (rq *replicateQueue) maybeTransferLeaseAway( repl *Replica, removeStoreID roachpb.StoreID, dryRun bool, - canTransferLease func() bool, + canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, ) (done bool, _ error) { if removeStoreID != repl.store.StoreID() { return false, nil } - if canTransferLease != nil && !canTransferLease() { + if canTransferLeaseFrom != nil && !canTransferLeaseFrom(ctx, repl) { return false, errors.Errorf("cannot transfer lease") } desc, zone := repl.DescAndZone() @@ -1066,7 +1069,7 @@ func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, - canTransferLease func() bool, + canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, dryRun bool, ) (requeue bool, _ error) { desc, zone := repl.DescAndZone() @@ -1101,7 +1104,7 @@ func (rq *replicateQueue) considerRebalance( if !ok { log.VEventf(ctx, 1, "no suitable rebalance target for non-voters") } else if done, err := rq.maybeTransferLeaseAway( - ctx, repl, removeTarget.StoreID, dryRun, canTransferLease, + ctx, repl, removeTarget.StoreID, dryRun, canTransferLeaseFrom, ); err != nil { log.VEventf(ctx, 1, "want to remove self, but failed to transfer lease away: %s", err) } else if done { @@ -1144,7 +1147,7 @@ func (rq *replicateQueue) considerRebalance( } } - if !canTransferLease() { + if !canTransferLeaseFrom(ctx, repl) { // No action was necessary and no rebalance target was found. Return // without re-queuing this replica. return false, nil @@ -1378,9 +1381,19 @@ func (rq *replicateQueue) changeReplicas( return nil } -func (rq *replicateQueue) canTransferLease() bool { +// canTransferLeaseFrom checks is a lease can be transferred from the specified +// replica. It considers two factors if the replica is in -conformance with +// lease preferences and the last time a transfer occurred to avoid thrashing. +func (rq *replicateQueue) canTransferLeaseFrom(ctx context.Context, repl *Replica) bool { + // Do a best effort check to see if this replica conforms to the configured + // lease preferences (if any), if it does not we want to encourage more + // aggressive lease movement and not delay it. + respectsLeasePreferences, err := repl.checkLeaseRespectsPreferences(ctx) + if err == nil && !respectsLeasePreferences { + return true + } if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil { - minInterval := minLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV) + minInterval := MinLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV) return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval } return true diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 2eae1f2ee5de..84584870bfc0 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2731,7 +2731,7 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Recording, error) { ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, s.ClusterSettings().Tracer, "allocator dry run") defer cancel() - canTransferLease := func() bool { return true } + canTransferLease := func(ctx context.Context, repl *Replica) bool { return true } _, err := s.replicateQueue.processOneChange( ctx, repl, canTransferLease, true /* dryRun */) if err != nil {