kvserver: disable {split,replicate,mvccGC} queues until...
...subscribed to span configs. Do the same for the store
rebalancer. We applied this treatment for the merge queue back in #78122
since the fallback behavior, if not subscribed, is to use the statically
defined span config for every operation.

- For the replicate queue this meant obtusely applying a replication
  factor of 3, regardless of configuration. This was possible typically
  post node restart before subscription was initially established. We
  saw this in #98385. It was possible then for us to ignore configured
  voter/non-voter/lease constraints.
- For the split queue, we wouldn't actually compute any split keys if
  unsubscribed, so the missing check was somewhat benign. But we would
  be obtusely applying the default range sizes [128MiB,512MiB], so for
  clusters configured with larger range sizes, this could lead to a
  burst of splitting post node-restart.
- For the MVCC GC queue, it would mean applying the statically
  configured default GC TTL and ignoring any set protected timestamps.
  The latter is best-effort protection but could result in internal
  operations relying on protection (like backups, changefeeds) failing
  informatively. For clusters configured with GC TTL greater than the
  default, post node-restart it could lead to a burst of MVCC GC
  activity and AOST queries failing to find expected data.
- For the store rebalancer, ignoring span configs could result in
  violating lease preferences and voter/non-voter constraints.
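
The gating described above can be sketched roughly as follows. This is a
minimal, self-contained model and not the code in this commit: the
timestamp, subscriber, queue, and canProcess names are hypothetical
stand-ins, and only the needsSpanConfigs flag and the
errSpanConfigsUnavailable error name are taken from the diff. It assumes
"not subscribed" is detected by an empty last-updated timestamp.

```go
package main

import (
	"errors"
	"fmt"
)

// timestamp is a hypothetical stand-in for hlc.Timestamp; only the wall time
// is modeled here.
type timestamp struct{ wallTime int64 }

// IsEmpty reports whether no update has been observed yet.
func (t timestamp) IsEmpty() bool { return t.wallTime == 0 }

// subscriber is a hypothetical stand-in for the span config subscriber.
type subscriber struct{ lastUpdated timestamp }

// LastUpdated returns the timestamp of the most recent span config update.
func (s *subscriber) LastUpdated() timestamp { return s.lastUpdated }

var errSpanConfigsUnavailable = errors.New("span configs not available")

// queue models only the one flag relevant to this sketch.
type queue struct {
	name             string
	needsSpanConfigs bool
}

// canProcess returns an error if the queue depends on span configs but the
// subscriber has not received any yet. Queues that do not depend on span
// configs are never blocked.
func canProcess(q queue, s *subscriber) error {
	if q.needsSpanConfigs && s.LastUpdated().IsEmpty() {
		return errSpanConfigsUnavailable
	}
	return nil
}

func main() {
	sub := &subscriber{} // freshly restarted node, no subscription yet
	queues := []queue{
		{name: "replicate", needsSpanConfigs: true},
		{name: "raftSnapshot", needsSpanConfigs: false},
	}
	for _, q := range queues {
		fmt.Printf("before subscription: %s -> %v\n", q.name, canProcess(q, sub))
	}
	sub.lastUpdated = timestamp{wallTime: 1} // first update arrives
	for _, q := range queues {
		fmt.Printf("after subscription: %s -> %v\n", q.name, canProcess(q, sub))
	}
}
```

The point of the sketch is that a queue skips work instead of falling
back to the static config while the subscription is still being
established.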

Fixes #98421.
Fixes #98385.

While here, we also introduce the following non-public cluster settings
to selectively enable/disable KV queues:
- kv.mvcc_gc_queue.enabled
- kv.split_queue.enabled
- kv.replicate_queue.enabled
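
As a rough illustration of how such a setting gates a queue, here is a
small sketch. The boolSetting and exampleQueue types are hypothetical
(the real settings are registered via settings.RegisterBoolSetting, as
shown in the diff), but the shape follows the enabled() checks this
commit adds to both shouldQueue and process.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// boolSetting is a hypothetical stand-in for a boolean cluster setting that
// can be flipped at runtime.
type boolSetting struct{ v atomic.Bool }

func newBoolSetting(def bool) *boolSetting {
	s := &boolSetting{}
	s.v.Store(def)
	return s
}

func (s *boolSetting) Get() bool  { return s.v.Load() }
func (s *boolSetting) Set(b bool) { s.v.Store(b) }

// exampleQueue is a hypothetical queue gated by a single setting.
type exampleQueue struct{ setting *boolSetting }

func (q *exampleQueue) enabled() bool { return q.setting.Get() }

// shouldQueue refuses to enqueue anything while the queue is disabled.
func (q *exampleQueue) shouldQueue() (shouldQ bool, priority float64) {
	if !q.enabled() {
		return false, 0
	}
	return true, 1 // pretend the replica always needs work
}

// process re-checks the setting, since a replica may have been enqueued
// before the setting was turned off.
func (q *exampleQueue) process() (processed bool, err error) {
	if !q.enabled() {
		return false, nil
	}
	return true, nil
}

func main() {
	q := &exampleQueue{setting: newBoolSetting(true)}
	queued, _ := q.shouldQueue() // enqueued while enabled
	q.setting.Set(false)         // operator disables the queue
	processed, err := q.process()
	fmt.Println(queued, processed, err)
}
```

Checking in both places means that flipping the setting takes effect
even for replicas that were already sitting in the queue.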

Release note (bug fix): It was previously possible for CockroachDB to
not respect non-default zone configs. This could only happen for a short
window after nodes with existing replicas were restarted (measured in
seconds), and self-rectified (also within seconds). This manifested in a
few ways:
- If num_replicas was set to something other than 3, we would still
  add or remove replicas to get to 3x replication.
  - If num_voters was set explicitly to get a mix of voting and
    non-voting replicas, it would be ignored. CockroachDB could possibly
    remove non-voting replicas.
- If range_min_bytes or range_max_bytes were changed from 128 MiB and
  512 MiB respectively, we would instead try to size ranges to be within
  [128 MiB, 512 MiB]. This could appear as an excess amount of range
  splits or merges, as visible in the Replication Dashboard under "Range
  Operations".
- If gc.ttlseconds was set to something other than 90000 seconds, we
  would still GC data only older than 90000s/25h. If the GC TTL was set
  to something larger than 25h, AOST queries going further back may now
  start failing. For GC TTLs less than the 25h default, clusters would
  observe increased disk usage due to more retained garbage.
- If constraints, lease_preferences or voter_constraints were set, they
  would be ignored. Range data and leases would possibly be moved
  outside where prescribed.
These issues lasted a few seconds after node restarts, and any zone config
violations were rectified shortly after.
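
To make the numbers in the release note concrete, here is a small worked
example. The defaults (90000s GC TTL, 512 MiB range_max_bytes) are from
the note above; the configured values (48h TTL, 1 GiB max range size),
the 30h-old version, and the 800 MiB range are made up for illustration.
The gcEligible and needsSplit helpers are deliberately simplified
assumptions, not the actual queue logic.

```go
package main

import (
	"fmt"
	"time"
)

const mib = int64(1) << 20

// Defaults quoted in the release note.
const (
	defaultGCTTL         = 90000 * time.Second
	defaultRangeMaxBytes = 512 * mib
)

// Illustrative (made-up) zone config values and replica state.
const (
	configuredGCTTL         = 48 * time.Hour
	configuredRangeMaxBytes = 1024 * mib
	versionAge              = 30 * time.Hour
	rangeSize               = 800 * mib
)

// gcEligible is a simplified model: a version older than the TTL may be
// garbage collected.
func gcEligible(age, ttl time.Duration) bool { return age > ttl }

// needsSplit is a simplified model: a range larger than maxBytes is split.
func needsSplit(sizeBytes, maxBytes int64) bool { return sizeBytes > maxBytes }

func main() {
	// 90000 seconds is exactly 25 hours.
	fmt.Println("default TTL in hours:", defaultGCTTL.Hours())

	// Under the configured TTL the 30h-old version is retained; under the
	// fallback default it becomes eligible for GC.
	fmt.Println("GC with configured TTL:", gcEligible(versionAge, configuredGCTTL))
	fmt.Println("GC with default TTL:   ", gcEligible(versionAge, defaultGCTTL))

	// Under the configured max size the 800 MiB range is left alone; under
	// the fallback default it would be split.
	fmt.Println("split with configured max:", needsSplit(rangeSize, configuredRangeMaxBytes))
	fmt.Println("split with default max:   ", needsSplit(rangeSize, defaultRangeMaxBytes))
}
```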
irfansharif committed Mar 13, 2023
1 parent c705d32 commit 234bcd0
Showing 19 changed files with 163 additions and 65 deletions.
21 changes: 15 additions & 6 deletions pkg/kv/kvserver/client_spanconfigs_test.go
@@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -42,7 +43,8 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
cluster.MakeTestingClusterSettings(),
nil,
)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

ctx := context.Background()

@@ -106,7 +108,8 @@ func TestFallbackSpanConfigOverride(t *testing.T) {

st := cluster.MakeTestingClusterSettings()
spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

ctx := context.Background()
args := base.TestServerArgs{
@@ -152,14 +155,20 @@ func TestFallbackSpanConfigOverride(t *testing.T) {
}

type mockSpanConfigSubscriber struct {
callback func(ctx context.Context, config roachpb.Span)
callback func(ctx context.Context, config roachpb.Span)
lastUpdated time.Time
spanconfig.Store
}

var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{}

func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber {
return &mockSpanConfigSubscriber{Store: store}
func newMockSpanConfigSubscriber(
lastUpdated time.Time, store spanconfig.Store,
) *mockSpanConfigSubscriber {
return &mockSpanConfigSubscriber{
lastUpdated: lastUpdated,
Store: store,
}
}

func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
@@ -185,7 +194,7 @@ func (m *mockSpanConfigSubscriber) GetProtectionTimestamps(
}

func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp {
panic("unimplemented")
return hlc.Timestamp{WallTime: m.lastUpdated.UnixNano()}
}

func (m *mockSpanConfigSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) {
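
A note on the mock above: a plausible reading of why it is now seeded
with a fixed t0 is that a non-zero last-updated timestamp makes the
subscriber look subscribed. The sketch below models that under the
assumption that a timestamp is empty only when both its wall time and
logical component are zero; the timestamp type and fromTime helper are
hypothetical stand-ins for hlc.Timestamp and the conversion in
LastUpdated().

```go
package main

import (
	"fmt"
	"time"
)

// timestamp is a hypothetical, trimmed-down stand-in for hlc.Timestamp.
type timestamp struct {
	WallTime int64
	Logical  int32
}

// IsEmpty reports whether the timestamp is the zero value.
func (t timestamp) IsEmpty() bool { return t.WallTime == 0 && t.Logical == 0 }

// fromTime mirrors the conversion used by the mock: wall time in nanoseconds
// since the Unix epoch.
func fromTime(tm time.Time) timestamp { return timestamp{WallTime: tm.UnixNano()} }

// t0 is the same fixed instant the tests in the diff use.
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)

func main() {
	ts := fromTime(t0)
	fmt.Println("wall time:", ts.WallTime)
	fmt.Println("looks subscribed:", !ts.IsEmpty())
	fmt.Println("zero value looks subscribed:", !timestamp{}.IsEmpty())
}
```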
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue.go
@@ -93,7 +93,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ConsistencyQueueSuccesses,
failures: store.metrics.ConsistencyQueueFailures,
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
@@ -36,6 +36,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
true,
)

// ReplicateQueueEnabled is a setting that controls whether the replicate queue
// is enabled.
var ReplicateQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.replicate_queue.enabled",
"whether the replicate queue is enabled",
true,
)

// SplitQueueEnabled is a setting that controls whether the split queue is
// enabled.
var SplitQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.split_queue.enabled",
"whether the split queue is enabled",
true,
)

// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is
// enabled.
var MVCCGCQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.mvcc_gc_queue.enabled",
"whether the MVCC GC queue is enabled",
true,
)

// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
type CmdIDKey string

11 changes: 1 addition & 10 deletions pkg/kv/kvserver/merge_queue.go
@@ -116,7 +116,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: false,
successes: store.metrics.MergeQueueSuccesses,
failures: store.metrics.MergeQueueFailures,
@@ -129,15 +129,6 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
}

func (mq *mergeQueue) enabled() bool {
if !mq.store.cfg.SpanConfigsDisabled {
if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling range merges would
// be extremely dangerous -- we could collapse everything into a single
// range.
return false
}
}

st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -178,7 +179,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: false,
processTimeoutFunc: func(st *cluster.Settings, _ replicaInQueue) time.Duration {
timeout := mvccGCQueueTimeout
@@ -232,13 +233,22 @@ func (r mvccGCQueueScore) String() string {
humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
}

func (mgcq *mvccGCQueue) enabled() bool {
st := mgcq.store.ClusterSettings()
return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a replica should be queued for garbage
// collection, and if so, at what priority. Returns true for shouldQ
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (mgcq *mvccGCQueue) shouldQueue(
ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (bool, float64) {
if !mgcq.enabled() {
return false, 0
}

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
conf := repl.SpanConfig()
@@ -672,6 +682,11 @@ func (r *replicaGCer) GC(
func (mgcq *mvccGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
if !mgcq.enabled() {
log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
return false, nil
}

// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
defer repl.MeasureReqCPUNanos(grunning.Time())
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/queue.go
@@ -322,13 +322,13 @@ type queueConfig struct {
// (if not already initialized) when deciding whether to process this
// replica.
needsRaftInitialized bool
// needsSystemConfig controls whether this queue requires a valid copy of the
// system config to operate on a replica. Not all queues require it, and it's
// needsSpanConfigs controls whether this queue requires a valid copy of the
// span configs to operate on a replica. Not all queues require it, and it's
// unsafe for certain queues to wait on it. For example, a raft snapshot may
// be needed in order to make it possible for the system config to become
// available (as observed in #16268), so the raft snapshot queue can't
// require the system config to already be available.
needsSystemConfig bool
// be needed in order to make it possible for the span config range to
// become available (as observed in #16268), so the raft snapshot queue
// can't require the span configs to already be available.
needsSpanConfigs bool
// acceptsUnsplitRanges controls whether this queue can process ranges that
// need to be split due to zone config settings. Ranges are checked before
// calling queueImpl.shouldQueue and queueImpl.process.
@@ -378,7 +378,7 @@ type queueConfig struct {
//
// Replicas are added asynchronously through `MaybeAddAsync` or `AddAsync`.
// MaybeAddAsync checks the various requirements selected by the queue config
// (needsSystemConfig, needsLease, acceptsUnsplitRanges) as well as the
// (needsSpanConfigs, needsLease, acceptsUnsplitRanges) as well as the
// queueImpl's `shouldQueue`. AddAsync does not check any of this and accept a
// priority directly instead of getting it from `shouldQueue`. These methods run
// with shared a maximum concurrency of `addOrMaybeAddSemSize`. If the maximum
@@ -473,9 +473,9 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
ambient := store.cfg.AmbientCtx
ambient.AddLogTag(name, nil)

if !cfg.acceptsUnsplitRanges && !cfg.needsSystemConfig {
if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs {
log.Fatalf(ambient.AnnotateCtx(context.Background()),
"misconfigured queue: acceptsUnsplitRanges=false requires needsSystemConfig=true; got %+v", cfg)
"misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true; got %+v", cfg)
}

bq := baseQueue{
@@ -639,12 +639,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
ctx = repl.AnnotateCtx(ctx)
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
if err != nil {
if errors.Is(err, errSysCfgUnavailable) && log.V(1) {
log.Warningf(ctx, "unable to retrieve system config, skipping: %v", err)
if errors.Is(err, errSpanConfigsUnavailable) && log.V(1) {
log.Warningf(ctx, "unable to retrieve span configs, skipping: %v", err)
}
return
}
@@ -931,10 +931,10 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
if errors.Is(err, errSpanConfigsUnavailable) {
if log.V(1) {
log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
}
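
The invariant enforced in newBaseQueue above (acceptsUnsplitRanges=false
requires needsSpanConfigs=true) can be checked against the flag
combinations visible in this diff. The sketch below is a simplified
model: the queueConfig here carries a hypothetical name field and only
two flags, and validate returns an error where the real code calls
log.Fatalf.

```go
package main

import (
	"errors"
	"fmt"
)

// queueConfig is a trimmed-down, hypothetical version of the real struct.
type queueConfig struct {
	name                 string
	needsSpanConfigs     bool
	acceptsUnsplitRanges bool
}

var errMisconfigured = errors.New(
	"misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true")

// validate rejects queues that refuse unsplit ranges without having span
// configs available to decide whether a range needs splitting.
func validate(cfg queueConfig) error {
	if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs {
		return fmt.Errorf("%w; got %+v", errMisconfigured, cfg)
	}
	return nil
}

// configsFromDiff lists the flag combinations shown in this commit's diff.
var configsFromDiff = []queueConfig{
	{name: "consistency", needsSpanConfigs: false, acceptsUnsplitRanges: true},
	{name: "merge", needsSpanConfigs: true, acceptsUnsplitRanges: false},
	{name: "mvccGC", needsSpanConfigs: true, acceptsUnsplitRanges: false},
	{name: "raftLog", needsSpanConfigs: false, acceptsUnsplitRanges: true},
	{name: "raftSnapshot", needsSpanConfigs: false, acceptsUnsplitRanges: true},
	{name: "replicaGC", needsSpanConfigs: false, acceptsUnsplitRanges: true},
}

func main() {
	for _, cfg := range configsFromDiff {
		fmt.Printf("%s: %v\n", cfg.name, validate(cfg))
	}
	// A made-up bad combination, to show the failure case.
	fmt.Println(validate(queueConfig{name: "bad"}))
}
```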
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/queue_test.go
@@ -98,7 +98,7 @@ func (tq *testQueueImpl) updateChan() <-chan time.Time {
func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
if !cfg.acceptsUnsplitRanges {
// Needed in order to pass the validation in newBaseQueue.
cfg.needsSystemConfig = true
cfg.needsSpanConfigs = true
}
cfg.successes = metric.NewCounter(metric.Metadata{Name: "processed"})
cfg.failures = metric.NewCounter(metric.Metadata{Name: "failures"})
@@ -579,7 +579,7 @@ func TestNeedsSystemConfig(t *testing.T) {
{
confReader, err := tc.store.GetConfReader(ctx)
require.Nil(t, confReader)
require.True(t, errors.Is(err, errSysCfgUnavailable))
require.True(t, errors.Is(err, errSpanConfigsUnavailable))
}

r, err := tc.store.GetReplica(1)
@@ -597,7 +597,7 @@ func TestNeedsSystemConfig(t *testing.T) {

// bqNeedsSysCfg will not add the replica or process it without a system config.
bqNeedsSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: true,
maxSize: 1,
})
@@ -623,7 +623,7 @@ func TestNeedsSystemConfig(t *testing.T) {
// Now check that a queue which doesn't require the system config can
// successfully add and process a replica.
bqNoSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
maxSize: 1,
})
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_queue.go
@@ -177,7 +177,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
maxSize: defaultQueueMaxSize,
maxConcurrency: raftLogQueueConcurrency,
needsLease: false,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.RaftLogQueueSuccesses,
failures: store.metrics.RaftLogQueueFailures,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
// leaseholder. Operating on a replica without holding the lease is the
// reason Raft snapshots cannot be performed by the replicateQueue.
needsLease: false,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gc_queue.go
@@ -99,7 +99,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
maxSize: defaultQueueMaxSize,
needsLease: false,
needsRaftInitialized: true,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processDestroyedReplicas: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -358,9 +358,9 @@ func (r *Replica) updateRangeInfo(ctx context.Context, desc *roachpb.RangeDescri
// to different zones.
// Load the system config.
confReader, err := r.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
// This could be before the system config was ever gossiped, or it
// expired. Let the gossip callback set the info.
if errors.Is(err, errSpanConfigsUnavailable) {
// This could be before the span config subscription was ever
// established.
log.Warningf(ctx, "unable to retrieve conf reader, cannot determine range MaxBytes")
return nil
}
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -567,7 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit,
// The processing of the replicate queue often needs to send snapshots
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
@@ -613,9 +614,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
return rq
}

func (rq *replicateQueue) enabled() bool {
st := rq.store.ClusterSettings()
return kvserverbase.ReplicateQueueEnabled.Get(&st.SV)
}

func (rq *replicateQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if !rq.enabled() {
return false, 0
}

desc, conf := repl.DescAndSpanConfig()
action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, conf, desc)

@@ -695,6 +705,11 @@ func (rq *replicateQueue) shouldQueue(
func (rq *replicateQueue) process(
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !rq.enabled() {
log.VEventf(ctx, 2, "skipping replication: queue has been disabled")
return false, nil
}

retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,