diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index ade924808a02..e0512349e242 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2296,6 +2296,7 @@ func (ex *connExecutor) recordTransactionFinish( RowsRead: ex.extraTxnState.rowsRead, RowsWritten: ex.extraTxnState.rowsWritten, BytesRead: ex.extraTxnState.bytesRead, + Priority: ex.state.priority, } if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index d8a7ee973a36..a9b81ba63cb0 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6344,7 +6344,7 @@ CREATE TABLE crdb_internal.%s ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING, exec_node_ids INT[] NOT NULL, @@ -6456,7 +6456,7 @@ func populateExecutionInsights( tree.NewDString(insight.Statement.PlanGist), tree.NewDInt(tree.DInt(insight.Statement.RowsRead)), tree.NewDInt(tree.DInt(insight.Statement.RowsWritten)), - tree.NewDFloat(tree.DFloat(insight.Transaction.UserPriority)), + tree.NewDString(insight.Transaction.UserPriority), tree.NewDInt(tree.DInt(insight.Statement.Retries)), autoRetryReason, execNodeIDs, diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 7c528e201e62..2a8a57affc42 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -261,7 +261,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -285,7 +285,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -977,7 +977,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -1001,7 +1001,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, diff --git a/pkg/sql/sqlstats/insights/insights.proto b/pkg/sql/sqlstats/insights/insights.proto index 31c4a2fac5f5..214e97e749ce 100644 --- a/pkg/sql/sqlstats/insights/insights.proto +++ b/pkg/sql/sqlstats/insights/insights.proto @@ -56,7 +56,7 @@ message Transaction { [(gogoproto.customname) = "FingerprintID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", (gogoproto.nullable) = false]; - double user_priority = 3; + string user_priority = 3; } message Statement { diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 129aa003ecc0..2c4aa4da337d 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -123,6 +123,112 @@ func TestInsightsIntegration(t *testing.T) { }, 1*time.Second) } +func TestInsightsPriorityIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const appName = "TestInsightsPriorityIntegration" + + // Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}} + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + + // Enable detection by setting a latencyThreshold > 0. + latencyThreshold := 50 * time.Millisecond + insights.LatencyThreshold.Override(ctx, &settings.SV, latencyThreshold) + + _, err := conn.ExecContext(ctx, "SET SESSION application_name=$1", appName) + require.NoError(t, err) + + _, err = conn.Exec("CREATE TABLE t (id string, s string);") + require.NoError(t, err) + + queryDelayInSeconds := 2 * latencyThreshold.Seconds() + // Execute a "long-running" statement, running longer than our latencyThreshold. + _, err = conn.ExecContext(ctx, "SELECT pg_sleep($1)", queryDelayInSeconds) + require.NoError(t, err) + + var priorities = []struct { + setPriorityQuery string + query string + queryNoValues string + expectedPriorityValue string + }{ + { + setPriorityQuery: "SET TRANSACTION PRIORITY LOW", + query: "INSERT INTO t(id, s) VALUES ('test', 'originalValue')", + queryNoValues: "INSERT INTO t(id, s) VALUES ('_', '_')", + expectedPriorityValue: "low", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY NORMAL", + query: "UPDATE t set s = 'updatedValue' where id = 'test'", + queryNoValues: "UPDATE t SET s = '_' WHERE id = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SELECT 1", // use a dummy query to validate default scenario + query: "UPDATE t set s = 'updatedValue'", + queryNoValues: "UPDATE t SET s = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY HIGH", + query: "DELETE FROM t WHERE t.s = 'originalValue'", + queryNoValues: "DELETE FROM t WHERE t.s = '_'", + expectedPriorityValue: "high", + }, + } + + for _, p := range priorities { + testutils.SucceedsWithin(t, func() error { + tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.setPriorityQuery) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.query) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, "select pg_sleep(.1);") + require.NoError(t, errTxn) + errTxn = tx.Commit() + require.NoError(t, errTxn) + return nil + }, 2*time.Second) + + testutils.SucceedsWithin(t, func() error { + row := conn.QueryRowContext(ctx, "SELECT "+ + "query, "+ + "priority "+ + "FROM crdb_internal.node_execution_insights where "+ + "app_name = $1 and query = $2 ", appName, p.queryNoValues) + + var query, priority string + err = row.Scan(&query, &priority) + + if err != nil { + return err + } + + if query != p.queryNoValues { + return fmt.Errorf("expected '%s', but was %s", p.queryNoValues, query) + } + + if priority != p.expectedPriorityValue { + return fmt.Errorf("expected '%s', but was %s", p.expectedPriorityValue, priority) + } + + return nil + }, 2*time.Second) + } +} + func TestInsightsIntegrationForContention(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index d5faea4db507..4c5056637097 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -313,7 +313,8 @@ func (s *Container) RecordTransaction( s.insights.ObserveTransaction(value.SessionID, &insights.Transaction{ ID: value.TransactionID, - FingerprintID: key}) + FingerprintID: key, + UserPriority: value.Priority.String()}) return nil } diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 650781052435..eb4d7b92a66a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -235,4 +235,5 @@ type RecordedTxnStats struct { RowsRead int64 RowsWritten int64 BytesRead int64 + Priority roachpb.UserPriority }