diff --git a/pkg/kv/kvserver/gc_queue_test.go b/pkg/kv/kvserver/gc_queue_test.go index 3f67252a0e14..68f41b2514c8 100644 --- a/pkg/kv/kvserver/gc_queue_test.go +++ b/pkg/kv/kvserver/gc_queue_test.go @@ -479,17 +479,23 @@ func TestGCQueueProcess(t *testing.T) { ts3 := makeTS(now-intentAgeThreshold.Nanoseconds(), 0) // 2h old ts4 := makeTS(now-(intentAgeThreshold.Nanoseconds()-1), 0) // 2h-1ns old ts5 := makeTS(now-1e9, 0) // 1s old - key1 := roachpb.Key("a") - key2 := roachpb.Key("b") - key3 := roachpb.Key("c") - key4 := roachpb.Key("d") - key5 := roachpb.Key("e") - key6 := roachpb.Key("f") - key7 := roachpb.Key("g") - key8 := roachpb.Key("h") - key9 := roachpb.Key("i") - key10 := roachpb.Key("j") - key11 := roachpb.Key("k") + mkKey := func(suff string) roachpb.Key { + var k roachpb.Key + k = append(k, keys.ScratchRangeMin...) + k = append(k, suff...) + return k + } + key1 := mkKey("a") + key2 := mkKey("b") + key3 := mkKey("c") + key4 := mkKey("d") + key5 := mkKey("e") + key6 := mkKey("f") + key7 := mkKey("g") + key8 := mkKey("h") + key9 := mkKey("i") + key10 := mkKey("j") + key11 := mkKey("k") data := []struct { key roachpb.Key @@ -582,16 +588,16 @@ func TestGCQueueProcess(t *testing.T) { } // The total size of the GC'able versions of the keys and values in Info. - // Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes. + // Key size: len(scratch+"a") + MVCCVersionTimestampSize (13 bytes) = 15 bytes. // Value size: len("value") + headerSize (5 bytes) = 10 bytes. - // key1 at ts1 (14 bytes) => "value" (10 bytes) - // key2 at ts1 (14 bytes) => "value" (10 bytes) - // key3 at ts1 (14 bytes) => "value" (10 bytes) - // key4 at ts1 (14 bytes) => "value" (10 bytes) - // key5 at ts1 (14 bytes) => "value" (10 bytes) - // key5 at ts2 (14 bytes) => delete (0 bytes) - // key10 at ts1 (14 bytes) => delete (0 bytes) - var expectedVersionsKeyBytes int64 = 7 * 14 + // key1 at ts1 (15 bytes) => "value" (10 bytes) + // key2 at ts1 (15 bytes) => "value" (10 bytes) + // key3 at ts1 (15 bytes) => "value" (10 bytes) + // key4 at ts1 (15 bytes) => "value" (10 bytes) + // key5 at ts1 (15 bytes) => "value" (10 bytes) + // key5 at ts2 (15 bytes) => delete (0 bytes) + // key10 at ts1 (15 bytes) => delete (0 bytes) + var expectedVersionsKeyBytes int64 = 7 * 15 var expectedVersionsValBytes int64 = 5 * 10 // Call Run with dummy functions to get current Info. @@ -670,7 +676,7 @@ func TestGCQueueProcess(t *testing.T) { return err } for i, kv := range kvs { - log.VEventf(ctx, 1, "%d: %s", i, kv.Key) + t.Logf("%d: %s", i, kv.Key) } if len(kvs) != len(expKVs) { return fmt.Errorf("expected length %d; got %d", len(expKVs), len(kvs)) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 6646c56618f5..6207c11d01a3 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -28,9 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/exit" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -48,8 +46,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -59,7 +55,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -180,93 +175,38 @@ func (tc *testContext) StartWithStoreConfigAndVersion( ) { tc.TB = t ctx := context.Background() - // Setup fake zone config handler. - config.TestingSetupZoneConfigHook(stopper) - rpcContext := rpc.NewContext(rpc.ContextOptions{ - TenantID: roachpb.SystemTenantID, - AmbientCtx: cfg.AmbientCtx, - Config: &base.Config{Insecure: true}, - Clock: cfg.Clock, - Stopper: stopper, - Settings: cfg.Settings, - }) - grpcServer := rpc.NewServer(rpcContext) // never started require.Nil(t, tc.gossip) - tc.gossip = gossip.NewTest(1, rpcContext, grpcServer, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) require.Nil(t, tc.transport) - dialer := nodedialer.New(rpcContext, gossip.AddressResolver(tc.gossip)) - tc.transport = NewRaftTransport(cfg.AmbientCtx, cfg.Settings, dialer, grpcServer, stopper) - require.Nil(t, tc.engine) - disableSeparatedIntents := - !cfg.Settings.Version.ActiveVersionOrEmpty(context.Background()).IsActive( - clusterversion.PostSeparatedIntentsMigration) - log.Infof(context.Background(), "engine creation is randomly setting disableSeparatedIntents: %t", - disableSeparatedIntents) - - var err error - tc.engine, err = storage.Open(context.Background(), - storage.InMemory(), - storage.Attributes(roachpb.Attributes{Attrs: []string{"dc1", "mem"}}), - storage.MaxSize(1<<20), - storage.SetSeparatedIntents(disableSeparatedIntents), - storage.Settings(cfg.Settings)) - require.NoError(t, err) - stopper.AddCloser(tc.engine) - require.Nil(t, tc.store) - cv := clusterversion.ClusterVersion{Version: bootstrapVersion} - cfg.Gossip = tc.gossip - cfg.Transport = tc.transport - cfg.StorePool = NewTestStorePool(cfg) - // Create a test sender without setting a store. This is to deal with the - // circular dependency between the test sender and the store. The actual - // store will be passed to the sender after it is created and bootstrapped. - factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) - - require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv)) - if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{ - ClusterID: uuid.MakeV4(), - NodeID: 1, - StoreID: 1, - }); err != nil { - t.Fatal(err) - } - if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil { - t.Fatal(err) - } - tc.store = NewStore(ctx, cfg, tc.engine, &roachpb.NodeDescriptor{NodeID: 1}) - // Now that we have our actual store, monkey patch the factory used in cfg.DB. - factory.setStore(tc.store) - // We created the store without a real KV client, so it can't perform splits - // or merges. - tc.store.splitQueue.SetDisabled(true) - tc.store.mergeQueue.SetDisabled(true) - require.Nil(t, tc.repl) - if err := WriteInitialClusterData( - ctx, tc.store.Engine(), - nil, /* initialValues */ - bootstrapVersion, - 1 /* numStores */, nil /* splits */, cfg.Clock.PhysicalNow(), - cfg.TestingKnobs, - ); err != nil { - t.Fatal(err) - } - if err := tc.store.Start(ctx, stopper); err != nil { - t.Fatal(err) - } - tc.store.WaitForInit() - tc.repl, err = tc.store.GetReplica(1) - if err != nil { - t.Fatal(err) - } - tc.rangeID = tc.repl.RangeID - if err := tc.initConfigs(t); err != nil { + // NB: this also sets up fake zone config handlers via TestingSetupZoneConfigHook. + // + // TODO(tbg): the above is not good, figure out which tests need this and make them + // call it directly. + // + // NB: split queue, merge queue, and scanner are also disabled. + store := createTestStoreWithoutStart( + t, stopper, testStoreOpts{ + createSystemRanges: false, + bootstrapVersion: bootstrapVersion, + }, &cfg, + ) + if err := store.Start(ctx, stopper); err != nil { t.Fatal(err) } + store.WaitForInit() + repl, err := store.GetReplica(1) + require.NoError(t, err) + tc.repl = repl + tc.rangeID = repl.RangeID + tc.gossip = store.cfg.Gossip + tc.transport = store.cfg.Transport + tc.engine = store.engine + tc.store = store + // TODO(tbg): see if this is needed. Would like to remove it. + require.NoError(t, tc.initConfigs(t)) } func (tc *testContext) Sender() kv.Sender { @@ -1223,6 +1163,12 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { tc.manualClock.Increment(11 + int64(tc.Clock().MaxOffset())) // advance time now = tc.Clock().NowAsClockTimestamp() + ch := tc.gossip.RegisterSystemConfigChannel() + select { + case <-ch: + default: + } + // Give lease to this range. if err := sendLeaseRequest(tc.repl, &roachpb.Lease{ Start: now.ToTimestamp().Add(11, 0).UnsafeToClockTimestamp(), @@ -1237,16 +1183,19 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { } testutils.SucceedsSoon(t, func() error { - cfg := tc.gossip.GetSystemConfig() - if cfg == nil { - return errors.Errorf("expected system config to be set") + sysCfg := tc.gossip.GetSystemConfig() + if sysCfg == nil { + return errors.Errorf("no system config yet") } - numValues := len(cfg.Values) - if numValues != 1 { - return errors.Errorf("num config values != 1; got %d", numValues) + var found bool + for _, cur := range sysCfg.Values { + if key.Equal(cur.Key) { + found = true + break + } } - if k := cfg.Values[numValues-1].Key; !k.Equal(key) { - return errors.Errorf("invalid key for config value (%q != %q)", k, key) + if !found { + return errors.Errorf("key %s not found in SystemConfig", key) } return nil }) @@ -1548,112 +1497,6 @@ func TestReplicaGossipAllConfigs(t *testing.T) { } } -// TestReplicaNoGossipConfig verifies that certain commands (e.g., -// reads, writes in uncommitted transactions) do not trigger gossip. -func TestReplicaNoGossipConfig(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - tc.Start(t, stopper) - - // Write some arbitrary data in the system span (up to, but not including MaxReservedID+1) - key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID) - - txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock()) - h := roachpb.Header{Txn: txn} - req1 := putArgs(key, []byte("foo")) - req2, _ := endTxnArgs(txn, true /* commit */) - req2.LockSpans = []roachpb.Span{{Key: key}} - req3 := getArgs(key) - - testCases := []struct { - req roachpb.Request - h roachpb.Header - }{ - {&req1, h}, - {&req2, h}, - {&req3, roachpb.Header{}}, - } - - for i, test := range testCases { - assignSeqNumsForReqs(txn, test.req) - if _, pErr := kv.SendWrappedWith(context.Background(), tc.Sender(), test.h, test.req); pErr != nil { - t.Fatal(pErr) - } - - // System config is not gossiped. - cfg := tc.gossip.GetSystemConfig() - if cfg == nil { - t.Fatal("config not set") - } - if len(cfg.Values) != 0 { - t.Errorf("System config was gossiped at #%d", i) - } - } -} - -// TestReplicaNoGossipFromNonLeader verifies that a non-lease holder replica -// does not gossip configurations. -func TestReplicaNoGossipFromNonLeader(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(t, stopper) - - // Write some arbitrary data in the system span (up to, but not including MaxReservedID+1) - key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID) - - txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock()) - req1 := putArgs(key, nil) - - assignSeqNumsForReqs(txn, &req1) - if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{ - Txn: txn, - }, &req1); pErr != nil { - t.Fatal(pErr) - } - - req2, h := endTxnArgs(txn, true /* commit */) - req2.LockSpans = []roachpb.Span{{Key: key}} - assignSeqNumsForReqs(txn, &req2) - if _, pErr := tc.SendWrappedWith(h, &req2); pErr != nil { - t.Fatal(pErr) - } - // Execute a get to resolve the intent. - req3 := getArgs(key) - if _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: txn.WriteTimestamp}, &req3); pErr != nil { - t.Fatal(pErr) - } - - // Increment the clock's timestamp to expire the range lease. - tc.manualClock.Set(leaseExpiry(tc.repl)) - if tc.repl.CurrentLeaseStatus(ctx).State != kvserverpb.LeaseState_EXPIRED { - t.Fatal("range lease should have been expired") - } - - // Make sure the information for db1 is not gossiped. Since obtaining - // a lease updates the gossiped information, we do that. - if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil { - t.Fatal(pErr) - } - // Fetch the raw gossip info. GetSystemConfig is based on callbacks at - // modification time. But we're checking for _not_ gossiped, so there should - // be no callbacks. Easier to check the raw info. - var cfg config.SystemConfigEntries - err := tc.gossip.GetInfoProto(gossip.KeySystemConfig, &cfg) - if err != nil { - t.Fatal(err) - } - if len(cfg.Values) != 0 { - t.Fatalf("non-lease holder gossiped the system config") - } -} - func getArgs(key []byte) roachpb.GetRequest { return roachpb.GetRequest{ RequestHeader: roachpb.RequestHeader{ @@ -7063,10 +6906,21 @@ func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) { return err } - if len(cfg.Values) != 1 || !bytes.Equal(cfg.Values[0].Key, keys.SystemConfigSpan.Key) { - return errors.Errorf("expected only key %s in SystemConfigSpan map: %+v", keys.SystemConfigSpan.Key, cfg) + var found bool + for _, cur := range cfg.Values { + if !cur.Key.Equal(keys.SystemConfigSpan.Key) { + continue + } + if !v.EqualTagAndData(cur.Value) { + continue + } + found = true + break } - return nil + if found { + return nil + } + return errors.New("recent write not found in gossiped SystemConfig") }) } diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 79ef16222069..4970c2ac95b3 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -892,14 +892,15 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { &AllocatorTestingKnobs{AllowLeaseTransfersToReplicasNeedingSnapshots: true}, ) defer stopper.Stop(context.Background()) - gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) storeMap := storeListToMap(storeList) localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g + cfg.StorePool = a.storePool s := createTestStoreWithoutStart(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} rq := newReplicateQueue(s, a) rr := newReplicaRankings() diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 586721e522c8..a5d7e5c2f8cd 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -39,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/storage" @@ -50,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -100,107 +103,52 @@ func (s *Store) TestSender() kv.Sender { }) } -// testSenderFactory is an implementation of the -// client.TxnSenderFactory interface. -type testSenderFactory struct { - store *Store - nonTxnSender *testSender -} - -func (f *testSenderFactory) RootTransactionalSender( - txn *roachpb.Transaction, _ roachpb.UserPriority, -) kv.TxnSender { - return kv.NewMockTransactionalSender( - func( - ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest, - ) (*roachpb.BatchResponse, *roachpb.Error) { - return f.store.Send(ctx, ba) - }, - txn) +// testStoreOpts affords control over aspects of store creation. +type testStoreOpts struct { + // If createSystemRanges is not set, the store will have a single range. If + // set, the store will have all the system ranges that are generally created + // for a cluster at boostrap. + createSystemRanges bool + bootstrapVersion roachpb.Version // defaults to TestingClusterVersion } -func (f *testSenderFactory) LeafTransactionalSender(tis *roachpb.LeafTxnInputState) kv.TxnSender { - return kv.NewMockTransactionalSender( - func( - ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest, - ) (*roachpb.BatchResponse, *roachpb.Error) { - return f.store.Send(ctx, ba) - }, - &tis.Txn) +func (opts *testStoreOpts) splits() (_kvs []roachpb.KeyValue, _splits []roachpb.RKey) { + kvs, splits := bootstrap.MakeMetadataSchema( + keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), + ).GetInitialValues() + if !opts.createSystemRanges { + return kvs, nil + } + splits = append(config.StaticSplits(), splits...) + sort.Slice(splits, func(i, j int) bool { + return splits[i].Less(splits[j]) + }) + return kvs, splits } -func (f *testSenderFactory) NonTransactionalSender() kv.Sender { - if f.nonTxnSender != nil { - return f.nonTxnSender - } - f.nonTxnSender = &testSender{store: f.store} - return f.nonTxnSender +type mockNodeStore struct { + desc *roachpb.NodeDescriptor } -func (f *testSenderFactory) setStore(s *Store) { - f.store = s - if f.nonTxnSender != nil { - // monkey-patch an already created Sender, helping with test bootstrapping. - f.nonTxnSender.store = s - } +func (m mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + return m.desc, nil } -// testSender is an implementation of the client.TxnSender interface -// which passes all requests through to a single store. -type testSender struct { +type dummyFirstRangeProvider struct { store *Store } -// Send forwards the call to the single store. This is a poor man's -// version of kv.TxnCoordSender, but it serves the purposes of -// supporting tests in this package. Transactions are not supported. -// Since kv/ depends on storage/, we can't get access to a -// TxnCoordSender from here. -// TODO(tschottdorf): {kv->storage}.LocalSender -func (db *testSender) Send( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - if et, ok := ba.GetArg(roachpb.EndTxn); ok { - return nil, roachpb.NewErrorf("%s method not supported", et.Method()) - } - // Lookup range and direct request. - rs, err := keys.Range(ba.Requests) - if err != nil { - return nil, roachpb.NewError(err) - } - repl := db.store.LookupReplica(rs.Key) - if repl == nil || !repl.Desc().ContainsKeyRange(rs.Key, rs.EndKey) { - panic(fmt.Sprintf("didn't find right replica for key: %s", rs.Key)) - } - ba.RangeID = repl.RangeID - repDesc, err := repl.GetReplicaDescriptor() - if err != nil { - return nil, roachpb.NewError(err) - } - ba.Replica = repDesc - br, pErr := db.store.Send(ctx, ba) - if br != nil && br.Error != nil { - panic(roachpb.ErrorUnexpectedlySet(db.store, br)) - } - if pErr != nil { - return nil, pErr - } - return br, nil +func (d dummyFirstRangeProvider) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) { + return d.store.GetReplicaIfExists(1).Desc(), nil } -// testStoreOpts affords control over aspects of store creation. -type testStoreOpts struct { - // If createSystemRanges is not set, the store will have a single range. If - // set, the store will have all the system ranges that are generally created - // for a cluster at boostrap. - createSystemRanges bool -} +func (d dummyFirstRangeProvider) OnFirstRangeChanged(f func(*roachpb.RangeDescriptor)) {} // createTestStoreWithoutStart creates a test store using an in-memory // engine without starting the store. It returns the store, the store // clock's manual unix nanos time and a stopper. The caller is // responsible for stopping the stopper upon completion. -// Some fields of ctx are populated by this function. +// Some fields of cfg are populated by this function. func createTestStoreWithoutStart( t testing.TB, stopper *stop.Stopper, opts testStoreOpts, cfg *StoreConfig, ) *Store { @@ -216,8 +164,17 @@ func createTestStoreWithoutStart( Settings: cfg.Settings, }) server := rpc.NewServer(rpcContext) // never started - cfg.Gossip = gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) - cfg.StorePool = NewTestStorePool(*cfg) + + // Some tests inject their own Gossip and StorePool, via + // createTestAllocatorWithKnobs, at the time of writing + // TestChooseLeaseToTransfer and TestNoLeaseTransferToBehindReplicas. This is + // generally considered bad and should eventually be refactored away. + if cfg.Gossip == nil { + cfg.Gossip = gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) + } + if cfg.StorePool == nil { + cfg.StorePool = NewTestStorePool(*cfg) + } // Many tests using this test harness (as opposed to higher-level // ones like multiTestContext or TestServer) want to micro-manage // replicas and the background queues just get in the way. The @@ -231,32 +188,57 @@ func createTestStoreWithoutStart( cfg.TestingKnobs.DisableMergeQueue = true eng := storage.NewDefaultInMemForTesting() stopper.AddCloser(eng) + require.Nil(t, cfg.Transport) cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer) - factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) - store := NewStore(context.Background(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) - factory.setStore(store) + stores := NewStores(cfg.AmbientCtx, cfg.Clock) + nodeDesc := &roachpb.NodeDescriptor{NodeID: 1} + + rangeProv := &dummyFirstRangeProvider{} + var storeSender struct{ kv.Sender } + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: cfg.Settings, + Clock: cfg.Clock, + NodeDescs: mockNodeStore{desc: nodeDesc}, + RPCContext: rpcContext, + RPCRetryOptions: &retry.Options{}, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(cfg.Gossip)), // TODO + FirstRangeProvider: rangeProv, + TestingKnobs: kvcoord.ClientTestingKnobs{ + TransportFactory: kvcoord.SenderTransportFactory(cfg.AmbientCtx.Tracer, &storeSender), + }, + }) - require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion)) + txnCoordSenderFactory := kvcoord.NewTxnCoordSenderFactory(kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: cfg.Settings, + Clock: cfg.Clock, + Stopper: stopper, + HeartbeatInterval: -1, + }, ds) + require.Nil(t, cfg.DB) + cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper) + store := NewStore(context.Background(), *cfg, eng, nodeDesc) + storeSender.Sender = store + + storeIdent := roachpb.StoreIdent{NodeID: 1, StoreID: 1} + cv := clusterversion.TestingClusterVersion + if opts.bootstrapVersion != (roachpb.Version{}) { + cv = clusterversion.ClusterVersion{Version: opts.bootstrapVersion} + } + require.NoError(t, WriteClusterVersion(context.Background(), eng, cv)) if err := InitEngine( - context.Background(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, + context.Background(), eng, storeIdent, ); err != nil { t.Fatal(err) } - var splits []roachpb.RKey - kvs, tableSplits := bootstrap.MakeMetadataSchema( - keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), - ).GetInitialValues() - if opts.createSystemRanges { - splits = config.StaticSplits() - splits = append(splits, tableSplits...) - sort.Slice(splits, func(i, j int) bool { - return splits[i].Less(splits[j]) - }) - } + rangeProv.store = store + store.Ident = &storeIdent // would usually be set during Store.Start, but can't call that yet + stores.AddStore(store) + + kvs, splits := opts.splits() if err := WriteInitialClusterData( - context.Background(), eng, kvs, /* initialValues */ - clusterversion.TestingBinaryVersion, + context.Background(), eng, kvs /* initialValues */, cv.Version, 1 /* numStores */, splits, cfg.Clock.PhysicalNow(), cfg.TestingKnobs, ); err != nil { t.Fatal(err) @@ -428,77 +410,31 @@ func TestStoreInitAndBootstrap(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // We need a fixed clock to avoid LastUpdateNanos drifting on us. - cfg := TestStoreConfig(hlc.NewClock(func() int64 { return 123 }, time.Nanosecond)) - stopper := stop.NewStopper() ctx := context.Background() + stopper := stop.NewStopper() + cfg := TestStoreConfig(nil) + store := createTestStoreWithConfig(t, stopper, testStoreOpts{}, &cfg) defer stopper.Stop(ctx) - eng := storage.NewDefaultInMemForTesting() - stopper.AddCloser(eng) - cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer) - factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) - { - store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) - // Can't start as haven't bootstrapped. - if err := store.Start(ctx, stopper); err == nil { - t.Error("expected failure starting un-bootstrapped store") - } - - require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion)) - // Bootstrap with a fake ident. - if err := InitEngine(ctx, eng, testIdent); err != nil { - t.Fatalf("error bootstrapping store: %+v", err) - } - - // Verify we can read the store ident after a flush. - if err := eng.Flush(); err != nil { - t.Fatal(err) - } - if _, err := ReadStoreIdent(ctx, eng); err != nil { - t.Fatalf("unable to read store ident: %+v", err) - } - - // Bootstrap the system ranges. - var splits []roachpb.RKey - kvs, tableSplits := bootstrap.MakeMetadataSchema( - keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), - ).GetInitialValues() - splits = config.StaticSplits() - splits = append(splits, tableSplits...) - sort.Slice(splits, func(i, j int) bool { - return splits[i].Less(splits[j]) - }) - if err := WriteInitialClusterData( - ctx, eng, kvs /* initialValues */, clusterversion.TestingBinaryVersion, - 1 /* numStores */, splits, cfg.Clock.PhysicalNow(), cfg.TestingKnobs, - ); err != nil { - t.Errorf("failure to create first range: %+v", err) - } - } - - // Now, attempt to initialize a store with a now-bootstrapped range. - store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) - if err := store.Start(ctx, stopper); err != nil { - t.Fatalf("failure initializing bootstrapped store: %+v", err) + if _, err := ReadStoreIdent(ctx, store.Engine()); err != nil { + t.Fatalf("unable to read store ident: %+v", err) } - for i := 1; i <= store.ReplicaCount(); i++ { - r, err := store.GetReplica(roachpb.RangeID(i)) - if err != nil { - t.Fatalf("failure fetching range %d: %+v", i, err) - } - rs := r.GetMVCCStats() - + store.VisitReplicas(func(repl *Replica) (more bool) { + // Stats should agree with recomputation. Hold raftMu to avoid + // background activity from creating discrepancies between engine + // and in-mem stats. + repl.raftMu.Lock() + defer repl.raftMu.Unlock() + memMS := repl.GetMVCCStats() // Stats should agree with a recomputation. - now := r.store.Clock().Now() - if ms, err := rditer.ComputeStatsForRange(r.Desc(), eng, now.WallTime); err != nil { - t.Errorf("failure computing range's stats: %+v", err) - } else if ms != rs { - t.Errorf("expected range's stats to agree with recomputation: %s", pretty.Diff(ms, rs)) - } - } + now := store.Clock().Now() + diskMS, err := rditer.ComputeStatsForRange(repl.Desc(), store.Engine(), now.WallTime) + require.NoError(t, err) + memMS.AgeTo(diskMS.LastUpdateNanos) + require.Equal(t, memMS, diskMS) + return true // more + }) } // TestInitializeEngineErrors verifies bootstrap failure if engine @@ -1203,25 +1139,13 @@ func TestStoreSendBadRange(t *testing.T) { func splitTestRange(store *Store, splitKey roachpb.RKey, t *testing.T) *Replica { ctx := context.Background() repl := store.LookupReplica(splitKey) - require.NotNil(t, repl) - rangeID, err := store.AllocateRangeID(ctx) - require.NoError(t, err) - rhsDesc := roachpb.NewRangeDescriptor( - rangeID, splitKey, repl.Desc().EndKey, repl.Desc().Replicas()) - // Minimal amount of work to keep this deprecated machinery working: Write - // some required Raft keys. - err = stateloader.WriteInitialRangeState(ctx, store.engine, *rhsDesc, roachpb.Version{}) - require.NoError(t, err) - newRng, err := newReplica(ctx, rhsDesc, store, repl.ReplicaID()) - require.NoError(t, err) - newLeftDesc := *repl.Desc() - newLeftDesc.EndKey = splitKey - err = store.SplitRange(repl.AnnotateCtx(context.Background()), repl, newRng, &roachpb.SplitTrigger{ - RightDesc: *rhsDesc, - LeftDesc: newLeftDesc, - }) - require.NoError(t, err) - return newRng + _, err := repl.AdminSplit(ctx, roachpb.AdminSplitRequest{ + RequestHeader: roachpb.RequestHeader{Key: splitKey.AsRawKey()}, + SplitKey: splitKey.AsRawKey(), + ExpirationTime: store.Clock().Now().Add(24*time.Hour.Nanoseconds(), 0), + }, "splitTestRange") + require.NoError(t, err.GoError()) + return store.LookupReplica(splitKey) } // TestStoreSendOutOfRange passes a key not contained diff --git a/pkg/storage/open.go b/pkg/storage/open.go index 56ad6272c6f1..890bbb7ed8b8 100644 --- a/pkg/storage/open.go +++ b/pkg/storage/open.go @@ -12,12 +12,10 @@ package storage import ( "context" - "math/rand" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/vfs" ) @@ -51,16 +49,11 @@ var MustExist ConfigOption = func(cfg *engineConfig) error { } // ForTesting configures the engine for use in testing. It may randomize some -// config options to improve test coverage +// config options to improve test coverage. var ForTesting ConfigOption = func(cfg *engineConfig) error { if cfg.Settings == nil { cfg.Settings = cluster.MakeTestingClusterSettings() } - disableSeparatedIntents := rand.Intn(2) == 0 - log.Infof(context.Background(), - "engine creation is randomly setting disableSeparatedIntents: %t", - disableSeparatedIntents) - cfg.DisableSeparatedIntents = disableSeparatedIntents return nil } diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 2278eb3a9b5b..5d8c4caf09ed 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -133,17 +133,13 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto clusterID := &cfg.RPCContext.ClusterID server := rpc.NewServer(cfg.RPCContext) // never started ltc.Gossip = gossip.New(ambient, clusterID, nc, cfg.RPCContext, server, ltc.stopper, metric.NewRegistry(), roachpb.Locality{}, zonepb.DefaultZoneConfigRef()) - disableSeparatedIntents := !cfg.Settings.Version.ActiveVersionOrEmpty( - context.Background()).IsActive(clusterversion.PostSeparatedIntentsMigration) - log.Infof(context.Background(), "engine creation is randomly setting disableSeparatedIntents: %t", - disableSeparatedIntents) var err error ltc.Eng, err = storage.Open( ambient.AnnotateCtx(context.Background()), storage.InMemory(), storage.CacheSize(0), storage.MaxSize(50<<20 /* 50 MiB */), - storage.SetSeparatedIntents(disableSeparatedIntents)) + ) if err != nil { t.Fatal(err) }