kvserver: Replace createTestStore with TestServer #61644

Merged · 1 commit · Mar 9, 2021
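This change replaces the hand-rolled createTestStore helper with a real TestServer in several kvserver tests. Every rewritten call site follows the same setup pattern, collected here as a sketch (per-test knobs vary; the usual imports base, server, serverutils, and require are assumed):

    ctx := context.Background()
    serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
    s := serv.(*server.TestServer)
    defer s.Stopper().Stop(ctx)
    store, err := s.Stores().GetStore(s.GetFirstStoreID())
    require.NoError(t, err)

The server owns the stopper, engine, gossip, and DistSender wiring that the deleted helper assembled by hand, so each test now states only the knobs it actually needs.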
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/client_split_test.go
@@ -57,7 +57,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -3250,9 +3249,19 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) {
func TestStoreSplitDisappearingReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
store, _ := createTestStore(t, stopper)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
go kvserver.WatchForDisappearingReplicas(t, store)
for i := 0; i < 100; i++ {
key := roachpb.Key(fmt.Sprintf("a%d", i))
209 changes: 0 additions & 209 deletions pkg/kv/kvserver/client_test.go
@@ -21,229 +21,20 @@ package kvserver_test
import (
"context"
"fmt"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)

// createTestStore creates a test store using an in-memory
// engine.
func createTestStore(t testing.TB, stopper *stop.Stopper) (*kvserver.Store, *hlc.ManualClock) {
manual := hlc.NewManualClock(123)
cfg := kvserver.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
store := createTestStoreWithOpts(t, testStoreOpts{cfg: &cfg}, stopper)
return store, manual
}

// DEPRECATED. Use createTestStoreWithOpts().
func createTestStoreWithConfig(
t testing.TB, stopper *stop.Stopper, storeCfg kvserver.StoreConfig,
) *kvserver.Store {
store := createTestStoreWithOpts(t,
testStoreOpts{
cfg: &storeCfg,
},
stopper,
)
return store
}

// testStoreOpts affords control over aspects of store creation.
type testStoreOpts struct {
// dontBootstrap, if set, means that the engine will not be bootstrapped.
dontBootstrap bool
// dontCreateSystemRanges is relevant only if dontBootstrap is not set.
// If set, the store will have a single range. If not set, the store will have
// all the system ranges that are generally created for a cluster at bootstrap.
dontCreateSystemRanges bool

cfg *kvserver.StoreConfig
eng storage.Engine
}

// createTestStoreWithOpts creates a test store using the given engine and clock.
// TestStoreConfig() can be used for creating a config suitable for most
// tests.
func createTestStoreWithOpts(
t testing.TB, opts testStoreOpts, stopper *stop.Stopper,
) *kvserver.Store {
var storeCfg kvserver.StoreConfig
if opts.cfg == nil {
manual := hlc.NewManualClock(123)
storeCfg = kvserver.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
} else {
storeCfg = *opts.cfg
}
eng := opts.eng
if eng == nil {
eng = storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
}

tracer := storeCfg.Settings.Tracer
ac := log.AmbientContext{Tracer: tracer}
storeCfg.AmbientCtx = ac

rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: ac,
Config: &base.Config{Insecure: true},
Clock: storeCfg.Clock,
Stopper: stopper,
Settings: storeCfg.Settings,
})
// Ensure that tests using this test context and restart/shut down
// their servers do not inadvertently start talking to servers from
// unrelated concurrent tests.
rpcContext.ClusterID.Set(context.Background(), uuid.MakeV4())
nodeDesc := &roachpb.NodeDescriptor{
NodeID: 1,
Address: util.MakeUnresolvedAddr("tcp", "invalid.invalid:26257"),
}
server := rpc.NewServer(rpcContext) // never started
storeCfg.Gossip = gossip.NewTest(
nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(), storeCfg.DefaultZoneConfig,
)
storeCfg.ScanMaxIdleTime = 1 * time.Second
stores := kvserver.NewStores(ac, storeCfg.Clock)

if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
t.Fatal(err)
}

retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = stopper.ShouldQuiesce()
distSender := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
AmbientCtx: ac,
Settings: storeCfg.Settings,
Clock: storeCfg.Clock,
NodeDescs: storeCfg.Gossip,
RPCContext: rpcContext,
RPCRetryOptions: &retryOpts,
FirstRangeProvider: storeCfg.Gossip,
TestingKnobs: kvcoord.ClientTestingKnobs{
TransportFactory: kvcoord.SenderTransportFactory(tracer, stores),
},
})

tcsFactory := kvcoord.NewTxnCoordSenderFactory(
kvcoord.TxnCoordSenderFactoryConfig{
AmbientCtx: ac,
Settings: storeCfg.Settings,
Clock: storeCfg.Clock,
Stopper: stopper,
},
distSender,
)
storeCfg.DB = kv.NewDB(ac, tcsFactory, storeCfg.Clock, stopper)
storeCfg.StorePool = kvserver.NewTestStorePool(storeCfg)
storeCfg.Transport = kvserver.NewDummyRaftTransport(storeCfg.Settings)
// TODO(bdarnell): arrange to have the transport closed.
ctx := context.Background()
if !opts.dontBootstrap {
require.NoError(t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
if err := kvserver.InitEngine(
ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
); err != nil {
t.Fatal(err)
}
}
store := kvserver.NewStore(ctx, storeCfg, eng, nodeDesc)
if !opts.dontBootstrap {
var kvs []roachpb.KeyValue
var splits []roachpb.RKey
kvs, tableSplits := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, storeCfg.DefaultZoneConfig, storeCfg.DefaultSystemZoneConfig,
).GetInitialValues()
if !opts.dontCreateSystemRanges {
splits = config.StaticSplits()
splits = append(splits, tableSplits...)
sort.Slice(splits, func(i, j int) bool {
return splits[i].Less(splits[j])
})
}
err := kvserver.WriteInitialClusterData(
ctx,
eng,
kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, storeCfg.Clock.PhysicalNow(),
storeCfg.TestingKnobs,
)
if err != nil {
t.Fatal(err)
}
}
if err := store.Start(ctx, stopper); err != nil {
t.Fatal(err)
}
stores.AddStore(store)

// Connect to gossip and gossip the store's capacity.
<-store.Gossip().Connected
if err := store.GossipStore(ctx, false /* useCached */); err != nil {
t.Fatal(err)
}
// Wait for the store's single range to have quorum before proceeding.
repl := store.LookupReplica(roachpb.RKeyMin)

// Send a request through the range to make sure everything is warmed up
// and works.
// NB: it's unclear if this code is necessary.
var ba roachpb.BatchRequest
get := roachpb.GetRequest{}
get.Key = keys.LocalMax
ba.Header.Replica = repl.Desc().Replicas().VoterDescriptors()[0]
ba.Header.RangeID = repl.RangeID
ba.Add(&get)
_, pErr := store.Send(ctx, ba)
require.NoError(t, pErr.GoError())

// Wait for the system config to be available in gossip. All sorts of things
// might not work properly while the system config is not available.
testutils.SucceedsSoon(t, func() error {
if cfg := store.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip yet")
}
return nil
})

// Make all the initial ranges part of replication queue purgatory. This is
// similar to what a real cluster does after bootstrap - we want the initial
// ranges to up-replicate as soon as other nodes join.
if err := store.ForceReplicationScanAndProcess(); err != nil {
t.Fatal(err)
}

return store
}

// getArgs returns a GetRequest and GetResponse pair addressed to
// the default replica for the specified key.
func getArgs(key roachpb.Key) *roachpb.GetRequest {
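Call sites that used testStoreOpts for a custom config or engine express the same intent through TestServerArgs instead. A sketch of the mapping, not code from this diff (StoreSpecs and the in-memory spec are assumptions about the serverutils API):

    serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
        // Assumption: an in-memory store spec, matching the old helper's default engine.
        StoreSpecs: []base.StoreSpec{{InMemory: true}},
        Knobs: base.TestingKnobs{
            Store: &kvserver.StoreTestingKnobs{
                // Illustrative knob; each test sets only what it needs.
                DisableMergeQueue: true,
            },
        },
    })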
19 changes: 13 additions & 6 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -23,16 +23,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
@@ -340,17 +341,23 @@ func TestReplicaRangefeedExpiringLeaseError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
store, _ := createTestStore(t, stopper)
ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)

_, rdesc, err := s.ScratchRangeWithExpirationLeaseEx()
require.NoError(t, err)

// Establish a rangefeed on the replica we plan to remove.
stream := newTestStream()
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: store.LookupReplica(roachpb.RKey("a")).RangeID,
RangeID: store.LookupReplica(rdesc.StartKey).RangeID,
},
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")},
Span: roachpb.Span{Key: rdesc.StartKey.AsRawKey(), EndKey: rdesc.EndKey.AsRawKey()},
}

// Cancel the stream's context so that RangeFeed would return
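Note the test no longer hardcodes the ["a", "z") keyspan; it creates a scratch range with an expiration-based lease and derives the rangefeed bounds from that range's descriptor, so the request is guaranteed to stay within a single range. The derivation, using only names from this diff:

    span := roachpb.Span{
        Key:    rdesc.StartKey.AsRawKey(),
        EndKey: rdesc.EndKey.AsRawKey(),
    }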
30 changes: 19 additions & 11 deletions pkg/kv/kvserver/replicate_test.go
@@ -14,29 +14,37 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestEagerReplication(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
storeCfg := kvserver.TestStoreConfig(nil /* clock */)
// Disable the replica scanner so that we rely on the eager replication code
// path that occurs after splits.
storeCfg.TestingKnobs.DisableScanner = true

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
store := createTestStoreWithConfig(t, stopper, storeCfg)
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable the replica scanner so that we rely on the eager replication code
// path that occurs after splits.
DisableScanner: true,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// After bootstrap, all of the system ranges should be present in replicate
// queue purgatory (because we only have a single store in the test and thus
@@ -55,10 +63,10 @@ func TestEagerReplication(t *testing.T)

// The addition of replicas to the replicateQueue after a split
// occurs happens after the update of the descriptors in meta2
// leaving a tiny window of time in which the newly split replica
// leaving a tiny window of time in which the newly split replicas
// will not have been added to purgatory. Thus we loop.
testutils.SucceedsSoon(t, func() error {
expected := purgatoryStartCount + 1
expected := purgatoryStartCount + 2
if n := store.ReplicateQueuePurgatoryLength(); expected != n {
return errors.Errorf("expected %d replicas in purgatory, but found %d", expected, n)
}
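The collapsed middle of TestEagerReplication performs a split, and the expectation rises by two rather than one: on this single-store cluster both halves of the split apparently land in replicate queue purgatory, which matches the comment's change from "replica" to "replicas". A sketch of how such a split is typically triggered; AdminSplit and its expiration argument are assumptions about the kv API, not lines from this diff:

    // Hypothetical split; the real test's split key is in the collapsed region.
    splitKey := roachpb.Key("split")
    require.NoError(t, store.DB().AdminSplit(ctx, splitKey, hlc.MaxTimestamp))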
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -938,10 +938,13 @@ func NewStore(
s.scanner.AddQueues(
s.gcQueue, s.mergeQueue, s.splitQueue, s.replicateQueue, s.replicaGCQueue,
s.raftLogQueue, s.raftSnapshotQueue, s.consistencyQueue)

if s.cfg.TimeSeriesDataStore != nil {
tsDS := s.cfg.TimeSeriesDataStore
if s.cfg.TestingKnobs.TimeSeriesDataStore != nil {
tsDS = s.cfg.TestingKnobs.TimeSeriesDataStore
}
if tsDS != nil {
s.tsMaintenanceQueue = newTimeSeriesMaintenanceQueue(
s, s.db, s.cfg.Gossip, s.cfg.TimeSeriesDataStore,
s, s.db, s.cfg.Gossip, tsDS,
)
s.scanner.AddQueues(s.tsMaintenanceQueue)
}
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -260,6 +260,9 @@ type StoreTestingKnobs struct {
// gossiped store capacity values which need be exceeded before the store will
// gossip immediately without waiting for the periodic gossip interval.
GossipWhenCapacityDeltaExceedsFraction float64
// TimeSeriesDataStore is an interface used by the store's time series
// maintenance queue to dispatch individual maintenance tasks.
TimeSeriesDataStore TimeSeriesDataStore
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
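Taken together with the store.go hunk above, the new knob lets a test route time series maintenance to a stub without hand-building a StoreConfig, since NewStore prefers the knob over the config field. A sketch, assuming fakeTSDS is some test double implementing kvserver.TimeSeriesDataStore:

    var fakeTSDS kvserver.TimeSeriesDataStore // assumption: a test implementation
    serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
        Knobs: base.TestingKnobs{
            Store: &kvserver.StoreTestingKnobs{
                TimeSeriesDataStore: fakeTSDS,
            },
        },
    })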