Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make UI user active statements and open transactions consistent #75815

Merged
merged 1 commit into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ func makeMetrics(internal bool) Metrics {
6*metricsSampleInterval),
SQLTxnLatency: metric.NewLatency(getMetricMeta(MetaSQLTxnLatency, internal),
6*metricsSampleInterval),
SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLActiveStatements: metric.NewGauge(getMetricMeta(MetaSQLActiveQueries, internal)),

TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)),
FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)),
Expand Down
14 changes: 12 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func (ex *connExecutor) execStmtInOpenState(
}

ex.addActiveQuery(ast, formatWithPlaceholders(ast, ex.planner.EvalContext()), queryID, ex.state.cancel)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
}

// Make sure that we always unregister the query. It also deals with
// overwriting res.Error to a more user-friendly message in case of query
Expand All @@ -330,6 +333,9 @@ func (ex *connExecutor) execStmtInOpenState(
}
}
ex.removeActiveQuery(queryID, ast)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
}

// Detect context cancelation and overwrite whatever error might have been
// set on the result before. The idea is that once the query's context is
Expand Down Expand Up @@ -1983,7 +1989,9 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64()
}

ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)
}

ex.extraTxnState.shouldExecuteOnTxnFinish = true
ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart
Expand Down Expand Up @@ -2020,7 +2028,9 @@ func (ex *connExecutor) recordTransactionFinish(

txnEnd := timeutil.Now()
txnTime := txnEnd.Sub(txnStart)
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
}
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
Expand Down
144 changes: 144 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1410,6 +1411,149 @@ func TestInjectRetryErrors(t *testing.T) {
})
}

// TestTrackOnlyUserOpenTransactionsAndActiveStatements verifies that the
// SQLTxnsOpen and SQLActiveStatements gauges only move for user-initiated
// work: a statement run through the internal executor must leave both gauges
// untouched, while a statement run over a regular client connection must be
// reflected in both.
func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Upper bound on how long each polled condition below may take to hold.
	const waitTimeout = 5 * time.Second

	ctx := context.Background()
	params := base.TestServerArgs{}
	s, sqlDB, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	dbConn := serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())
	defer dbConn.Close()

	// One buffered slot per query goroutine below, so that a goroutine can
	// always deliver its completion signal without blocking, even if the test
	// body has already bailed out on an earlier failure.
	waitChannel := make(chan struct{}, 2)

	selectQuery := "SELECT * FROM t.foo"
	selectInternalQueryActive := `SELECT count(*) FROM crdb_internal.cluster_queries WHERE query = '` + selectQuery + `'`
	selectUserQueryActive := `SELECT count(*) FROM [SHOW STATEMENTS] WHERE query = '` + selectQuery + `'`

	sqlServer := s.SQLServer().(*sql.Server)
	txnsOpen := sqlServer.Metrics.EngineMetrics.SQLTxnsOpen
	activeStmts := sqlServer.Metrics.EngineMetrics.SQLActiveStatements

	// checkGauges returns an error describing the first gauge whose current
	// value differs from the expected one, or nil if both match.
	checkGauges := func(expTxnsOpen, expActiveStmts int64) error {
		if actual := txnsOpen.Value(); actual != expTxnsOpen {
			return errors.Newf("Wrong SQLTxnsOpen value. Expected: %d. Actual: %d", expTxnsOpen, actual)
		}
		if actual := activeStmts.Value(); actual != expActiveStmts {
			return errors.Newf("Wrong SQLActiveStatements value. Expected: %d. Actual: %d", expActiveStmts, actual)
		}
		return nil
	}

	testDB := sqlutils.MakeSQLRunner(sqlDB)
	testDB.Exec(t, "CREATE DATABASE t")
	testDB.Exec(t, "CREATE TABLE t.foo (i INT PRIMARY KEY)")
	testDB.Exec(t, "INSERT INTO t.foo VALUES (1)")

	// Begin a user-initiated transaction.
	testDB.Exec(t, "BEGIN")

	// Check that the number of open transactions has incremented.
	require.Equal(t, int64(1), txnsOpen.Value())

	// Create a state of contention.
	testDB.Exec(t, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")

	// Execute internal statement (this case is identical to opening an internal
	// transaction).
	go func() {
		// Always signal completion so the final receives cannot hang.
		defer func() { waitChannel <- struct{}{} }()
		_, err := s.InternalExecutor().(*sql.InternalExecutor).ExecEx(ctx,
			"test-internal-active-stmt-wait",
			nil,
			sessiondata.InternalExecutorOverride{User: security.RootUserName()},
			selectQuery)
		// t.Errorf rather than require: FailNow-style helpers must only be
		// called from the goroutine running the test function.
		if err != nil {
			t.Errorf("expected internal SELECT query to be successful, but encountered an error: %v", err)
		}
	}()

	// Check that the internal statement is active.
	testutils.SucceedsWithin(t, func() error {
		row := testDB.QueryStr(t, selectInternalQueryActive)
		if row[0][0] == "0" {
			return errors.New("internal select query is not active yet")
		}
		return nil
	}, waitTimeout)

	// Check that neither gauge has moved. We only want to track user's open
	// transactions and active statements; the open transaction count is
	// already at one from the initial user-initiated transaction.
	testutils.SucceedsWithin(t, func() error {
		return checkGauges(1 /* expTxnsOpen */, 0 /* expActiveStmts */)
	}, waitTimeout)

	require.Equal(t, int64(1), txnsOpen.Value())
	require.Equal(t, int64(0), activeStmts.Value())

	// Create active user-initiated statement.
	go func() {
		// Always signal completion so the final receives cannot hang.
		defer func() { waitChannel <- struct{}{} }()
		// See the note in the internal-query goroutine on why this reports
		// through t.Errorf instead of require.
		if _, err := dbConn.Exec(selectQuery); err != nil {
			t.Errorf("expected user SELECT query to be successful, but encountered an error: %v", err)
		}
	}()

	// Check that the user statement is active.
	testutils.SucceedsWithin(t, func() error {
		row := testDB.QueryStr(t, selectUserQueryActive)
		if row[0][0] == "0" {
			return errors.New("user select query is not active yet")
		}
		return nil
	}, waitTimeout)

	// Check that both gauges have incremented. The second db connection
	// creates an implicit user-initiated transaction and one active statement.
	testutils.SucceedsWithin(t, func() error {
		return checkGauges(2 /* expTxnsOpen */, 1 /* expActiveStmts */)
	}, waitTimeout)

	require.Equal(t, int64(2), txnsOpen.Value())
	require.Equal(t, int64(1), activeStmts.Value())

	// Commit the initial user-initiated transaction. The internal and user
	// select queries are no longer in contention.
	testDB.Exec(t, "COMMIT")

	// Check that both the internal & user statements are no longer active.
	testutils.SucceedsWithin(t, func() error {
		userRow := testDB.QueryStr(t, selectUserQueryActive)
		internalRow := testDB.QueryStr(t, selectInternalQueryActive)

		if userRow[0][0] != "0" {
			return errors.New("user select query is still active")
		}
		if internalRow[0][0] != "0" {
			return errors.New("internal select query is still active")
		}
		return nil
	}, waitTimeout)

	// Check that the number of open transactions has decremented by 2 (the
	// initial user transaction and the user statement executed on the second
	// db connection) and that the number of active statements has decremented
	// by 1 (it should not decrement for the internal active statement).
	testutils.SucceedsWithin(t, func() error {
		return checkGauges(0 /* expTxnsOpen */, 0 /* expActiveStmts */)
	}, waitTimeout)

	require.Equal(t, int64(0), txnsOpen.Value())
	require.Equal(t, int64(0), activeStmts.Value())

	// Wait for both query goroutines to finish before the deferred cleanup
	// runs.
	<-waitChannel
	<-waitChannel
}

// dynamicRequestFilter exposes a filter method which is a
// kvserverbase.ReplicaRequestFilter but can be set dynamically.
type dynamicRequestFilter struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,16 @@ var (
}
MetaSQLTxnsOpen = metric.Metadata{
Name: "sql.txns.open",
Help: "Number of currently open SQL transactions",
Help: "Number of currently open user SQL transactions",
Measurement: "Open SQL Transactions",
Unit: metric.Unit_COUNT,
}
// MetaSQLActiveQueries is the metadata for the "sql.statements.active" gauge,
// which per its help text reports the number of currently active user SQL
// statements.
MetaSQLActiveQueries = metric.Metadata{
Name: "sql.statements.active",
Help: "Number of currently active user SQL statements",
Measurement: "Active Statements",
Unit: metric.Unit_COUNT,
}
MetaFullTableOrIndexScan = metric.Metadata{
Name: "sql.full.scan.count",
Help: "Number of full table or index scans",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EngineMetrics struct {
SQLServiceLatency *metric.Histogram
SQLTxnLatency *metric.Histogram
SQLTxnsOpen *metric.Gauge
SQLActiveStatements *metric.Gauge

// TxnAbortCount counts transactions that were aborted, either due
// to non-retriable errors, or retriable errors when the client-side
Expand Down
8 changes: 8 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2131,6 +2131,14 @@ var charts = []sectionDescription{
},
AxisLabel: "Transactions",
},
{
Title: "Active Statements",
Metrics: []string{
"sql.statements.active",
"sql.statements.active.internal",
},
AxisLabel: "Active Statements",
},
{
Title: "Full Table Index Scans",
Metrics: []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default function(props: GraphDashboardProps) {
>
<Axis label="queries">
<Metric
name="cr.node.sql.distsql.queries.active"
name="cr.node.sql.statements.active"
title="Active Statements"
/>
</Axis>
Expand Down