Skip to content

Commit

Permalink
sqlstats: unskip tests hitting combinedstmts and stmts endpoints
Browse files Browse the repository at this point in the history
Fixes: #109184.

Previously, tests which hit the `combinedstmts` and `statements`
endpoints were skipped under stress because they would occaisonally
fail. This commit unskips these tests and instead of unmarshalling the
metadata JSON blob, the select query directly extracts the values needed
from it. The http client's timeout is also increased by 15s.

Release note: None
  • Loading branch information
gtr committed Aug 28, 2023
1 parent 3020929 commit fda4a9e
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 56 deletions.
51 changes: 35 additions & 16 deletions pkg/server/application_api/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package application_api_test
import (
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -48,8 +49,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 15 * time.Second
}

var params base.TestServerArgs
params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919.
Expand Down Expand Up @@ -128,7 +132,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) {

// Hit query endpoint.
var resp serverpb.StatementsResponse
if err := srvtestutils.GetStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -383,8 +387,11 @@ func TestStatusAPIStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 15 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -425,7 +432,7 @@ func TestStatusAPIStatements(t *testing.T) {

// Test that non-admin without VIEWACTIVITY privileges cannot access.
var resp serverpb.StatementsResponse
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout)
if !testutils.IsError(err, "status: 403") {
t.Fatalf("expected privilege error, got %v", err)
}
Expand Down Expand Up @@ -497,8 +504,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 15 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -591,7 +601,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
}

verifyCombinedStmtStats := func() {
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout)
require.NoError(t, err)

for _, respStatement := range resp.Statements {
Expand All @@ -607,10 +617,16 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
continue
}

require.Equal(t, expectedData.fullScan, actualFullScan)
require.Equal(t, expectedData.distSQL, actualDistSQL)
require.Equal(t, expectedData.failed, actualFailed)
require.Equal(t, expectedData.count, int(actualCount))
bytes, err := json.Marshal(respStatement)
if err != nil {
t.Fatal(err)
}
respString := (string(bytes))

require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", respString)
require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", respString)
require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", respString)
require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", respString)
}
}

Expand Down Expand Up @@ -652,8 +668,11 @@ func TestStatusAPICombinedStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 15 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -701,7 +720,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {

verifyStmts := func(path string, expectedStmts []string, hasTxns bool, t *testing.T) {
// Hit query endpoint.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down
106 changes: 66 additions & 40 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,20 +649,34 @@ func collectCombinedStatements(
tableSuffix string,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
aostClause := testingKnobs.GetAOSTClause()
const expectedNumDatums = 6
const expectedNumDatums = 11
const queryFormat = `
SELECT *
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM %s %s
GROUP BY
fingerprint_id,
app_name) %s
%s`
WITH Aggregated AS (
SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS merged_metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM %s %s
GROUP BY fingerprint_id, app_name
)
SELECT fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(merged_metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(merged_metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(merged_metadata -> 'failedCount' AS INT), 0) AS failedCount,
merged_metadata ->> 'query' AS query,
merged_metadata ->> 'querySummary' AS querySummary,
merged_metadata ->> 'db' AS databases,
statistics
FROM Aggregated %s %s`

var it isql.Rows
var err error
Expand All @@ -676,18 +690,32 @@ FROM (SELECT fingerprint_id,
ie,
// The statement activity table has aggregated metadata.
`
SELECT *
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM %s %s
GROUP BY
fingerprint_id,
app_name) %s
%s`,
WITH Aggregated AS (
SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS merged_metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM %s %s
GROUP BY fingerprint_id, app_name
)
SELECT fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(merged_metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(merged_metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(merged_metadata -> 'failedCount' AS INT), 0) AS failedCount,
merged_metadata ->> 'query' AS query,
merged_metadata ->> 'querySummary' AS querySummary,
merged_metadata ->> 'db' AS databases,
statistics
FROM Aggregated %s %s`,
"crdb_internal.statement_activity",
"combined-stmts-activity-by-interval",
whereClause,
Expand Down Expand Up @@ -768,28 +796,26 @@ FROM (SELECT fingerprint_id,
app := string(tree.MustBeDString(row[2]))

aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time

// The metadata is aggregated across all the statements with the same fingerprint.
aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON
var aggregateMetadata appstatspb.AggregatedStatementMetadata
if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil {
return nil, srverrors.ServerError(ctx, err)
}

distSQLCount := int64(*row[4].(*tree.DInt))
fullScanCount := int64(*row[5].(*tree.DInt))
failedCount := int64(*row[6].(*tree.DInt))
query := string(tree.MustBeDString(row[7]))
querySummary := string(tree.MustBeDString(row[8]))
databases := string(tree.MustBeDString(row[9]))
metadata := appstatspb.CollectedStatementStatistics{
Key: appstatspb.StatementStatisticsKey{
App: app,
DistSQL: aggregateMetadata.DistSQLCount > 0,
FullScan: aggregateMetadata.FullScanCount > 0,
Failed: aggregateMetadata.FailedCount > 0,
Query: aggregateMetadata.Query,
QuerySummary: aggregateMetadata.QuerySummary,
Database: strings.Join(aggregateMetadata.Databases, ","),
DistSQL: distSQLCount > 0,
FullScan: fullScanCount > 0,
Failed: failedCount > 0,
Query: query,
QuerySummary: querySummary,
Database: databases,
},
}

var stats appstatspb.StatementStatistics
statsJSON := tree.MustBeDJSON(row[5]).JSON
statsJSON := tree.MustBeDJSON(row[10]).JSON
if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/srvtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package srvtestutils
import (
"encoding/json"
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/server/apiconstants"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -85,6 +86,18 @@ func GetStatusJSONProtoWithAdminOption(
return serverutils.GetJSONProtoWithAdminOption(ts, apiconstants.StatusPrefix+path, response, isAdmin)
}

// GetStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but
// the caller can specify an additional timeout duration for the request.
func GetStatusJSONProtoWithAdminAndTimeoutOption(
ts serverutils.ApplicationLayerInterface,
path string,
response protoutil.Message,
isAdmin bool,
additionalTimeout time.Duration,
) error {
return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, apiconstants.StatusPrefix+path, response, isAdmin, additionalTimeout)
}

// PostStatusJSONProtoWithAdminOption performs a RPC-over-HTTP request to
// the status endpoint and unmarshals the response into the specified
// proto message. It allows the caller to control whether the request
Expand Down
22 changes: 22 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -332,6 +333,27 @@ func GetJSONProtoWithAdminOption(
return httputil.GetJSON(httpClient, fullURL, response)
}

// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but
// the caller can specify an additional timeout duration for the request.
func GetJSONProtoWithAdminAndTimeoutOption(
ts ApplicationLayerInterface,
path string,
response protoutil.Message,
isAdmin bool,
additionalTimeout time.Duration,
) error {
httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin, SingleTenantSession)
if err != nil {
return err
}
httpClient.Timeout += additionalTimeout
u := ts.AdminURL().String()
fullURL := u + path
log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL)
log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout)
return httputil.GetJSON(httpClient, fullURL, response)
}

// PostJSONProto uses the supplied client to POST the URL specified by the parameters
// and unmarshals the result into response.
func PostJSONProto(
Expand Down

0 comments on commit fda4a9e

Please sign in to comment.