kvserver: disable {split,replicate,mvccGC} queues until subscribed to span configs

Do the same for the store rebalancer. We applied this treatment for the merge
queue back in cockroachdb#78122 since the fallback behavior, if not subscribed,
is to use the statically defined span config for every operation.

- For the replicate queue this meant obtusely applying a replication
  factor of 3, regardless of configuration. This was typically possible
  post node-restart, before subscription was first established. We saw
  this in cockroachdb#98385. It was then also possible for us to ignore
  configured voter/non-voter/lease constraints.
- For the split queue, we wouldn't actually compute any split keys if
  unsubscribed, so the missing check was somewhat benign. But we would
  be obtusely applying the default range sizes [128 MiB, 512 MiB], so for
  clusters configured with larger range sizes, this could lead to a
  burst of splitting post node-restart.
- For the MVCC GC queue, it would mean applying the statically
  configured default GC TTL and ignoring any set protected timestamps.
  The latter is best-effort protection but could result in internal
  operations relying on protection (like backups, changefeeds) failing
  informatively. For clusters configured with GC TTL greater than the
  default, post node-restart it could lead to a burst of MVCC GC
  activity and AOST queries failing to find expected data.
- For the store rebalancer, ignoring span configs could result in
  violating lease preferences and voter constraints.
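
All four components gate on the same condition; the following is a minimal,
self-contained sketch of that gating pattern, using stand-in types rather than
the real kvserver ones (the actual implementations are in the per-file diffs
below):

package main

import (
	"fmt"
	"time"
)

// timestamp and subscriber are stand-ins for hlc.Timestamp and the span
// config subscriber used by the real code.
type timestamp struct{ wallTime int64 }

func (t timestamp) IsEmpty() bool { return t.wallTime == 0 }

type subscriber struct{ lastUpdated timestamp }

func (s subscriber) LastUpdated() timestamp { return s.lastUpdated }

type storeConfig struct {
	SpanConfigsDisabled  bool
	SpanConfigSubscriber subscriber
}

// queue stands in for any of the split/replicate/MVCC GC queues.
type queue struct {
	cfg            storeConfig
	settingEnabled bool // stands in for the new kv.<queue>.queue_enabled setting
}

func (q queue) enabled() bool {
	if !q.cfg.SpanConfigsDisabled &&
		q.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
		// Not yet subscribed to span configs; acting now would mean falling
		// back to the statically defined span config.
		return false
	}
	return q.settingEnabled
}

func main() {
	q := queue{settingEnabled: true}
	fmt.Println(q.enabled()) // false: not subscribed yet

	q.cfg.SpanConfigSubscriber.lastUpdated = timestamp{wallTime: time.Now().UnixNano()}
	fmt.Println(q.enabled()) // true: subscribed and setting enabled
}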

Fixes cockroachdb#98421.
Fixes cockroachdb#98385.

Release note (bug fix): It was previously possible for CockroachDB to
not respect non-default zone configs. This only happened for a short
window after nodes with existing replicas were restarted, and
self-rectified within seconds. This manifested in a few ways:
- If num_replicas was set to something other than 3, we would still
  add or remove replicas to get to 3x replication.
  - If num_voters was set explicitly to get a mix of voting and
    non-voting replicas, it would be ignored. CockroachDB could possibly
    remove non-voting replicas.
- If range_min_bytes or range_max_bytes were changed from 128 MiB and
  512 MiB respectively, we would instead try to size ranges to be within
  [128 MiB, 512 MiB]. This could appear as an excess amount of range
  splits or merges, as visible in the Replication Dashboard under "Range
  Operations".
- If gc.ttlseconds was set to something other than 90000 seconds, we
  would still GC data only older than 90000s/25h. If the GC TTL was set
  to something larger than 25h, AOST queries going further back may now
  start failing. For GC TTLs less than the 25h default, clusters would
  observe increased disk usage due to more retained garbage.
- If constraints, lease_preferences or voter_constraints were set, they
  would be ignored. Range data and leases would possibly be moved
  outside where prescribed.
These issues only lasted a few seconds post node-restart, and any zone
config violations were rectified shortly after.
irfansharif committed Mar 11, 2023
1 parent c705d32 commit 3e341fd
Showing 5 changed files with 124 additions and 10 deletions.
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
@@ -36,6 +36,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
true,
)

// ReplicateQueueEnabled is a setting that controls whether the replicate queue
// is enabled.
var ReplicateQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.replicate.queue_enabled",
"whether the replicate queue is enabled",
true,
)

// SplitQueueEnabled is a setting that controls whether the split queue is
// enabled.
var SplitQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.split.queue_enabled",
"whether the split queue is enabled",
true,
)

// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is
// enabled.
var MVCCGCQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.mvcc_gc.queue_enabled",
"whether the MVCC GC queue is enabled",
true,
)

// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
type CmdIDKey string

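These are ordinary cluster settings, so they can also be flipped at runtime or
overridden in tests. A hypothetical sketch (not part of this commit; it assumes
the BoolSetting Override/Get signatures of the settings package at the time):

package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// TestDisableQueuesViaSettings is a hypothetical sketch showing the new
// settings being toggled; the queues consult them in their enabled() checks.
func TestDisableQueuesViaSettings(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	kvserverbase.ReplicateQueueEnabled.Override(ctx, &st.SV, false)
	kvserverbase.SplitQueueEnabled.Override(ctx, &st.SV, false)
	kvserverbase.MVCCGCQueueEnabled.Override(ctx, &st.SV, false)

	if kvserverbase.ReplicateQueueEnabled.Get(&st.SV) {
		t.Fatal("expected replicate queue to be disabled")
	}
}
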
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/mvcc_gc_queue.go
@@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"math"
"math/rand"
"sync/atomic"
@@ -232,13 +233,35 @@ func (r mvccGCQueueScore) String() string {
humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
}

func (mgcq *mvccGCQueue) enabled() bool {
if !mgcq.store.cfg.SpanConfigsDisabled {
if mgcq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling the
// MVCC GC queue would mean applying the statically configured
// default GC TTL and ignoring any set protected timestamps. The
// latter is best-effort protection, but for clusters configured
// with GC TTL greater than the default, post node-restart it could
// lead to a burst of MVCC GC activity and AOST queries failing to
// find expected data.
return false
}
}

st := mgcq.store.ClusterSettings()
return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a replica should be queued for garbage
// collection, and if so, at what priority. Returns true for shouldQ
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (mgcq *mvccGCQueue) shouldQueue(
ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (bool, float64) {
if !mgcq.enabled() {
return false, 0
}

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
conf := repl.SpanConfig()
@@ -672,6 +695,11 @@ func (r *replicaGCer) GC(
func (mgcq *mvccGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
if !mgcq.enabled() {
log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
return false, nil
}

// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
defer repl.MeasureReqCPUNanos(grunning.Time())
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"sync/atomic"
"time"

@@ -613,9 +614,27 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
return rq
}

func (rq *replicateQueue) enabled() bool {
if !rq.store.cfg.SpanConfigsDisabled {
if rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling the
// replicate queue would mean replicating towards the statically
// defined 3x replication in the fallback span config.
return false
}
}

st := rq.store.ClusterSettings()
return kvserverbase.ReplicateQueueEnabled.Get(&st.SV)
}

func (rq *replicateQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if !rq.enabled() {
return false, 0
}

desc, conf := repl.DescAndSpanConfig()
action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, conf, desc)

@@ -695,6 +714,11 @@ func (rq *replicateQueue) shouldQueue(
func (rq *replicateQueue) process(
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !rq.enabled() {
log.VEventf(ctx, 2, "skipping replication: queue has been disabled")
return false, nil
}

retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/split_queue.go
@@ -171,13 +171,33 @@ func shouldSplitRange(
return shouldQ, priority
}

func (sq *splitQueue) enabled() bool {
if !sq.store.cfg.SpanConfigsDisabled {
if sq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling the
// split queue would mean applying the statically configured range
// sizes in the fallback span config. For clusters configured with
// larger range sizes, this could lead to a burst of splitting post
// node-restart.
return false
}
}

st := sq.store.ClusterSettings()
return kvserverbase.SplitQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a range should be queued for
// splitting. This is true if the range is intersected by a zone config
// prefix or if the range's size in bytes exceeds the limit for the zone,
// or if the range has too much load on it.
func (sq *splitQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, confReader spanconfig.StoreReader,
) (shouldQ bool, priority float64) {
if !sq.enabled() {
return false, 0
}

shouldQ, priority = shouldSplitRange(ctx, repl.Desc(), repl.GetMVCCStats(),
repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader)

@@ -203,6 +223,11 @@ var _ PurgatoryError = unsplittableRangeError{}
func (sq *splitQueue) process(
ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !sq.enabled() {
log.VEventf(ctx, 2, "skipping split: queue has been disabled")
return false, nil
}

processed, err = sq.processAttempt(ctx, r, confReader)
if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) {
// ConditionFailedErrors are an expected outcome for range split
30 changes: 20 additions & 10 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -129,16 +129,17 @@ const (
// will best accomplish the store-level goals.
type StoreRebalancer struct {
log.AmbientContext
metrics StoreRebalancerMetrics
st *cluster.Settings
storeID roachpb.StoreID
allocator allocatorimpl.Allocator
storePool storepool.AllocatorStorePool
rr RangeRebalancer
replicaRankings *ReplicaRankings
getRaftStatusFn func(replica CandidateReplica) *raft.Status
processTimeoutFn func(replica CandidateReplica) time.Duration
objectiveProvider RebalanceObjectiveProvider
metrics StoreRebalancerMetrics
st *cluster.Settings
storeID roachpb.StoreID
allocator allocatorimpl.Allocator
storePool storepool.AllocatorStorePool
rr RangeRebalancer
replicaRankings *ReplicaRankings
getRaftStatusFn func(replica CandidateReplica) *raft.Status
processTimeoutFn func(replica CandidateReplica) time.Duration
objectiveProvider RebalanceObjectiveProvider
subscribedToSpanConfigs func() bool
}

// NewStoreRebalancer creates a StoreRebalancer to work in tandem with the
Expand All @@ -154,6 +155,7 @@ func NewStoreRebalancer(
if rq.store.cfg.StorePool != nil {
storePool = rq.store.cfg.StorePool
}

sr := &StoreRebalancer{
AmbientContext: ambientCtx,
metrics: makeStoreRebalancerMetrics(),
Expand All @@ -170,6 +172,11 @@ func NewStoreRebalancer(
return rq.processTimeoutFunc(st, replica.Repl())
},
objectiveProvider: objectiveProvider,
subscribedToSpanConfigs: func() bool {
// The store rebalancer makes use of span configs. Wait until we've
// established subscription.
return !rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty()
},
}
sr.AddLogTag("store-rebalancer", nil)
rq.store.metrics.registry.AddMetricStruct(&sr.metrics)
@@ -266,6 +273,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
if mode == LBRebalancingOff {
continue
}
if !sr.subscribedToSpanConfigs() {
continue
}
hottestRanges := sr.replicaRankings.TopLoad()
objective := sr.RebalanceObjective()
options := sr.scorerOptions(ctx, objective.ToDimension())
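
Note that the store rebalancer, unlike the queues, gets no dedicated cluster
setting; it simply skips a rebalancing pass while unsubscribed and tries again
on the next tick. A stand-alone sketch of that shape, with stand-in types
rather than the real StoreRebalancer:

package main

import "fmt"

// rebalancer stands in for the real StoreRebalancer; only the shape of the
// subscription guard mirrors the change above.
type rebalancer struct {
	subscribedToSpanConfigs func() bool
}

// runOnce models a single pass of the rebalancing loop.
func (sr *rebalancer) runOnce() bool {
	if !sr.subscribedToSpanConfigs() {
		// Skip this pass; we'll be re-invoked on the next tick rather than
		// rebalance against the static fallback span config.
		return false
	}
	// ... a real pass would rank hot ranges and move leases/replicas here ...
	return true
}

func main() {
	var subscribed bool
	sr := &rebalancer{subscribedToSpanConfigs: func() bool { return subscribed }}

	fmt.Println(sr.runOnce()) // false: not yet subscribed
	subscribed = true
	fmt.Println(sr.runOnce()) // true: subscription established
}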
