From c0aa6328b19a2c15c9dee3b23ea2ae753a096f54 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sat, 14 Nov 2020 13:11:59 +0100 Subject: [PATCH] stopper: assert that stoppers don't leak Until now leaked stoppers were just reported in tests but did not cause failures. This PR switches the check to cause errors instead. Release note: None --- pkg/cli/demo_test.go | 39 +++++++++++-------- .../client/requestbatcher/batcher_test.go | 4 +- pkg/kv/kvclient/kvcoord/range_cache_test.go | 16 +++++++- pkg/kv/kvserver/client_merge_test.go | 1 + .../closedts/container/container_test.go | 1 + pkg/kv/kvserver/rangefeed/processor_test.go | 4 +- pkg/sql/distsql_physical_planner_test.go | 4 +- pkg/util/leaktest/leaktest.go | 2 - pkg/util/stop/stopper.go | 2 +- pkg/util/stop/stopper_test.go | 1 + 10 files changed, 50 insertions(+), 24 deletions(-) diff --git a/pkg/cli/demo_test.go b/pkg/cli/demo_test.go index 00ddb2799941..c76c653e4d54 100644 --- a/pkg/cli/demo_test.go +++ b/pkg/cli/demo_test.go @@ -11,6 +11,7 @@ package cli import ( + "context" "fmt" "testing" @@ -65,27 +66,31 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { }, } - for _, tc := range testCases { - demoCtxTemp := demoCtx - demoCtx.sqlPoolMemorySize = tc.sqlPoolMemorySize - demoCtx.cacheSize = tc.cacheSize + for i, tc := range testCases { + t.Run(fmt.Sprint(i), func(t *testing.T) { + demoCtxTemp := demoCtx + demoCtx.sqlPoolMemorySize = tc.sqlPoolMemorySize + demoCtx.cacheSize = tc.cacheSize - actual := testServerArgsForTransientCluster(unixSocketDetails{}, tc.nodeID, tc.joinAddr, "") + actual := testServerArgsForTransientCluster(unixSocketDetails{}, tc.nodeID, tc.joinAddr, "") + stopper := actual.Stopper + defer stopper.Stop(context.Background()) - assert.Len(t, actual.StoreSpecs, 1) - assert.Equal( - t, - fmt.Sprintf("demo-node%d", tc.nodeID), - actual.StoreSpecs[0].StickyInMemoryEngineID, - ) + assert.Len(t, actual.StoreSpecs, 1) + assert.Equal( + t, + fmt.Sprintf("demo-node%d", tc.nodeID), + actual.StoreSpecs[0].StickyInMemoryEngineID, + ) - // We cannot compare these. - actual.Stopper = nil - actual.StoreSpecs = nil + // We cannot compare these. + actual.Stopper = nil + actual.StoreSpecs = nil - assert.Equal(t, tc.expected, actual) + assert.Equal(t, tc.expected, actual) - // Restore demoCtx state after each test. - demoCtx = demoCtxTemp + // Restore demoCtx state after each test. + demoCtx = demoCtxTemp + }) } } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 3262f1eae447..914ba54d791d 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -548,10 +548,12 @@ func TestMaxKeysPerBatchReq(t *testing.T) { func TestPanicWithNilSender(t *testing.T) { defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) defer func() { if r := recover(); r == nil { t.Fatalf("failed to panic with a nil Sender") } }() - New(Config{Stopper: stop.NewStopper()}) + New(Config{Stopper: stopper}) } diff --git a/pkg/kv/kvclient/kvcoord/range_cache_test.go b/pkg/kv/kvclient/kvcoord/range_cache_test.go index eaba50e631ec..c0ff6a1a999d 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache_test.go +++ b/pkg/kv/kvclient/kvcoord/range_cache_test.go @@ -35,6 +35,7 @@ import ( type testDescriptorDB struct { data llrb.Tree + stopper *stop.Stopper cache *RangeDescriptorCache lookupCount int64 disablePrefetch bool @@ -256,10 +257,13 @@ func initTestDescriptorDB(t *testing.T) *testDescriptorDB { } } // TODO(andrei): don't leak this Stopper. Someone needs to Stop() it. - db.cache = NewRangeDescriptorCache(st, db, staticSize(2<<10), stop.NewStopper()) + db.stopper = stop.NewStopper() + db.cache = NewRangeDescriptorCache(st, db, staticSize(2<<10), db.stopper) return db } +func (db *testDescriptorDB) stop() { db.stopper.Stop(context.Background()) } + // assertLookupCountEq fails unless exactly the number of lookups have been observed. func (db *testDescriptorDB) assertLookupCountEq(t *testing.T, exp int64, key string) { t.Helper() @@ -331,6 +335,7 @@ func TestDescriptorDBGetDescriptors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() key := roachpb.RKey("k") expectedRspansMap := map[bool][]roachpb.RSpan{ @@ -388,6 +393,7 @@ func TestRangeCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() ctx := context.Background() // Totally uncached range. @@ -504,6 +510,7 @@ func TestRangeCacheCoalescedRequests(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() ctx := context.Background() pauseLookupResumeAndAssert := func(key string, expected int64) { @@ -561,6 +568,7 @@ func TestRangeCacheContextCancellation(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() // lookupAndWaitUntilJoin performs a RangeDescriptor lookup in a new // goroutine and blocks until the request is added to the inflight request @@ -627,6 +635,7 @@ func TestRangeCacheDetectSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() ctx := context.Background() pauseLookupResumeAndAssert := func(key string, evictToken EvictionToken) { @@ -696,6 +705,7 @@ func TestRangeCacheDetectSplitReverseScan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() ctx := context.Background() // A request initially looks up the range descriptor ["a"-"b"). @@ -827,6 +837,8 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("reverse=%t", tc.reverseScan), func(t *testing.T) { db := initTestDescriptorDB(t) + defer db.stop() + db.disablePrefetch = true ctx := context.Background() @@ -951,6 +963,8 @@ func TestRangeCacheUseIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) db := initTestDescriptorDB(t) + defer db.stop() + ctx := context.Background() // A request initially looks up the range descriptor ["a"-"b"). diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 16b635b0d2d5..befb34a864ed 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3401,6 +3401,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { startWithSingleRange: true, } mtc.Start(t, 1) + defer mtc.Stop() store := mtc.Store(0) stopper := mtc.engineStoppers[0] diff --git a/pkg/kv/kvserver/closedts/container/container_test.go b/pkg/kv/kvserver/closedts/container/container_test.go index 01e63709c8ce..6783150dbfc6 100644 --- a/pkg/kv/kvserver/closedts/container/container_test.go +++ b/pkg/kv/kvserver/closedts/container/container_test.go @@ -123,6 +123,7 @@ func setupTwoNodeTest() (_ *TestContainer, _ *TestContainer, shutdown func()) { defer wg.Done() c2.Stopper.Stop(context.Background()) }() + wg.Wait() } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index e32d59a929a3..4d0b7cc1e950 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -432,7 +432,9 @@ func TestNilProcessor(t *testing.T) { // The following should panic because they are not safe // to call on a nil Processor. - require.Panics(t, func() { p.Start(stop.NewStopper(), nil) }) + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + require.Panics(t, func() { p.Start(stopper, nil) }) require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) }) } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 73d713f49c7e..22027dc0735f 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -261,7 +261,9 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) { size := func() int64 { return 2 << 10 } st := cluster.MakeTestingClusterSettings() - rangeCache := kvcoord.NewRangeDescriptorCache(st, nil /* db */, size, stop.NewStopper()) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rangeCache := kvcoord.NewRangeDescriptorCache(st, nil /* db */, size, stopper) r := MakeDistSQLReceiver( ctx, nil /* resultWriter */, tree.Rows, rangeCache, nil /* txn */, nil /* updateClock */, &SessionTracing{}) diff --git a/pkg/util/leaktest/leaktest.go b/pkg/util/leaktest/leaktest.go index 354277a43c2b..fc3da536c601 100644 --- a/pkg/util/leaktest/leaktest.go +++ b/pkg/util/leaktest/leaktest.go @@ -112,8 +112,6 @@ func AfterTest(t testing.TB) func() { return } - // TODO(tbg): make this call 't.Error' instead of 't.Logf' once there is - // enough Stopper discipline. PrintLeakedStoppers(t) // Loop, waiting for goroutines to shut down. diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index f2f8bfb460fe..37fcfb9885d8 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -95,7 +95,7 @@ func PrintLeakedStoppers(t testing.TB) { trackedStoppers.Lock() defer trackedStoppers.Unlock() for _, tracked := range trackedStoppers.stoppers { - t.Logf("leaked stopper, created at:\n%s", tracked.createdAt) + t.Errorf("leaked stopper, created at:\n%s", tracked.createdAt) } } diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go index 66bba2e9b5f5..ee6a5bfbb35a 100644 --- a/pkg/util/stop/stopper_test.go +++ b/pkg/util/stop/stopper_test.go @@ -346,6 +346,7 @@ func TestStopperRunTaskPanic(t *testing.T) { s := stop.NewStopper(stop.OnPanic(func(v interface{}) { ch <- v })) + defer s.Stop(context.Background()) // If RunTask were not panic-safe, Stop() would deadlock. type testFn func() explode := func(context.Context) { panic(ch) }