diff --git a/pkg/sql/opt/exec/explain/BUILD.bazel b/pkg/sql/opt/exec/explain/BUILD.bazel index a23a5e8a8d38..68f906c23879 100644 --- a/pkg/sql/opt/exec/explain/BUILD.bazel +++ b/pkg/sql/opt/exec/explain/BUILD.bazel @@ -60,6 +60,8 @@ go_test( embed = [":explain"], deps = [ "//pkg/base", + "//pkg/security/securityassets", + "//pkg/security/securitytest", "//pkg/server", "//pkg/sql/catalog/colinfo", "//pkg/sql/execinfra", diff --git a/pkg/sql/opt/exec/explain/main_test.go b/pkg/sql/opt/exec/explain/main_test.go index 9f85eaac495b..ef81e31d8df3 100644 --- a/pkg/sql/opt/exec/explain/main_test.go +++ b/pkg/sql/opt/exec/explain/main_test.go @@ -14,11 +14,14 @@ import ( "os" "testing" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" ) func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) serverutils.InitTestServerFactory(server.TestServerFactory) os.Exit(m.Run()) } diff --git a/pkg/sql/opt/exec/explain/output_test.go b/pkg/sql/opt/exec/explain/output_test.go index 596b54bc2fa0..3e8195763f58 100644 --- a/pkg/sql/opt/exec/explain/output_test.go +++ b/pkg/sql/opt/exec/explain/output_test.go @@ -19,14 +19,12 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -130,8 +128,6 @@ func TestMaxDiskSpillUsage(t *testing.T) { distSQLKnobs := &execinfra.TestingKnobs{} distSQLKnobs.ForceDiskSpill = true testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs - testClusterArgs.ServerArgs.Insecure = true - serverutils.InitTestServerFactory(server.TestServerFactory) tc := testcluster.StartTestCluster(t, 1, testClusterArgs) ctx := context.Background() defer tc.Stopper().Stop(ctx) @@ -150,9 +146,6 @@ CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 10) AS for rows.Next() { var res string assert.NoError(t, rows.Scan(&res)) - var sb strings.Builder - sb.WriteString(res) - sb.WriteByte('\n') if matches := re.FindStringSubmatch(res); len(matches) > 0 { return true } @@ -184,10 +177,8 @@ func TestCPUTimeEndToEnd(t *testing.T) { distSQLKnobs := &execinfra.TestingKnobs{} distSQLKnobs.ForceDiskSpill = true testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs - testClusterArgs.ServerArgs.Insecure = true const numNodes = 3 - serverutils.InitTestServerFactory(server.TestServerFactory) tc := testcluster.StartTestCluster(t, numNodes, testClusterArgs) ctx := context.Background() defer tc.Stopper().Stop(ctx) @@ -245,3 +236,123 @@ func TestCPUTimeEndToEnd(t *testing.T) { runQuery("SELECT * FROM (VALUES (1), (2), (3)) v(a) INNER LOOKUP JOIN t ON a = x", false /* hideCPU */) runQuery("SELECT count(*) FROM generate_series(1, 100000)", false /* hideCPU */) } + +// TestContentionTimeOnWrites verifies that the contention encountered during a +// mutation is reported on EXPLAIN ANALYZE output. +func TestContentionTimeOnWrites(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + runner := sqlutils.MakeSQLRunner(tc.Conns[0]) + runner.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY, v INT)") + + // The test involves three goroutines: + // - the main goroutine acts as the coordinator. It first waits for worker 1 + // to perform its mutation in an open txn, then blocks until worker 2 + // begins executing its mutation, then unblocks worker 1 and waits for + // both workers to exit. + // - worker 1 goroutine performs a mutation without committing a txn. It + // notifies the main goroutine by closing 'sem' once the mutation has been + // performed. It then blocks until 'commitCh' is closed by the main + // goroutine which allows worker 2 to experience contention. + // - worker 2 goroutine performs a mutation via EXPLAIN ANALYZE. This query + // will be blocked until worker 1 commits its txn, so it should see + // contention time reported in the output. + + sem := make(chan struct{}) + errCh := make(chan error, 1) + commitCh := make(chan struct{}) + go func() { + defer close(errCh) + // Ensure that sem is always closed (in case we encounter an error + // before the mutation is performed). + var closedSem bool + defer func() { + if !closedSem { + close(sem) + } + }() + txn, err := tc.Conns[0].Begin() + if err != nil { + errCh <- err + return + } + _, err = txn.Exec("INSERT INTO t VALUES (1, 1)") + if err != nil { + errCh <- err + return + } + // Notify the main goroutine that the mutation has been performed. + close(sem) + closedSem = true + // Block until the main goroutine tells us that we're good to commit. + <-commitCh + if err = txn.Commit(); err != nil { + errCh <- err + return + } + }() + + // Block until the mutation of worker 1 is done. + <-sem + // Check that no error was encountered before that. + select { + case err := <-errCh: + t.Fatal(err) + default: + } + + var foundContention bool + errCh2 := make(chan error, 1) + go func() { + defer close(errCh2) + // Execute the mutation via EXPLAIN ANALYZE and check whether the + // contention is reported. + contentionRE := regexp.MustCompile(`cumulative time spent due to contention.*`) + rows := runner.Query(t, "EXPLAIN ANALYZE UPSERT INTO t VALUES (1, 2)") + for rows.Next() { + var line string + if err := rows.Scan(&line); err != nil { + errCh2 <- err + return + } + if contentionRE.MatchString(line) { + foundContention = true + } + } + }() + + // Continuously poll the cluster queries until we see that the query that + // should be experiencing contention has started executing. + for { + row := runner.QueryRow(t, "SELECT count(*) FROM [SHOW CLUSTER QUERIES] WHERE query LIKE '%EXPLAIN ANALYZE UPSERT%'") + var count int + row.Scan(&count) + // Sleep for non-trivial amount of time to allow for worker 2 to start + // (if it hasn't already) and to experience the contention (if it has + // started). + time.Sleep(time.Second) + if count == 2 { + // We stop polling once we see 2 queries matching the LIKE pattern: + // the mutation query from worker 2 and the polling query itself. + break + } + } + + // Allow worker 1 to commit which should unblock both workers. + close(commitCh) + + // Wait for both workers to exit. Also perform sanity checks that the + // workers didn't run into any errors. + err := <-errCh + require.NoError(t, err) + err = <-errCh2 + require.NoError(t, err) + + // Meat of the test - verify that the contention was reported. + require.True(t, foundContention) +} diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 2ec93f5f88bd..1b0d35a58e58 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -47,6 +47,7 @@ type planNodeToRowSource struct { // run time state machine values row rowenc.EncDatumRow + contentionEventsListener execstats.ContentionEventsListener tenantConsumptionListener execstats.TenantConsumptionListener } @@ -168,7 +169,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS } func (p *planNodeToRowSource) Start(ctx context.Context) { - ctx = p.StartInternal(ctx, nodeName(p.node), &p.tenantConsumptionListener) + ctx = p.StartInternal(ctx, nodeName(p.node), &p.contentionEventsListener, &p.tenantConsumptionListener) p.params.ctx = ctx // This starts all of the nodes below this node. if err := startExec(p.params, p.node); err != nil { @@ -260,13 +261,14 @@ func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetad // execStatsForTrace implements ProcessorBase.ExecStatsForTrace. func (p *planNodeToRowSource) execStatsForTrace() *execinfrapb.ComponentStats { - // Propagate RUs from IO requests. - // TODO(drewk): we should consider propagating other stats for planNode - // operators. - if p.tenantConsumptionListener.ConsumedRU == 0 { + // Propagate contention time and RUs from IO requests. + if p.contentionEventsListener.CumulativeContentionTime == 0 && p.tenantConsumptionListener.ConsumedRU == 0 { return nil } return &execinfrapb.ComponentStats{ + KV: execinfrapb.KVStats{ + ContentionTime: optional.MakeTimeValue(p.contentionEventsListener.CumulativeContentionTime), + }, Exec: execinfrapb.ExecStats{ ConsumedRU: optional.MakeUint(p.tenantConsumptionListener.ConsumedRU), },