From 2f79490d367200acafba13d56ea2c3f292d026dc Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 4 Aug 2023 11:47:00 -0400 Subject: [PATCH] kvserver: refactor tests to avoid reading from stopped servers' engines Epic: none Release note: none --- pkg/cli/democluster/demo_cluster.go | 10 -- pkg/kv/kvserver/client_metrics_test.go | 61 +++++------- pkg/kv/kvserver/client_raft_test.go | 99 +++++-------------- .../client_replica_circuit_breaker_test.go | 3 - pkg/kv/kvserver/client_split_test.go | 19 +--- .../kvserver/flow_control_integration_test.go | 9 +- pkg/kv/kvserver/helpers_test.go | 4 +- pkg/testutils/testcluster/testcluster.go | 10 +- 8 files changed, 69 insertions(+), 146 deletions(-) diff --git a/pkg/cli/democluster/demo_cluster.go b/pkg/cli/democluster/demo_cluster.go index add103430831..ab4ffcdcf368 100644 --- a/pkg/cli/democluster/demo_cluster.go +++ b/pkg/cli/democluster/demo_cluster.go @@ -307,16 +307,6 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { } rpcAddrReadyChs[i] = rpcAddrReady } - - // Ensure we close all sticky stores we've created when the stopper - // instructs the entire cluster to stop. We do this only here - // because we want this closer to be registered after all the - // individual servers' Stop() methods have been registered - // via createAndAddNode() above. - c.stopper.AddCloser(stop.CloserFn(func() { - c.stickyVFSRegistry.CloseAllEngines() - })) - // Start the remaining nodes asynchronously. for i := 1; i < c.demoCtx.NumNodes; i++ { if err := c.startNodeAsync(ctx, i, errCh, timeoutCh); err != nil { diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 2c06a2dc0796..f450957d4d02 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "strconv" - "sync" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -50,14 +49,11 @@ func checkGauge(t *testing.T, id string, g gaugeValuer, e int64) { } } -// verifyStatsOnStoppedServer checks a sets of stats on the specified list of servers. This method -// may produce false negatives when executed against a running server that has -// live traffic on it. -func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) { +// verifyStatsOnServers checks a sets of stats on the specified +// list of servers. +func verifyStatsOnServers(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) { t.Helper() var stores []*kvserver.Store - var wg sync.WaitGroup - for _, storeIdx := range storeIdxSlice { stores = append(stores, tc.GetFirstStoreFromServer(t, storeIdx)) } @@ -75,21 +71,25 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store }) } - wg.Add(len(storeIdxSlice)) - // We actually stop *all* of the Servers. Stopping only a few is riddled - // with deadlocks since operations can span nodes, but stoppers don't - // know about this - taking all of them down at the same time is the - // only sane way of guaranteeing that nothing interesting happens, at - // least when bringing down the nodes jeopardizes majorities. - for _, storeIdx := range storeIdxSlice { - go func(i int) { - defer wg.Done() - tc.StopServer(i) - }(storeIdx) + // Acquire readers into all three stores. + // TODO(jackson): This still leaves an opening for a flake; I + // believe the previous code that stopped all servers also was + // susceptible to flakes. 
+ consistentIters := make([]storage.Reader, len(stores)) + defer func() { + for i := range consistentIters { + if consistentIters[i] != nil { + consistentIters[i].Close() + } + } + }() + for i, s := range stores { + ro := s.TODOEngine().NewReadOnly(storage.StandardDurability) + require.NoError(t, ro.PinEngineStateForIterators()) + consistentIters[i] = ro } - wg.Wait() - for _, s := range stores { + for i, s := range stores { idString := s.Ident.String() m := s.Metrics() @@ -100,7 +100,7 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store } // Compute real total MVCC statistics from store. - realStats, err := s.ComputeMVCCStats() + realStats, err := s.ComputeMVCCStats(consistentIters[i]) if err != nil { t.Fatal(err) } @@ -127,12 +127,7 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store } if t.Failed() { - t.Fatalf("verifyStatsOnStoppedServer failed, aborting test.") - } - - // Restart all Stores. - for _, storeIdx := range storeIdxSlice { - require.NoError(t, tc.RestartServer(storeIdx)) + t.Fatalf("verifyStatsOnServers failed, aborting test.") } } @@ -265,8 +260,6 @@ func TestStoreMetrics(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -284,7 +277,7 @@ func TestStoreMetrics(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ DisableRaftLogQueue: true, @@ -322,7 +315,7 @@ func TestStoreMetrics(t *testing.T) { require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...)) // Verify stats on store1 after replication. - verifyStatsOnStoppedServer(t, tc, 1) + verifyStatsOnServers(t, tc, 1) // Add some data to the "right" range. rangeKeyStart, rangeKeyEnd := key, key.Next() @@ -340,7 +333,7 @@ func TestStoreMetrics(t *testing.T) { // do that given all if the system table activity generated by the TestCluster. // We use Servers[1] and Servers[2] instead, since we can control the traffic // on those servers. - verifyStatsOnStoppedServer(t, tc, 1, 2) + verifyStatsOnServers(t, tc, 1, 2) // Create a transaction statement that fails. Regression test for #4969. if err := tc.GetFirstStoreFromServer(t, 0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -354,7 +347,7 @@ func TestStoreMetrics(t *testing.T) { } // Verify stats after addition. - verifyStatsOnStoppedServer(t, tc, 1, 2) + verifyStatsOnServers(t, tc, 1, 2) checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1) tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1)) testutils.SucceedsSoon(t, func() error { @@ -373,7 +366,7 @@ func TestStoreMetrics(t *testing.T) { checkGauge(t, "store 1", tc.GetFirstStoreFromServer(t, 1).Metrics().ReplicaCount, 1) // Verify all stats on all stores after range is removed. 
- verifyStatsOnStoppedServer(t, tc, 1, 2) + verifyStatsOnServers(t, tc, 1, 2) verifyStorageStats(t, tc.GetFirstStoreFromServer(t, 1)) verifyStorageStats(t, tc.GetFirstStoreFromServer(t, 2)) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 42c93ef28506..8cd68a3dbec4 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -95,7 +95,6 @@ func TestStoreRecoverFromEngine(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -113,7 +112,7 @@ func TestStoreRecoverFromEngine(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, }, @@ -670,10 +669,6 @@ func TestSnapshotAfterTruncation(t *testing.T) { name = "differentTerm" } t.Run(name, func(t *testing.T) { - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -689,7 +684,7 @@ func TestSnapshotAfterTruncation(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, } @@ -737,7 +732,7 @@ func TestSnapshotAfterTruncation(t *testing.T) { t.Fatal(err) } - tc.WaitForValues(t, key, []int64{incAB, incA, incAB}) + tc.WaitForValues(t, key, []int64{incAB, 0 /* stopped server */, incAB}) repl0 := store.LookupReplica(key) index := repl0.GetLastIndex() @@ -1721,11 +1716,6 @@ func TestConcurrentRaftSnapshots(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. 
- stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -1738,7 +1728,7 @@ func TestConcurrentRaftSnapshots(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, } @@ -1785,7 +1775,7 @@ func TestConcurrentRaftSnapshots(t *testing.T) { t.Fatal(err) } - tc.WaitForValues(t, key, []int64{incAB, incA, incA, incAB, incAB}) + tc.WaitForValues(t, key, []int64{incAB, 0 /* stopped */, 0 /* stopped */, incAB, incAB}) index := repl.GetLastIndex() @@ -1809,8 +1799,6 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyVFSRegistry := server.NewStickyVFSRegistry() - const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -1823,7 +1811,7 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ // Disable the replica GC queue so that it doesn't accidentally pick up the @@ -1890,8 +1878,6 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyVFSRegistry := server.NewStickyVFSRegistry() - raftConfig := base.RaftConfig{ // Drop the raft tick interval so the Raft group is ticked more. RaftTickInterval: 10 * time.Millisecond, @@ -1919,7 +1905,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { RaftConfig: raftConfig, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ // Disable leader transfers during leaseholder changes so that we @@ -2212,11 +2198,6 @@ func TestProgressWithDownNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -2229,7 +2210,7 @@ func TestProgressWithDownNode(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, } @@ -2263,7 +2244,7 @@ func TestProgressWithDownNode(t *testing.T) { } // The new increment can be seen on both live replicas. - tc.WaitForValues(t, key, []int64{16, 5, 16}) + tc.WaitForValues(t, key, []int64{16, 0 /* stopped */, 16}) // Once the downed node is restarted, it will catch up. 
require.NoError(t, tc.RestartServer(1)) @@ -2299,8 +2280,6 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -2313,7 +2292,7 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), WallClock: manualClock, }, }, @@ -2371,7 +2350,7 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA t.Fatal(err) } - tc.WaitForValues(t, key, []int64{5, 2, 5}) + tc.WaitForValues(t, key, []int64{5, 0 /* stopped */, 5}) // Re-add the store and restart it. // TODO(dt): ben originally suggested we also attempt this in the other order. @@ -2404,11 +2383,6 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -2421,7 +2395,7 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ @@ -2467,7 +2441,7 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { tc.AddVotersOrFatal(t, key, tc.Target(2)) } // The first increment is visible on the new replica. - tc.WaitForValues(t, key, []int64{inc1, inc1, inc1, inc1}) + tc.WaitForValues(t, key, []int64{inc1, 0 /* stopped */, inc1, inc1}) // Ensure that the rest of the group can make progress. inc2 := int64(11) @@ -2477,7 +2451,7 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { t.Fatal(err) } } - tc.WaitForValues(t, key, []int64{inc1 + inc2, inc1, inc1 + inc2, inc1 + inc2}) + tc.WaitForValues(t, key, []int64{inc1 + inc2, 0 /* stopped */, inc1 + inc2, inc1 + inc2}) // Bring the downed store back up (required for a clean shutdown). require.NoError(t, tc.RestartServer(1)) @@ -3508,11 +3482,6 @@ func TestReplicateRogueRemovedNode(t *testing.T) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. 
- stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -3525,7 +3494,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ @@ -3585,7 +3554,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) { tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess() actual := tc.ReadIntFromStores(key) - expected := []int64{16, 0, 5} + expected := []int64{16, 0, 0 /* stopped */} if !reflect.DeepEqual(expected, actual) { return errors.Errorf("expected %v, got %v", expected, actual) } @@ -3857,11 +3826,6 @@ func TestReplicaTooOldGC(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -3874,7 +3838,7 @@ func TestReplicaTooOldGC(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ DisableScanner: true, @@ -3920,7 +3884,7 @@ func TestReplicaTooOldGC(t *testing.T) { if _, err := kv.SendWrapped(ctx, store.TestSender(), incArgs); err != nil { t.Fatal(err) } - tc.WaitForValues(t, key, []int64{16, 16, 16, 5}) + tc.WaitForValues(t, key, []int64{16, 16, 16, 0 /* stopped */}) // Wait for a bunch of raft ticks in order to flush any heartbeats through // the system. In particular, a coalesced heartbeat containing a quiesce @@ -3959,11 +3923,6 @@ func TestReplicateReAddAfterDown(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -3976,7 +3935,7 @@ func TestReplicateReAddAfterDown(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, } @@ -4017,7 +3976,7 @@ func TestReplicateReAddAfterDown(t *testing.T) { if _, err := kv.SendWrapped(ctx, store.TestSender(), incArgs); err != nil { t.Fatal(err) } - tc.WaitForValues(t, key, []int64{16, 16, 5}) + tc.WaitForValues(t, key, []int64{16, 16, 0 /* stopped */}) // Bring it back up and re-add the range. 
There is a race when the // store applies its removal and re-addition back to back: the @@ -5238,9 +5197,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { keyA, keyB roachpb.Key, lhsID roachpb.RangeID, lhsPartition *testClusterPartitionedRange, - stickyVFSRegistry server.StickyVFSRegistry, ) { - stickyVFSRegistry = server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -5254,7 +5211,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ // Newly-started stores (including the "rogue" one) should not GC @@ -5308,7 +5265,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { increment(t, db, keyA, 1) tc.WaitForValues(t, keyA, []int64{6, 7, 7}) - return tc, db, keyA, keyB, lhsID, lhsPartition, stickyVFSRegistry + return tc, db, keyA, keyB, lhsID, lhsPartition } // In this case we only have the LHS partitioned. The RHS will learn about its @@ -5317,8 +5274,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // partition the RHS and ensure that the split does not clobber the RHS's hard // state. t.Run("(1) no RHS partition", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) - defer stickyVFSRegistry.CloseAllEngines() + tc, db, keyA, keyB, _, lhsPartition := setup(t) defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5370,8 +5326,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // This case is like the previous case except the store crashes after // laying down a tombstone. t.Run("(2) no RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) - defer stickyVFSRegistry.CloseAllEngines() + tc, db, keyA, keyB, _, lhsPartition := setup(t) defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5443,8 +5398,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // the split is processed. We partition the RHS's new replica ID before // processing the split to ensure that the RHS doesn't get initialized. t.Run("(3) initial replica RHS partition, no restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) - defer stickyVFSRegistry.CloseAllEngines() + tc, db, keyA, keyB, _, lhsPartition := setup(t) defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition) @@ -5505,8 +5459,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // its higher replica ID the store crashes and forgets. The RHS replica gets // initialized by the split. 
t.Run("(4) initial replica RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) - defer stickyVFSRegistry.CloseAllEngines() + tc, db, keyA, keyB, _, lhsPartition := setup(t) defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 5bf9df94eec5..efb1a404c642 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -36,7 +36,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -818,8 +817,6 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { }, } tc := testcluster.StartTestCluster(t, 2, args) - tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllEngines)) - _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '45s'`) require.NoError(t, err) diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index c2f548d6fc72..61fde6f6b827 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -1509,8 +1509,9 @@ func runSetupSplitSnapshotRace( t.Fatal(pErr) } - // Store 3 still has the old value, but 4 and 5 are up to date. - tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 2, 5, 5}) + // Store 3 still has the old value (but it's offline), and 4 and 5 + // are up to date. + tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 0 /* stopped */, 5, 5}) // Scan the meta ranges to resolve all intents if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSenderI().(kv.Sender), @@ -1541,12 +1542,7 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - - runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, server.NewStickyVFSRegistry(), func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the left range up first so that the split happens before it sees a snapshot. for i := 1; i <= 3; i++ { require.NoError(t, tc.RestartServer(i)) @@ -1580,12 +1576,7 @@ func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): Currently this test uses ReuseEngines because - // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - - runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, server.NewStickyVFSRegistry(), func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the right range up first. 
for i := 3; i <= 5; i++ { require.NoError(t, tc.RestartServer(i)) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index ad148db2f4e4..8d658d14872a 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -763,11 +763,6 @@ func TestFlowControlRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(jackson): It's unclear if this test can be refactor to omit the - // ReuseEnginesDeprecated option. - stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) - defer stickyVFSRegistry.CloseAllEngines() - const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) var maintainStreamsForBehindFollowers atomic.Bool @@ -796,7 +791,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyVFSRegistry: stickyVFSRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ @@ -965,7 +960,7 @@ ORDER BY name ASC; FROM crdb_internal.kv_flow_control_handles `, "range_id", "store_id", "total_tracked_tokens") - tc.WaitForValues(t, k, []int64{incAB, incA, incA, incAB, incAB}) + tc.WaitForValues(t, k, []int64{incAB, 0 /* stopped */, 0 /* stopped */, incAB, incAB}) index := repl.GetLastIndex() h.comment(`-- (Truncating raft log.)`) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 5ea7b5e0678b..39d7c870e0ea 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -81,14 +81,14 @@ func (s *Store) AddReplica(repl *Replica) error { // ComputeMVCCStats immediately computes correct total MVCC usage statistics // for the store, returning the computed values (but without modifying the // store). -func (s *Store) ComputeMVCCStats() (enginepb.MVCCStats, error) { +func (s *Store) ComputeMVCCStats(reader storage.Reader) (enginepb.MVCCStats, error) { var totalStats enginepb.MVCCStats var err error now := s.Clock().PhysicalNow() newStoreReplicaVisitor(s).Visit(func(r *Replica) bool { var stats enginepb.MVCCStats - stats, err = rditer.ComputeStatsForRange(r.Desc(), s.TODOEngine(), now) + stats, err = rditer.ComputeStatsForRange(r.Desc(), reader, now) if err != nil { return false } diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index ff7c6babb0f8..f63a24ecc247 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1605,11 +1605,15 @@ func (tc *TestCluster) ToggleReplicateQueues(active bool) { } // ReadIntFromStores reads the current integer value at the given key -// from all configured engines, filling in zeros when the value is not -// found. +// from all configured engines on un-stopped servers, filling in zeros +// when the value is not found. func (tc *TestCluster) ReadIntFromStores(key roachpb.Key) []int64 { results := make([]int64, len(tc.Servers)) for i, server := range tc.Servers { + // Skip stopped servers, leaving their value as zero. 
+ if tc.ServerStopped(i) { + continue + } err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error { valRes, err := storage.MVCCGet(context.Background(), s.TODOEngine(), key, server.Clock().Now(), storage.MVCCGetOptions{}) @@ -1725,7 +1729,7 @@ func (tc *TestCluster) RestartServerWithInspect( for i, specs := range serverArgs.StoreSpecs { if specs.InMemory && specs.StickyVFSID == "" { - return errors.Errorf("failed to restart Server %d, because a restart can only be used on a server with a sticky engine", i) + return errors.Errorf("failed to restart Server %d, because a restart can only be used on a server with a sticky VFS", i) } } s, err := serverutils.NewServer(serverArgs)
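Note (illustrative, not part of the patch): the core pattern of this refactor is to pin a per-store read-only handle instead of stopping servers, so MVCC stats can be computed against a consistent engine snapshot while the cluster keeps running. Below is a minimal sketch of that pattern as a standalone helper; the helper name is hypothetical, and only the calls that already appear in the diff above (TODOEngine, NewReadOnly, PinEngineStateForIterators, Close) are assumed.

package kvserver_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// openPinnedReaders opens one read-only handle per store and pins its engine
// state so that all iterators created from it observe a single consistent
// snapshot. The returned closeAll func releases every handle.
func openPinnedReaders(t *testing.T, stores []*kvserver.Store) ([]storage.Reader, func()) {
	t.Helper()
	readers := make([]storage.Reader, len(stores))
	for i, s := range stores {
		ro := s.TODOEngine().NewReadOnly(storage.StandardDurability)
		if err := ro.PinEngineStateForIterators(); err != nil {
			t.Fatal(err)
		}
		readers[i] = ro
	}
	closeAll := func() {
		for _, r := range readers {
			r.Close()
		}
	}
	return readers, closeAll
}

A caller would defer closeAll() and pass readers[i] to Store.ComputeMVCCStats, mirroring what verifyStatsOnServers does in pkg/kv/kvserver/client_metrics_test.go above.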