From 1a74e20cfd80726e34c55b25ecee7d673bd43c89 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 16 Feb 2021 13:08:30 -0500 Subject: [PATCH] sql: add nodes for each EXPLAIN ANALYZE operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show the cluster nodes involved in the execution of each operator. Note that this info does not show up in the non-analyze EXPLAIN. It is technically much more challenging to do that because of the indirect way we do distsql planning. Once we have the new DistSQL exec factory, we will be able to add it. To implement this, we make execution set `ComponentID.NodeID` in all cases. Unfortunately, there is not much we can do to test this currently (other than manual testing). I will investigate making the "deterministic" flag more fine-grained, so that we can hide truly non-deterministic values (like timings) separately from those that just vary with the configuration (query distribution). Example: ``` movr> EXPLAIN ANALYZE SELECT * FROM rides JOIN vehicles ON rides.vehicle_id = vehicles.id; info -------------------------------------------- planning time: 158µs execution time: 7ms distribution: full vectorized: true hash join │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ equality: (vehicle_id) = (id) │ ├── scan │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ KV rows read: 500 │ KV bytes read: 86 KiB │ missing stats │ table: rides@primary │ spans: FULL SCAN │ └── scan cluster nodes: n1, n2, n3 actual row count: 15 KV rows read: 15 KV bytes read: 2.3 KiB missing stats table: vehicles@primary spans: FULL SCAN ``` Release note (sql change): EXPLAIN ANALYZE now includes the nodes which were involved in the execution of each operator in the tree. --- pkg/sql/colflow/vectorized_flow.go | 7 +++- pkg/sql/distsql/inbound_test.go | 1 + pkg/sql/distsql_physical_planner.go | 5 ++- pkg/sql/execinfra/flow_context.go | 10 ++++- pkg/sql/execinfrapb/component_stats.go | 6 ++- pkg/sql/execinfrapb/component_stats.pb.go | 30 +++++++------- pkg/sql/execinfrapb/component_stats.proto | 6 +-- pkg/sql/execinfrapb/flow_diagram.go | 41 +++++++++++-------- pkg/sql/execstats/traceanalyzer.go | 2 + pkg/sql/execstats/traceanalyzer_test.go | 12 ++++-- pkg/sql/execstats/utils_test.go | 16 +++----- pkg/sql/flowinfra/outbox_test.go | 8 ++++ pkg/sql/instrumentation.go | 20 +++++++-- .../testdata/logic_test/dist_vectorize | 5 +++ .../testdata/logic_test/explain_analyze | 5 +++ .../testdata/logic_test/explain_analyze_plans | 24 +++++++++++ .../logic_test/inverted_index_geospatial | 15 +++++++ .../logic_test/show_create_all_tables | 1 - .../testdata/logic_test/vectorize_local | 6 +++ pkg/sql/opt/exec/explain/emit.go | 3 ++ pkg/sql/opt/exec/explain/output.go | 9 ++++ pkg/sql/opt/exec/factory.go | 3 ++ 22 files changed, 172 insertions(+), 63 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 1c4a872af752..5dd4d32f3817 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -831,9 +831,12 @@ func (s *vectorizedFlowCreator) setupInput( s.addStreamEndpoint(inputStream.StreamID, inbox, s.waitGroup) op := colexecbase.Operator(inbox) if s.recordingStats { - op, err = s.wrapWithNetworkVectorizedStatsCollector( - inbox, flowCtx.StreamComponentID(inputStream.StreamID), latency, + // Note: we can't use flowCtx.StreamComponentID because the stream does + // not originate from this node (we are the target node). 
+ compID := execinfrapb.StreamComponentID( + inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID, ) + op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency) if err != nil { return nil, nil, nil, err } diff --git a/pkg/sql/distsql/inbound_test.go b/pkg/sql/distsql/inbound_test.go index 412194b224e2..40cd60b5e90f 100644 --- a/pkg/sql/distsql/inbound_test.go +++ b/pkg/sql/distsql/inbound_test.go @@ -92,6 +92,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, staticAddressResolver(ln.Addr())), Stopper: outboxStopper, }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(1) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 83b1b8fc75e8..c0236a436a28 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2724,10 +2724,11 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if planCtx.traceMetadata != nil { processors := make(execComponents, len(plan.ResultRouters)) - for i := range plan.ResultRouters { + for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( + plan.Processors[resultProcIdx].Node, execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, - int32(plan.ResultRouters[i]), + int32(resultProcIdx), ) } planCtx.traceMetadata.associateNodeWithComponents(node, processors) diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index 4a1eaebac855..b144768751eb 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -108,10 +109,15 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec { // ProcessorComponentID returns a ComponentID for the given processor in this // flow. func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID { - return execinfrapb.ProcessorComponentID(ctx.ID, procID) + // TODO(radu): the component stats should store SQLInstanceID instead. + nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) + return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID) } // StreamComponentID returns a ComponentID for the given stream in this flow. +// The stream must originate from the node associated with this FlowCtx. func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID { - return execinfrapb.StreamComponentID(ctx.ID, streamID) + // TODO(radu): the component stats should store SQLInstanceID instead. + originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) + return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID) } diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 4f7b554d5a5e..6e8f8a5b3bd3 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -26,20 +26,22 @@ import ( ) // ProcessorComponentID returns a ComponentID for the given processor in a flow. 
-func ProcessorComponentID(flowID FlowID, processorID int32) ComponentID { +func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID { return ComponentID{ FlowID: flowID, Type: ComponentID_PROCESSOR, ID: processorID, + NodeID: nodeID, } } // StreamComponentID returns a ComponentID for the given stream in a flow. -func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID { +func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID { return ComponentID{ FlowID: flowID, Type: ComponentID_STREAM, ID: int32(streamID), + NodeID: originNodeID, } } diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index 5ae32c27e5b3..3ac9af753875 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -69,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0, 0} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -81,10 +81,8 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. + // NodeID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. 
NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` } @@ -92,7 +90,7 @@ func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -139,7 +137,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{1} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -176,7 +174,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{2} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -220,7 +218,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{3} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +256,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{4} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -299,7 +297,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{5} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -338,7 +336,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{6} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -375,7 +373,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() {} func (*OutputStats) 
Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{7} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -409,7 +407,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{8} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2545,10 +2543,10 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_1e4b47cb6b511195) + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_e22770c51d19c14e) } -var fileDescriptor_component_stats_1e4b47cb6b511195 = []byte{ +var fileDescriptor_component_stats_e22770c51d19c14e = []byte{ // 970 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index de63f1f2d7dd..a02f9f920403 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -48,10 +48,8 @@ message ComponentID { optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. + // NodeID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. optional int32 node_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 0e96f410d4cc..8ef06d90753f 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -575,8 +575,9 @@ type diagramData struct { Processors []diagramProcessor `json:"processors"` Edges []diagramEdge `json:"edges"` - flags DiagramFlags - flowID FlowID + flags DiagramFlags + flowID FlowID + nodeIDs []roachpb.NodeID } var _ FlowDiagram = &diagramData{} @@ -595,27 +596,36 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) { statsMap := ExtractStatsFromSpans(spans, d.flags.MakeDeterministic) for i := range d.Processors { p := &d.Processors[i] - component := ProcessorComponentID(d.flowID, p.processorID) + nodeID := d.nodeIDs[p.NodeIdx] + component := ProcessorComponentID(nodeID, d.flowID, p.processorID) if compStats := statsMap[component]; compStats != nil { p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...) 
 		}
 	}
 
 	for i := range d.Edges {
-		component := StreamComponentID(d.flowID, d.Edges[i].streamID)
+		originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx]
+		component := StreamComponentID(originNodeID, d.flowID, d.Edges[i].streamID)
 		if compStats := statsMap[component]; compStats != nil {
 			d.Edges[i].Stats = compStats.StatsForQueryPlan()
 		}
 	}
 }
 
+// generateDiagramData generates the diagram data, given a list of flows (one
+// per node). The nodeIDs list corresponds 1-1 to the flows list.
 func generateDiagramData(
-	sql string, flows []FlowSpec, nodeNames []string, flags DiagramFlags,
+	sql string, flows []FlowSpec, nodeIDs []roachpb.NodeID, flags DiagramFlags,
 ) (FlowDiagram, error) {
 	d := &diagramData{
-		SQL:       sql,
-		NodeNames: nodeNames,
-		flags:     flags,
+		SQL:     sql,
+		nodeIDs: nodeIDs,
+		flags:   flags,
 	}
+	d.NodeNames = make([]string, len(nodeIDs))
+	for i := range d.NodeNames {
+		d.NodeNames[i] = nodeIDs[i].String()
+	}
+
 	if len(flows) > 0 {
 		d.flowID = flows[0].FlowID
 		for i := 1; i < len(flows); i++ {
@@ -746,21 +756,20 @@ func GeneratePlanDiagram(
 ) (FlowDiagram, error) {
 	// We sort the flows by node because we want the diagram data to be
 	// deterministic.
-	nodeIDs := make([]int, 0, len(flows))
+	nodeIDs := make([]roachpb.NodeID, 0, len(flows))
 	for n := range flows {
-		nodeIDs = append(nodeIDs, int(n))
+		nodeIDs = append(nodeIDs, n)
 	}
-	sort.Ints(nodeIDs)
+	sort.Slice(nodeIDs, func(i, j int) bool {
+		return nodeIDs[i] < nodeIDs[j]
+	})
 
 	flowSlice := make([]FlowSpec, len(nodeIDs))
-	nodeNames := make([]string, len(nodeIDs))
-	for i, nVal := range nodeIDs {
-		n := roachpb.NodeID(nVal)
+	for i, n := range nodeIDs {
 		flowSlice[i] = *flows[n]
-		nodeNames[i] = n.String()
 	}
 
-	return generateDiagramData(sql, flowSlice, nodeNames, flags)
+	return generateDiagramData(sql, flowSlice, nodeIDs, flags)
 }
 
 // GeneratePlanDiagramURL generates the json data for a flow diagram and a
diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go
index 232415de34d5..b927c477c7d0 100644
--- a/pkg/sql/execstats/traceanalyzer.go
+++ b/pkg/sql/execstats/traceanalyzer.go
@@ -20,11 +20,13 @@ import (
 )
 
 type processorStats struct {
+	// TODO(radu): this field is redundant with stats.Component.NodeID.
 	nodeID roachpb.NodeID
 	stats  *execinfrapb.ComponentStats
 }
 
 type streamStats struct {
+	// TODO(radu): this field is redundant with stats.Component.NodeID.
originNodeID roachpb.NodeID destinationNodeID roachpb.NodeID stats *execinfrapb.ComponentStats diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 758ead28c526..4480a1db1ee8 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -194,10 +194,12 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} + n1 := roachpb.NodeID(1) + n2 := roachpb.NodeID(2) a.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -209,9 +211,9 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { ) a.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), @@ -289,10 +291,12 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { // Artificially inject component stats directly into the FlowsMetadata (in // the non-testing setting the stats come from the trace). var f1, f2 execstats.FlowsMetadata + n1 := roachpb.NodeID(1) + n2 := roachpb.NodeID(2) f1.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -302,9 +306,9 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { }, ) f2.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index 9519591726f8..d25fd92c3870 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -17,21 +17,17 @@ import ( // AddComponentStats modifies TraceAnalyzer internal state to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. -func (a *TraceAnalyzer) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { - a.FlowsMetadata.AddComponentStats(nodeID, stats) +func (a *TraceAnalyzer) AddComponentStats(stats *execinfrapb.ComponentStats) { + a.FlowsMetadata.AddComponentStats(stats) } // AddComponentStats modifies FlowsMetadata to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. 
-func (m *FlowsMetadata) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { +func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: processorStat := &processorStats{ - nodeID: nodeID, + nodeID: stats.Component.NodeID, stats: stats, } if m.processorStats == nil { @@ -40,7 +36,7 @@ func (m *FlowsMetadata) AddComponentStats( m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ - originNodeID: nodeID, + originNodeID: stats.Component.NodeID, stats: stats, } if m.streamStats == nil { @@ -53,6 +49,6 @@ func (m *FlowsMetadata) AddComponentStats( if m.flowStats == nil { m.flowStats = make(map[roachpb.NodeID]*flowStats) } - m.flowStats[nodeID] = flowStat + m.flowStats[stats.Component.NodeID] = flowStat } } diff --git a/pkg/sql/flowinfra/outbox_test.go b/pkg/sql/flowinfra/outbox_test.go index 944678aefa4e..4b0874bc2214 100644 --- a/pkg/sql/flowinfra/outbox_test.go +++ b/pkg/sql/flowinfra/outbox_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -71,6 +72,7 @@ func TestOutbox(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -227,6 +229,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -301,6 +304,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -376,6 +380,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), Stopper: stopper, }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), }) outbox.Init(rowenc.OneIntCol) // In a RunSyncFlow call, the outbox runs under the call's context. @@ -437,6 +442,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -490,6 +496,7 @@ func TestOutboxUnblocksProducers(t *testing.T) { // a nil nodeDialer will always fail to connect. 
NodeDialer: nil, }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -560,6 +567,7 @@ func BenchmarkOutbox(b *testing.B) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.NewSQLIDContainer(1, nil /* nodeID */), } outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.MakeIntCols(numCols)) diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 31fd858a01a2..27a60b70d9dc 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -224,7 +224,12 @@ func (ih *instrumentationHelper) Finish( } if ih.traceMetadata != nil && ih.explainPlan != nil { - ih.traceMetadata.annotateExplain(ih.explainPlan, trace, cfg.TestingKnobs.DeterministicExplainAnalyze) + ih.traceMetadata.annotateExplain( + ih.explainPlan, + p.curPlan.distSQLFlowInfos, + trace, + cfg.TestingKnobs.DeterministicExplainAnalyze, + ) } // TODO(radu): this should be unified with other stmt stats accesses. @@ -480,7 +485,7 @@ func (m execNodeTraceMetadata) associateNodeWithComponents( // annotateExplain aggregates the statistics in the trace and annotates // explain.Nodes with execution stats. func (m execNodeTraceMetadata) annotateExplain( - plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, + plan *explain.Plan, flowInfos []flowInfo, spans []tracingpb.RecordedSpan, makeDeterministic bool, ) { statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic) @@ -491,8 +496,12 @@ func (m execNodeTraceMetadata) annotateExplain( var nodeStats exec.ExecutionStats incomplete := false - for i := range components { - stats := statsMap[components[i]] + var nodes util.FastIntSet + for _, c := range components { + if c.Type == execinfrapb.ComponentID_PROCESSOR { + nodes.Add(int(c.NodeID)) + } + stats := statsMap[c] if stats == nil { incomplete = true break @@ -505,6 +514,9 @@ func (m execNodeTraceMetadata) annotateExplain( // incomplete results. In the future, we may consider an incomplete flag // if we want to show them with a warning. 
if !incomplete { + for i, ok := nodes.Next(0); ok; i, ok = nodes.Next(i + 1) { + nodeStats.Nodes = append(nodeStats.Nodes, fmt.Sprintf("n%d", i)) + } n.Annotate(exec.ExecutionStatsID, &nodeStats) } } diff --git a/pkg/sql/logictest/testdata/logic_test/dist_vectorize b/pkg/sql/logictest/testdata/logic_test/dist_vectorize index ce2244803a9b..9aebf8aa4b79 100644 --- a/pkg/sql/logictest/testdata/logic_test/dist_vectorize +++ b/pkg/sql/logictest/testdata/logic_test/dist_vectorize @@ -57,9 +57,11 @@ distribution: vectorized: · • group (scalar) +│ cluster nodes: │ actual row count: 1 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -80,12 +82,14 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -94,6 +98,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze b/pkg/sql/logictest/testdata/logic_test/explain_analyze index e7275d1aecd3..2c3218f7f6ae 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze @@ -15,6 +15,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -36,6 +37,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B @@ -59,6 +61,7 @@ vectorized: · • hash join (inner) │ columns: (k, v, a, b) +│ cluster nodes: │ actual row count: 2 │ estimated row count: 990 (missing stats) │ equality: (v) = (a) @@ -66,6 +69,7 @@ vectorized: │ ├── • scan │ columns: (k, v) +│ cluster nodes: │ actual row count: 4 │ KV rows read: 4 │ KV bytes read: 32 B @@ -75,6 +79,7 @@ vectorized: │ └── • scan columns: (a, b) + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans index b937a454fa79..8bd1be1c73f2 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans @@ -66,17 +66,20 @@ distribution: vectorized: · • group +│ cluster nodes: │ actual row count: 5 │ group by: k │ ordered: +k │ └── • merge join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -85,6 +88,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -106,19 +110,23 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 5 │ order: +w │ └── • distinct + │ cluster nodes: │ actual row count: 5 │ distinct on: w │ └── • hash join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (w) │ left cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -127,6 +135,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -148,12 +157,15 @@ distribution: vectorized: · • cross join +│ cluster nodes: │ actual row count: 25 │ ├── • ordinality +│ │ cluster nodes: │ │ actual row count: 5 │ │ │ └── • scan +│ cluster nodes: │ 
actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -162,9 +174,11 @@ vectorized: │ spans: FULL SCAN │ └── • ordinality + │ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -206,9 +220,11 @@ distribution: vectorized: · • window +│ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -231,6 +247,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -259,6 +276,7 @@ vectorized: • root │ ├── • insert +│ │ cluster nodes: │ │ actual row count: 1 │ │ into: child(c, p) │ │ @@ -274,9 +292,11 @@ vectorized: │ │ exec mode: one row │ │ │ └── • group (scalar) +│ │ cluster nodes: │ │ actual row count: 1 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 1 │ KV rows read: 1 │ KV bytes read: 8 B @@ -288,9 +308,11 @@ vectorized: └── • constraint-check │ └── • error if rows + │ cluster nodes: │ actual row count: 0 │ └── • lookup join (anti) + │ cluster nodes: │ actual row count: 0 │ KV rows read: 1 │ KV bytes read: 8 B @@ -299,10 +321,12 @@ vectorized: │ equality cols are key │ └── • filter + │ cluster nodes: │ actual row count: 1 │ filter: column2 IS NOT NULL │ └── • scan buffer + cluster nodes: actual row count: 1 label: buffer 1 · diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial index 1cfe1f353c69..21fabaa3b6a4 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial @@ -34,25 +34,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 4 KV bytes read: 32 B @@ -104,25 +109,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -150,25 +160,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables 
b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables index edcd216cbc54..367741095fb8 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables +++ b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables @@ -414,4 +414,3 @@ SHOW CREATE ALL TABLES ---- create_statement CREATE SEQUENCE public.s1 MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 123 START 1; - diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_local b/pkg/sql/logictest/testdata/logic_test/vectorize_local index a2e42992156f..c83f3026a0bf 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_local +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_local @@ -44,6 +44,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 2,001 KV rows read: 2,001 KV bytes read: 16 KiB @@ -64,6 +65,7 @@ distribution: vectorized: · • lookup join +│ cluster nodes: │ actual row count: 2 │ KV rows read: 1 │ KV bytes read: 8 B @@ -71,6 +73,7 @@ vectorized: │ equality: (b) = (b) │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -91,10 +94,12 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 2 │ equality: (a) = (b) │ ├── • scan +│ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B @@ -103,6 +108,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index 9596e2ee4d00..bd44a7d2fa1f 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -331,6 +331,9 @@ func (e *emitter) joinNodeName(algo string, joinType descpb.JoinType) string { func (e *emitter) emitNodeAttributes(n *Node) error { if stats, ok := n.annotations[exec.ExecutionStatsID]; ok { s := stats.(*exec.ExecutionStats) + if len(s.Nodes) > 0 { + e.ob.AddNonDeterministicField("cluster nodes", strings.Join(s.Nodes, ", ")) + } if s.RowCount.HasValue() { e.ob.AddField("actual row count", humanizeutil.Count(s.RowCount.Value())) } diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index 86a2c406f2f3..4a13417b9cdb 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -105,6 +105,15 @@ func (ob *OutputBuilder) AddField(key, value string) { ob.entries = append(ob.entries, entry{field: key, fieldVal: value}) } +// AddNonDeterministicField adds an information field under the current node, +// but hides the information if the MakeDeterministic flag is set. +func (ob *OutputBuilder) AddNonDeterministicField(key, value string) { + if ob.flags.MakeDeterministic { + value = "" + } + ob.AddField(key, value) +} + // Attr adds an information field under the current node. func (ob *OutputBuilder) Attr(key string, value interface{}) { ob.AddField(key, fmt.Sprint(value)) diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index d18179869a62..9be03805d27c 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -304,6 +304,9 @@ type ExecutionStats struct { KVBytesRead optional.Uint KVRowsRead optional.Uint + + // Nodes on which this operator was executed. + Nodes []string } // BuildPlanForExplainFn builds an execution plan against the given
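
For reference, a minimal sketch of how the node-aware constructors introduced by this patch are called. The concrete IDs here (nodes 1 and 2, processor 42, stream 7) are illustrative only; the signatures and the `FlowID{UUID: uuid.MakeV4()}` pattern mirror the diffs above.

```
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
	// Every component is now tagged with a NodeID, which is what lets
	// EXPLAIN ANALYZE report "cluster nodes" per operator.
	flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()}

	// Processors record the node they ran on.
	proc := execinfrapb.ProcessorComponentID(
		roachpb.NodeID(1), flowID, 42, /* processorID */
	)

	// Streams record their *origin* node. On the inbox (receiving) side
	// this is the remote node (inputStream.OriginNodeID), not the local
	// one, hence the new explicit parameter.
	stream := execinfrapb.StreamComponentID(
		roachpb.NodeID(2), flowID, execinfrapb.StreamID(7),
	)

	fmt.Println(proc.NodeID, stream.NodeID) // prints: 1 2
}
```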