Skip to content

Commit

Permalink
server: add limit and sort capabilities to sql stats requests
Browse files Browse the repository at this point in the history
This commit adds limit and sort fields to the combined
statements request. These params can be used to specify
how much data is returned and the priority at which to
return data (e.g. top-k). The current sort options are:
- service latency
- cpu time
- p99 (statements only)
- contention time
- execution count

Epic: none
Part of: cockroachdb#97876
Part of: cockroachdb#97875

Release note: None
  • Loading branch information
xinhaoz committed May 15, 2023
1 parent bdfb915 commit 41f93d7
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 64 deletions.
171 changes: 107 additions & 64 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,16 @@ func getCombinedStatementStats(
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) (*serverpb.StatementsResponse, error) {
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
showInternal := SQLStatsShowInternal.Get(&settings.SV)
whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs(
startTime, endTime, limit, testingKnobs, showInternal)
req, testingKnobs, showInternal, settings)

var statements []serverpb.StatementsResponse_CollectedStatementStatistics
var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics
var err error

if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit)
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -92,7 +90,7 @@ func getCombinedStatementStats(
whereClause, args = buildWhereClauseForStmtsByTxn(req, transactions, testingKnobs)
}

statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit)
statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -107,6 +105,39 @@ func getCombinedStatementStats(
return response, nil
}

// Common stmt and txn columns to sort on.
const (
sortSvcLatDesc = `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
sortExecCountDesc = `(statistics -> 'statistics' ->> 'cnt')::INT DESC`
sortContentionTimeDesc = `(statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::FLOAT DESC`
)

func getStmtColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

func getTxnColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

// buildWhereClauseForStmtsByTxn builds the where clause to get the statement
// stats based on a list of transactions. The list of transactions provided must
// contain no duplicate transaction fingerprint ids.
Expand Down Expand Up @@ -152,7 +183,10 @@ func buildWhereClauseForStmtsByTxn(
// The whereClause will be in the format `WHERE A = $1 AND B = $2` and
// args will return the list of arguments in order that will replace the actual values.
func getCombinedStatementsQueryClausesAndArgs(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs, showInternal bool,
req *serverpb.CombinedStatementsStatsRequest,
testingKnobs *sqlstats.TestingKnobs,
showInternal bool,
settings *cluster.Settings,
) (whereClause string, orderAndLimitClause string, args []interface{}) {
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())
Expand All @@ -164,17 +198,37 @@ func getCombinedStatementsQueryClausesAndArgs(
buffer.WriteString(fmt.Sprintf(" WHERE app_name NOT LIKE '%s%%'", catconstants.InternalAppNamePrefix))
}

if start != nil {
// Add start and end filters from request.
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
if startTime != nil {
buffer.WriteString(" AND aggregated_ts >= $1")
args = append(args, *start)
args = append(args, *startTime)
}

if end != nil {
args = append(args, *end)
if endTime != nil {
args = append(args, *endTime)
buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
}

// Add LIMIT from request.
limit := req.Limit
if limit == 0 {
limit = SQLStatsResponseMax.Get(&settings.SV)
}
args = append(args, limit)
orderAndLimitClause = fmt.Sprintf(` ORDER BY aggregated_ts DESC LIMIT $%d`, len(args))

// Determine sort column.
var col string
if req.FetchMode == nil {
col = "fingerprint_id"
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_StmtStatsOnly {
col = getStmtColumnFromSortOption(req.FetchMode.Sort)
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
col = getTxnColumnFromSortOption(req.FetchMode.Sort)
}

orderAndLimitClause = fmt.Sprintf(` ORDER BY %s LIMIT $%d`, col, len(args))

return buffer.String(), orderAndLimitClause, args
}
Expand All @@ -185,28 +239,28 @@ func collectCombinedStatements(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
aostClause := testingKnobs.GetAOSTClause()
query := fmt.Sprintf(`
SELECT * FROM (
SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

query := fmt.Sprintf(
`SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) AS sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata,
aggregation_interval
%s`, whereClause, orderAndLimit)

const expectedNumDatums = 8
const expectedNumDatums = 6

it, err := ie.QueryIteratorEx(ctx, "combined-stmts-by-interval", nil,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -264,20 +318,10 @@ func collectCombinedStatements(
return nil, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[6]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return nil, serverError(ctx, err)
}
metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan

aggInterval := tree.MustBeDInterval(row[7]).Duration

stmt := serverpb.StatementsResponse_CollectedStatementStatistics{
Key: serverpb.StatementsResponse_ExtendedStatementStatisticsKey{
KeyData: metadata.Key,
AggregatedTs: aggregatedTs,
AggregationInterval: time.Duration(aggInterval.Nanos()),
KeyData: metadata.Key,
AggregatedTs: aggregatedTs,
},
ID: roachpb.StmtFingerprintID(statementFingerprintID),
Stats: metadata.Stats,
Expand All @@ -300,25 +344,27 @@ func collectCombinedTransactions(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) {
aostClause := testingKnobs.GetAOSTClause()

query := fmt.Sprintf(`
SELECT * FROM (
SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

query := fmt.Sprintf(
`SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics,
aggregation_interval
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata,
aggregation_interval
%s`, whereClause, orderAndLimit)

const expectedNumDatums = 6
const expectedNumDatums = 5

it, err := ie.QueryIteratorEx(ctx, "combined-txns-by-interval", nil,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -366,15 +412,12 @@ func collectCombinedTransactions(
return nil, serverError(ctx, err)
}

aggInterval := tree.MustBeDInterval(row[5]).Duration

txnStats := serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics{
StatsData: roachpb.CollectedTransactionStatistics{
StatementFingerprintIDs: metadata.StatementFingerprintIDs,
App: app,
Stats: metadata.Stats,
AggregatedTs: aggregatedTs,
AggregationInterval: time.Duration(aggInterval.Nanos()),
TransactionFingerprintID: roachpb.TransactionFingerprintID(fingerprintID),
},
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,14 @@ message StatementsResponse {
repeated ExtendedCollectedTransactionStatistics transactions = 5 [(gogoproto.nullable) = false];
}

enum StatsSortOptions {
SERVICE_LAT = 0;
reserved 1; // This is for CPU Time in 23.1
EXECUTION_COUNT = 2;
reserved 3; // This is for P99 in 23.1
CONTENTION_TIME = 4;
}

message CombinedStatementsStatsRequest {
enum StatsType {
StmtStatsOnly = 0;
Expand All @@ -1462,6 +1470,7 @@ message CombinedStatementsStatsRequest {

message FetchMode {
StatsType stats_type = 1;
StatsSortOptions sort = 2;
}

// Unix time range for aggregated statements.
Expand All @@ -1480,6 +1489,8 @@ message CombinedStatementsStatsRequest {
// TODO (xinhaoz) - Split this API into stmts and txns properly instead of using
// this param.
FetchMode fetch_mode = 5 [(gogoproto.nullable) = true];

int64 limit = 6;
}

// StatementDetailsRequest requests the details of a Statement, based on its keys.
Expand Down

0 comments on commit 41f93d7

Please sign in to comment.