diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index 65ce69737475..08356039df3a 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -408,21 +408,31 @@ func collectCombinedStatements( ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - const expectedNumDatums = 6 - queryFormat := ` -SELECT * FROM ( -SELECT + const expectedNumDatums = 11 + const queryFormat = ` +SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + txn_fingerprints, app_name, - max(aggregated_ts) as aggregated_ts, - crdb_internal.merge_stats_metadata(array_agg(metadata)) as metadata, - crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics -FROM %s %s -GROUP BY - fingerprint_id, - app_name -) %s + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics +FROM (SELECT fingerprint_id, + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, + app_name, + max(aggregated_ts) AS aggregated_ts, + crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics + FROM %s %s + GROUP BY + fingerprint_id, + app_name) %s %s` query := fmt.Sprintf( @@ -492,28 +502,27 @@ GROUP BY app := string(tree.MustBeDString(row[2])) aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time - - // The metadata is aggregated across all the statements with the same fingerprint. - aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON - var aggregateMetadata roachpb.AggregatedStatementMetadata - if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil { - return nil, serverError(ctx, err) - } + distSQLCount := int64(*row[4].(*tree.DInt)) + fullScanCount := int64(*row[5].(*tree.DInt)) + failedCount := int64(*row[6].(*tree.DInt)) + query := string(tree.MustBeDString(row[7])) + querySummary := string(tree.MustBeDString(row[8])) + databases := string(tree.MustBeDString(row[9])) metadata := roachpb.CollectedStatementStatistics{ Key: roachpb.StatementStatisticsKey{ App: app, - DistSQL: aggregateMetadata.DistSQLCount > 0, - FullScan: aggregateMetadata.FullScanCount > 0, - Failed: aggregateMetadata.FailedCount > 0, - Query: aggregateMetadata.Query, - QuerySummary: aggregateMetadata.QuerySummary, - Database: strings.Join(aggregateMetadata.Databases, ","), + DistSQL: distSQLCount > 0, + FullScan: fullScanCount > 0, + Failed: failedCount > 0, + Query: query, + QuerySummary: querySummary, + Database: databases, }, } var stats roachpb.StatementStatistics - statsJSON := tree.MustBeDJSON(row[5]).JSON + statsJSON := tree.MustBeDJSON(row[10]).JSON if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil { return nil, serverError(ctx, err) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 518fcea416ee..66a99982408d 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -15,6 +15,7 @@ import ( "context" gosql "database/sql" "encoding/hex" + "encoding/json" "fmt" "math" "net/url" @@ -72,6 +73,10 @@ import ( "github.com/stretchr/testify/require" ) +// additionalTimeoutUnderStress is the additional timeout to use for the http +// client if under stress. +const additionalTimeoutUnderStress = 30 * time.Second + func getStatusJSONProto( ts serverutils.TestServerInterface, path string, response protoutil.Message, ) error { @@ -84,6 +89,18 @@ func postStatusJSONProto( return serverutils.PostJSONProto(ts, statusPrefix+path, request, response) } +// getStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but +// the caller can specify an additional timeout duration for the request. +func getStatusJSONProtoWithAdminAndTimeoutOption( + ts serverutils.TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, statusPrefix+path, response, isAdmin, additionalTimeout) +} + func getStatusJSONProtoWithAdminOption( ts serverutils.TestServerInterface, path string, response protoutil.Message, isAdmin bool, ) error { @@ -1415,8 +1432,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } params, _ := tests.CreateTestServerParams() params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919. @@ -1495,7 +1515,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Hit query endpoint. var resp serverpb.StatementsResponse - if err := getStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil { t.Fatal(err) } @@ -1753,8 +1773,11 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -1796,14 +1819,14 @@ func TestStatusAPIStatements(t *testing.T) { // Test that non-admin without VIEWACTIVITY privileges cannot access. var resp serverpb.StatementsResponse - err := getStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout) if !testutils.IsError(err, "status: 403") { t.Fatalf("expected privilege error, got %v", err) } testPath := func(path string, expectedStmts []string) { // Hit query endpoint. - if err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil { t.Fatal(err) } @@ -1868,8 +1891,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -1891,6 +1917,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs) findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE description = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';" + testAppName := "TestCombinedStatementsWithFullScans" firstServerProto := testCluster.Server(0) sqlSB := testCluster.ServerConn(0) @@ -1904,6 +1931,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", authenticatedUserNameNoAdmin().Normalized())) + thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName)) type TestCases struct { stmt string @@ -1914,7 +1942,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { count int } - // These test statements are executed before any indexes are introduced. + // These test statements are executed before the index is introduced. statementsBeforeIndex := []TestCases{ {stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1}, {stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1}, @@ -1938,22 +1966,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { {stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2}, } - type StatementData struct { + type ExpectedStatementData struct { count int fullScan bool distSQL bool failed bool } - // Declare the map outside of the executeStatements function. - statementDataMap := make(map[string]StatementData) + // expectedStatementStatsMap maps the query response format to the associated + // expected statement statistics for verification. + expectedStatementStatsMap := make(map[string]ExpectedStatementData) + // Helper function to execute the statements and store the expected statement executeStatements := func(statements []TestCases) { - // For each statement in the test case, execute the statement and store the - // expected statement data in a map. + // Clear the map at the start of each execution batch. + expectedStatementStatsMap = make(map[string]ExpectedStatementData) for _, stmt := range statements { thirdServerSQL.Exec(t, stmt.stmt) - statementDataMap[stmt.respQuery] = StatementData{ + expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{ fullScan: stmt.fullScan, distSQL: stmt.distSQL, failed: stmt.failed, @@ -1962,34 +1992,54 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } } + // Helper function to convert a response into a JSON string representation. + responseToJSON := func(resp interface{}) string { + bytes, err := json.Marshal(resp) + if err != nil { + t.Fatal(err) + } + return string(bytes) + } + + // Helper function to verify the combined statement statistics response. verifyCombinedStmtStats := func() { - err := getStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout) require.NoError(t, err) + // actualResponseStatsMap maps the query response format to the actual + // statement statistics received from the server response. + actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics) for _, respStatement := range resp.Statements { - respQuery := respStatement.Key.KeyData.Query + // Skip failed statements: The test app may encounter transient 40001 + // errors that are automatically retried. Thus, we only consider + // statements that were that were successfully executed by the test app + // to avoid counting such failures. If a statement that we expect to be + // successful is not found in the response, the test will fail later. + if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed { + actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement + } + } + + for respQuery, expectedData := range expectedStatementStatsMap { + respStatement, exists := actualResponseStatsMap[respQuery] + require.True(t, exists, "Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp)) + actualCount := respStatement.Stats.Count actualFullScan := respStatement.Key.KeyData.FullScan actualDistSQL := respStatement.Key.KeyData.DistSQL actualFailed := respStatement.Key.KeyData.Failed - // If the response has a query that isn't in our map, it means that it's - // not part of our test, so we ignore it. - expectedData, ok := statementDataMap[respQuery] - if !ok { - continue - } - require.Equal(t, expectedData.fullScan, actualFullScan) - require.Equal(t, expectedData.distSQL, actualDistSQL) - require.Equal(t, expectedData.failed, actualFailed) - require.Equal(t, expectedData.count, int(actualCount)) + stmtJSONString := responseToJSON(respStatement) + + require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString) } } - // Execute the queries that will be executed before the index is created. + // Execute and verify the queries that will be executed before the index is created. executeStatements(statementsBeforeIndex) - - // Test the statements that will be executed before the index is created. verifyCombinedStmtStats() // Execute the queries that will create the index. @@ -2013,10 +2063,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { return nil }, 3*time.Second) - // Execute the queries that will be executed after the index is created. + // Execute and verify the queries that will be executed after the index is created. executeStatements(statementsAfterIndex) - - // Test the statements that will be executed after the index is created. verifyCombinedStmtStats() } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index a18693b49f47..7edc71c44cab 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -25,6 +25,7 @@ import ( "net/url" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" @@ -456,6 +457,27 @@ func GetJSONProtoWithAdminOption( return httputil.GetJSON(httpClient, ts.AdminURL()+path, response) } +// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but +// the caller can specify an additional timeout duration for the request. +func GetJSONProtoWithAdminAndTimeoutOption( + ts TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin) + if err != nil { + return err + } + httpClient.Timeout += additionalTimeout + u := ts.AdminURL() + fullURL := u + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout) + return httputil.GetJSON(httpClient, fullURL, response) +} + // PostJSONProto uses the supplied client to POST the URL specified by the parameters // and unmarshals the result into response. func PostJSONProto(ts TestServerInterface, path string, request, response protoutil.Message) error {