kvserver: sensitize AdminScatter to force replica movement
This patch makes `AdminScatter` trigger a mostly random rebalance action.
This has always been the contract that most of its callers (especially the
`SSTBatcher`) assumed.

Previously, calling `AdminScatter` would simply enqueue the range into the
`replicateQueue`. The `replicateQueue` only reconciles replica-count
differences between stores in the cluster when some store is more than 5% away
from the mean. If all candidate stores were within 5% of the mean, calling
`AdminScatter` wouldn't do anything.

Now, `AdminScatter` still enqueues the range into the `replicateQueue` but with
an option to force it to:
1. Ignore the 5% padding provided by `kv.allocator.range_rebalance_threshold`.
2. Add some jitter to the existing replica-counts of all candidate stores.

This means that `AdminScatter` now forces mostly randomized rebalances to
stores that are reasonable targets (i.e., we still won't rebalance to stores
that are too far above the mean in terms of replica count, to stores that
don't meet the constraints placed on the range, etc.).
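
To make the mechanism concrete, here is a standalone sketch (not code from
this patch) of how jitter proportional to the rebalance threshold perturbs
per-store replica counts, so that stores that are perfectly balanced can still
yield a rebalance opportunity. The fraction `0.05` mirrors the default
`kv.allocator.range_rebalance_threshold`; the helper names are illustrative
only:

```go
package main

import (
	"fmt"
	"math/rand"
)

// jitterCount perturbs a store's replica count by up to ±jitterFraction of
// its current value. With all counts perturbed this way, even a set of
// perfectly balanced stores will look slightly imbalanced to a 0%-threshold
// rebalancer.
func jitterCount(count int32, jitterFraction float64, rng *rand.Rand) int32 {
	delta := float64(count) * jitterFraction * rng.Float64()
	if rng.Int31()%2 == 0 {
		delta = -delta
	}
	return count + int32(delta)
}

func main() {
	rng := rand.New(rand.NewSource(42))
	counts := []int32{1000, 1000, 1000} // three stores, all at the mean
	const jitterFraction = 0.05         // mirrors kv.allocator.range_rebalance_threshold
	for i, c := range counts {
		fmt.Printf("s%d: %d -> %d\n", i+1, c, jitterCount(c, jitterFraction, rng))
	}
}
```

In the patch itself, this perturbation happens inside the allocator's scoring
(see `maybeJitterStoreStats` below), so the usual constraint and fullness
checks still run against the jittered counts.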

Release note (performance improvement): IMPORTs and index backfills
should now do a better job of spreading their load out over the nodes in
the cluster.
aayushshah15 committed Feb 8, 2022
1 parent 832d394 commit 73734cb
Showing 8 changed files with 247 additions and 36 deletions.
97 changes: 78 additions & 19 deletions pkg/kv/kvserver/allocator.go
@@ -355,9 +355,10 @@ func rangeUsageInfoForRepl(repl *Replica) RangeUsageInfo {
type Allocator struct {
storePool *StorePool
nodeLatencyFn func(addr string) (time.Duration, bool)
randGen allocatorRand

knobs *AllocatorTestingKnobs
// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
// wrapped inside a mutex, to avoid misuse.
randGen allocatorRand
knobs *AllocatorTestingKnobs
}

// MakeAllocator creates a new allocator using the specified StorePool.
@@ -863,15 +864,21 @@ func (a Allocator) simulateRemoveTarget(
candidates []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
sl StoreList,
rangeUsageInfo RangeUsageInfo,
targetType targetReplicaType,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
candidateStores := make([]roachpb.StoreDescriptor, 0, len(candidates))
for _, cand := range candidates {
for _, store := range sl.stores {
if cand.StoreID == store.StoreID {
candidateStores = append(candidateStores, store)
}
}
}

// Update statistics first
// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
// but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
// (with the main exceptions being Scatter and the status server's allocator debug endpoint).
// Try to make this interfere less with other callers.
switch t := targetType; t {
case voterTarget:
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER)
@@ -882,7 +889,11 @@
)
log.VEventf(ctx, 3, "simulating which voter would be removed after adding s%d",
targetStore)
return a.RemoveVoter(ctx, conf, candidates, existingVoters, existingNonVoters, options)

return a.removeTarget(
ctx, conf, makeStoreList(candidateStores),
existingVoters, existingNonVoters, voterTarget, options,
)
case nonVoterTarget:
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER)
defer a.storePool.updateLocalStoreAfterRebalance(
@@ -892,33 +903,43 @@
)
log.VEventf(ctx, 3, "simulating which non-voter would be removed after adding s%d",
targetStore)
return a.RemoveNonVoter(ctx, conf, candidates, existingVoters, existingNonVoters, options)
return a.removeTarget(
ctx, conf, makeStoreList(candidateStores),
existingVoters, existingNonVoters, nonVoterTarget, options,
)
default:
panic(fmt.Sprintf("unknown targetReplicaType: %s", t))
}
}

func (a Allocator) storeListForCandidates(candidates []roachpb.ReplicationTarget) StoreList {
result := make([]roachpb.StoreDescriptor, 0, len(candidates))
sl, _, _ := a.storePool.getStoreList(storeFilterNone)
for _, cand := range candidates {
for _, store := range sl.stores {
if cand.StoreID == store.StoreID {
result = append(result, store)
}
}
}
return makeStoreList(result)
}

func (a Allocator) removeTarget(
ctx context.Context,
conf roachpb.SpanConfig,
candidates []roachpb.ReplicationTarget,
candidateStoreList StoreList,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
targetType targetReplicaType,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
if len(candidates) == 0 {
if len(candidateStoreList.stores) == 0 {
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one" +
" candidate replica to allocator.removeTarget()")
}

existingReplicas := append(existingVoters, existingNonVoters...)
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
candidateStoreIDs[i] = exist.StoreID
}
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)
analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
existingReplicas, conf.NumReplicas, conf.Constraints)
analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
@@ -978,10 +999,17 @@ func (a Allocator) RemoveVoter(
existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(voterCandidates))
for i, exist := range voterCandidates {
candidateStoreIDs[i] = exist.StoreID
}
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

return a.removeTarget(
ctx,
conf,
roachpb.MakeReplicaSet(voterCandidates).ReplicationTargets(),
candidateStoreList,
existingVoters,
existingNonVoters,
voterTarget,
@@ -1002,10 +1030,17 @@ func (a Allocator) RemoveNonVoter(
existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(nonVoterCandidates))
for i, exist := range nonVoterCandidates {
candidateStoreIDs[i] = exist.StoreID
}
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

return a.removeTarget(
ctx,
conf,
roachpb.MakeReplicaSet(nonVoterCandidates).ReplicationTargets(),
candidateStoreList,
existingVoters,
existingNonVoters,
nonVoterTarget,
@@ -1024,6 +1059,13 @@ func (a Allocator) rebalanceTarget(
options scorerOptions,
) (add, remove roachpb.ReplicationTarget, details string, ok bool) {
sl, _, _ := a.storePool.getStoreList(filter)

// If we're considering a rebalance due to an `AdminScatterRequest`, we'd like
// to ensure that we're returning a random rebalance target to a new store
// that's a reasonable fit for an existing replica. So we might jitter the
// existing stats on the stores inside `sl`.
sl = options.maybeJitterStoreStats(sl, a.randGen)

existingReplicas := append(existingVoters, existingNonVoters...)

zero := roachpb.ReplicationTarget{}
@@ -1125,6 +1167,7 @@ func (a Allocator) rebalanceTarget(
replicaCandidates,
existingPlusOneNew,
otherReplicaSet,
sl,
rangeUsageInfo,
targetType,
options,
@@ -1253,6 +1296,22 @@ func (a *Allocator) scorerOptions() *rangeCountScorerOptions {
}
}

func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions {
return &scatterScorerOptions{
rangeCountScorerOptions: rangeCountScorerOptions{
deterministic: a.storePool.deterministic,
rangeRebalanceThreshold: 0,
},
// We set jitter to be equal to the padding around replica-count rebalancing
// because we'd like to make it such that rebalances made due to an
// `AdminScatter` are roughly in line (but more random than) the rebalances
// made by the replicateQueue during normal course of operations. In other
// words, we don't want stores that are too far away from the mean to be
// affected by the jitter.
jitter: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
}
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica
// unless asked to do otherwise by the checkTransferLeaseSource parameter.
57 changes: 54 additions & 3 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -87,10 +87,16 @@ var rangeRebalanceThreshold = func() *settings.FloatSetting {
return s
}()

// CockroachDB's has two heuristics that trigger replica rebalancing: range
// count convergence and QPS convergence. scorerOptions defines the interface
// that both of these heuristics must implement.
// CockroachDB has two heuristics that trigger replica rebalancing: range count
// convergence and QPS convergence. scorerOptions defines the interface that
// both of these heuristics must implement.
type scorerOptions interface {
// maybeJitterStoreStats returns a `StoreList` that's identical to the
// parameter `sl`, but may have jittered stats on the stores.
//
// This is to ensure that, when scattering via `AdminScatterRequest`, we will
// be more likely to find a rebalance opportunity.
maybeJitterStoreStats(sl StoreList, allocRand allocatorRand) StoreList
// deterministic is set by tests to have the allocator methods sort their
// results by constraints score as well as by store IDs, as opposed to just
// the score.
@@ -134,6 +140,41 @@ type scorerOptions interface {
removalMaximallyConvergesScore(removalCandStoreList StoreList, existing roachpb.StoreDescriptor) int
}

func jittered(val float64, jitter float64, rand allocatorRand) float64 {
rand.Lock()
defer rand.Unlock()
result := val * jitter * (rand.Float64())
if rand.Int31()%2 == 0 {
result *= -1
}

return result
}

// scatterScorerOptions is used by the replicateQueue when called via the
// `AdminScatterRequest`. It is like `rangeCountScorerOptions` but with the
// rangeRebalanceThreshold set to zero (i.e. with all padding disabled). It also
// perturbs the stats on existing stores to add a bit of random jitter.
type scatterScorerOptions struct {
rangeCountScorerOptions
// jitter specifies the degree to which we will perturb existing store stats.
jitter float64
}

func (o *scatterScorerOptions) maybeJitterStoreStats(
sl StoreList, allocRand allocatorRand,
) (perturbedSL StoreList) {
perturbedStoreDescs := make([]roachpb.StoreDescriptor, 0, len(sl.stores))
for _, store := range sl.stores {
store.Capacity.RangeCount += int32(jittered(
float64(store.Capacity.RangeCount), o.jitter, allocRand,
))
perturbedStoreDescs = append(perturbedStoreDescs, store)
}

return makeStoreList(perturbedStoreDescs)
}

// rangeCountScorerOptions is used by the replicateQueue to tell the Allocator's
// rebalancing machinery to base its balance/convergence scores on range counts.
// This means that the resulting rebalancing decisions will further the goal of
Expand All @@ -143,6 +184,12 @@ type rangeCountScorerOptions struct {
rangeRebalanceThreshold float64
}

func (o *rangeCountScorerOptions) maybeJitterStoreStats(
sl StoreList, _ allocatorRand,
) (perturbedSL StoreList) {
return sl
}

func (o *rangeCountScorerOptions) deterministicForTesting() bool {
return o.deterministic
}
@@ -267,6 +314,10 @@ type qpsScorerOptions struct {
qpsPerReplica float64
}

func (o *qpsScorerOptions) maybeJitterStoreStats(sl StoreList, _ allocatorRand) StoreList {
return sl
}

func (o *qpsScorerOptions) deterministicForTesting() bool {
return o.deterministic
}
79 changes: 77 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
@@ -2209,7 +2209,8 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
nil,
rangeUsageInfo,
storeFilterThrottled,
a.scorerOptions())
a.scorerOptions(),
)
var resultID roachpb.StoreID
if ok {
resultID = result.StoreID
@@ -4075,7 +4076,8 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) {
defer stopper.Stop(ctx)
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(test.stores, t)
add, remove, _, ok := a.RebalanceNonVoter(ctx,
add, remove, _, ok := a.RebalanceNonVoter(
ctx,
test.conf,
nil,
test.existingVoters,
@@ -7206,6 +7208,79 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) {
}
}

// TestAllocatorRebalanceWithScatter tests that when `scatter` is set to true,
// the allocator will produce rebalance opportunities even when it normally
// wouldn't.
func TestAllocatorRebalanceWithScatter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper, g, _, a, _ := createTestAllocator(ctx, 10 /* numNodes */, true /* deterministic */)
defer stopper.Stop(ctx)

stores := []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{
NodeID: 1,
},
Capacity: roachpb.StoreCapacity{
RangeCount: 1000,
},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{
NodeID: 2,
},
Capacity: roachpb.StoreCapacity{
RangeCount: 1000,
},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{
NodeID: 3,
},
Capacity: roachpb.StoreCapacity{
RangeCount: 1000,
},
},
}

gossiputil.NewStoreGossiper(g).GossipStores(stores, t)

var rangeUsageInfo RangeUsageInfo

// Ensure that we wouldn't normally rebalance when all stores have the same
// replica count.
_, _, _, ok := a.RebalanceVoter(
ctx,
emptySpanConfig(),
nil,
replicas(1),
nil,
rangeUsageInfo,
storeFilterThrottled,
a.scorerOptions(),
)
require.False(t, ok)

// Ensure that we would produce a rebalance target when running with scatter.
_, _, _, ok = a.RebalanceVoter(
ctx,
emptySpanConfig(),
nil,
replicas(1),
nil,
rangeUsageInfo,
storeFilterThrottled,
a.scorerOptionsForScatter(),
)
require.True(t, ok)
}

// TestAllocatorRebalanceAway verifies that when a replica is on a node with a
// bad span config, the replica will be rebalanced off of it.
func TestAllocatorRebalanceAway(t *testing.T) {
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -2976,7 +2976,7 @@ func (s *Store) relocateOne(
targetStore, _, err := s.allocator.removeTarget(
ctx,
conf,
args.targetsToRemove(),
s.allocator.storeListForCandidates(args.targetsToRemove()),
existingVoters,
existingNonVoters,
args.targetType,
@@ -3191,7 +3191,9 @@ func (r *Replica) adminScatter(
var allowLeaseTransfer bool
canTransferLease := func(ctx context.Context, repl *Replica) bool { return allowLeaseTransfer }
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
requeue, err := rq.processOneChange(ctx, r, canTransferLease, false /* dryRun */)
requeue, err := rq.processOneChange(
ctx, r, canTransferLease, true /* scatter */, false, /* dryRun */
)
if err != nil {
// TODO(tbg): can this use IsRetriableReplicationError?
if isSnapshotError(err) {
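
The `adminScatter` change above sits on the path that serves
`AdminScatterRequest`. For context, here is a minimal sketch of how a
bulk-ingestion caller might issue that request through the kv client; the
package and helper names are illustrative, and the request plumbing is assumed
from the client API of this era rather than taken from this diff:

```go
package bulkutil // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// scatterSpan asks KV to scatter the replicas of every range overlapping the
// given span. With this patch, the resulting rebalances are mostly random but
// still limited to reasonable target stores.
func scatterSpan(ctx context.Context, db *kv.DB, span roachpb.Span) error {
	b := &kv.Batch{}
	b.AddRawRequest(&roachpb.AdminScatterRequest{
		RequestHeader: roachpb.RequestHeader{Key: span.Key, EndKey: span.EndKey},
		// RandomizeLeases additionally spreads leaseholders across the targets.
		RandomizeLeases: true,
	})
	return db.Run(ctx, b)
}
```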