Merge #106587
106587: sql: fix StatementStatistics.Nodes list r=j82w a=j82w

1. Fixes the nodes list to show all the nodes in StatementStatistics.Nodes.
2. The node list is now empty if tracing is disabled. Previously it always included the current gateway node ID but was missing all the other nodes, which made it unclear whether the list was complete.
3. Fixes `EXPLAIN ANALYSE (DISTSQL)` to show region information for secondary tenants. It was not shown before because only system tenants have access to gossip, which is used under the covers to get the node descriptors.
4. Fixes the performance issues previously listed in #102170.
5. Fixes the test to actually validate the nodes list.

The fix was done by adding the region name to the Flow ComponentID. The region name is now part of the traces for the Flow ComponentID, so the stats code no longer needs to figure out the region separately: it gets the region information from the same trace from which the SQL instance ID is obtained. Moving the collection to the QueryLevelStats avoids iterating over the traces multiple times.
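As a rough sketch of the idea (standalone, simplified types; `Locality`, `ComponentID`, and `flowComponentID` below are illustrative stand-ins rather than the actual CockroachDB API), the region is read from the node's locality once and stamped onto the flow-level component ID, so anything reading the trace sees the SQL instance ID and the region on the same component:

package main

import "fmt"

// Locality mirrors a node's key=value tiers, e.g. region=gcp-us-east1.
type Locality map[string]string

// Find returns the value of a tier, the way the flow context looks up "region".
func (l Locality) Find(key string) (string, bool) {
	v, ok := l[key]
	return v, ok
}

// ComponentID identifies an execution component; Region is the newly added field.
type ComponentID struct {
	FlowID        string
	SQLInstanceID int
	Region        string
}

// flowComponentID builds the flow-level component ID, now carrying the region,
// so the region is recorded directly on the flow's trace span.
func flowComponentID(instanceID int, flowID, region string) ComponentID {
	return ComponentID{FlowID: flowID, SQLInstanceID: instanceID, Region: region}
}

func main() {
	loc := Locality{"region": "gcp-us-east1"}
	region, _ := loc.Find("region") // empty if the node has no region tier
	id := flowComponentID(3, "f57a2e09", region) // flow ID value is made up
	fmt.Printf("flow %s ran on n%d in %q\n", id.FlowID, id.SQLInstanceID, id.Region)
}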

Fixes: #102170, fixes: #96647, fixes: #91219;
Epic: none
Release note (sql change): Fixes StatementStatistics.Nodes to contain all the nodes involved in the query. Adds region info to `EXPLAIN ANALYSE (DISTSQL)` for secondary tenants.

Co-authored-by: j82w <[email protected]>
craig[bot] and j82w committed Jul 12, 2023
2 parents 7339e0f + 44e6054 commit 269b9e3
Showing 18 changed files with 229 additions and 113 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/testccl/sqlstatsccl/BUILD.bazel
@@ -18,13 +18,15 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/appstatspb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
174 changes: 134 additions & 40 deletions pkg/ccl/testccl/sqlstatsccl/sql_stats_test.go
@@ -13,26 +13,33 @@ import (
gosql "database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSQLStatsRegions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "test is too slow for race")
skip.UnderStress(t, "test is too heavy to run under stress")

// We build a small multiregion cluster, with the proper settings for
Expand All @@ -44,30 +51,78 @@ func TestSQLStatsRegions(t *testing.T) {
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &st.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &st.SV, true)

numServers := 9
numServers := 3
regionNames := []string{
"gcp-us-west1",
"gcp-us-central1",
"gcp-us-east1",
}

serverArgs := make(map[int]base.TestServerArgs)
signalAfter := make([]chan struct{}, numServers)
for i := 0; i < numServers; i++ {
serverArgs[i] = base.TestServerArgs{
signalAfter[i] = make(chan struct{})
args := base.TestServerArgs{
Settings: st,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i%len(regionNames)]}},
},
// We'll start our own test tenant manually below.
DefaultTestTenant: base.TestTenantDisabled,
}

serverKnobs := &server.TestingKnobs{
SignalAfterGettingRPCAddress: signalAfter[i],
}

args.Knobs.Server = serverKnobs
serverArgs[i] = args
}

host := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ParallelStart: true,
})
defer host.Stopper().Stop(ctx)

go func() {
for _, c := range signalAfter {
<-c
}
}()

tdb := sqlutils.MakeSQLRunner(host.ServerConn(1))

// Shorten the closed timestamp target duration so that span configs
// propagate more rapidly.
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`)
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off")
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'")
// Lengthen the lead time for the global tables to prevent overload from
// resulting in delays in propagating closed timestamps and, ultimately
// forcing requests from being redirected to the leaseholder. Without this
// change, the test sometimes is flakey because the latency budget allocated
// to closed timestamp propagation proves to be insufficient. This value is
// very cautious, and makes this already slow test even slower.
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'")
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`)
tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`)

tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true")
tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.multi_region.allow_abstractions_for_secondary_tenants.enabled = true")
tdb.Exec(t, `ALTER RANGE meta configure zone using constraints = '{"+region=gcp-us-west1": 1, "+region=gcp-us-central1": 1, "+region=gcp-us-east1": 1}';`)

// Create secondary tenants
var tenantDbs []*gosql.DB
for _, server := range host.Servers {
_, tenantDb := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
tenantDbs = append(tenantDbs, tenantDb)
}

testCases := []struct {
name string
db func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner
@@ -84,24 +139,16 @@ func TestSQLStatsRegions(t *testing.T) {
// connection to the first one.
name: "secondary tenant",
db: func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner {
var dbs []*gosql.DB
for _, server := range host.Servers {
_, db := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
dbs = append(dbs, db)
}
return sqlutils.MakeSQLRunner(dbs[0])
return sqlutils.MakeSQLRunner(tenantDbs[1])
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
db := tc.db(t, host, st)

// Create a multi-region database.
db.Exec(t, "SET enable_multiregion_placement_policy = true")
db.Exec(t, `SET CLUSTER SETTING sql.txn_stats.sample_rate = 1;`)

db.Exec(t, fmt.Sprintf(`CREATE DATABASE testdb PRIMARY REGION "%s" PLACEMENT RESTRICTED`, regionNames[0]))
for i := 1; i < len(regionNames); i++ {
db.Exec(t, fmt.Sprintf(`ALTER DATABASE testdb ADD region "%s"`, regionNames[i]))
@@ -113,39 +160,86 @@ func TestSQLStatsRegions(t *testing.T) {

// Add some data to each region.
for i, regionName := range regionNames {
db.Exec(t, "INSERT INTO test (crdb_region, a) VALUES ($1, $2)", regionName, i)
db.Exec(t, "INSERT INTO test (a, crdb_region) VALUES ($1, $2)", i, regionName)
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
// It takes a while for the region replication to complete.
testutils.SucceedsWithin(t, func() error {
var expectedNodes []int64
var expectedRegions []string
_, err := db.DB.ExecContext(ctx, `USE testdb`)
if err != nil {
return err
}

// Use EXPLAIN ANALYSE (DISTSQL) to get the accurate list of nodes.
explainInfo, err := db.DB.QueryContext(ctx, `EXPLAIN ANALYSE (DISTSQL) SELECT * FROM test`)
if err != nil {
return err
}
for explainInfo.Next() {
var explainStr string
if err := explainInfo.Scan(&explainStr); err != nil {
t.Fatal(err)
}

explainStr = strings.ReplaceAll(explainStr, " ", "")
// Example str " regions: gcp-us-central1,gcp-us-east1,gcp-us-west1"
if strings.HasPrefix(explainStr, "regions:") {
explainStr = strings.ReplaceAll(explainStr, "regions:", "")
explainStr = strings.ReplaceAll(explainStr, " ", "")
expectedRegions = strings.Split(explainStr, ",")
if len(expectedRegions) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", expectedRegions)
}
}

// Example str " nodes: n1, n2, n4, n9"
if strings.HasPrefix(explainStr, "nodes:") {
explainStr = strings.ReplaceAll(explainStr, "nodes:", "")
explainStr = strings.ReplaceAll(explainStr, "n", "")

split := strings.Split(explainStr, ",")
if len(split) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", split)
}

// Gateway node was not included in the explain plan. Add it to the list
if split[0] != "1" {
expectedNodes = append(expectedNodes, int64(1))
}

for _, val := range split {
node, err := strconv.Atoi(val)
require.NoError(t, err)
expectedNodes = append(expectedNodes, int64(node))
}
}
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
SELECT statistics->>'statistics'
FROM crdb_internal.statement_statistics
WHERE app_name = $1`, t.Name())

var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err := json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

require.Equal(t,
appstatspb.StatementStatistics{
// TODO(todd): It appears we do not yet reliably record
// the nodes for the statement. (I have manually verified
// that the above query does indeed fan out across the
// regions, via EXPLAIN (DISTSQL).) Filed as #96647.
//Nodes: []int64{1, 2, 3},
//Regions: regionNames,
Nodes: []int64{1},
Regions: []string{regionNames[0]},
},
appstatspb.StatementStatistics{
Nodes: actual.Nodes,
Regions: actual.Regions,
},
)
var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err = json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

// Replication to all regions can take some time to complete. During
// this time an incomplete list will be returned.
if !assert.ObjectsAreEqual(expectedNodes, actual.Nodes) {
return fmt.Errorf("nodes are not equal. Expected: %d, Actual: %d", expectedNodes, actual.Nodes)
}

require.Equal(t, expectedRegions, actual.Regions)
return nil
}, 3*time.Minute)
})
}
}
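For reference, the node-list check in the test above is plain string surgery on the EXPLAIN ANALYSE output; a minimal standalone sketch of that parsing step (the `parseNodes` name and the sample line are illustrative, not part of the change):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseNodes pulls node IDs out of an EXPLAIN ANALYSE line such as
// "  nodes: n1, n2, n4", mirroring the string handling in the test above.
func parseNodes(line string) ([]int64, error) {
	line = strings.ReplaceAll(line, " ", "")
	if !strings.HasPrefix(line, "nodes:") {
		return nil, fmt.Errorf("not a nodes line: %q", line)
	}
	line = strings.TrimPrefix(line, "nodes:")
	line = strings.ReplaceAll(line, "n", "")
	var nodes []int64
	for _, part := range strings.Split(line, ",") {
		id, err := strconv.Atoi(part)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, int64(id))
	}
	return nodes, nil
}

func main() {
	nodes, err := parseNodes("  nodes: n1, n2, n4")
	fmt.Println(nodes, err) // [1 2 4] <nil>
}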
9 changes: 3 additions & 6 deletions pkg/sql/colflow/vectorized_flow.go
@@ -19,7 +19,6 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -467,9 +466,7 @@ func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector(
// statistics that the outbox is responsible for, nil is returned if stats are
// not being collected.
func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
flowCtx *execinfra.FlowCtx,
statsCollectors []colexecop.VectorizedStatsCollector,
originSQLInstanceID base.SQLInstanceID,
flowCtx *execinfra.FlowCtx, statsCollectors []colexecop.VectorizedStatsCollector,
) func(context.Context) []*execinfrapb.ComponentStats {
if !s.recordingStats {
return nil
Expand All @@ -489,7 +486,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
// whole flow from parent monitors. These stats are added to a
// flow-level span.
result = append(result, &execinfrapb.ComponentStats{
Component: execinfrapb.FlowComponentID(originSQLInstanceID, flowCtx.ID),
Component: flowCtx.FlowComponentID(),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.Mon.MaximumBytes())),
MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())),
@@ -1069,7 +1066,7 @@ func (s *vectorizedFlowCreator) setupOutput(
// Set up an Outbox.
outbox, err := s.setupRemoteOutputStream(
ctx, flowCtx, pspec.ProcessorID, opWithMetaInfo, opOutputTypes, outputStream, factory,
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors, outputStream.OriginNodeID),
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors),
)
if err != nil {
return err
13 changes: 6 additions & 7 deletions pkg/sql/conn_executor_exec.go
@@ -1691,15 +1691,15 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "populate query level stats and regions",
f: func() {
populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
},
})
} else {
populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
stmtFingerprintID = ex.recordStatementSummary(
ctx, planner,
int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
@@ -1747,12 +1747,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStatsAndRegions collects query-level execution statistics
// populateQueryLevelStats collects query-level execution statistics
// and populates it in the instrumentationHelper's queryLevelStatsWithErr field.
// Query-level execution statistics are collected using the statement's trace
// and the plan's flow metadata. It also populates the regions field and
// annotates the explainPlan field of the instrumentationHelper.
func populateQueryLevelStatsAndRegions(
// and the plan's flow metadata.
func populateQueryLevelStats(
ctx context.Context,
p *planner,
cfg *ExecutorConfig,
@@ -1794,7 +1793,7 @@ func populateQueryLevelStatsAndRegions(
}
}
if ih.traceMetadata != nil && ih.explainPlan != nil {
ih.regions = ih.traceMetadata.annotateExplain(
ih.traceMetadata.annotateExplain(
ih.explainPlan,
trace,
cfg.TestingKnobs.DeterministicExplain,
2 changes: 1 addition & 1 deletion pkg/sql/exec_log.go
@@ -358,7 +358,7 @@ func (p *planner) maybeLogStatementInternal(
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(),
Regions: p.curPlan.instrumentation.regions,
Regions: queryLevelStats.Regions,
NetworkBytesSent: queryLevelStats.NetworkBytesSent,
MaxMemUsage: queryLevelStats.MaxMemUsage,
MaxDiskUsage: queryLevelStats.MaxDiskUsage,
6 changes: 6 additions & 0 deletions pkg/sql/execinfra/flow_context.go
@@ -184,3 +184,9 @@ func (flowCtx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.Component
func (flowCtx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
return execinfrapb.StreamComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, streamID)
}

// FlowComponentID returns a ComponentID for the given flow.
func (flowCtx *FlowCtx) FlowComponentID() execinfrapb.ComponentID {
region, _ := flowCtx.Cfg.Locality.Find("region")
return execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, region)
}
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/component_stats.go
@@ -50,11 +50,12 @@ func StreamComponentID(
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID {
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID, region string) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
SQLInstanceID: instanceID,
Region: region,
}
}

4 changes: 4 additions & 0 deletions pkg/sql/execinfrapb/component_stats.proto
@@ -54,6 +54,10 @@ message ComponentID {
(gogoproto.nullable) = false,
(gogoproto.customname) = "SQLInstanceID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"];

// The region the component is associated with.
// For the initial implementation only ComponentIDs of Flow type might have this set.
optional string region = 5 [(gogoproto.nullable) = false];
}

// ComponentStats contains statistics for an execution component. A component is