kvserver: make canTransferLease more aggressive when out of lease preferences

Informs #62485

In a reproduction of #62485 we noticed that it took a while for leases
to move back to the preferred region. canTransferLease prevents a lease
from moving if it moved recently, so if a non-preferred replica steals the
lease to recover a range from unavailability, it may have to wait a whole
replicate queue cycle before the lease can move again. This change makes
canTransferLease skip the minimum-interval timeout on lease movement when
the lease is out of preferences.

Release note: None
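
For readers skimming the diff, here is a simplified, standalone sketch of the new gating logic; the function name mirrors canTransferLeaseFrom, but the signature and parameters are illustrative rather than the real replicateQueue method (see pkg/kv/kvserver/replicate_queue.go below for the actual change):

package main

import (
	"fmt"
	"time"
)

// canTransferLeaseFrom is a simplified stand-in for the new decision: when the
// leaseholder violates its lease preferences, the anti-thrashing interval is
// skipped entirely so the lease can move back right away.
func canTransferLeaseFrom(outOfPreferences bool, lastTransfer time.Time, minInterval time.Duration) bool {
	if outOfPreferences {
		// Out of preference: move eagerly, do not wait out the interval.
		return true
	}
	// In preference: keep the existing guard against frequent transfers.
	return time.Since(lastTransfer) > minInterval
}

func main() {
	lastTransfer := time.Now().Add(-time.Minute)
	fmt.Println(canTransferLeaseFrom(true, lastTransfer, 24*time.Hour))  // true: preferences win
	fmt.Println(canTransferLeaseFrom(false, lastTransfer, 24*time.Hour)) // false: interval not elapsed
}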
lunevalex committed Apr 15, 2021
1 parent 77a9403 commit 7495434
Showing 6 changed files with 146 additions and 16 deletions.
93 changes: 93 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -470,3 +471,95 @@ func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
err = txn.Commit(ctx)
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err)
}

// This test verifies that when a lease is moved to a node that does not match
// the lease preferences, the replication queue eagerly moves it back, without
// waiting for kv.allocator.min_lease_transfer_interval.
func TestLeasePreferencesRebalance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV
// Set the min lease transfer interval high enough that it would normally
// block the lease from moving again; the lease should still move back
// because it is out of preference.
kvserver.MinLeaseTransferInterval.Override(sv, 24*time.Hour)
// Place all the leases in us-west.
zcfg := zonepb.DefaultZoneConfig()
zcfg.LeasePreferences = []zonepb.LeasePreference{
{
Constraints: []zonepb.Constraint{
{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "us-west"},
},
},
}
numNodes := 3
serverArgs := make(map[int]base.TestServerArgs)
locality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: region},
},
}
}
localities := []roachpb.Locality{
locality("us-west"),
locality("us-east"),
locality("eu"),
}
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DefaultZoneConfigOverride: &zcfg,
},
},
Settings: settings,
}
}
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

key := keys.UserTableDataMin
tc.SplitRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...))
desc := tc.LookupRangeOrFatal(t, key)
leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(0), leaseHolder)

// Manually move lease out of preference.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))

testutils.SucceedsSoon(t, func() error {
lh, err := tc.FindRangeLeaseHolder(desc, nil)
if err != nil {
return err
}
if !lh.Equal(tc.Target(1)) {
return errors.Errorf("Expected leaseholder to be %s but was %s", tc.Target(1), lh)
}
return nil
})

tc.GetFirstStoreFromServer(t, 1).SetReplicateQueueActive(true)
require.NoError(t, tc.GetFirstStoreFromServer(t, 1).ForceReplicationScanAndProcess())

// The lease should be moved back to us-west by the replicate queue.
testutils.SucceedsSoon(t, func() error {
lh, err := tc.FindRangeLeaseHolder(desc, nil)
if err != nil {
return err
}
if !lh.Equal(tc.Target(0)) {
return errors.Errorf("Expected leaseholder to be %s but was %s", tc.Target(0), lh)
}
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -3150,7 +3150,7 @@ func (r *Replica) adminScatter(
// range. Note that we disable lease transfers until the final step as
// transferring the lease prevents any further action on this node.
var allowLeaseTransfer bool
canTransferLease := func() bool { return allowLeaseTransfer }
canTransferLease := func(ctx context.Context, repl *Replica) bool { return allowLeaseTransfer }
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
requeue, err := rq.processOneChange(ctx, r, canTransferLease, false /* dryRun */)
if err != nil {
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -49,6 +49,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -1275,3 +1276,26 @@ func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.Lease
// We explicitly ignore the returned handle as we won't block on it.
_ = r.requestLeaseLocked(ctx, st)
}

// checkLeaseRespectsPreferences checks whether the current replica owns the
// lease and whether it respects the lease preferences defined in the zone
// config. If no preferences are defined, it returns true and considers the
// lease to be in conformance.
func (r *Replica) checkLeaseRespectsPreferences(ctx context.Context) (bool, error) {
if !r.OwnsValidLease(ctx, r.store.cfg.Clock.NowAsClockTimestamp()) {
return false, errors.Errorf("replica %s is not the leaseholder, cannot check lease preferences", r)
}
_, zone := r.DescAndZone()
if len(zone.LeasePreferences) == 0 {
return true, nil
}
storeDesc, err := r.store.Descriptor(ctx, false /* useCached */)
if err != nil {
return false, err
}
for _, preference := range zone.LeasePreferences {
if constraint.ConjunctionsCheck(*storeDesc, preference.Constraints) {
return true, nil
}
}
return false, nil
}
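
As background for the conjunction check above, here is a rough, standalone illustration of how a lease preference is matched against a store's locality; the types and helper are hypothetical and much simpler than constraint.ConjunctionsCheck, but the idea is the same: every constraint in the conjunction must be satisfied for the preference to count as respected.

package main

import "fmt"

// constraintSketch is an illustrative stand-in for a single locality
// constraint (required by default, or prohibited); it is not the zonepb type.
type constraintSketch struct {
	key, value string
	prohibited bool
}

// conjunctionMatches reports whether a store's locality tiers satisfy every
// constraint in the conjunction.
func conjunctionMatches(tiers map[string]string, conjunction []constraintSketch) bool {
	for _, c := range conjunction {
		matched := tiers[c.key] == c.value
		// A required constraint that did not match, or a prohibited one that
		// did, fails the whole conjunction.
		if c.prohibited == matched {
			return false
		}
	}
	return true
}

func main() {
	usWest := map[string]string{"region": "us-west"}
	pref := []constraintSketch{{key: "region", value: "us-west"}}
	fmt.Println(conjunctionMatches(usWest, pref)) // true: the store is in the preferred region
}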
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -12821,7 +12821,7 @@ func TestReplicateQueueProcessOne(t *testing.T) {
tc.repl.mu.destroyStatus.Set(errBoom, destroyReasonMergePending)
tc.repl.mu.Unlock()

requeue, err := tc.store.replicateQueue.processOneChange(ctx, tc.repl, func() bool { return false }, true /* dryRun */)
requeue, err := tc.store.replicateQueue.processOneChange(ctx, tc.repl, func(ctx context.Context, repl *Replica) bool { return false }, true /* dryRun */)
require.Equal(t, errBoom, err)
require.False(t, requeue)
}
39 changes: 26 additions & 13 deletions pkg/kv/kvserver/replicate_queue.go
@@ -49,10 +49,10 @@ const (
newReplicaGracePeriod = 5 * time.Minute
)

// minLeaseTransferInterval controls how frequently leases can be transferred
// MinLeaseTransferInterval controls how frequently leases can be transferred
// for rebalancing. It does not prevent transferring leases in order to allow
// a replica to be removed from a range.
var minLeaseTransferInterval = settings.RegisterDurationSetting(
var MinLeaseTransferInterval = settings.RegisterDurationSetting(
"kv.allocator.min_lease_transfer_interval",
"controls how frequently leases can be transferred for rebalancing. "+
"It does not prevent transferring leases in order to allow a "+
@@ -285,7 +285,7 @@ func (rq *replicateQueue) shouldQueue(
// If the lease is valid, check to see if we should transfer it.
status := repl.LeaseStatusAt(ctx, now)
if status.IsValid() &&
rq.canTransferLease() &&
rq.canTransferLeaseFrom(ctx, repl) &&
rq.allocator.ShouldTransferLease(ctx, zone, voterReplicas, status.Lease.Replica.StoreID, repl.leaseholderStats) {

log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
@@ -310,7 +310,7 @@ func (rq *replicateQueue) process(
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
for {
requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */)
requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLeaseFrom, false /* dryRun */)
if isSnapshotError(err) {
// If ChangeReplicas failed because the snapshot failed, we log the
// error but then return success indicating we should retry the
@@ -344,7 +344,10 @@
}

func (rq *replicateQueue) processOneChange(
ctx context.Context, repl *Replica, canTransferLease func() bool, dryRun bool,
ctx context.Context,
repl *Replica,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
dryRun bool,
) (requeue bool, _ error) {
// Check lease and destroy status here. The queue does this higher up already, but
// adminScatter (and potential other future callers) also call this method and don't
@@ -481,7 +484,7 @@ func (rq *replicateQueue) processOneChange(
case AllocatorRemoveLearner:
return rq.removeLearner(ctx, repl, dryRun)
case AllocatorConsiderRebalance:
return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLease, dryRun)
return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLeaseFrom, dryRun)
case AllocatorFinalizeAtomicReplicationChange:
_, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.store, repl.Desc())
// Requeue because either we failed to transition out of a joint state
@@ -805,12 +808,12 @@ func (rq *replicateQueue) maybeTransferLeaseAway(
repl *Replica,
removeStoreID roachpb.StoreID,
dryRun bool,
canTransferLease func() bool,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
) (done bool, _ error) {
if removeStoreID != repl.store.StoreID() {
return false, nil
}
if canTransferLease != nil && !canTransferLease() {
if canTransferLeaseFrom != nil && !canTransferLeaseFrom(ctx, repl) {
return false, errors.Errorf("cannot transfer lease")
}
desc, zone := repl.DescAndZone()
@@ -1066,7 +1069,7 @@ func (rq *replicateQueue) considerRebalance(
ctx context.Context,
repl *Replica,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
canTransferLease func() bool,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
dryRun bool,
) (requeue bool, _ error) {
desc, zone := repl.DescAndZone()
@@ -1101,7 +1104,7 @@
if !ok {
log.VEventf(ctx, 1, "no suitable rebalance target for non-voters")
} else if done, err := rq.maybeTransferLeaseAway(
ctx, repl, removeTarget.StoreID, dryRun, canTransferLease,
ctx, repl, removeTarget.StoreID, dryRun, canTransferLeaseFrom,
); err != nil {
log.VEventf(ctx, 1, "want to remove self, but failed to transfer lease away: %s", err)
} else if done {
@@ -1144,7 +1147,7 @@
}
}

if !canTransferLease() {
if !canTransferLeaseFrom(ctx, repl) {
// No action was necessary and no rebalance target was found. Return
// without re-queuing this replica.
return false, nil
@@ -1378,9 +1381,19 @@ func (rq *replicateQueue) changeReplicas(
return nil
}

func (rq *replicateQueue) canTransferLease() bool {
// canTransferLeaseFrom checks whether a lease can be transferred from the
// specified replica. It considers two factors: whether the replica conforms
// to the lease preferences and the time since the last transfer, to avoid
// thrashing.
func (rq *replicateQueue) canTransferLeaseFrom(ctx context.Context, repl *Replica) bool {
// Do a best-effort check to see if this replica conforms to the configured
// lease preferences (if any); if it does not, we want to encourage more
// aggressive lease movement rather than delaying it.
respectsLeasePreferences, err := repl.checkLeaseRespectsPreferences(ctx)
if err == nil && !respectsLeasePreferences {
return true
}
if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
minInterval := minLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV)
minInterval := MinLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV)
return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval
}
return true
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2731,7 +2731,7 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS
func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Recording, error) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, s.ClusterSettings().Tracer, "allocator dry run")
defer cancel()
canTransferLease := func() bool { return true }
canTransferLease := func(ctx context.Context, repl *Replica) bool { return true }
_, err := s.replicateQueue.processOneChange(
ctx, repl, canTransferLease, true /* dryRun */)
if err != nil {
