Skip to content

Commit

Permalink
server: add limit and sort capabilities to sql stats requests
Browse files Browse the repository at this point in the history
This commit adds limit and sort fields to the combined
statements request. These params can be used to specify
how much data is returned and the priority at which to
return data (e.g. top-k). The current sort options are:
- service latency
- cpu time
- p99 (statements only)
- contention time
- execution count

Epic: none
Part of: cockroachdb#97876
Part of: cockroachdb#97875

Release note: None
  • Loading branch information
xinhaoz committed Mar 20, 2023
1 parent b7f6e1a commit aa382df
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 42 deletions.
152 changes: 110 additions & 42 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,16 @@ func getCombinedStatementStats(
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) (*serverpb.StatementsResponse, error) {
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
showInternal := SQLStatsShowInternal.Get(&settings.SV)
whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs(
startTime, endTime, limit, testingKnobs, showInternal)
req, testingKnobs, showInternal, settings)

var statements []serverpb.StatementsResponse_CollectedStatementStatistics
var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics
var err error

if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit)
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -92,7 +90,7 @@ func getCombinedStatementStats(
whereClause, args = buildWhereClauseForStmtsByTxn(req, transactions, testingKnobs)
}

statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit)
statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -107,6 +105,46 @@ func getCombinedStatementStats(
return response, nil
}

// Common stmt and txn columns to sort on.
const (
sortSvcLatDesc = `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
sortCPUTimeDesc = `(statistics -> 'statistics' -> 'execution_statistics' -> 'cpuSQLNanos' ->> 'mean')::FLOAT DESC`
sortExecCountDesc = `(statistics -> 'statistics' ->> 'cnt')::INT DESC`
sortContentionTimeDesc = `(statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::FLOAT DESC`
)

func getStmtColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_CPU_TIME:
return sortCPUTimeDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_P99_STMTS_ONLY:
return `(statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::FLOAT DESC`
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

func getTxnColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_CPU_TIME:
return sortCPUTimeDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

// buildWhereClauseForStmtsByTxn builds the where clause to get the statement
// stats based on a list of transactions. The list of transactions provided must
// contain no duplicate transaction fingerprint ids.
Expand Down Expand Up @@ -152,7 +190,10 @@ func buildWhereClauseForStmtsByTxn(
// The whereClause will be in the format `WHERE A = $1 AND B = $2` and
// args will return the list of arguments in order that will replace the actual values.
func getCombinedStatementsQueryClausesAndArgs(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs, showInternal bool,
req *serverpb.CombinedStatementsStatsRequest,
testingKnobs *sqlstats.TestingKnobs,
showInternal bool,
settings *cluster.Settings,
) (whereClause string, orderAndLimitClause string, args []interface{}) {
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())
Expand All @@ -167,17 +208,37 @@ func getCombinedStatementsQueryClausesAndArgs(
catconstants.DelegatedAppNamePrefix))
}

if start != nil {
// Add start and end filters from request.
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
if startTime != nil {
buffer.WriteString(" AND aggregated_ts >= $1")
args = append(args, *start)
args = append(args, *startTime)
}

if end != nil {
args = append(args, *end)
if endTime != nil {
args = append(args, *endTime)
buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
}

// Add LIMIT from request.
limit := req.Limit
if limit == 0 {
limit = SQLStatsResponseMax.Get(&settings.SV)
}
args = append(args, limit)
orderAndLimitClause = fmt.Sprintf(` ORDER BY aggregated_ts DESC LIMIT $%d`, len(args))

// Determine sort column.
var col string
if req.FetchMode == nil {
col = "fingerprint_id"
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_StmtStatsOnly {
col = getStmtColumnFromSortOption(req.FetchMode.Sort)
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
col = getTxnColumnFromSortOption(req.FetchMode.Sort)
}

orderAndLimitClause = fmt.Sprintf(` ORDER BY %s LIMIT $%d`, col, len(args))

return buffer.String(), orderAndLimitClause, args
}
Expand All @@ -188,23 +249,26 @@ func collectCombinedStatements(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {

query := fmt.Sprintf(
`SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
%s`, whereClause, orderAndLimit)
aostClause := testingKnobs.GetAOSTClause()
query := fmt.Sprintf(`
SELECT * FROM (
SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 6

Expand Down Expand Up @@ -288,21 +352,25 @@ func collectCombinedTransactions(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) {

query := fmt.Sprintf(
`SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata
%s`, whereClause, orderAndLimit)
aostClause := testingKnobs.GetAOSTClause()

query := fmt.Sprintf(`
SELECT * FROM (
SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 5

Expand Down
10 changes: 10 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,13 @@ message StatementsResponse {
repeated ExtendedCollectedTransactionStatistics transactions = 5 [(gogoproto.nullable) = false];
}

enum StatsSortOptions {
SERVICE_LAT = 0;
CPU_TIME = 1;
EXECUTION_COUNT = 2;
P99_STMTS_ONLY = 3;
CONTENTION_TIME = 4;
}
message CombinedStatementsStatsRequest {
enum StatsType {
StmtStatsOnly = 0;
Expand All @@ -1623,6 +1630,7 @@ message CombinedStatementsStatsRequest {

message FetchMode {
StatsType stats_type = 1;
StatsSortOptions sort = 2;
}

// Unix time range for aggregated statements.
Expand All @@ -1641,6 +1649,8 @@ message CombinedStatementsStatsRequest {
// TODO (xinhaoz) - Split this API into stmts and txns properly instead of using
// this param.
FetchMode fetch_mode = 5 [(gogoproto.nullable) = true];

int64 limit = 6;
}

// StatementDetailsRequest requests the details of a Statement, based on its keys.
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (s *statusServer) Statements(
combinedRequest := serverpb.CombinedStatementsStatsRequest{
Start: req.Start,
End: req.End,
<<<<<<< HEAD
=======
Limit: req.Limit,
>>>>>>> 20292009d1f (server: add limit and sort capabilities to sql stats requests)
}
return s.CombinedStatementStats(ctx, &combinedRequest)
}
Expand Down

0 comments on commit aa382df

Please sign in to comment.