kvserver: enqueue replicas into lease queue when IO overloaded
Previously, when the allocator lease IO overload threshold enforcement
was set to `shed`, leases could take up to 10 minutes to drain from an
IO overloaded store. This was because shedding was paced by the replica
scanner, which by default takes up to 10 minutes to complete a pass.

This commit introduces (1) proactive gossip of the store capacity when
its maximum IO overload score both increases over the last gossiped
value and exceeds an absolute threshold (LeaseIOOverloadThreshold), and
(2) enqueueing of leaseholder replicas into the lease queue upon
receiving a gossip update, if the update is for the local store and the
store is IO overloaded.

To avoid wasting resources, the mass enqueue on overload occurs at most
once every `kv.allocator.min_io_overload_lease_shed_interval`, which
defaults to 30s.

The combination of (1) proactive gossip when IO overload increases and
(2) enqueueing into the lease queue when the shed conditions are met
results in leases being shed from an IO overloaded node in under a
second.

Part of: #118866
Release note: None
kvoli committed Mar 22, 2024
1 parent 0b6de9c commit 888bc90
Showing 9 changed files with 214 additions and 11 deletions.
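The interesting control flow lives in pkg/kv/kvserver/store.go below. As a reading aid, the standalone Go sketch here condenses the shed path this commit adds: react only to a capacity update for the local store, rate-limit mass enqueues, then push every leaseholder replica into the lease queue. All types and signatures are simplified stand-ins for illustration, not CockroachDB's actual APIs.

package main

import (
	"fmt"
	"time"
)

type replica struct{ id int }

// store is a simplified stand-in for kvserver.Store, holding just the
// fields the shed decision needs.
type store struct {
	storeID         int
	lastShed        time.Time
	minShedInterval time.Duration
	ioOverloaded    bool          // result of the allocator's existing-lease IO check
	leaseholders    []replica     // replicas whose leases this store holds
	enqueue         func(replica) // stand-in for leaseQueue.maybeAdd
}

// onCapacityChange mirrors the gossip-driven callback: it fires for every
// observed store capacity update, but only sheds for the local store.
func (s *store) onCapacityChange(updatedStoreID, leaseCount int) {
	if leaseCount == 0 || updatedStoreID != s.storeID {
		return // nothing to shed, or not our store
	}
	if !s.lastShed.IsZero() && time.Since(s.lastShed) < s.minShedInterval {
		return // rate-limit mass enqueues (30s by default in this commit)
	}
	if !s.ioOverloaded {
		return // the allocator check says existing leases may stay
	}
	s.lastShed = time.Now()
	for _, r := range s.leaseholders {
		s.enqueue(r) // the lease queue picks the actual transfer target
	}
}

func main() {
	s := &store{
		storeID:         1,
		minShedInterval: 30 * time.Second,
		ioOverloaded:    true,
		leaseholders:    []replica{{id: 1}, {id: 2}, {id: 3}},
		enqueue:         func(r replica) { fmt.Println("enqueued replica", r.id) },
	}
	s.onCapacityChange(1 /* local store */, len(s.leaseholders))
}

In the real change, the IO check is the allocator's ExistingLeaseCheck against the store pool, and the enqueue work is split off onto an async task rather than running on the gossip goroutine.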
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2120,7 +2120,7 @@ func (a *Allocator) nonIOOverloadedLeaseTargets(
// Instead, we create a buffer between the two to avoid leases moving back
// and forth.
if (replDesc.StoreID == leaseStoreID) &&
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)) {
(!ok || !ioOverloadOptions.ExistingLeaseCheck(ctx, store, sl)) {
continue
}

@@ -2156,7 +2156,7 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
// overloaded.
for _, replDesc := range existingReplicas {
if store, ok := sl.FindStoreByID(replDesc.StoreID); ok && replDesc.StoreID == leaseStoreID {
return !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)
return !ioOverloadOptions.ExistingLeaseCheck(ctx, store, sl)
}
}

8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -2496,10 +2496,10 @@ func (o IOOverloadOptions) transferLeaseToCheck(
return true
}

// transferLeaseToCheck returns true if the store IO overload does not exceed
// the cluster threshold and mean, or the enforcement level does not prevent
// existing stores from holidng leases whilst being IO overloaded.
func (o IOOverloadOptions) existingLeaseCheck(
// ExistingLeaseCheck returns true if the store IO overload does not exceed the
// cluster threshold and mean, or the enforcement level does not prevent
// existing stores from holding leases whilst being IO overloaded.
func (o IOOverloadOptions) ExistingLeaseCheck(
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score := o.storeScore(store)
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/gossip/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/state",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/util/hlc",
"//pkg/util/protoutil",
],
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/asim/gossip/gossip.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

@@ -60,7 +61,7 @@ func newStoreGossiper(descriptorGetter func(cached bool) roachpb.StoreDescriptor

desc := sg.descriptorGetter(false /* cached */)
knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true}
sg.local = kvserver.NewStoreGossip(sg, sg, knobs)
sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &cluster.MakeTestingClusterSettings().SV)
sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID}

return sg
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/lease_queue.go
@@ -54,6 +54,17 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

// MinIOOverloadLeaseShedInterval controls how frequently a store may decide to
// shed all leases due to becoming IO overloaded.
var MinIOOverloadLeaseShedInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.allocator.min_io_overload_lease_shed_interval",
"controls how frequently all leases can be shed from a node "+
"due to the node becoming IO overloaded",
30*time.Second,
settings.NonNegativeDuration,
)

type leaseQueue struct {
planner plan.ReplicationPlanner
allocator allocatorimpl.Allocator
@@ -171,6 +182,11 @@ func (lq *leaseQueue) canTransferLeaseFrom(
if repl.LeaseViolatesPreferences(ctx, conf) {
return true
}
// If the local store is IO overloaded, then always allow transferring the
// lease away from the replica.
if !lq.store.existingLeaseCheckIOOverload(ctx) {
return true
}
if lastLeaseTransfer := lq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
minInterval := MinLeaseTransferInterval.Get(&lq.store.cfg.Settings.SV)
return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval
32 changes: 32 additions & 0 deletions pkg/kv/kvserver/lease_queue_test.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -289,3 +290,34 @@ func TestLeaseQueueRaceReplicateQueue(t *testing.T) {
_, processErr, _ := repl.Store().Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
require.ErrorIs(t, processErr, plan.NewErrAllocatorToken("lease"))
}

// TestLeaseQueueShedsOnIOOverload asserts that leases are shed if the store
// becomes IO overloaded.
func TestLeaseQueueShedsOnIOOverload(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

s1 := tc.GetFirstStoreFromServer(t, 0)
capacityBefore, err := s1.Capacity(ctx, false /* useCached */)
require.NoError(t, err)
// The test wouldn't be interesting if there aren't already leases on s1.
require.Greater(t, capacityBefore.LeaseCount, int32(20))

ioThreshold := allocatorimpl.TestingIOThresholdWithScore(1)
s1.UpdateIOThreshold(&ioThreshold)
testutils.SucceedsSoon(t, func() error {
capacityAfter, err := s1.Capacity(ctx, false /* useCached */)
if err != nil {
return err
}
if capacityAfter.LeaseCount > 0 {
return errors.Errorf("expected 0 leases on store 1, found %d",
capacityAfter.LeaseCount)
}
return nil
})
}
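
Aside: the test above drives overload through allocatorimpl.TestingIOThresholdWithScore, whose implementation is not part of this diff. The sketch below is a guess at what such a helper might do; L0NumSubLevels and L0NumFiles appear in this diff, while the threshold fields and the helper's logic are assumptions, and the real helper may differ.

// Hypothetical reconstruction of a helper like TestingIOThresholdWithScore.
// IOThreshold.Score() is (roughly) the maximum of count/threshold ratios, so
// setting each count to score*threshold should report the requested score.
func testingIOThresholdWithScore(score float64) admissionpb.IOThreshold {
	return admissionpb.IOThreshold{
		L0NumSubLevels:          int64(score * 20),
		L0NumSubLevelsThreshold: 20,
		L0NumFiles:              int64(score * 1000),
		L0NumFilesThreshold:     1000,
	}
}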
98 changes: 96 additions & 2 deletions pkg/kv/kvserver/store.go
@@ -82,6 +82,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
@@ -1068,6 +1069,10 @@ type Store struct {
maxL0Size *slidingwindow.Swag
}

// lastIOOverloadLeaseShed tracks the last time the store attempted to shed
// all range leases it held due to becoming IO overloaded.
lastIOOverloadLeaseShed atomic.Value

counts struct {
// Number of placeholders removed due to error. Not a good fit for meaningful
// metrics, as snapshots to initialized ranges don't get a placeholder.
@@ -1430,6 +1435,7 @@ func NewStore(
// store pool in those cases.
allocatorStorePool = cfg.StorePool
storePoolIsDeterministic = allocatorStorePool.IsDeterministic()
allocatorStorePool.SetOnCapacityChange(s.makeIOOverloadCapacityChangeFn())

s.rebalanceObjManager = newRebalanceObjectiveManager(
ctx,
@@ -1624,7 +1630,7 @@
updateSystemConfigUpdateQueueLimits)

if s.cfg.Gossip != nil {
s.storeGossip = NewStoreGossip(cfg.Gossip, s, cfg.TestingKnobs.GossipTestingKnobs)
s.storeGossip = NewStoreGossip(cfg.Gossip, s, cfg.TestingKnobs.GossipTestingKnobs, &cfg.Settings.SV)

// Add range scanner and configure with queues.
s.scanner = newReplicaScanner(
@@ -2584,12 +2590,100 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error {
// StoreDescriptor.
func (s *Store) UpdateIOThreshold(ioThreshold *admissionpb.IOThreshold) {
now := s.Clock().Now().GoTime()

s.ioThreshold.Lock()
defer s.ioThreshold.Unlock()
s.ioThreshold.t = ioThreshold
s.ioThreshold.maxL0NumSubLevels.Record(now, float64(ioThreshold.L0NumSubLevels))
s.ioThreshold.maxL0NumFiles.Record(now, float64(ioThreshold.L0NumFiles))
s.ioThreshold.maxL0Size.Record(now, float64(ioThreshold.L0Size))
maxL0NumSubLevels, _ := s.ioThreshold.maxL0NumSubLevels.Query(now)
maxL0NumFiles, _ := s.ioThreshold.maxL0NumFiles.Query(now)
maxL0Size, _ := s.ioThreshold.maxL0Size.Query(now)
s.ioThreshold.Unlock()

ioThresholdMax := protoutil.Clone(ioThreshold).(*admissionpb.IOThreshold)
ioThresholdMax.L0NumSubLevels = int64(maxL0NumSubLevels)
ioThresholdMax.L0NumFiles = int64(maxL0NumFiles)
ioThresholdMax.L0Size = int64(maxL0Size)

// Update the store's cached capacity and potentially gossip the updated
// capacity async if the IO threshold has increased.
s.storeGossip.RecordNewIOThreshold(*ioThreshold, *ioThresholdMax)
}

// existingLeaseCheckIOOverload checks whether the store is IO overloaded
// enough that it would fail the allocator checks for holding leases, returning
// true when okay and false otherwise. When false is returned, it indicates
// that a replica's lease will be transferred away when encountered by the
// lease queue.
func (s *Store) existingLeaseCheckIOOverload(ctx context.Context) bool {
storeList, _, _ := s.cfg.StorePool.GetStoreList(storepool.StoreFilterNone)
storeDescriptor, ok := s.cfg.StorePool.GetStoreDescriptor(s.StoreID())
if !ok {
return false
}
return s.allocator.IOOverloadOptions().ExistingLeaseCheck(
ctx, storeDescriptor, storeList)
}

// makeIOOverloadCapacityChangeFn returns a capacity change callback which will
// enqueue all leaseholder replicas into the lease queue when: (1) the capacity
// change is for the local store and (2) the store's IO is considered too
// overloaded to hold onto its existing leases. The leases will be processed
// via the lease queue and shed to another replica, if available.
func (s *Store) makeIOOverloadCapacityChangeFn() storepool.CapacityChangeFn {
return func(storeID roachpb.StoreID, old, cur roachpb.StoreCapacity) {
// There's nothing to do when there are no leases on the store.
if cur.LeaseCount == 0 {
return
}

// Don't react to other stores' capacity changes; only an IO overload change
// on the local store descriptor is relevant.
if !s.IsStarted() || s.StoreID() != storeID {
return
}

// Avoid shedding leases too frequently by checking the last time a shed
// was attempted.
if lastShed := s.lastIOOverloadLeaseShed.Load(); lastShed != nil {
minInterval := MinIOOverloadLeaseShedInterval.Get(&s.cfg.Settings.SV)
if timeutil.Since(lastShed.(time.Time)) < minInterval {
return
}
}

// Lastly, check whether the store is considered IO overloaded relative to
// the configured threshold and the cluster average.
ctx := s.AnnotateCtx(context.Background())
if s.existingLeaseCheckIOOverload(ctx) {
return
}

// NB: Update the last shed time prior to trying to shed leases; this
// should limit the window of concurrent shedding activity (in the case
// where multiple capacity changes are called within a short window). This
// could be removed entirely with a mutex but hardly seems necessary.
s.lastIOOverloadLeaseShed.Store(s.Clock().Now().GoTime())
log.KvDistribution.Infof(
ctx, "IO overload detected, will shed leases %v", cur.LeaseCount)

// This callback runs on the gossip goroutine; once we know we wish to shed
// leases, split off the actual enqueuing work to a separate async task
// goroutine.
if err := s.stopper.RunTask(ctx, "io-overload: shed leases", func(ctx context.Context) {
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
s.leaseQueue.maybeAdd(ctx, repl, repl.Clock().NowAsClockTimestamp())
return true /* wantMore */
})
}); err != nil {
log.KvDistribution.Infof(ctx,
"unable to shed leases due to IO overload: %v", err)
// An error should only be encountered when the server is quiescing, as
// such we don't reset the timer on a failed attempt.
}
}
}

// VisitReplicasOption optionally modifies store.VisitReplicas.
33 changes: 32 additions & 1 deletion pkg/kv/kvserver/store_gossip.go
@@ -20,7 +20,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -223,6 +226,7 @@ type StoreGossip struct {
// descriptorGetter is used for getting an up to date or cached store
// descriptor to gossip.
descriptorGetter StoreDescriptorProvider
sv *settings.Values
}

// StoreGossipTestingKnobs defines the testing knobs specific to StoreGossip.
@@ -249,13 +253,17 @@ type StoreGossipTestingKnobs struct {
// store descriptor: both proactively, calling Gossip(), and reactively on
// capacity/load changes.
func NewStoreGossip(
gossiper InfoGossiper, descGetter StoreDescriptorProvider, testingKnobs StoreGossipTestingKnobs,
gossiper InfoGossiper,
descGetter StoreDescriptorProvider,
testingKnobs StoreGossipTestingKnobs,
sv *settings.Values,
) *StoreGossip {
return &StoreGossip{
cachedCapacity: &cachedCapacity{},
gossiper: gossiper,
descriptorGetter: descGetter,
knobs: testingKnobs,
sv: sv,
}
}

@@ -414,6 +422,20 @@ func (s *StoreGossip) RecordNewPerSecondStats(newQPS, newWPS float64) {
}
}

// RecordNewIOThreshold takes new values for the IO threshold and recent
// maximum IO threshold and decides whether the score has changed enough to
// justify re-gossiping the store's capacity.
func (s *StoreGossip) RecordNewIOThreshold(threshold, thresholdMax admissionpb.IOThreshold) {
s.cachedCapacity.Lock()
s.cachedCapacity.cached.IOThreshold = threshold
s.cachedCapacity.cached.IOThresholdMax = thresholdMax
s.cachedCapacity.Unlock()

if shouldGossip, reason := s.shouldGossipOnCapacityDelta(); shouldGossip {
s.asyncGossipStore(context.TODO(), reason, true /* useCached */)
}
}

// shouldGossipOnCapacityDelta determines whether the difference between the
// last gossiped store capacity and the currently cached capacity is large
// enough that gossiping immediately is required to avoid poor allocation
@@ -431,6 +453,8 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
gossipWhenCapacityDeltaExceedsFraction = overrideCapacityDeltaFraction
}

gossipMinMaxIOOverloadScore := allocatorimpl.LeaseIOOverloadThreshold.Get(s.sv)

s.cachedCapacity.Lock()
updateForQPS, deltaQPS := deltaExceedsThreshold(
s.cachedCapacity.lastGossiped.QueriesPerSecond, s.cachedCapacity.cached.QueriesPerSecond,
@@ -444,6 +468,10 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
updateForLeaseCount, deltaLeaseCount := deltaExceedsThreshold(
float64(s.cachedCapacity.lastGossiped.LeaseCount), float64(s.cachedCapacity.cached.LeaseCount),
gossipWhenLeaseCountDeltaExceeds, gossipWhenCapacityDeltaExceedsFraction)
cachedMaxIOScore, _ := s.cachedCapacity.cached.IOThresholdMax.Score()
lastGossipMaxIOScore, _ := s.cachedCapacity.lastGossiped.IOThresholdMax.Score()
updateForMaxIOOverloadScore := cachedMaxIOScore >= gossipMinMaxIOOverloadScore &&
cachedMaxIOScore > lastGossipMaxIOScore
s.cachedCapacity.Unlock()

if s.knobs.DisableLeaseCapacityGossip {
Expand All @@ -462,6 +490,9 @@ func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string)
if updateForLeaseCount {
reason += fmt.Sprintf("lease-count(%.1f) ", deltaLeaseCount)
}
if updateForMaxIOOverloadScore {
reason += fmt.Sprintf("io-overload(%.1f) ", cachedMaxIOScore)
}
if reason != "" {
should = true
reason += "change"
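
To make the new gossip trigger concrete, the condition added to shouldGossipOnCapacityDelta can be restated as a small predicate: gossip eagerly only when the recent-max IO score is at or above the lease IO overload threshold and strictly above the last gossiped score. The threshold value used in the example comments is illustrative, not the setting's default.

// updateForMaxIOScore restates the condition added above: gossip eagerly
// only on a meaningful increase, never on decreases, and never while the
// score is below the configured lease IO overload threshold.
func updateForMaxIOScore(cachedMax, lastGossipedMax, threshold float64) bool {
	return cachedMax >= threshold && cachedMax > lastGossipedMax
}

// Example outcomes (threshold of 0.5 chosen for illustration):
//   updateForMaxIOScore(0.9, 0.2, 0.5) == true  // overloaded and rising: gossip now
//   updateForMaxIOScore(0.4, 0.2, 0.5) == false // below threshold: periodic gossip suffices
//   updateForMaxIOScore(0.9, 0.9, 0.5) == false // no increase since the last gossip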