Skip to content

Commit

Permalink
sqlstats: unskip tests hitting combinedstmts and stmts endpoints
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#109184.

Previously, tests which hit the `combinedstmts` and `statements`
endpoints were skipped under stress because they would occasionally
fail. This commit unskips these tests. Instead of unmarshalling the
metadata JSON blob, the select query now extracts the needed values
directly from it. The http client's timeout is also increased by 30s.

Release note: None
  • Loading branch information
gtr committed Aug 28, 2023
1 parent 1577334 commit 3088301
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 56 deletions.
113 changes: 74 additions & 39 deletions pkg/server/application_api/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package application_api_test
import (
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -48,8 +49,11 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

var params base.TestServerArgs
params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919.
Expand Down Expand Up @@ -128,7 +132,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) {

// Hit query endpoint.
var resp serverpb.StatementsResponse
if err := srvtestutils.GetStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -383,8 +387,11 @@ func TestStatusAPIStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -425,14 +432,14 @@ func TestStatusAPIStatements(t *testing.T) {

// Test that non-admin without VIEWACTIVITY privileges cannot access.
var resp serverpb.StatementsResponse
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "statements", &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "statements", &resp, false, additionalTimeout)
if !testutils.IsError(err, "status: 403") {
t.Fatalf("expected privilege error, got %v", err)
}

testPath := func(path string, expectedStmts []string) {
// Hit query endpoint.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -497,8 +504,11 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 30 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand All @@ -519,6 +529,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {

endpoint := fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs)
findJobQuery := "SELECT status FROM [SHOW JOBS] WHERE statement = 'CREATE INDEX idx_age ON football.public.players (age) STORING (name)';"
testAppName := "TestCombinedStatementsWithFullScans"

firstServerProto := testCluster.Server(0).ApplicationLayer()
sqlSB := testCluster.ServerConn(0)
Expand All @@ -532,6 +543,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
}

thirdServerSQL.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", apiconstants.TestingUserNameNoAdmin().Normalized()))
thirdServerSQL.Exec(t, fmt.Sprintf(`SET application_name = '%s'`, testAppName))

type TestCases struct {
stmt string
Expand All @@ -542,7 +554,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
count int
}

// These test statements are executed before any indexes are introduced.
// These test statements are executed before the index is introduced.
statementsBeforeIndex := []TestCases{
{stmt: `CREATE DATABASE football`, respQuery: `CREATE DATABASE football`, fullScan: false, distSQL: false, failed: false, count: 1},
{stmt: `SET database = football`, respQuery: `SET database = football`, fullScan: false, distSQL: false, failed: false, count: 1},
Expand All @@ -566,22 +578,24 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
{stmt: `SELECT name FROM players WHERE age >= 32`, respQuery: `SELECT name FROM players WHERE age >= _`, fullScan: true, distSQL: true, failed: false, count: 2},
}

type StatementData struct {
type ExpectedStatementData struct {
count int
fullScan bool
distSQL bool
failed bool
}

// Declare the map outside of the executeStatements function.
statementDataMap := make(map[string]StatementData)
// expectedStatementStatsMap maps the query response format to the associated
// expected statement statistics for verification.
expectedStatementStatsMap := make(map[string]ExpectedStatementData)

// Helper function to execute the statements and store the expected statement
executeStatements := func(statements []TestCases) {
// For each statement in the test case, execute the statement and store the
// expected statement data in a map.
// Clear the map at the start of each execution batch.
expectedStatementStatsMap = make(map[string]ExpectedStatementData)
for _, stmt := range statements {
thirdServerSQL.Exec(t, stmt.stmt)
statementDataMap[stmt.respQuery] = StatementData{
expectedStatementStatsMap[stmt.respQuery] = ExpectedStatementData{
fullScan: stmt.fullScan,
distSQL: stmt.distSQL,
failed: stmt.failed,
Expand All @@ -590,34 +604,54 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
}
}

// Helper function to convert a response into a JSON string representation.
responseToJSON := func(resp interface{}) string {
bytes, err := json.Marshal(resp)
if err != nil {
t.Fatal(err)
}
return string(bytes)
}

// Helper function to verify the combined statement statistics response.
verifyCombinedStmtStats := func() {
err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, endpoint, &resp, false)
err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, endpoint, &resp, false, additionalTimeout)
require.NoError(t, err)

// actualResponseStatsMap maps the query response format to the actual
// statement statistics received from the server response.
actualResponseStatsMap := make(map[string]serverpb.StatementsResponse_CollectedStatementStatistics)
for _, respStatement := range resp.Statements {
respQuery := respStatement.Key.KeyData.Query
// Skip failed statements: The test app may encounter transient 40001
// errors that are automatically retried. Thus, we only consider
// statements that were successfully executed by the test app
// to avoid counting such failures. If a statement that we expect to be
// successful is not found in the response, the test will fail later.
if respStatement.Key.KeyData.App == testAppName && !respStatement.Key.KeyData.Failed {
actualResponseStatsMap[respStatement.Key.KeyData.Query] = respStatement
}
}

for respQuery, expectedData := range expectedStatementStatsMap {
respStatement, exists := actualResponseStatsMap[respQuery]
require.True(t, exists, "Expected statement '%s' not found in response: %v", respQuery, responseToJSON(resp))

actualCount := respStatement.Stats.Count
actualFullScan := respStatement.Key.KeyData.FullScan
actualDistSQL := respStatement.Key.KeyData.DistSQL
actualFailed := respStatement.Key.KeyData.Failed
// If the response has a query that isn't in our map, it means that it's
// not part of our test, so we ignore it.
expectedData, ok := statementDataMap[respQuery]
if !ok {
continue
}

require.Equal(t, expectedData.fullScan, actualFullScan)
require.Equal(t, expectedData.distSQL, actualDistSQL)
require.Equal(t, expectedData.failed, actualFailed)
require.Equal(t, expectedData.count, int(actualCount))
stmtJSONString := responseToJSON(respStatement)

require.Equal(t, expectedData.fullScan, actualFullScan, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.distSQL, actualDistSQL, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.failed, actualFailed, "failed for respStatement: %v", stmtJSONString)
require.Equal(t, expectedData.count, int(actualCount), "failed for respStatement: %v", stmtJSONString)
}
}

// Execute the queries that will be executed before the index is created.
// Execute and verify the queries that will be executed before the index is created.
executeStatements(statementsBeforeIndex)

// Test the statements that will be executed before the index is created.
verifyCombinedStmtStats()

// Execute the queries that will create the index.
Expand All @@ -641,19 +675,20 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
return nil
}, 3*time.Second)

// Execute the queries that will be executed after the index is created.
// Execute and verify the queries that will be executed after the index is created.
executeStatements(statementsAfterIndex)

// Test the statements that will be executed after the index is created.
verifyCombinedStmtStats()
}

func TestStatusAPICombinedStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip under stress until we extend the timeout for the http client.
skip.UnderStressWithIssue(t, 109184)
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
if skip.Stress() {
additionalTimeout = 45 * time.Second
}

// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
Expand Down Expand Up @@ -701,7 +736,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {

verifyStmts := func(path string, expectedStmts []string, hasTxns bool, t *testing.T) {
// Hit query endpoint.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, path, &resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -798,8 +833,8 @@ func TestStatusAPICombinedStatements(t *testing.T) {
t.Run("fetch_mode=TxnsOnly with limit", func(t *testing.T) {
// Verify that we only return stmts for the txns in the response.
// We'll add a limit in a later commit to help verify this behaviour.
if err := srvtestutils.GetStatusJSONProtoWithAdminOption(firstServerProto, "combinedstmts?fetch_mode.stats_type=1&limit=2",
&resp, false); err != nil {
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts?fetch_mode.stats_type=1&limit=2",
&resp, false, additionalTimeout); err != nil {
t.Fatal(err)
}

Expand Down
62 changes: 45 additions & 17 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package server

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -649,11 +650,22 @@ func collectCombinedStatements(
tableSuffix string,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
aostClause := testingKnobs.GetAOSTClause()
const expectedNumDatums = 6
const expectedNumDatums = 11
const queryFormat = `
SELECT *
SELECT
fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount,
metadata ->> 'query' AS query,
metadata ->> 'querySummary' AS querySummary,
metadata -> 'db' AS databases,
statistics
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
Expand All @@ -676,9 +688,20 @@ FROM (SELECT fingerprint_id,
ie,
// The statement activity table has aggregated metadata.
`
SELECT *
SELECT
fingerprint_id,
txn_fingerprints,
app_name,
aggregated_ts,
COALESCE(CAST(metadata -> 'distSQLCount' AS INT), 0) AS distSQLCount,
COALESCE(CAST(metadata -> 'fullScanCount' AS INT), 0) AS fullScanCount,
COALESCE(CAST(metadata -> 'failedCount' AS INT), 0) AS failedCount,
metadata ->> 'query' AS query,
metadata ->> 'querySummary' AS querySummary,
metadata -> 'db' AS databases,
statistics
FROM (SELECT fingerprint_id,
array_agg(distinct transaction_fingerprint_id),
array_agg(distinct transaction_fingerprint_id) AS txn_fingerprints,
app_name,
max(aggregated_ts) AS aggregated_ts,
crdb_internal.merge_aggregated_stmt_metadata(array_agg(metadata)) AS metadata,
Expand Down Expand Up @@ -768,28 +791,33 @@ FROM (SELECT fingerprint_id,
app := string(tree.MustBeDString(row[2]))

aggregatedTs := tree.MustBeDTimestampTZ(row[3]).Time

// The metadata is aggregated across all the statements with the same fingerprint.
aggregateMetadataJSON := tree.MustBeDJSON(row[4]).JSON
var aggregateMetadata appstatspb.AggregatedStatementMetadata
if err = sqlstatsutil.DecodeAggregatedMetadataJSON(aggregateMetadataJSON, &aggregateMetadata); err != nil {
distSQLCount := int64(*row[4].(*tree.DInt))
fullScanCount := int64(*row[5].(*tree.DInt))
failedCount := int64(*row[6].(*tree.DInt))
query := string(tree.MustBeDString(row[7]))
querySummary := string(tree.MustBeDString(row[8]))

// The databases column is a JSON array of strings. It must be unmarshalled.
databasesDJSON := tree.MustBeDJSON(row[9]).JSON
var databases []string
if err := json.Unmarshal([]byte(databasesDJSON.String()), &databases); err != nil {
return nil, srverrors.ServerError(ctx, err)
}

metadata := appstatspb.CollectedStatementStatistics{
Key: appstatspb.StatementStatisticsKey{
App: app,
DistSQL: aggregateMetadata.DistSQLCount > 0,
FullScan: aggregateMetadata.FullScanCount > 0,
Failed: aggregateMetadata.FailedCount > 0,
Query: aggregateMetadata.Query,
QuerySummary: aggregateMetadata.QuerySummary,
Database: strings.Join(aggregateMetadata.Databases, ","),
DistSQL: distSQLCount > 0,
FullScan: fullScanCount > 0,
Failed: failedCount > 0,
Query: query,
QuerySummary: querySummary,
Database: strings.Join(databases, ","),
},
}

var stats appstatspb.StatementStatistics
statsJSON := tree.MustBeDJSON(row[5]).JSON
statsJSON := tree.MustBeDJSON(row[10]).JSON
if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &stats); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/srvtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package srvtestutils
import (
"encoding/json"
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/server/apiconstants"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -85,6 +86,18 @@ func GetStatusJSONProtoWithAdminOption(
return serverutils.GetJSONProtoWithAdminOption(ts, apiconstants.StatusPrefix+path, response, isAdmin)
}

// GetStatusJSONProtoWithAdminAndTimeoutOption behaves like
// GetStatusJSONProtoWithAdminOption, except that the caller also supplies
// additionalTimeout, an extra duration passed along with the request. The
// given path is prefixed with apiconstants.StatusPrefix and the call is
// delegated to serverutils.GetJSONProtoWithAdminAndTimeoutOption; its error
// is returned unchanged.
func GetStatusJSONProtoWithAdminAndTimeoutOption(
	ts serverutils.ApplicationLayerInterface,
	path string,
	response protoutil.Message,
	isAdmin bool,
	additionalTimeout time.Duration,
) error {
	// Build the full status-endpoint path once, then delegate.
	statusPath := apiconstants.StatusPrefix + path
	return serverutils.GetJSONProtoWithAdminAndTimeoutOption(
		ts, statusPath, response, isAdmin, additionalTimeout,
	)
}

// PostStatusJSONProtoWithAdminOption performs a RPC-over-HTTP request to
// the status endpoint and unmarshals the response into the specified
// proto message. It allows the caller to control whether the request
Expand Down
Loading

0 comments on commit 3088301

Please sign in to comment.