Skip to content

Commit

Permalink
Merge pull request #78190 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-78122

release-22.1: kvserver: disable merge queue until kvsubscriber has updated
  • Loading branch information
irfansharif authored Mar 22, 2022
2 parents 7b7ba41 + 5c99e1f commit ec0a78b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 1 deletion.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
}

func (mq *mergeQueue) enabled() bool {
if !mq.store.cfg.SpanConfigsDisabled {
if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling range merges would
// be extremely dangerous -- we could collapse everything into a single
// range.
return false
}
}

st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/merge_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func TestMergeQueueShouldQueue(t *testing.T) {
testCtx := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testCtx.Start(ctx, t, stopper)
tsc := TestStoreConfig(nil)
tsc.SpanConfigsDisabled = true
testCtx.StartWithStoreConfig(ctx, t, stopper, tsc)

mq := newMergeQueue(testCtx.store, testCtx.store.DB())
kvserverbase.MergeQueueEnabled.Override(ctx, &testCtx.store.ClusterSettings().SV, true)
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
name = "spanconfigkvsubscriber_test",
srcs = [
"datadriven_test.go",
"kvsubscriber_client_test.go",
"kvsubscriber_test.go",
"main_test.go",
"spanconfigdecoder_test.go",
Expand All @@ -48,6 +49,7 @@ go_test(
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
90 changes: 90 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigkvsubscriber_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestBlockedKVSubscriberDisablesMerges ensures that the merge queue is
// disabled until the KVSubscriber has some snapshot.
func TestBlockedKVSubscriberDisablesMerges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

blockSubscriberCh := make(chan struct{})
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{
PostRangeFeedStart: func() { <-blockSubscriberCh },
},
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
scKVSubscriber := ts.SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
require.True(t, scKVSubscriber.LastUpdated().IsEmpty())

store := tc.GetFirstStoreFromServer(t, 0)
scratchKey := tc.ScratchRange(t)
tc.SplitRangeOrFatal(t, scratchKey.Next())

var repl *kvserver.Replica
testutils.SucceedsSoon(t, func() error {
repl = store.LookupReplica(roachpb.RKey(scratchKey))
if repl == nil {
return errors.New(`could not find replica`)
}
return nil
})

{
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.NoError(t, processErr)
require.NoError(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
}

close(blockSubscriberCh)
testutils.SucceedsSoon(t, func() error {
if scKVSubscriber.LastUpdated().IsEmpty() {
return errors.New("expected non-empty update ts")
}
return nil
})

{
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.NoError(t, processErr)
require.Error(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
}
}

0 comments on commit ec0a78b

Please sign in to comment.