From 888bc909498390e775173694733e8a2258b7e9c1 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Mon, 26 Feb 2024 16:47:51 -0500
Subject: [PATCH] kvserver: enqueue replicas into lease queue when IO overloaded

Previously, when the allocator lease IO overload threshold enforcement was set
to `shed`, leases would take up to 10 minutes to drain from an IO overloaded
store. This was because the shed pacing mechanism was the replica scanner,
which by default takes up to 10 minutes.

This commit introduces (1) proactive gossip of the store capacity when the IO
overload score both increases over the last gossiped value and is greater than
an absolute value (LeaseIOOverloadThreshold), and (2) enqueuing leaseholder
replicas into the lease queue when receiving a gossip update, if the gossip
update is for the local store and the store is IO overloaded.

In order to prevent wasted resources, the mass enqueue on overload occurs at
most once every `kv.allocator.min_io_overload_lease_shed_interval`, which
defaults to 30s.

The combination of (1) proactive gossip on IO overload increase, and (2)
enqueuing into the lease queue on shed conditions being met, results in leases
shedding from an IO overloaded node in under a second.

Part of: #118866

Release note: None
---
 .../allocator/allocatorimpl/allocator.go      |  4 +-
 .../allocatorimpl/allocator_scorer.go         |  8 +-
 pkg/kv/kvserver/asim/gossip/BUILD.bazel       |  1 +
 pkg/kv/kvserver/asim/gossip/gossip.go         |  3 +-
 pkg/kv/kvserver/lease_queue.go                | 16 +++
 pkg/kv/kvserver/lease_queue_test.go           | 32 ++++++
 pkg/kv/kvserver/store.go                      | 98 ++++++++++++++++++-
 pkg/kv/kvserver/store_gossip.go               | 33 ++++++-
 pkg/kv/kvserver/store_gossip_test.go          | 30 +++++-
 9 files changed, 214 insertions(+), 11 deletions(-)

diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index 93e4c41fdecd..0c78b28ccad8 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2120,7 +2120,7 @@ func (a *Allocator) nonIOOverloadedLeaseTargets(
 		// Instead, we create a buffer between the two to avoid leases moving back
 		// and forth.
 		if (replDesc.StoreID == leaseStoreID) &&
-			(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)) {
+			(!ok || !ioOverloadOptions.ExistingLeaseCheck(ctx, store, sl)) {
 			continue
 		}
 
@@ -2156,7 +2156,7 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
 	// overloaded.
 	for _, replDesc := range existingReplicas {
 		if store, ok := sl.FindStoreByID(replDesc.StoreID); ok && replDesc.StoreID == leaseStoreID {
-			return !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)
+			return !ioOverloadOptions.ExistingLeaseCheck(ctx, store, sl)
 		}
 	}
 
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
index ea6e1ec6c16d..d7a078ceda37 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -2496,10 +2496,10 @@ func (o IOOverloadOptions) transferLeaseToCheck(
 	return true
 }
 
-// transferLeaseToCheck returns true if the store IO overload does not exceed
-// the cluster threshold and mean, or the enforcement level does not prevent
-// existing stores from holidng leases whilst being IO overloaded.
-func (o IOOverloadOptions) existingLeaseCheck(
+// ExistingLeaseCheck returns true if the store IO overload does not exceed the
+// cluster threshold and mean, or the enforcement level does not prevent
+// existing stores from holding leases whilst being IO overloaded.
+func (o IOOverloadOptions) ExistingLeaseCheck(
 	ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
 ) bool {
 	score := o.storeScore(store)
diff --git a/pkg/kv/kvserver/asim/gossip/BUILD.bazel b/pkg/kv/kvserver/asim/gossip/BUILD.bazel
index 277c9dc94138..e0693b56bb95 100644
--- a/pkg/kv/kvserver/asim/gossip/BUILD.bazel
+++ b/pkg/kv/kvserver/asim/gossip/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
         "//pkg/kv/kvserver/asim/config",
         "//pkg/kv/kvserver/asim/state",
         "//pkg/roachpb",
+        "//pkg/settings/cluster",
         "//pkg/util/hlc",
         "//pkg/util/protoutil",
     ],
diff --git a/pkg/kv/kvserver/asim/gossip/gossip.go b/pkg/kv/kvserver/asim/gossip/gossip.go
index 36a923232863..2fbf54c336b0 100644
--- a/pkg/kv/kvserver/asim/gossip/gossip.go
+++ b/pkg/kv/kvserver/asim/gossip/gossip.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 )
 
@@ -60,7 +61,7 @@ func newStoreGossiper(descriptorGetter func(cached bool) roachpb.StoreDescriptor
 
 	desc := sg.descriptorGetter(false /* cached */)
 	knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true}
-	sg.local = kvserver.NewStoreGossip(sg, sg, knobs)
+	sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &cluster.MakeTestingClusterSettings().SV)
 	sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID}
 
 	return sg
diff --git a/pkg/kv/kvserver/lease_queue.go b/pkg/kv/kvserver/lease_queue.go
index 2a9155948019..ae4ef3877d4e 100644
--- a/pkg/kv/kvserver/lease_queue.go
+++ b/pkg/kv/kvserver/lease_queue.go
@@ -54,6 +54,17 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
+// MinIOOverloadLeaseShedInterval controls how frequently a store may decide to
+// shed all leases due to becoming IO overloaded.
+var MinIOOverloadLeaseShedInterval = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kv.allocator.min_io_overload_lease_shed_interval",
+	"controls how frequently all leases can be shed from a node "+
+		"due to the node becoming IO overloaded",
+	30*time.Second,
+	settings.NonNegativeDuration,
+)
+
 type leaseQueue struct {
 	planner   plan.ReplicationPlanner
 	allocator allocatorimpl.Allocator
@@ -171,6 +182,11 @@ func (lq *leaseQueue) canTransferLeaseFrom(
 	if repl.LeaseViolatesPreferences(ctx, conf) {
 		return true
 	}
+	// If the local store is IO overloaded, then always allow transferring the
+	// lease away from the replica.
+	if !lq.store.existingLeaseCheckIOOverload(ctx) {
+		return true
+	}
 	if lastLeaseTransfer := lq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
 		minInterval := MinLeaseTransferInterval.Get(&lq.store.cfg.Settings.SV)
 		return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval
diff --git a/pkg/kv/kvserver/lease_queue_test.go b/pkg/kv/kvserver/lease_queue_test.go
index f105a62e429f..18180bcc2664 100644
--- a/pkg/kv/kvserver/lease_queue_test.go
+++ b/pkg/kv/kvserver/lease_queue_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -289,3 +290,34 @@ func TestLeaseQueueRaceReplicateQueue(t *testing.T) {
 	_, processErr, _ := repl.Store().Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
 	require.ErrorIs(t, processErr, plan.NewErrAllocatorToken("lease"))
 }
+
+// TestLeaseQueueShedsOnIOOverload asserts that leases are shed if the store
+// becomes IO overloaded.
+func TestLeaseQueueShedsOnIOOverload(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	s1 := tc.GetFirstStoreFromServer(t, 0)
+	capacityBefore, err := s1.Capacity(ctx, false /* useCached */)
+	require.NoError(t, err)
+	// The test wouldn't be interesting if there aren't already leases on s1.
+	require.Greater(t, capacityBefore.LeaseCount, int32(20))
+
+	ioThreshold := allocatorimpl.TestingIOThresholdWithScore(1)
+	s1.UpdateIOThreshold(&ioThreshold)
+	testutils.SucceedsSoon(t, func() error {
+		capacityAfter, err := s1.Capacity(ctx, false /* useCached */)
+		if err != nil {
+			return err
+		}
+		if capacityAfter.LeaseCount > 0 {
+			return errors.Errorf("expected 0 leases on store 1, found %d",
+				capacityAfter.LeaseCount)
+		}
+		return nil
+	})
+}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 19252a041d9d..667ff65c88e7 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -82,6 +82,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/shuffle"
@@ -1068,6 +1069,10 @@ type Store struct {
 		maxL0Size *slidingwindow.Swag
 	}
 
+	// lastIOOverloadLeaseShed tracks the last time the store attempted to shed
+	// all range leases it held due to becoming IO overloaded.
+	lastIOOverloadLeaseShed atomic.Value
+
 	counts struct {
 		// Number of placeholders removed due to error. Not a good fit for meaningful
 		// metrics, as snapshots to initialized ranges don't get a placeholder.
@@ -1430,6 +1435,7 @@ func NewStore(
 		// store pool in those cases.
 		allocatorStorePool = cfg.StorePool
 		storePoolIsDeterministic = allocatorStorePool.IsDeterministic()
+		allocatorStorePool.SetOnCapacityChange(s.makeIOOverloadCapacityChangeFn())
 
 		s.rebalanceObjManager = newRebalanceObjectiveManager(
 			ctx,
@@ -1624,7 +1630,7 @@ func NewStore(
 		updateSystemConfigUpdateQueueLimits)
 
 	if s.cfg.Gossip != nil {
-		s.storeGossip = NewStoreGossip(cfg.Gossip, s, cfg.TestingKnobs.GossipTestingKnobs)
+		s.storeGossip = NewStoreGossip(cfg.Gossip, s, cfg.TestingKnobs.GossipTestingKnobs, &cfg.Settings.SV)
 
 		// Add range scanner and configure with queues.
 		s.scanner = newReplicaScanner(
@@ -2584,12 +2590,100 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error {
 // StoreDescriptor.
 func (s *Store) UpdateIOThreshold(ioThreshold *admissionpb.IOThreshold) {
 	now := s.Clock().Now().GoTime()
+
 	s.ioThreshold.Lock()
-	defer s.ioThreshold.Unlock()
 	s.ioThreshold.t = ioThreshold
 	s.ioThreshold.maxL0NumSubLevels.Record(now, float64(ioThreshold.L0NumSubLevels))
 	s.ioThreshold.maxL0NumFiles.Record(now, float64(ioThreshold.L0NumFiles))
 	s.ioThreshold.maxL0Size.Record(now, float64(ioThreshold.L0Size))
+	maxL0NumSubLevels, _ := s.ioThreshold.maxL0NumSubLevels.Query(now)
+	maxL0NumFiles, _ := s.ioThreshold.maxL0NumFiles.Query(now)
+	maxL0Size, _ := s.ioThreshold.maxL0Size.Query(now)
+	s.ioThreshold.Unlock()
+
+	ioThresholdMax := protoutil.Clone(ioThreshold).(*admissionpb.IOThreshold)
+	ioThresholdMax.L0NumSubLevels = int64(maxL0NumSubLevels)
+	ioThresholdMax.L0NumFiles = int64(maxL0NumFiles)
+	ioThresholdMax.L0Size = int64(maxL0Size)
+
+	// Update the store's cached capacity and potentially gossip the updated
+	// capacity async if the IO threshold has increased.
+	s.storeGossip.RecordNewIOThreshold(*ioThreshold, *ioThresholdMax)
+}
+
+// existingLeaseCheckIOOverload checks whether the store is IO overloaded
+// enough that it would fail the allocator checks for holding leases, returning
+// true when okay and false otherwise. When false is returned, it indicates
+// that a replica's lease will be transferred away when encountered by the
+// lease queue.
+func (s *Store) existingLeaseCheckIOOverload(ctx context.Context) bool {
+	storeList, _, _ := s.cfg.StorePool.GetStoreList(storepool.StoreFilterNone)
+	storeDescriptor, ok := s.cfg.StorePool.GetStoreDescriptor(s.StoreID())
+	if !ok {
+		return false
+	}
+	return s.allocator.IOOverloadOptions().ExistingLeaseCheck(
+		ctx, storeDescriptor, storeList)
+}
+
+// makeIOOverloadCapacityChangeFn returns a capacity change callback which will
+// enqueue all leaseholder replicas into the lease queue when: (1) the capacity
+// change is for the local store and (2) the store's IO is considered too
+// overloaded to hold onto its existing leases. The leases will be processed
+// via the lease queue and shed to another replica, if available.
+func (s *Store) makeIOOverloadCapacityChangeFn() storepool.CapacityChangeFn {
+	return func(storeID roachpb.StoreID, old, cur roachpb.StoreCapacity) {
+		// There's nothing to do when there are no leases on the store.
+		if cur.LeaseCount == 0 {
+			return
+		}
+
+		// Don't react to other stores' capacity changes; only an IO overload
+		// change on the local store descriptor is relevant.
+		if !s.IsStarted() || s.StoreID() != storeID {
+			return
+		}
+
+		// Avoid shedding leases too frequently by checking the last time a shed
+		// was attempted.
+		if lastShed := s.lastIOOverloadLeaseShed.Load(); lastShed != nil {
+			minInterval := MinIOOverloadLeaseShedInterval.Get(&s.cfg.Settings.SV)
+			if timeutil.Since(lastShed.(time.Time)) < minInterval {
+				return
+			}
+		}
+
+		// Lastly, check whether the store is considered IO overloaded relative to
+		// the configured threshold and the cluster average.
+		ctx := context.Background()
+		ctx = s.AnnotateCtx(ctx)
+		if s.existingLeaseCheckIOOverload(ctx) {
+			return
+		}
+
+		// NB: Update the last shed time prior to trying to shed leases, this
+		// should limit the window of concurrent shedding activity (in the case
+		// where multiple capacity changes are called within a short window). This
+		// could be removed entirely with a mutex but hardly seems necessary.
+		s.lastIOOverloadLeaseShed.Store(s.Clock().Now().GoTime())
+		log.KvDistribution.Infof(
+			ctx, "IO overload detected, will shed leases %v", cur.LeaseCount)
+
+		// This callback is on the gossip goroutine; once we know we wish to shed
+		// leases, split off the actual enqueuing work to a separate async task
+		// goroutine.
+		if err := s.stopper.RunAsyncTask(ctx, "io-overload: shed leases", func(ctx context.Context) {
+			newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
+				s.leaseQueue.maybeAdd(ctx, repl, repl.Clock().NowAsClockTimestamp())
+				return true /* wantMore */
+			})
+		}); err != nil {
+			log.KvDistribution.Infof(ctx,
+				"unable to shed leases due to IO overload: %v", err)
+			// An error should only be encountered when the server is quiescing, as
+			// such we don't reset the timer on a failed attempt.
+		}
+	}
 }
 
 // VisitReplicasOption optionally modifies store.VisitReplicas.
diff --git a/pkg/kv/kvserver/store_gossip.go b/pkg/kv/kvserver/store_gossip.go
index b555bc32eb4b..25f8c1252fe1 100644
--- a/pkg/kv/kvserver/store_gossip.go
+++ b/pkg/kv/kvserver/store_gossip.go
@@ -20,7 +20,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -223,6 +226,7 @@ type StoreGossip struct {
 	// descriptorGetter is used for getting an up to date or cached store
 	// descriptor to gossip.
 	descriptorGetter StoreDescriptorProvider
+	sv               *settings.Values
 }
 
 // StoreGossipTestingKnobs defines the testing knobs specific to StoreGossip.
@@ -249,13 +253,17 @@ type StoreGossipTestingKnobs struct {
 // store descriptor: both proactively, calling Gossip() and reacively on
 // capacity/load changes.
 func NewStoreGossip(
-	gossiper InfoGossiper, descGetter StoreDescriptorProvider, testingKnobs StoreGossipTestingKnobs,
+	gossiper InfoGossiper,
+	descGetter StoreDescriptorProvider,
+	testingKnobs StoreGossipTestingKnobs,
+	sv *settings.Values,
 ) *StoreGossip {
 	return &StoreGossip{
 		cachedCapacity:   &cachedCapacity{},
 		gossiper:         gossiper,
 		descriptorGetter: descGetter,
 		knobs:            testingKnobs,
+		sv:               sv,
 	}
 }
 
@@ -414,6 +422,20 @@ func (s *StoreGossip) RecordNewPerSecondStats(newQPS, newWPS float64) {
 	}
 }
 
+// RecordNewIOThreshold takes new values for the IO threshold and recent
+// maximum IO threshold and decides whether the score has changed enough to
+// justify re-gossiping the store's capacity.
+func (s *StoreGossip) RecordNewIOThreshold(threshold, thresholdMax admissionpb.IOThreshold) {
+	s.cachedCapacity.Lock()
+	s.cachedCapacity.cached.IOThreshold = threshold
+	s.cachedCapacity.cached.IOThresholdMax = thresholdMax
+	s.cachedCapacity.Unlock()
+
+	if shouldGossip, reason := s.shouldGossipOnCapacityDelta(); shouldGossip {
+		s.asyncGossipStore(context.TODO(), reason, true /* useCached */)
+	}
+}
+
 // shouldGossipOnCapacityDelta determines whether the difference between the
 // last gossiped store capacity and the currently cached capacity is large
 // enough that gossiping immediately is required to avoid poor allocation
@@ -431,6 +453,8 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
 		gossipWhenCapacityDeltaExceedsFraction = overrideCapacityDeltaFraction
 	}
 
+	gossipMinMaxIOOverloadScore := allocatorimpl.LeaseIOOverloadThreshold.Get(s.sv)
+
 	s.cachedCapacity.Lock()
 	updateForQPS, deltaQPS := deltaExceedsThreshold(
 		s.cachedCapacity.lastGossiped.QueriesPerSecond, s.cachedCapacity.cached.QueriesPerSecond,
@@ -444,6 +468,10 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
 	updateForLeaseCount, deltaLeaseCount := deltaExceedsThreshold(
 		float64(s.cachedCapacity.lastGossiped.LeaseCount), float64(s.cachedCapacity.cached.LeaseCount),
 		gossipWhenLeaseCountDeltaExceeds, gossipWhenCapacityDeltaExceedsFraction)
+	cachedMaxIOScore, _ := s.cachedCapacity.cached.IOThresholdMax.Score()
+	lastGossipMaxIOScore, _ := s.cachedCapacity.lastGossiped.IOThresholdMax.Score()
+	updateForMaxIOOverloadScore := cachedMaxIOScore >= gossipMinMaxIOOverloadScore &&
+		cachedMaxIOScore > lastGossipMaxIOScore
 	s.cachedCapacity.Unlock()
 
 	if s.knobs.DisableLeaseCapacityGossip {
@@ -462,6 +490,9 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
 	if updateForLeaseCount {
 		reason += fmt.Sprintf("lease-count(%.1f) ", deltaLeaseCount)
 	}
+	if updateForMaxIOOverloadScore {
+		reason += fmt.Sprintf("io-overload(%.1f) ", cachedMaxIOScore)
+	}
 	if reason != "" {
 		should = true
 		reason += "change"
diff --git a/pkg/kv/kvserver/store_gossip_test.go b/pkg/kv/kvserver/store_gossip_test.go
index d8a026d5aca3..d12f097b1123 100644
--- a/pkg/kv/kvserver/store_gossip_test.go
+++ b/pkg/kv/kvserver/store_gossip_test.go
@@ -13,7 +13,9 @@ package kvserver
 import (
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/stretchr/testify/require"
 )
@@ -64,13 +66,39 @@ func TestStoreGossipDeltaTrigger(t *testing.T) {
 			expectedReason: "queries-per-second(100.0) writes-per-second(-100.0) range-count(5.0) lease-count(-5.0) change",
 			expectedShould: true,
 		},
+		{
+			desc:           "no delta: IO overload <= minimum",
+			lastGossiped:   roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(0)},
+			cached:         roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(allocatorimpl.DefaultLeaseIOOverloadThreshold - 1e9)},
+			expectedReason: "",
+			expectedShould: false,
+		},
+		{
+			desc:           "no delta: IO overload unchanged",
+			lastGossiped:   roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(allocatorimpl.DefaultLeaseIOOverloadThreshold)},
+			cached:         roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(allocatorimpl.DefaultLeaseIOOverloadThreshold)},
+			expectedReason: "",
+			expectedShould: false,
+		},
+		{
+			desc:           "should gossip on IO overload increase greater than min",
+			lastGossiped:   roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(0)},
+			cached:         roachpb.StoreCapacity{IOThresholdMax: allocatorimpl.TestingIOThresholdWithScore(allocatorimpl.DefaultLeaseIOOverloadThreshold)},
+			expectedReason: "io-overload(0.3) change",
+			expectedShould: true,
+		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
 			cfg := &StoreConfig{}
 			cfg.SetDefaults(1 /* numStores */)
-			sg := NewStoreGossip(nil, nil, cfg.TestingKnobs.GossipTestingKnobs)
+			sg := NewStoreGossip(
+				nil,
+				nil,
+				cfg.TestingKnobs.GossipTestingKnobs,
+				&cluster.MakeTestingClusterSettings().SV,
+			)
 			sg.cachedCapacity.cached = tc.cached
 			sg.cachedCapacity.lastGossiped = tc.lastGossiped
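
The sketch below is not part of the patch; it is a minimal, self-contained Go
program that restates the gating order the new capacity-change callback applies
before mass-enqueueing leaseholders. The shedGate type, its fields, and the
ioOverloadOK parameter are simplified stand-ins for the real Store fields, the
kv.allocator.min_io_overload_lease_shed_interval setting, and the allocator's
existing-lease IO overload check.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// shedGate mirrors the checks in makeIOOverloadCapacityChangeFn: skip when the
// store holds no leases, when the capacity change is for another store, when a
// shed was attempted within the minimum interval, or when the store still
// passes the existing-lease IO overload check.
type shedGate struct {
	lastShed    atomic.Value  // time.Time of the last shed attempt
	minInterval time.Duration // stand-in for the min shed interval setting
}

func (g *shedGate) shouldShed(localStoreID, updatedStoreID, leaseCount int32, ioOverloadOK bool) bool {
	if leaseCount == 0 || localStoreID != updatedStoreID {
		return false
	}
	if last := g.lastShed.Load(); last != nil {
		if time.Since(last.(time.Time)) < g.minInterval {
			return false
		}
	}
	if ioOverloadOK {
		// The store is not considered IO overloaded for holding leases; keep them.
		return false
	}
	// Record the attempt up front to bound concurrent shedding, then enqueue.
	g.lastShed.Store(time.Now())
	return true
}

func main() {
	g := &shedGate{minInterval: 30 * time.Second}
	fmt.Println(g.shouldShed(1, 1, 100, false)) // true: IO overloaded, shed leases
	fmt.Println(g.shouldShed(1, 1, 100, false)) // false: within the 30s shed interval
}

In the real implementation the boolean parameter corresponds to
Store.existingLeaseCheckIOOverload, which consults the allocator's
ExistingLeaseCheck against the cluster threshold and mean rather than taking
the answer as an argument.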