From 44e6054aa5729f168d9ecaac1856dd25e1760159 Mon Sep 17 00:00:00 2001
From: j82w
Date: Tue, 11 Jul 2023 10:24:09 -0400
Subject: [PATCH] sql: fix StatementStatistics.Nodes list

1. Fixes the nodes list to show all the nodes in StatementStatistics.Nodes.
2. The node list is now empty if tracing is disabled. Previously it would
   always include the current gateway node id, but it would be missing all
   the other nodes. This caused confusion, because it was uncertain whether
   the node list was complete or not.
3. Fixes regions on `EXPLAIN ANALYSE (DISTSQL)` to show regions information
   on secondary tenants. It was not shown before because only system tenants
   have access to gossip, which is used under the covers to get the node
   descriptors.
4. Fixes the performance issues previously listed in #102170.
5. Fixes the test to actually validate the nodes list.

The fix was done by adding the region name to the Flow ComponentID. This
means the region name is now part of the traces for the Flow ComponentID,
so it no longer needs to figure out the region separately: it gets the
region information from the same trace from which the SQL Instance ID is
obtained. Moving the collection to the QueryLevelStats avoids iterating
the traces multiple times.

Fixes: #102170, #96647, #91219

Epic: none

Release note (bug fix): Fixed StatementStatistics.Nodes to contain all the
nodes involved in the query. Fixed region info in `EXPLAIN ANALYSE
(DISTSQL)` for secondary tenants.
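The idea in miniature, as a self-contained Go sketch (hypothetical names;
the real types are execinfrapb.ComponentID, execinfra.FlowCtx, and
roachpb.Locality): the flow's component ID is stamped with the region from
the emitting node's locality, so a trace consumer recovers both the SQL
instance ID and its region from the same record, with no gossip lookup.

    package main

    import (
    	"fmt"
    	"sort"
    )

    // componentID stands in for execinfrapb.ComponentID after this patch:
    // the region travels alongside the SQL instance ID.
    type componentID struct {
    	sqlInstanceID int64
    	region        string
    }

    // flowComponentID mirrors the shape of the new FlowComponentID helper:
    // the region is read from the node's locality at emission time.
    func flowComponentID(instanceID int64, locality map[string]string) componentID {
    	return componentID{sqlInstanceID: instanceID, region: locality["region"]}
    }

    func main() {
    	// Pretend these records were extracted from a statement's trace.
    	trace := []componentID{
    		flowComponentID(3, map[string]string{"region": "gcp-us-east1"}),
    		flowComponentID(1, map[string]string{"region": "gcp-us-west1"}),
    	}
    	nodes := make([]int64, 0, len(trace))
    	regions := make(map[string]struct{})
    	for _, c := range trace {
    		nodes = append(nodes, c.sqlInstanceID)
    		regions[c.region] = struct{}{}
    	}
    	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
    	fmt.Println(nodes, len(regions)) // [1 3] 2
    }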
"github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -33,6 +39,7 @@ func TestSQLStatsRegions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRace(t, "test is to slow for race") skip.UnderStress(t, "test is too heavy to run under stress") // We build a small multiregion cluster, with the proper settings for @@ -44,7 +51,7 @@ func TestSQLStatsRegions(t *testing.T) { sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &st.SV, true) sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &st.SV, true) - numServers := 9 + numServers := 3 regionNames := []string{ "gcp-us-west1", "gcp-us-central1", @@ -52,8 +59,10 @@ func TestSQLStatsRegions(t *testing.T) { } serverArgs := make(map[int]base.TestServerArgs) + signalAfter := make([]chan struct{}, numServers) for i := 0; i < numServers; i++ { - serverArgs[i] = base.TestServerArgs{ + signalAfter[i] = make(chan struct{}) + args := base.TestServerArgs{ Settings: st, Locality: roachpb.Locality{ Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i%len(regionNames)]}}, @@ -61,13 +70,59 @@ func TestSQLStatsRegions(t *testing.T) { // We'll start our own test tenant manually below. DefaultTestTenant: base.TestTenantDisabled, } + + serverKnobs := &server.TestingKnobs{ + SignalAfterGettingRPCAddress: signalAfter[i], + } + + args.Knobs.Server = serverKnobs + serverArgs[i] = args } host := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ ServerArgsPerNode: serverArgs, + ParallelStart: true, }) defer host.Stopper().Stop(ctx) + go func() { + for _, c := range signalAfter { + <-c + } + }() + + tdb := sqlutils.MakeSQLRunner(host.ServerConn(1)) + + // Shorten the closed timestamp target duration so that span configs + // propagate more rapidly. + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`) + tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off") + tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'") + // Lengthen the lead time for the global tables to prevent overload from + // resulting in delays in propagating closed timestamps and, ultimately + // forcing requests from being redirected to the leaseholder. Without this + // change, the test sometimes is flakey because the latency budget allocated + // to closed timestamp propagation proves to be insufficient. This value is + // very cautious, and makes this already slow test even slower. 
+ tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'") + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`) + tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`) + + tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true") + tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.multi_region.allow_abstractions_for_secondary_tenants.enabled = true") + tdb.Exec(t, `ALTER RANGE meta configure zone using constraints = '{"+region=gcp-us-west1": 1, "+region=gcp-us-central1": 1, "+region=gcp-us-east1": 1}';`) + + // Create secondary tenants + var tenantDbs []*gosql.DB + for _, server := range host.Servers { + _, tenantDb := serverutils.StartTenant(t, server, base.TestTenantArgs{ + Settings: st, + TenantID: roachpb.MustMakeTenantID(11), + Locality: *server.Locality(), + }) + tenantDbs = append(tenantDbs, tenantDb) + } + testCases := []struct { name string db func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner @@ -84,24 +139,16 @@ func TestSQLStatsRegions(t *testing.T) { // connection to the first one. name: "secondary tenant", db: func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner { - var dbs []*gosql.DB - for _, server := range host.Servers { - _, db := serverutils.StartTenant(t, server, base.TestTenantArgs{ - Settings: st, - TenantID: roachpb.MustMakeTenantID(11), - Locality: *server.Locality(), - }) - dbs = append(dbs, db) - } - return sqlutils.MakeSQLRunner(dbs[0]) + return sqlutils.MakeSQLRunner(tenantDbs[1]) }, }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { db := tc.db(t, host, st) - // Create a multi-region database. db.Exec(t, "SET enable_multiregion_placement_policy = true") + db.Exec(t, `SET CLUSTER SETTING sql.txn_stats.sample_rate = 1;`) + db.Exec(t, fmt.Sprintf(`CREATE DATABASE testdb PRIMARY REGION "%s" PLACEMENT RESTRICTED`, regionNames[0])) for i := 1; i < len(regionNames); i++ { db.Exec(t, fmt.Sprintf(`ALTER DATABASE testdb ADD region "%s"`, regionNames[i])) @@ -113,39 +160,86 @@ func TestSQLStatsRegions(t *testing.T) { // Add some data to each region. for i, regionName := range regionNames { - db.Exec(t, "INSERT INTO test (crdb_region, a) VALUES ($1, $2)", regionName, i) + db.Exec(t, "INSERT INTO test (a, crdb_region) VALUES ($1, $2)", i, regionName) } - // Select from the table and see what statement statistics were written. - db.Exec(t, "SET application_name = $1", t.Name()) - db.Exec(t, "SELECT * FROM test") - row := db.QueryRow(t, ` + // It takes a while for the region replication to complete. + testutils.SucceedsWithin(t, func() error { + var expectedNodes []int64 + var expectedRegions []string + _, err := db.DB.ExecContext(ctx, `USE testdb`) + if err != nil { + return err + } + + // Use EXPLAIN ANALYSE (DISTSQL) to get the accurate list of nodes. 
+				explainInfo, err := db.DB.QueryContext(ctx, `EXPLAIN ANALYSE (DISTSQL) SELECT * FROM test`)
+				if err != nil {
+					return err
+				}
+				for explainInfo.Next() {
+					var explainStr string
+					if err := explainInfo.Scan(&explainStr); err != nil {
+						t.Fatal(err)
+					}
+
+					explainStr = strings.ReplaceAll(explainStr, " ", "")
+					// Example str "  regions: gcp-us-central1,gcp-us-east1,gcp-us-west1"
+					if strings.HasPrefix(explainStr, "regions:") {
+						explainStr = strings.ReplaceAll(explainStr, "regions:", "")
+						explainStr = strings.ReplaceAll(explainStr, " ", "")
+						expectedRegions = strings.Split(explainStr, ",")
+						if len(expectedRegions) < len(regionNames) {
+							return fmt.Errorf("rows are not replicated to all regions %s\n", expectedRegions)
+						}
+					}
+
+					// Example str "  nodes: n1, n2, n4, n9"
+					if strings.HasPrefix(explainStr, "nodes:") {
+						explainStr = strings.ReplaceAll(explainStr, "nodes:", "")
+						explainStr = strings.ReplaceAll(explainStr, "n", "")
+
+						split := strings.Split(explainStr, ",")
+						if len(split) < len(regionNames) {
+							return fmt.Errorf("rows are not replicated to all regions %s\n", split)
+						}
+
+						// Gateway node was not included in the explain plan. Add it to the list.
+						if split[0] != "1" {
+							expectedNodes = append(expectedNodes, int64(1))
+						}
+
+						for _, val := range split {
+							node, err := strconv.Atoi(val)
+							require.NoError(t, err)
+							expectedNodes = append(expectedNodes, int64(node))
+						}
+					}
+				}
+
+				// Select from the table and see what statement statistics were written.
+				db.Exec(t, "SET application_name = $1", t.Name())
+				db.Exec(t, "SELECT * FROM test")
+				row := db.QueryRow(t, `
 SELECT statistics->>'statistics'
 FROM crdb_internal.statement_statistics
 WHERE app_name = $1`, t.Name())
-			var actualJSON string
-			row.Scan(&actualJSON)
-			var actual appstatspb.StatementStatistics
-			err := json.Unmarshal([]byte(actualJSON), &actual)
-			require.NoError(t, err)
-
-			require.Equal(t,
-				appstatspb.StatementStatistics{
-					// TODO(todd): It appears we do not yet reliably record
-					// the nodes for the statement. (I have manually verified
-					// that the above query does indeed fan out across the
-					// regions, via EXPLAIN (DISTSQL).) Filed as #96647.
-					//Nodes:   []int64{1, 2, 3},
-					//Regions: regionNames,
-					Nodes:   []int64{1},
-					Regions: []string{regionNames[0]},
-				},
-				appstatspb.StatementStatistics{
-					Nodes:   actual.Nodes,
-					Regions: actual.Regions,
-				},
-			)
+				var actualJSON string
+				row.Scan(&actualJSON)
+				var actual appstatspb.StatementStatistics
+				err = json.Unmarshal([]byte(actualJSON), &actual)
+				require.NoError(t, err)
+
+				// Replication to all regions can take some time to complete. During
+				// this time an incomplete list will be returned.
+				if !assert.ObjectsAreEqual(expectedNodes, actual.Nodes) {
Expected: %d, Actual: %d", expectedNodes, actual.Nodes) + } + + require.Equal(t, expectedRegions, actual.Regions) + return nil + }, 3*time.Minute) }) } } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index c50082f963a7..6bcd7e6472b3 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -19,7 +19,6 @@ import ( "time" "unsafe" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -467,9 +466,7 @@ func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector( // statistics that the outbox is responsible for, nil is returned if stats are // not being collected. func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox( - flowCtx *execinfra.FlowCtx, - statsCollectors []colexecop.VectorizedStatsCollector, - originSQLInstanceID base.SQLInstanceID, + flowCtx *execinfra.FlowCtx, statsCollectors []colexecop.VectorizedStatsCollector, ) func(context.Context) []*execinfrapb.ComponentStats { if !s.recordingStats { return nil @@ -489,7 +486,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox( // whole flow from parent monitors. These stats are added to a // flow-level span. result = append(result, &execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(originSQLInstanceID, flowCtx.ID), + Component: flowCtx.FlowComponentID(), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.Mon.MaximumBytes())), MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())), @@ -1069,7 +1066,7 @@ func (s *vectorizedFlowCreator) setupOutput( // Set up an Outbox. outbox, err := s.setupRemoteOutputStream( ctx, flowCtx, pspec.ProcessorID, opWithMetaInfo, opOutputTypes, outputStream, factory, - s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors, outputStream.OriginNodeID), + s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors), ) if err != nil { return err diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index cd38a3628fcd..2e0ad5ee9993 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1691,7 +1691,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ fName: "populate query level stats and regions", f: func() { - populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector) + populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector) ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary( ctx, &curPlanner, int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats, @@ -1699,7 +1699,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( }, }) } else { - populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector) + populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector) stmtFingerprintID = ex.recordStatementSummary( ctx, planner, int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats, @@ -1747,12 +1747,11 @@ func (ex *connExecutor) dispatchToExecutionEngine( return err } -// populateQueryLevelStatsAndRegions collects query-level execution 
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index cd38a3628fcd..2e0ad5ee9993 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1691,7 +1691,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 			ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
 				fName: "populate query level stats and regions",
 				f: func() {
-					populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
+					populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
 					ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
 						ctx, &curPlanner, int(ex.state.mu.autoRetryCounter),
 						ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
@@ -1699,7 +1699,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 				},
 			})
 		} else {
-			populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
+			populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
 			stmtFingerprintID = ex.recordStatementSummary(
 				ctx, planner, int(ex.state.mu.autoRetryCounter),
 				res.RowsAffected(), res.Err(), stats,
@@ -1747,12 +1747,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 	return err
 }
 
-// populateQueryLevelStatsAndRegions collects query-level execution statistics
+// populateQueryLevelStats collects query-level execution statistics
 // and populates it in the instrumentationHelper's queryLevelStatsWithErr field.
 // Query-level execution statistics are collected using the statement's trace
-// and the plan's flow metadata. It also populates the regions field and
-// annotates the explainPlan field of the instrumentationHelper.
-func populateQueryLevelStatsAndRegions(
+// and the plan's flow metadata.
+func populateQueryLevelStats(
 	ctx context.Context,
 	p *planner,
 	cfg *ExecutorConfig,
@@ -1794,7 +1793,7 @@ func populateQueryLevelStats(
 		}
 	}
 	if ih.traceMetadata != nil && ih.explainPlan != nil {
-		ih.regions = ih.traceMetadata.annotateExplain(
+		ih.traceMetadata.annotateExplain(
 			ih.explainPlan,
 			trace,
 			cfg.TestingKnobs.DeterministicExplain,
diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go
index 1d4afddcb52b..be658b4a9778 100644
--- a/pkg/sql/exec_log.go
+++ b/pkg/sql/exec_log.go
@@ -358,7 +358,7 @@ func (p *planner) maybeLogStatementInternal(
 			ApplyJoinCount:   int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
 			ZigZagJoinCount:  int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
 			ContentionNanos:  queryLevelStats.ContentionTime.Nanoseconds(),
-			Regions:          p.curPlan.instrumentation.regions,
+			Regions:          queryLevelStats.Regions,
 			NetworkBytesSent: queryLevelStats.NetworkBytesSent,
 			MaxMemUsage:      queryLevelStats.MaxMemUsage,
 			MaxDiskUsage:     queryLevelStats.MaxDiskUsage,
diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go
index 12bafaf9c2c8..2a87849676ac 100644
--- a/pkg/sql/execinfra/flow_context.go
+++ b/pkg/sql/execinfra/flow_context.go
@@ -184,3 +184,9 @@ func (flowCtx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.Component
 func (flowCtx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
 	return execinfrapb.StreamComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, streamID)
 }
+
+// FlowComponentID returns a ComponentID for the given flow.
+func (flowCtx *FlowCtx) FlowComponentID() execinfrapb.ComponentID {
+	region, _ := flowCtx.Cfg.Locality.Find("region")
+	return execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, region)
+}
diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go
index f4c1d4208d27..f608a0b6de08 100644
--- a/pkg/sql/execinfrapb/component_stats.go
+++ b/pkg/sql/execinfrapb/component_stats.go
@@ -50,11 +50,12 @@ func StreamComponentID(
 }
 
 // FlowComponentID returns a ComponentID for the given flow.
-func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID {
+func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID, region string) ComponentID {
 	return ComponentID{
 		FlowID:        flowID,
 		Type:          ComponentID_FLOW,
 		SQLInstanceID: instanceID,
+		Region:        region,
 	}
 }
diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto
index 905e3cd18d6b..a78f8f1145f0 100644
--- a/pkg/sql/execinfrapb/component_stats.proto
+++ b/pkg/sql/execinfrapb/component_stats.proto
@@ -54,6 +54,10 @@ message ComponentID {
     (gogoproto.nullable) = false,
     (gogoproto.customname) = "SQLInstanceID",
     (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"];
+
+  // The region the component is associated with. For the initial
+  // implementation, only ComponentIDs of Flow type might have this set.
+  optional string region = 5 [(gogoproto.nullable) = false];
 }
 
 // ComponentStats contains statistics for an execution component. A component is
diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel
index e55a519b3bd6..dfdc9cad329d 100644
--- a/pkg/sql/execstats/BUILD.bazel
+++ b/pkg/sql/execstats/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
         "//pkg/base",
         "//pkg/kv/kvpb",
         "//pkg/sql/execinfrapb",
+        "//pkg/util",
         "//pkg/util/buildutil",
         "//pkg/util/optional",
         "//pkg/util/protoutil",
diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go
index 54e5846e8e14..d431f7c4d1fe 100644
--- a/pkg/sql/execstats/traceanalyzer.go
+++ b/pkg/sql/execstats/traceanalyzer.go
@@ -11,11 +11,13 @@
 package execstats
 
 import (
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
@@ -158,6 +160,8 @@ type QueryLevelStats struct {
 	ContentionEvents []kvpb.ContentionEvent
 	RUEstimate       int64
 	CPUTime          time.Duration
+	SqlInstanceIds   map[base.SQLInstanceID]struct{}
+	Regions          []string
 }
 
 // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
@@ -208,6 +212,15 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
 	s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
 	s.RUEstimate += other.RUEstimate
 	s.CPUTime += other.CPUTime
+	if len(s.SqlInstanceIds) == 0 && len(other.SqlInstanceIds) > 0 {
+		s.SqlInstanceIds = other.SqlInstanceIds
+	} else if len(other.SqlInstanceIds) > 0 && len(s.SqlInstanceIds) > 0 {
+		for id := range other.SqlInstanceIds {
+			s.SqlInstanceIds[id] = struct{}{}
+		}
+	}
+
+	s.Regions = util.CombineUnique(s.Regions, other.Regions)
 }
 
 // TraceAnalyzer is a struct that helps calculate top-level statistics from a
@@ -354,13 +367,23 @@ func (a *TraceAnalyzer) ProcessStats() error {
 		}
 	}
 
+	instanceIds := make(map[base.SQLInstanceID]struct{}, len(a.flowStats))
+	// Default to 1 since most queries only use a single region.
+	regions := make([]string, 0, 1)
+
 	// Process flowStats.
 	for instanceID, stats := range a.flowStats {
 		if stats.stats == nil {
 			continue
 		}
+		instanceIds[instanceID] = struct{}{}
 		for _, v := range stats.stats {
+			// Avoid duplicates and empty string.
+			if v.Component.Region != "" {
+				regions = util.CombineUnique(regions, []string{v.Component.Region})
+			}
+
 			if v.FlowStats.MaxMemUsage.HasValue() {
 				if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] {
 					a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] = memUsage
@@ -379,7 +402,10 @@ func (a *TraceAnalyzer) ProcessStats() error {
 
 	// Process query level stats.
 	a.queryLevelStats = QueryLevelStats{}
+	a.queryLevelStats.SqlInstanceIds = instanceIds
 
+	sort.Strings(regions)
+	a.queryLevelStats.Regions = regions
 	for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode {
 		a.queryLevelStats.NetworkBytesSent += bytesSentByNode
 	}
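The merge semantics of the new fields, as a standalone Go sketch (simplified
types, not the CockroachDB source): in Accumulate, an empty destination
adopts the other instance-ID set wholesale, otherwise the entries are
unioned in; regions are kept unique and sorted in the spirit of
util.CombineUnique.

    package main

    import (
    	"fmt"
    	"sort"
    )

    type instanceSet map[int32]struct{}

    // accumulate mirrors QueryLevelStats.Accumulate for the two new fields:
    // union the instance-ID sets and keep regions sorted and deduplicated.
    func accumulate(
    	dstIds instanceSet, dstRegions []string,
    	srcIds instanceSet, srcRegions []string,
    ) (instanceSet, []string) {
    	if len(dstIds) == 0 {
    		dstIds = srcIds // adopt the other set wholesale
    	} else {
    		for id := range srcIds {
    			dstIds[id] = struct{}{}
    		}
    	}
    	seen := make(map[string]struct{})
    	for _, r := range dstRegions {
    		seen[r] = struct{}{}
    	}
    	for _, r := range srcRegions {
    		seen[r] = struct{}{}
    	}
    	regions := make([]string, 0, len(seen))
    	for r := range seen {
    		regions = append(regions, r)
    	}
    	sort.Strings(regions)
    	return dstIds, regions
    }

    func main() {
    	ids, regions := accumulate(
    		instanceSet{1: {}}, []string{"gcp-us-west1"},
    		instanceSet{2: {}, 3: {}}, []string{"gcp-us-east1", "gcp-us-west1"},
    	)
    	fmt.Println(len(ids), regions) // 3 [gcp-us-east1 gcp-us-west1]
    }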
diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go
index c08be75e8a5f..dd670908bbe0 100644
--- a/pkg/sql/execstats/traceanalyzer_test.go
+++ b/pkg/sql/execstats/traceanalyzer_test.go
@@ -231,16 +231,18 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
 	expected := execstats.QueryLevelStats{
 		KVTime:         cumulativeKVTime,
 		ContentionTime: cumulativeContentionTime,
+		Regions:        []string{},
+		SqlInstanceIds: make(map[base.SQLInstanceID]struct{}),
 	}
 
 	assert.NoError(t, a.ProcessStats())
-	if got := a.GetQueryLevelStats(); !reflect.DeepEqual(got, expected) {
-		t.Errorf("ProcessStats() = %v, want %v", got, expected)
-	}
+	require.Equal(t, a.GetQueryLevelStats(), expected)
 }
 
 func TestQueryLevelStatsAccumulate(t *testing.T) {
 	aEvent := kvpb.ContentionEvent{Duration: 7 * time.Second}
+	aSQLInstanceIds := map[base.SQLInstanceID]struct{}{}
+	aSQLInstanceIds[1] = struct{}{}
 	a := execstats.QueryLevelStats{
 		NetworkBytesSent: 1,
 		MaxMemUsage:      2,
@@ -268,8 +270,12 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
 		MvccRangeKeyCount:           21,
 		MvccRangeKeyContainedPoints: 22,
 		MvccRangeKeySkippedPoints:   23,
+		SqlInstanceIds:              aSQLInstanceIds,
+		Regions:                     []string{"east-usA"},
 	}
 	bEvent := kvpb.ContentionEvent{Duration: 14 * time.Second}
+	bSQLInstanceIds := map[base.SQLInstanceID]struct{}{}
+	bSQLInstanceIds[2] = struct{}{}
 	b := execstats.QueryLevelStats{
 		NetworkBytesSent: 8,
 		MaxMemUsage:      9,
@@ -297,7 +303,12 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
 		MvccRangeKeyCount:           28,
 		MvccRangeKeyContainedPoints: 29,
 		MvccRangeKeySkippedPoints:   30,
+		SqlInstanceIds:              bSQLInstanceIds,
+		Regions:                     []string{"east-usB"},
 	}
+	cSQLInstanceIds := map[base.SQLInstanceID]struct{}{}
+	cSQLInstanceIds[1] = struct{}{}
+	cSQLInstanceIds[2] = struct{}{}
 	expected := execstats.QueryLevelStats{
 		NetworkBytesSent: 9,
 		MaxMemUsage:      9,
@@ -325,9 +336,14 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
 		MvccRangeKeyCount:           49,
 		MvccRangeKeyContainedPoints: 51,
 		MvccRangeKeySkippedPoints:   53,
+		SqlInstanceIds:              cSQLInstanceIds,
+		Regions:                     []string{"east-usA", "east-usB"},
 	}
 
 	aCopy := a
+	// Copy will point to the same map.
+	aCopy.SqlInstanceIds = map[base.SQLInstanceID]struct{}{}
+	cSQLInstanceIds[1] = struct{}{}
 	a.Accumulate(b)
 	require.Equal(t, expected, a)
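The aCopy reset above exists because Go maps are references. A tiny
standalone sketch of the aliasing gotcha the test guards against
(simplified types, illustration only):

    package main

    import "fmt"

    func main() {
    	a := map[int32]struct{}{1: {}}
    	aCopy := a // copies the map header, not the entries: both alias one map

    	// A mutation through a is visible through aCopy, which would make a
    	// before-vs-after comparison around Accumulate meaningless.
    	a[2] = struct{}{}
    	fmt.Println(len(aCopy)) // 2, not 1

    	// Giving the copy its own map, as the test does, breaks the alias.
    	aCopy = map[int32]struct{}{}
    	fmt.Println(len(a), len(aCopy)) // 2 0
    }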
diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go
index d5bcb703f968..8fa0830675df 100644
--- a/pkg/sql/executor_statement_metrics.go
+++ b/pkg/sql/executor_statement_metrics.go
@@ -12,19 +12,16 @@ package sql
 
 import (
 	"context"
-	"strconv"
+	"sort"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
-	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
-	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
-	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 )
 
 // EngineMetrics groups a set of SQL metrics.
@@ -178,18 +175,16 @@ func (ex *connExecutor) recordStatementSummary(
 	idxRecommendations := idxrecommendations.FormatIdxRecommendations(planner.instrumentation.indexRecs)
 	queryLevelStats, queryLevelStatsOk := planner.instrumentation.GetQueryLevelStats()
 
-	// We only have node information when it was collected with trace, but we know at least the current
-	// node should be on the list.
-	nodeID, err := strconv.ParseInt(ex.server.sqlStats.GetSQLInstanceID().String(), 10, 64)
-	if err != nil {
-		log.Warningf(ctx, "failed to convert node ID to int: %s", err)
-	}
-
-	nodes := util.CombineUnique(getNodesFromPlanner(planner), []int64{nodeID})
+	var sqlInstanceIds []int64
+	if queryLevelStatsOk {
+		sqlInstanceIds = make([]int64, 0, len(queryLevelStats.SqlInstanceIds))
+		for sqlInstanceId := range queryLevelStats.SqlInstanceIds {
+			sqlInstanceIds = append(sqlInstanceIds, int64(sqlInstanceId))
+		}
 
-	regions := []string{}
-	if region, ok := ex.server.cfg.Locality.Find("region"); ok {
-		regions = append(regions, region)
+		sort.Slice(sqlInstanceIds, func(i, j int) bool {
+			return sqlInstanceIds[i] < sqlInstanceIds[j]
+		})
 	}
 
 	recordedStmtStats := sqlstats.RecordedStmtStats{
@@ -207,8 +202,7 @@ func (ex *connExecutor) recordStatementSummary(
 		BytesRead:     stats.bytesRead,
 		RowsRead:      stats.rowsRead,
 		RowsWritten:   stats.rowsWritten,
-		Nodes:         nodes,
-		Regions:       regions,
+		Nodes:         sqlInstanceIds,
 		StatementType: stmt.AST.StatementType(),
 		Plan:          planner.instrumentation.PlanForStats(ctx),
 		PlanGist:      planner.instrumentation.planGist.String(),
@@ -315,16 +309,3 @@ func (ex *connExecutor) updateOptCounters(planFlags planFlags) {
 func shouldIncludeStmtInLatencyMetrics(stmt *Statement) bool {
 	return stmt.AST.StatementType() == tree.TypeDML
 }
-
-func getNodesFromPlanner(planner *planner) []int64 {
-	// Retrieve the list of all nodes which the statement was executed on.
-	var nodes []int64
-	if _, ok := planner.instrumentation.Tracing(); !ok {
-		trace := planner.instrumentation.sp.GetRecording(tracingpb.RecordingStructured)
-		// ForEach returns nodes in order.
-		execinfrapb.ExtractNodesFromSpans(trace).ForEach(func(i int) {
-			nodes = append(nodes, int64(i))
-		})
-	}
-	return nodes
-}
diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go
index f0dbd575cdee..109bd1782095 100644
--- a/pkg/sql/flowinfra/flow.go
+++ b/pkg/sql/flowinfra/flow.go
@@ -679,7 +679,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
 		// non-gateway nodes use the last outbox to send this information
 		// over.
 		f.sp.RecordStructured(&execinfrapb.ComponentStats{
-			Component: execinfrapb.FlowComponentID(f.NodeID.SQLInstanceID(), f.FlowCtx.ID),
+			Component: f.FlowCtx.FlowComponentID(),
 			FlowStats: execinfrapb.FlowStats{
 				MaxMemUsage:  optional.MakeUint(uint64(f.FlowCtx.Mon.MaximumBytes())),
 				MaxDiskUsage: optional.MakeUint(uint64(f.FlowCtx.DiskMonitor.MaximumBytes())),
diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go
index 8bee6c7717ff..f60305e8afd0 100644
--- a/pkg/sql/flowinfra/outbox.go
+++ b/pkg/sql/flowinfra/outbox.go
@@ -110,7 +110,7 @@ func NewOutbox(
 	m.numOutboxes = numOutboxes
 	m.isGatewayNode = isGatewayNode
 	m.streamStats.Component = flowCtx.StreamComponentID(streamID)
-	m.flowStats.Component = execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID)
+	m.flowStats.Component = flowCtx.FlowComponentID()
 	return m
 }
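With the region traveling in each flow's ComponentID, annotateExplain (in
the instrumentation.go diff below) can build its node-to-region map in one
pass over the extracted stats instead of asking gossip for node
descriptors. A rough standalone sketch of that recovery step (hypothetical
types, not the CockroachDB source):

    package main

    import "fmt"

    // componentID stands in for execinfrapb.ComponentID after this patch.
    type componentID struct {
    	sqlInstanceID int64
    	region        string
    }

    // regionsByNode mirrors the new loop in annotateExplain: every flow-level
    // stats record already names its instance and region, so a single pass
    // replaces the old gossip-backed descriptor lookup.
    func regionsByNode(statsMap map[componentID]struct{}) map[int64]string {
    	regionsInfo := make(map[int64]string)
    	for c := range statsMap {
    		if c.region != "" {
    			regionsInfo[c.sqlInstanceID] = c.region
    		}
    	}
    	return regionsInfo
    }

    func main() {
    	statsMap := map[componentID]struct{}{
    		{sqlInstanceID: 1, region: "gcp-us-west1"}: {},
    		{sqlInstanceID: 2, region: "gcp-us-east1"}: {},
    	}
    	fmt.Println(regionsByNode(statsMap)) // map[1:gcp-us-west1 2:gcp-us-east1]
    }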
diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go
index 77e3f9fb5654..cf2646ed65c0 100644
--- a/pkg/sql/instrumentation.go
+++ b/pkg/sql/instrumentation.go
@@ -39,7 +39,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
-	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/fsm"
 	"github.com/cockroachdb/cockroach/pkg/util/grunning"
@@ -140,9 +139,6 @@ type instrumentationHelper struct {
 
 	traceMetadata execNodeTraceMetadata
 
-	// regions used only on EXPLAIN ANALYZE to be displayed as top-level stat.
-	regions []string
-
 	// planGist is a compressed version of plan that can be converted (lossily)
 	// back into a logical plan or be used to get a plan hash.
 	planGist explain.PlanGist
@@ -571,6 +567,10 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
 		ob.AddMaxMemUsage(queryStats.MaxMemUsage)
 		ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent)
 		ob.AddMaxDiskUsage(queryStats.MaxDiskUsage)
+		if len(queryStats.Regions) > 0 {
+			ob.AddRegionsStats(queryStats.Regions)
+		}
+
 		if !ih.containsMutation && ih.vectorized && grunning.Supported() {
 			// Currently we cannot separate SQL CPU time from local KV CPU time for
 			// mutations, since they do not collect statistics. Additionally, CPU time
@@ -588,10 +588,6 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
 		}
 	}
 
-	if len(ih.regions) > 0 {
-		ob.AddRegionsStats(ih.regions)
-	}
-
 	if err := emitExplain(ob, ih.evalCtx, ih.codec, ih.explainPlan); err != nil {
 		ob.AddTopLevelField("error emitting plan", fmt.Sprint(err))
 	}
@@ -679,22 +675,16 @@ func (m execNodeTraceMetadata) associateNodeWithComponents(
 
 // annotateExplain aggregates the statistics in the trace and annotates
 // explain.Nodes with execution stats.
-// It returns a list of all regions on which any of the statements
-// where executed on.
 func (m execNodeTraceMetadata) annotateExplain(
 	plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, p *planner,
-) []string {
+) {
 	statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic)
-	var allRegions []string
 
 	// Retrieve which region each node is on.
 	regionsInfo := make(map[int64]string)
-	descriptors, _ := getAllNodeDescriptors(p)
-	for _, descriptor := range descriptors {
-		for _, tier := range descriptor.Locality.Tiers {
-			if tier.Key == "region" {
-				regionsInfo[int64(descriptor.NodeID)] = tier.Value
-			}
+	for componentId := range statsMap {
+		if componentId.Region != "" {
+			regionsInfo[int64(componentId.SQLInstanceID)] = componentId.Region
 		}
 	}
 
@@ -762,7 +752,6 @@ func (m execNodeTraceMetadata) annotateExplain(
 			}
 			sort.Strings(regions)
 			nodeStats.Regions = regions
-			allRegions = util.CombineUnique(allRegions, regions)
 			n.Annotate(exec.ExecutionStatsID, &nodeStats)
 		}
 	}
@@ -779,8 +768,6 @@ func (m execNodeTraceMetadata) annotateExplain(
 	for i := range plan.Checks {
 		walk(plan.Checks[i])
 	}
-
-	return allRegions
 }
 
 // SetIndexRecommendations checks if we should generate a new index recommendation.
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_misc b/pkg/sql/opt/exec/execbuilder/testdata/distsql_misc
index 99337888a4c4..091b9a5e78ec 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_misc
+++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_misc
@@ -108,6 +108,7 @@ vectorized:
 rows decoded from KV: 1,000 (7.8 KiB, 2,000 KVs, 1,000 gRPC calls)
 maximum memory usage:
 network usage:
+regions:
 ·
 • create statistics
 ·
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
index 57e4c7094f0a..9751379a8466 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -137,7 +137,9 @@ func (s *Container) RecordStatement(
 		stats.mu.data.RowsWritten.Record(stats.mu.data.Count, float64(value.RowsWritten))
 		stats.mu.data.LastExecTimestamp = s.getTimeNow()
 		stats.mu.data.Nodes = util.CombineUnique(stats.mu.data.Nodes, value.Nodes)
-		stats.mu.data.Regions = util.CombineUnique(stats.mu.data.Regions, value.Regions)
+		if value.ExecStats != nil {
+			stats.mu.data.Regions = util.CombineUnique(stats.mu.data.Regions, value.ExecStats.Regions)
+		}
 		stats.mu.data.PlanGists = util.CombineUnique(stats.mu.data.PlanGists, []string{value.PlanGist})
 		stats.mu.data.IndexRecommendations = value.IndexRecommendations
 		stats.mu.data.Indexes = util.CombineUnique(stats.mu.data.Indexes, value.Indexes)
diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go
index 19ff5c12e00f..f375b3335ac1 100644
--- a/pkg/sql/sqlstats/ssprovider.go
+++ b/pkg/sql/sqlstats/ssprovider.go
@@ -209,7 +209,6 @@ type RecordedStmtStats struct {
 	RowsRead      int64
 	RowsWritten   int64
 	Nodes         []int64
-	Regions       []string
 	StatementType tree.StatementType
 	Plan          *appstatspb.ExplainTreePlanNode
 	PlanGist      string
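Finally, a sketch of the aggregation change in ss_mem_writer.go above
(standalone; combineUnique here is a simplified stand-in for
util.CombineUnique): regions now come from ExecStats, which is only present
when execution stats were sampled, hence the nil guard.

    package main

    import (
    	"fmt"
    	"sort"
    )

    type execStats struct{ regions []string }

    type recordedStmt struct {
    	nodes     []int64
    	execStats *execStats // nil when execution stats were not sampled
    }

    // combineUnique merges two slices, dropping duplicates and keeping the
    // result sorted, in the spirit of util.CombineUnique.
    func combineUnique(a, b []string) []string {
    	out := append(append([]string{}, a...), b...)
    	sort.Strings(out)
    	uniq := out[:0]
    	for i, s := range out {
    		if i == 0 || s != out[i-1] {
    			uniq = append(uniq, s)
    		}
    	}
    	return uniq
    }

    func main() {
    	stored := []string{"gcp-us-west1"}
    	stmt := recordedStmt{
    		nodes:     []int64{1, 2},
    		execStats: &execStats{regions: []string{"gcp-us-east1"}},
    	}
    	if stmt.execStats != nil { // regions are only known when stats were sampled
    		stored = combineUnique(stored, stmt.execStats.regions)
    	}
    	fmt.Println(stored) // [gcp-us-east1 gcp-us-west1]
    }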