Skip to content

Commit

Permalink
sqlstats: unskip tests hitting combinedstmts and stmts endpoints
Browse files Browse the repository at this point in the history
Fixes: #109184.

Previously, tests which hit the `combinedstmts` and `statements`
endpoints were skipped under stress because they would occaisonally
fail. This commit unskips these tests and instead of unmarshalling the
metadata JSON blob, the select query directly extracts the values needed
from it. The http client's timeout is also increased by 30s.

Release note: None
  • Loading branch information
gtr committed Aug 28, 2023
1 parent 3020929 commit b62647f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 59 deletions.
111 changes: 72 additions & 39 deletions pkg/server/application_api/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package application_api_test
import (
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -48,8 +49,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

var params base.TestServerArgs
params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919.
Expand Down Expand Up @@ -128,7 +132,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) {

// Hit query endpoint.
var resp serverpb.StatementsResponse
if err := srvtestutils.GetStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -383,8 +387,11 @@ func TestStatusAPIStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -425,14 +432,14 @@ func TestStatusAPIStatements(t *testing.T) {

// Test that non-admin without VIEWACTIVITY privileges cannot access.
var resp serverpb.StatementsResponse
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout)
if !testutils.IsError(err, "status: 403") {
t.Fatalf("expected privilege error, got %v", err)
}

testPath := func(path string, expectedStmts []string) {
// Hit query endpoint.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -497,8 +504,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand All @@ -519,6 +529,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {

endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs)
findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE statement = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';"
testAppName := "TestCombinedStatementsWithFullScans"

firstServerProto := testCluster.Server(0)
sqlSB := testCluster.ServerConn(0)
Expand All @@ -532,6 +543,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
}

thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", apiconstants.TestingUserNameNoAdmin().Normalized()))
thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName))

type TestCases struct {
stmt string
Expand All @@ -542,7 +554,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
count int
}

// These test statements are executed before any indexes are introduced.
// These test statements are executed before the index is introduced.
statementsBeforeIndex := []TestCases{
{stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1},
{stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1},
Expand All @@ -566,22 +578,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
{stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2},
}

type StatementData struct {
type ExpectedStatementData struct {
count int
fullScan bool
distSQL bool
failed bool
}

// Declare the map outside of the executeStatements function.
statementDataMap := make(map[string]StatementData)

// expectedStatementStatsMap maps the query response format to the associated
// expected statement statistics for verification.
expectedStatementStatsMap := make(map[string]ExpectedStatementData)

// Helper function to execute the statements and store the expected statement
executeStatements := func(statements []TestCases) {
// For each statement in the test case, execute the statement and store the
// expected statement data in a map.
// Clear the map at the start of each execution batch.
expectedStatementStatsMap = make(map[string]ExpectedStatementData)
for _, stmt := range statements {
thirdServerSQL.Exec(t, stmt.stmt)
statementDataMap[stmt.respQuery] = StatementData{
expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{
fullScan: stmt.fullScan,
distSQL: stmt.distSQL,
failed: stmt.failed,
Expand All @@ -590,34 +604,52 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
}
}

// Helper function to convert a response into a JSON string representation.
responseToJSON := func(resp interface{}) string {
bytes, err := json.Marshal(resp)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}

// Helper function to verify the combined statement statistics response.
verifyCombinedStmtStats := func() {
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout)
require.NoError(t, err)

for _, respStatement := range resp.Statements {
respQuery := respStatement.Key.KeyData.Query
// actualResponseStatsMap maps the query response format to the actual
// statement statistics received from the server response.
actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics)
for _, respStatement := range resp.Statements {
// Only consider statements that were successfully executed by the test app.
if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed {
actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement
}
}

for respQuery, expectedData := range expectedStatementStatsMap {
respStatement, exists := actualResponseStatsMap[respQuery]
if !exists {
t.Fatalf("Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp))
}

actualCount := respStatement.Stats.Count
actualFullScan := respStatement.Key.KeyData.FullScan
actualDistSQL := respStatement.Key.KeyData.DistSQL
actualFailed := respStatement.Key.KeyData.Failed
// If the response has a query that isn't in our map, it means that it's
// not part of our test, so we ignore it.
expectedData, ok := statementDataMap[respQuery]
if !ok {
continue
}

require.Equal(t, expectedData.fullScan, actualFullScan)
require.Equal(t, expectedData.distSQL, actualDistSQL)
require.Equal(t, expectedData.failed, actualFailed)
require.Equal(t, expectedData.count, int(actualCount))
stmtJSONString := responseToJSON(respStatement)

require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString)
}
}

// Execute the queries that will be executed before the index is created.
// Execute and verify the queries that will be executed before the index is created.
executeStatements(statementsBeforeIndex)

// Test the statements that will be executed before the index is created.
verifyCombinedStmtStats()

// Execute the queries that will create the index.
Expand All @@ -641,19 +673,20 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
return nil
}, 3*time.Second)

// Execute the queries that will be executed after the index is created.
// Execute and verify the queries that will be executed after the index is created.
executeStatements(statementsAfterIndex)

// Test the statements that will be executed after the index is created.
verifyCombinedStmtStats()
}

func TestStatusAPICombinedStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -701,7 +734,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {

verifyStmts := func(path string, expectedStmts []string, hasTxns bool, t *testing.T) {
// Hit query endpoint.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down
60 changes: 40 additions & 20 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,22 @@ func collectCombinedStatements(
tableSuffix string,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
aostClause := testingKnobs.GetAOSTClause()
const expectedNumDatums = 6
const expectedNumDatums = 11
const queryFormat = `
SELECT *
SELECT
fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount,
metadata ->> 'query' AS query,
metadata ->> 'querySummary' AS querySummary,
metadata ->> 'db' AS databases,
statistics
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
Expand All @@ -676,9 +687,20 @@ FROM (SELECT fingerprint_id,
ie,
// The statement activity table has aggregated metadata.
`
SELECT *
SELECT
fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount,
metadata ->> 'query' AS query,
metadata ->> 'querySummary' AS querySummary,
metadata ->> 'db' AS databases,
statistics
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata,
Expand Down Expand Up @@ -768,28 +790,26 @@ FROM (SELECT fingerprint_id,
app := string(tree.MustBeDString(row[2]))

aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time

// The metadata is aggregated across all the statements with the same fingerprint.
aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON
var aggregateMetadata appstatspb.AggregatedStatementMetadata
if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil {
return nil, srverrors.ServerError(ctx, err)
}

distSQLCount := int64(*row[4].(*tree.DInt))
fullScanCount := int64(*row[5].(*tree.DInt))
failedCount := int64(*row[6].(*tree.DInt))
query := string(tree.MustBeDString(row[7]))
querySummary := string(tree.MustBeDString(row[8]))
databases := string(tree.MustBeDString(row[9]))
metadata := appstatspb.CollectedStatementStatistics{
Key: appstatspb.StatementStatisticsKey{
App: app,
DistSQL: aggregateMetadata.DistSQLCount > 0,
FullScan: aggregateMetadata.FullScanCount > 0,
Failed: aggregateMetadata.FailedCount > 0,
Query: aggregateMetadata.Query,
QuerySummary: aggregateMetadata.QuerySummary,
Database: strings.Join(aggregateMetadata.Databases, ","),
DistSQL: distSQLCount > 0,
FullScan: fullScanCount > 0,
Failed: failedCount > 0,
Query: query,
QuerySummary: querySummary,
Database: databases,
},
}

var stats appstatspb.StatementStatistics
statsJSON := tree.MustBeDJSON(row[5]).JSON
statsJSON := tree.MustBeDJSON(row[10]).JSON
if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/srvtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package srvtestutils
import (
"encoding/json"
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/server/apiconstants"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -85,6 +86,18 @@ func GetStatusJSONProtoWithAdminOption(
return serverutils.GetJSONProtoWithAdminOption(ts, apiconstants.StatusPrefix+path, response, isAdmin)
}

// GetStatusJSONProtoWithAdminAndTimeoutOption is similar to GetStatusJSONProtoWithAdminOption, but
// the caller can specify an additional timeout duration for the request.
func GetStatusJSONProtoWithAdminAndTimeoutOption(
ts serverutils.ApplicationLayerInterface,
path string,
response protoutil.Message,
isAdmin bool,
additionalTimeout time.Duration,
) error {
return serverutils.GetJSONProtoWithAdminAndTimeoutOption(ts, apiconstants.StatusPrefix+path, response, isAdmin, additionalTimeout)
}

// PostStatusJSONProtoWithAdminOption performs a RPC-over-HTTP request to
// the status endpoint and unmarshals the response into the specified
// proto message. It allows the caller to control whether the request
Expand Down
22 changes: 22 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -332,6 +333,27 @@ func GetJSONProtoWithAdminOption(
return httputil.GetJSON(httpClient, fullURL, response)
}

// GetJSONProtoWithAdminAndTimeoutOption is like GetJSONProtoWithAdminOption but
// the caller can specify an additional timeout duration for the request.
func GetJSONProtoWithAdminAndTimeoutOption(
ts ApplicationLayerInterface,
path string,
response protoutil.Message,
isAdmin bool,
additionalTimeout time.Duration,
) error {
httpClient, err := ts.GetAuthenticatedHTTPClient(isAdmin, SingleTenantSession)
if err != nil {
return err
}
httpClient.Timeout += additionalTimeout
u := ts.AdminURL().String()
fullURL := u + path
log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL)
log.Infof(context.Background(), "set HTTP client timeout to: %s", httpClient.Timeout)
return httputil.GetJSON(httpClient, fullURL, response)
}

// PostJSONProto uses the supplied client to POST the URL specified by the parameters
// and unmarshals the result into response.
func PostJSONProto(
Expand Down

0 comments on commit b62647f

Please sign in to comment.