kvserver: avoid clobbering replica conf #75233

Merged
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -230,6 +230,7 @@ go_test(
"client_split_burst_test.go",
"client_split_test.go",
"client_status_test.go",
"client_store_test.go",
"client_tenant_test.go",
"client_test.go",
"closed_timestamp_test.go",
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/client_split_test.go
@@ -1011,11 +1011,11 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// This test was written with the SystemConfigSpan in mind.
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the SystemConfigSpan in mind.
UseSystemConfigSpanForQueues: true,
},
},
})
@@ -1082,11 +1082,11 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// This test was written with the system config span in mind.
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the system config span in mind.
UseSystemConfigSpanForQueues: true,
},
},
})
@@ -3562,12 +3562,11 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the system config span in mind.
UseSystemConfigSpanForQueues: true,
TestingResponseFilter: respFilter,
DisableMergeQueue: true,
TestingResponseFilter: respFilter,
},
},
})
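Since `systemGossipUpdate` (see the `store.go` hunk below) now bails out whenever span configs are enabled, the queue-only store knob is no longer enough for these tests, which need gossip-driven config application end to end; they instead opt the whole test server out via the `DisableSpanConfigs` server arg. A minimal sketch of the resulting setup, using only helpers already present in this diff:

```go
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
	// Opt out of the span configs infrastructure entirely so that
	// replica configs (and the split/merge queues) are driven by the
	// gossiped system config span, as these older tests expect.
	DisableSpanConfigs: true,
	Knobs: base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			DisableMergeQueue: true,
		},
	},
})
defer serv.Stopper().Stop(ctx)
```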
72 changes: 72 additions & 0 deletions pkg/kv/kvserver/client_store_test.go
@@ -0,0 +1,72 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then
// sets the zone config to a custom max bytes value to verify the ranges' max
// bytes are updated appropriately.
func TestStoreSetRangesMaxBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const expMaxBytes, defaultMaxBytes = 420 << 20, 512 << 20
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1,
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: true,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)
store := tc.GetFirstStoreFromServer(t, 0)
tdb := sqlutils.MakeSQLRunner(tc.Conns[0])

tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up the test

testKey := tc.ScratchRange(t)
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(testKey))
if got := repl.GetMaxBytes(); got != defaultMaxBytes {
return errors.Errorf("range max bytes values did not start at %d; got %d", defaultMaxBytes, got)
}
return nil
})

tdb.Exec(t, `ALTER RANGE DEFAULT CONFIGURE ZONE USING range_max_bytes = $1`, expMaxBytes)

testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(testKey))
if got := repl.GetMaxBytes(); got != expMaxBytes {
return errors.Errorf("range max bytes values did not change to %d; got %d", expMaxBytes, got)
}
return nil
})
}
65 changes: 34 additions & 31 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -245,59 +245,62 @@ func TestReplicateQueueDownReplicate(t *testing.T) {
skip.UnderRace(t, "takes >1min under race")

ctx := context.Background()
const replicaCount = 3

// The goal of this test is to ensure that down replication occurs correctly
// using the replicate queue, and to ensure that's the case, the test
// cluster needs to be kept in auto replication mode.
tc := testcluster.StartTestCluster(t, replicaCount+2,
// The goal of this test is to ensure that down replication occurs
// correctly using the replicate queue, and to ensure that's the case,
// the test cluster needs to be kept in auto replication mode.
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
ServerArgs: base.TestServerArgs{
ScanMinIdleTime: 10 * time.Millisecond,
ScanMaxIdleTime: 10 * time.Millisecond,
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: true,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)

// Disable the replication queues so that the range we're about to create
// doesn't get down-replicated too soon.
tc.ToggleReplicateQueues(false)

testKey := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, testKey)
// At the end of StartTestCluster(), all ranges have 5 replicas since they're
// all "system ranges". When the ScratchRange() splits its range, it also
// starts up with 5 replicas. Since it's not a system range, its default zone
// config asks for 3x replication, and the replication queue will
// down-replicate it.
require.Len(t, desc.Replicas().Descriptors(), 5)
// Re-enable the replication queue.
tc.ToggleReplicateQueues(true)
testutils.SucceedsSoon(t, func() error {
desc := tc.LookupRangeOrFatal(t, testKey)
if got := len(desc.Replicas().Descriptors()); got != 3 {
return errors.Newf("expected 3 replicas for scratch range, found %d", got)
}
return nil
})

_, err := tc.ServerConn(0).Exec(
`ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`,
)
require.NoError(t, err)

for _, s := range tc.Servers {
require.NoError(t, s.Stores().VisitStores(func(s *kvserver.Store) error {
require.NoError(t, s.ForceReplicationScanAndProcess())
return nil
}))
}

// Now wait until the replicas have been down-replicated back to the
// desired number.
testutils.SucceedsSoon(t, func() error {
descriptor, err := tc.LookupRange(testKey)
if err != nil {
t.Fatal(err)
}
if len(descriptor.InternalReplicas) != replicaCount {
return errors.Errorf("replica count, want %d, current %d", replicaCount, len(desc.InternalReplicas))
desc := tc.LookupRangeOrFatal(t, testKey)
if got := len(desc.Replicas().Descriptors()); got != 1 {
return errors.Errorf("expected 1 replica, found %d", got)
}
return nil
})

desc := tc.LookupRangeOrFatal(t, testKey)
infos, err := filterRangeLog(
tc.Conns[0], desc.RangeID, kvserverpb.RangeLogEventType_remove_voter, kvserverpb.ReasonRangeOverReplicated,
)
if err != nil {
t.Fatal(err)
}
if len(infos) < 1 {
t.Fatalf("found no downreplication due to over-replication in the range logs")
}
require.NoError(t, err)
require.Truef(t, len(infos) >= 1, "found no down replication due to over-replication in the range logs")
}
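For reference, `filterRangeLog` (defined elsewhere in this file) is what backs the assertion above; it boils down to scanning the `system.rangelog` table for events recorded against the range. A hedged sketch of an equivalent inline check follows; the quoted column names and the shape of the `info` payload are assumptions about the rangelog schema, not something this diff shows:

```go
// Count down-replication events logged for the scratch range. Column
// names ("rangeID", "eventType") and matching the reason inside the
// info JSON via LIKE are assumptions about system.rangelog.
var n int
err := tc.Conns[0].QueryRow(
	`SELECT count(*) FROM system.rangelog
	  WHERE "rangeID" = $1 AND "eventType" = $2 AND info LIKE $3`,
	desc.RangeID,
	kvserverpb.RangeLogEventType_remove_voter.String(),
	"%"+string(kvserverpb.ReasonRangeOverReplicated)+"%",
).Scan(&n)
require.NoError(t, err)
require.GreaterOrEqual(t, n, 1)
```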

func scanAndGetNumNonVoters(
10 changes: 7 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -1950,9 +1950,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.onSpanConfigUpdate(ctx, update)
})

// When toggling between the system config span and the span configs
// infrastructure, we want to re-apply configs on all replicas from
// whatever the new source is.
// When toggling between the system config span and the span
// configs infrastructure, we want to re-apply configs on all
// replicas from whatever the new source is.
spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV)
if enabled {
@@ -2281,6 +2281,10 @@ func (s *Store) removeReplicaWithRangefeed(rangeID roachpb.RangeID) {
// systemGossipUpdate is a callback for gossip updates to
// the system config which affect range split boundaries.
func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
if !s.cfg.SpanConfigsDisabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
return // nothing to do
}

ctx := s.AnnotateCtx(context.Background())
s.computeInitialMetrics.Do(func() {
// Metrics depend in part on the system config. Compute them as soon as we
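Net effect: the cluster setting becomes the single switch. With span configs enabled, the gossip callback above returns early instead of clobbering span-config-derived replica state, and flipping the setting triggers the re-application described in the `SetOnChange` comment. A small sketch of toggling it from a test; the setting name is an assumption inferred from `spanconfigstore.EnabledSetting`:

```go
// Sketch (setting name assumed for spanconfigstore.EnabledSetting):
// each flip re-applies configs on all replicas from whichever source
// is now authoritative.
tdb := sqlutils.MakeSQLRunner(tc.Conns[0])
tdb.Exec(t, `SET CLUSTER SETTING spanconfig.store.enabled = false`) // gossiped system config span
tdb.Exec(t, `SET CLUSTER SETTING spanconfig.store.enabled = true`)  // span configs infrastructure
```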
66 changes: 1 addition & 65 deletions pkg/kv/kvserver/store_test.go
@@ -57,7 +57,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
raft "go.etcd.io/etcd/raft/v3"
@@ -299,7 +298,7 @@ func TestIterateIDPrefixKeys(t *testing.T) {
stopper.AddCloser(eng)

seed := randutil.NewPseudoSeed()
//const seed = -1666367124291055473
// const seed = -1666367124291055473
t.Logf("seed is %d", seed)
rng := rand.New(rand.NewSource(seed))

@@ -1277,69 +1276,6 @@ func TestStoreReplicasByKey(t *testing.T) {
}
}

// TestStoreSetRangesMaxBytes creates a set of ranges via splitting
// and then sets the config zone to a custom max bytes value to
// verify the ranges' max bytes are updated appropriately.
func TestStoreSetRangesMaxBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cfg := TestStoreConfig(nil)
cfg.TestingKnobs.DisableMergeQueue = true
store := createTestStoreWithConfig(ctx, t, stopper,
testStoreOpts{
// This test was written before test stores could start with more than one
// range and was not adapted.
createSystemRanges: false,
},
&cfg)

baseID := keys.TestingUserDescID(0)
testData := []struct {
repl *Replica
expMaxBytes int64
}{
{store.LookupReplica(roachpb.RKeyMin),
store.cfg.DefaultSpanConfig.RangeMaxBytes},
{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID)), t),
1 << 20},
{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+1)), t),
store.cfg.DefaultSpanConfig.RangeMaxBytes},
{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+2)), t),
2 << 20},
}

// Set zone configs.
zoneA := zonepb.DefaultZoneConfig()
zoneA.RangeMaxBytes = proto.Int64(1 << 20)
config.TestingSetZoneConfig(config.SystemTenantObjectID(baseID), zoneA)

zoneB := zonepb.DefaultZoneConfig()
zoneB.RangeMaxBytes = proto.Int64(2 << 20)
config.TestingSetZoneConfig(config.SystemTenantObjectID(baseID+2), zoneB)

// Despite faking the zone configs, we still need to have a system config
// entry so that the store picks up the new zone configs. This new system
// config needs to be non-empty so that it differs from the initial value
// which triggers the system config callback to be run.
sysCfg := &config.SystemConfigEntries{}
sysCfg.Values = []roachpb.KeyValue{{Key: roachpb.Key("a")}}
if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, sysCfg, 0); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
for _, test := range testData {
if mb := test.repl.GetMaxBytes(); mb != test.expMaxBytes {
return errors.Errorf("range max bytes values did not change to %d; got %d", test.expMaxBytes, mb)
}
}
return nil
})
}

// TestStoreResolveWriteIntent adds a write intent and then verifies
// that a put returns success and aborts intent's txn in the event the
// pushee has lower priority. Otherwise, verifies that the put blocks
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -374,6 +374,9 @@ type StoreTestingKnobs struct {
// UseSystemConfigSpanForQueues uses the system config span infrastructure
// for internal queues (as opposed to the span configs infrastructure). This
// is used only for (old) tests written with the system config span in mind.
//
// TODO(irfansharif): Get rid of this knob, maybe by first moving
// DisableSpanConfigs into a testing knob instead of a server arg.
UseSystemConfigSpanForQueues bool
}

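Until that TODO is addressed, a legacy test that only needs the internal queues (not replica config application) to consult the system config span can still set the knob; a minimal sketch, mirroring the blocks this PR removed from `client_split_test.go`:

```go
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
	Knobs: base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			// Queues read from the gossiped system config span; replica
			// config application is unaffected by this knob.
			UseSystemConfigSpanForQueues: true,
		},
	},
})
defer serv.Stopper().Stop(ctx)
```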