Skip to content

Commit

Permalink
kvserver: disable eager replicate queue on span cfg
Browse files Browse the repository at this point in the history
Previously, replicas were enqueued into the replicate queue whenever the
store received a span config update that could affect the replica. The
replicate queue's `shouldQueue` check is relatively more expensive than
that of other queues.

Introduce the cluster setting
`kv.eager_replicate_enqueue_on_span_config_update.enabled`, which when
set to true, enables queuing up replicas on span config updates; when
set to false, disables queuing replicas on span config updates.

By default, this setting is set to false.

Resolves: cockroachdb#108724
Release note: None
  • Loading branch information
kvoli committed Aug 14, 2023
1 parent dc2c52d commit 703a9d5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/lease_preferences.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func runLeasePreferences(
// https://github.com/cockroachdb/cockroach/issues/105274
settings := install.MakeClusterSettings()
settings.ClusterSettings["server.span_stats.span_batch_limit"] = "4096"
settings.ClusterSettings["kv.eager_replicate_enqueue_on_span_config_update.enabled"] = "true"

startNodes := func(nodes ...int) {
for _, node := range nodes {
Expand Down
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func TestProtectedTimestamps(t *testing.T) {
_, err = conn.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
require.NoError(t, err)

_, err = conn.Exec("SET CLUSTER SETTING kv.eager_replicate_enqueue_on_span_config_update.enabled = true") // speeds up the test
require.NoError(t, err)

const tableRangeMaxBytes = 64 << 20
_, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING "+
"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
Expand Down Expand Up @@ -125,13 +128,9 @@ ORDER BY raw_start_key ASC LIMIT 1`)

getStoreAndReplica := func() (*kvserver.Store, *kvserver.Replica) {
startKey := getTableStartKey()
// Okay great now we have a key and can go find replicas and stores and what not.
r := tc.LookupRangeOrFatal(t, startKey)
l, _, err := tc.FindRangeLease(r, nil)
require.NoError(t, err)

lhServer := tc.Server(int(l.Replica.NodeID) - 1)
return getFirstStoreReplica(t, lhServer, startKey)
// There's only one server, so there's no point searching for which server
// the leaseholder is on, it could only be on s0.
return getFirstStoreReplica(t, s0, startKey)
}

waitForRangeMaxBytes := func(maxBytes int64) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

// EagerReplicateEnqueueOnSpanConfigUpdateEnabled controls whether replicas are
// enqueued into the replicate queue, following a span config update which
// affects the replica.
//
// The setting is registered with a default of false: the replicate queue's
// shouldQueue check is relatively expensive compared to other queues, because
// it scales with the number of stores and performs more checks. When set to
// true, the store enqueues affected replicas into the replicate queue as part
// of handling a span config update.
var EagerReplicateEnqueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.eager_replicate_enqueue_on_span_config_update.enabled",
"controls whether replicas are enqueued into the replicate queue for "+
"processing, when a span config update occurs, which affects the replica",
false,
)

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2444,9 +2444,15 @@ func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) {
s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.replicateQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})

// The replicate queue has a relatively more expensive queue check
// (shouldQueue), because it scales with the number of stores, and
// performs more checks.
if EagerReplicateEnqueueOnSpanConfigUpdateEnabled.Get(&s.GetStoreConfig().Settings.SV) {
s.replicateQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
return nil // more
},
); err != nil {
Expand Down

0 comments on commit 703a9d5

Please sign in to comment.