diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go
index c29598994d88..a07bc5777888 100644
--- a/pkg/server/combined_statement_stats.go
+++ b/pkg/server/combined_statement_stats.go
@@ -384,10 +384,12 @@ FROM %s %s`, table, whereClauseOldestDate), args...)
 
 	createActivityTableQuery := func(table string) string {
+		// Take the max of execution_total_cluster_seconds within each
+		// aggregated_ts bucket, then sum those per-bucket values (0 when no
+		// rows match).
 		return fmt.Sprintf(`
-SELECT COALESCE(
-         execution_total_cluster_seconds,
-       0)
-FROM %s %s LIMIT 1`, table, whereClause)
+SELECT COALESCE(sum(total_latency), 0) from (SELECT aggregated_ts,
+       max(execution_total_cluster_seconds) as total_latency
+FROM %s %s GROUP BY aggregated_ts) %s`, table, whereClause, testingKnobs.GetAOSTClause())
 	}
 
 	createStatsTableQuery := func(table string) string {
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index f4aea04decc8..091b7452d07e 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -2049,6 +2049,499 @@ func TestStatusAPIStatements(t *testing.T) {
 	testPath(fmt.Sprintf("statements?combined=true&start=%d", aggregatedTs+60), nil)
 }
 
+// testStatement describes one synthetic statement fingerprint used by
+// TestStatusAPICombinedStatementsTotalLatency and which system tables it is
+// inserted into.
+type testStatement struct {
+	query                  string
+	fingerprintID          appstatspb.StmtFingerprintID
+	aggregatedTs           time.Time
+	count                  int64
+	serviceLatency         appstatspb.NumericStat
+	addStatementStatistics bool
+	addStatementActivity   bool
+}
+
+// TestStatusAPICombinedStatementsTotalLatency inserts statements directly into
+// the system statement tables and checks the source table, statement count and
+// total runtime reported by the combinedstmts endpoint for several time windows.
+func TestStatusAPICombinedStatementsTotalLatency(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderRace(t, "test is too slow to run under race")
+	// Increase the timeout for the http client if under stress.
+	additionalTimeout := 0 * time.Second
+	if skip.Stress() {
+		additionalTimeout = additionalTimeoutUnderStress
+	}
+
+	testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				SQLStatsKnobs: &sqlstats.TestingKnobs{
+					AOSTClause: "AS OF SYSTEM TIME '-1us'",
+				},
+				SpanConfig: &spanconfig.TestingKnobs{
+					ManagerDisableJobCreation: true,
+				},
+			},
+		},
+	})
+	defer testCluster.Stopper().Stop(context.Background())
+	sqlDB := testCluster.ServerConn(0)
+	ts := testCluster.Server(0)
+	db := sqlutils.MakeSQLRunner(sqlDB)
+
+	// Disable flush so no extra data is inserted to the system table.
+	db.Exec(t, "SET CLUSTER SETTING sql.stats.activity.flush.enabled = 'f'")
+	// Give permission to write to system tables.
+	db.Exec(t, "INSERT INTO system.users VALUES ('node', NULL, true, 3)")
+	db.Exec(t, "GRANT node TO root")
+
+	stmts := []testStatement{
+		{
+			query:                  "SELECT * FROM foo",
+			fingerprintID:          appstatspb.StmtFingerprintID(1088747230083123385),
+			aggregatedTs:           timeutil.FromUnixMicros(1704103200000000), // January 1, 2024 10:00:00
+			count:                  1,
+			serviceLatency:         appstatspb.NumericStat{Mean: 3, SquaredDiffs: 0},
+			addStatementStatistics: true,
+			addStatementActivity:   false,
+		},
+		{
+			query:                  "SELECT * FROM bar",
+			fingerprintID:          appstatspb.StmtFingerprintID(2422963415274537466),
+			aggregatedTs:           timeutil.FromUnixMicros(1704106800000000), // January 1, 2024 11:00:00
+			count:                  5,
+			serviceLatency:         appstatspb.NumericStat{Mean: 7, SquaredDiffs: 0},
+			addStatementStatistics: true,
+			addStatementActivity:   false,
+		},
+		{
+			query:                  "SELECT * FROM abc",
+			fingerprintID:          appstatspb.StmtFingerprintID(8810317408726404693),
+			aggregatedTs:           timeutil.FromUnixMicros(1704110400000000), // January 1, 2024 12:00:00
+			count:                  11,
+			serviceLatency:         appstatspb.NumericStat{Mean: 13, SquaredDiffs: 0},
+			addStatementStatistics: true,
+			addStatementActivity:   true,
+		},
+		{
+			query:                  "SELECT * FROM xyz",
+			fingerprintID:          appstatspb.StmtFingerprintID(2759320105256699956),
+			aggregatedTs:           timeutil.FromUnixMicros(1704114000000000), // January 1, 2024 13:00:00
+			count:                  17,
+			serviceLatency:         appstatspb.NumericStat{Mean: 19, SquaredDiffs: 0},
+			addStatementStatistics: true,
+			addStatementActivity:   true,
+		},
+	}
+
+	for _, stmt := range stmts {
+		s := generateStatement()
+		s.AggregatedTs = stmt.aggregatedTs
+		s.ID = stmt.fingerprintID
+		s.Key.Query = stmt.query
+		s.Key.QuerySummary = stmt.query
+		s.Stats.Count = stmt.count
+		s.Stats.ServiceLat = stmt.serviceLatency
+
+		if stmt.addStatementStatistics {
+			insertStatementIntoSystemStmtStatsTable(t, db, s)
+		}
+		if stmt.addStatementActivity {
+			insertStatementIntoSystemStmtActivityTable(t, db, s)
+		}
+	}
+
+	testCases := []struct {
+		testName     string
+		start        string
+		end          string
+		count        int
+		totalLatency float32
+		sourceTable  string
+	}{
+		{
+			testName:     "period with one statement on statement_statistics and none on statement_activity",
+			start:        "1704103200", // January 1, 2024 10:00:00
+			end:          "1704104400", // January 1, 2024 10:20:00
+			count:        1,
+			totalLatency: getTotalLatencyForStatements(stmts[0]),
+			sourceTable:  "crdb_internal.statement_statistics_persisted",
+		},
+		{
+			testName:     "period with 2 statements on statement_statistics and none on statement_activity",
+			start:        "1704103200", // January 1, 2024 10:00:00
+			end:          "1704109200", // January 1, 2024 11:40:00
+			count:        2,
+			totalLatency: getTotalLatencyForStatements(stmts[0], stmts[1]),
+			sourceTable:  "crdb_internal.statement_statistics_persisted",
+		},
+		{
+			testName:     "period with one statement on statement_activity",
+			start:        "1704110400", // January 1, 2024 12:00:00
+			end:          "1704112800", // January 1, 2024 12:40:00
+			count:        1,
+			totalLatency: getTotalLatencyForStatements(stmts[2]),
+			sourceTable:  "crdb_internal.statement_activity",
+		},
+		{
+			testName:     "period with 2 statements on statement_activity",
+			start:        "1704110400", // January 1, 2024 12:00:00
+			end:          "1704116400", // January 1, 2024 13:40:00
+			count:        2,
+			totalLatency: getTotalLatencyForStatements(stmts[2], stmts[3]),
+			sourceTable:  "crdb_internal.statement_activity",
+		},
+		{
+			testName:     "period with 2 statements on statement_activity and 4 statements on statement_statistics",
+			start:        "1704103200", // January 1, 2024 10:00:00
+			end:          "1704116400", // January 1, 2024 13:40:00
+			count:        4,
+			totalLatency: getTotalLatencyForStatements(stmts[0], stmts[1], stmts[2], stmts[3]),
+			sourceTable:  "crdb_internal.statement_statistics_persisted",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			var resp serverpb.StatementsResponse
+			endpoint := fmt.Sprintf("combinedstmts?start=%s&end=%s", tc.start, tc.end)
+			err := getStatusJSONProtoWithAdminAndTimeoutOption(ts, endpoint, &resp, true, additionalTimeout)
+			require.NoError(t, err, fmt.Sprintf("server error on %s", tc.testName))
+			require.Equal(t, tc.sourceTable, resp.StmtsSourceTable, fmt.Sprintf("source table error on %s", tc.testName))
+			require.Equal(t, tc.count, len(resp.Statements), fmt.Sprintf("stmts length error on %s", tc.testName))
+			require.Equal(t, tc.totalLatency, resp.StmtsTotalRuntimeSecs, fmt.Sprintf("total latency on %s", tc.testName))
+		})
+	}
+}
+
+// getTotalLatencyForStatements returns the sum, over the given statements, of
+// execution count multiplied by mean service latency.
+func getTotalLatencyForStatements(statements ...testStatement) float32 {
+	total := float32(0)
+	for _, statement := range statements {
+		total += float32(statement.count) * float32(statement.serviceLatency.Mean)
+	}
+	return total
+}
+
+// generateStatement returns a baseline CollectedStatementStatistics for
+// test_app; callers override the timestamp, fingerprint ID, query and stats
+// they care about.
+func generateStatement() appstatspb.CollectedStatementStatistics {
+	return appstatspb.CollectedStatementStatistics{
+		AggregatedTs:        timeutil.FromUnixMicros(1704103200000000), // January 1, 2024 10:00:00
+		ID:                  appstatspb.StmtFingerprintID(1088747230083123385),
+		AggregationInterval: time.Hour,
+		Key: appstatspb.StatementStatisticsKey{
+			App:                      "test_app",
+			Database:                 "test_database",
+			DistSQL:                  true,
+			FullScan:                 true,
+			Failed:                   false,
+			ImplicitTxn:              true,
+			PlanHash:                 uint64(200),
+			Query:                    "SELECT * FROM foo",
+			QuerySummary:             "SELECT * FROM foo",
+			TransactionFingerprintID: appstatspb.TransactionFingerprintID(50),
+			Vec:                      true,
+		},
+
+		Stats: appstatspb.StatementStatistics{
+			Count:         10,
+			Indexes:       []string{"15@4"},
+			LastErrorCode: "",
+			MaxRetries:    0,
+			Nodes:         []int64{1},
+			PlanGists:     []string{"AgEeCADnAwIAAAMHEgUUIR4AAA=="},
+			Regions:       []string{"us-east1"},
+			SQLType:       "TypeDDL",
+		},
+	}
+}
+
+// generateStatisticsColumn builds the JSON text stored in the statistics
+// column: fixed execution statistics plus per-statement stats taken from
+// statement (count, service latency, indexes, nodes, plan gists, regions).
+func generateStatisticsColumn(
+	t *testing.T, statement appstatspb.CollectedStatementStatistics,
+) string {
+	// Create execution Statistics JSON
+	executionStats := struct {
+		Cnt            int                    `json:"cnt"`
+		ContentionTime appstatspb.NumericStat `json:"contentionTime"`
+		CpuSQLNanos    appstatspb.NumericStat `json:"cpuSQLNanos"`
+		MaxDiskUsage   appstatspb.NumericStat `json:"maxDiskUsage"`
+		MaxMemUsage    appstatspb.NumericStat `json:"maxMemUsage"`
+		NetworkBytes   appstatspb.NumericStat `json:"networkBytes"`
+		NetworkMsgs    appstatspb.NumericStat `json:"networkMsgs"`
+	}{
+		Cnt: 3,
+		ContentionTime: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		CpuSQLNanos: appstatspb.NumericStat{
+			Mean:         5,
+			SquaredDiffs: 0,
+		},
+		MaxDiskUsage: appstatspb.NumericStat{
+			Mean:         10,
+			SquaredDiffs: 0,
+		},
+		MaxMemUsage: appstatspb.NumericStat{
+			Mean:         10,
+			SquaredDiffs: 0,
+		},
+		NetworkBytes: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		NetworkMsgs: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+	}
+	executionStatsBytes, err := json.Marshal(executionStats)
+	require.NoError(t, err)
+
+	// Create stats JSON
+	stats := struct {
+		BytesRead       appstatspb.NumericStat `json:"bytesRead"`
+		Cnt             int64                  `json:"cnt"`
+		FirstAttemptCnt int64                  `json:"firstAttemptCnt"`
+		IdleLat         appstatspb.NumericStat `json:"idleLat"`
+		Indexes         []string               `json:"indexes"`
+		LastErrorCode   string                 `json:"lastErrorCode"`
+		LastExecAt      time.Time              `json:"lastExecAt"`
+		MaxRetries      int                    `json:"maxRetries"`
+		Nodes           []int64                `json:"nodes"`
+		NumRows         appstatspb.NumericStat `json:"numRows"`
+		OvhLat          appstatspb.NumericStat `json:"ovhLat"`
+		ParseLat        appstatspb.NumericStat `json:"parseLat"`
+		PlanGists       []string               `json:"planGists"`
+		PlanLat         appstatspb.NumericStat `json:"planLat"`
+		Regions         []string               `json:"regions"`
+		RowsRead        appstatspb.NumericStat `json:"rowsRead"`
+		RowsWritten     appstatspb.NumericStat `json:"rowsWritten"`
+		RunLat          appstatspb.NumericStat `json:"runLat"`
+		SvcLat          appstatspb.NumericStat `json:"svcLat"`
+	}{
+		BytesRead: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		Cnt:             statement.Stats.Count,
+		FirstAttemptCnt: statement.Stats.Count,
+		IdleLat: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		Indexes:       statement.Stats.Indexes,
+		LastErrorCode: statement.Stats.LastErrorCode,
+		LastExecAt:    statement.AggregatedTs.Add(time.Minute * 10),
+		MaxRetries:    0,
+		Nodes:         statement.Stats.Nodes,
+		NumRows: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		OvhLat: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		ParseLat: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		PlanGists: statement.Stats.PlanGists,
+		PlanLat: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		Regions: statement.Stats.Regions,
+		RowsRead: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		RowsWritten: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		RunLat: appstatspb.NumericStat{
+			Mean:         0,
+			SquaredDiffs: 0,
+		},
+		SvcLat: statement.Stats.ServiceLat,
+	}
+	statsBytes, err := json.Marshal(stats)
+	require.NoError(t, err)
+
+	return fmt.Sprintf("{\"execution_statistics\": %s, \"statistics\": %s"+
+		", \"index_recommendations\": []} ", string(executionStatsBytes), string(statsBytes))
+}
+
+// insertStatementIntoSystemStmtStatsTable writes statement as a single row
+// (node_id 1) into system.statement_statistics.
+func insertStatementIntoSystemStmtStatsTable(
+	t *testing.T, db *sqlutils.SQLRunner, statement appstatspb.CollectedStatementStatistics,
+) {
+	// Create metadata JSON
+	metadata := struct {
+		Database     string `json:"db"`
+		DistSQL      bool   `json:"distsql"`
+		Failed       bool   `json:"failed"`
+		FullScan     bool   `json:"fullScan"`
+		ImplicitTxn  bool   `json:"implicitTxn"`
+		Query        string `json:"query"`
+		QuerySummary string `json:"querySummary"`
+		StmtType     string `json:"stmtType"`
+		Vec          bool   `json:"vec"`
+	}{
+		Database:     statement.Key.Database,
+		DistSQL:      statement.Key.DistSQL,
+		Failed:       statement.Key.Failed,
+		FullScan:     statement.Key.FullScan,
+		ImplicitTxn:  statement.Key.ImplicitTxn,
+		Query:        statement.Key.Query,
+		QuerySummary: statement.Key.QuerySummary,
+		StmtType:     statement.Stats.SQLType,
+		Vec:          statement.Key.Vec,
+	}
+	metadataBytes, err := json.Marshal(metadata)
+	require.NoError(t, err)
+	statistics := generateStatisticsColumn(t, statement)
+
+	query := `INSERT INTO system.statement_statistics (
+	aggregated_ts,
+	fingerprint_id,
+	transaction_fingerprint_id,
+	plan_hash,
+	app_name,
+	node_id,
+	agg_interval,
+	metadata,
+	statistics,
+	plan,
+	index_recommendations
+)
+VALUES (
+	$1,
+	$2,
+	$3,
+	$4,
+	$5,
+	1,
+	$6,
+	$7,
+	$8::JSONB,
+	'{"Children": [], "Name": ""}',
+	ARRAY['creation : CREATE INDEX t3_k_i_f ON t3(k, i, f)']
+)`
+	args := []interface{}{
+		statement.AggregatedTs,
+		statement.ID,
+		statement.Key.TransactionFingerprintID,
+		statement.Key.PlanHash,
+		statement.Key.App,
+		statement.AggregationInterval,
+		string(metadataBytes),
+		statistics,
+	}
+
+	db.Exec(t, query, args...)
+}
+
+// insertStatementIntoSystemStmtActivityTable writes statement as a single row
+// into system.statement_activity, setting both total-seconds columns to
+// count * mean service latency.
+func insertStatementIntoSystemStmtActivityTable(
+	t *testing.T, db *sqlutils.SQLRunner, statement appstatspb.CollectedStatementStatistics,
+) {
+	// Create metadata JSON
+	metadata := struct {
+		AppNames      []string `json:"appNames"`
+		Database      []string `json:"db"`
+		DistSQLCount  int      `json:"distSQLCount"`
+		FailedCount   int      `json:"failedCount"`
+		FullScanCount int      `json:"fullScanCount"`
+		ImplicitTxn   bool     `json:"implicitTxn"`
+		Query         string   `json:"query"`
+		QuerySummary  string   `json:"querySummary"`
+		StmtType      string   `json:"stmtType"`
+		VecCount      int      `json:"vecCount"`
+	}{
+		AppNames:      []string{statement.Key.App},
+		Database:      []string{statement.Key.Database},
+		DistSQLCount:  1,
+		FailedCount:   0,
+		FullScanCount: 1,
+		ImplicitTxn:   statement.Key.ImplicitTxn,
+		Query:         statement.Key.Query,
+		QuerySummary:  statement.Key.QuerySummary,
+		StmtType:      statement.Stats.SQLType,
+		VecCount:      1,
+	}
+	metadataBytes, err := json.Marshal(metadata)
+	require.NoError(t, err)
+	statistics := generateStatisticsColumn(t, statement)
+
+	query := `INSERT INTO system.statement_activity (
+	aggregated_ts,
+	fingerprint_id,
+	transaction_fingerprint_id,
+	plan_hash,
+	app_name,
+	agg_interval,
+	metadata,
+	statistics,
+	plan,
+	index_recommendations,
+	execution_count,
+	execution_total_seconds,
+	execution_total_cluster_seconds,
+	contention_time_avg_seconds,
+	cpu_sql_avg_nanos,
+	service_latency_avg_seconds,
+	service_latency_p99_seconds
+)
+VALUES (
+	$1,
+	$2,
+	$3,
+	$4,
+	$5,
+	$6,
+	$7,
+	$8::JSONB,
+	'{"Children": [], "Name": ""}',
+	ARRAY['creation : CREATE INDEX t3_k_i_f ON t3(k, i, f)'],
+	$9,
+	$10,
+	$10,
+	0,
+	5,
+	10,
+	7
+)`
+
+	args := []interface{}{
+		statement.AggregatedTs,
+		statement.ID,
+		statement.Key.TransactionFingerprintID,
+		statement.Key.PlanHash,
+		statement.Key.App,
+		statement.AggregationInterval,
+		string(metadataBytes),
+		statistics,
+		statement.Stats.Count,
+		float64(statement.Stats.Count) * statement.Stats.ServiceLat.Mean,
+	}
+	db.Exec(t, query, args...)
+}
+
 func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)