Skip to content

Commit

Permalink
server: use system stats view for stats overview queries
Browse files Browse the repository at this point in the history
This commit changes the table used for the combined
stats endpoint from the view combining persisted and
in-memory stats to the view that is just a wrapper
around the system table. Thus, we are now reading
only persisted stats from the system table for the
combined stats endpoint.

This commit updates tests using the combined api to
flush stats before using the api.

Epic: None
Release note (ui change): On the SQL Activity fingerprints
pages, users will not see stats that have not yet been
flushed to disk.
  • Loading branch information
xinhaoz committed Mar 20, 2023
1 parent 1e31033 commit 19a5821
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 108 deletions.
51 changes: 16 additions & 35 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,9 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
require.NoError(t, err)

request := &serverpb.StatementsRequest{}
combinedStatsRequest := &serverpb.CombinedStatementsStatsRequest{}
var tenantStats *serverpb.StatementsResponse
var tenantCombinedStats *serverpb.StatementsResponse

// Populate `tenantStats` and `tenantCombinedStats`. The tenant server
// Populate `tenantStats`. The tenant server
// `Statements` and `CombinedStatements` methods are backed by the
// sqlinstance system which uses a cache populated through rangefeed
// for keeping track of SQL pod data. We use `SucceedsSoon` to eliminate
Expand All @@ -362,10 +360,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
return errors.New("tenant statements are unexpectedly empty")
}

tenantCombinedStats, err = tenantStatusServer.CombinedStatementStats(ctx, combinedStatsRequest)
if tenantCombinedStats == nil || len(tenantCombinedStats.Statements) == 0 {
return errors.New("tenant combined statements are unexpectedly empty")
}
return nil
})

Expand All @@ -374,9 +368,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
err = serverutils.GetJSONProto(nonTenant, path, &nonTenantStats)
require.NoError(t, err)

path = "/_status/combinedstmts"
var nonTenantCombinedStats serverpb.StatementsResponse
err = serverutils.GetJSONProto(nonTenant, path, &nonTenantCombinedStats)
require.NoError(t, err)

checkStatements := func(t *testing.T, tc []testCase, actual *serverpb.StatementsResponse) {
Expand Down Expand Up @@ -414,13 +405,11 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
// First we verify that we have expected stats from tenants.
t.Run("tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseTenant, tenantStats)
checkStatements(t, testCaseTenant, tenantCombinedStats)
})

// Now we verify the non tenant stats are what we expected.
t.Run("non-tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseNonTenant, &nonTenantStats)
checkStatements(t, testCaseNonTenant, &nonTenantCombinedStats)
})

// Now we verify that tenant and non-tenant have no visibility into each other's stats.
Expand All @@ -437,17 +426,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
}
}

for _, tenantStmt := range tenantCombinedStats.Statements {
for _, nonTenantStmt := range nonTenantCombinedStats.Statements {
require.NotEqual(t, tenantStmt, nonTenantStmt, "expected tenant to have no visibility to non-tenant's statement stats, but found:", nonTenantStmt)
}
}

for _, tenantTxn := range tenantCombinedStats.Transactions {
for _, nonTenantTxn := range nonTenantCombinedStats.Transactions {
require.NotEqual(t, tenantTxn, nonTenantTxn, "expected tenant to have no visibility to non-tenant's transaction stats, but found:", nonTenantTxn)
}
}
})
}

Expand All @@ -463,43 +441,46 @@ func testResetSQLStatsRPCForTenant(
testCluster := testHelper.TestCluster()
controlCluster := testHelper.ControlCluster()

// Disable automatic flush to ensure tests are deterministic.
// Set automatic flush to some long duration we'll never hit to
// ensure tests are deterministic.
testCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
controlCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")

defer func() {
// Cleanup
testCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")
controlCluster.TenantConn(0 /* idx */).
Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")

}()

for _, flushed := range []bool{false, true} {
testTenant := testCluster.Tenant(serverccl.RandomServer)
testTenantConn := testTenant.GetTenantConn()
t.Run(fmt.Sprintf("flushed=%t", flushed), func(t *testing.T) {
// Clears the SQL Stats at the end of each test via builtin.
defer func() {
testCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
testTenantConn.Exec(t, "SELECT crdb_internal.reset_sql_stats()")
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
}()

for _, stmt := range stmts {
testCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
testTenantConn.Exec(t, stmt)
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
}

if flushed {
testCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
testTenant.TenantSQLStats().Flush(ctx)
controlCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
}

status := testCluster.TenantStatusSrv(serverccl.RandomServer)
status := testTenant.TenantStatusSrv()

statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -513,7 +494,7 @@ func testResetSQLStatsRPCForTenant(
require.NoError(t, err)

statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -538,7 +519,7 @@ func testResetSQLStatsRPCForTenant(
// Ensures that sql stats reset is isolated by tenant boundary.
statsFromControlCluster, err :=
controlCluster.TenantStatusSrv(serverccl.RandomServer).Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand Down
59 changes: 18 additions & 41 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ SELECT
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.statement_statistics %s
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
Expand Down Expand Up @@ -364,7 +364,7 @@ SELECT
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics %s
FROM crdb_internal.transaction_statistics_persisted %s
GROUP BY
app_name,
fingerprint_id,
Expand Down Expand Up @@ -583,18 +583,17 @@ func getTotalStatementDetails(
query := fmt.Sprintf(
`SELECT
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
aggregation_interval,
agg_interval,
array_agg(app_name) as app_names,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
encode(fingerprint_id, 'hex') as fingerprint_id
FROM crdb_internal.statement_statistics %s
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
aggregation_interval,
agg_interval,
fingerprint_id
LIMIT 1`, whereClause)

const expectedNumDatums = 6
const expectedNumDatums = 5
var statement serverpb.StatementDetailsResponse_CollectedStatementSummary

row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil,
Expand Down Expand Up @@ -632,13 +631,6 @@ func getTotalStatementDetails(
return statement, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[4]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return statement, serverError(ctx, err)
}
statistics.Stats.SensitiveInfo.MostRecentPlanDescription = *plan

queryTree, err := parser.ParseOne(aggregatedMetadata.Query)
if err != nil {
return statement, serverError(ctx, err)
Expand All @@ -648,7 +640,7 @@ func getTotalStatementDetails(
cfg.LineWidth = tree.ConsoleLineWidth
aggregatedMetadata.FormattedQuery = cfg.Pretty(queryTree.AST)

aggregatedMetadata.FingerprintID = string(tree.MustBeDString(row[5]))
aggregatedMetadata.FingerprintID = string(tree.MustBeDString(row[4]))

statement = serverpb.StatementDetailsResponse_CollectedStatementSummary{
Metadata: aggregatedMetadata,
Expand All @@ -674,17 +666,16 @@ func getStatementDetailsPerAggregatedTs(
aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics %s
agg_interval
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
aggregated_ts,
aggregation_interval
agg_interval
ORDER BY aggregated_ts ASC
LIMIT $%d`, whereClause, len(args)+1)

args = append(args, limit)
const expectedNumDatums = 5
const expectedNumDatums = 4

it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil,
sessiondata.NodeUserSessionDataOverride, query, args...)
Expand Down Expand Up @@ -726,14 +717,7 @@ func getStatementDetailsPerAggregatedTs(
return nil, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[3]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return nil, serverError(ctx, err)
}
metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan

aggInterval := tree.MustBeDInterval(row[4]).Duration
aggInterval := tree.MustBeDInterval(row[3]).Duration

stmt := serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs{
AggregatedTs: aggregatedTs,
Expand Down Expand Up @@ -829,17 +813,16 @@ func getStatementDetailsPerPlanHash(
(statistics -> 'statistics' -> 'planGists'->>0) as plan_gist,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) as sampled_plan,
aggregation_interval,
agg_interval,
index_recommendations
FROM crdb_internal.statement_statistics %s
FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
plan_hash,
plan_gist,
aggregation_interval,
agg_interval,
index_recommendations
LIMIT $%d`, whereClause, len(args)+1)
expectedNumDatums := 7
expectedNumDatums := 6

args = append(args, limit)

Expand Down Expand Up @@ -891,15 +874,9 @@ func getStatementDetailsPerPlanHash(
return nil, serverError(ctx, err)
}

planJSON := tree.MustBeDJSON(row[4]).JSON
plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
if err != nil {
return nil, serverError(ctx, err)
}
metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan
aggInterval := tree.MustBeDInterval(row[5]).Duration
aggInterval := tree.MustBeDInterval(row[4]).Duration

recommendations := tree.MustBeDArray(row[6])
recommendations := tree.MustBeDArray(row[5])
var idxRecommendations []string
for _, s := range recommendations.Array {
idxRecommendations = util.CombineUniqueString(idxRecommendations, []string{string(tree.MustBeDString(s))})
Expand Down
4 changes: 0 additions & 4 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ func (s *statusServer) Statements(
combinedRequest := serverpb.CombinedStatementsStatsRequest{
Start: req.Start,
End: req.End,
<<<<<<< HEAD
=======
Limit: req.Limit,
>>>>>>> 20292009d1f (server: add limit and sort capabilities to sql stats requests)
}
return s.CombinedStatementStats(ctx, &combinedRequest)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestClusterResetSQLStats(t *testing.T) {
}

statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand All @@ -339,7 +339,7 @@ func TestClusterResetSQLStats(t *testing.T) {
require.NoError(t, err)

statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
Combined: true,
Combined: flushed,
})
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 19a5821

Please sign in to comment.