diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 534253af8527..691a29eee142 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -125,6 +125,15 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue { } func (mq *mergeQueue) enabled() bool { + if !mq.store.cfg.SpanConfigsDisabled { + if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() { + // If we don't have any span configs available, enabling range merges would + // be extremely dangerous -- we could collapse everything into a single + // range. + return false + } + } + st := mq.store.ClusterSettings() return kvserverbase.MergeQueueEnabled.Get(&st.SV) } diff --git a/pkg/kv/kvserver/merge_queue_test.go b/pkg/kv/kvserver/merge_queue_test.go index 0a396e11cf83..3bd836d77805 100644 --- a/pkg/kv/kvserver/merge_queue_test.go +++ b/pkg/kv/kvserver/merge_queue_test.go @@ -36,7 +36,9 @@ func TestMergeQueueShouldQueue(t *testing.T) { testCtx := testContext{} stopper := stop.NewStopper() defer stopper.Stop(ctx) - testCtx.Start(ctx, t, stopper) + tsc := TestStoreConfig(nil) + tsc.SpanConfigsDisabled = true + testCtx.StartWithStoreConfig(ctx, t, stopper, tsc) mq := newMergeQueue(testCtx.store, testCtx.store.DB()) kvserverbase.MergeQueueEnabled.Override(ctx, &testCtx.store.ClusterSettings().SV, true) diff --git a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel index 5777efc65dc7..bccfc05b4caf 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel +++ b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel @@ -36,6 +36,7 @@ go_test( name = "spanconfigkvsubscriber_test", srcs = [ "datadriven_test.go", + "kvsubscriber_client_test.go", "kvsubscriber_test.go", "main_test.go", "spanconfigdecoder_test.go", @@ -48,6 +49,7 @@ go_test( "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", "//pkg/kv/kvclient/rangefeed/rangefeedcache", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go new file mode 100644 index 000000000000..caa8f08e2d38 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestBlockedKVSubscriberDisablesMerges ensures that the merge queue is +// disabled until the KVSubscriber has some snapshot. +func TestBlockedKVSubscriberDisablesMerges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + blockSubscriberCh := make(chan struct{}) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{ + PostRangeFeedStart: func() { <-blockSubscriberCh }, + }, + }, + }, + }, + }) + + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + scKVSubscriber := ts.SpanConfigKVSubscriber().(spanconfig.KVSubscriber) + require.True(t, scKVSubscriber.LastUpdated().IsEmpty()) + + store := tc.GetFirstStoreFromServer(t, 0) + scratchKey := tc.ScratchRange(t) + tc.SplitRangeOrFatal(t, scratchKey.Next()) + + var repl *kvserver.Replica + testutils.SucceedsSoon(t, func() error { + repl = store.LookupReplica(roachpb.RKey(scratchKey)) + if repl == nil { + return errors.New(`could not find replica`) + } + return nil + }) + + { + trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.NoError(t, processErr) + require.NoError(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`)) + } + + close(blockSubscriberCh) + testutils.SucceedsSoon(t, func() error { + if scKVSubscriber.LastUpdated().IsEmpty() { + return errors.New("expected non-empty update ts") + } + return nil + }) + + { + trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.NoError(t, processErr) + require.Error(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`)) + } +}