allocator/plan: place leases violating preferences in purgatory
This patch places a replica in the replicate queue purgatory when it
holds a lease that violates the lease preferences and the allocator is
unable to find a suitable transfer target. This causes the replica to
be retried more often.

This will only trigger when replicas are eagerly enqueued (typically
when we acquire a new lease that violates preferences), since we
otherwise don't attempt to enqueue replicas when they don't have a valid
lease transfer target.

This patch also enables requeuing replicas after a successful rebalance,
when the lease violates preferences.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
kvoli and erikgrinaker committed Jul 25, 2023
1 parent ccd257d commit 27ec04c
Showing 5 changed files with 179 additions and 11 deletions.
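
Before the per-file diffs, here is a rough, self-contained sketch of the planner-side behavior the commit message describes. All names below are hypothetical stand-ins; the real logic lives in ReplicaPlanner.shedLeaseTarget further down in this commit.

package main

import "fmt"

// leasePurgatoryError is a hypothetical stand-in for the purgatory-marked
// error introduced by this commit.
type leasePurgatoryError struct{ rangeID int64 }

func (e leasePurgatoryError) Error() string {
	return fmt.Sprintf("can't transfer r%d lease violating preferences, no suitable target", e.rangeID)
}

// planLeaseShed sketches the decision made when shedding a lease: if the
// allocator found a transfer target, plan the transfer; otherwise, if the
// current lease violates the preferences and a preferred replica is alive,
// return an error so the replicate queue parks the replica in purgatory and
// retries it sooner; otherwise there is nothing to do.
func planLeaseShed(rangeID int64, targetFound, violatesPreferences, preferredAlive bool) error {
	if targetFound {
		return nil // a transfer op would be planned here
	}
	if violatesPreferences && preferredAlive {
		return leasePurgatoryError{rangeID: rangeID}
	}
	return nil
}

func main() {
	// No target found, lease violates preferences, preferred replica alive:
	// return the purgatory error instead of silently doing nothing.
	fmt.Println(planLeaseShed(42, false, true, true))
}

The key point is that returning an error, rather than a silent no-op, is what drives the faster purgatory retry loop.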
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2233,6 +2233,11 @@ func (a *Allocator) TransferLeaseTarget(
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
if a.knobs != nil {
if blockFn := a.knobs.BlockTransferTarget; blockFn != nil && blockFn() {
return roachpb.ReplicaDescriptor{}
}
}
excludeLeaseRepl := opts.ExcludeLeaseRepl
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) ||
a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) {
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
@@ -95,6 +95,9 @@ type TestingKnobs struct {
Desc() *roachpb.RangeDescriptor
StoreID() roachpb.StoreID
}) *raft.Status
// BlockTransferTarget can be used to block returning any transfer targets
// from TransferLeaseTarget.
BlockTransferTarget func() bool
}

// QPSRebalanceThreshold is much like rangeRebalanceThreshold, but for
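
As a quick, hedged illustration of how a test might drive this knob (using a hypothetical stand-in type; the new TestReplicateQueueLeasePreferencePurgatoryError later in this commit wires the real allocator.TestingKnobs the same way):

package main

import (
	"fmt"
	"sync/atomic"
)

// testingKnobs is a hypothetical stand-in for allocator.TestingKnobs.
type testingKnobs struct {
	BlockTransferTarget func() bool
}

func main() {
	var block atomic.Bool
	knobs := testingKnobs{
		// The allocator consults this before returning a transfer target.
		BlockTransferTarget: func() bool { return block.Load() },
	}

	fmt.Println(knobs.BlockTransferTarget()) // false: targets returned normally
	block.Store(true)
	fmt.Println(knobs.BlockTransferTarget()) // true: no transfer target returned
}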
55 changes: 48 additions & 7 deletions pkg/kv/kvserver/allocator/plan/replicate.go
@@ -868,7 +868,8 @@ func (rp ReplicaPlanner) considerRebalance(
if !canTransferLeaseFrom(ctx, repl) {
return nil, stats, nil
}
return rp.shedLeaseTarget(
var err error
op, err = rp.shedLeaseTarget(
ctx,
repl,
desc,
@@ -878,8 +879,11 @@
ExcludeLeaseRepl: false,
CheckCandidateFullness: true,
},
), stats, nil

)
if err != nil {
return nil, stats, err
}
return op, stats, nil
}

// If we have a valid rebalance action (ok == true) and we haven't
@@ -925,22 +929,36 @@ func (rp ReplicaPlanner) shedLeaseTarget(
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
opts allocator.TransferLeaseOptions,
) (op AllocationOp) {
) (op AllocationOp, _ error) {
usage := repl.RangeUsageInfo()
existingVoters := desc.Replicas().VoterDescriptors()
// Learner replicas aren't allowed to become the leaseholder or raft leader,
// so only consider the `VoterDescriptors` replicas.
target := rp.allocator.TransferLeaseTarget(
ctx,
rp.storePool,
conf,
desc.Replicas().VoterDescriptors(),
existingVoters,
repl,
usage,
false, /* forceDecisionWithoutStats */
opts,
)
if target == (roachpb.ReplicaDescriptor{}) {
return nil
// If we don't find a suitable target, but we own a lease violating the
// lease preferences, and there is a more suitable target, return an error
// to place the replica in purgatory and retry sooner. This typically
// happens when we've just acquired a violating lease and we eagerly
// enqueue the replica before we've received Raft leadership, which
// prevents us from finding appropriate lease targets since we can't
// determine if any are behind.
liveVoters, _ := rp.storePool.LiveAndDeadReplicas(
existingVoters, false /* includeSuspectAndDrainingStores */)
preferred := rp.allocator.PreferredLeaseholders(rp.storePool, conf, liveVoters)
if len(preferred) > 0 && repl.LeaseViolatesPreferences(ctx) {
return nil, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID}
}
return nil, nil
}

op = AllocationTransferLeaseOp{
@@ -949,7 +967,7 @@
Usage: usage,
bypassSafetyChecks: false,
}
return op
return op, nil
}

// maybeTransferLeaseAwayTarget is called whenever a replica on a given store
@@ -1015,3 +1033,26 @@ func (rp ReplicaPlanner) maybeTransferLeaseAwayTarget(

return op, nil
}

// CantTransferLeaseViolatingPreferencesError is an error returned when a lease
// violates the lease preferences, but we couldn't find a valid target to
// transfer the lease to. It indicates that the replica should be sent to
// purgatory, to retry the transfer faster.
type CantTransferLeaseViolatingPreferencesError struct {
RangeID roachpb.RangeID
}

var _ errors.SafeFormatter = CantTransferLeaseViolatingPreferencesError{}

func (e CantTransferLeaseViolatingPreferencesError) Error() string { return fmt.Sprint(e) }

func (e CantTransferLeaseViolatingPreferencesError) Format(s fmt.State, verb rune) {
errors.FormatError(e, s, verb)
}

func (e CantTransferLeaseViolatingPreferencesError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("can't transfer r%d lease violating preferences, no suitable target", e.RangeID)
return nil
}

func (CantTransferLeaseViolatingPreferencesError) PurgatoryErrorMarker() {}
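
The PurgatoryErrorMarker method is what lets the replicate queue treat this error as a purgatory error. The queue-side plumbing is not part of this diff, but as a hedged sketch with hypothetical names, detection boils down to a marker-interface check:

package main

import (
	"errors"
	"fmt"
)

// purgatoryError is a hypothetical marker interface standing in for the
// queue's real PurgatoryError interface, which is not shown in this diff.
type purgatoryError interface {
	error
	PurgatoryErrorMarker()
}

// cantTransferError mirrors the shape of the error type added above.
type cantTransferError struct{ rangeID int64 }

func (e cantTransferError) Error() string {
	return fmt.Sprintf("can't transfer r%d lease violating preferences, no suitable target", e.rangeID)
}

func (cantTransferError) PurgatoryErrorMarker() {}

// sendToPurgatory reports whether a planning error should park the replica
// in purgatory (retried on a tighter loop) rather than drop it from the queue.
func sendToPurgatory(err error) bool {
	var pe purgatoryError
	return errors.As(err, &pe)
}

func main() {
	fmt.Println(sendToPurgatory(cantTransferError{rangeID: 7})) // true
	fmt.Println(sendToPurgatory(errors.New("boom")))            // false
}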
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/replicate_queue.go
@@ -813,15 +813,17 @@ func ShouldRequeue(ctx context.Context, change plan.ReplicateChange) bool {
// time around.
requeue = false

} else if change.Action == allocatorimpl.AllocatorConsiderRebalance {
// Don't requeue after a successful rebalance operation.
requeue = false

} else if change.Op.LHBeingRemoved() {
// Don't requeue if the leaseholder was removed as a voter or the range
// lease was transferred away.
requeue = false

} else if change.Action == allocatorimpl.AllocatorConsiderRebalance &&
!change.Replica.LeaseViolatesPreferences(ctx) {
// Don't requeue after a successful rebalance operation, when the lease
// does not violate any preferences.
requeue = false

} else {
// Otherwise, requeue to see if there is more work to do. As the
// operation succeeded and was planned for a repair action i.e. not
117 changes: 117 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -2496,3 +2497,119 @@ func TestReplicateQueueExpirationLeasesOnly(t *testing.T) {
return epochLeases > 0 && expLeases > 0 && expLeases <= initialExpLeases
}, 30*time.Second, 500*time.Millisecond)
}

// TestReplicateQueueLeasePreferencePurgatoryError tests that failing to find
// a lease transfer target while the lease violates the lease preferences puts
// the replica in the replicate queue purgatory.
func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

skip.UnderRace(t) // too slow under stressrace
skip.UnderDeadlock(t)
skip.UnderShort(t)

const initialPreferredNode = 1
const nextPreferredNode = 2
const numRanges = 40
const numNodes = 3

var blockTransferTarget atomic.Bool

blockTransferTargetFn := func() bool {
block := blockTransferTarget.Load()
return block
}

knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
AllocatorKnobs: &allocator.TestingKnobs{
BlockTransferTarget: blockTransferTargetFn,
},
},
}

serverArgs := make(map[int]base.TestServerArgs, numNodes)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: knobs,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "rack", Value: fmt.Sprintf("%d", i+1)}},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

db := tc.Conns[0]
setLeasePreferences := func(node int) {
_, err := db.Exec(fmt.Sprintf(`ALTER TABLE t CONFIGURE ZONE USING
num_replicas=3, num_voters=3, voter_constraints='[]', lease_preferences='[[+rack=%d]]'`,
node))
require.NoError(t, err)
}

leaseCount := func(node int) int {
var count int
err := db.QueryRow(fmt.Sprintf(
"SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node),
).Scan(&count)
require.NoError(t, err)
return count
}

checkLeaseCount := func(node, expectedLeaseCount int) error {
if count := leaseCount(node); count != expectedLeaseCount {
return errors.Errorf("expected %d leases on node %d, found %d",
expectedLeaseCount, node, count)
}
return nil
}

// Create a test table with numRanges-1 splits, to end up with numRanges
// ranges. We will use the test table ranges to assert on the purgatory lease
// preference behavior.
_, err := db.Exec("CREATE TABLE t (i int);")
require.NoError(t, err)
_, err = db.Exec(
fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1))
require.NoError(t, err)
_, err = db.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t;")
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

store := tc.GetFirstStoreFromServer(t, 0)
// Set a preference on the initial node, then wait until all the leases for
// the test table are on that node.
setLeasePreferences(initialPreferredNode)
testutils.SucceedsSoon(t, func() error {
require.NoError(t, store.ForceReplicationScanAndProcess())
return checkLeaseCount(initialPreferredNode, numRanges)
})

// Block returning transfer targets from the allocator, then update the
// preferred node. We expect that every range for the test table will end up
// in purgatory on the initially preferred node.
blockTransferTarget.Store(true)
setLeasePreferences(nextPreferredNode)
testutils.SucceedsSoon(t, func() error {
require.NoError(t, store.ForceReplicationScanAndProcess())
if purgLen := store.ReplicateQueuePurgatoryLength(); purgLen != numRanges {
return errors.Errorf("expected %d in purgatory but got %v", numRanges, purgLen)
}
return nil
})

// Lastly, unblock returning transfer targets. Expect that the leases from
// the test table all move to the new preference. Note we don't force a
// replication queue scan, as the purgatory retry should handle the
// transfers.
blockTransferTarget.Store(false)
testutils.SucceedsSoon(t, func() error {
return checkLeaseCount(nextPreferredNode, numRanges)
})
}
