diff --git a/pkg/kv/kvserver/client_spanconfigs_test.go b/pkg/kv/kvserver/client_spanconfigs_test.go index ef14f6557d6b..4c0e9d5e73c3 100644 --- a/pkg/kv/kvserver/client_spanconfigs_test.go +++ b/pkg/kv/kvserver/client_spanconfigs_test.go @@ -13,6 +13,7 @@ package kvserver_test import ( "context" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -42,7 +43,8 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) { cluster.MakeTestingClusterSettings(), nil, ) - mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore) + var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore) ctx := context.Background() @@ -106,7 +108,8 @@ func TestFallbackSpanConfigOverride(t *testing.T) { st := cluster.MakeTestingClusterSettings() spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil) - mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore) + var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore) ctx := context.Background() args := base.TestServerArgs{ @@ -152,14 +155,20 @@ func TestFallbackSpanConfigOverride(t *testing.T) { } type mockSpanConfigSubscriber struct { - callback func(ctx context.Context, config roachpb.Span) + callback func(ctx context.Context, config roachpb.Span) + lastUpdated time.Time spanconfig.Store } var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{} -func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber { - return &mockSpanConfigSubscriber{Store: store} +func newMockSpanConfigSubscriber( + lastUpdated time.Time, store spanconfig.Store, +) *mockSpanConfigSubscriber { + return &mockSpanConfigSubscriber{ + lastUpdated: lastUpdated, + Store: store, + } } func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { @@ -185,7 +194,7 @@ func (m *mockSpanConfigSubscriber) GetProtectionTimestamps( } func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp { - panic("unimplemented") + return hlc.Timestamp{WallTime: m.lastUpdated.UnixNano()} } func (m *mockSpanConfigSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) { diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 662e7276b854..4e834a2f7cad 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -93,7 +93,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue { queueConfig{ maxSize: defaultQueueMaxSize, needsLease: true, - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, successes: store.metrics.ConsistencyQueueSuccesses, failures: store.metrics.ConsistencyQueueFailures, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 8dfd824b0ccd..8726f17b6a23 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -36,6 +36,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting( true, ) +// ReplicateQueueEnabled is a setting that controls whether the replicate queue +// is enabled. +var ReplicateQueueEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.replicate_queue.enabled", + "whether the replicate queue is enabled", + true, +) + +// SplitQueueEnabled is a setting that controls whether the split queue is +// enabled. +var SplitQueueEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.split_queue.enabled", + "whether the split queue is enabled", + true, +) + +// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is +// enabled. +var MVCCGCQueueEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.mvcc_gc_queue.enabled", + "whether the MVCC GC queue is enabled", + true, +) + // CmdIDKey is a Raft command id. This will be logged unredacted - keep it random. type CmdIDKey string diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 8905d3393a89..c5c3e23207e6 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -116,7 +116,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue { // factor. processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate), needsLease: true, - needsSystemConfig: true, + needsSpanConfigs: true, acceptsUnsplitRanges: false, successes: store.metrics.MergeQueueSuccesses, failures: store.metrics.MergeQueueFailures, @@ -129,15 +129,6 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue { } func (mq *mergeQueue) enabled() bool { - if !mq.store.cfg.SpanConfigsDisabled { - if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() { - // If we don't have any span configs available, enabling range merges would - // be extremely dangerous -- we could collapse everything into a single - // range. - return false - } - } - st := mq.store.ClusterSettings() return kvserverbase.MergeQueueEnabled.Get(&st.SV) } diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index 2d8af90ab5bf..6016e52e42dc 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -178,7 +179,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue { queueConfig{ maxSize: defaultQueueMaxSize, needsLease: true, - needsSystemConfig: true, + needsSpanConfigs: true, acceptsUnsplitRanges: false, processTimeoutFunc: func(st *cluster.Settings, _ replicaInQueue) time.Duration { timeout := mvccGCQueueTimeout @@ -232,6 +233,11 @@ func (r mvccGCQueueScore) String() string { humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction)) } +func (mgcq *mvccGCQueue) enabled() bool { + st := mgcq.store.ClusterSettings() + return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV) +} + // shouldQueue determines whether a replica should be queued for garbage // collection, and if so, at what priority. Returns true for shouldQ // in the event that the cumulative ages of GC'able bytes or extant @@ -239,6 +245,10 @@ func (r mvccGCQueueScore) String() string { func (mgcq *mvccGCQueue) shouldQueue( ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader, ) (bool, float64) { + if !mgcq.enabled() { + return false, 0 + } + // Consult the protected timestamp state to determine whether we can GC and // the timestamp which can be used to calculate the score. conf := repl.SpanConfig() @@ -672,6 +682,11 @@ func (r *replicaGCer) GC( func (mgcq *mvccGCQueue) process( ctx context.Context, repl *Replica, _ spanconfig.StoreReader, ) (processed bool, err error) { + if !mgcq.enabled() { + log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled") + return false, nil + } + // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. defer repl.MeasureReqCPUNanos(grunning.Time()) diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index 70b5cf2a9dbe..fbf6f2429ac3 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -322,13 +322,13 @@ type queueConfig struct { // (if not already initialized) when deciding whether to process this // replica. needsRaftInitialized bool - // needsSystemConfig controls whether this queue requires a valid copy of the - // system config to operate on a replica. Not all queues require it, and it's + // needsSpanConfigs controls whether this queue requires a valid copy of the + // span configs to operate on a replica. Not all queues require it, and it's // unsafe for certain queues to wait on it. For example, a raft snapshot may - // be needed in order to make it possible for the system config to become - // available (as observed in #16268), so the raft snapshot queue can't - // require the system config to already be available. - needsSystemConfig bool + // be needed in order to make it possible for the span config range to + // become available (as observed in #16268), so the raft snapshot queue + // can't require the span configs to already be available. + needsSpanConfigs bool // acceptsUnsplitRanges controls whether this queue can process ranges that // need to be split due to zone config settings. Ranges are checked before // calling queueImpl.shouldQueue and queueImpl.process. @@ -378,7 +378,7 @@ type queueConfig struct { // // Replicas are added asynchronously through `MaybeAddAsync` or `AddAsync`. // MaybeAddAsync checks the various requirements selected by the queue config -// (needsSystemConfig, needsLease, acceptsUnsplitRanges) as well as the +// (needsSpanConfigs, needsLease, acceptsUnsplitRanges) as well as the // queueImpl's `shouldQueue`. AddAsync does not check any of this and accept a // priority directly instead of getting it from `shouldQueue`. These methods run // with shared a maximum concurrency of `addOrMaybeAddSemSize`. If the maximum @@ -473,9 +473,9 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b ambient := store.cfg.AmbientCtx ambient.AddLogTag(name, nil) - if !cfg.acceptsUnsplitRanges && !cfg.needsSystemConfig { + if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs { log.Fatalf(ambient.AnnotateCtx(context.Background()), - "misconfigured queue: acceptsUnsplitRanges=false requires needsSystemConfig=true; got %+v", cfg) + "misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true; got %+v", cfg) } bq := baseQueue{ @@ -639,12 +639,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. ctx = repl.AnnotateCtx(ctx) // Load the system config if it's needed. var confReader spanconfig.StoreReader - if bq.needsSystemConfig { + if bq.needsSpanConfigs { var err error confReader, err = bq.store.GetConfReader(ctx) if err != nil { - if errors.Is(err, errSysCfgUnavailable) && log.V(1) { - log.Warningf(ctx, "unable to retrieve system config, skipping: %v", err) + if errors.Is(err, errSpanConfigsUnavailable) && log.V(1) { + log.Warningf(ctx, "unable to retrieve span configs, skipping: %v", err) } return } @@ -931,10 +931,10 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error { // Load the system config if it's needed. var confReader spanconfig.StoreReader - if bq.needsSystemConfig { + if bq.needsSpanConfigs { var err error confReader, err = bq.store.GetConfReader(ctx) - if errors.Is(err, errSysCfgUnavailable) { + if errors.Is(err, errSpanConfigsUnavailable) { if log.V(1) { log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err) } diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index d9ff9e3fdf8d..9375e2fdd17e 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -98,7 +98,7 @@ func (tq *testQueueImpl) updateChan() <-chan time.Time { func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue { if !cfg.acceptsUnsplitRanges { // Needed in order to pass the validation in newBaseQueue. - cfg.needsSystemConfig = true + cfg.needsSpanConfigs = true } cfg.successes = metric.NewCounter(metric.Metadata{Name: "processed"}) cfg.failures = metric.NewCounter(metric.Metadata{Name: "failures"}) @@ -579,7 +579,7 @@ func TestNeedsSystemConfig(t *testing.T) { { confReader, err := tc.store.GetConfReader(ctx) require.Nil(t, confReader) - require.True(t, errors.Is(err, errSysCfgUnavailable)) + require.True(t, errors.Is(err, errSpanConfigsUnavailable)) } r, err := tc.store.GetReplica(1) @@ -597,7 +597,7 @@ func TestNeedsSystemConfig(t *testing.T) { // bqNeedsSysCfg will not add the replica or process it without a system config. bqNeedsSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{ - needsSystemConfig: true, + needsSpanConfigs: true, acceptsUnsplitRanges: true, maxSize: 1, }) @@ -623,7 +623,7 @@ func TestNeedsSystemConfig(t *testing.T) { // Now check that a queue which doesn't require the system config can // successfully add and process a replica. bqNoSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{ - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, maxSize: 1, }) diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index ea16cfd68ac9..83f79cff0a82 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -177,7 +177,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue { maxSize: defaultQueueMaxSize, maxConcurrency: raftLogQueueConcurrency, needsLease: false, - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, successes: store.metrics.RaftLogQueueSuccesses, failures: store.metrics.RaftLogQueueFailures, diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index f8dcdeac6549..2c7b1952272d 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue { // leaseholder. Operating on a replica without holding the lease is the // reason Raft snapshots cannot be performed by the replicateQueue. needsLease: false, - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate), successes: store.metrics.RaftSnapshotQueueSuccesses, diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index 90bf2cbc405c..a276a5a2bdb9 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -99,7 +99,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue { maxSize: defaultQueueMaxSize, needsLease: false, needsRaftInitialized: true, - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, processDestroyedReplicas: true, successes: store.metrics.ReplicaGCQueueSuccesses, diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 9b4bd8de1923..a8dc4298bda9 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -358,9 +358,9 @@ func (r *Replica) updateRangeInfo(ctx context.Context, desc *roachpb.RangeDescri // to different zones. // Load the system config. confReader, err := r.store.GetConfReader(ctx) - if errors.Is(err, errSysCfgUnavailable) { - // This could be before the system config was ever gossiped, or it - // expired. Let the gossip callback set the info. + if errors.Is(err, errSpanConfigsUnavailable) { + // This could be before the span config subscription was ever + // established. log.Warningf(ctx, "unable to retrieve conf reader, cannot determine range MaxBytes") return nil } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 198d417eb86e..60cc9d8a6a47 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -567,7 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica queueConfig{ maxSize: defaultQueueMaxSize, needsLease: true, - needsSystemConfig: true, + needsSpanConfigs: true, acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit, // The processing of the replicate queue often needs to send snapshots // so we use the raftSnapshotQueueTimeoutFunc. This function sets a @@ -613,9 +614,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica return rq } +func (rq *replicateQueue) enabled() bool { + st := rq.store.ClusterSettings() + return kvserverbase.ReplicateQueueEnabled.Get(&st.SV) +} + func (rq *replicateQueue) shouldQueue( ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader, ) (shouldQueue bool, priority float64) { + if !rq.enabled() { + return false, 0 + } + desc, conf := repl.DescAndSpanConfig() action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, conf, desc) @@ -695,6 +705,11 @@ func (rq *replicateQueue) shouldQueue( func (rq *replicateQueue) process( ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, ) (processed bool, err error) { + if !rq.enabled() { + log.VEventf(ctx, 2, "skipping replication: queue has been disabled") + return false, nil + } + retryOpts := retry.Options{ InitialBackoff: 50 * time.Millisecond, MaxBackoff: 1 * time.Second, diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 478e8868a72a..3165c6d2fb48 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -119,7 +119,7 @@ func newSplitQueue(store *Store, db *kv.DB) *splitQueue { maxSize: defaultQueueMaxSize, maxConcurrency: splitQueueConcurrency, needsLease: true, - needsSystemConfig: true, + needsSpanConfigs: true, acceptsUnsplitRanges: true, successes: store.metrics.SplitQueueSuccesses, failures: store.metrics.SplitQueueFailures, @@ -171,6 +171,11 @@ func shouldSplitRange( return shouldQ, priority } +func (sq *splitQueue) enabled() bool { + st := sq.store.ClusterSettings() + return kvserverbase.SplitQueueEnabled.Get(&st.SV) +} + // shouldQueue determines whether a range should be queued for // splitting. This is true if the range is intersected by a zone config // prefix or if the range's size in bytes exceeds the limit for the zone, @@ -178,6 +183,10 @@ func shouldSplitRange( func (sq *splitQueue) shouldQueue( ctx context.Context, now hlc.ClockTimestamp, repl *Replica, confReader spanconfig.StoreReader, ) (shouldQ bool, priority float64) { + if !sq.enabled() { + return false, 0 + } + shouldQ, priority = shouldSplitRange(ctx, repl.Desc(), repl.GetMVCCStats(), repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader) @@ -203,6 +212,11 @@ var _ PurgatoryError = unsplittableRangeError{} func (sq *splitQueue) process( ctx context.Context, r *Replica, confReader spanconfig.StoreReader, ) (processed bool, err error) { + if !sq.enabled() { + log.VEventf(ctx, 2, "skipping split: queue has been disabled") + return false, nil + } + processed, err = sq.processAttempt(ctx, r, confReader) if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) { // ConditionFailedErrors are an expected outcome for range split diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 076591fb6bc2..d5cb3fdd5108 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2066,7 +2066,7 @@ func (s *Store) WaitForInit() { // GetConfReader exposes access to a configuration reader. func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, error) { if s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues { - return nil, errSysCfgUnavailable + return nil, errSpanConfigsUnavailable } if s.cfg.TestingKnobs.ConfReaderInterceptor != nil { return s.cfg.TestingKnobs.ConfReaderInterceptor(), nil @@ -2078,11 +2078,31 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro sysCfg := s.cfg.SystemConfigProvider.GetSystemConfig() if sysCfg == nil { - return nil, errSysCfgUnavailable + return nil, errSpanConfigsUnavailable } return sysCfg, nil } + if s.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() { + // This code path is used in various internal queues. It's important to + // surface explicitly that we don't have any span configs instead of + // falling back to the statically configured one. + // - enabling range merges would be extremely dangerous -- we could + // collapse everything into a single range. + // - enabling the split queue would mean applying the statically + // configured range sizes in the fallback span config. For clusters + // configured with larger range sizes, this could lead to a burst of + // splitting post node-restart. + // - enabling the MVCC GC queue would mean applying the statically + // configured default GC TTL and ignoring any set protected + // timestamps. The latter is best-effort protection, but for clusters + // configured with GC TTL greater than the default, post node-restart + // it could lead to a burst of MVCC GC activity and AOST queries + // failing to find expected data. + // - enabling the replicate queue would mean replicating towards the + // statically defined 3x replication in the fallback span config. + return nil, errSpanConfigsUnavailable + } return s.cfg.SpanConfigSubscriber, nil } @@ -3281,9 +3301,6 @@ func (s *Store) AllocatorCheckRange( ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range", spanOptions...) confReader, err := s.GetConfReader(ctx) - if err == nil { - err = s.WaitForSpanConfigSubscription(ctx) - } if err != nil { log.Eventf(ctx, "span configs unavailable: %s", err) return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err diff --git a/pkg/kv/kvserver/store_gossip.go b/pkg/kv/kvserver/store_gossip.go index dfb3dbb8b3c5..4d33e385f5ec 100644 --- a/pkg/kv/kvserver/store_gossip.go +++ b/pkg/kv/kvserver/store_gossip.go @@ -156,7 +156,7 @@ func (s *Store) startGossip() { } } -var errSysCfgUnavailable = errors.New("system config not available in gossip") +var errSpanConfigsUnavailable = errors.New("span configs not available") // systemGossipUpdate is a callback for gossip updates to // the system config which affect range split boundaries. diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index ee5837bca5b1..48153d077f25 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -129,16 +129,17 @@ const ( // will best accomplish the store-level goals. type StoreRebalancer struct { log.AmbientContext - metrics StoreRebalancerMetrics - st *cluster.Settings - storeID roachpb.StoreID - allocator allocatorimpl.Allocator - storePool storepool.AllocatorStorePool - rr RangeRebalancer - replicaRankings *ReplicaRankings - getRaftStatusFn func(replica CandidateReplica) *raft.Status - processTimeoutFn func(replica CandidateReplica) time.Duration - objectiveProvider RebalanceObjectiveProvider + metrics StoreRebalancerMetrics + st *cluster.Settings + storeID roachpb.StoreID + allocator allocatorimpl.Allocator + storePool storepool.AllocatorStorePool + rr RangeRebalancer + replicaRankings *ReplicaRankings + getRaftStatusFn func(replica CandidateReplica) *raft.Status + processTimeoutFn func(replica CandidateReplica) time.Duration + objectiveProvider RebalanceObjectiveProvider + subscribedToSpanConfigs func() bool } // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the @@ -170,6 +171,11 @@ func NewStoreRebalancer( return rq.processTimeoutFunc(st, replica.Repl()) }, objectiveProvider: objectiveProvider, + subscribedToSpanConfigs: func() bool { + // The store rebalancer makes use of span configs. Wait until we've + // established subscription. + return !rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() + }, } sr.AddLogTag("store-rebalancer", nil) rq.store.metrics.registry.AddMetricStruct(&sr.metrics) @@ -266,6 +272,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { if mode == LBRebalancingOff { continue } + if !sr.subscribedToSpanConfigs() { + continue + } hottestRanges := sr.replicaRankings.TopLoad() objective := sr.RebalanceObjective() options := sr.scorerOptions(ctx, objective.ToDimension()) diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 2dae3353a731..59c8ff61f17a 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3462,7 +3462,7 @@ func TestAllocatorCheckRangeUnconfigured(t *testing.T) { } else { // Expect error looking up spanConfig if we can't use the system config span, // as the spanconfig.KVSubscriber infrastructure is not initialized. - require.ErrorIs(t, err, errSysCfgUnavailable) + require.ErrorIs(t, err, errSpanConfigsUnavailable) require.Equal(t, allocatorimpl.AllocatorNoop, action) } }) diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go index e4ca12eae0f0..1cdd878dc2d8 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue.go +++ b/pkg/kv/kvserver/ts_maintenance_queue.go @@ -112,7 +112,7 @@ func newTimeSeriesMaintenanceQueue( queueConfig{ maxSize: defaultQueueMaxSize, needsLease: true, - needsSystemConfig: false, + needsSpanConfigs: false, acceptsUnsplitRanges: true, successes: store.metrics.TimeSeriesMaintenanceQueueSuccesses, failures: store.metrics.TimeSeriesMaintenanceQueueFailures, diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go index 12a401ac73e8..5736a1d335a2 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go @@ -67,12 +67,12 @@ func TestBlockedKVSubscriberDisablesMerges(t *testing.T) { }) { - trace, processErr, err := store.Enqueue( + _, processErr, err := store.Enqueue( ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */ ) - require.NoError(t, err) require.NoError(t, processErr) - require.NoError(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`)) + require.Error(t, err) + require.True(t, testutils.IsError(err, `unable to retrieve conf reader`)) } close(blockSubscriberCh) @@ -89,6 +89,7 @@ func TestBlockedKVSubscriberDisablesMerges(t *testing.T) { ) require.NoError(t, err) require.NoError(t, processErr) + require.Error(t, testutils.MatchInOrder(trace.String(), `unable to retrieve conf reader`)) require.Error(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`)) } }