Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix cluster_execution_insights priority level #86901

Merged
merged 1 commit into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2296,6 +2296,7 @@ func (ex *connExecutor) recordTransactionFinish(
RowsRead: ex.extraTxnState.rowsRead,
RowsWritten: ex.extraTxnState.rowsWritten,
BytesRead: ex.extraTxnState.bytesRead,
Priority: ex.state.priority,
}

if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6344,7 +6344,7 @@ CREATE TABLE crdb_internal.%s (
plan_gist STRING NOT NULL,
rows_read INT8 NOT NULL,
rows_written INT8 NOT NULL,
priority FLOAT NOT NULL,
priority STRING NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING,
exec_node_ids INT[] NOT NULL,
Expand Down Expand Up @@ -6456,7 +6456,7 @@ func populateExecutionInsights(
tree.NewDString(insight.Statement.PlanGist),
tree.NewDInt(tree.DInt(insight.Statement.RowsRead)),
tree.NewDInt(tree.DInt(insight.Statement.RowsWritten)),
tree.NewDFloat(tree.DFloat(insight.Transaction.UserPriority)),
tree.NewDString(insight.Transaction.UserPriority),
tree.NewDInt(tree.DInt(insight.Statement.Retries)),
autoRetryReason,
execNodeIDs,
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/create_statements
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
plan_gist STRING NOT NULL,
rows_read INT8 NOT NULL,
rows_written INT8 NOT NULL,
priority FLOAT8 NOT NULL,
priority STRING NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
Expand All @@ -285,7 +285,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
plan_gist STRING NOT NULL,
rows_read INT8 NOT NULL,
rows_written INT8 NOT NULL,
priority FLOAT8 NOT NULL,
priority STRING NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
Expand Down Expand Up @@ -977,7 +977,7 @@ CREATE TABLE crdb_internal.node_execution_insights (
plan_gist STRING NOT NULL,
rows_read INT8 NOT NULL,
rows_written INT8 NOT NULL,
priority FLOAT8 NOT NULL,
priority STRING NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
Expand All @@ -1001,7 +1001,7 @@ CREATE TABLE crdb_internal.node_execution_insights (
plan_gist STRING NOT NULL,
rows_read INT8 NOT NULL,
rows_written INT8 NOT NULL,
priority FLOAT8 NOT NULL,
priority STRING NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/insights/insights.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ message Transaction {
[(gogoproto.customname) = "FingerprintID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID",
(gogoproto.nullable) = false];
double user_priority = 3;
string user_priority = 3;
}

message Statement {
Expand Down
106 changes: 106 additions & 0 deletions pkg/sql/sqlstats/insights/integration/insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,112 @@ func TestInsightsIntegration(t *testing.T) {
}, 1*time.Second)
}

func TestInsightsPriorityIntegration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const appName = "TestInsightsPriorityIntegration"

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}}
tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

// Enable detection by setting a latencyThreshold > 0.
latencyThreshold := 50 * time.Millisecond
insights.LatencyThreshold.Override(ctx, &settings.SV, latencyThreshold)

_, err := conn.ExecContext(ctx, "SET SESSION application_name=$1", appName)
require.NoError(t, err)

_, err = conn.Exec("CREATE TABLE t (id string, s string);")
require.NoError(t, err)

queryDelayInSeconds := 2 * latencyThreshold.Seconds()
// Execute a "long-running" statement, running longer than our latencyThreshold.
_, err = conn.ExecContext(ctx, "SELECT pg_sleep($1)", queryDelayInSeconds)
require.NoError(t, err)

var priorities = []struct {
setPriorityQuery string
query string
queryNoValues string
expectedPriorityValue string
}{
{
setPriorityQuery: "SET TRANSACTION PRIORITY LOW",
query: "INSERT INTO t(id, s) VALUES ('test', 'originalValue')",
queryNoValues: "INSERT INTO t(id, s) VALUES ('_', '_')",
expectedPriorityValue: "low",
},
{
setPriorityQuery: "SET TRANSACTION PRIORITY NORMAL",
query: "UPDATE t set s = 'updatedValue' where id = 'test'",
queryNoValues: "UPDATE t SET s = '_' WHERE id = '_'",
expectedPriorityValue: "normal",
},
{
setPriorityQuery: "SELECT 1", // use a dummy query to validate default scenario
query: "UPDATE t set s = 'updatedValue'",
queryNoValues: "UPDATE t SET s = '_'",
expectedPriorityValue: "normal",
},
{
setPriorityQuery: "SET TRANSACTION PRIORITY HIGH",
query: "DELETE FROM t WHERE t.s = 'originalValue'",
queryNoValues: "DELETE FROM t WHERE t.s = '_'",
expectedPriorityValue: "high",
},
}

for _, p := range priorities {
testutils.SucceedsWithin(t, func() error {
tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{})
require.NoError(t, errTxn)

_, errTxn = tx.ExecContext(ctx, p.setPriorityQuery)
require.NoError(t, errTxn)

_, errTxn = tx.ExecContext(ctx, p.query)
require.NoError(t, errTxn)

_, errTxn = tx.ExecContext(ctx, "select pg_sleep(.1);")
require.NoError(t, errTxn)
errTxn = tx.Commit()
require.NoError(t, errTxn)
return nil
}, 2*time.Second)

testutils.SucceedsWithin(t, func() error {
row := conn.QueryRowContext(ctx, "SELECT "+
"query, "+
"priority "+
"FROM crdb_internal.node_execution_insights where "+
"app_name = $1 and query = $2 ", appName, p.queryNoValues)

var query, priority string
err = row.Scan(&query, &priority)

if err != nil {
return err
}

if query != p.queryNoValues {
return fmt.Errorf("expected '%s', but was %s", p.queryNoValues, query)
}

if priority != p.expectedPriorityValue {
return fmt.Errorf("expected '%s', but was %s", p.expectedPriorityValue, priority)
}

return nil
}, 2*time.Second)
}
}

func TestInsightsIntegrationForContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ func (s *Container) RecordTransaction(

s.insights.ObserveTransaction(value.SessionID, &insights.Transaction{
ID: value.TransactionID,
FingerprintID: key})
FingerprintID: key,
UserPriority: value.Priority.String()})

return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,5 @@ type RecordedTxnStats struct {
RowsRead int64
RowsWritten int64
BytesRead int64
Priority roachpb.UserPriority
}