Skip to content

Commit

Permalink
kvserver: trim down TestStoreInitAndBootstrap
Browse files Browse the repository at this point in the history
This test was duplicating parts of `createTestStoreWithoutStart` and it
was the one bogus hold-out user of `testSenderFactory`, which we now
get to remove.

Release note: None
  • Loading branch information
tbg committed Nov 4, 2021
1 parent 75029de commit 139cf34
Showing 1 changed file with 19 additions and 153 deletions.
172 changes: 19 additions & 153 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,94 +103,6 @@ func (s *Store) TestSender() kv.Sender {
})
}

// testSenderFactory is an implementation of the
// client.TxnSenderFactory interface.
type testSenderFactory struct {
store *Store
nonTxnSender *testSender
}

func (f *testSenderFactory) RootTransactionalSender(
txn *roachpb.Transaction, _ roachpb.UserPriority,
) kv.TxnSender {
return kv.NewMockTransactionalSender(
func(
ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f.store.Send(ctx, ba)
},
txn)
}

func (f *testSenderFactory) LeafTransactionalSender(tis *roachpb.LeafTxnInputState) kv.TxnSender {
return kv.NewMockTransactionalSender(
func(
ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f.store.Send(ctx, ba)
},
&tis.Txn)
}

func (f *testSenderFactory) NonTransactionalSender() kv.Sender {
if f.nonTxnSender != nil {
return f.nonTxnSender
}
f.nonTxnSender = &testSender{store: f.store}
return f.nonTxnSender
}

func (f *testSenderFactory) setStore(s *Store) {
f.store = s
if f.nonTxnSender != nil {
// monkey-patch an already created Sender, helping with test bootstrapping.
f.nonTxnSender.store = s
}
}

// testSender is an implementation of the client.TxnSender interface
// which passes all requests through to a single store.
type testSender struct {
store *Store
}

// Send forwards the call to the single store. This is a poor man's
// version of kv.TxnCoordSender, but it serves the purposes of
// supporting tests in this package. Transactions are not supported.
// Since kv/ depends on storage/, we can't get access to a
// TxnCoordSender from here.
// TODO(tschottdorf): {kv->storage}.LocalSender
func (db *testSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if et, ok := ba.GetArg(roachpb.EndTxn); ok {
return nil, roachpb.NewErrorf("%s method not supported", et.Method())
}
// Lookup range and direct request.
rs, err := keys.Range(ba.Requests)
if err != nil {
return nil, roachpb.NewError(err)
}
repl := db.store.LookupReplica(rs.Key)
if repl == nil || !repl.Desc().ContainsKeyRange(rs.Key, rs.EndKey) {
panic(fmt.Sprintf("didn't find right replica for key: %s", rs.Key))
}
ba.RangeID = repl.RangeID
repDesc, err := repl.GetReplicaDescriptor()
if err != nil {
return nil, roachpb.NewError(err)
}
ba.Replica = repDesc
br, pErr := db.store.Send(ctx, ba)
if br != nil && br.Error != nil {
panic(roachpb.ErrorUnexpectedlySet(db.store, br))
}
if pErr != nil {
return nil, pErr
}
return br, nil
}

// testStoreOpts affords control over aspects of store creation.
type testStoreOpts struct {
// If createSystemRanges is not set, the store will have a single range. If
Expand Down Expand Up @@ -498,77 +410,31 @@ func TestStoreInitAndBootstrap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We need a fixed clock to avoid LastUpdateNanos drifting on us.
cfg := TestStoreConfig(hlc.NewClock(func() int64 { return 123 }, time.Nanosecond))
stopper := stop.NewStopper()
ctx := context.Background()
stopper := stop.NewStopper()
cfg := TestStoreConfig(nil)
store := createTestStoreWithConfig(t, stopper, testStoreOpts{}, &cfg)
defer stopper.Stop(ctx)
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer)
factory := &testSenderFactory{}
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper)
{
store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
// Can't start as haven't bootstrapped.
if err := store.Start(ctx, stopper); err == nil {
t.Error("expected failure starting un-bootstrapped store")
}

require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
// Bootstrap with a fake ident.
if err := InitEngine(ctx, eng, testIdent); err != nil {
t.Fatalf("error bootstrapping store: %+v", err)
}

// Verify we can read the store ident after a flush.
if err := eng.Flush(); err != nil {
t.Fatal(err)
}
if _, err := ReadStoreIdent(ctx, eng); err != nil {
t.Fatalf("unable to read store ident: %+v", err)
}

// Bootstrap the system ranges.
var splits []roachpb.RKey
kvs, tableSplits := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
).GetInitialValues()
splits = config.StaticSplits()
splits = append(splits, tableSplits...)
sort.Slice(splits, func(i, j int) bool {
return splits[i].Less(splits[j])
})

if err := WriteInitialClusterData(
ctx, eng, kvs /* initialValues */, clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, cfg.Clock.PhysicalNow(), cfg.TestingKnobs,
); err != nil {
t.Errorf("failure to create first range: %+v", err)
}
if _, err := ReadStoreIdent(ctx, store.Engine()); err != nil {
t.Fatalf("unable to read store ident: %+v", err)
}

// Now, attempt to initialize a store with a now-bootstrapped range.
store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
if err := store.Start(ctx, stopper); err != nil {
t.Fatalf("failure initializing bootstrapped store: %+v", err)
}

for i := 1; i <= store.ReplicaCount(); i++ {
r, err := store.GetReplica(roachpb.RangeID(i))
if err != nil {
t.Fatalf("failure fetching range %d: %+v", i, err)
}
rs := r.GetMVCCStats()

store.VisitReplicas(func(repl *Replica) (more bool) {
// Stats should agree with recomputation. Hold raftMu to avoid
// background activity from creating discrepancies between engine
// and in-mem stats.
repl.raftMu.Lock()
defer repl.raftMu.Unlock()
memMS := repl.GetMVCCStats()
// Stats should agree with a recomputation.
now := r.store.Clock().Now()
if ms, err := rditer.ComputeStatsForRange(r.Desc(), eng, now.WallTime); err != nil {
t.Errorf("failure computing range's stats: %+v", err)
} else if ms != rs {
t.Errorf("expected range's stats to agree with recomputation: %s", pretty.Diff(ms, rs))
}
}
now := store.Clock().Now()
diskMS, err := rditer.ComputeStatsForRange(repl.Desc(), store.Engine(), now.WallTime)
require.NoError(t, err)
memMS.AgeTo(diskMS.LastUpdateNanos)
require.Equal(t, memMS, diskMS)
return true // more
})
}

// TestInitializeEngineErrors verifies bootstrap failure if engine
Expand Down

0 comments on commit 139cf34

Please sign in to comment.