allocator: check io overload on lease transfer
Previously, the allocator would return lease transfer targets without
considering the IO overload of the stores involved. When leases
transferred to IO overloaded stores, service latency tended to degrade.

This commit adds IO overload checks prior to lease transfers. These
checks are similar to the IO overload checks for allocating replicas
introduced in #97142.

The checks work by comparing a candidate store's IO overload score
against `kv.allocator.lease_io_overload_threshold` and against the mean
score of the other candidates. If the candidate's score is greater than
or equal to both values, the store is considered IO overloaded. The
threshold defaults to 0.5.

The current leaseholder has to meet a higher bar to be considered IO
overloaded: its IO overload score must be greater than or equal to
`kv.allocator.lease_shed_io_overload_threshold`, which defaults to 0.9.
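
As a rough illustration only, here is a minimal Go sketch of the two
checks described above; the names, signatures, and the exact handling of
the candidate mean in the shed check are assumptions (the real logic lives
in `IOOverloadOptions.transferLeaseToCheck` and `existingLeaseCheck` in the
diff below):

```go
package main

import "fmt"

// Hypothetical constants mirroring the defaults of
// kv.allocator.lease_io_overload_threshold and
// kv.allocator.lease_shed_io_overload_threshold.
const (
	leaseIOOverloadThreshold     = 0.5
	leaseShedIOOverloadThreshold = 0.9
)

// candidateIsIOOverloaded reports whether a prospective lease transfer
// target is considered IO overloaded: its score must be at or above both
// the threshold and the mean score of the other candidates.
func candidateIsIOOverloaded(score, candidateMean float64) bool {
	return score >= leaseIOOverloadThreshold && score >= candidateMean
}

// leaseholderShouldShed reports whether the current leaseholder clears the
// higher bar for shedding its leases. (The real check may also consider
// the candidate mean; only the threshold is described above.)
func leaseholderShouldShed(score float64) bool {
	return score >= leaseShedIOOverloadThreshold
}

func main() {
	// A store scoring 0.6 against a candidate mean of 0.2 is rejected as a
	// transfer target, but as an existing leaseholder it keeps its leases.
	fmt.Println(candidateIsIOOverloaded(0.6, 0.2)) // true: blocked as a target
	fmt.Println(leaseholderShouldShed(0.6))        // false: below 0.9
	fmt.Println(leaseholderShouldShed(0.95))       // true: lease should move
}
```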

The level of enforcement is controlled by
`kv.allocator.lease_io_overload_threshold_enforcement`, which determines
the action taken when a candidate store for a lease transfer is IO
overloaded:

- `ignore`: ignore IO overload scores entirely during lease transfers
  (effectively disabling this mechanism);
- `block_transfer_to`: lease transfers only consider stores that aren't
  IO overloaded (existing leases on IO overloaded stores are left as
  is);
- `shed`: actively shed leases from IO overloaded stores to less IO
  overloaded stores (this is a superset of `block_transfer_to`).

The default is `block_transfer_to`.
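
A hedged sketch of how the three enforcement levels could change behavior
(all names here are illustrative stand-ins, not the actual CockroachDB
types):

```go
package main

import "fmt"

// leaseIOOverloadEnforcement is an illustrative stand-in for the levels of
// kv.allocator.lease_io_overload_threshold_enforcement.
type leaseIOOverloadEnforcement int

const (
	enforcementIgnore leaseIOOverloadEnforcement = iota
	enforcementBlockTransferTo
	enforcementShed
)

// filterLeaseTargets sketches the effect of each level: ignore keeps every
// candidate, block_transfer_to drops IO overloaded candidates, and shed
// additionally forces the lease off an overloaded leaseholder.
func filterLeaseTargets(
	level leaseIOOverloadEnforcement,
	candidates []string,
	overloaded map[string]bool,
	leaseholder string,
) (targets []string, shedLease bool) {
	if level == enforcementIgnore {
		return candidates, false
	}
	for _, c := range candidates {
		if !overloaded[c] {
			targets = append(targets, c)
		}
	}
	// Shedding is a superset of block_transfer_to: the lease is also moved
	// away when the current leaseholder itself is overloaded.
	shedLease = level == enforcementShed && overloaded[leaseholder]
	return targets, shedLease
}

func main() {
	candidates := []string{"s1", "s2", "s3"}
	overloaded := map[string]bool{"s1": true}
	targets, shed := filterLeaseTargets(enforcementShed, candidates, overloaded, "s1")
	fmt.Println(targets, shed) // [s2 s3] true
}
```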

This commit also renames the existing replica IO overload settings to be
prefixed with `Replica`, to avoid confusion between the lease and replica
IO overload checks.

Resolves: #96508

Release note (ops change): Range leases will no longer be transferred to
stores which are IO overloaded.
kvoli committed Feb 27, 2023
1 parent 7a3778b commit 3f1263d
Showing 7 changed files with 563 additions and 258 deletions.
151 changes: 124 additions & 27 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -1912,7 +1912,7 @@ func (a Allocator) RebalanceNonVoter(
// machinery to achieve range count convergence.
func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions {
return &RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
IOOverloadOptions: a.IOOverloadOptions(),
deterministic: a.deterministic,
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV),
}
@@ -1922,7 +1922,7 @@ func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions
func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerOptions {
return &ScatterScorerOptions{
RangeCountScorerOptions: RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
IOOverloadOptions: a.IOOverloadOptions(),
deterministic: a.deterministic,
rangeRebalanceThreshold: 0,
},
@@ -1946,6 +1946,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
// - It excludes replicas that may need snapshots. If the replica calling this
// method is not the Raft leader (meaning that it doesn't know whether follower
// replicas need a snapshot or not), it produces no results.
// - It excludes replicas that are on stores which are IO overloaded.
func (a *Allocator) ValidLeaseTargets(
ctx context.Context,
storePool storepool.AllocatorStorePool,
@@ -2029,9 +2030,96 @@ func (a *Allocator) ValidLeaseTargets(
candidates = preferred
}

// Filter the candidate list to only those stores which are not IO
// overloaded.
nonIOOverloadedPreferred := a.nonIOOverloadedLeaseTargets(
ctx,
storePool,
candidates,
leaseRepl.StoreID(),
a.IOOverloadOptions(),
)

return nonIOOverloadedPreferred
}

// nonIOOverloadedLeaseTargets returns the subset of the given existing
// replicas that remain valid lease targets with respect to IO overload,
// given the IO overload options and the IO overload of the existing
// replica stores.
func (a *Allocator) nonIOOverloadedLeaseTargets(
ctx context.Context,
storePool storepool.AllocatorStorePool,
existingReplicas []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
ioOverloadOptions IOOverloadOptions,
) (candidates []roachpb.ReplicaDescriptor) {
// We return early to avoid unnecessary work when IO overload is set to be
// ignored anyway.
if ioOverloadOptions.LeaseEnforcementLevel == IOOverloadThresholdIgnore {
return existingReplicas
}

sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

for _, replDesc := range existingReplicas {
store, ok := sl.FindStoreByID(replDesc.StoreID)
// If the replica is the current leaseholder, don't include it as a
// candidate if it is filtered out of the store list due to being
// suspect, or if the leaseholder store doesn't pass the leaseholder IO
// overload check.
//
// Note that the leaseholder store IO overload check is less strict than
// the transfer target check below. We don't want to shed leases at the
// same point a candidate becomes ineligible as it could lead to thrashing.
// Instead, we create a buffer between the two to avoid leases moving back
// and forth.
if (replDesc.StoreID == leaseStoreID) &&
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)) {
continue
}

// If the replica is not the leaseholder, don't include it as a candidate
// if it is filtered out similar to above, or the replica store doesn't
// pass the lease transfer IO overload check.
if replDesc.StoreID != leaseStoreID &&
(!ok || !ioOverloadOptions.transferLeaseToCheck(ctx, store, avgIOOverload)) {
continue
}

candidates = append(candidates, replDesc)
}

return candidates
}

// leaseholderShouldMoveDueToIOOverload returns true if the current leaseholder
// store is IO overloaded and there are other viable leaseholder stores.
func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
ctx context.Context,
storePool storepool.AllocatorStorePool,
existingReplicas []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
ioOverloadOptions IOOverloadOptions,
) bool {
sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

// Find the leaseholder among the existing replicas. If its store doesn't
// pass the existing-lease check, report that the lease should be moved due
// to IO overload on the current leaseholder store. If the leaseholder is
// suspect or doesn't have a store descriptor ready, it is skipped below and
// not considered IO overloaded.
for _, replDesc := range existingReplicas {
if store, ok := sl.FindStoreByID(replDesc.StoreID); ok && replDesc.StoreID == leaseStoreID {
return !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)
}
}

return false
}

// leaseholderShouldMoveDueToPreferences returns true if the current leaseholder
// is in violation of lease preferences _that can otherwise be satisfied_ by
// some existing replica.
@@ -2084,17 +2172,16 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
return true
}

// StoreHealthOptions returns the store health options, currently only
// considering the threshold for io overload. This threshold is not
// considered in allocation or rebalancing decisions (excluding candidate
// stores as targets) when enforcementLevel is set to storeHealthNoAction or
// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
// there is a mixed version cluster, storeHealthNoAction is set instead.
func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
enforcementLevel := IOOverloadEnforcementLevel(IOOverloadThresholdEnforcement.Get(&a.st.SV))
return StoreHealthOptions{
EnforcementLevel: enforcementLevel,
IOOverloadThreshold: IOOverloadThreshold.Get(&a.st.SV),
// IOOverloadOptions returns the store IO overload options. It is used to
// filter and score candidates based on their level of IO overload and
// enforcement level.
func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
return IOOverloadOptions{
ReplicaEnforcementLevel: IOOverloadEnforcementLevel(ReplicaIOOverloadThresholdEnforcement.Get(&a.st.SV)),
LeaseEnforcementLevel: IOOverloadEnforcementLevel(LeaseIOOverloadThresholdEnforcement.Get(&a.st.SV)),
ReplicaIOOverloadThreshold: ReplicaIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadThreshold: LeaseIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadShedThreshold: LeaseIOOverloadShedThreshold.Get(&a.st.SV),
}
}

@@ -2129,10 +2216,11 @@ func (a *Allocator) TransferLeaseTarget(
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
excludeLeaseRepl := opts.ExcludeLeaseRepl
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) {
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) ||
a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) {
// Explicitly exclude the current leaseholder from the result set if it is
// in violation of lease preferences that can be satisfied by some other
// replica.
// replica or is IO overloaded.
excludeLeaseRepl = true
}

@@ -2148,9 +2236,10 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}

existing = a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)
validTargets := a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
if len(validTargets) == 0 || (len(validTargets) == 1 && validTargets[0].StoreID == leaseRepl.StoreID()) {
log.KvDistribution.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID())
return roachpb.ReplicaDescriptor{}
}
@@ -2166,7 +2255,7 @@ func (a *Allocator) TransferLeaseTarget(
// falls back to `leaseCountConvergence`. Rationalize this or refactor this
// logic to be more clear.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, storePool, source, existing, usageInfo, nil, candidateLeasesMean,
ctx, storePool, source, validTargets, usageInfo, nil, candidateLeasesMean,
)
if !excludeLeaseRepl {
switch transferDec {
@@ -2176,7 +2265,7 @@ func (a *Allocator) TransferLeaseTarget(
}
fallthrough
case decideWithoutStats:
if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, storePool, sl, source, existing) {
if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, storePool, sl, source, validTargets) {
return roachpb.ReplicaDescriptor{}
}
case shouldTransfer:
@@ -2202,13 +2291,13 @@ func (a *Allocator) TransferLeaseTarget(
if !opts.CheckCandidateFullness {
a.randGen.Lock()
defer a.randGen.Unlock()
return existing[a.randGen.Intn(len(existing))]
return validTargets[a.randGen.Intn(len(validTargets))]
}

var bestOption roachpb.ReplicaDescriptor
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
candidates := make([]roachpb.ReplicaDescriptor, 0, len(validTargets))
bestOptionLeaseCount := int32(math.MaxInt32)
for _, repl := range existing {
for _, repl := range validTargets {
if leaseRepl.StoreID() == repl.StoreID {
continue
}
@@ -2225,7 +2314,7 @@ func (a *Allocator) TransferLeaseTarget(
}
}
if len(candidates) == 0 {
// If there were no existing replicas on stores with less-than-mean
// If there were no validTargets replicas on stores with less-than-mean
// leases, and we _must_ move the lease away (indicated by
// `opts.excludeLeaseRepl`), just return the best possible option.
if excludeLeaseRepl {
@@ -2239,8 +2328,8 @@ func (a *Allocator) TransferLeaseTarget(

case allocator.LoadConvergence:
leaseReplLoad := usageInfo.TransferImpact()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
candidates := make([]roachpb.StoreID, 0, len(validTargets)-1)
for _, repl := range validTargets {
if repl.StoreID != leaseRepl.StoreID() {
candidates = append(candidates, repl.StoreID)
}
@@ -2262,7 +2351,7 @@ func (a *Allocator) TransferLeaseTarget(
candidates,
storeDescMap,
&LoadScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
IOOverloadOptions: a.IOOverloadOptions(),
Deterministic: a.deterministic,
LoadDims: opts.LoadDimensions,
LoadThreshold: LoadThresholds(&a.st.SV, opts.LoadDimensions...),
@@ -2316,7 +2405,7 @@ func (a *Allocator) TransferLeaseTarget(
log.KvDistribution.Fatalf(ctx, "unknown declineReason: %v", noRebalanceReason)
}

for _, repl := range existing {
for _, repl := range validTargets {
if repl.StoreID == bestStore {
return repl
}
@@ -2861,3 +2950,11 @@ func maxReplicaID(replicas []roachpb.ReplicaDescriptor) roachpb.ReplicaID {
}
return max
}

func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
ret := make([]roachpb.StoreID, len(descs))
for i, desc := range descs {
ret[i] = desc.StoreID
}
return ret
}