Skip to content

Commit

Permalink
kvserver: disable merge queue until kvsubscriber has updated
Browse files Browse the repository at this point in the history
If we don't have any span configs available, enabling range merges would
be extremely dangerous -- we could collapse everything into a single
range. We've observed this happen when the kvsubscriber's initial scan
overflows its bounded buffer, preventing it from ever getting a
snapshot. A future commit will fix the bounded memory issue, but the
side-effect pointed out the need for this important safe guard.

Informs #77687.

Release justification: bug fix
Release note: None
  • Loading branch information
irfansharif committed Mar 19, 2022
1 parent e0e6166 commit 5c99e1f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 1 deletion.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
}

func (mq *mergeQueue) enabled() bool {
if !mq.store.cfg.SpanConfigsDisabled {
if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling range merges would
// be extremely dangerous -- we could collapse everything into a single
// range.
return false
}
}

st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/merge_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func TestMergeQueueShouldQueue(t *testing.T) {
testCtx := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testCtx.Start(ctx, t, stopper)
tsc := TestStoreConfig(nil)
tsc.SpanConfigsDisabled = true
testCtx.StartWithStoreConfig(ctx, t, stopper, tsc)

mq := newMergeQueue(testCtx.store, testCtx.store.DB())
kvserverbase.MergeQueueEnabled.Override(ctx, &testCtx.store.ClusterSettings().SV, true)
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
name = "spanconfigkvsubscriber_test",
srcs = [
"datadriven_test.go",
"kvsubscriber_client_test.go",
"kvsubscriber_test.go",
"main_test.go",
"spanconfigdecoder_test.go",
Expand All @@ -48,6 +49,7 @@ go_test(
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
90 changes: 90 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigkvsubscriber_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestBlockedKVSubscriberDisablesMerges ensures that the merge queue is
// disabled until the KVSubscriber has some snapshot.
func TestBlockedKVSubscriberDisablesMerges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

blockSubscriberCh := make(chan struct{})
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{
PostRangeFeedStart: func() { <-blockSubscriberCh },
},
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
scKVSubscriber := ts.SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
require.True(t, scKVSubscriber.LastUpdated().IsEmpty())

store := tc.GetFirstStoreFromServer(t, 0)
scratchKey := tc.ScratchRange(t)
tc.SplitRangeOrFatal(t, scratchKey.Next())

var repl *kvserver.Replica
testutils.SucceedsSoon(t, func() error {
repl = store.LookupReplica(roachpb.RKey(scratchKey))
if repl == nil {
return errors.New(`could not find replica`)
}
return nil
})

{
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.NoError(t, processErr)
require.NoError(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
}

close(blockSubscriberCh)
testutils.SucceedsSoon(t, func() error {
if scKVSubscriber.LastUpdated().IsEmpty() {
return errors.New("expected non-empty update ts")
}
return nil
})

{
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.NoError(t, processErr)
require.Error(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
}
}

0 comments on commit 5c99e1f

Please sign in to comment.