Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leaktest: print non-closed stoppers #51413

Merged
merged 2 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ func TestOracleFactory(t *testing.T) {
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
c := kv.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, kv.MockTxnSenderFactory{},
hlc.NewClock(hlc.UnixNano, time.Nanosecond))
}, kv.MockTxnSenderFactory{}, hlc.NewClock(hlc.UnixNano, time.Nanosecond), stopper)
txn := kv.NewTxn(context.Background(), c, 0)
of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{
Settings: st,
Expand Down
14 changes: 10 additions & 4 deletions pkg/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -687,6 +688,9 @@ func TestReadConsistencyTypes(t *testing.T) {
roachpb.INCONSISTENT,
} {
t.Run(rc.String(), func(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := kv.NonTransactionalFactoryFunc(
Expand All @@ -699,8 +703,7 @@ func TestReadConsistencyTypes(t *testing.T) {
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
ctx := context.Background()
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

prepWithRC := func() *kv.Batch {
b := &kv.Batch{}
Expand Down Expand Up @@ -830,6 +833,10 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := kv.MakeMockTxnSenderFactory(
Expand All @@ -839,7 +846,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

setup := func(nodeID roachpb.NodeID) *kv.DB {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
dbCtx := kv.DefaultDBContext()
dbCtx := kv.DefaultDBContext(stopper)
var c base.NodeIDContainer
if nodeID != 0 {
c.Set(context.Background(), nodeID)
Expand All @@ -849,7 +856,6 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
db := kv.NewDBWithContext(testutils.MakeAmbientCtx(), factory, clock, dbCtx)
return db
}
ctx := context.Background()

// Verify direct creation of Txns.
directCases := []struct {
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ type DBContext struct {

// DefaultDBContext returns (a copy of) the default options for
// NewDBWithContext.
func DefaultDBContext() DBContext {
func DefaultDBContext(stopper *stop.Stopper) DBContext {
var c base.NodeIDContainer
return DBContext{
UserPriority: roachpb.NormalUserPriority,
// TODO(tbg): this is ugly. Force callers to pass in an SQLIDContainer.
NodeID: base.NewSQLIDContainer(0, &c, true /* exposed */),
Stopper: stop.NewStopper(),
Stopper: stopper,
}
}

Expand Down Expand Up @@ -278,8 +278,10 @@ func (db *DB) Clock() *hlc.Clock {
}

// NewDB returns a new DB.
func NewDB(actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock) *DB {
return NewDBWithContext(actx, factory, clock, DefaultDBContext())
func NewDB(
actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper,
) *DB {
return NewDBWithContext(actx, factory, clock, DefaultDBContext(stopper))
}

// NewDBWithContext returns a new DB with the given parameters.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
},
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock())
db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())

// Now, with an intent pending, attempt (asynchronously) to read
// from an arbitrary key. This will cause the distributed sender to
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster)

// This is purely to silence log spam.
config.TestingSetupZoneConfigHook(s.Stopper)
config.TestingSetupZoneConfigHook(s.Stopper())
defer s.Stop()

// Start test writer write about a 32K/key so there aren't too many
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
func TestRangeSplitsWithSameKeyTwice(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
DisableScanner: true,
DisableSplitQueue: true,
DisableMergeQueue: true,
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestRangeSplitsWithSameKeyTwice(t *testing.T) {
func TestRangeSplitsStickyBit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
DisableScanner: true,
DisableSplitQueue: true,
DisableMergeQueue: true,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {
},
s.DistSenderI().(*kvcoord.DistSender),
)
db := kv.NewDB(ambient, tsf, s.Clock())
db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
Expand Down
61 changes: 29 additions & 32 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ import (
// createTestDB creates a local test server and starts it. The caller
// is responsible for stopping the test server.
func createTestDB(t testing.TB) *localtestcluster.LocalTestCluster {
return createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), nil)
return createTestDBWithKnobs(t, nil)
}

func createTestDBWithContextAndKnobs(
t testing.TB, dbCtx kv.DBContext, knobs *kvserver.StoreTestingKnobs,
func createTestDBWithKnobs(
t testing.TB, knobs *kvserver.StoreTestingKnobs,
) *localtestcluster.LocalTestCluster {
s := &localtestcluster.LocalTestCluster{
DBContext: &dbCtx,
StoreTestingKnobs: knobs,
}
s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster)
Expand Down Expand Up @@ -236,11 +235,11 @@ func TestTxnCoordSenderCondenseLockSpans(t *testing.T) {
AmbientCtx: ambient,
Settings: st,
Clock: s.Clock,
Stopper: s.Stopper,
Stopper: s.Stopper(),
},
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock)
db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper())
ctx := context.Background()

txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
Expand Down Expand Up @@ -281,7 +280,7 @@ func TestTxnCoordSenderCondenseLockSpans(t *testing.T) {
func TestTxnCoordSenderHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), &kvserver.StoreTestingKnobs{
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
DisableScanner: true,
DisableSplitQueue: true,
DisableMergeQueue: true,
Expand All @@ -305,14 +304,14 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) {
HeartbeatInterval: time.Millisecond,
Settings: s.Cfg.Settings,
Clock: s.Clock,
Stopper: s.Stopper,
Stopper: s.Stopper(),
},
NewDistSenderForLocalTestCluster(
s.Cfg.Settings, &roachpb.NodeDescriptor{NodeID: 1},
ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper, s.Gossip,
ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper(), s.Gossip,
),
)
quickHeartbeatDB := kv.NewDB(ambient, tsf, s.Clock)
quickHeartbeatDB := kv.NewDB(ambient, tsf, s.Clock, s.Stopper())

// We're going to test twice. In both cases the heartbeat is supposed to
// notice that its transaction is aborted, but:
Expand Down Expand Up @@ -659,7 +658,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {
},
}

s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), knobs)
s := createTestDBWithKnobs(t, knobs)
defer s.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -817,7 +816,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
},
senderFn,
)
db := kv.NewDB(ambient, tsf, clock)
db := kv.NewDB(ambient, tsf, clock, stopper)
key := roachpb.Key("test-key")
now := clock.Now()
origTxnProto := roachpb.MakeTransaction(
Expand Down Expand Up @@ -961,7 +960,7 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
)
defer stopper.Stop(ctx)

db := kv.NewDB(ambient, factory, clock)
db := kv.NewDB(ambient, factory, clock, stopper)
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)

// Acquire locks on a-b, c, u-w before the final batch.
Expand Down Expand Up @@ -1052,9 +1051,7 @@ func checkTxnMetricsOnce(
// have a faster sample interval and returns a cleanup function to be
// executed by callers.
func setupMetricsTest(t *testing.T) (*localtestcluster.LocalTestCluster, TxnMetrics, func()) {
dbCtx := kv.DefaultDBContext()
s := &localtestcluster.LocalTestCluster{
DBContext: &dbCtx,
// Liveness heartbeat txns mess up the metrics.
DisableLivenessHeartbeat: true,
DontCreateSystemRanges: true,
Expand Down Expand Up @@ -1335,7 +1332,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
senderFn,
)

db := kv.NewDB(ambient, factory, clock)
db := kv.NewDB(ambient, factory, clock, stopper)
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
if pErr := txn.Put(ctx, "a", "b"); pErr != nil {
t.Fatalf("put failed: %s", pErr)
Expand Down Expand Up @@ -1415,7 +1412,7 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) {
},
sender,
)
db := kv.NewDB(ambient, factory, clock)
db := kv.NewDB(ambient, factory, clock, stopper)

sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if _, ok := ba.GetArg(roachpb.EndTxn); !ok {
Expand Down Expand Up @@ -1483,7 +1480,7 @@ func TestOnePCErrorTracking(t *testing.T) {
},
sender,
)
db := kv.NewDB(ambient, factory, clock)
db := kv.NewDB(ambient, factory, clock, stopper)
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

// Register a matcher catching the commit attempt.
Expand Down Expand Up @@ -1577,7 +1574,7 @@ func TestCommitReadOnlyTransaction(t *testing.T) {
testutils.RunTrueAndFalse(t, "explicit txn", func(t *testing.T, explicitTxn bool) {
testutils.RunTrueAndFalse(t, "with get", func(t *testing.T, withGet bool) {
calls = nil
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
if withGet {
Expand Down Expand Up @@ -1683,7 +1680,7 @@ func TestCommitMutatingTransaction(t *testing.T) {
for i, test := range testArgs {
t.Run(test.expMethod.String(), func(t *testing.T) {
calls = nil
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)
if err := db.Txn(ctx, test.f); err != nil {
t.Fatalf("%d: unexpected error on commit: %s", i, err)
}
Expand Down Expand Up @@ -1726,7 +1723,7 @@ func TestAbortReadOnlyTransaction(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)
if err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
return errors.New("foo")
}); err == nil {
Expand Down Expand Up @@ -1783,7 +1780,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
testutils.RunTrueAndFalse(t, "success", func(t *testing.T, success bool) {
Expand Down Expand Up @@ -1875,7 +1872,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

if err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
defer func() { attempt++ }()
Expand Down Expand Up @@ -1931,7 +1928,7 @@ func TestSequenceNumbers(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -1981,7 +1978,7 @@ func TestConcurrentTxnRequestsProhibited(t *testing.T) {
},
sender,
)
db := kv.NewDB(ambient, factory, clock)
db := kv.NewDB(ambient, factory, clock, stopper)

err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
g, gCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -2023,7 +2020,7 @@ func TestTxnRequestTxnTimestamp(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

curReq := 0
requests := []struct {
Expand Down Expand Up @@ -2099,7 +2096,7 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
},
sender,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

// We're going to run two tests: one where the EndTxn is by itself in a
// batch, one where it is not. As of June 2018, the EndTxn is elided in
Expand Down Expand Up @@ -2163,9 +2160,9 @@ func TestTxnCoordSenderPipelining(t *testing.T) {
AmbientCtx: ambientCtx,
Settings: s.Cfg.Settings,
Clock: s.Clock,
Stopper: s.Stopper,
Stopper: s.Stopper(),
}, senderFn)
db := kv.NewDB(ambientCtx, tsf, s.Clock)
db := kv.NewDB(ambientCtx, tsf, s.Clock, s.Stopper())

err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.Put(ctx, "key", "val")
Expand Down Expand Up @@ -2244,7 +2241,7 @@ func TestAnchorKey(t *testing.T) {
},
senderFn,
)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock, stopper)

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ba := txn.NewBatch()
Expand Down Expand Up @@ -2282,7 +2279,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
},
}

s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), knobs)
s := createTestDBWithKnobs(t, knobs)
defer s.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -2313,7 +2310,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
func TestUpdateRoootWithLeafFinalStateInAbortedTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := createTestDBWithContextAndKnobs(t, kv.DefaultDBContext(), nil /* knobs */)
s := createTestDBWithKnobs(t, nil /* knobs */)
defer s.Stop()
ctx := context.Background()

Expand Down
Loading