From dd8197f2a4f369d7582e4c853305648497411119 Mon Sep 17 00:00:00 2001 From: Alex Lunev Date: Mon, 1 Feb 2021 11:49:33 -0800 Subject: [PATCH] kvserver: convert the last set of multiTestContext tests to use TestCluster Makes progress on #8299 This change converts the last set of tests in client_metrics_test, client_raft_test, client_raft_log_queue_test to use TestCluster instead of multiTestContext. Release note: None --- pkg/kv/kvserver/client_metrics_test.go | 180 ++-- pkg/kv/kvserver/client_raft_helpers_test.go | 92 +- pkg/kv/kvserver/client_raft_log_queue_test.go | 109 ++- pkg/kv/kvserver/client_raft_test.go | 842 +++++++++--------- pkg/kv/kvserver/client_test.go | 26 + pkg/testutils/testcluster/testcluster.go | 6 +- pkg/util/leaktest/leaktest.go | 2 + 7 files changed, 651 insertions(+), 606 deletions(-) diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 25ad36b3dd27..4ccf87218ce7 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -13,7 +13,6 @@ package kvserver_test import ( "context" "fmt" - "sync" "testing" "time" @@ -23,7 +22,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -47,23 +48,15 @@ func checkGauge(t *testing.T, id string, g gaugeValuer, e int64) { } } -func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) { +// verifyStats checks a set of stats on the specified list of servers. 
This method +// may produce false negatives when executed against a running server that has +// live traffic on it. +func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) { t.Helper() var stores []*kvserver.Store - var wg sync.WaitGroup - - mtc.mu.RLock() - numStores := len(mtc.stores) - // We need to stop the stores at the given indexes, while keeping the reference to the - // store objects. ComputeMVCCStats() still works on a stopped store (it needs - // only the engine, which is still open), and the most recent stats are still - // available on the stopped store object; however, no further information can - // be committed to the store while it is stopped, preventing any races during - // verification. for _, storeIdx := range storeIdxSlice { - stores = append(stores, mtc.stores[storeIdx]) + stores = append(stores, tc.GetFirstStoreFromServer(t, storeIdx)) } - mtc.mu.RUnlock() // Sanity regression check for bug #4624: ensure intent count is zero. // This may not be true immediately due to the asynchronous nature of @@ -78,20 +71,6 @@ func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) { }) } - wg.Add(numStores) - // We actually stop *all* of the Stores. Stopping only a few is riddled - // with deadlocks since operations can span nodes, but stoppers don't - // know about this - taking all of them down at the same time is the - // only sane way of guaranteeing that nothing interesting happens, at - // least when bringing down the nodes jeopardizes majorities. - for i := 0; i < numStores; i++ { - go func(i int) { - defer wg.Done() - mtc.stopStore(i) - }(i) - } - wg.Wait() - for _, s := range stores { idString := s.Ident.String() m := s.Metrics() @@ -128,11 +107,6 @@ func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) { if t.Failed() { t.Fatalf("verifyStats failed, aborting test.") } - - // Restart all Stores. 
- for i := 0; i < numStores; i++ { - mtc.restartStore(i) - } } func verifyRocksDBStats(t *testing.T, s *kvserver.Store) { @@ -179,13 +153,16 @@ func TestStoreResolveMetrics(t *testing.T) { } } - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 1) - ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) - span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")} + key, err := s.ScratchRange() + require.NoError(t, err) + span := roachpb.Span{Key: key, EndKey: key.Next()} txn := roachpb.MakeTransaction("foo", span.Key, roachpb.MinUserPriority, hlc.Timestamp{WallTime: 123}, 999) @@ -195,7 +172,7 @@ func TestStoreResolveMetrics(t *testing.T) { var ba roachpb.BatchRequest { - repl := mtc.stores[0].LookupReplica(keys.MustAddr(span.Key)) + repl := store.LookupReplica(keys.MustAddr(span.Key)) var err error if ba.Replica, err = repl.GetReplicaDescriptor(); err != nil { t.Fatal(err) @@ -231,17 +208,17 @@ func TestStoreResolveMetrics(t *testing.T) { add(roachpb.ABORTED, false, resolveAbortCount) add(roachpb.ABORTED, true, resolvePoisonCount) - if _, pErr := mtc.senders[0].Send(ctx, ba); pErr != nil { + if _, pErr := store.TestSender().Send(ctx, ba); pErr != nil { t.Fatal(pErr) } - if exp, act := resolveCommitCount, mtc.stores[0].Metrics().ResolveCommitCount.Count(); act < exp || act > exp+50 { + if exp, act := resolveCommitCount, store.Metrics().ResolveCommitCount.Count(); act < exp || act > exp+50 { t.Errorf("expected around %d intent commits, saw %d", exp, act) } - if exp, act := resolveAbortCount, mtc.stores[0].Metrics().ResolveAbortCount.Count(); act < exp || act > exp+50 { + if exp, act := resolveAbortCount, store.Metrics().ResolveAbortCount.Count(); act < exp || act > exp+50 { t.Errorf("expected around %d intent aborts, saw %d", exp, act) } - if exp, act := 
resolvePoisonCount, mtc.stores[0].Metrics().ResolvePoisonCount.Count(); act < exp || act > exp+50 { + if exp, act := resolvePoisonCount, store.Metrics().ResolvePoisonCount.Count(); act < exp || act > exp+50 { t.Errorf("expected arounc %d abort span poisonings, saw %d", exp, act) } } @@ -250,65 +227,69 @@ func TestStoreMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil /* clock */) - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{ - storeConfig: &storeCfg, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - defer mtc.Stop() - mtc.Start(t, 3) + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + // Specify a size to trigger the BlockCache in Pebble. + Size: base.SizeSpec{ + InBytes: 1 << 20, + }, + }, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableRaftLogQueue: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) - // Flush RocksDB memtables, so that RocksDB begins using block-based tables. + // Flush Pebble memtables, so that Pebble begins using block-based tables. // This is useful, because most of the stats we track don't apply to // memtables. - if err := mtc.stores[0].Engine().Flush(); err != nil { - t.Fatal(err) - } - if err := mtc.stores[1].Engine().Flush(); err != nil { - t.Fatal(err) - } - - // Disable the raft log truncation which confuses this test. 
- for _, s := range mtc.stores { - s.SetRaftLogQueueActive(false) + for i := range tc.Servers { + if err := tc.GetFirstStoreFromServer(t, i).Engine().Flush(); err != nil { + t.Fatal(err) + } } - // Perform a split, which has special metrics handling. - splitArgs := adminSplitArgs(roachpb.Key("m")) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), splitArgs); err != nil { + initialCount := tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount.Value() + key := tc.ScratchRange(t) + if _, err := tc.GetFirstStoreFromServer(t, 0).DB().Inc(ctx, key, 10); err != nil { t.Fatal(err) } - // Verify range count is as expected - checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 2) - - // Verify all stats on store0 after split. - verifyStats(t, mtc, 0) + checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1) // Replicate the "right" range to the other stores. - replica := mtc.stores[0].LookupReplica(roachpb.RKey("z")) - mtc.replicateRange(replica.RangeID, 1, 2) + desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...)) // Verify stats on store1 after replication. - verifyStats(t, mtc, 1) + verifyStats(t, tc, 1) // Add some data to the "right" range. - dataKey := []byte("z") - if _, err := mtc.dbs[0].Inc(context.Background(), dataKey, 5); err != nil { + dataKey := key.Next() + if _, err := tc.GetFirstStoreFromServer(t, 0).DB().Inc(ctx, dataKey, 5); err != nil { t.Fatal(err) } - mtc.waitForValues(roachpb.Key("z"), []int64{5, 5, 5}) + tc.WaitForValues(t, dataKey, []int64{5, 5, 5}) // Verify all stats on stores after addition. - verifyStats(t, mtc, 0, 1, 2) + // We skip verifying stats on Server[0] because there is no reliable way to + // do that given all of the system table activity generated by the TestCluster. + // We use Servers[1] and Servers[2] instead, since we can control the traffic + // on those servers. 
+ verifyStats(t, tc, 1, 2) // Create a transaction statement that fails. Regression test for #4969. - if err := mtc.dbs[0].Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + if err := tc.GetFirstStoreFromServer(t, 0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() var expVal roachpb.Value expVal.SetInt(6) @@ -319,34 +300,31 @@ func TestStoreMetrics(t *testing.T) { } // Verify stats after addition. - verifyStats(t, mtc, 0, 1, 2) - checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 2) + verifyStats(t, tc, 1, 2) + checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1) - // Unreplicate range from the first store. + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + tc.RemoveVotersOrFatal(t, key, tc.Target(0)) testutils.SucceedsSoon(t, func() error { - // This statement can fail if store 0 is not the leaseholder. - if err := mtc.transferLeaseNonFatal(context.Background(), replica.RangeID, 0, 1); err != nil { - t.Log(err) + _, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID) + if err == nil { + return fmt.Errorf("replica still exists on dest 0") + } else if errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) { + return nil } - // This statement will fail if store 0 IS the leaseholder. This can happen - // even after the previous statement. - return mtc.unreplicateRangeNonFatal(replica.RangeID, 0) + return err }) - - // Wait until we're sure that store 0 has successfully processed its removal. - require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0)) - - mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5}) + tc.WaitForValues(t, dataKey, []int64{0, 5, 5}) // Verify range count is as expected. 
- checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 1) - checkGauge(t, "store 1", mtc.stores[1].Metrics().ReplicaCount, 1) + checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount) + checkGauge(t, "store 1", tc.GetFirstStoreFromServer(t, 1).Metrics().ReplicaCount, 1) // Verify all stats on all stores after range is removed. - verifyStats(t, mtc, 0, 1, 2) + verifyStats(t, tc, 1, 2) - verifyRocksDBStats(t, mtc.stores[0]) - verifyRocksDBStats(t, mtc.stores[1]) + verifyRocksDBStats(t, tc.GetFirstStoreFromServer(t, 1)) + verifyRocksDBStats(t, tc.GetFirstStoreFromServer(t, 2)) } // TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases ensures that the metric diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index a87e080b1480..e3d0918af690 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -130,48 +131,53 @@ func (h *unreliableRaftHandler) HandleSnapshot( return h.RaftMessageHandler.HandleSnapshot(header, respStream) } -// mtcStoreRaftMessageHandler exists to allows a store to be stopped and +// testClusterStoreRaftMessageHandler exists to allow a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. 
-type mtcStoreRaftMessageHandler struct { - mtc *multiTestContext +type testClusterStoreRaftMessageHandler struct { + tc *testcluster.TestCluster storeIdx int } -func (h *mtcStoreRaftMessageHandler) HandleRaftRequest( +func (h *testClusterStoreRaftMessageHandler) getStore() (*kvserver.Store, error) { + ts := h.tc.Servers[h.storeIdx] + return ts.Stores().GetStore(ts.GetFirstStoreID()) +} + +func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest( ctx context.Context, req *kvserver.RaftMessageRequest, respStream kvserver.RaftMessageResponseStream, ) *roachpb.Error { - store := h.mtc.Store(h.storeIdx) - if store == nil { - return roachpb.NewErrorf("store not found") + store, err := h.getStore() + if err != nil { + return roachpb.NewError(err) } return store.HandleRaftRequest(ctx, req, respStream) } -func (h *mtcStoreRaftMessageHandler) HandleRaftResponse( +func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse( ctx context.Context, resp *kvserver.RaftMessageResponse, ) error { - store := h.mtc.Store(h.storeIdx) - if store == nil { - return errors.New("store not found") + store, err := h.getStore() + if err != nil { + return err } return store.HandleRaftResponse(ctx, resp) } -func (h *mtcStoreRaftMessageHandler) HandleSnapshot( +func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( header *kvserver.SnapshotRequest_Header, respStream kvserver.SnapshotResponseStream, ) error { - store := h.mtc.Store(h.storeIdx) - if store == nil { - return errors.New("store not found") + store, err := h.getStore() + if err != nil { + return err } return store.HandleSnapshot(header, respStream) } -// mtcPartitionedRange is a convenient abstraction to create a range on a node +// testClusterPartitionedRange is a convenient abstraction to create a range on a node // in a multiTestContext which can be partitioned and unpartitioned. 
-type mtcPartitionedRange struct { +type testClusterPartitionedRange struct { rangeID roachpb.RangeID mu struct { syncutil.RWMutex @@ -182,8 +188,9 @@ type mtcPartitionedRange struct { handlers []kvserver.RaftMessageHandler } -// setupPartitionedRange sets up an mtcPartitionedRange for the provided mtc, -// rangeID, and node index in the mtc. The range is initially not partitioned. +// setupPartitionedRange sets up a testClusterPartitionedRange for the provided +// TestCluster, rangeID, and node index in the TestCluster. The range is +// initially not partitioned. // // We're going to set up the cluster with partitioning so that we can // partition node p from the others. We do this by installing @@ -205,40 +212,45 @@ type mtcPartitionedRange struct { // If replicaID is zero then it is resolved by looking up the replica for the // partitionedNode of from the current range descriptor of rangeID. func setupPartitionedRange( - mtc *multiTestContext, + tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, partitionedNode int, activated bool, funcs unreliableRaftHandlerFuncs, -) (*mtcPartitionedRange, error) { - handlers := make([]kvserver.RaftMessageHandler, 0, len(mtc.stores)) - for i := range mtc.stores { - handlers = append(handlers, &mtcStoreRaftMessageHandler{ - mtc: mtc, +) (*testClusterPartitionedRange, error) { + handlers := make([]kvserver.RaftMessageHandler, 0, len(tc.Servers)) + for i := range tc.Servers { + handlers = append(handlers, &testClusterStoreRaftMessageHandler{ + tc: tc, storeIdx: i, }) } - return setupPartitionedRangeWithHandlers(mtc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) } func setupPartitionedRangeWithHandlers( - mtc *multiTestContext, + tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, partitionedNode int, activated bool, handlers 
[]kvserver.RaftMessageHandler, funcs unreliableRaftHandlerFuncs, -) (*mtcPartitionedRange, error) { - pr := &mtcPartitionedRange{ +) (*testClusterPartitionedRange, error) { + pr := &testClusterPartitionedRange{ rangeID: rangeID, handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated pr.mu.partitionedNode = partitionedNode if replicaID == 0 { - partRepl, err := mtc.Store(partitionedNode).GetReplica(rangeID) + ts := tc.Servers[partitionedNode] + store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) + if err != nil { + return nil, err + } + partRepl, err := store.GetReplica(rangeID) if err != nil { return nil, err } @@ -251,7 +263,7 @@ func setupPartitionedRangeWithHandlers( pr.mu.partitionedReplicas = map[roachpb.ReplicaID]bool{ replicaID: true, } - for i := range mtc.stores { + for i := range tc.Servers { s := i h := &unreliableRaftHandler{ rangeID: rangeID, @@ -296,32 +308,32 @@ func setupPartitionedRangeWithHandlers( } } pr.handlers = append(pr.handlers, h) - mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h) + tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } return pr, nil } -func (pr *mtcPartitionedRange) deactivate() { pr.set(false) } -func (pr *mtcPartitionedRange) activate() { pr.set(true) } -func (pr *mtcPartitionedRange) set(active bool) { +func (pr *testClusterPartitionedRange) deactivate() { pr.set(false) } +func (pr *testClusterPartitionedRange) activate() { pr.set(true) } +func (pr *testClusterPartitionedRange) set(active bool) { pr.mu.Lock() defer pr.mu.Unlock() pr.mu.partitioned = active } -func (pr *mtcPartitionedRange) addReplica(replicaID roachpb.ReplicaID) { +func (pr *testClusterPartitionedRange) addReplica(replicaID roachpb.ReplicaID) { pr.mu.Lock() defer pr.mu.Unlock() pr.mu.partitionedReplicas[replicaID] = true } -func (pr *mtcPartitionedRange) extend( - mtc *multiTestContext, +func (pr *testClusterPartitionedRange) extend( + tc *testcluster.TestCluster, rangeID roachpb.RangeID, 
replicaID roachpb.ReplicaID, partitionedNode int, activated bool, funcs unreliableRaftHandlerFuncs, -) (*mtcPartitionedRange, error) { - return setupPartitionedRangeWithHandlers(mtc, rangeID, replicaID, partitionedNode, activated, pr.handlers, funcs) +) (*testClusterPartitionedRange, error) { + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, pr.handlers, funcs) } diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 891f11e10ae2..195233a766dc 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -13,15 +13,21 @@ package kvserver_test import ( "bytes" "context" - "fmt" "math" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) // TestRaftLogQueue verifies that the raft log queue correctly truncates the @@ -30,69 +36,78 @@ func TestRaftLogQueue(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - // Set maxBytes to something small so we can trigger the raft log truncation // without adding 64MB of logs. const maxBytes = 1 << 16 - // Turn off raft elections so the raft leader won't change out from under - // us in this test. 
- sc := kvserver.TestStoreConfig(nil) - sc.DefaultZoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - sc.RaftTickInterval = math.MaxInt32 - sc.RaftElectionTimeoutTicks = 1000000 - mtc.storeConfig = &sc - - defer mtc.Stop() - mtc.Start(t, 3) - + zoneConfig := zonepb.DefaultZoneConfig() + zoneConfig.RangeMaxBytes = proto.Int64(maxBytes) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DefaultZoneConfigOverride: &zoneConfig, + }, + }, + RaftConfig: base.RaftConfig{ + // Turn off raft elections so the raft leader won't change out from under + // us in this test. + RaftTickInterval: math.MaxInt32, + RaftElectionTimeoutTicks: 1000000, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + + key := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) // Write a single value to ensure we have a leader. - pArgs := putArgs([]byte("key"), []byte("value")) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), pArgs); err != nil { + pArgs := putArgs(key, []byte("value")) + if _, err := kv.SendWrapped(ctx, store.TestSender(), pArgs); err != nil { t.Fatal(err) } // Get the raft leader (and ensure one exists). - rangeID := mtc.stores[0].LookupReplica([]byte("a")).RangeID - raftLeaderRepl := mtc.getRaftLeader(rangeID) - if raftLeaderRepl == nil { - t.Fatalf("could not find raft leader replica for range %d", rangeID) - } + raftLeaderRepl := tc.GetRaftLeader(t, roachpb.RKey(key)) + require.NotNil(t, raftLeaderRepl) originalIndex, err := raftLeaderRepl.GetFirstIndex() if err != nil { t.Fatal(err) } - // Disable splits since we're increasing the raft log with puts. - for _, store := range mtc.stores { - store.SetSplitQueueActive(false) - } - // Write a collection of values to increase the raft log. 
- value := bytes.Repeat([]byte("a"), 1000) // 1KB + value := bytes.Repeat(key, 1000) // 1KB for size := int64(0); size < 2*maxBytes; size += int64(len(value)) { - pArgs = putArgs([]byte(fmt.Sprintf("key-%d", size)), value) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), pArgs); err != nil { + key = key.Next() + pArgs = putArgs(key, value) + if _, err := kv.SendWrapped(ctx, store.TestSender(), pArgs); err != nil { t.Fatal(err) } } - // Force a truncation check. - for _, store := range mtc.stores { - store.MustForceRaftLogScanAndProcess() - } - - // Ensure that firstIndex has increased indicating that the log - // truncation has occurred. - afterTruncationIndex, err := raftLeaderRepl.GetFirstIndex() - if err != nil { - t.Fatal(err) - } - if afterTruncationIndex <= originalIndex { - t.Fatalf("raft log has not been truncated yet, afterTruncationIndex:%d originalIndex:%d", - afterTruncationIndex, originalIndex) - } + var afterTruncationIndex uint64 + testutils.SucceedsSoon(t, func() error { + // Force a truncation check. + for i := range tc.Servers { + tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess() + } + // Ensure that firstIndex has increased indicating that the log + // truncation has occurred. + afterTruncationIndex, err = raftLeaderRepl.GetFirstIndex() + if err != nil { + return err + } + if afterTruncationIndex <= originalIndex { + return errors.Errorf("raft log has not been truncated yet, afterTruncationIndex:%d originalIndex:%d", + afterTruncationIndex, originalIndex) + } + return nil + }) // Force a truncation check again to ensure that attempting to truncate an // already truncated log has no effect. This check, unlike in the last @@ -101,8 +116,8 @@ func TestRaftLogQueue(t *testing.T) { // GetFirstIndex, giving a false negative. Fixing this requires additional // instrumentation of the queues, which was deemed to require too much work // at the time of this writing. 
- for _, store := range mtc.stores { - store.MustForceRaftLogScanAndProcess() + for i := range tc.Servers { + tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess() } after2ndTruncationIndex, err := raftLeaderRepl.GetFirstIndex() diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index b758412e4905..c996d7459d62 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -434,31 +434,40 @@ func TestFailedReplicaChange(t *testing.T) { var runFilter atomic.Value runFilter.Store(true) - sc := kvserver.TestStoreConfig(nil) - sc.TestingKnobs.DisableReplicateQueue = true - sc.Clock = nil // manual clock - sc.TestingKnobs.EvalKnobs.TestingEvalFilter = func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { + testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if runFilter.Load().(bool) { - if et, ok := filterArgs.Req.(*roachpb.EndTxnRequest); ok && et.Commit { + if et, ok := filterArgs.Req.(*roachpb.EndTxnRequest); ok && et.Commit && + et.InternalCommitTrigger != nil && et.InternalCommitTrigger.ChangeReplicasTrigger != nil { return roachpb.NewErrorWithTxn(errors.Errorf("boom"), filterArgs.Hdr.Txn) } } return nil } - mtc := &multiTestContext{storeConfig: &sc} - defer mtc.Stop() - mtc.Start(t, 2) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: testingEvalFilter, + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) - repl, err := mtc.stores[0].GetReplica(1) - if err != nil { - t.Fatal(err) - } + key := tc.ScratchRange(t) + repl := 
store.LookupReplica(roachpb.RKey(key)) - chgs := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, roachpb.ReplicationTarget{ - NodeID: mtc.stores[1].Ident.NodeID, - StoreID: mtc.stores[1].Ident.StoreID, - }) - if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); !testutils.IsError(err, "boom") { + if _, err := tc.AddVoters(key, tc.Target(1)); !testutils.IsError(err, "boom") { t.Fatalf("did not get expected error: %+v", err) } @@ -474,26 +483,12 @@ func TestFailedReplicaChange(t *testing.T) { // The first failed replica change has laid down intents. Make sure those // are pushable by making the transaction abandoned. - mtc.manualClock.Increment(10 * base.DefaultTxnHeartbeatInterval.Nanoseconds()) - - if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); err != nil { - t.Fatal(err) - } + manualClock.Increment(10 * base.DefaultTxnHeartbeatInterval.Nanoseconds()) + tc.AddVotersOrFatal(t, key, tc.Target(1)) // Wait for the range to sync to both replicas (mainly so leaktest doesn't // complain about goroutines involved in the process). - testutils.SucceedsSoon(t, func() error { - for _, store := range mtc.stores { - rang, err := store.GetReplica(1) - if err != nil { - return err - } - if replicas := rang.Desc().InternalReplicas; len(replicas) <= 1 { - return errors.Errorf("expected > 1 replicas; got %v", replicas) - } - } - return nil - }) + require.NoError(t, tc.WaitForVoters(key, tc.Targets(0, 1)...)) } // We can truncate the old log entries and a new replica will be brought up from a snapshot. 
@@ -845,22 +840,24 @@ func TestSnapshotAfterTruncation(t *testing.T) { func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - cfg := kvserver.TestStoreConfig(nil) - cfg.TestingKnobs.DisableReplicateQueue = true - cfg.Clock = nil // using manual clock - mtc := &multiTestContext{ - storeConfig: &cfg, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - defer mtc.Stop() - mtc.Start(t, 3) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) - key := roachpb.Key("a") + key := tc.ScratchRangeWithExpirationLease(t) incA := int64(5) incB := int64(7) incC := int64(9) @@ -873,25 +870,23 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // We're then going to continue modifying this key to make sure that the // temporarily partitioned node can continue to receive updates. incArgs := incrementArgs(key, incA) - if _, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.replicateRange(1, 1, 2) - mtc.waitForValues(key, []int64{incA, incA, incA}) + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...)) + tc.WaitForValues(t, key, []int64{incA, incA, incA}) // We partition the original leader from the other two replicas. 
This allows // us to build up a large uncommitted Raft log on the partitioned node. const partStore = 0 - partRepl, err := mtc.stores[partStore].GetReplica(1) - if err != nil { - t.Fatal(err) - } + partRepl := tc.GetFirstStoreFromServer(t, partStore).LookupReplica(roachpb.RKey(key)) partReplDesc, err := partRepl.GetReplicaDescriptor() if err != nil { t.Fatal(err) } - partReplSender := mtc.stores[partStore].TestSender() + partReplSender := tc.GetFirstStoreFromServer(t, partStore).TestSender() // Partition the original leader from its followers. We do this by installing // unreliableRaftHandler listeners on all three Stores. The handler on the @@ -907,7 +902,10 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // log.Infof(ctx, "test: installing unreliable Raft transports") for _, s := range []int{0, 1, 2} { - h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]} + h := &unreliableRaftHandler{ + rangeID: partRepl.RangeID, + RaftMessageHandler: tc.GetFirstStoreFromServer(t, s), + } if s != partStore { // Only filter messages from the partitioned store on the other // two stores. @@ -918,7 +916,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { return hb.FromReplicaID == partReplDesc.ReplicaID } } - mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h) + tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } // Perform a series of writes on the partitioned replica. The writes will @@ -927,8 +925,14 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // leader. log.Infof(ctx, "test: sending writes to partitioned replica") g := ctxgroup.WithContext(ctx) - for i := 0; i < 32; i++ { - otherKey := roachpb.Key(fmt.Sprintf("other-%d", i)) + otherKeys := make([]roachpb.Key, 32) + otherKeys[0] = key.Next() + for i := 1; i < 32; i++ { + otherKeys[i] = otherKeys[i-1].Next() + } + for i := range otherKeys { + // This makes the race detector happy. 
+ otherKey := otherKeys[i] g.GoCtx(func(ctx context.Context) error { cCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) defer cancel() @@ -953,8 +957,8 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // who will get the lease. We try repeatedly sending requests to both // candidates until one of them succeeds. var nonPartitionedSenders [2]kv.Sender - nonPartitionedSenders[0] = mtc.stores[1].TestSender() - nonPartitionedSenders[1] = mtc.stores[2].TestSender() + nonPartitionedSenders[0] = tc.GetFirstStoreFromServer(t, 1).TestSender() + nonPartitionedSenders[1] = tc.GetFirstStoreFromServer(t, 2).TestSender() log.Infof(ctx, "test: sending write to transfer lease") incArgs = incrementArgs(key, incB) @@ -962,7 +966,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { var newLeaderRepl *kvserver.Replica var newLeaderReplSender kv.Sender testutils.SucceedsSoon(t, func() error { - mtc.advanceClock(ctx) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) i++ sender := nonPartitionedSenders[i%2] _, pErr := kv.SendWrapped(ctx, sender, incArgs) @@ -975,15 +979,12 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // A request succeeded, proving that there is a new leader and leaseholder. // Remember who that is. newLeaderStoreIdx := 1 + (i % 2) - newLeaderRepl, err = mtc.stores[newLeaderStoreIdx].GetReplica(1) - if err != nil { - t.Fatal(err) - } - newLeaderReplSender = mtc.stores[newLeaderStoreIdx].TestSender() + newLeaderRepl = tc.GetFirstStoreFromServer(t, newLeaderStoreIdx).LookupReplica(roachpb.RKey(key)) + newLeaderReplSender = tc.GetFirstStoreFromServer(t, newLeaderStoreIdx).TestSender() return nil }) log.Infof(ctx, "test: waiting for values...") - mtc.waitForValues(key, []int64{incA, incAB, incAB}) + tc.WaitForValues(t, key, []int64{incA, incAB, incAB}) log.Infof(ctx, "test: waiting for values... 
done") index, err := newLeaderRepl.GetLastIndex() @@ -994,10 +995,11 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // Truncate the log at index+1 (log entries < N are removed, so this // includes the increment). log.Infof(ctx, "test: truncating log") - truncArgs := truncateLogArgs(index+1, 1) + truncArgs := truncateLogArgs(index+1, partRepl.RangeID) + truncArgs.Key = partRepl.Desc().StartKey.AsRawKey() testutils.SucceedsSoon(t, func() error { - mtc.advanceClock(ctx) - _, pErr := kv.SendWrapped(ctx, newLeaderReplSender, truncArgs) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) + _, pErr := kv.SendWrappedWith(ctx, newLeaderReplSender, roachpb.Header{RangeID: partRepl.RangeID}, truncArgs) if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok { return pErr.GoError() } else if pErr != nil { @@ -1006,15 +1008,15 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { return nil }) - snapsMetric := mtc.stores[partStore].Metrics().RangeSnapshotsAppliedByVoters + snapsMetric := tc.GetFirstStoreFromServer(t, partStore).Metrics().RangeSnapshotsAppliedByVoters snapsBefore := snapsMetric.Count() // Remove the partition. Snapshot should follow. 
log.Infof(ctx, "test: removing the partition") for _, s := range []int{0, 1, 2} { - mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{ - rangeID: 1, - RaftMessageHandler: mtc.stores[s], + tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, &unreliableRaftHandler{ + rangeID: partRepl.RangeID, + RaftMessageHandler: tc.GetFirstStoreFromServer(t, s), unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserver.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just truncated can @@ -1038,15 +1040,15 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { } return nil }) - mtc.waitForValues(key, []int64{incAB, incAB, incAB}) + tc.WaitForValues(t, key, []int64{incAB, incAB, incAB}) // Perform another write. The partitioned replica should be able to receive // replicated updates. incArgs = incrementArgs(key, incC) - if _, pErr := kv.SendWrapped(ctx, mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(key, []int64{incABC, incABC, incABC}) + tc.WaitForValues(t, key, []int64{incABC, incABC, incABC}) } // TestRequestsOnLaggingReplica tests that requests sent to a replica that's @@ -1784,6 +1786,7 @@ func TestUnreplicateFirstRange(t *testing.T) { mtc.transferLease(context.Background(), rangeID, 0, 1) // Unreplicate the from from store 0. mtc.unreplicateRange(rangeID, 0) + require.NoError(t, mtc.waitForUnreplicated(rangeID, 0)) // Replicate the range to store 2. The first range is no longer available on // store 1, and this command will fail if that situation is not properly // supported. 
@@ -1954,187 +1957,190 @@ func TestReplicateRestartAfterTruncation(t *testing.T) { } func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReAdd bool) { - sc := kvserver.TestStoreConfig(nil) - // Don't timeout raft leaders or range leases (see the relation between - // RaftElectionTimeoutTicks and RangeLeaseActiveDuration). This test expects - // mtc.stores[0] to hold the range lease for range 1. - sc.RaftElectionTimeoutTicks = 1000000 - sc.TestingKnobs.DisableReplicateQueue = true - sc.Clock = nil // manual clock - mtc := &multiTestContext{ - storeConfig: &sc, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + const numServers int = 3 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + ClockSource: manualClock.UnixNano, + }, + }, + RaftConfig: base.RaftConfig{ + // Don't timeout raft leaders or range leases (see the relation between + // RaftElectionTimeoutTicks and RangeLeaseActiveDuration). This test expects + // tc.Servers[0] to hold the range lease for the range under test. 
+ RaftElectionTimeoutTicks: 1000000, + }, + } } - defer mtc.Stop() - mtc.Start(t, 3) + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) - key := roachpb.Key("a") + key := tc.ScratchRangeWithExpirationLease(t) // Replicate the initial range to all three nodes. - const rangeID = roachpb.RangeID(1) - mtc.replicateRange(rangeID, 1, 2) + desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) // Verify that the first increment propagates to all the engines. incArgs := incrementArgs(key, 2) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } - mtc.waitForValues(key, []int64{2, 2, 2}) + tc.WaitForValues(t, key, []int64{2, 2, 2}) // Stop a store. - mtc.stopStore(1) + tc.StopServer(1) if removeBeforeTruncateAndReAdd { // remove the stopped store from the range - mtc.unreplicateRange(rangeID, 1) + tc.RemoveVotersOrFatal(t, key, tc.Target(1)) } // Truncate the logs. { // Get the last increment's log index. - repl, err := mtc.stores[0].GetReplica(rangeID) - if err != nil { - t.Fatal(err) - } + repl := store.LookupReplica(roachpb.RKey(key)) index, err := repl.GetLastIndex() if err != nil { t.Fatal(err) } // Truncate the log at index+1 (log entries < N are removed, so this includes // the increment). - truncArgs := truncateLogArgs(index+1, rangeID) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), truncArgs); err != nil { + truncArgs := truncateLogArgs(index+1, repl.RangeID) + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), truncArgs); err != nil { t.Fatal(err) } } // Ensure that store can catch up with the rest of the group. 
incArgs = incrementArgs(key, 3) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } - mtc.waitForValues(key, []int64{5, 2, 5}) + tc.WaitForValues(t, key, []int64{5, 2, 5}) // Re-add the store and restart it. // TODO(dt): ben originally suggested we also attempt this in the other order. // This currently hits an NPE in mtc.replicateRange though when it tries to // read the Ident.NodeID field in the specified store, and will become // impossible after streaming snapshots. - mtc.restartStore(1) + require.NoError(t, tc.RestartServer(1)) if removeBeforeTruncateAndReAdd { // Verify old replica is GC'd. Wait out the replica gc queue // inactivity threshold and force a gc scan. - mtc.manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1)) + manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1)) testutils.SucceedsSoon(t, func() error { - mtc.stores[1].MustForceReplicaGCScanAndProcess() - _, err := mtc.stores[1].GetReplica(rangeID) + tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess() + _, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID) if !errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) { return errors.Errorf("expected replica to be garbage collected, got %v %T", err, err) } return nil }) - mtc.replicateRange(rangeID, 1) + tc.AddVotersOrFatal(t, key, tc.Target(1)) } - mtc.waitForValues(key, []int64{5, 5, 5}) + tc.WaitForValues(t, key, []int64{5, 5, 5}) } func testReplicaAddRemove(t *testing.T, addFirst bool) { - sc := kvserver.TestStoreConfig(nil) - // We're gonna want to validate the state of the store before and - // after the replica GC queue does its work, so we disable the - // replica gc queue here and run it manually when we're ready. 
- sc.TestingKnobs.DisableReplicaGCQueue = true - sc.TestingKnobs.DisableReplicateQueue = true - sc.TestingKnobs.DisableEagerReplicaRemoval = true - sc.Clock = nil // manual clock - mtc := &multiTestContext{ - storeConfig: &sc, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - defer mtc.Stop() - mtc.Start(t, 4) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() - key := roachpb.Key("a") - verifyFn := func(expected []int64) func() error { - return func() error { - values := make([]int64, len(mtc.engines)) - for i, eng := range mtc.engines { - val, _, err := storage.MVCCGet(context.Background(), eng, key, mtc.clock().Now(), - storage.MVCCGetOptions{}) - if err != nil { - return err - } - values[i] = mustGetInt(val) - } - if reflect.DeepEqual(expected, values) { - return nil - } - return errors.Errorf("expected %+v, got %+v", expected, values) + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + const numServers int = 4 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + // We're gonna want to validate the state of the store before and + // after the replica GC queue does its work, so we disable the + // replica gc queue here and run it manually when we're ready. 
+ DisableReplicaGCQueue: true, + DisableEagerReplicaRemoval: true, + }, + }, } } + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + + key := tc.ScratchRangeWithExpirationLease(t) // Replicate the initial range to three of the four nodes. - const rangeID = roachpb.RangeID(1) - mtc.replicateRange(rangeID, 3, 1) + tc.AddVotersOrFatal(t, key, tc.Targets(3, 1)...) inc1 := int64(5) { incArgs := incrementArgs(key, inc1) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } } - // The first increment is visible on all three replicas. - testutils.SucceedsSoon(t, verifyFn([]int64{ - inc1, - inc1, - 0, - inc1, - })) + tc.WaitForValues(t, key, []int64{inc1, inc1, 0, inc1}) // Stop a store and replace it. - mtc.stopStore(1) + tc.StopServer(1) if addFirst { - mtc.replicateRange(rangeID, 2) - mtc.unreplicateRange(rangeID, 1) + tc.AddVotersOrFatal(t, key, tc.Target(2)) + tc.RemoveVotersOrFatal(t, key, tc.Target(1)) } else { - mtc.unreplicateRange(rangeID, 1) - mtc.replicateRange(rangeID, 2) + tc.RemoveVotersOrFatal(t, key, tc.Target(1)) + tc.AddVotersOrFatal(t, key, tc.Target(2)) } // The first increment is visible on the new replica. - testutils.SucceedsSoon(t, verifyFn([]int64{ - inc1, - inc1, - inc1, - inc1, - })) + tc.WaitForValues(t, key, []int64{inc1, inc1, inc1, inc1}) // Ensure that the rest of the group can make progress. 
inc2 := int64(11) { incArgs := incrementArgs(key, inc2) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } } - testutils.SucceedsSoon(t, verifyFn([]int64{ - inc1 + inc2, - inc1, - inc1 + inc2, - inc1 + inc2, - })) + tc.WaitForValues(t, key, []int64{inc1 + inc2, inc1, inc1 + inc2, inc1 + inc2}) // Bring the downed store back up (required for a clean shutdown). - mtc.restartStore(1) + require.NoError(t, tc.RestartServer(1)) // The downed store never sees the increment that was added while it was // down. Perform another increment now that it is back up to verify that it @@ -2142,32 +2148,22 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { inc3 := int64(23) { incArgs := incrementArgs(key, inc3) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } } - testutils.SucceedsSoon(t, verifyFn([]int64{ - inc1 + inc2 + inc3, - inc1, - inc1 + inc2 + inc3, - inc1 + inc2 + inc3, - })) + tc.WaitForValues(t, key, []int64{inc1 + inc2 + inc3, inc1, inc1 + inc2 + inc3, inc1 + inc2 + inc3}) // Wait out the range lease and the unleased duration to make the replica GC'able. - mtc.advanceClock(context.Background()) - mtc.manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1)) - mtc.stores[1].SetReplicaGCQueueActive(true) - mtc.stores[1].MustForceReplicaGCScanAndProcess() + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) + manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1)) + tc.GetFirstStoreFromServer(t, 1).SetReplicaGCQueueActive(true) + tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess() // The removed store no longer has any of the data from the range. 
- testutils.SucceedsSoon(t, verifyFn([]int64{ - inc1 + inc2 + inc3, - 0, - inc1 + inc2 + inc3, - inc1 + inc2 + inc3, - })) - - desc := mtc.stores[0].LookupReplica(roachpb.RKeyMin).Desc() + tc.WaitForValues(t, key, []int64{inc1 + inc2 + inc3, 0, inc1 + inc2 + inc3, inc1 + inc2 + inc3}) + + desc := tc.LookupRangeOrFatal(t, key) replicaIDsByStore := map[roachpb.StoreID]roachpb.ReplicaID{} for _, rep := range desc.InternalReplicas { replicaIDsByStore[rep.StoreID] = rep.ReplicaID @@ -3222,47 +3218,67 @@ func TestReplicateRogueRemovedNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - sc := kvserver.TestStoreConfig(nil) - // Newly-started stores (including the "rogue" one) should not GC - // their replicas. We'll turn this back on when needed. - sc.TestingKnobs.DisableReplicaGCQueue = true - sc.TestingKnobs.DisableReplicateQueue = true - sc.Clock = nil // manual clock - mtc := &multiTestContext{ - storeConfig: &sc, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + const numServers int = 3 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + // Newly-started stores (including the "rogue" one) should not GC + // their replicas. We'll turn this back on when needed. 
+ DisableReplicaGCQueue: true, + }, + }, + } } - defer mtc.Stop() - mtc.Start(t, 3) + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + key := tc.ScratchRangeWithExpirationLease(t) + // First put the range on all three nodes. + desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) // We're going to set up the cluster with partitioning so that we can // partition node 0 from the others. The partition is not initially active. - partRange, err := setupPartitionedRange(mtc, 1, 0, 0, false /* activated */, unreliableRaftHandlerFuncs{}) + partRange, err := setupPartitionedRange(tc, desc.RangeID, 0, 0, false /* activated */, unreliableRaftHandlerFuncs{}) require.NoError(t, err) - // First put the range on all three nodes. - raftID := roachpb.RangeID(1) - mtc.replicateRange(raftID, 1, 2) // Put some data in the range so we'll have something to test for. - incArgs := incrementArgs([]byte("a"), 5) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + incArgs := incrementArgs(key, 5) + if _, err := kv.SendWrapped(ctx, store.TestSender(), incArgs); err != nil { t.Fatal(err) } // Wait for all nodes to catch up. - mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5}) + tc.WaitForValues(t, key, []int64{5, 5, 5}) // Stop node 2; while it is down remove the range from nodes 2 and 1. - mtc.stopStore(2) - mtc.unreplicateRange(raftID, 2) - mtc.unreplicateRange(raftID, 1) + tc.StopServer(2) + tc.RemoveVotersOrFatal(t, key, tc.Targets(2, 1)...) // Make a write on node 0; this will not be replicated because 0 is the only node left. 
- incArgs = incrementArgs([]byte("a"), 11) - if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil { + incArgs = incrementArgs(key, 11) + if _, err := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); err != nil { t.Fatal(err) } @@ -3272,14 +3288,14 @@ func TestReplicateRogueRemovedNode(t *testing.T) { // may be recreated by a stray raft message, so we run the GC scan inside the loop. // TODO(bdarnell): if the call to RemoveReplica in replicaGCQueue.process can be // moved under the lock, then the GC scan can be moved out of this loop. - mtc.stores[1].SetReplicaGCQueueActive(true) + tc.GetFirstStoreFromServer(t, 1).SetReplicaGCQueueActive(true) testutils.SucceedsSoon(t, func() error { - mtc.advanceClock(context.Background()) - mtc.manualClock.Increment(int64( + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) + manualClock.Increment(int64( kvserver.ReplicaGCQueueInactivityThreshold) + 1) - mtc.stores[1].MustForceReplicaGCScanAndProcess() + tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess() - actual := mtc.readIntFromEngines(roachpb.Key("a")) + actual := tc.ReadIntFromStores(key) expected := []int64{16, 0, 5} if !reflect.DeepEqual(expected, actual) { return errors.Errorf("expected %v, got %v", expected, actual) @@ -3290,7 +3306,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) { // ReplicaTooOldError from node 0 and proceed to remove themselves. partRange.activate() // Bring node 2 back up. - mtc.restartStore(2) + require.NoError(t, tc.RestartServer(2)) // Try to issue a command on node 2. It should not be able to commit // (so we add it asynchronously). 
@@ -3299,24 +3315,20 @@ func TestReplicateRogueRemovedNode(t *testing.T) { var finishWG sync.WaitGroup finishWG.Add(1) - rep, err := mtc.stores[2].GetReplica(raftID) - if err != nil { - t.Fatal(err) - } - replicaDesc, ok := rep.Desc().GetReplicaDescriptor(mtc.stores[2].StoreID()) - if !ok { - t.Fatalf("ReplicaID %d not found", raftID) - } + rep := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(key)) + replicaDesc, ok := rep.Desc().GetReplicaDescriptor(tc.Target(2).StoreID) + require.True(t, ok) + go func() { - incArgs := incrementArgs([]byte("a"), 23) + incArgs := incrementArgs(key, 23) startWG.Done() defer finishWG.Done() _, pErr := kv.SendWrappedWith( context.Background(), - mtc.stores[2], + tc.GetFirstStoreFromServer(t, 2).TestSender(), roachpb.Header{ Replica: replicaDesc, - Timestamp: mtc.stores[2].Clock().Now(), + Timestamp: tc.Servers[2].Clock().Now(), }, incArgs, ) if _, ok := pErr.GetDetail().(*roachpb.RangeNotFoundError); !ok { @@ -3336,7 +3348,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) { // copy of the group. time.Sleep(100 * time.Millisecond) testutils.SucceedsSoon(t, func() error { - actual := mtc.readIntFromEngines(roachpb.Key("a")) + actual := tc.ReadIntFromStores(key) // Normally, replica GC has not happened yet on store 2, so we // expect {16, 0, 5}. However, it is possible (on a // slow/overloaded machine) for the end of the ChangeReplicas @@ -3359,12 +3371,12 @@ func TestReplicateRogueRemovedNode(t *testing.T) { // lease will cause GC to do a consistent range lookup, where it // will see that the range has been moved and delete the old // replica. 
- mtc.stores[2].SetReplicaGCQueueActive(true) - mtc.advanceClock(context.Background()) - mtc.manualClock.Increment(int64( + tc.GetFirstStoreFromServer(t, 2).SetReplicaGCQueueActive(true) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) + manualClock.Increment(int64( kvserver.ReplicaGCQueueInactivityThreshold) + 1) - mtc.stores[2].MustForceReplicaGCScanAndProcess() - mtc.waitForValues(roachpb.Key("a"), []int64{16, 0, 0}) + tc.GetFirstStoreFromServer(t, 2).MustForceReplicaGCScanAndProcess() + tc.WaitForValues(t, key, []int64{16, 0, 0}) // Now that the group has been GC'd, the goroutine that was // attempting to write has finished (with an error). @@ -3813,39 +3825,43 @@ func TestRemovedReplicaError(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - cfg := kvserver.TestStoreConfig(nil) - cfg.Clock = nil // using manual clock - cfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{ - storeConfig: &cfg, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - mtc.Start(t, 2) - defer mtc.Stop() - - // Disable the replica GC queues. This verifies that the replica is - // considered removed even before the gc queue has run, and also - // helps avoid a deadlock at shutdown. - mtc.stores[0].SetReplicaGCQueueActive(false) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + // Disable the replica GC queues. This verifies that the replica is + // considered removed even before the gc queue has run, and also + // helps avoid a deadlock at shutdown. 
+ DisableReplicaGCQueue: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) - raftID := roachpb.RangeID(1) - mtc.replicateRange(raftID, 1) - mtc.transferLease(context.Background(), raftID, 0, 1) - mtc.unreplicateRange(raftID, 0) + key := tc.ScratchRangeWithExpirationLease(t) + desc := tc.AddVotersOrFatal(t, key, tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + tc.RemoveVotersOrFatal(t, key, tc.Target(0)) - mtc.manualClock.Increment(mtc.storeConfig.LeaseExpiration()) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) // Expect to get a RangeNotFoundError. We have to allow for ambiguous result // errors to avoid the occasional test flake. Since we use demotions to remove // voters, the actual removal sees a learner, and so the learner is not in // the commit quorum for the removal itself. That is to say, we will only // start seeing the RangeNotFoundError after a little bit of time has passed. - getArgs := getArgs([]byte("a")) + getArgs := getArgs(key) testutils.SucceedsSoon(t, func() error { - _, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{}, getArgs) + _, pErr := kv.SendWrappedWith(ctx, store, roachpb.Header{}, getArgs) switch pErr.GetDetail().(type) { case *roachpb.AmbiguousResultError: return pErr.GoError() @@ -3866,43 +3882,25 @@ func TestTransferRaftLeadership(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - const numStores = 3 - sc := kvserver.TestStoreConfig(nil) - sc.TestingKnobs.DisableMergeQueue = true - // Suppress timeout-based elections (which also includes a previous - // leader stepping down due to a quorum check). Running tests on a - // heavily loaded CPU is enough to reach the raft election timeout - // and cause leadership to change hands in ways this test doesn't - // expect. 
- sc.RaftElectionTimeoutTicks = 100000 - // This test can rapidly advance the clock via mtc.advanceClock(), - // which could lead the replication queue to consider a store dead - // and remove a replica in the middle of the test. Disable the - // replication queue; we'll control replication manually. - sc.TestingKnobs.DisableReplicateQueue = true - sc.Clock = nil // manual clock - mtc := &multiTestContext{ - storeConfig: &sc, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - defer mtc.Stop() - mtc.Start(t, numStores) - store0 := mtc.Store(0) - store1 := mtc.Store(1) - - key := roachpb.Key("a") - - { - // Split off a range to avoid interacting with the initial splits. - splitArgs := adminSplitArgs(key) - if _, err := kv.SendWrapped(context.Background(), mtc.distSenders[0], splitArgs); err != nil { - t.Fatal(err) - } - } + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections (which also includes a previous + // leader stepping down due to a quorum check). Running tests on a + // heavily loaded CPU is enough to reach the raft election timeout + // and cause leadership to change hands in ways this test doesn't + // expect. 
+ RaftElectionTimeoutTicks: 100000, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store0 := tc.GetFirstStoreFromServer(t, 0) + store1 := tc.GetFirstStoreFromServer(t, 1) + key := tc.ScratchRangeWithExpirationLease(t) repl0 := store0.LookupReplica(keys.MustAddr(key)) if repl0 == nil { t.Fatalf("no replica found for key '%s'", key) @@ -3911,7 +3909,7 @@ func TestTransferRaftLeadership(t *testing.T) { if err != nil { t.Fatal(err) } - mtc.replicateRange(repl0.RangeID, 1, 2) + tc.AddVotersOrFatal(t, key, tc.Target(1)) repl1 := store1.LookupReplica(keys.MustAddr(key)) if repl1 == nil { @@ -3922,7 +3920,7 @@ func TestTransferRaftLeadership(t *testing.T) { t.Fatal(err) } - getArgs := getArgs([]byte("a")) + getArgs := getArgs(key) if _, pErr := kv.SendWrappedWith( context.Background(), store0, roachpb.Header{RangeID: repl0.RangeID}, getArgs, ); pErr != nil { @@ -4868,47 +4866,22 @@ func TestAckWriteBeforeApplication(t *testing.T) { func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - sc := kvserver.TestStoreConfig(nil) - // Newly-started stores (including the "rogue" one) should not GC - // their replicas. We'll turn this back on when needed. - sc.TestingKnobs.DisableReplicaGCQueue = true - sc.TestingKnobs.DisableReplicateQueue = true - sc.RaftDelaySplitToSuppressSnapshotTicks = 0 - // Make the tick interval short so we don't need to wait too long for the - // partitioned leader to time out. Also make the - // RangeLeaseRaftElectionTimeout multiplier high so that system ranges - // like node liveness can actually get leases. 
- sc.RaftTickInterval = 10 * time.Millisecond - sc.RangeLeaseRaftElectionTimeoutMultiplier = 1000 + ctx := context.Background() + noopProposalFilter := kvserverbase.ReplicaProposalFilter(func(args kvserverbase.ProposalFilterArgs) *roachpb.Error { return nil }) var proposalFilter atomic.Value proposalFilter.Store(noopProposalFilter) - sc.TestingKnobs.TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *roachpb.Error { + testingProposalFilter := func(args kvserverbase.ProposalFilterArgs) *roachpb.Error { return proposalFilter.Load().(kvserverbase.ReplicaProposalFilter)(args) } - ctx := context.Background() increment := func(t *testing.T, db *kv.DB, key roachpb.Key, by int64) { b := &kv.Batch{} b.AddRawRequest(incrementArgs(key, by)) require.NoError(t, db.Run(ctx, b)) } - changeReplicas := func( - t *testing.T, db *kv.DB, typ roachpb.ReplicaChangeType, key roachpb.Key, idx int, - ) error { - ri, err := getRangeInfo(ctx, db, key) - require.NoError(t, err) - _, err = db.AdminChangeReplicas(ctx, ri.Desc.StartKey.AsRawKey(), ri.Desc, - roachpb.MakeReplicationChanges(typ, makeReplicationTargets(idx+1)...)) - return err - } - split := func(t *testing.T, db *kv.DB, key roachpb.Key) { - b := &kv.Batch{} - b.AddRawRequest(adminSplitArgs(key)) - require.NoError(t, db.Run(ctx, b)) - } ensureNoTombstone := func(t *testing.T, store *kvserver.Store, rangeID roachpb.RangeID) { var tombstone roachpb.RangeTombstone tombstoneKey := keys.RangeTombstoneKey(rangeID) @@ -4925,7 +4898,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { require.NoError(t, err) return hs } - partitionReplicaOnSplit := func(t *testing.T, mtc *multiTestContext, key roachpb.Key, basePartition *mtcPartitionedRange, partRange **mtcPartitionedRange) { + partitionReplicaOnSplit := func(t *testing.T, tc *testcluster.TestCluster, key roachpb.Key, basePartition *testClusterPartitionedRange, partRange **testClusterPartitionedRange) { // Set up a hook to partition the RHS range at 
its initial range ID // before proposing the split trigger. var setupOnce sync.Once @@ -4947,7 +4920,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { replDesc, ok := split.RightDesc.GetReplicaDescriptor(1) require.True(t, ok) var err error - *partRange, err = basePartition.extend(mtc, split.RightDesc.RangeID, replDesc.ReplicaID, + *partRange, err = basePartition.extend(tc, split.RightDesc.RangeID, replDesc.ReplicaID, 0 /* partitionedNode */, true /* activated */, unreliableRaftHandlerFuncs{}) require.NoError(t, err) proposalFilter.Store(noopProposalFilter) @@ -4965,52 +4938,83 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // partitioned and whether the server is killed. In all cases we want the // split to succeed and the RHS to eventually also be on all 3 nodes. setup := func(t *testing.T) ( - mtc *multiTestContext, + tc *testcluster.TestCluster, db *kv.DB, keyA, keyB roachpb.Key, lhsID roachpb.RangeID, - lhsPartition *mtcPartitionedRange, + lhsPartition *testClusterPartitionedRange, + stickyEngineRegistry server.StickyInMemEnginesRegistry, ) { - mtc = &multiTestContext{ - storeConfig: &sc, + stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + const numServers int = 3 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + }, + Store: &kvserver.StoreTestingKnobs{ + // Newly-started stores (including the "rogue" one) should not GC + // their replicas. We'll turn this back on when needed. 
+ DisableReplicaGCQueue: true, + TestingProposalFilter: testingProposalFilter, + }, + }, + RaftConfig: base.RaftConfig{ + RaftDelaySplitToSuppressSnapshotTicks: 0, + // Make the tick interval short so we don't need to wait too long for the + // partitioned leader to time out. Also make the + // RangeLeaseRaftElectionTimeout multiplier high so that system ranges + // like node liveness can actually get leases. + RaftTickInterval: 10 * time.Millisecond, + RangeLeaseRaftElectionTimeoutMultiplier: 1000, + }, + } } - mtc.Start(t, 3) - db = mtc.Store(1).DB() + tc = testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + + db = tc.GetFirstStoreFromServer(t, 1).DB() // Split off a non-system range so we don't have to account for node liveness // traffic. - scratchTableKey := keys.SystemSQLCodec.TablePrefix(math.MaxUint32) + scratchTableKey := tc.ScratchRangeWithExpirationLease(t) // Put some data in the range so we'll have something to test for. keyA = append(append(roachpb.Key{}, scratchTableKey...), 'a') keyB = append(append(roachpb.Key{}, scratchTableKey...), 'b') - - split(t, db, scratchTableKey) - ri, err := getRangeInfo(ctx, db, scratchTableKey) - require.Nil(t, err) - lhsID = ri.Desc.RangeID // First put the range on all three nodes. - mtc.replicateRange(lhsID, 1, 2) + desc := tc.AddVotersOrFatal(t, scratchTableKey, tc.Targets(1, 2)...) // Set up a partition for the LHS range only. Initially it is not active. - lhsPartition, err = setupPartitionedRange(mtc, lhsID, + lhsPartition, err := setupPartitionedRange(tc, desc.RangeID, 0 /* replicaID */, 0 /* partitionedNode */, false /* activated */, unreliableRaftHandlerFuncs{}) require.NoError(t, err) // Wait for all nodes to catch up. increment(t, db, keyA, 5) - mtc.waitForValues(keyA, []int64{5, 5, 5}) + tc.WaitForValues(t, keyA, []int64{5, 5, 5}) // Transfer the lease off of node 0. 
- mtc.transferLease(ctx, lhsID, 0, 2) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2)) // Make sure everybody knows about that transfer. increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 6, 6}) + tc.WaitForValues(t, keyA, []int64{6, 6, 6}) lhsPartition.activate() increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 7, 7}) - return mtc, db, keyA, keyB, lhsID, lhsPartition + tc.WaitForValues(t, keyA, []int64{6, 7, 7}) + return tc, db, keyA, keyB, lhsID, lhsPartition, stickyEngineRegistry } // In this case we only have the LHS partitioned. The RHS will learn about its @@ -5019,19 +5023,20 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // partition the RHS and ensure that the split does not clobber the RHS's hard // state. t.Run("(1) no RHS partition", func(t *testing.T) { - mtc, db, keyA, keyB, _, lhsPartition := setup(t) - defer mtc.Stop() + tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) + defer stickyEngineRegistry.CloseAllStickyInMemEngines() - split(t, db, keyB) + defer tc.Stopper().Stop(ctx) + tc.SplitRangeOrFatal(t, keyB) // Write a value which we can observe to know when the split has been // applied by the LHS. increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 8, 8}) + tc.WaitForValues(t, keyA, []int64{6, 8, 8}) increment(t, db, keyB, 6) // Wait for all non-partitioned nodes to catch up. - mtc.waitForValues(keyB, []int64{0, 6, 6}) + tc.WaitForValues(t, keyB, []int64{0, 6, 6}) rhsInfo, err := getRangeInfo(ctx, db, keyB) require.NoError(t, err) @@ -5041,11 +5046,11 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // Remove and re-add the RHS to create a new uninitialized replica at // a higher replica ID. This will lead to a tombstone being written. 
- require.NoError(t, changeReplicas(t, db, roachpb.REMOVE_VOTER, keyB, 0)) + tc.RemoveVotersOrFatal(t, keyB, tc.Target(0)) // Unsuccessful because the RHS will not accept the learner snapshot // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. - err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err = tc.AddVoters(keyB, tc.Target(0)) require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Without a partitioned RHS we'll end up always writing a tombstone here because @@ -5053,33 +5058,35 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // raft message when the other nodes split and then after the above call // it will find out about its new replica ID and write a tombstone for the // old one. - waitForTombstone(t, mtc.Store(0).Engine(), rhsID) + waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).Engine(), rhsID) lhsPartition.deactivate() - mtc.waitForValues(keyA, []int64{8, 8, 8}) - hs := getHardState(t, mtc.Store(0), rhsID) + tc.WaitForValues(t, keyA, []int64{8, 8, 8}) + hs := getHardState(t, tc.GetFirstStoreFromServer(t, 0), rhsID) require.Equal(t, uint64(0), hs.Commit) testutils.SucceedsSoon(t, func() error { - return changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err := tc.AddVoters(keyB, tc.Target(0)) + return err }) - mtc.waitForValues(keyB, []int64{6, 6, 6}) + tc.WaitForValues(t, keyB, []int64{6, 6, 6}) }) // This case is like the previous case except the store crashes after // laying down a tombstone. t.Run("(2) no RHS partition, with restart", func(t *testing.T) { - mtc, db, keyA, keyB, _, lhsPartition := setup(t) - defer mtc.Stop() + tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) - split(t, db, keyB) + tc.SplitRangeOrFatal(t, keyB) // Write a value which we can observe to know when the split has been // applied by the LHS. 
increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 8, 8}) + tc.WaitForValues(t, keyA, []int64{6, 8, 8}) increment(t, db, keyB, 6) // Wait for all non-partitioned nodes to catch up. - mtc.waitForValues(keyB, []int64{0, 6, 6}) + tc.WaitForValues(t, keyB, []int64{0, 6, 6}) rhsInfo, err := getRangeInfo(ctx, db, keyB) require.NoError(t, err) @@ -5089,11 +5096,11 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // Remove and re-add the RHS to create a new uninitialized replica at // a higher replica ID. This will lead to a tombstone being written. - require.NoError(t, changeReplicas(t, db, roachpb.REMOVE_VOTER, keyB, 0)) + tc.RemoveVotersOrFatal(t, keyB, tc.Target(0)) // Unsuccessfuly because the RHS will not accept the learner snapshot // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. - err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err = tc.AddVoters(keyB, tc.Target(0)) require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Without a partitioned RHS we'll end up always writing a tombstone here because @@ -5101,7 +5108,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // raft message when the other nodes split and then after the above call // it will find out about its new replica ID and write a tombstone for the // old one. - waitForTombstone(t, mtc.Store(0).Engine(), rhsID) + waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).Engine(), rhsID) // We do all of this incrementing to ensure that nobody will ever // succeed in sending a message the new RHS replica after we restart @@ -5112,21 +5119,22 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { for curB < 100 { curB++ increment(t, db, keyB, 1) - mtc.waitForValues(keyB, []int64{0, curB, curB}) + tc.WaitForValues(t, keyB, []int64{0, curB, curB}) } // Restart store 0 so that it forgets about the newer replicaID. 
- mtc.stopStore(0) - mtc.restartStore(0) + tc.StopServer(0) + require.NoError(t, tc.RestartServer(0)) lhsPartition.deactivate() - mtc.waitForValues(keyA, []int64{8, 8, 8}) - hs := getHardState(t, mtc.Store(0), rhsID) + tc.WaitForValues(t, keyA, []int64{8, 8, 8}) + hs := getHardState(t, tc.GetFirstStoreFromServer(t, 0), rhsID) require.Equal(t, uint64(0), hs.Commit) testutils.SucceedsSoon(t, func() error { - return changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err := tc.AddVoters(keyB, tc.Target(0)) + return err }) - mtc.waitForValues(keyB, []int64{curB, curB, curB}) + tc.WaitForValues(t, keyB, []int64{curB, curB, curB}) }) // In this case the RHS will be partitioned from hearing anything about @@ -5135,20 +5143,21 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // the split is processed. We partition the RHS's new replica ID before // processing the split to ensure that the RHS doesn't get initialized. t.Run("(3) initial replica RHS partition, no restart", func(t *testing.T) { - mtc, db, keyA, keyB, _, lhsPartition := setup(t) - defer mtc.Stop() - var rhsPartition *mtcPartitionedRange - partitionReplicaOnSplit(t, mtc, keyB, lhsPartition, &rhsPartition) - split(t, db, keyB) + tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + var rhsPartition *testClusterPartitionedRange + partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition) + tc.SplitRangeOrFatal(t, keyB) // Write a value which we can observe to know when the split has been // applied by the LHS. increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 8, 8}) + tc.WaitForValues(t, keyA, []int64{6, 8, 8}) increment(t, db, keyB, 6) // Wait for all non-partitioned nodes to catch up. 
- mtc.waitForValues(keyB, []int64{0, 6, 6}) + tc.WaitForValues(t, keyB, []int64{0, 6, 6}) rhsInfo, err := getRangeInfo(ctx, db, keyB) require.NoError(t, err) @@ -5158,53 +5167,55 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // Remove and re-add the RHS to create a new uninitialized replica at // a higher replica ID. This will lead to a tombstone being written. - require.NoError(t, changeReplicas(t, db, roachpb.REMOVE_VOTER, keyB, 0)) + tc.RemoveVotersOrFatal(t, keyB, tc.Target(0)) // Unsuccessful because the RHS will not accept the learner snapshot // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. - err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err = tc.AddVoters(keyB, tc.Target(0)) require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Ensure that the replica exists with the higher replica ID. - repl, err := mtc.Store(0).GetReplica(rhsInfo.Desc.RangeID) + repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rhsInfo.Desc.RangeID) require.NoError(t, err) require.Equal(t, repl.ReplicaID(), rhsInfo.Desc.NextReplicaID) rhsPartition.addReplica(rhsInfo.Desc.NextReplicaID) // Ensure that there's no tombstone. // The RHS on store 0 never should have heard about its original ID. - ensureNoTombstone(t, mtc.Store(0), rhsID) + ensureNoTombstone(t, tc.GetFirstStoreFromServer(t, 0), rhsID) lhsPartition.deactivate() - mtc.waitForValues(keyA, []int64{8, 8, 8}) - hs := getHardState(t, mtc.Store(0), rhsID) + tc.WaitForValues(t, keyA, []int64{8, 8, 8}) + hs := getHardState(t, tc.GetFirstStoreFromServer(t, 0), rhsID) require.Equal(t, uint64(0), hs.Commit) // Now succeed in adding the RHS. Use SucceedsSoon because in rare cases // the learner snapshot can fail due to a race with a raft snapshot from // a raft leader on a different node. 
testutils.SucceedsSoon(t, func() error { - return changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err := tc.AddVoters(keyB, tc.Target(0)) + return err }) - mtc.waitForValues(keyB, []int64{6, 6, 6}) + tc.WaitForValues(t, keyB, []int64{6, 6, 6}) }) // This case is set up like the previous one except after the RHS learns about // its higher replica ID the store crahes and forgets. The RHS replica gets // initialized by the split. t.Run("(4) initial replica RHS partition, with restart", func(t *testing.T) { - mtc, db, keyA, keyB, _, lhsPartition := setup(t) - defer mtc.Stop() - var rhsPartition *mtcPartitionedRange + tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + var rhsPartition *testClusterPartitionedRange - partitionReplicaOnSplit(t, mtc, keyB, lhsPartition, &rhsPartition) - split(t, db, keyB) + partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition) + tc.SplitRangeOrFatal(t, keyB) // Write a value which we can observe to know when the split has been // applied by the LHS. increment(t, db, keyA, 1) - mtc.waitForValues(keyA, []int64{6, 8, 8}) + tc.WaitForValues(t, keyA, []int64{6, 8, 8}) increment(t, db, keyB, 6) // Wait for all non-partitioned nodes to catch up. - mtc.waitForValues(keyB, []int64{0, 6, 6}) + tc.WaitForValues(t, keyB, []int64{0, 6, 6}) rhsInfo, err := getRangeInfo(ctx, db, keyB) require.NoError(t, err) @@ -5214,15 +5225,15 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // Remove and re-add the RHS to create a new uninitialized replica at // a higher replica ID. This will lead to a tombstone being written. - require.NoError(t, changeReplicas(t, db, roachpb.REMOVE_VOTER, keyB, 0)) + tc.RemoveVotersOrFatal(t, keyB, tc.Target(0)) // Unsuccessfuly because the RHS will not accept the learner snapshot // and will be rolled back. 
Nevertheless it will have learned that it // has been removed at the old replica ID. - err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err = tc.AddVoters(keyB, tc.Target(0)) require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Ensure that there's no tombstone. // The RHS on store 0 never should have heard about its original ID. - ensureNoTombstone(t, mtc.Store(0), rhsID) + ensureNoTombstone(t, tc.GetFirstStoreFromServer(t, 0), rhsID) // Now, before we deactivate the LHS partition, partition the newer replica // on the RHS too. @@ -5237,19 +5248,19 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { for curB < 100 { curB++ increment(t, db, keyB, 1) - mtc.waitForValues(keyB, []int64{0, curB, curB}) + tc.WaitForValues(t, keyB, []int64{0, curB, curB}) } - mtc.stopStore(0) - mtc.restartStore(0) + tc.StopServer(0) + require.NoError(t, tc.RestartServer(0)) lhsPartition.deactivate() - mtc.waitForValues(keyA, []int64{8, 8, 8}) + tc.WaitForValues(t, keyA, []int64{8, 8, 8}) // In this case the store has forgotten that it knew the RHS of the split // could not exist. We ensure that it has been initialized to the initial // commit value, which is 10. 
testutils.SucceedsSoon(t, func() error { - hs := getHardState(t, mtc.Store(0), rhsID) + hs := getHardState(t, tc.GetFirstStoreFromServer(t, 0), rhsID) if hs.Commit != uint64(10) { return errors.Errorf("hard state not yet initialized: got %v, expected %v", hs.Commit, uint64(10)) @@ -5258,9 +5269,10 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { }) rhsPartition.deactivate() testutils.SucceedsSoon(t, func() error { - return changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) + _, err := tc.AddVoters(keyB, tc.Target(0)) + return err }) - mtc.waitForValues(keyB, []int64{curB, curB, curB}) + tc.WaitForValues(t, keyB, []int64{curB, curB, curB}) }) } diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 224c87e00026..f336a986f929 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1649,3 +1649,29 @@ func waitForTombstone( }) return tombstone } + +// This test is here to please the unused code linter, and will be removed in +// the next commit. 
+func TestDummyMultiTestContext(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + cfg := kvserver.TestStoreConfig(nil) + cfg.TestingKnobs.DisableReplicateQueue = true + mtc := &multiTestContext{storeConfig: &cfg} + defer mtc.Stop() + mtc.Start(t, 3) + + key := []byte("a") + mtc.getRaftLeader(1) + + incArgs := incrementArgs(key, 5) + if _, err := kv.SendWrapped(ctx, mtc.Store(0).TestSender(), incArgs); err != nil { + t.Fatal(err) + } + + mtc.waitForValues(key, []int64{5, 0, 0}) + mtc.stopStore(1) + mtc.restartStore(1) +} diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index dcebe198d824..ab23f3613cfa 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1060,10 +1060,10 @@ func (tc *TestCluster) ToggleReplicateQueues(active bool) { } } -// readIntFromStores reads the current integer value at the given key +// ReadIntFromStores reads the current integer value at the given key // from all configured engines, filling in zeros when the value is not // found. 
-func (tc *TestCluster) readIntFromStores(key roachpb.Key) []int64 { +func (tc *TestCluster) ReadIntFromStores(key roachpb.Key) []int64 { results := make([]int64, len(tc.Servers)) for i, server := range tc.Servers { err := server.Stores().VisitStores(func(s *kvserver.Store) error { @@ -1094,7 +1094,7 @@ func (tc *TestCluster) readIntFromStores(key roachpb.Key) []int64 { func (tc *TestCluster) WaitForValues(t testing.TB, key roachpb.Key, expected []int64) { t.Helper() testutils.SucceedsSoon(t, func() error { - actual := tc.readIntFromStores(key) + actual := tc.ReadIntFromStores(key) if !reflect.DeepEqual(expected, actual) { return errors.Errorf("expected %v, got %v", expected, actual) } diff --git a/pkg/util/leaktest/leaktest.go b/pkg/util/leaktest/leaktest.go index cd880108a388..007522cfc73e 100644 --- a/pkg/util/leaktest/leaktest.go +++ b/pkg/util/leaktest/leaktest.go @@ -94,6 +94,8 @@ var PrintLeakedStoppers = func(t testing.TB) {} // function to be run at the end of tests to see whether any // goroutines leaked. func AfterTest(t testing.TB) func() { + // Try a best effort GC to help the race tests move along. + runtime.GC() orig := interestingGoroutines() return func() { t.Helper()