From f4dff127f878ad2f1da103edb0fac38f0e4a6058 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 14 Jul 2020 15:48:50 +0200 Subject: [PATCH 1/2] leaktest: print non-closed stoppers In https://github.com/cockroachdb/cockroach/issues/51363 we saw use-after-close of an engine, which I tracked down to `LocalTestCluster` using more than one stopper (only one of which was actually stopped). This is likely a larger class of problem, so I added a facility to ensure that no stoppers are currently active, which is invoked from leaktest. However, due to the large amount of work that is necessary to sanitize the whole testing codebase, this is for now only advisory (i.e. logs, does not error). Nevertheless, this will be useful in tests in which a use-after-stop actually causes problems. For the LocalTestCluster, I made sure that the warning is no longer printed, which mainly entailed not handing a different stopper to the kv DB than that used for the main server. This explains the failures we have been seeing, which always involved a KV client request. Closes #51363. Release note: None --- .../kvclient/kvcoord/txn_coord_sender_test.go | 1 + pkg/kv/kvserver/closedts/container/noop.go | 3 +- .../localtestcluster/local_test_cluster.go | 4 ++- pkg/util/leaktest/leaktest.go | 5 ++++ pkg/util/stop/stopper.go | 28 ++++++++++++++++--- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 6c99cdbb8ce3..a5a1c02fad7e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -51,6 +51,7 @@ func createTestDBWithContextAndKnobs( ) *localtestcluster.LocalTestCluster { s := &localtestcluster.LocalTestCluster{ DBContext: &dbCtx, + Stopper: dbCtx.Stopper, StoreTestingKnobs: knobs, } s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster) diff --git a/pkg/kv/kvserver/closedts/container/noop.go b/pkg/kv/kvserver/closedts/container/noop.go index 2541c62e373b..1a0cf41cf599 100644 --- a/pkg/kv/kvserver/closedts/container/noop.go +++ b/pkg/kv/kvserver/closedts/container/noop.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -35,7 +34,7 @@ func NoopContainer() *Container { return &Container{ Config: Config{ Settings: cluster.MakeTestingClusterSettings(), - Stopper: stop.NewStopper(), + Stopper: nil, Clock: func(roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) { return hlc.Timestamp{}, 0, errors.New("closed timestamps disabled for testing") }, diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 4f95060ecf01..ae5c6ef8523e 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -112,7 +112,9 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } ltc.tester = t - ltc.Stopper = stop.NewStopper() + if ltc.Stopper == nil { + ltc.Stopper = stop.NewStopper() + } cfg.RPCContext = rpc.NewContext(rpc.ContextOptions{ AmbientCtx: ambient, Config: baseCtx, diff --git a/pkg/util/leaktest/leaktest.go b/pkg/util/leaktest/leaktest.go index 7ef8e41fe125..18fea0533ecf 100644 --- a/pkg/util/leaktest/leaktest.go +++ b/pkg/util/leaktest/leaktest.go @@ 
-29,6 +29,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/petermattis/goid" @@ -108,6 +109,10 @@ func AfterTest(t testing.TB) func() { return } + // TODO(tbg): make this call 't.Error' instead of 't.Logf' once there is + // enough Stopper discipline. + stop.PrintLeakedStoppers(t) + // Loop, waiting for goroutines to shut down. // Wait up to 5 seconds, but finish as quickly as possible. deadline := timeutil.Now().Add(5 * time.Second) diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 72924de463d8..6c6b62130330 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "net/http" + "runtime/debug" "sort" "strings" "sync" @@ -42,7 +43,8 @@ var ErrUnavailable = &roachpb.NodeUnavailableError{} func register(s *Stopper) { trackedStoppers.Lock() - trackedStoppers.stoppers = append(trackedStoppers.stoppers, s) + trackedStoppers.stoppers = append(trackedStoppers.stoppers, + stopperWithStack{s: s, stack: debug.Stack()}) trackedStoppers.Unlock() } @@ -51,7 +53,7 @@ func unregister(s *Stopper) { defer trackedStoppers.Unlock() sl := trackedStoppers.stoppers for i, tracked := range sl { - if tracked == s { + if tracked.s == s { trackedStoppers.stoppers = sl[:i+copy(sl[i:], sl[i+1:])] return } @@ -59,9 +61,14 @@ func unregister(s *Stopper) { panic("attempt to unregister untracked stopper") } +type stopperWithStack struct { + s *Stopper + stack []byte +} + var trackedStoppers struct { syncutil.Mutex - stoppers []*Stopper + stoppers []stopperWithStack } // HandleDebug responds with the list of stopper tasks actively running. @@ -69,13 +76,26 @@ func HandleDebug(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") trackedStoppers.Lock() defer trackedStoppers.Unlock() - for _, s := range trackedStoppers.stoppers { + for _, ss := range trackedStoppers.stoppers { + s := ss.s s.mu.Lock() fmt.Fprintf(w, "%p: %d tasks\n%s", s, s.mu.numTasks, s.runningTasksLocked()) s.mu.Unlock() } } +// PrintLeakedStoppers prints (using `t`) the creation site of each Stopper +// for which `.Stop()` has not yet been called. +func PrintLeakedStoppers(t interface { + Logf(string, ...interface{}) +}) { + trackedStoppers.Lock() + defer trackedStoppers.Unlock() + for _, tracked := range trackedStoppers.stoppers { + t.Logf("leaked stopper, created at:\n%s", tracked.stack) + } +} + // Closer is an interface for objects to attach to the stopper to // be closed once the stopper completes. type Closer interface { From 8ba28dc7e518d1f7a7eec7e2885af4e6383913b5 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 17 Jul 2020 18:21:43 +0200 Subject: [PATCH 2/2] *: plumb stopper to kv.NewDB and kv.DefaultDBContext DefaultDBContext previously created an internal stopper, but nobody was ever stopping it. Nothing interesting happens in this change. 
Release note: None --- .../kvfollowerreadsccl/followerreads_test.go | 3 +- pkg/kv/client_test.go | 14 +- pkg/kv/db.go | 10 +- .../kvcoord/dist_sender_server_test.go | 2 +- pkg/kv/kvclient/kvcoord/split_test.go | 6 +- .../kvcoord/txn_coord_sender_server_test.go | 2 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 62 +++--- pkg/kv/kvclient/kvcoord/txn_test.go | 4 +- pkg/kv/kvserver/addressing_test.go | 2 +- pkg/kv/kvserver/client_test.go | 4 +- pkg/kv/kvserver/idalloc/id_alloc_test.go | 2 +- .../intentresolver/intent_resolver_test.go | 22 +- .../protectedts/ptverifier/verifier_test.go | 2 +- pkg/kv/kvserver/replica_test.go | 2 +- pkg/kv/kvserver/store_test.go | 4 +- pkg/kv/kvserver/txnrecovery/manager_test.go | 2 +- pkg/kv/kvserver/txnwait/queue_test.go | 19 +- pkg/kv/txn_test.go | 188 ++++++++++-------- pkg/server/server.go | 2 +- pkg/server/testserver.go | 6 +- pkg/sql/conn_executor_internal_test.go | 2 +- pkg/sql/distsql_running_test.go | 2 +- pkg/sql/sem/tree/timeconv_test.go | 10 +- pkg/sql/txn_state_test.go | 9 +- .../localtestcluster/local_test_cluster.go | 49 ++--- pkg/util/leaktest/leaktest.go | 7 +- pkg/util/stop/stopper.go | 18 +- 27 files changed, 246 insertions(+), 209 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 0155b4a39341..526ae0b144b8 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -156,8 +156,7 @@ func TestOracleFactory(t *testing.T) { rpcContext := rpc.NewInsecureTestingContext(clock, stopper) c := kv.NewDB(log.AmbientContext{ Tracer: tracing.NewTracer(), - }, kv.MockTxnSenderFactory{}, - hlc.NewClock(hlc.UnixNano, time.Nanosecond)) + }, kv.MockTxnSenderFactory{}, hlc.NewClock(hlc.UnixNano, time.Nanosecond), stopper) txn := kv.NewTxn(context.Background(), c, 0) of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{ Settings: st, diff --git a/pkg/kv/client_test.go b/pkg/kv/client_test.go index 92183a4373de..cfa69a328502 100644 --- a/pkg/kv/client_test.go +++ b/pkg/kv/client_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -687,6 +688,9 @@ func TestReadConsistencyTypes(t *testing.T) { roachpb.INCONSISTENT, } { t.Run(rc.String(), func(t *testing.T) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) // Mock out DistSender's sender function to check the read consistency for // outgoing BatchRequests and return an empty reply. 
factory := kv.NonTransactionalFactoryFunc( @@ -699,8 +703,7 @@ func TestReadConsistencyTypes(t *testing.T) { }) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) - ctx := context.Background() + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) prepWithRC := func() *kv.Batch { b := &kv.Batch{} @@ -830,6 +833,10 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + // Mock out sender function to check that created transactions // have the observed timestamp set for the configured node ID. factory := kv.MakeMockTxnSenderFactory( @@ -839,7 +846,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) { setup := func(nodeID roachpb.NodeID) *kv.DB { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - dbCtx := kv.DefaultDBContext() + dbCtx := kv.DefaultDBContext(stopper) var c base.NodeIDContainer if nodeID != 0 { c.Set(context.Background(), nodeID) @@ -849,7 +856,6 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) { db := kv.NewDBWithContext(testutils.MakeAmbientCtx(), factory, clock, dbCtx) return db } - ctx := context.Background() // Verify direct creation of Txns. directCases := []struct { diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d4a8ecf5bce2..b958b5c210f0 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -183,13 +183,13 @@ type DBContext struct { // DefaultDBContext returns (a copy of) the default options for // NewDBWithContext. -func DefaultDBContext() DBContext { +func DefaultDBContext(stopper *stop.Stopper) DBContext { var c base.NodeIDContainer return DBContext{ UserPriority: roachpb.NormalUserPriority, // TODO(tbg): this is ugly. Force callers to pass in an SQLIDContainer. NodeID: base.NewSQLIDContainer(0, &c, true /* exposed */), - Stopper: stop.NewStopper(), + Stopper: stopper, } } @@ -278,8 +278,10 @@ func (db *DB) Clock() *hlc.Clock { } // NewDB returns a new DB. -func NewDB(actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock) *DB { - return NewDBWithContext(actx, factory, clock, DefaultDBContext()) +func NewDB( + actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper, +) *DB { + return NewDBWithContext(actx, factory, clock, DefaultDBContext(stopper)) } // NewDBWithContext returns a new DB with the given parameters. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 40dc4ddad572..12cc92463e4a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -101,7 +101,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) { }, ds, ) - db := kv.NewDB(ambient, tsf, s.Clock()) + db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper()) // Now, with an intent pending, attempt (asynchronously) to read // from an arbitrary key. This will cause the distributed sender to diff --git a/pkg/kv/kvclient/kvcoord/split_test.go b/pkg/kv/kvclient/kvcoord/split_test.go index 9a7d46d26e80..c2413e5c1408 100644 --- a/pkg/kv/kvclient/kvcoord/split_test.go +++ b/pkg/kv/kvclient/kvcoord/split_test.go @@ -193,7 +193,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) { s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster) // This is purely to silence log spam. 
- config.TestingSetupZoneConfigHook(s.Stopper) + config.TestingSetupZoneConfigHook(s.Stopper()) defer s.Stop() // Start test writer write about a 32K/key so there aren't too many @@ -240,7 +240,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) { func TestRangeSplitsWithSameKeyTwice(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{ + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ DisableScanner: true, DisableSplitQueue: true, DisableMergeQueue: true, @@ -268,7 +268,7 @@ func TestRangeSplitsWithSameKeyTwice(t *testing.T) { func TestRangeSplitsStickyBit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{ + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ DisableScanner: true, DisableSplitQueue: true, DisableMergeQueue: true, diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go index 1b60967d8c95..691d7e1f4db3 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go @@ -98,7 +98,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { }, s.DistSenderI().(*kvcoord.DistSender), ) - db := kv.NewDB(ambient, tsf, s.Clock()) + db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper()) txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) if err := txn.Put(ctx, key, "val"); err != nil { t.Fatal(err) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index a5a1c02fad7e..530e66816824 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -43,15 +43,13 @@ import ( // createTestDB creates a local test server and starts it. The caller // is responsible for stopping the test server. 
func createTestDB(t testing.TB) *localtestcluster.LocalTestCluster { - return createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), nil) + return createTestDBWithKnobs(t, nil) } -func createTestDBWithContextAndKnobs( - t testing.TB, dbCtx kv.DBContext, knobs *kvserver.StoreTestingKnobs, +func createTestDBWithKnobs( + t testing.TB, knobs *kvserver.StoreTestingKnobs, ) *localtestcluster.LocalTestCluster { s := &localtestcluster.LocalTestCluster{ - DBContext: &dbCtx, - Stopper: dbCtx.Stopper, StoreTestingKnobs: knobs, } s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster) @@ -237,11 +235,11 @@ func TestTxnCoordSenderCondenseLockSpans(t *testing.T) { AmbientCtx: ambient, Settings: st, Clock: s.Clock, - Stopper: s.Stopper, + Stopper: s.Stopper(), }, ds, ) - db := kv.NewDB(ambient, tsf, s.Clock) + db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) ctx := context.Background() txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) @@ -282,7 +280,7 @@ func TestTxnCoordSenderCondenseLockSpans(t *testing.T) { func TestTxnCoordSenderHeartbeat(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{ + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ DisableScanner: true, DisableSplitQueue: true, DisableMergeQueue: true, @@ -306,14 +304,14 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) { HeartbeatInterval: time.Millisecond, Settings: s.Cfg.Settings, Clock: s.Clock, - Stopper: s.Stopper, + Stopper: s.Stopper(), }, NewDistSenderForLocalTestCluster( s.Cfg.Settings, &roachpb.NodeDescriptor{NodeID: 1}, - ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper, s.Gossip, + ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper(), s.Gossip, ), ) - quickHeartbeatDB := kv.NewDB(ambient, tsf, s.Clock) + quickHeartbeatDB := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) // We're going to test twice. In both cases the heartbeat is supposed to // notice that its transaction is aborted, but: @@ -660,7 +658,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) { }, } - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), knobs) + s := createTestDBWithKnobs(t, knobs) defer s.Stop() ctx := context.Background() @@ -818,7 +816,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { }, senderFn, ) - db := kv.NewDB(ambient, tsf, clock) + db := kv.NewDB(ambient, tsf, clock, stopper) key := roachpb.Key("test-key") now := clock.Now() origTxnProto := roachpb.MakeTransaction( @@ -962,7 +960,7 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { ) defer stopper.Stop(ctx) - db := kv.NewDB(ambient, factory, clock) + db := kv.NewDB(ambient, factory, clock, stopper) txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Acquire locks on a-b, c, u-w before the final batch. @@ -1053,9 +1051,7 @@ func checkTxnMetricsOnce( // have a faster sample interval and returns a cleanup function to be // executed by callers. func setupMetricsTest(t *testing.T) (*localtestcluster.LocalTestCluster, TxnMetrics, func()) { - dbCtx := kv.DefaultDBContext() s := &localtestcluster.LocalTestCluster{ - DBContext: &dbCtx, // Liveness heartbeat txns mess up the metrics. 
DisableLivenessHeartbeat: true, DontCreateSystemRanges: true, @@ -1336,7 +1332,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { senderFn, ) - db := kv.NewDB(ambient, factory, clock) + db := kv.NewDB(ambient, factory, clock, stopper) txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) if pErr := txn.Put(ctx, "a", "b"); pErr != nil { t.Fatalf("put failed: %s", pErr) @@ -1416,7 +1412,7 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) { }, sender, ) - db := kv.NewDB(ambient, factory, clock) + db := kv.NewDB(ambient, factory, clock, stopper) sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if _, ok := ba.GetArg(roachpb.EndTxn); !ok { @@ -1484,7 +1480,7 @@ func TestOnePCErrorTracking(t *testing.T) { }, sender, ) - db := kv.NewDB(ambient, factory, clock) + db := kv.NewDB(ambient, factory, clock, stopper) keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") // Register a matcher catching the commit attempt. @@ -1578,7 +1574,7 @@ func TestCommitReadOnlyTransaction(t *testing.T) { testutils.RunTrueAndFalse(t, "explicit txn", func(t *testing.T, explicitTxn bool) { testutils.RunTrueAndFalse(t, "with get", func(t *testing.T, withGet bool) { calls = nil - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() if withGet { @@ -1684,7 +1680,7 @@ func TestCommitMutatingTransaction(t *testing.T) { for i, test := range testArgs { t.Run(test.expMethod.String(), func(t *testing.T) { calls = nil - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) if err := db.Txn(ctx, test.f); err != nil { t.Fatalf("%d: unexpected error on commit: %s", i, err) } @@ -1727,7 +1723,7 @@ func TestAbortReadOnlyTransaction(t *testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) if err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { return errors.New("foo") }); err == nil { @@ -1784,7 +1780,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) { testutils.RunTrueAndFalse(t, "success", func(t *testing.T, success bool) { @@ -1876,7 +1872,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) if err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { defer func() { attempt++ }() @@ -1932,7 +1928,7 @@ func TestSequenceNumbers(t *testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) for i := 0; i < 5; i++ { @@ -1982,7 +1978,7 @@ func TestConcurrentTxnRequestsProhibited(t *testing.T) { }, sender, ) - db := kv.NewDB(ambient, factory, clock) + db := kv.NewDB(ambient, factory, clock, stopper) err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { g, gCtx := errgroup.WithContext(ctx) @@ -2024,7 +2020,7 @@ func TestTxnRequestTxnTimestamp(t 
*testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) curReq := 0 requests := []struct { @@ -2100,7 +2096,7 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) { }, sender, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) // We're going to run two tests: one where the EndTxn is by itself in a // batch, one where it is not. As of June 2018, the EndTxn is elided in @@ -2164,9 +2160,9 @@ func TestTxnCoordSenderPipelining(t *testing.T) { AmbientCtx: ambientCtx, Settings: s.Cfg.Settings, Clock: s.Clock, - Stopper: s.Stopper, + Stopper: s.Stopper(), }, senderFn) - db := kv.NewDB(ambientCtx, tsf, s.Clock) + db := kv.NewDB(ambientCtx, tsf, s.Clock, s.Stopper()) err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "key", "val") @@ -2245,7 +2241,7 @@ func TestAnchorKey(t *testing.T) { }, senderFn, ) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { ba := txn.NewBatch() @@ -2283,7 +2279,7 @@ func TestLeafTxnClientRejectError(t *testing.T) { }, } - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), knobs) + s := createTestDBWithKnobs(t, knobs) defer s.Stop() ctx := context.Background() @@ -2314,7 +2310,7 @@ func TestLeafTxnClientRejectError(t *testing.T) { func TestUpdateRoootWithLeafFinalStateInAbortedTxn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), nil /* knobs */) + s := createTestDBWithKnobs(t, nil /* knobs */) defer s.Stop() ctx := context.Background() diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go index 01744598cfad..46177de8833b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_test.go @@ -378,7 +378,7 @@ func TestTxnLongDelayBetweenWritesWithConcurrentRead(t *testing.T) { func TestTxnRepeatGetWithRangeSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{ + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ DisableScanner: true, DisableSplitQueue: true, DisableMergeQueue: true, @@ -620,7 +620,7 @@ func TestTxnCommitTimestampAdvancedByRefresh(t *testing.T) { injected := false var refreshTS hlc.Timestamp errKey := roachpb.Key("inject_err") - s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{ + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { if g, ok := ba.GetArg(roachpb.Get); ok && g.(*roachpb.GetRequest).Key.Equal(errKey) { if injected { diff --git a/pkg/kv/kvserver/addressing_test.go b/pkg/kv/kvserver/addressing_test.go index b6ab6401595e..9b2ca1bcfe93 100644 --- a/pkg/kv/kvserver/addressing_test.go +++ b/pkg/kv/kvserver/addressing_test.go @@ -145,7 +145,7 @@ func TestUpdateRangeAddressing(t *testing.T) { }, store.TestSender(), ) - db := kv.NewDB(actx, tcsf, store.cfg.Clock) + db := kv.NewDB(actx, tcsf, store.cfg.Clock, stopper) ctx := context.Background() txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) if err := txn.Run(ctx, b); err != nil { diff --git a/pkg/kv/kvserver/client_test.go 
b/pkg/kv/kvserver/client_test.go index 99a5ba58b17b..7d85c57a79ff 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -177,7 +177,7 @@ func createTestStoreWithOpts( }, distSender, ) - storeCfg.DB = kv.NewDB(ac, tcsFactory, storeCfg.Clock) + storeCfg.DB = kv.NewDB(ac, tcsFactory, storeCfg.Clock, stopper) storeCfg.StorePool = kvserver.NewTestStorePool(storeCfg) storeCfg.Transport = kvserver.NewDummyRaftTransport(storeCfg.Settings) // TODO(bdarnell): arrange to have the transport closed. @@ -823,7 +823,7 @@ func (m *multiTestContext) populateDB(idx int, st *cluster.Settings, stopper *st }, m.distSenders[idx], ) - m.dbs[idx] = kv.NewDB(ambient, tcsFactory, m.clocks[idx]) + m.dbs[idx] = kv.NewDB(ambient, tcsFactory, m.clocks[idx], stopper) } func (m *multiTestContext) populateStorePool( diff --git a/pkg/kv/kvserver/idalloc/id_alloc_test.go b/pkg/kv/kvserver/idalloc/id_alloc_test.go index 1bd5fe004e51..b94751a1e5c5 100644 --- a/pkg/kv/kvserver/idalloc/id_alloc_test.go +++ b/pkg/kv/kvserver/idalloc/id_alloc_test.go @@ -46,7 +46,7 @@ func newTestAllocator(t testing.TB) (*localtestcluster.LocalTestCluster, *idallo Key: keys.RangeIDGenerator, Incrementer: idalloc.DBIncrementer(s.DB), BlockSize: 10, /* blockSize */ - Stopper: s.Stopper, + Stopper: s.Stopper(), }) if err != nil { s.Stop() diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go index 4f2348804d0f..3b3fb419c016 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go @@ -215,7 +215,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { for _, c := range cases { t.Run("", func(t *testing.T) { - ir := newIntentResolverWithSendFuncs(cfg, c.sendFuncs) + ir := newIntentResolverWithSendFuncs(cfg, c.sendFuncs, stopper) var didPush, didSucceed bool done := make(chan struct{}) onComplete := func(pushed, succeeded bool) { @@ -256,7 +256,7 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) { pushTxnSendFunc(t, 1), resolveIntentsSendFunc(t), ) - ir := newIntentResolverWithSendFuncs(cfg, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf, stopper) // Run defaultTaskLimit tasks which will block until blocker is closed. 
blocker := make(chan struct{}) defer close(blocker) @@ -329,7 +329,7 @@ func TestCleanupIntentsAsync(t *testing.T) { Stopper: stopper, Clock: clock, } - ir := newIntentResolverWithSendFuncs(cfg, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf, stopper) err := ir.CleanupIntentsAsync(context.Background(), c.intents, true) sf.drain(t) stopper.Stop(context.Background()) @@ -394,7 +394,7 @@ func TestCleanupMultipleIntentsAsync(t *testing.T) { MaxIntentResolutionBatchSize: 1, }, } - ir := newIntentResolverWithSendFuncs(cfg, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf, stopper) err := ir.CleanupIntentsAsync(ctx, testIntents, false) sf.drain(t) stopper.Stop(ctx) @@ -493,7 +493,7 @@ func TestCleanupTxnIntentsAsyncWithPartialRollback(t *testing.T) { Stopper: stopper, Clock: clock, } - ir := newIntentResolverWithSendFuncs(cfg, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf, stopper) intents := []result.EndTxnIntents{{Txn: txn}} @@ -567,7 +567,7 @@ func TestCleanupTxnIntentsAsync(t *testing.T) { Stopper: stopper, Clock: clock, } - ir := newIntentResolverWithSendFuncs(cfg, c.sendFuncs) + ir := newIntentResolverWithSendFuncs(cfg, c.sendFuncs, stopper) if c.before != nil { defer c.before(&c, ir)() } @@ -665,7 +665,7 @@ func TestCleanupMultipleTxnIntentsAsync(t *testing.T) { MaxIntentResolutionBatchSize: 1, }, } - ir := newIntentResolverWithSendFuncs(cfg, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf, stopper) err := ir.CleanupTxnIntentsAsync(ctx, 1, testEndTxnIntents, false) sf.drain(t) stopper.Stop(ctx) @@ -744,7 +744,7 @@ func TestCleanupIntents(t *testing.T) { t.Run("", func(t *testing.T) { c.cfg.Stopper = stopper c.cfg.Clock = clock - ir := newIntentResolverWithSendFuncs(c.cfg, c.sendFuncs) + ir := newIntentResolverWithSendFuncs(c.cfg, c.sendFuncs, stopper) num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT) assert.Equal(t, num, c.expectedNum, "number of resolved intents") assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err) @@ -782,7 +782,9 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In // A library of useful sendFuncs are defined below. 
type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) -func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver { +func newIntentResolverWithSendFuncs( + c Config, sf *sendFuncs, stopper *stop.Stopper, +) *IntentResolver { txnSenderFactory := kv.NonTransactionalFactoryFunc( func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { sf.mu.Lock() @@ -792,7 +794,7 @@ func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver { }) db := kv.NewDB(log.AmbientContext{ Tracer: tracing.NewTracer(), - }, txnSenderFactory, c.Clock) + }, txnSenderFactory, c.Clock, stopper) c.DB = db c.MaxGCBatchWait = time.Nanosecond return New(c) diff --git a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go index 5325c2fce125..0a5b27b17e29 100644 --- a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go +++ b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go @@ -65,7 +65,7 @@ func TestVerifier(t *testing.T) { pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor)) withDB := ptstorage.WithDatabase(pts, s.DB()) - db := kv.NewDB(s.DB().AmbientContext, tsf, s.Clock()) + db := kv.NewDB(s.DB().AmbientContext, tsf, s.Clock(), s.Stopper()) ptv := ptverifier.New(db, pts) makeTableSpan := func(tableID uint32) roachpb.Span { k := keys.SystemSQLCodec.TablePrefix(tableID) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 699862a4bce5..3506de732c7b 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -244,7 +244,7 @@ func (tc *testContext) StartWithStoreConfigAndVersion( // circular dependency between the test sender and the store. The actual // store will be passed to the sender after it is created and bootstrapped. factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock) + cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv)) if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{ diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 5e9537958bc4..a9b30a045c4f 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -231,7 +231,7 @@ func createTestStoreWithoutStart( stopper.AddCloser(eng) cfg.Transport = NewDummyRaftTransport(cfg.Settings) factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock) + cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) store := NewStore(context.Background(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) factory.setStore(store) @@ -436,7 +436,7 @@ func TestStoreInitAndBootstrap(t *testing.T) { stopper.AddCloser(eng) cfg.Transport = NewDummyRaftTransport(cfg.Settings) factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock) + cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) { store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) // Can't start as haven't bootstrapped. 
diff --git a/pkg/kv/kvserver/txnrecovery/manager_test.go b/pkg/kv/kvserver/txnrecovery/manager_test.go index b01fa9386bf1..2a20cedc2530 100644 --- a/pkg/kv/kvserver/txnrecovery/manager_test.go +++ b/pkg/kv/kvserver/txnrecovery/manager_test.go @@ -34,7 +34,7 @@ func makeManager(s *kv.Sender) (Manager, *hlc.Clock, *stop.Stopper) { ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { return (*s).Send(ctx, ba) - }), clock) + }), clock, stopper) return NewManager(ac, clock, db, stopper), clock, stopper } diff --git a/pkg/kv/kvserver/txnwait/queue_test.go b/pkg/kv/kvserver/txnwait/queue_test.go index 5c290e75bb6f..513ab82e4d97 100644 --- a/pkg/kv/kvserver/txnwait/queue_test.go +++ b/pkg/kv/kvserver/txnwait/queue_test.go @@ -164,18 +164,18 @@ func TestIsPushed(t *testing.T) { } } -func makeConfig(s kv.SenderFunc) Config { +func makeConfig(s kv.SenderFunc, stopper *stop.Stopper) Config { var cfg Config cfg.RangeDesc = &roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKeyMax, } manual := hlc.NewManualClock(123) cfg.Clock = hlc.NewClock(manual.UnixNano, time.Nanosecond) - cfg.Stopper = stop.NewStopper() + cfg.Stopper = stopper cfg.Metrics = NewMetrics(time.Minute) if s != nil { factory := kv.NonTransactionalFactoryFunc(s) - cfg.DB = kv.NewDB(testutils.MakeAmbientCtx(), factory, cfg.Clock) + cfg.DB = kv.NewDB(testutils.MakeAmbientCtx(), factory, cfg.Clock, stopper) } return cfg } @@ -186,12 +186,14 @@ func makeConfig(s kv.SenderFunc) Config { // leak. func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) { defer leaktest.AfterTest(t)() - cfg := makeConfig(nil) - defer cfg.Stopper.Stop(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + cfg := makeConfig(nil, stopper) q := NewQueue(cfg) q.Enable() - ctx, cancel := context.WithCancel(context.Background()) waitingRes := make(chan *roachpb.Error) go func() { req := &roachpb.QueryTxnRequest{WaitForUpdate: true} @@ -223,13 +225,14 @@ func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) { // released. 
func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) { defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) var mockSender kv.SenderFunc cfg := makeConfig(func( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { return mockSender(ctx, ba) - }) - defer cfg.Stopper.Stop(context.Background()) + }, stopper) q := NewQueue(cfg) q.Enable() diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index abf4804465ac..465d1029725c 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -40,10 +41,12 @@ func TestTxnSnowballTrace(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock) tracer := tracing.NewTracer() ctx, sp := tracing.StartSnowballTrace(context.Background(), tracer, "test-txn") + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock, stopper) if err := db.Txn(ctx, func(ctx context.Context, txn *Txn) error { log.Event(ctx, "inside txn") @@ -134,16 +137,16 @@ func TestInitPut(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) // This test is mostly an excuse to exercise otherwise unused code. // TODO(vivekmenezes): update test or remove when InitPut is being // considered sufficiently tested and this path exercised. 
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB( - testutils.MakeAmbientCtx(), - newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - br := ba.CreateReply() - return br, nil - }), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + br := ba.CreateReply() + return br, nil + }), clock, stopper) txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) if pErr := txn.InitPut(ctx, "a", "b", false); pErr != nil { @@ -158,8 +161,11 @@ func TestInitPut(t *testing.T) { func TestTransactionConfig(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - dbCtx := DefaultDBContext() + dbCtx := DefaultDBContext(stopper) dbCtx.UserPriority = 101 db := NewDBWithContext( testutils.MakeAmbientCtx(), @@ -181,14 +187,15 @@ func TestTransactionConfig(t *testing.T) { func TestCommitTransactionOnce(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) count := 0 - db := NewDB( - testutils.MakeAmbientCtx(), - newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - count++ - return ba.CreateReply(), nil - }), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + count++ + return ba.CreateReply(), nil + }), clock, stopper) if err := db.Txn(context.Background(), func(ctx context.Context, txn *Txn) error { b := txn.NewBatch() b.Put("z", "adding a write exposed a bug in #1882") @@ -206,17 +213,18 @@ func TestCommitTransactionOnce(t *testing.T) { func TestAbortMutatingTransaction(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) var calls []roachpb.Method - db := NewDB( - testutils.MakeAmbientCtx(), - newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - calls = append(calls, ba.Methods()...) - if et, ok := ba.GetArg(roachpb.EndTxn); ok && et.(*roachpb.EndTxnRequest).Commit { - t.Errorf("expected commit to be false") - } - return ba.CreateReply(), nil - }), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + calls = append(calls, ba.Methods()...) 
+ if et, ok := ba.GetArg(roachpb.EndTxn); ok && et.(*roachpb.EndTxnRequest).Commit { + t.Errorf("expected commit to be false") + } + return ba.CreateReply(), nil + }), clock, stopper) if err := db.Txn(context.Background(), func(ctx context.Context, txn *Txn) error { if err := txn.Put(ctx, "a", "b"); err != nil { @@ -258,37 +266,38 @@ func TestRunTransactionRetryOnErrors(t *testing.T) { for _, test := range testCases { t.Run(fmt.Sprintf("%T", test.err), func(t *testing.T) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) count := 0 - db := NewDB( - testutils.MakeAmbientCtx(), - newTestTxnFactory( - func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - - if _, ok := ba.GetArg(roachpb.Put); ok { - count++ - if count == 1 { - var pErr *roachpb.Error - if errors.HasType(test.err, (*roachpb.ReadWithinUncertaintyIntervalError)(nil)) { - // This error requires an observed timestamp to have been - // recorded on the origin node. - ba.Txn.UpdateObservedTimestamp(1, hlc.Timestamp{WallTime: 1, Logical: 1}) - pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn) - pErr.OriginNode = 1 - } else { - pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn) - } - - if pErr.TransactionRestart != roachpb.TransactionRestart_NONE { - // HACK ALERT: to do without a TxnCoordSender, we jump through - // hoops to get the retryable error expected by db.Txn(). - return nil, roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError( - pErr.Message, ba.Txn.ID, *ba.Txn)) - } - return nil, pErr + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory( + func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + + if _, ok := ba.GetArg(roachpb.Put); ok { + count++ + if count == 1 { + var pErr *roachpb.Error + if errors.HasType(test.err, (*roachpb.ReadWithinUncertaintyIntervalError)(nil)) { + // This error requires an observed timestamp to have been + // recorded on the origin node. + ba.Txn.UpdateObservedTimestamp(1, hlc.Timestamp{WallTime: 1, Logical: 1}) + pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn) + pErr.OriginNode = 1 + } else { + pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn) + } + + if pErr.TransactionRestart != roachpb.TransactionRestart_NONE { + // HACK ALERT: to do without a TxnCoordSender, we jump through + // hoops to get the retryable error expected by db.Txn(). 
+ return nil, roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError( + pErr.Message, ba.Txn.ID, *ba.Txn)) } + return nil, pErr } - return ba.CreateReply(), nil - }), clock) + } + return ba.CreateReply(), nil + }), clock, stopper) err := db.Txn(context.Background(), func(ctx context.Context, txn *Txn) error { return txn.Put(ctx, "a", "b") }) @@ -317,9 +326,11 @@ func TestTransactionStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock, stopper) for _, write := range []bool{true, false} { for _, commit := range []bool{true, false} { txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) @@ -355,9 +366,11 @@ func TestCommitInBatchWrongTxn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock, stopper) txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) b1 := &Batch{} @@ -377,23 +390,23 @@ func TestSetPriority(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) var expected roachpb.UserPriority - db := NewDB( - testutils.MakeAmbientCtx(), - newTestTxnFactory( - func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - if ba.UserPriority != expected { - pErr := roachpb.NewErrorf("Priority not set correctly in the batch! "+ - "(expected: %s, value: %s)", expected, ba.UserPriority) - return nil, pErr - } + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory( + func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if ba.UserPriority != expected { + pErr := roachpb.NewErrorf("Priority not set correctly in the batch! "+ + "(expected: %s, value: %s)", expected, ba.UserPriority) + return nil, pErr + } - br := &roachpb.BatchResponse{} - br.Txn.Update(ba.Txn) // copy - return br, nil - }), clock) + br := &roachpb.BatchResponse{} + br.Txn.Update(ba.Txn) // copy + return br, nil + }), clock, stopper) // Verify the normal priority setting path. 
expected = roachpb.NormalUserPriority @@ -419,8 +432,11 @@ func TestSetPriority(t *testing.T) { func TestWrongTxnRetry(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock, stopper) var retries int txnClosure := func(ctx context.Context, outerTxn *Txn) error { @@ -445,8 +461,12 @@ func TestWrongTxnRetry(t *testing.T) { func TestBatchMixRawRequest(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock) + db := NewDB(testutils.MakeAmbientCtx(), newTestTxnFactory(nil), clock, stopper) b := &Batch{} b.AddRawRequest(&roachpb.EndTxnRequest{}) @@ -460,17 +480,16 @@ func TestUpdateDeadlineMaybe(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) mc := hlc.NewManualClock(1) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - db := NewDB( - testutils.MakeAmbientCtx(), - MakeMockTxnSenderFactory( - func(context.Context, *roachpb.Transaction, roachpb.BatchRequest, - ) (*roachpb.BatchResponse, *roachpb.Error) { - return nil, nil - }), - clock) + db := NewDB(testutils.MakeAmbientCtx(), MakeMockTxnSenderFactory( + func(context.Context, *roachpb.Transaction, roachpb.BatchRequest, + ) (*roachpb.BatchResponse, *roachpb.Error) { + return nil, nil + }), clock, stopper) txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) if txn.deadline() != nil { @@ -508,17 +527,16 @@ func TestAnchoringErrorNoTrigger(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) mc := hlc.NewManualClock(1) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - db := NewDB( - testutils.MakeAmbientCtx(), - MakeMockTxnSenderFactory( - func(context.Context, *roachpb.Transaction, roachpb.BatchRequest, - ) (*roachpb.BatchResponse, *roachpb.Error) { - return nil, nil - }), - clock) + db := NewDB(testutils.MakeAmbientCtx(), MakeMockTxnSenderFactory( + func(context.Context, *roachpb.Transaction, roachpb.BatchRequest, + ) (*roachpb.BatchResponse, *roachpb.Error) { + return nil, nil + }), clock, stopper) txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) require.EqualError(t, txn.SetSystemConfigTrigger(true /* forSystemTenant */), "unimplemented") require.False(t, txn.systemConfigTrigger) diff --git a/pkg/server/server.go b/pkg/server/server.go index a43508d13c04..9949602817cf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -404,7 +404,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender) - dbCtx := kv.DefaultDBContext() + dbCtx := kv.DefaultDBContext(stopper) dbCtx.NodeID = idContainer dbCtx.Stopper = stopper db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index c31a2d269989..819abbd7f875 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -543,11 +543,7 @@ func makeSQLServerArgs( }, ds, ) - db 
:= kv.NewDB( - baseCfg.AmbientCtx, - tcsFactory, - clock, - ) + db := kv.NewDB(baseCfg.AmbientCtx, tcsFactory, clock, stopper) circularInternalExecutor := &sql.InternalExecutor{} // Protected timestamps won't be available (at first) in multi-tenant diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 882b4f81a814..34973d75bf5b 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -246,7 +246,7 @@ func startConnExecutor( ) (*roachpb.BatchResponse, *roachpb.Error) { return nil, nil }) - db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper) st := cluster.MakeTestingClusterSettings() nodeID := base.TestingIDContainer distSQLMetrics := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */) diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 31f4d53bb798..3acf43c14c62 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -102,7 +102,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { }, s.DistSenderI().(*kvcoord.DistSender), ) - shortDB := kv.NewDB(ambient, tsf, s.Clock()) + shortDB := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper()) iter := 0 // We'll trace to make sure the test isn't fooling itself. diff --git a/pkg/sql/sem/tree/timeconv_test.go b/pkg/sql/sem/tree/timeconv_test.go index 8b5a70df339e..af1fce473baf 100644 --- a/pkg/sql/sem/tree/timeconv_test.go +++ b/pkg/sql/sem/tree/timeconv_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" ) // Test that EvalContext.GetClusterTimestamp() gets its timestamp from the @@ -42,16 +43,17 @@ func TestClusterTimestampConversion(t *testing.T) { {9223372036854775807, 2147483647, "9223372036854775807.2147483647"}, } + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) senderFactory := kv.MakeMockTxnSenderFactory( func(context.Context, *roachpb.Transaction, roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { panic("unused") }) - db := kv.NewDB( - testutils.MakeAmbientCtx(), - senderFactory, - clock) + db := kv.NewDB(testutils.MakeAmbientCtx(), senderFactory, clock, stopper) for _, d := range testData { ts := hlc.Timestamp{WallTime: d.walltime, Logical: d.logical} diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 7a894eff8797..0008c7e7ebb0 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -48,7 +49,7 @@ type testContext struct { settings *cluster.Settings } -func makeTestContext() testContext { +func makeTestContext(stopper *stop.Stopper) testContext { manual := hlc.NewManualClock(123) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) factory := kv.MakeMockTxnSenderFactory( @@ -62,7 +63,7 @@ func makeTestContext() testContext { return testContext{ manualClock: manual, clock: clock, - mockDB: kv.NewDB(ambient, factory, clock), + 
mockDB: kv.NewDB(ambient, factory, clock, stopper), mon: mon.NewMonitor( "test root mon", mon.MemoryResource, @@ -209,8 +210,10 @@ func TestTransitions(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) dummyRewCap := rewindCapability{rewindPos: CmdPos(12)} - testCon := makeTestContext() + testCon := makeTestContext(stopper) tranCtx := transitionCtx{ db: testCon.mockDB, nodeIDOrZero: roachpb.NodeID(5), diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index ae5c6ef8523e..727bc6e508e5 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -56,10 +56,10 @@ type LocalTestCluster struct { Eng storage.Engine Store *kvserver.Store StoreTestingKnobs *kvserver.StoreTestingKnobs - DBContext *kv.DBContext + dbContext *kv.DBContext DB *kv.DB Stores *kvserver.Stores - Stopper *stop.Stopper + stopper *stop.Stopper Latency time.Duration // sleep for each RPC sent tester testing.TB @@ -92,12 +92,19 @@ type InitFactoryFn func( gossip *gossip.Gossip, ) kv.TxnSenderFactory +// Stopper returns the Stopper. +func (ltc *LocalTestCluster) Stopper() *stop.Stopper { + return ltc.stopper +} + // Start starts the test cluster by bootstrapping an in-memory store // (defaults to maximum of 50M). The server is started, launching the // node RPC server and all HTTP endpoints. Use the value of // TestServer.Addr after Start() for client connections. Use Stop() // to shutdown the server after the test completes. func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFactory InitFactoryFn) { + ltc.stopper = stop.NewStopper() + ltc.Manual = hlc.NewManualClock(123) ltc.Clock = hlc.NewClock(ltc.Manual.UnixNano, 50*time.Millisecond) cfg := kvserver.TestStoreConfig(ltc.Clock) @@ -112,38 +119,34 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } ltc.tester = t - if ltc.Stopper == nil { - ltc.Stopper = stop.NewStopper() - } cfg.RPCContext = rpc.NewContext(rpc.ContextOptions{ AmbientCtx: ambient, Config: baseCtx, Clock: ltc.Clock, - Stopper: ltc.Stopper, + Stopper: ltc.stopper, Settings: cfg.Settings, }) cfg.RPCContext.NodeID.Set(ambient.AnnotateCtx(context.Background()), nodeID) - c := &cfg.RPCContext.ClusterID + clusterID := &cfg.RPCContext.ClusterID server := rpc.NewServer(cfg.RPCContext) // never started - ltc.Gossip = gossip.New(ambient, c, nc, cfg.RPCContext, server, ltc.Stopper, metric.NewRegistry(), roachpb.Locality{}, zonepb.DefaultZoneConfigRef()) + ltc.Gossip = gossip.New(ambient, clusterID, nc, cfg.RPCContext, server, ltc.stopper, metric.NewRegistry(), roachpb.Locality{}, zonepb.DefaultZoneConfigRef()) ltc.Eng = storage.NewInMem(ambient.AnnotateCtx(context.Background()), storage.DefaultStorageEngine, roachpb.Attributes{}, 50<<20) - ltc.Stopper.AddCloser(ltc.Eng) + ltc.stopper.AddCloser(ltc.Eng) ltc.Stores = kvserver.NewStores(ambient, ltc.Clock) - factory := initFactory(cfg.Settings, nodeDesc, ambient.Tracer, ltc.Clock, ltc.Latency, ltc.Stores, ltc.Stopper, ltc.Gossip) - if ltc.DBContext == nil { - dbCtx := kv.DefaultDBContext() - dbCtx.Stopper = ltc.Stopper - ltc.DBContext = &dbCtx - } - { - var c base.NodeIDContainer - c.Set(context.Background(), nodeID) - ltc.DBContext.NodeID = base.NewSQLIDContainer(0, &c, true /* exposed */) + factory := initFactory(cfg.Settings, nodeDesc, ambient.Tracer, ltc.Clock, ltc.Latency, ltc.Stores, ltc.stopper, 
ltc.Gossip) + + var nodeIDContainer base.NodeIDContainer + nodeIDContainer.Set(context.Background(), nodeID) + + ltc.dbContext = &kv.DBContext{ + UserPriority: roachpb.NormalUserPriority, + Stopper: ltc.stopper, + NodeID: base.NewSQLIDContainer(0, &nodeIDContainer, true /* exposed */), } - ltc.DB = kv.NewDBWithContext(cfg.AmbientCtx, factory, ltc.Clock, *ltc.DBContext) + ltc.DB = kv.NewDBWithContext(cfg.AmbientCtx, factory, ltc.Clock, *ltc.dbContext) transport := kvserver.NewDummyRaftTransport(cfg.Settings) // By default, disable the replica scanner and split queue, which // confuse tests using LocalTestCluster. @@ -218,10 +221,10 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } if !ltc.DisableLivenessHeartbeat { - cfg.NodeLiveness.StartHeartbeat(ctx, ltc.Stopper, []storage.Engine{ltc.Eng}, nil /* alive */) + cfg.NodeLiveness.StartHeartbeat(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) } - if err := ltc.Store.Start(ctx, ltc.Stopper); err != nil { + if err := ltc.Store.Start(ctx, ltc.stopper); err != nil { t.Fatalf("unable to start local test cluster: %s", err) } @@ -244,5 +247,5 @@ func (ltc *LocalTestCluster) Stop() { if r := recover(); r != nil { panic(r) } - ltc.Stopper.Stop(context.TODO()) + ltc.stopper.Stop(context.TODO()) } diff --git a/pkg/util/leaktest/leaktest.go b/pkg/util/leaktest/leaktest.go index 18fea0533ecf..354277a43c2b 100644 --- a/pkg/util/leaktest/leaktest.go +++ b/pkg/util/leaktest/leaktest.go @@ -29,7 +29,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/petermattis/goid" @@ -86,6 +85,10 @@ func interestingGoroutines() map[int64]string { // mis-attributed as leaked by the currently-running test. var leakDetectorDisabled uint32 +// PrintLeakedStoppers is injected from `pkg/util/stop` to avoid a dependency +// cycle. +var PrintLeakedStoppers = func(t testing.TB) {} + // AfterTest snapshots the currently-running goroutines and returns a // function to be run at the end of tests to see whether any // goroutines leaked. @@ -111,7 +114,7 @@ func AfterTest(t testing.TB) func() { // TODO(tbg): make this call 't.Error' instead of 't.Logf' once there is // enough Stopper discipline. - stop.PrintLeakedStoppers(t) + PrintLeakedStoppers(t) // Loop, waiting for goroutines to shut down. // Wait up to 5 seconds, but finish as quickly as possible. 
diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 6c6b62130330..6e5731d45aa9 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -18,11 +18,13 @@ import ( "sort" "strings" "sync" + "testing" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/caller" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -31,6 +33,10 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +func init() { + leaktest.PrintLeakedStoppers = PrintLeakedStoppers +} + const asyncTaskNamePrefix = "[async] " // ErrThrottled is returned from RunLimitedAsyncTask in the event that there @@ -44,7 +50,7 @@ var ErrUnavailable = &roachpb.NodeUnavailableError{} func register(s *Stopper) { trackedStoppers.Lock() trackedStoppers.stoppers = append(trackedStoppers.stoppers, - stopperWithStack{s: s, stack: debug.Stack()}) + stopperWithStack{s: s, createdAt: string(debug.Stack())}) trackedStoppers.Unlock() } @@ -62,8 +68,8 @@ func unregister(s *Stopper) { } type stopperWithStack struct { - s *Stopper - stack []byte + s *Stopper + createdAt string // stack from NewStopper() } var trackedStoppers struct { @@ -86,13 +92,11 @@ func HandleDebug(w http.ResponseWriter, r *http.Request) { // PrintLeakedStoppers prints (using `t`) the creation site of each Stopper // for which `.Stop()` has not yet been called. -func PrintLeakedStoppers(t interface { - Logf(string, ...interface{}) -}) { +func PrintLeakedStoppers(t testing.TB) { trackedStoppers.Lock() defer trackedStoppers.Unlock() for _, tracked := range trackedStoppers.stoppers { - t.Logf("leaked stopper, created at:\n%s", tracked.stack) + t.Logf("leaked stopper, created at:\n%s", tracked.createdAt) } }
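For readers who want the shape of the tracking facility from the first commit without paging through the diff, here is a minimal, self-contained sketch. The Stopper type, the Logger interface, and the main function below are simplified stand-ins for illustration only (the real pkg/util/stop.Stopper does much more), but the registry-with-creation-stack pattern matches what the patch does: capture debug.Stack() when a Stopper is constructed, drop the entry when Stop() is called, and have a test helper log whatever is left over.

// Minimal sketch of the leaked-stopper tracking pattern from the first commit.
// Stopper, Logger, and stdLogger are simplified stand-ins, not the real
// pkg/util/stop API.
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
)

// Stopper is a toy stand-in for stop.Stopper.
type Stopper struct{ stopped bool }

type trackedStopper struct {
	s         *Stopper
	createdAt string // stack captured in NewStopper()
}

var tracked struct {
	sync.Mutex
	stoppers []trackedStopper
}

// NewStopper creates a Stopper and records where it was created.
func NewStopper() *Stopper {
	s := &Stopper{}
	tracked.Lock()
	defer tracked.Unlock()
	tracked.stoppers = append(tracked.stoppers,
		trackedStopper{s: s, createdAt: string(debug.Stack())})
	return s
}

// Stop marks the Stopper as stopped and removes it from the registry.
func (s *Stopper) Stop() {
	s.stopped = true
	tracked.Lock()
	defer tracked.Unlock()
	sl := tracked.stoppers
	for i := range sl {
		if sl[i].s == s {
			tracked.stoppers = append(sl[:i], sl[i+1:]...)
			return
		}
	}
	panic("attempt to unregister untracked stopper")
}

// Logger is the small subset of testing.TB that the helper needs.
type Logger interface{ Logf(string, ...interface{}) }

// PrintLeakedStoppers logs the creation site of every Stopper that has not
// been stopped yet. Like the patch, it is advisory: it logs rather than fails.
func PrintLeakedStoppers(t Logger) {
	tracked.Lock()
	defer tracked.Unlock()
	for _, ts := range tracked.stoppers {
		t.Logf("leaked stopper, created at:\n%s", ts.createdAt)
	}
}

type stdLogger struct{}

func (stdLogger) Logf(format string, args ...interface{}) { fmt.Printf(format+"\n", args...) }

func main() {
	a := NewStopper()
	_ = NewStopper() // never stopped: reported as leaked below
	a.Stop()
	PrintLeakedStoppers(stdLogger{}) // prints one creation stack
}

In the actual patch, leaktest.AfterTest reaches the helper through an injected function variable (leaktest.PrintLeakedStoppers) rather than importing pkg/util/stop directly, which avoids a dependency cycle, and the call is deliberately t.Logf rather than t.Error until the test suite has enough Stopper discipline to make it a hard failure.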