Skip to content

Commit

Permalink
server: use system stats view for stats overview queries
Browse files Browse the repository at this point in the history
This commit changes the views queried by the combined
stats endpoint from the views that combine persisted and
in-memory stats (`crdb_internal.statement_statistics` and
`crdb_internal.transaction_statistics`) to the corresponding
`_persisted` views, which are just wrappers around the
system tables. Thus, the combined stats endpoint now reads
only persisted stats from the system tables.

This commit updates tests using the combined API to
flush stats before using the API.

Epic: None
Release note (ui change): On the SQL Activity fingerprints
pages, users will no longer see stats that have not yet been
flushed to disk.
  • Loading branch information
xinhaoz committed May 16, 2023
1 parent 41f93d7 commit 94e9f8e
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 129 deletions.
53 changes: 16 additions & 37 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,9 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
require.NoError(t, err)

request := &serverpb.StatementsRequest{}
combinedStatsRequest := &serverpb.CombinedStatementsStatsRequest{}
var tenantStats *serverpb.StatementsResponse
var tenantCombinedStats *serverpb.StatementsResponse

// Populate `tenantStats` and `tenantCombinedStats`. The tenant server
// Populate `tenantStats`. The tenant server
// `Statements` and `CombinedStatements` methods are backed by the
// sqlinstance system which uses a cache populated through rangefeed
// for keeping track of SQL pod data. We use `SucceedsSoon` to eliminate
Expand All @@ -206,10 +204,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
return errors.New("tenant statements are unexpectedly empty")
}

tenantCombinedStats, err = tenantStatusServer.CombinedStatementStats(ctx, combinedStatsRequest)
if tenantCombinedStats == nil || len(tenantCombinedStats.Statements) == 0 {
return errors.New("tenant combined statements are unexpectedly empty")
}
return nil
})

Expand All @@ -218,11 +212,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
err = serverutils.GetJSONProto(nonTenant, path, &nonTenantStats)
require.NoError(t, err)

path = "/_status/combinedstmts"
var nonTenantCombinedStats serverpb.StatementsResponse
err = serverutils.GetJSONProto(nonTenant, path, &nonTenantCombinedStats)
require.NoError(t, err)

checkStatements := func(t *testing.T, tc []testCase, actual *serverpb.StatementsResponse) {
t.Helper()
var expectedStatements []string
Expand Down Expand Up @@ -258,13 +247,11 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
// First we verify that we have expected stats from tenants.
t.Run("tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseTenant, tenantStats)
checkStatements(t, testCaseTenant, tenantCombinedStats)
})

// Now we verify the non tenant stats are what we expected.
t.Run("non-tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseNonTenant, &nonTenantStats)
checkStatements(t, testCaseNonTenant, &nonTenantCombinedStats)
})

// Now we verify that tenant and non-tenant have no visibility into each other's stats.
Expand All @@ -281,17 +268,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
}
}

for _, tenantStmt := range tenantCombinedStats.Statements {
for _, nonTenantStmt := range nonTenantCombinedStats.Statements {
require.NotEqual(t, tenantStmt, nonTenantStmt, "expected tenant to have no visibility to non-tenant's statement stats, but found:", nonTenantStmt)
}
}

for _, tenantTxn := range tenantCombinedStats.Transactions {
for _, nonTenantTxn := range nonTenantCombinedStats.Transactions {
require.NotEqual(t, tenantTxn, nonTenantTxn, "expected tenant to have no visibility to non-tenant's transaction stats, but found:", nonTenantTxn)
}
}
})
}

Expand All @@ -307,43 +283,46 @@ func testResetSQLStatsRPCForTenant(
testCluster := testHelper.TestCluster()
controlCluster := testHelper.ControlCluster()

// Disable automatic flush to ensure tests are deterministic.
// Set automatic flush to some long duration we'll never hit to
// ensure tests are deterministic.
testCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
controlCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")

defer func() {
// Cleanup
testCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")
controlCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")

}()

for _, flushed := range []bool{false, true} {
testTenant := testCluster.Tenant(serverccl.RandomServer)
testTenantConn := testTenant.GetTenantConn()
t.Run(fmt.Sprintf("flushed=%t", flushed), func(t *testing.T) {
// Clears the SQL Stats at the end of each test via builtin.
defer func() {
testCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
testTenantConn.Exec(t, "SELECT crdb_internal.reset_sql_stats()")
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
}()

for _, stmt := range stmts {
testCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
testTenantConn.Exec(t, stmt)
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
}

if flushed {
testCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
testTenant.TenantSQLStats().Flush(ctx)
controlCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
}

status := testCluster.TenantStatusSrv(serverccl.RandomServer)
status := testTenant.TenantStatusSrv()

statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -357,7 +336,7 @@ func testResetSQLStatsRPCForTenant(
require.NoError(t, err)

statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -382,7 +361,7 @@ func testResetSQLStatsRPCForTenant(
// Ensures that sql stats reset is isolated by tenant boundary.
statsFromControlCluster, err :=
controlCluster.TenantStatusSrv(serverccl.RandomServer).Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand Down
88 changes: 24 additions & 64 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ SELECT
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
Expand Down Expand Up @@ -356,7 +356,7 @@ SELECT
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
FROM crdb_internal.transaction_statistics_persisted %s
GROUP BY
app_name,
fingerprint_id,
Expand Down Expand Up @@ -577,18 +577,14 @@ func getTotalStatementDetails(
query := fmt.Sprintf(
`SELECT
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
aggregation_interval,
array_agg(app_name) as app_names,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
encode(fingerprint_id, 'hex') as fingerprint_id
FROM crdb_internal.statement_statistics %s
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
aggregation_interval,
fingerprint_id
LIMIT 1`, whereClause)

const expectedNumDatums = 6
const expectedNumDatums = 3
var statement serverpb.StatementDetailsResponse_CollectedStatementSummary

row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil,
Expand All @@ -614,34 +610,23 @@ func getTotalStatementDetails(
return statement, serverError(ctx, err)
}

aggInterval := tree.MustBeDInterval(row[1]).Duration

apps := tree.MustBeDArray(row[2])
apps := tree.MustBeDArray(row[1])
var appNames []string
for _, s := range apps.Array {
appNames = util.CombineUniqueString(appNames, []string{string(tree.MustBeDString(s))})
}
aggregatedMetadata.AppNames = appNames

statsJSON := tree.MustBeDJSON(row[3]).JSON
statsJSON := tree.MustBeDJSON(row[2]).JSON
if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &statistics.Stats); err != nil {
return statement, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[4]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return statement, serverError(ctx, err)
}
statistics.Stats.SensitiveInfo.MostRecentPlanDescription = *plan

aggregatedMetadata.FormattedQuery = aggregatedMetadata.Query
aggregatedMetadata.FingerprintID = string(tree.MustBeDString(row[5]))

statement = serverpb.StatementDetailsResponse_CollectedStatementSummary{
Metadata: aggregatedMetadata,
AggregationInterval: time.Duration(aggInterval.Nanos()),
Stats: statistics.Stats,
Metadata: aggregatedMetadata,
Stats: statistics.Stats,
}

return statement, nil
Expand All @@ -661,18 +646,15 @@ func getStatementDetailsPerAggregatedTs(
`SELECT
aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics %s
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
aggregated_ts,
aggregation_interval
aggregated_ts
ORDER BY aggregated_ts ASC
LIMIT $%d`, whereClause, len(args)+1)

args = append(args, limit)
const expectedNumDatums = 5
const expectedNumDatums = 3

it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -716,20 +698,10 @@ func getStatementDetailsPerAggregatedTs(
return nil, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[3]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return nil, serverError(ctx, err)
}
metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan

aggInterval := tree.MustBeDInterval(row[4]).Duration

stmt := serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs{
AggregatedTs: aggregatedTs,
AggregationInterval: time.Duration(aggInterval.Nanos()),
Stats: metadata.Stats,
Metadata: aggregatedMetadata,
AggregatedTs: aggregatedTs,
Stats: metadata.Stats,
Metadata: aggregatedMetadata,
}

statements = append(statements, stmt)
Expand Down Expand Up @@ -790,18 +762,15 @@ func getStatementDetailsPerPlanHash(
plan_hash,
(statistics -> 'statistics' -> 'planGists'->>0) as plan_gist,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics %s
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
plan_hash,
plan_gist,
aggregation_interval
plan_gist
LIMIT $%d`, whereClause, len(args)+1)

args = append(args, limit)
const expectedNumDatums = 6
const expectedNumDatums = 4

it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash", nil,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -853,14 +822,6 @@ func getStatementDetailsPerPlanHash(
return nil, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[4]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return nil, serverError(ctx, err)
}
metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan
aggInterval := tree.MustBeDInterval(row[5]).Duration

// A metadata is unique for each plan, meaning if any of the counts are greater than zero,
// we can update the value of each count with the execution count of this plan hash to
// have the correct count of each metric.
Expand All @@ -879,11 +840,10 @@ func getStatementDetailsPerPlanHash(
aggregatedMetadata.TotalCount = metadata.Stats.Count

stmt := serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash{
AggregationInterval: time.Duration(aggInterval.Nanos()),
ExplainPlan: explainPlan,
PlanHash: planHash,
Stats: metadata.Stats,
Metadata: aggregatedMetadata,
ExplainPlan: explainPlan,
PlanHash: planHash,
Stats: metadata.Stats,
Metadata: aggregatedMetadata,
}

statements = append(statements, stmt)
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestClusterResetSQLStats(t *testing.T) {
}

statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -339,7 +339,7 @@ func TestClusterResetSQLStats(t *testing.T) {
require.NoError(t, err)

statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 94e9f8e

Please sign in to comment.