From 86524f283fe22ba44795adaead0cbe13cc152ee5 Mon Sep 17 00:00:00 2001 From: gtr Date: Wed, 23 Aug 2023 16:22:12 -0400 Subject: [PATCH] sqlstats: unskip tests hitting combinedstmts and stmts endpoints Fixes: #109184. Previously, tests which hit the `combinedstmts` and `statements` endpoints were skipped under stress because they would occaisonally fail. This commit unskips these tests and instead of unmarshalling the metadata JSON blob, the select query directly extracts the values needed from it. The http client's timeout is also increased by 30s. Release note: None --- pkg/server/combined_statement_stats.go | 63 ++++++---- pkg/server/status_test.go | 116 +++++++++++++----- pkg/testutils/serverutils/test_server_shim.go | 22 ++++ 3 files changed, 140 insertions(+), 61 deletions(-) diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index 65ce69737475..08356039df3a 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -408,21 +408,31 @@ func collectCombinedStatements( ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - const expectedNumDatums = 6 - queryFormat := ` -SELECT * FROM ( -SELECT + const expectedNumDatums = 11 + const queryFormat = ` +SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + txn_fingerprints, app_name, - max(aggregated_ts) as aggregated_ts, - crdb_internal.merge_stats_metadata(array_agg(metadata)) as metadata, - crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics -FROM %s %s -GROUP BY - fingerprint_id, - app_name -) %s + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics +FROM (SELECT fingerprint_id, + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, + app_name, + max(aggregated_ts) AS aggregated_ts, + crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics + FROM %s %s + GROUP BY + fingerprint_id, + app_name) %s %s` query := fmt.Sprintf( @@ -492,28 +502,27 @@ GROUP BY app := string(tree.MustBeDString(row[2])) aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time - - // The metadata is aggregated across all the statements with the same fingerprint. - aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON - var aggregateMetadata roachpb.AggregatedStatementMetadata - if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil { - return nil, serverError(ctx, err) - } + distSQLCount := int64(*row[4].(*tree.DInt)) + fullScanCount := int64(*row[5].(*tree.DInt)) + failedCount := int64(*row[6].(*tree.DInt)) + query := string(tree.MustBeDString(row[7])) + querySummary := string(tree.MustBeDString(row[8])) + databases := string(tree.MustBeDString(row[9])) metadata := roachpb.CollectedStatementStatistics{ Key: roachpb.StatementStatisticsKey{ App: app, - DistSQL: aggregateMetadata.DistSQLCount > 0, - FullScan: aggregateMetadata.FullScanCount > 0, - Failed: aggregateMetadata.FailedCount > 0, - Query: aggregateMetadata.Query, - QuerySummary: aggregateMetadata.QuerySummary, - Database: strings.Join(aggregateMetadata.Databases, ","), + DistSQL: distSQLCount > 0, + FullScan: fullScanCount > 0, + Failed: failedCount > 0, + Query: query, + QuerySummary: querySummary, + Database: databases, }, } var stats roachpb.StatementStatistics - statsJSON := tree.MustBeDJSON(row[5]).JSON + statsJSON := tree.MustBeDJSON(row[10]).JSON if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil { return nil, serverError(ctx, err) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 518fcea416ee..66a99982408d 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -15,6 +15,7 @@ import ( "context" gosql "database/sql" "encoding/hex" + "encoding/json" "fmt" "math" "net/url" @@ -72,6 +73,10 @@ import ( "github.com/stretchr/testify/require" ) +// additionalTimeoutUnderStress is the additional timeout to use for the http +// client if under stress. +const additionalTimeoutUnderStress = 30 * time.Second + func getStatusJSONProto( ts serverutils.TestServerInterface, path string, response protoutil.Message, ) error { @@ -84,6 +89,18 @@ func postStatusJSONProto( return serverutils.PostJSONProto(ts, statusPrefix+path, request, response) } +// getStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but +// the caller can specify an additional timeout duration for the request. +func getStatusJSONProtoWithAdminAndTimeoutOption( + ts serverutils.TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, statusPrefix+path, response, isAdmin, additionalTimeout) +} + func getStatusJSONProtoWithAdminOption( ts serverutils.TestServerInterface, path string, response protoutil.Message, isAdmin bool, ) error { @@ -1415,8 +1432,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } params, _ := tests.CreateTestServerParams() params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919. @@ -1495,7 +1515,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Hit query endpoint. var resp serverpb.StatementsResponse - if err := getStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil { t.Fatal(err) } @@ -1753,8 +1773,11 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -1796,14 +1819,14 @@ func TestStatusAPIStatements(t *testing.T) { // Test that non-admin without VIEWACTIVITY privileges cannot access. var resp serverpb.StatementsResponse - err := getStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout) if !testutils.IsError(err, "status: 403") { t.Fatalf("expected privilege error, got %v", err) } testPath := func(path string, expectedStmts []string) { // Hit query endpoint. - if err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil { t.Fatal(err) } @@ -1868,8 +1891,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -1891,6 +1917,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs) findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE description = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';" + testAppName := "TestCombinedStatementsWithFullScans" firstServerProto := testCluster.Server(0) sqlSB := testCluster.ServerConn(0) @@ -1904,6 +1931,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", authenticatedUserNameNoAdmin().Normalized())) + thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName)) type TestCases struct { stmt string @@ -1914,7 +1942,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { count int } - // These test statements are executed before any indexes are introduced. + // These test statements are executed before the index is introduced. statementsBeforeIndex := []TestCases{ {stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1}, {stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1}, @@ -1938,22 +1966,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { {stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2}, } - type StatementData struct { + type ExpectedStatementData struct { count int fullScan bool distSQL bool failed bool } - // Declare the map outside of the executeStatements function. - statementDataMap := make(map[string]StatementData) + // expectedStatementStatsMap maps the query response format to the associated + // expected statement statistics for verification. + expectedStatementStatsMap := make(map[string]ExpectedStatementData) + // Helper function to execute the statements and store the expected statement executeStatements := func(statements []TestCases) { - // For each statement in the test case, execute the statement and store the - // expected statement data in a map. + // Clear the map at the start of each execution batch. + expectedStatementStatsMap = make(map[string]ExpectedStatementData) for _, stmt := range statements { thirdServerSQL.Exec(t, stmt.stmt) - statementDataMap[stmt.respQuery] = StatementData{ + expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{ fullScan: stmt.fullScan, distSQL: stmt.distSQL, failed: stmt.failed, @@ -1962,34 +1992,54 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } } + // Helper function to convert a response into a JSON string representation. + responseToJSON := func(resp interface{}) string { + bytes, err := json.Marshal(resp) + if err != nil { + t.Fatal(err) + } + return string(bytes) + } + + // Helper function to verify the combined statement statistics response. verifyCombinedStmtStats := func() { - err := getStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout) require.NoError(t, err) + // actualResponseStatsMap maps the query response format to the actual + // statement statistics received from the server response. + actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics) for _, respStatement := range resp.Statements { - respQuery := respStatement.Key.KeyData.Query + // Skip failed statements: The test app may encounter transient 40001 + // errors that are automatically retried. Thus, we only consider + // statements that were that were successfully executed by the test app + // to avoid counting such failures. If a statement that we expect to be + // successful is not found in the response, the test will fail later. + if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed { + actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement + } + } + + for respQuery, expectedData := range expectedStatementStatsMap { + respStatement, exists := actualResponseStatsMap[respQuery] + require.True(t, exists, "Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp)) + actualCount := respStatement.Stats.Count actualFullScan := respStatement.Key.KeyData.FullScan actualDistSQL := respStatement.Key.KeyData.DistSQL actualFailed := respStatement.Key.KeyData.Failed - // If the response has a query that isn't in our map, it means that it's - // not part of our test, so we ignore it. - expectedData, ok := statementDataMap[respQuery] - if !ok { - continue - } - require.Equal(t, expectedData.fullScan, actualFullScan) - require.Equal(t, expectedData.distSQL, actualDistSQL) - require.Equal(t, expectedData.failed, actualFailed) - require.Equal(t, expectedData.count, int(actualCount)) + stmtJSONString := responseToJSON(respStatement) + + require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString) } } - // Execute the queries that will be executed before the index is created. + // Execute and verify the queries that will be executed before the index is created. executeStatements(statementsBeforeIndex) - - // Test the statements that will be executed before the index is created. verifyCombinedStmtStats() // Execute the queries that will create the index. @@ -2013,10 +2063,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { return nil }, 3*time.Second) - // Execute the queries that will be executed after the index is created. + // Execute and verify the queries that will be executed after the index is created. executeStatements(statementsAfterIndex) - - // Test the statements that will be executed after the index is created. verifyCombinedStmtStats() } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index a18693b49f47..7edc71c44cab 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -25,6 +25,7 @@ import ( "net/url" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" @@ -456,6 +457,27 @@ func GetJSONProtoWithAdminOption( return httputil.GetJSON(httpClient, ts.AdminURL()+path, response) } +// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but +// the caller can specify an additional timeout duration for the request. +func GetJSONProtoWithAdminAndTimeoutOption( + ts TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin) + if err != nil { + return err + } + httpClient.Timeout += additionalTimeout + u := ts.AdminURL() + fullURL := u + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout) + return httputil.GetJSON(httpClient, fullURL, response) +} + // PostJSONProto uses the supplied client to POST the URL specified by the parameters // and unmarshals the result into response. func PostJSONProto(ts TestServerInterface, path string, request, response protoutil.Message) error {