allocator/plan: place leases violating preferences in purgatory
This patch places a replica in the replicate queue purgatory when it
holds a lease that violates the lease preferences and no suitable
transfer target can be found. This causes the replica to be retried
more often.

This will only trigger when replicas are eagerly enqueued (typically
when we acquire a new lease that violates preferences), since we
otherwise don't attempt to enqueue replicas when they don't have a valid
lease transfer target.

This patch also enables requeuing replicas after a successful rebalance,
when the lease violates preferences.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
kvoli and erikgrinaker committed Jul 26, 2023
1 parent 34aad03 commit fcf9888
Showing 5 changed files with 193 additions and 13 deletions.
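For orientation, below is a minimal, self-contained sketch (not CockroachDB source) of the marker-interface pattern this commit relies on: a planning error that carries a purgatory marker method tells the queue to hold the replica in purgatory and retry it more aggressively, rather than waiting for the next scanner pass. The names purgatoryError, cantTransferLeaseError, and processOne are illustrative assumptions, not the actual queue APIs.

// Sketch only: illustrates the purgatory marker-interface pattern under
// assumed names; the real queue plumbing in CockroachDB is more involved.
package main

import (
	"errors"
	"fmt"
)

// purgatoryError marks errors that should send a replica to purgatory.
type purgatoryError interface {
	error
	PurgatoryErrorMarker()
}

// cantTransferLeaseError mirrors the shape of the error added in this commit:
// the lease violates preferences but no transfer target is currently available.
type cantTransferLeaseError struct{ rangeID int64 }

func (e cantTransferLeaseError) Error() string {
	return fmt.Sprintf("can't transfer r%d lease violating preferences, no suitable target", e.rangeID)
}

// PurgatoryErrorMarker makes the error satisfy purgatoryError.
func (cantTransferLeaseError) PurgatoryErrorMarker() {}

// processOne stands in for processing a single replica: on a purgatory-marked
// error it records the replica for faster retries instead of dropping it.
func processOne(rangeID int64, plan func() error, purgatory map[int64]error) {
	if err := plan(); err != nil {
		var purgErr purgatoryError
		if errors.As(err, &purgErr) {
			purgatory[rangeID] = err // retried on the purgatory timer
			return
		}
		fmt.Printf("r%d: transient failure: %v\n", rangeID, err)
	}
}

func main() {
	purgatory := map[int64]error{}
	processOne(7, func() error {
		return cantTransferLeaseError{rangeID: 7}
	}, purgatory)
	fmt.Printf("ranges in purgatory: %d\n", len(purgatory))
}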
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2232,6 +2232,11 @@ func (a *Allocator) TransferLeaseTarget(
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
if a.knobs != nil {
if blockFn := a.knobs.BlockTransferTarget; blockFn != nil && blockFn() {
return roachpb.ReplicaDescriptor{}
}
}
excludeLeaseRepl := opts.ExcludeLeaseRepl
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) ||
a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) {
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
@@ -95,6 +95,9 @@ type TestingKnobs struct {
Desc() *roachpb.RangeDescriptor
StoreID() roachpb.StoreID
}) *raft.Status
// BlockTransferTarget can be used to block returning any transfer targets
// from TransferLeaseTarget.
BlockTransferTarget func() bool
}

// QPSRebalanceThreshold is much like rangeRebalanceThreshold, but for
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
@@ -464,8 +464,9 @@ func (r *Replica) leasePostApplyLocked(
// it in the replicate queue.
if leaseChangingHands && iAmTheLeaseHolder {
if LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) {
preferenceStatus := makeLeasePreferenceStatus(st, r.store.StoreID(), r.store.Attrs(),
r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences)
preferenceStatus := makeLeasePreferenceStatus(r.leaseStatusAtRLocked(ctx, now),
r.store.StoreID(), r.store.Attrs(), r.store.nodeDesc.Attrs,
r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences)
switch preferenceStatus {
case leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesUnknown:
// We could also enqueue the lease when we are a less preferred
76 changes: 65 additions & 11 deletions pkg/kv/kvserver/replicate_queue.go
@@ -922,15 +922,20 @@ func (rq *replicateQueue) ShouldRequeue(ctx context.Context, change ReplicateQue
// time around.
requeue = false

} else if change.Action == allocatorimpl.AllocatorConsiderRebalance {
// Don't requeue after a successful rebalance operation.
requeue = false

} else if change.Op.lhBeingRemoved() {
// Don't requeue if the leaseholder was removed as a voter or the range
// lease was transferred away.
requeue = false

} else if change.Action == allocatorimpl.AllocatorConsiderRebalance &&
!change.replica.leaseViolatesPreferences(ctx) {
// Don't requeue after a successful rebalance operation, when the lease
// does not violate any preferences. If the lease does violate preferences,
// the next process attempt will either find a target to transfer the lease
// or place the replica into purgatory if unable. See
// CantTransferLeaseViolatingPreferencesError.
requeue = false

} else {
// Otherwise, requeue to see if there is more work to do. As the
// operation succeeded and was planned for a repair action i.e. not
@@ -1776,7 +1781,8 @@ func (rq *replicateQueue) considerRebalance(
if !canTransferLeaseFrom(ctx, repl) {
return nil, nil
}
return rq.shedLeaseTarget(
var err error
op, err = rq.shedLeaseTarget(
ctx,
repl,
desc,
@@ -1786,8 +1792,19 @@
ExcludeLeaseRepl: false,
CheckCandidateFullness: true,
},
), nil

)
if err != nil {
if scatter && errors.Is(err, CantTransferLeaseViolatingPreferencesError{}) {
// When scatter is specified, we ignore lease preference violation
// errors returned from shedLeaseTarget. These errors won't place the
// replica into purgatory, because scatter calls into this code path
// directly, outside the replicate queue loop.
log.KvDistribution.VEventf(ctx, 3, "%v", err)
err = nil
}
return nil, err
}
return op, nil
}

// If we have a valid rebalance action (ok == true) and we haven't
@@ -1950,22 +1967,36 @@ func (rq *replicateQueue) shedLeaseTarget(
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
opts allocator.TransferLeaseOptions,
) (op AllocationOp) {
) (op AllocationOp, _ error) {
usage := RangeUsageInfoForRepl(repl)
existingVoters := desc.Replicas().VoterDescriptors()
// Learner replicas aren't allowed to become the leaseholder or raft leader,
// so only consider the `VoterDescriptors` replicas.
target := rq.allocator.TransferLeaseTarget(
ctx,
rq.storePool,
conf,
desc.Replicas().VoterDescriptors(),
existingVoters,
repl,
usage,
false, /* forceDecisionWithoutStats */
opts,
)
if target == (roachpb.ReplicaDescriptor{}) {
return nil
// If we don't find a suitable target, but we own a lease violating the
// lease preferences, and there is a more suitable target, return an error
// to place the replica in purgatory and retry sooner. This typically
// happens when we've just acquired a violating lease and we eagerly
// enqueue the replica before we've received Raft leadership, which
// prevents us from finding appropriate lease targets since we can't
// determine if any are behind.
liveVoters, _ := rq.storePool.LiveAndDeadReplicas(
existingVoters, false /* includeSuspectAndDrainingStores */)
preferred := rq.allocator.PreferredLeaseholders(rq.storePool, conf, liveVoters)
if len(preferred) > 0 && repl.leaseViolatesPreferences(ctx) {
return nil, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID}
}
return nil, nil
}

op = AllocationTransferLeaseOp{
Expand All @@ -1974,7 +2005,7 @@ func (rq *replicateQueue) shedLeaseTarget(
usage: usage,
bypassSafetyChecks: false,
}
return op
return op, nil
}

// shedLease takes in a leaseholder replica, looks for a target for transferring
@@ -2199,3 +2230,26 @@ func RangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo {
},
}
}

// CantTransferLeaseViolatingPreferencesError is an error returned when a lease
// violates the lease preferences, but we couldn't find a valid target to
// transfer the lease to. It indicates that the replica should be sent to
// purgatory, to retry the transfer faster.
type CantTransferLeaseViolatingPreferencesError struct {
RangeID roachpb.RangeID
}

var _ errors.SafeFormatter = CantTransferLeaseViolatingPreferencesError{}

func (e CantTransferLeaseViolatingPreferencesError) Error() string { return fmt.Sprint(e) }

func (e CantTransferLeaseViolatingPreferencesError) Format(s fmt.State, verb rune) {
errors.FormatError(e, s, verb)
}

func (e CantTransferLeaseViolatingPreferencesError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("can't transfer r%d lease violating preferences, no suitable target", e.RangeID)
return nil
}

func (CantTransferLeaseViolatingPreferencesError) PurgatoryErrorMarker() {}
117 changes: 117 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
@@ -2487,3 +2488,119 @@ func TestReplicateQueueExpirationLeasesOnly(t *testing.T) {
return epochLeases > 0 && expLeases > 0 && expLeases <= initialExpLeases
}, 30*time.Second, 500*time.Millisecond)
}

// TestReplicateQueueLeasePreferencePurgatoryError tests that failing to find a
// lease transfer target while violating lease preferences will put the
// replica in the replicate queue purgatory.
func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

skip.UnderRace(t) // too slow under stressrace
skip.UnderDeadlock(t)
skip.UnderShort(t)

const initialPreferredNode = 1
const nextPreferredNode = 2
const numRanges = 40
const numNodes = 3

var blockTransferTarget atomic.Bool

blockTransferTargetFn := func() bool {
block := blockTransferTarget.Load()
return block
}

knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
AllocatorKnobs: &allocator.TestingKnobs{
BlockTransferTarget: blockTransferTargetFn,
},
},
}

serverArgs := make(map[int]base.TestServerArgs, numNodes)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: knobs,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "rack", Value: fmt.Sprintf("%d", i+1)}},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

db := tc.Conns[0]
setLeasePreferences := func(node int) {
_, err := db.Exec(fmt.Sprintf(`ALTER TABLE t CONFIGURE ZONE USING
num_replicas=3, num_voters=3, voter_constraints='[]', lease_preferences='[[+rack=%d]]'`,
node))
require.NoError(t, err)
}

leaseCount := func(node int) int {
var count int
err := db.QueryRow(fmt.Sprintf(
"SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node),
).Scan(&count)
require.NoError(t, err)
return count
}

checkLeaseCount := func(node, expectedLeaseCount int) error {
if count := leaseCount(node); count != expectedLeaseCount {
return errors.Errorf("expected %d leases on node %d, found %d",
expectedLeaseCount, node, count)
}
return nil
}

// Create a test table with numRanges-1 splits, to end up with numRanges
// ranges. We will use the test table ranges to assert on the purgatory lease
// preference behavior.
_, err := db.Exec("CREATE TABLE t (i int);")
require.NoError(t, err)
_, err = db.Exec(
fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1))
require.NoError(t, err)
_, err = db.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t;")
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

store := tc.GetFirstStoreFromServer(t, 0)
// Set a preference on the initial node, then wait until all the leases for
// the test table are on that node.
setLeasePreferences(initialPreferredNode)
testutils.SucceedsSoon(t, func() error {
require.NoError(t, store.ForceReplicationScanAndProcess())
return checkLeaseCount(initialPreferredNode, numRanges)
})

// Block returning transfer targets from the allocator, then update the
// preferred node. We expect that every range for the test table will end up
// in purgatory on the initially preferred node.
blockTransferTarget.Store(true)
setLeasePreferences(nextPreferredNode)
testutils.SucceedsSoon(t, func() error {
require.NoError(t, store.ForceReplicationScanAndProcess())
if purgLen := store.ReplicateQueuePurgatoryLength(); purgLen != numRanges {
return errors.Errorf("expected %d in purgatory but got %v", numRanges, purgLen)
}
return nil
})

// Lastly, unblock returning transfer targets. Expect that the leases from
// the test table all move to the new preference. Note we don't force a
// replication queue scan, as the purgatory retry should handle the
// transfers.
blockTransferTarget.Store(false)
testutils.SucceedsSoon(t, func() error {
return checkLeaseCount(nextPreferredNode, numRanges)
})
}
