From a6c12503c6166ce8dde1dfd9125e4d5f84812eca Mon Sep 17 00:00:00 2001 From: gtr Date: Wed, 23 Aug 2023 16:22:12 -0400 Subject: [PATCH] sqlstats: unskip tests hitting combinedstmts and stmts endpoints Fixes: #109184. Previously, tests which hit the `combinedstmts` and `statements` endpoints were skipped under stress because they would occaisonally fail. This commit unskips these tests and instead of unmarshalling the metadata JSON blob, the select query directly extracts the values needed from it. The http client's timeout is also increased by 30s. Release note: None --- pkg/server/combined_statement_stats.go | 61 ++++++--- pkg/server/status_test.go | 116 +++++++++++++----- pkg/testutils/serverutils/test_server_shim.go | 23 ++++ 3 files changed, 147 insertions(+), 53 deletions(-) diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index a095c02fb904..9cf849179ab3 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -647,11 +647,23 @@ func collectCombinedStatements( tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - const expectedNumDatums = 6 + const expectedNumDatums = 11 const queryFormat = ` -SELECT * +SELECT + fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, app_name, max(aggregated_ts) AS aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, @@ -674,9 +686,21 @@ FROM (SELECT fingerprint_id, ie, // The statement activity table has aggregated metadata. ` -SELECT * +SELECT + fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, app_name, max(aggregated_ts) AS aggregated_ts, crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata, @@ -766,28 +790,27 @@ FROM (SELECT fingerprint_id, app := string(tree.MustBeDString(row[2])) aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time - - // The metadata is aggregated across all the statements with the same fingerprint. - aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON - var aggregateMetadata appstatspb.AggregatedStatementMetadata - if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil { - return nil, serverError(ctx, err) - } + distSQLCount := int64(*row[4].(*tree.DInt)) + fullScanCount := int64(*row[5].(*tree.DInt)) + failedCount := int64(*row[6].(*tree.DInt)) + query := string(tree.MustBeDString(row[7])) + querySummary := string(tree.MustBeDString(row[8])) + databases := string(tree.MustBeDString(row[9])) metadata := appstatspb.CollectedStatementStatistics{ Key: appstatspb.StatementStatisticsKey{ App: app, - DistSQL: aggregateMetadata.DistSQLCount > 0, - FullScan: aggregateMetadata.FullScanCount > 0, - Failed: aggregateMetadata.FailedCount > 0, - Query: aggregateMetadata.Query, - QuerySummary: aggregateMetadata.QuerySummary, - Database: strings.Join(aggregateMetadata.Databases, ","), + DistSQL: distSQLCount > 0, + FullScan: fullScanCount > 0, + Failed: failedCount > 0, + Query: query, + QuerySummary: querySummary, + Database: databases, }, } var stats appstatspb.StatementStatistics - statsJSON := tree.MustBeDJSON(row[5]).JSON + statsJSON := tree.MustBeDJSON(row[10]).JSON if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil { return nil, serverError(ctx, err) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 1838a9879368..1f497cbcf398 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -15,6 +15,7 @@ import ( "context" gosql "database/sql" "encoding/hex" + "encoding/json" "fmt" "math" "net/url" @@ -78,6 +79,10 @@ import ( "golang.org/x/sync/errgroup" ) +// additionalTimeoutUnderStress is the additional timeout to use for the http +// client if under stress. +const additionalTimeoutUnderStress = 30 * time.Second + func getStatusJSONProto( ts serverutils.TestTenantInterface, path string, response protoutil.Message, ) error { @@ -96,6 +101,18 @@ func getStatusJSONProtoWithAdminOption( return serverutils.GetJSONProtoWithAdminOption(ts, statusPrefix+path, response, isAdmin) } +// getStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but +// the caller can specify an additional timeout duration for the request. +func getStatusJSONProtoWithAdminAndTimeoutOption( + ts serverutils.TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, statusPrefix+path, response, isAdmin, additionalTimeout) +} + func postStatusJSONProtoWithAdminOption( ts serverutils.TestTenantInterface, path string, @@ -1577,8 +1594,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } params, _ := tests.CreateTestServerParams() params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919. @@ -1657,7 +1677,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Hit query endpoint. var resp serverpb.StatementsResponse - if err := getStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil { t.Fatal(err) } @@ -1915,8 +1935,11 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -1958,14 +1981,14 @@ func TestStatusAPIStatements(t *testing.T) { // Test that non-admin without VIEWACTIVITY privileges cannot access. var resp serverpb.StatementsResponse - err := getStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout) if !testutils.IsError(err, "status: 403") { t.Fatalf("expected privilege error, got %v", err) } testPath := func(path string, expectedStmts []string) { // Hit query endpoint. - if err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil { + if err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil { t.Fatal(err) } @@ -2030,8 +2053,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -2053,6 +2079,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs) findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE statement = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';" + testAppName := "TestCombinedStatementsWithFullScans" firstServerProto := testCluster.Server(0) sqlSB := testCluster.ServerConn(0) @@ -2066,6 +2093,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", authenticatedUserNameNoAdmin().Normalized())) + thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName)) type TestCases struct { stmt string @@ -2076,7 +2104,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { count int } - // These test statements are executed before any indexes are introduced. + // These test statements are executed before the index is introduced. statementsBeforeIndex := []TestCases{ {stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1}, {stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1}, @@ -2100,22 +2128,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { {stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2}, } - type StatementData struct { + type ExpectedStatementData struct { count int fullScan bool distSQL bool failed bool } - // Declare the map outside of the executeStatements function. - statementDataMap := make(map[string]StatementData) + // expectedStatementStatsMap maps the query response format to the associated + // expected statement statistics for verification. + expectedStatementStatsMap := make(map[string]ExpectedStatementData) + // Helper function to execute the statements and store the expected statement executeStatements := func(statements []TestCases) { - // For each statement in the test case, execute the statement and store the - // expected statement data in a map. + // Clear the map at the start of each execution batch. + expectedStatementStatsMap = make(map[string]ExpectedStatementData) for _, stmt := range statements { thirdServerSQL.Exec(t, stmt.stmt) - statementDataMap[stmt.respQuery] = StatementData{ + expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{ fullScan: stmt.fullScan, distSQL: stmt.distSQL, failed: stmt.failed, @@ -2124,34 +2154,54 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } } + // Helper function to convert a response into a JSON string representation. + responseToJSON := func(resp interface{}) string { + bytes, err := json.Marshal(resp) + if err != nil { + t.Fatal(err) + } + return string(bytes) + } + + // Helper function to verify the combined statement statistics response. verifyCombinedStmtStats := func() { - err := getStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false) + err := getStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout) require.NoError(t, err) + // actualResponseStatsMap maps the query response format to the actual + // statement statistics received from the server response. + actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics) for _, respStatement := range resp.Statements { - respQuery := respStatement.Key.KeyData.Query + // Skip failed statements: The test app may encounter transient 40001 + // errors that are automatically retried. Thus, we only consider + // statements that were that were successfully executed by the test app + // to avoid counting such failures. If a statement that we expect to be + // successful is not found in the response, the test will fail later. + if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed { + actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement + } + } + + for respQuery, expectedData := range expectedStatementStatsMap { + respStatement, exists := actualResponseStatsMap[respQuery] + require.True(t, exists, "Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp)) + actualCount := respStatement.Stats.Count actualFullScan := respStatement.Key.KeyData.FullScan actualDistSQL := respStatement.Key.KeyData.DistSQL actualFailed := respStatement.Key.KeyData.Failed - // If the response has a query that isn't in our map, it means that it's - // not part of our test, so we ignore it. - expectedData, ok := statementDataMap[respQuery] - if !ok { - continue - } - require.Equal(t, expectedData.fullScan, actualFullScan) - require.Equal(t, expectedData.distSQL, actualDistSQL) - require.Equal(t, expectedData.failed, actualFailed) - require.Equal(t, expectedData.count, int(actualCount)) + stmtJSONString := responseToJSON(respStatement) + + require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString) } } - // Execute the queries that will be executed before the index is created. + // Execute and verify the queries that will be executed before the index is created. executeStatements(statementsBeforeIndex) - - // Test the statements that will be executed before the index is created. verifyCombinedStmtStats() // Execute the queries that will create the index. @@ -2175,10 +2225,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { return nil }, 3*time.Second) - // Execute the queries that will be executed after the index is created. + // Execute and verify the queries that will be executed after the index is created. executeStatements(statementsAfterIndex) - - // Test the statements that will be executed after the index is created. verifyCombinedStmtStats() } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 4b0587589247..bfb5af8b2b4a 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/url" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -477,6 +479,27 @@ func GetJSONProtoWithAdminOption( return httputil.GetJSON(httpClient, ts.AdminURL()+path, response) } +// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but +// the caller can specify an additional timeout duration for the request. +func GetJSONProtoWithAdminAndTimeoutOption( + ts TestTenantInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin, SingleTenantSession) + if err != nil { + return err + } + httpClient.Timeout += additionalTimeout + u := ts.AdminURL() + fullURL := u + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout) + return httputil.GetJSON(httpClient, fullURL, response) +} + // PostJSONProto uses the supplied client to POST the URL specified by the parameters // and unmarshals the result into response. func PostJSONProto(ts TestTenantInterface, path string, request, response protoutil.Message) error {