Skip to content

Commit

Permalink
sql: make UI user active statements and open transactions consistent
Browse files Browse the repository at this point in the history
Previously, the number of Active SQL Statements could be greater than
the number of Open SQL Transactions on the UI. This inconsistency was
due to filtering out internal Open SQL Transactions but keeping internal
Active SQL Statements. This change fixes this inconsistency by filtering
out both internal Active SQL Statements and internal Open SQL
Transactions.

Release note (sql change): filter out internal statements and
transactions from UI timeseries metrics.
  • Loading branch information
Thomas Hardy committed Feb 9, 2022
1 parent 03c50b0 commit 406139e
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 5 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ func makeMetrics(internal bool) Metrics {
6*metricsSampleInterval),
SQLTxnLatency: metric.NewLatency(getMetricMeta(MetaSQLTxnLatency, internal),
6*metricsSampleInterval),
SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLActiveStatements: metric.NewGauge(getMetricMeta(MetaSQLActiveQueries, internal)),

TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)),
FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)),
Expand Down
14 changes: 12 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func (ex *connExecutor) execStmtInOpenState(
}

ex.addActiveQuery(ast, formatWithPlaceholders(ast, ex.planner.EvalContext()), queryID, ex.state.cancel)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
}

// Make sure that we always unregister the query. It also deals with
// overwriting res.Error to a more user-friendly message in case of query
Expand All @@ -330,6 +333,9 @@ func (ex *connExecutor) execStmtInOpenState(
}
}
ex.removeActiveQuery(queryID, ast)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
}

// Detect context cancelation and overwrite whatever error might have been
// set on the result before. The idea is that once the query's context is
Expand Down Expand Up @@ -1983,7 +1989,9 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64()
}

ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)
}

ex.extraTxnState.shouldExecuteOnTxnFinish = true
ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart
Expand Down Expand Up @@ -2020,7 +2028,9 @@ func (ex *connExecutor) recordTransactionFinish(

txnEnd := timeutil.Now()
txnTime := txnEnd.Sub(txnStart)
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
}
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
Expand Down
144 changes: 144 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1410,6 +1411,149 @@ func TestInjectRetryErrors(t *testing.T) {
})
}

func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params := base.TestServerArgs{}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
dbConn := serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())
defer dbConn.Close()

waitChannel := make(chan struct{})

selectQuery := "SELECT * FROM t.foo"
selectInternalQueryActive := `SELECT count(*) FROM crdb_internal.cluster_queries WHERE query = '` + selectQuery + `'`
selectUserQueryActive := `SELECT count(*) FROM [SHOW STATEMENTS] WHERE query = '` + selectQuery + `'`

sqlServer := s.SQLServer().(*sql.Server)
testDB := sqlutils.MakeSQLRunner(sqlDB)
testDB.Exec(t, "CREATE DATABASE t")
testDB.Exec(t, "CREATE TABLE t.foo (i INT PRIMARY KEY)")
testDB.Exec(t, "INSERT INTO t.foo VALUES (1)")

// Begin a user-initiated transaction.
testDB.Exec(t, "BEGIN")

// Check that the number of open transactions has incremented.
require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())

// Create a state of contention.
testDB.Exec(t, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")

// Execute internal statement (this case is identical to opening an internal
// transaction).
go func() {
_, err := s.InternalExecutor().(*sql.InternalExecutor).ExecEx(ctx,
"test-internal-active-stmt-wait",
nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
selectQuery)
require.NoError(t, err, "expected internal SELECT query to be successful, but encountered an error")
waitChannel <- struct{}{}
}()

// Check that the internal statement is active.
testutils.SucceedsWithin(t, func() error {
row := testDB.QueryStr(t, selectInternalQueryActive)
if row[0][0] == "0" {
return errors.New("internal select query is not active yet")
}
return nil
}, 5*time.Second)

testutils.SucceedsWithin(t, func() error {
// Check that the number of open transactions has not incremented. We only
// want to track user's open transactions. Open transaction count already
// at one from initial user-initiated transaction.
if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 1 {
return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 1, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
}
// Check that the number of active statements has not incremented. We only
// want to track user's active statements.
if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 0 {
return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
}
return nil
}, 5*time.Second)

require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())

// Create active user-initiated statement.
go func() {
_, err := dbConn.Exec(selectQuery)
require.NoError(t, err, "expected user SELECT query to be successful, but encountered an error")
waitChannel <- struct{}{}
}()

// Check that the user statement is active.
testutils.SucceedsWithin(t, func() error {
row := testDB.QueryStr(t, selectUserQueryActive)
if row[0][0] == "0" {
return errors.New("user select query is not active yet")
}
return nil
}, 5*time.Second)

testutils.SucceedsWithin(t, func() error {
// Check that the number of open transactions has incremented. Second db
// connection creates an implicit user-initiated transaction.
if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 2 {
return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 2, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
}
// Check that the number of active statements has incremented.
if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 1 {
return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 1, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
}
return nil
}, 5*time.Second)

require.Equal(t, int64(2), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())

// Commit the initial user-initiated transaction. The internal and user
// select queries are no longer in contention.
testDB.Exec(t, "COMMIT")

// Check that both the internal & user statements are no longer active.
testutils.SucceedsWithin(t, func() error {
userRow := testDB.QueryStr(t, selectUserQueryActive)
internalRow := testDB.QueryStr(t, selectInternalQueryActive)

if userRow[0][0] != "0" {
return errors.New("user select query is still active")
} else if internalRow[0][0] != "0" {
return errors.New("internal select query is still active")
}
return nil
}, 5*time.Second)

testutils.SucceedsWithin(t, func() error {
// Check that the number of open transactions has decremented by 2 (should
// decrement for initial user transaction and user statement executed
// on second db connection).
if sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value() != 0 {
return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
}
// Check that the number of active statements has decremented by 1 (should
// not decrement for the internal active statement).
if sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value() != 0 {
return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", 0, sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
}
return nil
}, 5*time.Second)

require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())

// Wait for both goroutine queries to finish before calling defer.
<-waitChannel
<-waitChannel
}

// dynamicRequestFilter exposes a filter method which is a
// kvserverbase.ReplicaRequestFilter but can be set dynamically.
type dynamicRequestFilter struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,16 @@ var (
}
MetaSQLTxnsOpen = metric.Metadata{
Name: "sql.txns.open",
Help: "Number of currently open SQL transactions",
Help: "Number of currently open user SQL transactions",
Measurement: "Open SQL Transactions",
Unit: metric.Unit_COUNT,
}
MetaSQLActiveQueries = metric.Metadata{
Name: "sql.statements.active",
Help: "Number of currently active user SQL statements",
Measurement: "Active Statements",
Unit: metric.Unit_COUNT,
}
MetaFullTableOrIndexScan = metric.Metadata{
Name: "sql.full.scan.count",
Help: "Number of full table or index scans",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EngineMetrics struct {
SQLServiceLatency *metric.Histogram
SQLTxnLatency *metric.Histogram
SQLTxnsOpen *metric.Gauge
SQLActiveStatements *metric.Gauge

// TxnAbortCount counts transactions that were aborted, either due
// to non-retriable errors, or retriable errors when the client-side
Expand Down
8 changes: 8 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2131,6 +2131,14 @@ var charts = []sectionDescription{
},
AxisLabel: "Transactions",
},
{
Title: "Active Statements",
Metrics: []string{
"sql.statements.active",
"sql.statements.active.internal",
},
AxisLabel: "Active Statements",
},
{
Title: "Full Table Index Scans",
Metrics: []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default function(props: GraphDashboardProps) {
>
<Axis label="queries">
<Metric
name="cr.node.sql.distsql.queries.active"
name="cr.node.sql.statements.active"
title="Active Statements"
/>
</Axis>
Expand Down

0 comments on commit 406139e

Please sign in to comment.