diff --git a/pkg/server/application_api/sql_stats_test.go b/pkg/server/application_api/sql_stats_test.go index cbf3a895cd6e..21cadd94f6b2 100644 --- a/pkg/server/application_api/sql_stats_test.go +++ b/pkg/server/application_api/sql_stats_test.go @@ -13,6 +13,7 @@ package application_api_test import ( "context" gosql "database/sql" + "encoding/json" "fmt" "net/url" "reflect" @@ -48,8 +49,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = 15 * time.Second + } var params base.TestServerArgs params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919. @@ -128,7 +132,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Hit query endpoint. var resp serverpb.StatementsResponse - if err := srvtestutils.GetStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { + if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil { t.Fatal(err) } @@ -383,8 +387,11 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = 15 * time.Second + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -425,7 +432,7 @@ func TestStatusAPIStatements(t *testing.T) { // Test that non-admin without VIEWACTIVITY privileges cannot access. var resp serverpb.StatementsResponse - err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false) + err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout) if !testutils.IsError(err, "status: 403") { t.Fatalf("expected privilege error, got %v", err) } @@ -497,8 +504,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = 15 * time.Second + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -591,7 +601,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { } verifyCombinedStmtStats := func() { - err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false) + err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout) require.NoError(t, err) for _, respStatement := range resp.Statements { @@ -607,10 +617,16 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { continue } - require.Equal(t, expectedData.fullScan, actualFullScan) - require.Equal(t, expectedData.distSQL, actualDistSQL) - require.Equal(t, expectedData.failed, actualFailed) - require.Equal(t, expectedData.count, int(actualCount)) + bytes, err := json.Marshal(respStatement) + if err != nil { + t.Fatal(err) + } + respString := (string(bytes)) + + require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", respString) + require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", respString) + require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", respString) + require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", respString) } } @@ -652,8 +668,11 @@ func TestStatusAPICombinedStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skip under stress until we extend the timeout for the http client. - skip.UnderStressWithIssue(t, 109184) + // Increase the timeout for the http client if under stress. + additionalTimeout := 0 * time.Second + if skip.Stress() { + additionalTimeout = 15 * time.Second + } // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) @@ -701,7 +720,7 @@ func TestStatusAPICombinedStatements(t *testing.T) { verifyStmts := func(path string, expectedStmts []string, hasTxns bool, t *testing.T) { // Hit query endpoint. - if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil { + if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil { t.Fatal(err) } diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index d43f21547225..84a8f9c6b0d8 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -649,20 +649,34 @@ func collectCombinedStatements( tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - const expectedNumDatums = 6 + const expectedNumDatums = 11 const queryFormat = ` -SELECT * -FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), - app_name, - max(aggregated_ts) AS aggregated_ts, - crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, - crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics - FROM %s %s - GROUP BY - fingerprint_id, - app_name) %s -%s` +WITH Aggregated AS ( + SELECT fingerprint_id, + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, + app_name, + max(aggregated_ts) AS aggregated_ts, + crdb_internal.merge_stats_metadata(array_agg(metadata)) AS merged_metadata, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics + FROM %s %s + GROUP BY fingerprint_id, app_name +) + +SELECT fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + + COALESCE(CAST(merged_metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(merged_metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(merged_metadata -> 'failedCount' AS INT), 0) AS failedCount, + merged_metadata ->> 'query' AS query, + merged_metadata ->> 'querySummary' AS querySummary, + merged_metadata ->> 'db' AS databases, + + statistics + +FROM Aggregated %s %s` var it isql.Rows var err error @@ -676,18 +690,32 @@ FROM (SELECT fingerprint_id, ie, // The statement activity table has aggregated metadata. ` -SELECT * -FROM (SELECT fingerprint_id, - array_agg(distinct transaction_fingerprint_id), - app_name, - max(aggregated_ts) AS aggregated_ts, - crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata, - crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics - FROM %s %s - GROUP BY - fingerprint_id, - app_name) %s -%s`, +WITH Aggregated AS ( + SELECT fingerprint_id, + array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints, + app_name, + max(aggregated_ts) AS aggregated_ts, + crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS merged_metadata, + crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics + FROM %s %s + GROUP BY fingerprint_id, app_name +) + +SELECT fingerprint_id, + txn_fingerprints, + app_name, + aggregated_ts, + + COALESCE(CAST(merged_metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount, + COALESCE(CAST(merged_metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount, + COALESCE(CAST(merged_metadata -> 'failedCount' AS INT), 0) AS failedCount, + merged_metadata ->> 'query' AS query, + merged_metadata ->> 'querySummary' AS querySummary, + merged_metadata ->> 'db' AS databases, + + statistics + +FROM Aggregated %s %s`, "crdb_internal.statement_activity", "combined-stmts-activity-by-interval", whereClause, @@ -768,28 +796,26 @@ FROM (SELECT fingerprint_id, app := string(tree.MustBeDString(row[2])) aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time - - // The metadata is aggregated across all the statements with the same fingerprint. - aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON - var aggregateMetadata appstatspb.AggregatedStatementMetadata - if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil { - return nil, srverrors.ServerError(ctx, err) - } - + distSQLCount := int64(*row[4].(*tree.DInt)) + fullScanCount := int64(*row[5].(*tree.DInt)) + failedCount := int64(*row[6].(*tree.DInt)) + query := string(tree.MustBeDString(row[7])) + querySummary := string(tree.MustBeDString(row[8])) + databases := string(tree.MustBeDString(row[9])) metadata := appstatspb.CollectedStatementStatistics{ Key: appstatspb.StatementStatisticsKey{ App: app, - DistSQL: aggregateMetadata.DistSQLCount > 0, - FullScan: aggregateMetadata.FullScanCount > 0, - Failed: aggregateMetadata.FailedCount > 0, - Query: aggregateMetadata.Query, - QuerySummary: aggregateMetadata.QuerySummary, - Database: strings.Join(aggregateMetadata.Databases, ","), + DistSQL: distSQLCount > 0, + FullScan: fullScanCount > 0, + Failed: failedCount > 0, + Query: query, + QuerySummary: querySummary, + Database: databases, }, } var stats appstatspb.StatementStatistics - statsJSON := tree.MustBeDJSON(row[5]).JSON + statsJSON := tree.MustBeDJSON(row[10]).JSON if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil { return nil, srverrors.ServerError(ctx, err) } diff --git a/pkg/server/srvtestutils/testutils.go b/pkg/server/srvtestutils/testutils.go index 5beab3e79a39..5801088e9579 100644 --- a/pkg/server/srvtestutils/testutils.go +++ b/pkg/server/srvtestutils/testutils.go @@ -13,6 +13,7 @@ package srvtestutils import ( "encoding/json" "io" + "time" "github.com/cockroachdb/cockroach/pkg/server/apiconstants" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -85,6 +86,18 @@ func GetStatusJSONProtoWithAdminOption( return serverutils.GetJSONProtoWithAdminOption(ts, apiconstants.StatusPrefix+path, response, isAdmin) } +// GetStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but +// the caller can specify an additional timeout duration for the request. +func GetStatusJSONProtoWithAdminAndTimeoutOption( + ts serverutils.ApplicationLayerInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, apiconstants.StatusPrefix+path, response, isAdmin, additionalTimeout) +} + // PostStatusJSONProtoWithAdminOption performs a RPC-over-HTTP request to // the status endpoint and unmarshals the response into the specified // proto message. It allows the caller to control whether the request diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 6ab76fbc7653..04ef54ef051f 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -23,6 +23,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" @@ -332,6 +333,27 @@ func GetJSONProtoWithAdminOption( return httputil.GetJSON(httpClient, fullURL, response) } +// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but +// the caller can specify an additional timeout duration for the request. +func GetJSONProtoWithAdminAndTimeoutOption( + ts ApplicationLayerInterface, + path string, + response protoutil.Message, + isAdmin bool, + additionalTimeout time.Duration, +) error { + httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin, SingleTenantSession) + if err != nil { + return err + } + httpClient.Timeout += additionalTimeout + u := ts.AdminURL().String() + fullURL := u + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout) + return httputil.GetJSON(httpClient, fullURL, response) +} + // PostJSONProto uses the supplied client to POST the URL specified by the parameters // and unmarshals the result into response. func PostJSONProto(