diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index b9debb1da7fc..626497e3b245 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -416,7 +416,8 @@ func makeMetrics(internal bool) Metrics { 6*metricsSampleInterval), SQLTxnLatency: metric.NewLatency(getMetricMeta(MetaSQLTxnLatency, internal), 6*metricsSampleInterval), - SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)), + SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)), + SQLActiveStatements: metric.NewGauge(getMetricMeta(MetaSQLActiveQueries, internal)), TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)), FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)), diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 925f09df5ed1..d96b15caf3b4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -317,6 +317,9 @@ func (ex *connExecutor) execStmtInOpenState( } ex.addActiveQuery(ast, formatWithPlaceholders(ast, ex.planner.EvalContext()), queryID, ex.state.cancel) + if ex.executorType != executorTypeInternal { + ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1) + } // Make sure that we always unregister the query. It also deals with // overwriting res.Error to a more user-friendly message in case of query @@ -330,6 +333,9 @@ func (ex *connExecutor) execStmtInOpenState( } } ex.removeActiveQuery(queryID, ast) + if ex.executorType != executorTypeInternal { + ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1) + } // Detect context cancelation and overwrite whatever error might have been // set on the result before. The idea is that once the query's context is @@ -1983,7 +1989,9 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64() } - ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1) + if ex.executorType != executorTypeInternal { + ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1) + } ex.extraTxnState.shouldExecuteOnTxnFinish = true ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart @@ -2020,7 +2028,9 @@ func (ex *connExecutor) recordTransactionFinish( txnEnd := timeutil.Now() txnTime := txnEnd.Sub(txnStart) - ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) + if ex.executorType != executorTypeInternal { + ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) + } ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds()) ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 858a9d9e5f0f..0f91ff9e85ed 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -1410,6 +1411,149 @@ func TestInjectRetryErrors(t *testing.T) { }) } +func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params := base.TestServerArgs{} + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + dbConn := serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper()) + defer dbConn.Close() + + waitChannel := make(chan struct{}) + + selectQuery := "SELECT * FROM t.foo" + selectInternalQueryActive := `SELECT count(*) FROM crdb_internal.cluster_queries WHERE query = '` + selectQuery + `'` + selectUserQueryActive := `SELECT count(*) FROM [SHOW STATEMENTS] WHERE query = '` + selectQuery + `'` + + sqlServer := s.SQLServer().(*sql.Server) + testDB := sqlutils.MakeSQLRunner(sqlDB) + testDB.Exec(t, "CREATE DATABASE t") + testDB.Exec(t, "CREATE TABLE t.foo (i INT PRIMARY KEY)") + testDB.Exec(t, "INSERT INTO t.foo VALUES (1)") + + // Begin a user-initiated transaction. + testDB.Exec(t, "BEGIN") + + // Check that the number of open transactions has incremented. + require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + + // Create a state of contention. + testDB.Exec(t, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE") + + // Execute internal statement (this case is identical to opening an internal + // transaction). + go func() { + _, err := s.InternalExecutor().(*sql.InternalExecutor).ExecEx(ctx, + "test-internal-active-stmt-wait", + nil, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + selectQuery) + require.NoError(t, err, "expected internal SELECT query to be successful, but encountered an error") + waitChannel <- struct{}{} + }() + + // Check that the internal statement is active. + testutils.SucceedsWithin(t, func() error { + row := testDB.QueryStr(t, selectInternalQueryActive) + if row[0][0] == "0" { + return errors.New("internal select query is not active yet") + } + return nil + }, 5*time.Second) + + testutils.SucceedsWithin(t, func() error { + // Check that the number of open transactions has not incremented. We only + // want to track user's open transactions. Open transaction count already + // at one from initial user-initiated transaction. + if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 1 { + return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 1, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + } + // Check that the number of active statements has not incremented. We only + // want to track user's active statements. + if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 0 { + return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + } + return nil + }, 5*time.Second) + + require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + + // Create active user-initiated statement. + go func() { + _, err := dbConn.Exec(selectQuery) + require.NoError(t, err, "expected user SELECT query to be successful, but encountered an error") + waitChannel <- struct{}{} + }() + + // Check that the user statement is active. + testutils.SucceedsWithin(t, func() error { + row := testDB.QueryStr(t, selectUserQueryActive) + if row[0][0] == "0" { + return errors.New("user select query is not active yet") + } + return nil + }, 5*time.Second) + + testutils.SucceedsWithin(t, func() error { + // Check that the number of open transactions has incremented. Second db + // connection creates an implicit user-initiated transaction. + if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 2 { + return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 2, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + } + // Check that the number of active statements has incremented. + if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 1 { + return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 1, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + } + return nil + }, 5*time.Second) + + require.Equal(t, int64(2), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + + // Commit the initial user-initiated transaction. The internal and user + // select queries are no longer in contention. + testDB.Exec(t, "COMMIT") + + // Check that both the internal & user statements are no longer active. + testutils.SucceedsWithin(t, func() error { + userRow := testDB.QueryStr(t, selectUserQueryActive) + internalRow := testDB.QueryStr(t, selectInternalQueryActive) + + if userRow[0][0] != "0" { + return errors.New("user select query is still active") + } else if internalRow[0][0] != "0" { + return errors.New("internal select query is still active") + } + return nil + }, 5*time.Second) + + testutils.SucceedsWithin(t, func() error { + // Check that the number of open transactions has decremented by 2 (should + // decrement for initial user transaction and user statement executed + // on second db connection). + if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 0 { + return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + } + // Check that the number of active statements has decremented by 1 (should + // not decrement for the internal active statement). + if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 0 { + return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + } + return nil + }, 5*time.Second) + + require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) + require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) + + // Wait for both goroutine queries to finish before calling defer. + <-waitChannel + <-waitChannel +} + // dynamicRequestFilter exposes a filter method which is a // kvserverbase.ReplicaRequestFilter but can be set dynamically. type dynamicRequestFilter struct { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 2680236bf4ac..8467f83ce41b 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -789,10 +789,16 @@ var ( } MetaSQLTxnsOpen = metric.Metadata{ Name: "sql.txns.open", - Help: "Number of currently open SQL transactions", + Help: "Number of currently open user SQL transactions", Measurement: "Open SQL Transactions", Unit: metric.Unit_COUNT, } + MetaSQLActiveQueries = metric.Metadata{ + Name: "sql.statements.active", + Help: "Number of currently active user SQL statements", + Measurement: "Active Statements", + Unit: metric.Unit_COUNT, + } MetaFullTableOrIndexScan = metric.Metadata{ Name: "sql.full.scan.count", Help: "Number of full table or index scans", diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 4f4ee3ab6132..fec766453f19 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -39,6 +39,7 @@ type EngineMetrics struct { SQLServiceLatency *metric.Histogram SQLTxnLatency *metric.Histogram SQLTxnsOpen *metric.Gauge + SQLActiveStatements *metric.Gauge // TxnAbortCount counts transactions that were aborted, either due // to non-retriable errors, or retriable errors when the client-side diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index dd0008c11631..b1fc8e9427f8 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -2131,6 +2131,14 @@ var charts = []sectionDescription{ }, AxisLabel: "Transactions", }, + { + Title: "Active Statements", + Metrics: []string{ + "sql.statements.active", + "sql.statements.active.internal", + }, + AxisLabel: "Active Statements", + }, { Title: "Full Table Index Scans", Metrics: []string{ diff --git a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/sql.tsx b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/sql.tsx index a8b1203bcde6..a765d2e6e330 100644 --- a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/sql.tsx +++ b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/sql.tsx @@ -63,7 +63,7 @@ export default function(props: GraphDashboardProps) { >