allocator: make disk capacity threshold a setting #97409

Merged · 1 commit · Mar 8, 2023
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -1913,6 +1913,7 @@ func (a Allocator) RebalanceNonVoter(
func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions {
return &RangeCountScorerOptions{
IOOverloadOptions: a.IOOverloadOptions(),
DiskCapacityOptions: a.DiskOptions(),
deterministic: a.deterministic,
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV),
}
@@ -1923,6 +1924,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
return &ScatterScorerOptions{
RangeCountScorerOptions: RangeCountScorerOptions{
IOOverloadOptions: a.IOOverloadOptions(),
DiskCapacityOptions: a.DiskOptions(),
deterministic: a.deterministic,
rangeRebalanceThreshold: 0,
},
@@ -2172,6 +2174,13 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
return true
}

// DiskOptions returns the disk options. The disk options are used to determine
// whether a store has disk capacity for additional replicas, or whether the
// disk is over capacity and should shed replicas.
func (a *Allocator) DiskOptions() DiskCapacityOptions {
return makeDiskCapacityOptions(&a.st.SV)
}

// IOOverloadOptions returns the store IO overload options. It is used to
// filter and score candidates based on their level of IO overload and
// enforcement level.
@@ -2352,6 +2361,7 @@ func (a *Allocator) TransferLeaseTarget(
storeDescMap,
&LoadScorerOptions{
IOOverloadOptions: a.IOOverloadOptions(),
DiskOptions: a.DiskOptions(),
Deterministic: a.deterministic,
LoadDims: opts.LoadDimensions,
LoadThreshold: LoadThresholds(&a.st.SV, opts.LoadDimensions...),
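The new `DiskOptions` accessor reads the cluster settings each time scorer options are built, so a runtime change to either threshold is picked up on the next allocator pass. A minimal, self-contained sketch of that pattern (the `floatSetting` type is a hypothetical stand-in, not the CockroachDB settings API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// floatSetting stands in for a cluster setting handle; the real code uses
// settings.RegisterFloatSetting and reads the value via Get(&a.st.SV).
type floatSetting struct{ v atomic.Value }

func (s *floatSetting) set(f float64) { s.v.Store(f) }
func (s *floatSetting) get() float64  { return s.v.Load().(float64) }

var (
	maxDiskUtil            = &floatSetting{} // kv.allocator.max_disk_utilization_threshold
	rebalanceToMaxDiskUtil = &floatSetting{} // kv.allocator.rebalance_to_max_disk_utilization_threshold
)

// diskCapacityOptions mirrors DiskCapacityOptions from the diff.
type diskCapacityOptions struct {
	rebalanceToThreshold     float64
	shedAndBlockAllThreshold float64
}

// makeDiskCapacityOptions reads the settings afresh on every call, which is
// why a change to either setting takes effect without a restart.
func makeDiskCapacityOptions() diskCapacityOptions {
	return diskCapacityOptions{
		rebalanceToThreshold:     rebalanceToMaxDiskUtil.get(),
		shedAndBlockAllThreshold: maxDiskUtil.get(),
	}
}

func main() {
	maxDiskUtil.set(0.95)
	rebalanceToMaxDiskUtil.set(0.925)
	fmt.Printf("%+v\n", makeDiskCapacityOptions()) // defaults

	maxDiskUtil.set(0.90) // operator lowers the ceiling at runtime
	fmt.Printf("%+v\n", makeDiskCapacityOptions()) // next pass sees 0.90
}
```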
199 changes: 157 additions & 42 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -18,14 +18,14 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

const (
@@ -50,14 +50,15 @@ const (
// https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf.
allocatorRandomCount = 2

// rebalanceToMaxFractionUsedThreshold: if the fraction used of a store
// descriptor capacity is greater than this value, it will never be used as a
// rebalance target. This is important for providing a buffer between fully
// healthy stores and full stores (as determined by
// allocator.MaxFractionUsedThreshold). Without such a buffer, replicas could
// hypothetically ping pong back and forth between two nodes, making one full
// and then the other.
rebalanceToMaxFractionUsedThreshold = 0.925
// defaultMaxDiskUtilizationThreshold is the default maximum threshold for
// disk utilization. The value is used as the default in the cluster setting
// maxDiskUtilizationThreshold.
defaultMaxDiskUtilizationThreshold = 0.95

// defaultRebalanceToMaxDiskUtilizationThreshold is the default maximum
// threshold for a rebalance target's disk utilization. The value is used as
// the default in the cluster setting rebalanceToMaxDiskUtilizationThreshold.
defaultRebalanceToMaxDiskUtilizationThreshold = 0.925

// minRangeRebalanceThreshold is the number of replicas by which a store
// must deviate from the mean number of replicas to be considered overfull
@@ -223,6 +224,61 @@ var LeaseIOOverloadThresholdEnforcement = settings.RegisterEnumSetting(
},
)

// maxDiskUtilizationThreshold controls the point at which the store cedes
// having room for new replicas. If the fraction used of a store descriptor
// capacity is greater than this value, it will never be used as a rebalance or
// allocate target and we will actively try to move replicas off of it.
var maxDiskUtilizationThreshold = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.max_disk_utilization_threshold",
"maximum disk utilization before a store will never be used as a rebalance "+
"or allocation target and will actively have replicas moved off of it; "+
"this should be set higher than "+
"`kv.allocator.rebalance_to_max_disk_utilization_threshold`",
defaultMaxDiskUtilizationThreshold,
func(f float64) error {
if f > 0.99 {
return errors.Errorf(
"Cannot set kv.allocator.max_disk_utilization_threshold " +
"greater than 0.99")
}
if f < 0.0 {
return errors.Errorf(
"Cannot set kv.allocator.max_disk_utilization_threshold less than 0")
}
return nil
},
)

// rebalanceToMaxDiskUtilizationThreshold: if the fraction used of a store
// descriptor capacity is greater than this value, it will never be used as a
// rebalance target. This is important for providing a buffer between fully
// healthy stores and full stores (as determined by
maxDiskUtilizationThreshold). Without such a buffer, replicas could
// hypothetically ping pong back and forth between two nodes, making one full
// and then the other.
var rebalanceToMaxDiskUtilizationThreshold = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.rebalance_to_max_disk_utilization_threshold",
"maximum disk utilization before a store will never be used as a rebalance "+
"target; this should be set lower than "+
"`kv.allocator.max_disk_utilization_threshold`",
defaultRebalanceToMaxDiskUtilizationThreshold,
func(f float64) error {
if f > 0.99 {
return errors.Errorf(
"Cannot set kv.allocator.rebalance_to_max_disk_utilization_threshold " +
"greater than 0.99")
}
if f < 0.0 {
return errors.Errorf(
"Cannot set kv.allocator.rebalance_to_max_disk_utilization_threshold " +
"less than 0")
}
return nil
},
)
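Note that each validator above sees only its own value, so the cross-setting guidance in the descriptions (`rebalance_to_...` lower than `max_...`) is advisory rather than enforced. A minimal sketch of what checking that invariant for a proposed pair of values would look like (`validatePair` is a hypothetical helper, not part of the PR):

```go
package main

import "fmt"

// validatePair applies both per-setting bounds plus the cross-setting
// ordering that the setting descriptions recommend but cannot enforce.
func validatePair(rebalanceTo, shedAndBlockAll float64) error {
	for name, f := range map[string]float64{
		"kv.allocator.rebalance_to_max_disk_utilization_threshold": rebalanceTo,
		"kv.allocator.max_disk_utilization_threshold":              shedAndBlockAll,
	} {
		if f < 0 || f > 0.99 {
			return fmt.Errorf("cannot set %s outside [0, 0.99]", name)
		}
	}
	if rebalanceTo >= shedAndBlockAll {
		// Without this buffer, replicas could ping-pong between two
		// nearly-full stores, making one full and then the other.
		return fmt.Errorf(
			"rebalance_to threshold (%.3f) should be below the max threshold (%.3f)",
			rebalanceTo, shedAndBlockAll)
	}
	return nil
}

func main() {
	fmt.Println(validatePair(0.925, 0.95)) // <nil>: the defaults are consistent
	fmt.Println(validatePair(0.96, 0.95))  // error: no buffer between thresholds
}
```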

// ScorerOptions defines the interface for the two heuristics that trigger
// replica rebalancing: range count convergence and QPS convergence.
type ScorerOptions interface {
@@ -278,6 +334,8 @@ type ScorerOptions interface {
// getIOOverloadOptions returns the scorer options for store IO overload. It
// is used to inform scoring based on the IO overload of a store.
getIOOverloadOptions() IOOverloadOptions
// getDiskOptions returns the scorer options for disk fullness.
getDiskOptions() DiskCapacityOptions
}

func jittered(val float64, jitter float64, rand allocatorRand) float64 {
@@ -326,6 +384,7 @@ func (o *ScatterScorerOptions) maybeJitterStoreStats(
// converging range counts across stores in the cluster.
type RangeCountScorerOptions struct {
IOOverloadOptions
DiskCapacityOptions
deterministic bool
rangeRebalanceThreshold float64
}
@@ -336,6 +395,10 @@ func (o *RangeCountScorerOptions) getIOOverloadOptions() IOOverloadOptions {
return o.IOOverloadOptions
}

func (o *RangeCountScorerOptions) getDiskOptions() DiskCapacityOptions {
return o.DiskCapacityOptions
}

func (o *RangeCountScorerOptions) maybeJitterStoreStats(
sl storepool.StoreList, _ allocatorRand,
) (perturbedSL storepool.StoreList) {
@@ -447,6 +510,7 @@ func (o *RangeCountScorerOptions) removalMaximallyConvergesScore(
// further the goal of converging QPS across stores in the cluster.
type LoadScorerOptions struct {
IOOverloadOptions IOOverloadOptions
DiskOptions DiskCapacityOptions
Deterministic bool
LoadDims []load.Dimension

@@ -486,6 +550,10 @@ func (o *LoadScorerOptions) getIOOverloadOptions() IOOverloadOptions {
return o.IOOverloadOptions
}

func (o *LoadScorerOptions) getDiskOptions() DiskCapacityOptions {
return o.DiskOptions
}

func (o *LoadScorerOptions) maybeJitterStoreStats(
sl storepool.StoreList, _ allocatorRand,
) storepool.StoreList {
@@ -613,6 +681,39 @@ func (o *LoadScorerOptions) removalMaximallyConvergesScore(
return 0
}

// DiskCapacityOptions is the scorer options for disk fullness. It is used to
// inform scoring based on the disk utilization of a store.
type DiskCapacityOptions struct {
RebalanceToThreshold float64
ShedAndBlockAllThreshold float64
}

func makeDiskCapacityOptions(sv *settings.Values) DiskCapacityOptions {
return DiskCapacityOptions{
RebalanceToThreshold: rebalanceToMaxDiskUtilizationThreshold.Get(sv),
ShedAndBlockAllThreshold: maxDiskUtilizationThreshold.Get(sv),
}
}

func defaultDiskCapacityOptions() DiskCapacityOptions {
return DiskCapacityOptions{
RebalanceToThreshold: defaultRebalanceToMaxDiskUtilizationThreshold,
ShedAndBlockAllThreshold: defaultMaxDiskUtilizationThreshold,
}
}

// maxCapacityCheck returns true if the store has room for a new replica.
func (do DiskCapacityOptions) maxCapacityCheck(store roachpb.StoreDescriptor) bool {
return store.Capacity.FractionUsed() < do.ShedAndBlockAllThreshold
}

// rebalanceToMaxCapacityCheck returns true if the store has enough room to
// accept a rebalance. The bar for this is stricter than for whether a store
// has enough room to accept a necessary replica (i.e. via AllocateCandidates).
func (do DiskCapacityOptions) rebalanceToMaxCapacityCheck(store roachpb.StoreDescriptor) bool {
return store.Capacity.FractionUsed() < do.RebalanceToThreshold
}
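Together, the two checks partition disk utilization into three regimes: below `RebalanceToThreshold` a store can receive both rebalances and allocations; between the two thresholds it can still receive necessary allocations but is never a rebalance target; at or above `ShedAndBlockAllThreshold` it receives nothing and actively sheds replicas. A self-contained sketch with stand-in types (the real `roachpb.StoreCapacity.FractionUsed` computes the fraction with more nuance than plain used/capacity):

```go
package main

import "fmt"

type diskCapacityOptions struct {
	rebalanceToThreshold     float64
	shedAndBlockAllThreshold float64
}

type storeCapacity struct{ used, capacity float64 }

func (c storeCapacity) fractionUsed() float64 { return c.used / c.capacity }

// maxCapacityCheck mirrors DiskCapacityOptions.maxCapacityCheck: does the
// store have room for a new replica at all?
func (do diskCapacityOptions) maxCapacityCheck(c storeCapacity) bool {
	return c.fractionUsed() < do.shedAndBlockAllThreshold
}

// rebalanceToMaxCapacityCheck mirrors the stricter bar for rebalance targets.
func (do diskCapacityOptions) rebalanceToMaxCapacityCheck(c storeCapacity) bool {
	return c.fractionUsed() < do.rebalanceToThreshold
}

func main() {
	do := diskCapacityOptions{rebalanceToThreshold: 0.925, shedAndBlockAllThreshold: 0.95}
	for _, used := range []float64{50, 93, 97} {
		c := storeCapacity{used: used, capacity: 100}
		fmt.Printf("%.0f%% full: allocate=%t rebalanceTo=%t\n",
			used, do.maxCapacityCheck(c), do.rebalanceToMaxCapacityCheck(c))
	}
	// 50% full: allocate=true  rebalanceTo=true
	// 93% full: allocate=true  rebalanceTo=false
	// 97% full: allocate=false rebalanceTo=false
}
```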

// candidate store for allocation. These are ordered by importance.
type candidate struct {
store roachpb.StoreDescriptor
@@ -997,7 +1098,43 @@ func rankedCandidateListForAllocation(
var candidates candidateList
existingReplTargets := roachpb.MakeReplicaSet(existingReplicas).ReplicationTargets()
var nonVoterReplTargets []roachpb.ReplicationTarget

// Filter the list of candidateStores to only those which are valid. A valid
// store satisfies the constraints and does not have a full disk. It isn't
// fair to compare the stores which are invalid/full to the average range
// count of those which are valid/not-full.
validCandidateStores := []roachpb.StoreDescriptor{}
for _, s := range candidateStores.Stores {
if !options.getDiskOptions().maxCapacityCheck(s) ||
!options.getIOOverloadOptions().allocateReplicaToCheck(
ctx,
s,
candidateStores.CandidateIOOverloadScores.Mean,
) {
continue
}

constraintsOK, necessary := constraintsCheck(s)
if !constraintsOK {
if necessary {
log.KvDistribution.VEventf(
ctx,
3,
"cannot allocate necessary %s on s%d",
targetType,
s.StoreID,
)
}
continue
}
validCandidateStores = append(validCandidateStores, s)
}

// Create a new store list, which will update the average for each stat to
// only be the average value of valid candidates.
validStoreList := storepool.MakeStoreList(validCandidateStores)

for _, s := range validStoreList.Stores {
// Disregard all the stores that already have replicas.
if StoreHasReplica(s.StoreID, existingReplTargets) {
continue
@@ -1007,6 +1144,11 @@ func rankedCandidateListForAllocation(
if !allowMultipleReplsPerNode && nodeHasReplica(s.Node.NodeID, existingReplTargets) {
continue
}

// All invalid stores are filtered out above, before this loop, so
// constraintsOK should always be true.
constraintsOK, necessary := constraintsCheck(s)

if !isStoreValidForRoutineReplicaTransfer(ctx, s.StoreID) {
log.KvDistribution.VEventf(
ctx,
Expand All @@ -1017,29 +1159,9 @@ func rankedCandidateListForAllocation(
)
continue
}
constraintsOK, necessary := constraintsCheck(s)
if !constraintsOK {
if necessary {
log.KvDistribution.VEventf(
ctx,
3,
"cannot allocate necessary %s on s%d",
targetType,
s.StoreID,
)
}
continue
}

if !allocator.MaxCapacityCheck(s) || !options.getIOOverloadOptions().allocateReplicaToCheck(
ctx,
s,
candidateStores.CandidateIOOverloadScores.Mean,
) {
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := options.balanceScore(candidateStores, s.Capacity)
balanceScore := options.balanceScore(validStoreList, s.Capacity)
var hasNonVoter bool
if targetType == VoterTarget {
if nonVoterReplTargets == nil {
Expand All @@ -1049,8 +1171,8 @@ func rankedCandidateListForAllocation(
}
candidates = append(candidates, candidate{
store: s,
valid: constraintsOK,
necessary: necessary,
valid: constraintsOK,
diversityScore: diversityScore,
balanceScore: balanceScore,
hasNonVoter: hasNonVoter,
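The filtering above matters for scoring, not just candidacy: recomputing the per-stat means over valid stores only keeps a full or IO-overloaded store's statistics from skewing `balanceScore` for everyone else. A small illustration with stand-in types (hypothetical `store` struct, not `roachpb.StoreDescriptor`):

```go
package main

import "fmt"

// store carries just the fields this example needs.
type store struct {
	rangeCount   int
	fractionUsed float64
}

func meanRangeCount(stores []store) float64 {
	sum := 0
	for _, s := range stores {
		sum += s.rangeCount
	}
	return float64(sum) / float64(len(stores))
}

func main() {
	const shedAndBlockAllThreshold = 0.95
	all := []store{
		{rangeCount: 100, fractionUsed: 0.40},
		{rangeCount: 110, fractionUsed: 0.50},
		{rangeCount: 10, fractionUsed: 0.97}, // full disk, but few (large) ranges
	}
	var valid []store
	for _, s := range all {
		if s.fractionUsed < shedAndBlockAllThreshold { // maxCapacityCheck
			valid = append(valid, s)
		}
	}
	// Against the polluted mean (~73), the two healthy stores look overfull;
	// against the valid-only mean (105), they look balanced.
	fmt.Printf("mean over all stores:   %.1f\n", meanRangeCount(all))
	fmt.Printf("mean over valid stores: %.1f\n", meanRangeCount(valid))
}
```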
@@ -1100,7 +1222,7 @@ func candidateListForRemoval(
store: s,
valid: constraintsOK,
necessary: necessary,
fullDisk: !allocator.MaxCapacityCheck(s),
fullDisk: !options.getDiskOptions().maxCapacityCheck(s),
// When removing a replica from a store, we do not want to include
// IO overloaded in ranking stores. This would submit already
// overloaded amplification stores to additional load of moving a
@@ -1397,7 +1519,7 @@ func rankedCandidateListForRebalancing(
continue
}
valid, necessary := removalConstraintsChecker(store)
fullDisk := !allocator.MaxCapacityCheck(store)
fullDisk := !options.getDiskOptions().maxCapacityCheck(store)

if !valid {
if !needRebalanceFrom {
@@ -1516,7 +1638,7 @@ func rankedCandidateListForRebalancing(
store: store,
valid: constraintsOK,
necessary: necessary,
fullDisk: !allocator.MaxCapacityCheck(store),
fullDisk: !options.getDiskOptions().maxCapacityCheck(store),
diversityScore: diversityScore,
}
if !cand.less(existing) {
@@ -1615,7 +1737,7 @@ func rankedCandidateListForRebalancing(
// rebalance candidates.
s := cand.store
candIOOverloadScore, _ := s.Capacity.IOThreshold.Score()
cand.fullDisk = !rebalanceToMaxCapacityCheck(s)
cand.fullDisk = !options.getDiskOptions().rebalanceToMaxCapacityCheck(s)
cand.ioOverloadScore = candIOOverloadScore
cand.ioOverloaded = !options.getIOOverloadOptions().rebalanceReplicaToCheck(
ctx,
@@ -2345,13 +2467,6 @@ func (o IOOverloadOptions) existingLeaseCheck(
return true
}

// rebalanceToMaxCapacityCheck returns true if the store has enough room to
// accept a rebalance. The bar for this is stricter than for whether a store
// has enough room to accept a necessary replica (i.e. via AllocateCandidates).
func rebalanceToMaxCapacityCheck(store roachpb.StoreDescriptor) bool {
return store.Capacity.FractionUsed() < rebalanceToMaxFractionUsedThreshold
}

func scoresAlmostEqual(score1, score2 float64) bool {
return math.Abs(score1-score2) < epsilon
}