diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 1d778c9d5c66..000e045be53d 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -484,7 +484,9 @@ func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) *
 	}
 }
 
-func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) {
+func (rd *replicationDriver) setupC2C(
+	ctx context.Context, t test.Test, c cluster.Cluster,
+) (cleanup func()) {
 	if len(rd.rs.multiregion.srcLocalities) != 0 {
 		nodeCount := rd.rs.srcNodes + rd.rs.dstNodes
 		localityCount := len(rd.rs.multiregion.srcLocalities) + len(rd.rs.multiregion.destLocalities)
@@ -585,6 +587,10 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
 		require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
 		rd.t.L().Printf("Prom has started")
 	}
+	return func() {
+		srcDB.Close()
+		destDB.Close()
+	}
 }
 
 func (rd *replicationDriver) crdbNodes() option.NodeListOption {
@@ -776,7 +782,9 @@ func (rd *replicationDriver) onFingerprintMismatch(
 ) {
 	rd.t.L().Printf("conducting table level fingerprints")
 	srcTenantConn := rd.c.Conn(ctx, rd.t.L(), 1, option.TenantName(rd.setup.src.name))
+	defer srcTenantConn.Close()
 	dstTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.rs.srcNodes+1, option.TenantName(rd.setup.dst.name))
+	defer dstTenantConn.Close()
 	fingerprintBisectErr := replicationutils.InvestigateFingerprints(ctx, srcTenantConn, dstTenantConn,
 		startTime,
 		endTime)
@@ -957,7 +965,8 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.t.L().Printf("starting the destination tenant")
-	startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
+	conn := startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
+	conn.Close()
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
 
@@ -1030,7 +1039,8 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
 		suites: registry.Suites("nightly"),
 	}
 	rd := makeReplicationDriver(t, c, sp)
-	rd.setupC2C(ctx, t, c)
+	cleanup := rd.setupC2C(ctx, t, c)
+	defer cleanup()
 
 	// Spin up a monitor to capture any node deaths.
 	m := rd.newMonitor(ctx)
@@ -1227,7 +1237,8 @@ func registerClusterToCluster(r registry.Registry) {
 			c2cRegisterWrapper(r, sp,
 				func(ctx context.Context, t test.Test, c cluster.Cluster) {
 					rd := makeReplicationDriver(t, c, sp)
-					rd.setupC2C(ctx, t, c)
+					cleanup := rd.setupC2C(ctx, t, c)
+					defer cleanup()
 					// Spin up a monitor to capture any node deaths.
 					m := rd.newMonitor(ctx)
 					m.Go(func(ctx context.Context) error {
@@ -1499,7 +1510,8 @@ func registerClusterReplicationResilience(r registry.Registry) {
 
 			rrd := makeReplShutdownDriver(t, c, rsp)
 			rrd.t.L().Printf("Planning to shut down node during %s phase", rrd.phase)
-			rrd.setupC2C(ctx, t, c)
+			cleanup := rrd.setupC2C(ctx, t, c)
+			defer cleanup()
 
 			shutdownSetupDone := make(chan struct{})
 
@@ -1609,7 +1621,8 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
 	}
 	c2cRegisterWrapper(r, sp, func(ctx context.Context, t test.Test, c cluster.Cluster) {
 		rd := makeReplicationDriver(t, c, sp)
-		rd.setupC2C(ctx, t, c)
+		cleanup := rd.setupC2C(ctx, t, c)
+		defer cleanup()
 
 		shutdownSetupDone := make(chan struct{})
 
diff --git a/pkg/cmd/roachtest/tests/multitenant_tpch.go b/pkg/cmd/roachtest/tests/multitenant_tpch.go
index 1cc23ffd26f9..606ce20284a2 100644
--- a/pkg/cmd/roachtest/tests/multitenant_tpch.go
+++ b/pkg/cmd/roachtest/tests/multitenant_tpch.go
@@ -93,7 +93,8 @@ func runMultiTenantTPCH(
 
 	// Now we create a tenant and run all TPCH queries within it.
 	if sharedProcess {
-		db := createInMemoryTenant(ctx, t, c, appTenantName, c.All(), true /* secure */)
+		db := createInMemoryTenantWithConn(ctx, t, c, appTenantName, c.All(), true /* secure */)
+		defer db.Close()
 		url := fmt.Sprintf("{pgurl:1:%s}", appTenantName)
 		runTPCH(db, url, 1 /* setupIdx */)
 	} else {
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 0d7dab42ab95..3859d3ecc765 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -327,9 +327,7 @@ func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.S
 const appTenantName = "app"
 
 // createInMemoryTenant runs through the necessary steps to create an in-memory
-// tenant without resource limits and full dbconsole viewing privileges. As a
-// convenience, it also returns a connection to the tenant (on a random node in
-// the cluster).
+// tenant without resource limits and full dbconsole viewing privileges.
 func createInMemoryTenant(
 	ctx context.Context,
 	t test.Test,
@@ -337,8 +335,26 @@ func createInMemoryTenant(
 	tenantName string,
 	nodes option.NodeListOption,
 	secure bool,
+) {
+	db := createInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
+	db.Close()
+}
+
+// createInMemoryTenantWithConn runs through the necessary steps to create an
+// in-memory tenant without resource limits and full dbconsole viewing
+// privileges. As a convenience, it also returns a connection to the tenant (on
+// a random node in the cluster).
+func createInMemoryTenantWithConn(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	tenantName string,
+	nodes option.NodeListOption,
+	secure bool,
 ) *gosql.DB {
-	sysSQL := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), nodes.RandNode()[0]))
+	sysDB := c.Conn(ctx, t.L(), nodes.RandNode()[0])
+	defer sysDB.Close()
+	sysSQL := sqlutils.MakeSQLRunner(sysDB)
 	sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
 
 	tenantConn := startInMemoryTenant(ctx, t, c, tenantName, nodes)
@@ -360,7 +376,9 @@ func startInMemoryTenant(
 	tenantName string,
 	nodes option.NodeListOption,
 ) *gosql.DB {
-	sysSQL := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), nodes.RandNode()[0]))
+	sysDB := c.Conn(ctx, t.L(), nodes.RandNode()[0])
+	defer sysDB.Close()
+	sysSQL := sqlutils.MakeSQLRunner(sysDB)
 	sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
@@ -384,6 +402,7 @@ func startInMemoryTenant(
 			return err
 		}
 		if err = tenantConn.Ping(); err != nil {
+			tenantConn.Close()
 			return err
 		}
 		return nil
diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go
index 00a1594cc3f1..ea2ca43e90a5 100644
--- a/pkg/cmd/roachtest/tests/tpcc.go
+++ b/pkg/cmd/roachtest/tests/tpcc.go
@@ -1431,7 +1431,7 @@ func runTPCCBench(ctx context.Context, t test.Test, c cluster.Cluster, b tpccBen
 
 	var db *gosql.DB
 	if b.SharedProcessMT {
-		db = createInMemoryTenant(ctx, t, c, appTenantName, roachNodes, false /* secure */)
+		db = createInMemoryTenantWithConn(ctx, t, c, appTenantName, roachNodes, false /* secure */)
 	} else {
 		db = c.Conn(ctx, t.L(), 1)
 	}
diff --git a/pkg/cmd/roachtest/tests/tpchvec.go b/pkg/cmd/roachtest/tests/tpchvec.go
index 48805496d1f1..4c34837a26c2 100644
--- a/pkg/cmd/roachtest/tests/tpchvec.go
+++ b/pkg/cmd/roachtest/tests/tpchvec.go
@@ -601,7 +601,7 @@ func runTPCHVec(
 		if _, err := singleTenantConn.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false;"); err != nil {
 			t.Fatal(err)
 		}
-		conn = createInMemoryTenant(ctx, t, c, appTenantName, c.All(), false /* secure */)
+		conn = createInMemoryTenantWithConn(ctx, t, c, appTenantName, c.All(), false /* secure */)
 	} else {
 		conn = c.Conn(ctx, t.L(), 1)
 		disableMergeQueue = true
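
Note on the pattern above: setupC2C now hands ownership of the source and destination connections back to the caller as a single cleanup closure, and createInMemoryTenant is split from createInMemoryTenantWithConn so that only callers who actually want the connection own one. Below is a minimal, self-contained sketch of the same idiom, not part of this patch: the helper names (openPair, runWorkload) and DSNs are illustrative, and it assumes a "postgres" driver has been registered (e.g. via a lib/pq import).

package main

import (
	gosql "database/sql"
	"log"
)

// openPair opens two *gosql.DB handles and returns them along with a
// cleanup closure that owns both, mirroring setupC2C's new signature.
// If opening the second handle fails, the error path closes the first,
// so no handle leaks on partial failure.
func openPair(srcDSN, dstDSN string) (src, dst *gosql.DB, cleanup func(), err error) {
	// gosql.Open validates the driver name and DSN; it does not dial.
	if src, err = gosql.Open("postgres", srcDSN); err != nil {
		return nil, nil, nil, err
	}
	if dst, err = gosql.Open("postgres", dstDSN); err != nil {
		src.Close() // don't leak the first handle
		return nil, nil, nil, err
	}
	return src, dst, func() {
		src.Close()
		dst.Close()
	}, nil
}

func runWorkload() error {
	src, dst, cleanup, err := openPair("postgres://src:26257", "postgres://dst:26257")
	if err != nil {
		return err
	}
	defer cleanup() // both handles released on every exit path
	_, _ = src, dst // ... test body would use the connections here ...
	return nil
}

func main() {
	if err := runWorkload(); err != nil {
		log.Fatal(err)
	}
}

The single-closure shape keeps call sites to one defer, which is why the diff can thread it through every setupC2C caller with a two-line change.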