diff --git a/pkg/server/application_api/sql_stats_test.go b/pkg/server/application_api/sql_stats_test.go index c26b55ad931e..4aca4c86703a 100644 --- a/pkg/server/application_api/sql_stats_test.go +++ b/pkg/server/application_api/sql_stats_test.go @@ -13,6 +13,7 @@ package application_api_test import ( "context" gosql "database/sql" + "encoding/json" "fmt" "net/url" "reflect" @@ -44,12 +45,19 @@ import ( "github.com/stretchr/testify/require" ) +// additionalTimeoutUnderStress is the additional timeout to use for the http +// client if under stress. +const additionalTimeoutUnderStress = 30 * time.Second + func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } var params base.TestServerArgs params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919. @@ -128,7 +136,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Hit query endpoint. var resp serverpb.StatementsResponse - if err := srvtestutils.GetStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { + if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil { t.Fatal(err) } @@ -383,8 +391,11 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -425,14 +436,14 @@ func TestStatusAPIStatements(t *testing.T) { // Test that non-admin without VIEWACTIVITY privileges cannot access. var resp serverpb.StatementsResponse - err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false) + err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout) if !testutils.IsError(err, "status: 403") { t.Fatalf("expected privilege error, got %v", err) } testPath := func(path string, expectedStmts []string) { // Hit query endpoint. - if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil { + if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil { t.Fatal(err) } @@ -497,8 +508,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = additionalTimeoutUnderStress + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -519,6 +533,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs) findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE statement = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';" + testAppName := "TestCombinedStatementsWithFullScans" firstServerProto := testCluster.Server(0).ApplicationLayer() sqlSB := testCluster.ServerConn(0) @@ -532,6 +547,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", apiconstants.TestingUserNameNoAdmin().Normalized())) + thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName)) type TestCases struct { stmt string @@ -542,7 +558,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { count int } - // These test statements are executed before any indexes are introduced. + // These test statements are executed before the index is introduced. statementsBeforeIndex := []TestCases{ {stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1}, {stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1}, @@ -566,22 +582,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { {stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2}, } - type StatementData struct { + type ExpectedStatementData struct { count int fullScan bool distSQL bool failed bool } - // Declare the map outside of the executeStatements function. - statementDataMap := make(map[string]StatementData) + // expectedStatementStatsMap maps the query response format to the associated + // expected statement statistics for verification. + expectedStatementStatsMap := make(map[string]ExpectedStatementData) + // Helper function to execute the statements and store the expected statement executeStatements := func(statements []TestCases) { - // For each statement in the test case, execute the statement and store the - // expected statement data in a map. + // Clear the map at the start of each execution batch. + expectedStatementStatsMap = make(map[string]ExpectedStatementData) for _, stmt := range statements { thirdServerSQL.Exec(t, stmt.stmt) - statementDataMap[stmt.respQuery] = StatementData{ + expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{ fullScan: stmt.fullScan, distSQL: stmt.distSQL, failed: stmt.failed, @@ -590,34 +608,54 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } } + // Helper function to convert a response into a JSON string representation. + responseToJSON := func(resp interface{}) string { + bytes, err := json.Marshal(resp) + if err != nil { + t.Fatal(err) + } + return string(bytes) + } + + // Helper function to verify the combined statement statistics response. verifyCombinedStmtStats := func() { - err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false) + err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout) require.NoError(t, err) + // actualResponseStatsMap maps the query response format to the actual + // statement statistics received from the server response. + actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics) for _, respStatement := range resp.Statements { - respQuery := respStatement.Key.KeyData.Query + // Skip failed statements: The test app may encounter transient 40001 + // errors that are automatically retried. Thus, we only consider + // statements that were that were successfully executed by the test app + // to avoid counting such failures. If a statement that we expect to be + // successful is not found in the response, the test will fail later. + if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed { + actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement + } + } + + for respQuery, expectedData := range expectedStatementStatsMap { + respStatement, exists := actualResponseStatsMap[respQuery] + require.True(t, exists, "Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp)) + actualCount := respStatement.Stats.Count actualFullScan := respStatement.Key.KeyData.FullScan actualDistSQL := respStatement.Key.KeyData.DistSQL actualFailed := respStatement.Key.KeyData.Failed - // If the response has a query that isn't in our map, it means that it's - // not part of our test, so we ignore it. - expectedData, ok := statementDataMap[respQuery] - if !ok { - continue - } - require.Equal(t, expectedData.fullScan, actualFullScan) - require.Equal(t, expectedData.distSQL, actualDistSQL) - require.Equal(t, expectedData.failed, actualFailed) - require.Equal(t, expectedData.count, int(actualCount)) + stmtJSONString := responseToJSON(respStatement) + + require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString) + require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString) } } - // Execute the queries that will be executed before the index is created. + // Execute and verify the queries that will be executed before the index is created. executeStatements(statementsBeforeIndex) - - // Test the statements that will be executed before the index is created. verifyCombinedStmtStats() // Execute the queries that will create the index. @@ -641,10 +679,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { return nil }, 3*time.Second) - // Execute the queries that will be executed after the index is created. + // Execute and verify the queries that will be executed after the index is created. executeStatements(statementsAfterIndex) - - // Test the statements that will be executed after the index is created. verifyCombinedStmtStats() } @@ -652,8 +688,8 @@ func TestStatusAPICombinedStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Resource-intensive test, times out under stress. + skip.UnderStressRace(t, "expensive tests") // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index d43f21547225..fe9e64d3fbf7 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -649,11 +649,23 @@ func collectCombinedStatements( tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - const expectedNumDatums = 6 + const expectedNumDatums = 11 const queryFormat = ` -SELECT * +SELECT + fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, app_name, max(aggregated_ts) AS aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, @@ -676,9 +688,21 @@ FROM (SELECT fingerprint_id, ie, // The statement activity table has aggregated metadata. ` -SELECT * +SELECT + fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount, + metadata ->> 'query' AS query, + metadata ->> 'querySummary' AS querySummary, + (SELECT string_agg(elem::text, ',') + FROM json_array_elements_text(metadata->'db') AS elem) AS databases, + statistics FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, app_name, max(aggregated_ts) AS aggregated_ts, crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata, @@ -768,28 +792,27 @@ FROM (SELECT fingerprint_id, app := string(tree.MustBeDString(row[2])) aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time - - // The metadata is aggregated across all the statements with the same fingerprint. - aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON - var aggregateMetadata appstatspb.AggregatedStatementMetadata - if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil { - return nil, srverrors.ServerError(ctx, err) - } + distSQLCount := int64(*row[4].(*tree.DInt)) + fullScanCount := int64(*row[5].(*tree.DInt)) + failedCount := int64(*row[6].(*tree.DInt)) + query := string(tree.MustBeDString(row[7])) + querySummary := string(tree.MustBeDString(row[8])) + databases := string(tree.MustBeDString(row[9])) metadata := appstatspb.CollectedStatementStatistics{ Key: appstatspb.StatementStatisticsKey{ App: app, - DistSQL: aggregateMetadata.DistSQLCount > 0, - FullScan: aggregateMetadata.FullScanCount > 0, - Failed: aggregateMetadata.FailedCount > 0, - Query: aggregateMetadata.Query, - QuerySummary: aggregateMetadata.QuerySummary, - Database: strings.Join(aggregateMetadata.Databases, ","), + DistSQL: distSQLCount > 0, + FullScan: fullScanCount > 0, + Failed: failedCount > 0, + Query: query, + QuerySummary: querySummary, + Database: databases, }, } var stats appstatspb.StatementStatistics - statsJSON := tree.MustBeDJSON(row[5]).JSON + statsJSON := tree.MustBeDJSON(row[10]).JSON if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil { return nil, srverrors.ServerError(ctx, err) } diff --git a/pkg/server/srvtestutils/testutils.go b/pkg/server/srvtestutils/testutils.go index 5beab3e79a39..5801088e9579 100644 --- a/pkg/server/srvtestutils/testutils.go +++ b/pkg/server/srvtestutils/testutils.go @@ -13,6 +13,7 @@ package srvtestutils import ( "encoding/json" "io" + "time" "github.com/cockroachdb/cockroach/pkg/server/apiconstants" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -85,6 +86,18 @@ func GetStatusJSONProtoWithAdminOption( return serverutils.GetJSONProtoWithAdminOption(ts, apiconstants.StatusPrefix+path, response, isAdmin) } +// GetStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but +// the caller can specify an additional timeout duration for the request. +func GetStatusJSONProtoWithAdminAndTimeoutOption( + ts serverutils.ApplicationLayerInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, apiconstants.StatusPrefix+path, response, isAdmin, additionalTimeout) +} + // PostStatusJSONProtoWithAdminOption performs a RPC-over-HTTP request to // the status endpoint and unmarshals the response into the specified // proto message. It allows the caller to control whether the request diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 91fe8f9755e0..a216916446c5 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -23,6 +23,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" @@ -361,6 +362,27 @@ func GetJSONProtoWithAdminOption( return httputil.GetJSON(httpClient, fullURL, response) } +// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but +// the caller can specify an additional timeout duration for the request. +func GetJSONProtoWithAdminAndTimeoutOption( + ts ApplicationLayerInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin, SingleTenantSession) + if err != nil { + return err + } + httpClient.Timeout += additionalTimeout + u := ts.AdminURL().String() + fullURL := u + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout) + return httputil.GetJSON(httpClient, fullURL, response) +} + // PostJSONProto uses the supplied client to POST the URL specified by the parameters // and unmarshals the result into response. func PostJSONProto(