From 3e341fd46913a75023264742b122624ab3ee1017 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Fri, 10 Mar 2023 20:05:23 -0500
Subject: [PATCH] kvserver: disable {split,replicate,mvccGC} queues until...

...subscribed to span configs. Do the same for the store rebalancer. We
applied this treatment for the merge queue back in #78122 since the
fallback behavior, if not subscribed, is to use the statically defined
span config for every operation.

- For the replicate queue this meant obtusely applying a replication
  factor of 3, regardless of configuration. This was typically possible
  post node-restart, before the subscription was initially established.
  We saw this in #98385. It was then possible for us to ignore
  configured voter/non-voter/lease constraints.
- For the split queue, we wouldn't actually compute any split keys if
  unsubscribed, so the missing check was somewhat benign. But we would
  be obtusely applying the default range sizes [128MiB, 512MiB], so for
  clusters configured with larger range sizes, this could lead to a
  burst of splitting post node-restart.
- For the MVCC GC queue, it would mean applying the statically
  configured default GC TTL and ignoring any set protected timestamps.
  The latter is best-effort protection but could result in internal
  operations relying on protection (like backups, changefeeds) failing
  informatively. For clusters configured with a GC TTL greater than the
  default, post node-restart it could lead to a burst of MVCC GC
  activity and AOST queries failing to find expected data.
- For the store rebalancer, ignoring span configs could result in
  violating lease preferences and voter constraints.

Fixes #98421.
Fixes #98385.

Release note (bug fix): It was previously possible for CockroachDB to
not respect non-default zone configs. This only happened for a short
window after nodes with existing replicas were restarted, and
self-rectified within seconds. This manifested in a few ways:
- If num_replicas was set to something other than 3, we would still add
  or remove replicas to get to 3x replication.
- If num_voters was set explicitly to get a mix of voting and non-voting
  replicas, it would be ignored. CockroachDB could possibly remove
  non-voting replicas.
- If range_min_bytes or range_max_bytes were changed from 128 MiB and
  512 MiB respectively, we would instead try to size ranges to be within
  [128 MiB, 512 MiB]. This could appear as an excess amount of range
  splits or merges, as visible in the Replication Dashboard under "Range
  Operations".
- If gc.ttlseconds was set to something other than 90000 seconds, we
  would still GC only data older than 90000s (25h). If the GC TTL was
  set to something larger than 25h, AOST queries going further back may
  now start failing. For GC TTLs less than the 25h default, clusters
  would observe increased disk usage due to more retained garbage.
- If constraints, lease_preferences or voter_constraints were set, they
  would be ignored. Range data and leases would possibly be moved
  outside where prescribed.
These issues only lasted a few seconds post node-restart, and any zone
config violations were rectified shortly after.
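
For illustration only (not part of this patch): the queue_enabled
settings registered below are ordinary, system-only cluster settings,
and the zone config fields named in the release note are set through
the usual SQL statements. A hedged sketch, with a hypothetical table
name t:

  -- Toggle the new queue settings (names taken from this patch).
  SET CLUSTER SETTING kv.replicate.queue_enabled = false;
  SET CLUSTER SETTING kv.split.queue_enabled = true;
  SET CLUSTER SETTING kv.mvcc_gc.queue_enabled = true;

  -- A non-default zone config of the kind the release note describes.
  ALTER TABLE t CONFIGURE ZONE USING num_replicas = 5, gc.ttlseconds = 600;

All three settings default to true.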
---
 pkg/kv/kvserver/kvserverbase/base.go | 27 +++++++++++++++++++++++++
 pkg/kv/kvserver/mvcc_gc_queue.go     | 28 ++++++++++++++++++++++++++
 pkg/kv/kvserver/replicate_queue.go   | 24 ++++++++++++++++++++++
 pkg/kv/kvserver/split_queue.go       | 25 +++++++++++++++++++++++
 pkg/kv/kvserver/store_rebalancer.go  | 30 ++++++++++++++++++----------
 5 files changed, 124 insertions(+), 10 deletions(-)

diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go
index 8dfd824b0ccd..70738ddf38a0 100644
--- a/pkg/kv/kvserver/kvserverbase/base.go
+++ b/pkg/kv/kvserver/kvserverbase/base.go
@@ -36,6 +36,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
 	true,
 )
 
+// ReplicateQueueEnabled is a setting that controls whether the replicate queue
+// is enabled.
+var ReplicateQueueEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kv.replicate.queue_enabled",
+	"whether the replicate queue is enabled",
+	true,
+)
+
+// SplitQueueEnabled is a setting that controls whether the split queue is
+// enabled.
+var SplitQueueEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kv.split.queue_enabled",
+	"whether the split queue is enabled",
+	true,
+)
+
+// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is
+// enabled.
+var MVCCGCQueueEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kv.mvcc_gc.queue_enabled",
+	"whether the MVCC GC queue is enabled",
+	true,
+)
+
 // CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
 type CmdIDKey string
 
diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go
index 2d8af90ab5bf..652e5254de83 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue.go
@@ -13,6 +13,7 @@ package kvserver
 import (
 	"context"
 	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"math"
 	"math/rand"
 	"sync/atomic"
@@ -232,6 +233,24 @@ func (r mvccGCQueueScore) String() string {
 		humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
 }
 
+func (mgcq *mvccGCQueue) enabled() bool {
+	if !mgcq.store.cfg.SpanConfigsDisabled {
+		if mgcq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
+			// If we don't have any span configs available, enabling the
+			// MVCC GC queue would mean applying the statically configured
+			// default GC TTL and ignoring any set protected timestamps. The
+			// latter is best-effort protection, but for clusters configured
+			// with GC TTL greater than the default, post node-restart it could
+			// lead to a burst of MVCC GC activity and AOST queries failing to
+			// find expected data.
+			return false
+		}
+	}
+
+	st := mgcq.store.ClusterSettings()
+	return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
+}
+
 // shouldQueue determines whether a replica should be queued for garbage
 // collection, and if so, at what priority. Returns true for shouldQ
 // in the event that the cumulative ages of GC'able bytes or extant
@@ -239,6 +258,10 @@ func (r mvccGCQueueScore) String() string {
 func (mgcq *mvccGCQueue) shouldQueue(
 	ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
 ) (bool, float64) {
+	if !mgcq.enabled() {
+		return false, 0
+	}
+
 	// Consult the protected timestamp state to determine whether we can GC and
 	// the timestamp which can be used to calculate the score.
 	conf := repl.SpanConfig()
@@ -672,6 +695,11 @@ func (r *replicaGCer) GC(
 func (mgcq *mvccGCQueue) process(
 	ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
 ) (processed bool, err error) {
+	if !mgcq.enabled() {
+		log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
+		return false, nil
+	}
+
 	// Record the CPU time processing the request for this replica. This is
 	// recorded regardless of errors that are encountered.
 	defer repl.MeasureReqCPUNanos(grunning.Time())
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 198d417eb86e..44c5da2030bf 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -14,6 +14,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"sync/atomic"
 	"time"
 
@@ -613,9 +614,27 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 	return rq
 }
 
+func (rq *replicateQueue) enabled() bool {
+	if !rq.store.cfg.SpanConfigsDisabled {
+		if rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
+			// If we don't have any span configs available, enabling the
+			// replicate queue would mean replicating towards the statically
+			// defined 3x replication in the fallback span config.
+			return false
+		}
+	}
+
+	st := rq.store.ClusterSettings()
+	return kvserverbase.ReplicateQueueEnabled.Get(&st.SV)
+}
+
 func (rq *replicateQueue) shouldQueue(
 	ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
 ) (shouldQueue bool, priority float64) {
+	if !rq.enabled() {
+		return false, 0
+	}
+
 	desc, conf := repl.DescAndSpanConfig()
 	action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, conf, desc)
 
@@ -695,6 +714,11 @@ func (rq *replicateQueue) shouldQueue(
 func (rq *replicateQueue) process(
 	ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
 ) (processed bool, err error) {
+	if !rq.enabled() {
+		log.VEventf(ctx, 2, "skipping replication: queue has been disabled")
+		return false, nil
+	}
+
 	retryOpts := retry.Options{
 		InitialBackoff: 50 * time.Millisecond,
 		MaxBackoff:     1 * time.Second,
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index 478e8868a72a..b1843ee04c25 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -171,6 +171,22 @@ func shouldSplitRange(
 	return shouldQ, priority
 }
 
+func (sq *splitQueue) enabled() bool {
+	if !sq.store.cfg.SpanConfigsDisabled {
+		if sq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
+			// If we don't have any span configs available, enabling the
+			// split queue would mean applying the statically configured range
+			// sizes in the fallback span config. For clusters configured with
+			// larger range sizes, this could lead to a burst of splitting post
+			// node-restart.
+			return false
+		}
+	}
+
+	st := sq.store.ClusterSettings()
+	return kvserverbase.SplitQueueEnabled.Get(&st.SV)
+}
+
 // shouldQueue determines whether a range should be queued for
 // splitting. This is true if the range is intersected by a zone config
 // prefix or if the range's size in bytes exceeds the limit for the zone,
@@ -178,6 +194,10 @@ func shouldSplitRange(
 func (sq *splitQueue) shouldQueue(
 	ctx context.Context, now hlc.ClockTimestamp, repl *Replica, confReader spanconfig.StoreReader,
 ) (shouldQ bool, priority float64) {
+	if !sq.enabled() {
+		return false, 0
+	}
+
 	shouldQ, priority = shouldSplitRange(ctx, repl.Desc(), repl.GetMVCCStats(),
 		repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader)
 
@@ -203,6 +223,11 @@ var _ PurgatoryError = unsplittableRangeError{}
 func (sq *splitQueue) process(
 	ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
 ) (processed bool, err error) {
+	if !sq.enabled() {
+		log.VEventf(ctx, 2, "skipping split: queue has been disabled")
+		return false, nil
+	}
+
 	processed, err = sq.processAttempt(ctx, r, confReader)
 	if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) {
 		// ConditionFailedErrors are an expected outcome for range split
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index ee5837bca5b1..46a167d9eaba 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -129,16 +129,17 @@ const (
 // will best accomplish the store-level goals.
 type StoreRebalancer struct {
 	log.AmbientContext
-	metrics           StoreRebalancerMetrics
-	st                *cluster.Settings
-	storeID           roachpb.StoreID
-	allocator         allocatorimpl.Allocator
-	storePool         storepool.AllocatorStorePool
-	rr                RangeRebalancer
-	replicaRankings   *ReplicaRankings
-	getRaftStatusFn   func(replica CandidateReplica) *raft.Status
-	processTimeoutFn  func(replica CandidateReplica) time.Duration
-	objectiveProvider RebalanceObjectiveProvider
+	metrics                 StoreRebalancerMetrics
+	st                      *cluster.Settings
+	storeID                 roachpb.StoreID
+	allocator               allocatorimpl.Allocator
+	storePool               storepool.AllocatorStorePool
+	rr                      RangeRebalancer
+	replicaRankings         *ReplicaRankings
+	getRaftStatusFn         func(replica CandidateReplica) *raft.Status
+	processTimeoutFn        func(replica CandidateReplica) time.Duration
+	objectiveProvider       RebalanceObjectiveProvider
+	subscribedToSpanConfigs func() bool
 }
 
 // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the
@@ -154,6 +155,7 @@ func NewStoreRebalancer(
 	if rq.store.cfg.StorePool != nil {
 		storePool = rq.store.cfg.StorePool
 	}
+
 	sr := &StoreRebalancer{
 		AmbientContext:  ambientCtx,
 		metrics:         makeStoreRebalancerMetrics(),
@@ -170,6 +172,11 @@ func NewStoreRebalancer(
 			return rq.processTimeoutFunc(st, replica.Repl())
 		},
 		objectiveProvider: objectiveProvider,
+		subscribedToSpanConfigs: func() bool {
+			// The store rebalancer makes use of span configs. Wait until we've
+			// established subscription.
+			return !rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty()
+		},
 	}
 	sr.AddLogTag("store-rebalancer", nil)
 	rq.store.metrics.registry.AddMetricStruct(&sr.metrics)
@@ -266,6 +273,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 			if mode == LBRebalancingOff {
 				continue
 			}
+			if !sr.subscribedToSpanConfigs() {
+				continue
+			}
 			hottestRanges := sr.replicaRankings.TopLoad()
 			objective := sr.RebalanceObjective()
 			options := sr.scorerOptions(ctx, objective.ToDimension())