kvserver: proactively enqueue less preferred leases into lease queue

Leases are checked against lease preferences after application; when a
lease violates the applied preferences, the replica is enqueued into
the lease queue. A lease may also satisfy some preference other than
the first one, in which case it is considered less preferred.

Previously, less preferred leases had to wait for the replica scanner
to enqueue them into the lease queue before the lease could be
considered for transfer to the first preference.

Enqueue less preferred leases after application as well, mirroring the
handling of leases that violate the applied preferences.

Resolves: #116081
Release note: None
kvoli committed Mar 15, 2024
1 parent ae00dcb commit 54dd22d
Showing 6 changed files with 131 additions and 22 deletions.
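
To make the behavior change concrete, here is a minimal, self-contained sketch of the post-apply decision this commit introduces in pkg/kv/kvserver/replica_proposal.go (see the final hunk below). The types and priority values are illustrative stand-ins, not the actual kvserver APIs.

package main

import "fmt"

// preferenceStatus is a stand-in for the result of checking the leaseholder's
// store against the configured lease preferences.
type preferenceStatus int

const (
	leasePreferencesOK preferenceStatus = iota
	leasePreferencesLessPreferred
	leasePreferencesViolating
)

// enqueueDecision mirrors the switch in leasePostApplyLocked after this
// change: violating leases are enqueued at the preference-transfer priority,
// less preferred leases at a slightly lower priority, and conforming leases
// are not enqueued at all.
func enqueueDecision(status preferenceStatus, basePriority float64) (enqueue bool, priority float64) {
	switch status {
	case leasePreferencesOK:
		return false, 0
	case leasePreferencesViolating:
		return true, basePriority
	case leasePreferencesLessPreferred:
		return true, basePriority - 1
	default:
		return false, 0
	}
}

func main() {
	for _, s := range []preferenceStatus{
		leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesViolating,
	} {
		enqueue, prio := enqueueDecision(s, 100)
		fmt.Printf("status=%d enqueue=%t priority=%.0f\n", s, enqueue, prio)
	}
}
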
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/tests/lease_preferences.go
@@ -112,7 +112,7 @@ func registerLeasePreferences(r registry.Registry) {
replFactor: 5,
checkNodes: []int{1, 3, 4, 5},
eventFn: makeStopNodesEventFn(2 /* targets */),
waitForLessPreferred: false,
waitForLessPreferred: true,
postEventWaitDuration: 10 * time.Minute,
})
},
@@ -161,7 +161,7 @@ func registerLeasePreferences(r registry.Registry) {
eventFn: makeTransferLeasesEventFn(
5 /* gateway */, 5 /* target */),
checkNodes: []int{1, 2, 3, 4, 5},
waitForLessPreferred: false,
waitForLessPreferred: true,
postEventWaitDuration: 10 * time.Minute,
})
},
@@ -201,7 +201,7 @@ func runLeasePreferences(
// ...
// dc=N: n2N-1 n2N
fmt.Sprintf("--locality=region=fake-region,zone=fake-zone,dc=%d", (node-1)/2+1),
"--vmodule=replica_proposal=2,replicate_queue=3,replicate=3")
"--vmodule=replica_proposal=2,lease_queue=3,lease=3")
c.Start(ctx, t.L(), opts, settings, c.Node(node))

}
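Flipping waitForLessPreferred to true means the roachtest now also waits for leases that merely satisfy a lower-ranked preference to converge onto the first preference, which the proactive enqueue makes fast enough to assert on. Below is a hedged sketch of one plausible reading of such a wait predicate; the helper, its parameters, and its exact semantics are illustrative and are not the roachtest's actual API.

package main

import "fmt"

// leasesConform sketches the post-event check: every lease must be on a node
// satisfying the first preference; when waitForLessPreferred is set, a lease
// on a less preferred (but not violating) node also fails the check.
func leasesConform(leaseNodes []int, firstPreference, lessPreferred map[int]bool, waitForLessPreferred bool) bool {
	for _, n := range leaseNodes {
		if firstPreference[n] {
			continue
		}
		if lessPreferred[n] && !waitForLessPreferred {
			continue // tolerated when not waiting for less preferred leases
		}
		return false
	}
	return true
}

func main() {
	first := map[int]bool{1: true, 2: true}
	less := map[int]bool{3: true, 4: true}
	fmt.Println(leasesConform([]int{1, 3}, first, less, false)) // true: less preferred tolerated
	fmt.Println(leasesConform([]int{1, 3}, first, less, true))  // false: must reach first preference
}
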
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2163,10 +2163,10 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
return false
}

// leaseholderShouldMoveDueToPreferences returns true if the current leaseholder
// LeaseholderShouldMoveDueToPreferences returns true if the current leaseholder
// is in violation of lease preferences _that can otherwise be satisfied_ by
// some existing replica.
func (a *Allocator) leaseholderShouldMoveDueToPreferences(
func (a *Allocator) LeaseholderShouldMoveDueToPreferences(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf *roachpb.SpanConfig,
@@ -2176,6 +2176,7 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
GetFirstIndex() kvpb.RaftIndex
},
allExistingReplicas []roachpb.ReplicaDescriptor,
exclReplsInNeedOfSnapshots bool,
) bool {
// Defensive check to ensure that this is never called with a replica set that
// does not contain the leaseholder.
@@ -2202,7 +2203,7 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
// If there are any replicas that do match lease preferences, then we check if
// the existing leaseholder is one of them.
preferred := a.PreferredLeaseholders(storePool, conf, candidates)
if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
if exclReplsInNeedOfSnapshots {
preferred = excludeReplicasInNeedOfSnapshots(
ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
}
@@ -2274,7 +2275,8 @@ func (a *Allocator) TransferLeaseTarget(
}
}
excludeLeaseRepl := opts.ExcludeLeaseRepl
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) ||
excludeReplsInNeedOfSnap := a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots
if a.LeaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing, excludeReplsInNeedOfSnap) ||
a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) {
// Explicitly exclude the current leaseholder from the result set if it is
// in violation of lease preferences that can be satisfied by some other
@@ -2633,7 +2635,8 @@ func (a *Allocator) ShouldTransferLease(
},
usageInfo allocator.RangeUsageInfo,
) TransferLeaseDecision {
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) {
excludeReplsInNeedOfSnap := a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots
if a.LeaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing, excludeReplsInNeedOfSnap) {
return TransferLeaseForPreferences
}

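The refactor above hoists the snapshot-exclusion knob check out of the (now exported) LeaseholderShouldMoveDueToPreferences and into its callers, which pass the result as a new boolean parameter. A minimal sketch of that call-site pattern, using a stand-in type rather than the real allocator testing knobs:

package main

import "fmt"

// allocatorTestingKnobs is a stand-in modeling only the knob referenced in
// this diff.
type allocatorTestingKnobs struct {
	AllowLeaseTransfersToReplicasNeedingSnapshots bool
}

// excludeReplsInNeedOfSnap mirrors the expression now computed at each caller
// of LeaseholderShouldMoveDueToPreferences: replicas needing a snapshot are
// excluded from the preferred set unless the testing knob allows them.
func excludeReplsInNeedOfSnap(knobs *allocatorTestingKnobs) bool {
	return knobs == nil || !knobs.AllowLeaseTransfersToReplicasNeedingSnapshots
}

func main() {
	fmt.Println(excludeReplsInNeedOfSnap(nil)) // true: default behavior excludes them
	fmt.Println(excludeReplsInNeedOfSnap(&allocatorTestingKnobs{
		AllowLeaseTransfersToReplicasNeedingSnapshots: true,
	})) // false: test-only override
}
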
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/allocator/plan/lease.go
@@ -161,11 +161,14 @@ func (lp LeasePlanner) PlanOneChange(
// enqueue the replica before we've received Raft leadership, which
// prevents us from finding appropriate lease targets since we can't
// determine if any are behind.
liveVoters, _ := lp.storePool.LiveAndDeadReplicas(
existingVoters, false /* includeSuspectAndDrainingStores */)
preferred := lp.allocator.PreferredLeaseholders(lp.storePool, conf, liveVoters)
if len(preferred) > 0 &&
repl.LeaseViolatesPreferences(ctx, conf) {
if lp.allocator.LeaseholderShouldMoveDueToPreferences(
ctx,
lp.storePool,
conf,
repl,
existingVoters,
false, /* excludeReplsInNeedOfSnap */
) {
return change, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID}
}
// There is no target and no more preferred leaseholders, a no-op.
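For orientation, a hedged sketch of the control flow in PlanOneChange that the hunk above changes: if the allocator reports that the lease should move for preference reasons but no transfer target could be found, a typed error is returned so the lease queue retries (via purgatory, as exercised by the purgatory test in lease_queue_test.go below) rather than dropping the work. Names other than the error type it stands in for are illustrative.

package main

import (
	"errors"
	"fmt"
)

// errViolatingPreferences stands in for CantTransferLeaseViolatingPreferencesError:
// a typed error the lease queue treats as retryable rather than fatal.
var errViolatingPreferences = errors.New("lease violates preferences but no target is available")

// planLease sketches the updated decision: a missing target is only an error
// when the allocator says the leaseholder should move due to preferences.
func planLease(shouldMoveForPreferences, haveTarget bool) error {
	if haveTarget {
		return nil // a transfer will be planned
	}
	if shouldMoveForPreferences {
		return errViolatingPreferences // queue retries via purgatory
	}
	return nil // no target and no preference pressure: a no-op
}

func main() {
	fmt.Println(planLease(true, false))  // retryable error
	fmt.Println(planLease(false, false)) // <nil>
}
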
14 changes: 10 additions & 4 deletions pkg/kv/kvserver/lease_queue.go
@@ -165,10 +165,16 @@ func (lq *leaseQueue) updateChan() <-chan time.Time {
func (lq *leaseQueue) canTransferLeaseFrom(
ctx context.Context, repl *Replica, conf *roachpb.SpanConfig,
) bool {
// Do a best effort check to see if this replica conforms to the configured
// lease preferences (if any), if it does not we want to encourage more
// aggressive lease movement and not delay it.
if repl.LeaseViolatesPreferences(ctx, conf) {
// If there are preferred leaseholders and the current leaseholder is not in
// this group, then always allow the lease to transfer without delay.
if lq.allocator.LeaseholderShouldMoveDueToPreferences(
ctx,
lq.storePool,
conf,
repl,
repl.Desc().Replicas().VoterDescriptors(),
false, /* excludeReplsInNeedOfSnap */
) {
return true
}
if lastLeaseTransfer := lq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
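The effect of the hunk above on canTransferLeaseFrom can be summarized as: preference-driven moves bypass the usual throttle on back-to-back transfers. A simplified sketch follows; the interval check on lastLeaseTransfer is paraphrased and the names here are illustrative, not the kvserver implementation.

package main

import (
	"fmt"
	"time"
)

// canTransferLeaseFrom sketches the updated gate: if the current leaseholder
// should move for preference reasons, allow the transfer immediately;
// otherwise throttle transfers to at most one per minInterval.
func canTransferLeaseFrom(shouldMoveForPreferences bool, lastTransfer time.Time, minInterval time.Duration) bool {
	if shouldMoveForPreferences {
		return true
	}
	return time.Since(lastTransfer) >= minInterval
}

func main() {
	now := time.Now()
	fmt.Println(canTransferLeaseFrom(true, now, time.Minute))                      // true: preferences win
	fmt.Println(canTransferLeaseFrom(false, now, time.Minute))                     // false: throttled
	fmt.Println(canTransferLeaseFrom(false, now.Add(-2*time.Minute), time.Minute)) // true: interval elapsed
}
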
94 changes: 93 additions & 1 deletion pkg/kv/kvserver/lease_queue_test.go
@@ -18,11 +18,14 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -145,7 +148,7 @@ func TestLeaseQueueLeasePreferencePurgatoryError(t *testing.T) {

// Lastly, unblock returning transfer targets. Expect that the leases from
// the test table all move to the new preference. Note we don't force a
// replication queue scan, as the purgatory retry should handle the
// lease queue scan, as the purgatory retry should handle the
// transfers.
blockTransferTarget.Store(false)
testutils.SucceedsSoon(t, func() error {
@@ -289,3 +292,92 @@ func TestLeaseQueueRaceReplicateQueue(t *testing.T) {
_, processErr, _ := repl.Store().Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
require.ErrorIs(t, processErr, plan.NewErrAllocatorToken("lease"))
}

// TestLeaseQueueProactiveEnqueueOnPreferences asserts that a lease quickly
// transfers back to a store which satisfies the first applied lease
// preference.
func TestLeaseQueueProactiveEnqueueOnPreferences(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

const numNodes = 3
const preferredNode = 1
zcfg := zonepb.DefaultZoneConfig()
zcfg.LeasePreferences = []zonepb.LeasePreference{
{
Constraints: []zonepb.Constraint{
{
Type: zonepb.Constraint_REQUIRED,
Key: "rack",
Value: "1",
},
},
},
{
Constraints: []zonepb.Constraint{
{
Type: zonepb.Constraint_REQUIRED,
Key: "rack",
Value: "2",
},
},
},
}

knobs := base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: false,
},
Server: &server.TestingKnobs{
DefaultZoneConfigOverride: &zcfg,
},
}
serverArgs := make(map[int]base.TestServerArgs, numNodes)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: knobs,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "rack", Value: fmt.Sprintf("%d", i+1)}},
},
}
}
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
require.NoError(t, tc.WaitForFullReplication())
desc := tc.GetRaftLeader(t, roachpb.RKey(scratchKey)).Desc()

t.Run("violating", func(t *testing.T) {
tc.TransferRangeLeaseOrFatal(t, *desc, roachpb.ReplicationTarget{NodeID: 3, StoreID: 3})
testutils.SucceedsSoon(t, func() error {
target, err := tc.FindRangeLeaseHolder(*desc, nil)
if err != nil {
return err
}
if target.StoreID != preferredNode {
return errors.Errorf("lease not on preferred node %v, on %v",
preferredNode, target)
}
return nil
})
})

t.Run("less-preferred", func(t *testing.T) {
tc.TransferRangeLeaseOrFatal(t, *desc, roachpb.ReplicationTarget{NodeID: 2, StoreID: 2})
testutils.SucceedsSoon(t, func() error {
target, err := tc.FindRangeLeaseHolder(*desc, nil)
if err != nil {
return err
}
if target.StoreID != preferredNode {
return errors.Errorf("lease not on preferred node %v, on %v",
preferredNode, target)
}
return nil
})
})
}
13 changes: 9 additions & 4 deletions pkg/kv/kvserver/replica_proposal.go
@@ -554,15 +554,20 @@ func (r *Replica) leasePostApplyLocked(
preferenceStatus := CheckStoreAgainstLeasePreferences(r.store.StoreID(), r.store.Attrs(),
r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences)
switch preferenceStatus {
case LeasePreferencesOK, LeasePreferencesLessPreferred:
// We could also enqueue the lease when we are a less preferred
// leaseholder, however the replicate queue will eventually get to it and
// we already satisfy _some_ preference.
case LeasePreferencesOK:
case LeasePreferencesViolating:
log.VEventf(ctx, 2,
"acquired lease violates lease preferences, enqueuing for transfer [lease=%v preferences=%v]",
newLease, r.mu.conf.LeasePreferences)
r.store.leaseQueue.AddAsync(ctx, r, allocatorimpl.TransferLeaseForPreferences.Priority())
case LeasePreferencesLessPreferred:
// Enqueue the replica at a slightly lower priority than violation to
// process the lease transfer after ranges where the leaseholder is
// violating the preference.
log.VEventf(ctx, 2,
"acquired lease is less preferred, enqueuing for transfer [lease=%v preferences=%v]",
newLease, r.mu.conf.LeasePreferences)
r.store.leaseQueue.AddAsync(ctx, r, allocatorimpl.TransferLeaseForPreferences.Priority()-1)
default:
log.Fatalf(ctx, "unknown lease preferences status: %v", preferenceStatus)
}
