Skip to content

Commit

Permalink
server: add limit and sort capabilities to sql stats requests
Browse files Browse the repository at this point in the history
This commit adds limit and sort fields to the combined
statements request. These params can be used to specify
how much data is returned and the priority at which to
return data (e.g. top-k). The current sort options are:
- service latency
- cpu time

Epic: none
Part of: cockroachdb#97876
Part of: cockroachdb#97875

Release note: None
  • Loading branch information
xinhaoz authored and maryliag committed Mar 18, 2023
1 parent 0ebb842 commit 4d4ea75
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 42 deletions.
117 changes: 75 additions & 42 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,16 @@ func getCombinedStatementStats(
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) (*serverpb.StatementsResponse, error) {
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
showInternal := SQLStatsShowInternal.Get(&settings.SV)
whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs(
startTime, endTime, limit, testingKnobs, showInternal)
req, testingKnobs, showInternal, settings)

var statements []serverpb.StatementsResponse_CollectedStatementStatistics
var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics
var err error

if req.FetchMode != serverpb.StatsFetchMode_StmtStatsOnly {
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit)
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -92,7 +90,7 @@ func getCombinedStatementStats(
whereClause, args = buildWhereClauseForStmtsByTxn(req, transactions, testingKnobs)
}

statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit)
statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -107,6 +105,19 @@ func getCombinedStatementStats(
return response, nil
}

func getColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
case serverpb.StatsSortOptions_CPU_TIME:
return `(statistics -> 'statistics' -> 'execution_statistics' -> 'cpuSQLNanos' ->> 'mean')::FLOAT DESC`
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return `(statistics -> 'statistics' ->> 'cnt')::INT DESC`
default:
return `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
}
}

// buildWhereClauseForStmtsByTxn builds the where clause to get the statement
// stats based on a list of transactions. The list of transactions provided must
// contain no duplicate transaction fingerprint ids.
Expand Down Expand Up @@ -152,7 +163,10 @@ func buildWhereClauseForStmtsByTxn(
// The whereClause will be in the format `WHERE A = $1 AND B = $2` and
// args will return the list of arguments in order that will replace the actual values.
func getCombinedStatementsQueryClausesAndArgs(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs, showInternal bool,
req *serverpb.CombinedStatementsStatsRequest,
testingKnobs *sqlstats.TestingKnobs,
showInternal bool,
settings *cluster.Settings,
) (whereClause string, orderAndLimitClause string, args []interface{}) {
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())
Expand All @@ -167,17 +181,29 @@ func getCombinedStatementsQueryClausesAndArgs(
catconstants.DelegatedAppNamePrefix))
}

if start != nil {
// Add start and end filters from request.
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
if startTime != nil {
buffer.WriteString(" AND aggregated_ts >= $1")
args = append(args, *start)
args = append(args, *startTime)
}

if end != nil {
args = append(args, *end)
if endTime != nil {
args = append(args, *endTime)
buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
}

// Add LIMIT from request.
limit := req.Limit
if limit == 0 {
limit = SQLStatsResponseMax.Get(&settings.SV)
}
args = append(args, limit)
orderAndLimitClause = fmt.Sprintf(` ORDER BY aggregated_ts DESC LIMIT $%d`, len(args))

// Determine sort column.
col := getColumnFromSortOption(req.Sort)
orderAndLimitClause = fmt.Sprintf(` ORDER BY %s LIMIT $%d`, col, len(args))

return buffer.String(), orderAndLimitClause, args
}
Expand All @@ -188,23 +214,26 @@ func collectCombinedStatements(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {

query := fmt.Sprintf(
`SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
%s`, whereClause, orderAndLimit)
aostClause := testingKnobs.GetAOSTClause()
query := fmt.Sprintf(`
SELECT * FROM (
SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 6

Expand Down Expand Up @@ -288,21 +317,25 @@ func collectCombinedTransactions(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) {

query := fmt.Sprintf(
`SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata
%s`, whereClause, orderAndLimit)
aostClause := testingKnobs.GetAOSTClause()

query := fmt.Sprintf(`
SELECT * FROM (
SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
GROUP BY
app_name,
fingerprint_id,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 5

Expand Down
25 changes: 25 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,8 @@ message StatementsRequest {
int64 start = 3 [(gogoproto.nullable) = true];
int64 end = 4 [(gogoproto.nullable) = true];
StatsFetchMode fetch_mode = 5;
int64 limit = 6;
StatsSortOptions sort = 7;
}

message StatementsResponse {
Expand Down Expand Up @@ -1615,17 +1617,40 @@ message StatementsResponse {
repeated ExtendedCollectedTransactionStatistics transactions = 5 [(gogoproto.nullable) = false];
}

enum StatsSortOptions {
SERVICE_LAT = 0;
CPU_TIME = 1;
EXECUTION_COUNT = 2;
}
message CombinedStatementsStatsRequest {
// Unix time range for aggregated statements.
int64 start = 1 [(gogoproto.nullable) = true];
int64 end = 2 [(gogoproto.nullable) = true];

// Note that if fetch_mode is set to transactions only, we will also
// include the statement statistics for the stmts in the transactions
// response. This is more of a hack-y method to get the complete stats
// for txns, because in the client we need to fill in some txn stats info
// from its stmt stats, such as the query string.
//
// We prefer this hackier method right now to reduce surface area for backporting
// these changes, but in the future we will introduce more endpoints to properly
// organize these differing requests.
// TODO (xinhaoz) - Split this API into stmts and txns properly instead of using
// this fetch_mode.
StatsFetchMode fetch_mode = 5;

int64 limit = 6;

StatsSortOptions sort = 7;
}

message FlushedTxnsStatsRequest {
// Unix time range for aggregated statements.
int64 start = 1 [(gogoproto.nullable) = true];
int64 end = 2 [(gogoproto.nullable) = true];
int64 limit = 3;
StatsSortOptions sort = 4;
}

// StatementDetailsRequest requests the details of a Statement, based on its keys.
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (s *statusServer) Statements(
Start: req.Start,
End: req.End,
FetchMode: req.FetchMode,
Limit: req.Limit,
Sort: req.Sort,
}
return s.CombinedStatementStats(ctx, &combinedRequest)
}
Expand Down

0 comments on commit 4d4ea75

Please sign in to comment.