diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index 089ecd82fe43..f166f20bb2de 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -1912,7 +1912,7 @@ func (a Allocator) RebalanceNonVoter(
 // machinery to achieve range count convergence.
 func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions {
 	return &RangeCountScorerOptions{
-		StoreHealthOptions:      a.StoreHealthOptions(ctx),
+		StoreHealthOptions:      a.StoreHealthOptions(),
 		deterministic:           a.deterministic,
 		rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV),
 	}
@@ -1922,7 +1922,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerOptions {
 	return &ScatterScorerOptions{
 		RangeCountScorerOptions: RangeCountScorerOptions{
-			StoreHealthOptions:      a.StoreHealthOptions(ctx),
+			StoreHealthOptions:      a.StoreHealthOptions(),
 			deterministic:           a.deterministic,
 			rangeRebalanceThreshold: 0,
 		},
@@ -2032,6 +2032,51 @@ func (a *Allocator) ValidLeaseTargets(
 	return candidates
 }
 
+// healthyLeaseTargets returns a list of healthy replica targets and whether
+// the leaseholder replica should be replaced, given the existing replicas,
+// store health options, and IO overload of existing replica stores.
+func (a *Allocator) healthyLeaseTargets(
+	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
+	existingReplicas []roachpb.ReplicaDescriptor,
+	leaseStoreID roachpb.StoreID,
+	storeHealthOpts StoreHealthOptions,
+) (healthyTargets []roachpb.ReplicaDescriptor, excludeLeaseRepl bool) {
+	sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
+	avgIOOverload := sl.CandidateIOOverloadScores.Mean
+
+	for _, replDesc := range existingReplicas {
+		store, ok := sl.FindStoreByID(replDesc.StoreID)
+		// If the replica is the current leaseholder, don't include it as a
+		// candidate, and additionally set excludeLeaseRepl if it is filtered
+		// out of the store list due to being suspect, or if the leaseholder
+		// store doesn't pass the leaseholder store health check.
+		//
+		// Note that the leaseholder store health check is less strict than the
+		// transfer target check below. We don't want to shed leases at the same
+		// point a candidate becomes ineligible as it could lead to thrashing.
+		// Instead, we create a buffer between the two to avoid leases moving back
+		// and forth.
+		if (replDesc.StoreID == leaseStoreID) &&
+			(!ok || !storeHealthOpts.leaseStoreIsHealthy(ctx, store, avgIOOverload)) {
+			excludeLeaseRepl = true
+			continue
+		}
+
+		// If the replica is not the leaseholder, don't include it as a candidate
+		// if it is filtered out similar to above, or the replica store doesn't
+		// pass the lease transfer store health check.
+		if replDesc.StoreID != leaseStoreID &&
+			(!ok || !storeHealthOpts.transferToStoreIsHealthy(ctx, store, avgIOOverload)) {
+			continue
+		}
+
+		healthyTargets = append(healthyTargets, replDesc)
+	}
+
+	return healthyTargets, excludeLeaseRepl
+}
+
 // leaseholderShouldMoveDueToPreferences returns true if the current leaseholder
 // is in violation of lease preferences _that can otherwise be satisfied_ by
 // some existing replica.
@@ -2088,13 +2133,14 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
 // considering the threshold for io overload. This threshold is not
 // considered in allocation or rebalancing decisions (excluding candidate
 // stores as targets) when enforcementLevel is set to storeHealthNoAction or
-// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
-// there is a mixed version cluster, storeHealthNoAction is set instead.
-func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
-	enforcementLevel := IOOverloadEnforcementLevel(IOOverloadThresholdEnforcement.Get(&a.st.SV))
+// storeHealthLogOnly. By default block_rebalance_to is the enforcement level
+// for replica allocation and block for lease transfers.
+func (a *Allocator) StoreHealthOptions() StoreHealthOptions {
 	return StoreHealthOptions{
-		EnforcementLevel:    enforcementLevel,
-		IOOverloadThreshold: IOOverloadThreshold.Get(&a.st.SV),
+		EnforcementLevel:                   IOOverloadEnforcementLevel(IOOverloadThresholdEnforcement.Get(&a.st.SV)),
+		IOOverloadThreshold:                IOOverloadThreshold.Get(&a.st.SV),
+		LeaseEnforcementLevel:              LeaseIOOverloadEnforcementLevel(LeaseIOOverloadThresholdEnforcement.Get(&a.st.SV)),
+		LeaseIOOverloadThresholdShedBuffer: ShedIOOverloadThresholdBuffer.Get(&a.st.SV),
 	}
 }
 
@@ -2148,7 +2194,10 @@ func (a *Allocator) TransferLeaseTarget(
 		return roachpb.ReplicaDescriptor{}
 	}
 
+	var excludeLeaseReplForHealth bool
 	existing = a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)
+	existing, excludeLeaseReplForHealth = a.healthyLeaseTargets(ctx, storePool, existing, leaseRepl.StoreID(), a.StoreHealthOptions())
+	excludeLeaseRepl = excludeLeaseRepl || excludeLeaseReplForHealth
 	// Short-circuit if there are no valid targets out there.
 	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
 		log.KvDistribution.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID())
@@ -2262,7 +2311,7 @@ func (a *Allocator) TransferLeaseTarget(
 		candidates,
 		storeDescMap,
 		&LoadScorerOptions{
-			StoreHealthOptions: a.StoreHealthOptions(ctx),
+			StoreHealthOptions: a.StoreHealthOptions(),
 			Deterministic:      a.deterministic,
 			LoadDims:           opts.LoadDimensions,
 			LoadThreshold:      LoadThresholds(&a.st.SV, opts.LoadDimensions...),
@@ -2861,3 +2910,11 @@ func maxReplicaID(replicas []roachpb.ReplicaDescriptor) roachpb.ReplicaID {
 	}
 	return max
 }
+
+func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
+	ret := make([]roachpb.StoreID, len(descs))
+	for i, desc := range descs {
+		ret[i] = desc.StoreID
+	}
+	return ret
+}
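Reviewer note, not part of the patch: the buffer described in healthyLeaseTargets gives the two health checks hysteresis. Below is a minimal standalone sketch of that interaction, assuming lease enforcement is at least `block`; the constants mirror this patch's defaults, but the function names are illustrative only.

```go
// Sketch of the two-threshold hysteresis: transfers are blocked at the base
// threshold, while the current leaseholder only sheds at threshold+buffer.
package main

import "fmt"

const (
	ioOverloadThreshold = 0.5 // DefaultIOOverloadThreshold
	shedBuffer          = 0.4 // DefaultIOOverloadShedBuffer
)

// canReceiveLease mirrors transferToStoreIsHealthy: a candidate is rejected
// once its score reaches both the threshold and the mean-based watermark.
func canReceiveLease(score, watermark float64) bool {
	return score < ioOverloadThreshold || score < watermark
}

// mustShedLease mirrors leaseStoreIsHealthy under `shed` enforcement: the
// current leaseholder is only rejected past threshold+buffer (and watermark).
func mustShedLease(score, watermark float64) bool {
	return score >= ioOverloadThreshold+shedBuffer && score >= watermark
}

func main() {
	watermark := 0.5 * 1.1 // mean IO overload of 0.5 times IOOverloadMeanThreshold
	for _, score := range []float64{0.4, 0.6, 0.89, 0.95, 2.0} {
		fmt.Printf("score=%.2f canReceive=%t mustShed=%t\n",
			score, canReceiveLease(score, watermark), mustShedLease(score, watermark))
	}
	// With a low cluster mean, scores in roughly [0.55, 0.90) land in the
	// hysteresis band: the store is skipped as a transfer target but keeps
	// the leases it already holds, which is the anti-thrashing buffer the
	// comment in healthyLeaseTargets describes.
}
```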
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
index f4ea22c5285a..819817fae9f6 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -76,6 +76,11 @@ const (
 	// conjunction with IOOverloadMeanThreshold below.
 	DefaultIOOverloadThreshold = 0.5
 
+	// DefaultIOOverloadShedBuffer is used to shed leases from stores with an IO
+	// overload score greater than the threshold + buffer. This is typically used
+	// in conjunction with IOOverloadMeanThreshold below.
+	DefaultIOOverloadShedBuffer = 0.4
+
 	// IOOverloadMeanThreshold is the percentage above the mean after which a
 	// store could be considered unhealthy if also exceeding the threshold.
 	IOOverloadMeanThreshold = 1.1
@@ -95,7 +100,7 @@ const (
 	// as targets for any action regardless of the store IO overload.
 	IOOverloadThresholdNoAction IOOverloadEnforcementLevel = iota
 	// IOOverloadThresholdLogOnly will not exclude stores from being considered
-	// as targets for any action regarldess of the store IO overload. When a
+	// as targets for any action regardless of the store IO overload. When a
 	// store exceeds IOOverloadThreshold, an event is logged.
 	IOOverloadThresholdLogOnly
 	// IOOverloadThresholdBlockRebalanceTo excludes stores from being
@@ -167,6 +172,69 @@ var IOOverloadThresholdEnforcement = settings.RegisterEnumSetting(
 	},
 )
 
+// LeaseIOOverloadEnforcementLevel represents the level of action that may be
+// taken when a lease candidate's disk is considered unhealthy.
+type LeaseIOOverloadEnforcementLevel int64
+
+const (
+	// LeaseIOOverloadThresholdNoAction will not exclude stores from being
+	// considered as leaseholder targets for a range, regardless of the store IO
+	// overload.
+	LeaseIOOverloadThresholdNoAction LeaseIOOverloadEnforcementLevel = iota
+	// LeaseIOOverloadThresholdNoActionLogOnly will not exclude stores from
+	// being considered as leaseholder targets for a range, regardless of the
+	// store IO overload. When a store exceeds IOOverloadThreshold an event is
+	// logged.
+	LeaseIOOverloadThresholdNoActionLogOnly
+	// LeaseIOOverloadThresholdBlock excludes stores from being considered as
+	// leaseholder targets for a range if they exceed (a)
+	// kv.allocator.io_overload_threshold and (b) the mean IO overload among
+	// possible candidates. The current leaseholder store will NOT be excluded as
+	// a candidate for its current range leases.
+	LeaseIOOverloadThresholdBlock
+	// LeaseIOOverloadThresholdShed has the same behavior as block, however the
+	// current leaseholder store WILL BE excluded as a candidate for its current
+	// range leases, i.e. the lease will always transfer to a healthy and valid
+	// store if one exists.
+	LeaseIOOverloadThresholdShed
+)
+
+// ShedIOOverloadThresholdBuffer added to IOOverloadThreshold is the maximum IO
+// overload score the current leaseholder store for a range may have before it
+// is considered unhealthy. If unhealthy and LeaseIOOverloadThresholdEnforcement
+// is set to 'shed', the store will shed its leases to other healthy stores.
+var ShedIOOverloadThresholdBuffer = settings.RegisterFloatSetting(
+	settings.SystemOnly,
+	"kv.allocator.io_overload_threshold_lease_shed_buffer",
+	"a store will shed its leases when its IO overload score is above this "+
+		"value added to `kv.allocator.io_overload_threshold` and "+
+		"`kv.allocator.io_overload_threshold_enforcement_leases` is `shed`",
+	DefaultIOOverloadShedBuffer,
+)
+
+// LeaseIOOverloadThresholdEnforcement defines the level of enforcement for
+// lease transfers when a candidate store's IO overload exceeds the threshold
+// defined in IOOverloadThreshold, and additionally
+// ShedIOOverloadThresholdBuffer when shed is set.
+var LeaseIOOverloadThresholdEnforcement = settings.RegisterEnumSetting(
+	settings.SystemOnly,
+	"kv.allocator.io_overload_threshold_enforcement_leases",
+	"the level of enforcement on lease transfers when a candidate store has an "+
+		"io overload score exceeding `kv.allocator.io_overload_threshold` and above the "+
+		"average of comparable allocation candidates: `block_none` will exclude "+
+		"no candidate stores, `block_none_log` will exclude no candidates but log an "+
+		"event, `block` will exclude candidate stores from being "+
+		"targets of lease transfers, `shed` will exclude candidate stores "+
+		"from being targets of lease transfers and cause the existing "+
+		"leaseholder to transfer away its lease",
+	"block",
+	map[int64]string{
+		int64(LeaseIOOverloadThresholdNoAction):        "block_none",
+		int64(LeaseIOOverloadThresholdNoActionLogOnly): "block_none_log",
+		int64(LeaseIOOverloadThresholdBlock):           "block",
+		int64(LeaseIOOverloadThresholdShed):            "shed",
+	},
+)
+
 // ScorerOptions defines the interface for the two heuristics that trigger
 // replica rebalancing: range count convergence and QPS convergence.
 type ScorerOptions interface {
@@ -2177,8 +2245,10 @@ func convergesOnMean(oldVal, newVal, mean float64) bool {
 // StoreHealthOptions is the scorer options for store health. It is
 // used to inform scoring based on the health of a store.
 type StoreHealthOptions struct {
-	EnforcementLevel    IOOverloadEnforcementLevel
-	IOOverloadThreshold float64
+	EnforcementLevel                   IOOverloadEnforcementLevel
+	LeaseEnforcementLevel              LeaseIOOverloadEnforcementLevel
+	IOOverloadThreshold                float64
+	LeaseIOOverloadThresholdShedBuffer float64
 }
 
 // storeIsHealthy returns true if the store IO overload does not exceed
@@ -2192,7 +2262,6 @@ func (o StoreHealthOptions) storeIsHealthy(
 		ioOverloadScore < o.IOOverloadThreshold {
 		return true
 	}
-
 	// Still log an event when the IO overload score exceeds the threshold, however
 	// does not exceed the cluster average. This is enabled to avoid confusion
 	// where candidate stores are still targets, despite exceeding the
@@ -2245,6 +2314,65 @@ func (o StoreHealthOptions) rebalanceToStoreIsHealthy(
 	return o.EnforcementLevel < IOOverloadThresholdBlockRebalanceTo
 }
 
+func (o StoreHealthOptions) transferToStoreIsHealthy(
+	ctx context.Context, store roachpb.StoreDescriptor, avg float64,
+) bool {
+	ioOverloadScore, _ := store.Capacity.IOThreshold.Score()
+	if o.LeaseEnforcementLevel == LeaseIOOverloadThresholdNoAction ||
+		ioOverloadScore < o.IOOverloadThreshold {
+		return true
+	}
+
+	if ioOverloadScore < avg*IOOverloadMeanThreshold {
+		log.KvDistribution.VEventf(ctx, 5,
+			"s%d, lease transfer check io overload %.2f exceeds threshold %.2f, but "+
+				"below average watermark: %.2f, action enabled %d",
+			store.StoreID, ioOverloadScore, o.IOOverloadThreshold,
+			avg*IOOverloadMeanThreshold, o.LeaseEnforcementLevel)
+		return true
+	}
+
+	log.KvDistribution.VEventf(ctx, 5,
+		"s%d, lease transfer check io overload %.2f exceeds threshold %.2f, above average "+
+			"watermark: %.2f, action enabled %d",
+		store.StoreID, ioOverloadScore, o.IOOverloadThreshold,
+		avg*IOOverloadMeanThreshold, o.LeaseEnforcementLevel)
+
+	// The store is only excluded as a transfer target when the enforcement
+	// level is LeaseIOOverloadThresholdBlock or above.
+	return o.LeaseEnforcementLevel < LeaseIOOverloadThresholdBlock
+}
+
+func (o StoreHealthOptions) leaseStoreIsHealthy(
+	ctx context.Context, store roachpb.StoreDescriptor, avg float64,
+) bool {
+	threshold := o.IOOverloadThreshold + o.LeaseIOOverloadThresholdShedBuffer
+	ioOverloadScore, _ := store.Capacity.IOThreshold.Score()
+	if o.LeaseEnforcementLevel == LeaseIOOverloadThresholdNoAction ||
+		ioOverloadScore < threshold {
+		return true
+	}
+
+	if ioOverloadScore < avg*IOOverloadMeanThreshold {
+		log.KvDistribution.VEventf(ctx, 5,
+			"s%d, existing leaseholder check io overload %.2f exceeds threshold %.2f, but "+
+				"below average watermark: %.2f, action enabled %d",
+			store.StoreID, ioOverloadScore, threshold,
+			avg*IOOverloadMeanThreshold, o.LeaseEnforcementLevel)
+		return true
+	}
+
+	log.KvDistribution.VEventf(ctx, 5,
+		"s%d, existing leaseholder check io overload %.2f exceeds threshold %.2f, above average "+
+			"watermark: %.2f, action enabled %d",
+		store.StoreID, ioOverloadScore, threshold,
+		avg*IOOverloadMeanThreshold, o.LeaseEnforcementLevel)
+
+	// The existing leaseholder store is only considered unhealthy when the
+	// enforcement level is LeaseIOOverloadThresholdShed.
+	return o.LeaseEnforcementLevel < LeaseIOOverloadThresholdShed
+}
+
 // rebalanceToMaxCapacityCheck returns true if the store has enough room to
 // accept a rebalance. The bar for this is stricter than for whether a store
 // has enough room to accept a necessary replica (i.e. via AllocateCandidates).
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
index adce6f429a6c..60cc4b38c6ae 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -1976,6 +1976,141 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 	}
 }
 
+func TestAllocatorTransferLeaseTargetHealthCheck(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	floats := func(nums ...float64) []float64 {
+		return nums
+	}
+
+	// We want the shed threshold to be 0.9 and the overload threshold to be 0.5,
+	// i.e. block transfers at >=0.5 and block transfers + shed leases at >=0.9.
+	const shedBuffer = 0.4
+	const ioOverloadThreshold = 0.5
+
+	testCases := []struct {
+		name                  string
+		leaseCounts, IOScores []float64
+		leaseholder           roachpb.StoreID
+		excludeLeaseRepl      bool
+		expected              roachpb.StoreID
+		enforcement           LeaseIOOverloadEnforcementLevel
+	}{
+		{
+			name:        "don't move off of store with high io overload when block enforcement",
+			leaseCounts: floats(100, 100, 100, 100, 100),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 1,
+			expected:    0,
+			enforcement: LeaseIOOverloadThresholdBlock,
+		},
+		{
+			name:        "move off of store with high io overload when shed enforcement",
+			leaseCounts: floats(100, 100, 100, 100, 100),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 1,
+			// Store 3 is at the transfer threshold (0.5), but below the mean
+			// watermark (0.9 * 1.1 = 0.99), so it is still considered a healthy
+			// candidate.
+			expected:    3,
+			enforcement: LeaseIOOverloadThresholdShed,
+		},
+		{
+			name:        "don't transfer to io overloaded store when block enforcement",
+			leaseCounts: floats(0, 100, 100, 400, 400),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 5,
+			expected:    3,
+			enforcement: LeaseIOOverloadThresholdBlock,
+		},
+		{
+			name:        "don't transfer to io overloaded store when shed enforcement",
+			leaseCounts: floats(0, 100, 100, 400, 400),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 5,
+			expected:    3,
+			enforcement: LeaseIOOverloadThresholdShed,
+		},
+		{
+			name:        "still transfer to io overloaded store when no action enforcement",
+			leaseCounts: floats(0, 100, 100, 400, 400),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 5,
+			expected:    2,
+			enforcement: LeaseIOOverloadThresholdNoAction,
+		},
+		{
+			name:        "still transfer to io overloaded store when no action log enforcement",
+			leaseCounts: floats(0, 100, 100, 400, 400),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 5,
+			expected:    2,
+			enforcement: LeaseIOOverloadThresholdNoActionLogOnly,
+		},
+		{
+			name:        "move off of store with high io overload with skewed lease counts shed enforcement",
+			leaseCounts: floats(0, 0, 10000, 10000, 10000),
+			IOScores:    floats(2.5, 1.5, 0.5, 0, 0),
+			leaseholder: 1,
+			expected:    3,
+			enforcement: LeaseIOOverloadThresholdShed,
+		},
+		{
+			name:        "don't move off of store with high io overload but less than shed threshold with shed enforcement",
+			leaseCounts: floats(0, 0, 0, 0, 0),
+			IOScores:    floats(0.89, 0, 0, 0, 0),
+			leaseholder: 1,
+			expected:    0,
+			enforcement: LeaseIOOverloadThresholdShed,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+			defer stopper.Stop(ctx)
+			n := len(tc.leaseCounts)
+			stores := make([]*roachpb.StoreDescriptor, n)
+			existing := make([]roachpb.ReplicaDescriptor, 0, n)
+			for i := range tc.leaseCounts {
+				existing = append(existing, replicas(roachpb.StoreID(i+1))...)
+				stores[i] = &roachpb.StoreDescriptor{
+					StoreID: roachpb.StoreID(i + 1),
+					Node:    roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)},
+					Capacity: roachpb.StoreCapacity{
+						LeaseCount:  int32(tc.leaseCounts[i]),
+						IOThreshold: TestingIOThresholdWithScore(tc.IOScores[i]),
+					},
+				}
+			}
+
+			sg := gossiputil.NewStoreGossiper(g)
+			sg.GossipStores(stores, t)
+			LeaseIOOverloadThresholdEnforcement.Override(ctx, &a.st.SV, int64(tc.enforcement))
+			IOOverloadThreshold.Override(ctx, &a.st.SV, ioOverloadThreshold)
+			ShedIOOverloadThresholdBuffer.Override(ctx, &a.st.SV, shedBuffer)
+
+			target := a.TransferLeaseTarget(
+				ctx,
+				sp,
+				emptySpanConfig(),
+				existing,
+				&mockRepl{
+					replicationFactor: int32(n),
+					storeID:           tc.leaseholder,
+				},
+				allocator.RangeUsageInfo{}, /* stats */
+				false,                      /* forceDecisionWithoutStats */
+				allocator.TransferLeaseOptions{
+					CheckCandidateFullness: true,
+				},
+			)
+			require.Equal(t, tc.expected, target.StoreID)
+		})
+	}
+}
+
 func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 14a4fabe11ae..ed4f04e785df 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -284,7 +284,7 @@ func (sr *StoreRebalancer) scorerOptions(
 	ctx context.Context, lbDimension load.Dimension,
 ) *allocatorimpl.LoadScorerOptions {
 	return &allocatorimpl.LoadScorerOptions{
-		StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx),
+		StoreHealthOptions: sr.allocator.StoreHealthOptions(),
 		Deterministic:      sr.storePool.IsDeterministic(),
 		LoadDims:           []load.Dimension{lbDimension},
 		LoadThreshold:      allocatorimpl.LoadThresholds(&sr.st.SV, lbDimension),
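Reviewer note, not part of the patch: a summary sketch of what each value of `kv.allocator.io_overload_threshold_enforcement_leases` implies for an IO-overloaded store, following the semantics documented on transferToStoreIsHealthy and leaseStoreIsHealthy above. The enum and function here are illustrative, not the patch's API.

```go
// Sketch: mapping the four lease enforcement levels to their two effects.
package main

import "fmt"

type leaseEnforcement string

const (
	blockNone    leaseEnforcement = "block_none"     // no action taken
	blockNoneLog leaseEnforcement = "block_none_log" // no action, but log an event
	block        leaseEnforcement = "block"          // exclude as a transfer target
	shed         leaseEnforcement = "shed"           // exclude as target and shed held leases
)

// effects reports whether an overloaded store is excluded as a lease transfer
// target (score >= threshold and mean watermark), and whether, as the current
// leaseholder past threshold+buffer, it sheds the leases it already holds.
func effects(level leaseEnforcement) (excludedAsTarget, shedsLeases bool) {
	switch level {
	case block:
		return true, false
	case shed:
		return true, true
	default: // block_none, block_none_log
		return false, false
	}
}

func main() {
	for _, l := range []leaseEnforcement{blockNone, blockNoneLog, block, shed} {
		excluded, sheds := effects(l)
		fmt.Printf("%-14s excludedAsTarget=%t shedsLeases=%t\n", l, excluded, sheds)
	}
}
```

Note the asymmetry the test above exercises: `block` protects a struggling store from receiving more leases but never moves existing ones, while `shed` additionally drains it once the stricter threshold+buffer bar is crossed.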