From a24fa8675abfa4084f2789c49970205acf3bd09d Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 17 Feb 2021 14:34:51 -0500 Subject: [PATCH 1/2] sql: add nodes for each EXPLAIN ANALYZE operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show the cluster nodes involved in the execution of each operator. Note that this info does not show up in the non-analyze EXPLAIN. It is technically much more challenging to do that because of the indirect way we do distsql planning. Once we have the new DistSQL exec factory, we will be able to add it. To implement this, we make execution set `ComponentID.NodeID` in all cases. Unfortunately, there is not much we can do to test this currently (other than manual testing). I will investigate making the "deterministic" flag more fine-grained, so that we can hide truly non-deterministic values (like timings) separately from those that just vary with the configuration (query distribution). Example: ``` movr> EXPLAIN ANALYZE SELECT * FROM rides JOIN vehicles ON rides.vehicle_id = vehicles.id; info -------------------------------------------- planning time: 158µs execution time: 7ms distribution: full vectorized: true hash join │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ equality: (vehicle_id) = (id) │ ├── scan │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ KV rows read: 500 │ KV bytes read: 86 KiB │ missing stats │ table: rides@primary │ spans: FULL SCAN │ └── scan cluster nodes: n1, n2, n3 actual row count: 15 KV rows read: 15 KV bytes read: 2.3 KiB missing stats table: vehicles@primary spans: FULL SCAN ``` Release note (sql change): EXPLAIN ANALYZE now includes the nodes which were involved in the execution of each operator in the tree. --- pkg/sql/colflow/vectorized_flow.go | 7 +++- pkg/sql/distsql/inbound_test.go | 1 + pkg/sql/distsql_physical_planner.go | 5 ++- pkg/sql/execinfra/flow_context.go | 10 ++++- pkg/sql/execinfrapb/component_stats.go | 6 ++- pkg/sql/execinfrapb/component_stats.pb.go | 30 +++++++------- pkg/sql/execinfrapb/component_stats.proto | 6 +-- pkg/sql/execinfrapb/flow_diagram.go | 41 +++++++++++-------- pkg/sql/execstats/traceanalyzer.go | 2 + pkg/sql/execstats/traceanalyzer_test.go | 12 ++++-- pkg/sql/execstats/utils_test.go | 16 +++----- pkg/sql/flowinfra/outbox_test.go | 8 ++++ pkg/sql/instrumentation.go | 20 +++++++-- .../testdata/logic_test/dist_vectorize | 5 +++ .../testdata/logic_test/explain_analyze | 5 +++ .../testdata/logic_test/explain_analyze_plans | 24 +++++++++++ .../logic_test/inverted_index_geospatial | 15 +++++++ .../logic_test/show_create_all_tables | 1 - .../testdata/logic_test/vectorize_local | 6 +++ pkg/sql/opt/exec/explain/emit.go | 3 ++ pkg/sql/opt/exec/explain/output.go | 9 ++++ pkg/sql/opt/exec/factory.go | 3 ++ 22 files changed, 172 insertions(+), 63 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 1c4a872af752..5dd4d32f3817 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -831,9 +831,12 @@ func (s *vectorizedFlowCreator) setupInput( s.addStreamEndpoint(inputStream.StreamID, inbox, s.waitGroup) op := colexecbase.Operator(inbox) if s.recordingStats { - op, err = s.wrapWithNetworkVectorizedStatsCollector( - inbox, flowCtx.StreamComponentID(inputStream.StreamID), latency, + // Note: we can't use flowCtx.StreamComponentID because the stream does + // not originate from this node (we are the target node). + compID := execinfrapb.StreamComponentID( + inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID, ) + op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency) if err != nil { return nil, nil, nil, err } diff --git a/pkg/sql/distsql/inbound_test.go b/pkg/sql/distsql/inbound_test.go index 412194b224e2..ace463bb8223 100644 --- a/pkg/sql/distsql/inbound_test.go +++ b/pkg/sql/distsql/inbound_test.go @@ -92,6 +92,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, staticAddressResolver(ln.Addr())), Stopper: outboxStopper, }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(1) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 926980f1e41c..6e22c1c7eef6 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2742,10 +2742,11 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if planCtx.traceMetadata != nil { processors := make(execComponents, len(plan.ResultRouters)) - for i := range plan.ResultRouters { + for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( + plan.Processors[resultProcIdx].Node, execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, - int32(plan.ResultRouters[i]), + int32(resultProcIdx), ) } planCtx.traceMetadata.associateNodeWithComponents(node, processors) diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index 4a1eaebac855..b144768751eb 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -108,10 +109,15 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec { // ProcessorComponentID returns a ComponentID for the given processor in this // flow. func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID { - return execinfrapb.ProcessorComponentID(ctx.ID, procID) + // TODO(radu): the component stats should store SQLInstanceID instead. + nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) + return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID) } // StreamComponentID returns a ComponentID for the given stream in this flow. +// The stream must originate from the node associated with this FlowCtx. func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID { - return execinfrapb.StreamComponentID(ctx.ID, streamID) + // TODO(radu): the component stats should store SQLInstanceID instead. + originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) + return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID) } diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 4f7b554d5a5e..6e8f8a5b3bd3 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -26,20 +26,22 @@ import ( ) // ProcessorComponentID returns a ComponentID for the given processor in a flow. -func ProcessorComponentID(flowID FlowID, processorID int32) ComponentID { +func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID { return ComponentID{ FlowID: flowID, Type: ComponentID_PROCESSOR, ID: processorID, + NodeID: nodeID, } } // StreamComponentID returns a ComponentID for the given stream in a flow. -func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID { +func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID { return ComponentID{ FlowID: flowID, Type: ComponentID_STREAM, ID: int32(streamID), + NodeID: originNodeID, } } diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index 5ae32c27e5b3..3ac9af753875 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -69,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0, 0} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -81,10 +81,8 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. + // NodeID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` } @@ -92,7 +90,7 @@ func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -139,7 +137,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{1} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -176,7 +174,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{2} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -220,7 +218,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{3} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +256,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{4} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -299,7 +297,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{5} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -338,7 +336,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{6} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -375,7 +373,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() {} func (*OutputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{7} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -409,7 +407,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{8} + return fileDescriptor_component_stats_e22770c51d19c14e, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2545,10 +2543,10 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_1e4b47cb6b511195) + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_e22770c51d19c14e) } -var fileDescriptor_component_stats_1e4b47cb6b511195 = []byte{ +var fileDescriptor_component_stats_e22770c51d19c14e = []byte{ // 970 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index de63f1f2d7dd..a02f9f920403 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -48,10 +48,8 @@ message ComponentID { optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. + // NodeID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. optional int32 node_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index c6a2b1026575..77d542825778 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -578,8 +578,9 @@ type diagramData struct { Processors []diagramProcessor `json:"processors"` Edges []diagramEdge `json:"edges"` - flags DiagramFlags - flowID FlowID + flags DiagramFlags + flowID FlowID + nodeIDs []roachpb.NodeID } var _ FlowDiagram = &diagramData{} @@ -598,27 +599,36 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) { statsMap := ExtractStatsFromSpans(spans, d.flags.MakeDeterministic) for i := range d.Processors { p := &d.Processors[i] - component := ProcessorComponentID(d.flowID, p.processorID) + nodeID := d.nodeIDs[p.NodeIdx] + component := ProcessorComponentID(nodeID, d.flowID, p.processorID) if compStats := statsMap[component]; compStats != nil { p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...) } } for i := range d.Edges { - component := StreamComponentID(d.flowID, d.Edges[i].streamID) + originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx] + component := StreamComponentID(originNodeID, d.flowID, d.Edges[i].streamID) if compStats := statsMap[component]; compStats != nil { d.Edges[i].Stats = compStats.StatsForQueryPlan() } } } +// generateDiagramData generates the diagram data, given a list of flows (one +// per node). The nodeIDs list corresponds 1-1 to the flows list. func generateDiagramData( - sql string, flows []FlowSpec, nodeNames []string, flags DiagramFlags, + sql string, flows []FlowSpec, nodeIDs []roachpb.NodeID, flags DiagramFlags, ) (FlowDiagram, error) { d := &diagramData{ - SQL: sql, - NodeNames: nodeNames, - flags: flags, + SQL: sql, + nodeIDs: nodeIDs, + flags: flags, } + d.NodeNames = make([]string, len(nodeIDs)) + for i := range d.NodeNames { + d.NodeNames[i] = nodeIDs[i].String() + } + if len(flows) > 0 { d.flowID = flows[0].FlowID for i := 1; i < len(flows); i++ { @@ -749,21 +759,20 @@ func GeneratePlanDiagram( ) (FlowDiagram, error) { // We sort the flows by node because we want the diagram data to be // deterministic. - nodeIDs := make([]int, 0, len(flows)) + nodeIDs := make([]roachpb.NodeID, 0, len(flows)) for n := range flows { - nodeIDs = append(nodeIDs, int(n)) + nodeIDs = append(nodeIDs, n) } - sort.Ints(nodeIDs) + sort.Slice(nodeIDs, func(i, j int) bool { + return nodeIDs[i] < nodeIDs[j] + }) flowSlice := make([]FlowSpec, len(nodeIDs)) - nodeNames := make([]string, len(nodeIDs)) - for i, nVal := range nodeIDs { - n := roachpb.NodeID(nVal) + for i, n := range nodeIDs { flowSlice[i] = *flows[n] - nodeNames[i] = n.String() } - return generateDiagramData(sql, flowSlice, nodeNames, flags) + return generateDiagramData(sql, flowSlice, nodeIDs, flags) } // GeneratePlanDiagramURL generates the json data for a flow diagram and a diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 232415de34d5..b927c477c7d0 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -20,11 +20,13 @@ import ( ) type processorStats struct { + // TODO(radu): this field redundant with stats.Component.NodeID. nodeID roachpb.NodeID stats *execinfrapb.ComponentStats } type streamStats struct { + // TODO(radu): this field redundant with stats.Component.NodeID. originNodeID roachpb.NodeID destinationNodeID roachpb.NodeID stats *execinfrapb.ComponentStats diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 758ead28c526..4480a1db1ee8 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -194,10 +194,12 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} + n1 := roachpb.NodeID(1) + n2 := roachpb.NodeID(2) a.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -209,9 +211,9 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { ) a.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), @@ -289,10 +291,12 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { // Artificially inject component stats directly into the FlowsMetadata (in // the non-testing setting the stats come from the trace). var f1, f2 execstats.FlowsMetadata + n1 := roachpb.NodeID(1) + n2 := roachpb.NodeID(2) f1.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -302,9 +306,9 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { }, ) f2.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index 9519591726f8..d25fd92c3870 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -17,21 +17,17 @@ import ( // AddComponentStats modifies TraceAnalyzer internal state to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. -func (a *TraceAnalyzer) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { - a.FlowsMetadata.AddComponentStats(nodeID, stats) +func (a *TraceAnalyzer) AddComponentStats(stats *execinfrapb.ComponentStats) { + a.FlowsMetadata.AddComponentStats(stats) } // AddComponentStats modifies FlowsMetadata to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. -func (m *FlowsMetadata) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { +func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: processorStat := &processorStats{ - nodeID: nodeID, + nodeID: stats.Component.NodeID, stats: stats, } if m.processorStats == nil { @@ -40,7 +36,7 @@ func (m *FlowsMetadata) AddComponentStats( m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ - originNodeID: nodeID, + originNodeID: stats.Component.NodeID, stats: stats, } if m.streamStats == nil { @@ -53,6 +49,6 @@ func (m *FlowsMetadata) AddComponentStats( if m.flowStats == nil { m.flowStats = make(map[roachpb.NodeID]*flowStats) } - m.flowStats[nodeID] = flowStat + m.flowStats[stats.Component.NodeID] = flowStat } } diff --git a/pkg/sql/flowinfra/outbox_test.go b/pkg/sql/flowinfra/outbox_test.go index 944678aefa4e..9916601cab91 100644 --- a/pkg/sql/flowinfra/outbox_test.go +++ b/pkg/sql/flowinfra/outbox_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -71,6 +72,7 @@ func TestOutbox(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -227,6 +229,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -301,6 +304,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -376,6 +380,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), Stopper: stopper, }, + NodeID: base.TestingIDContainer, }) outbox.Init(rowenc.OneIntCol) // In a RunSyncFlow call, the outbox runs under the call's context. @@ -437,6 +442,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -490,6 +496,7 @@ func TestOutboxUnblocksProducers(t *testing.T) { // a nil nodeDialer will always fail to connect. NodeDialer: nil, }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -560,6 +567,7 @@ func BenchmarkOutbox(b *testing.B) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.MakeIntCols(numCols)) diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 26b77b26be0c..d22e60901389 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -224,7 +224,12 @@ func (ih *instrumentationHelper) Finish( } if ih.traceMetadata != nil && ih.explainPlan != nil { - ih.traceMetadata.annotateExplain(ih.explainPlan, trace, cfg.TestingKnobs.DeterministicExplainAnalyze) + ih.traceMetadata.annotateExplain( + ih.explainPlan, + p.curPlan.distSQLFlowInfos, + trace, + cfg.TestingKnobs.DeterministicExplainAnalyze, + ) } // TODO(radu): this should be unified with other stmt stats accesses. @@ -478,7 +483,7 @@ func (m execNodeTraceMetadata) associateNodeWithComponents( // annotateExplain aggregates the statistics in the trace and annotates // explain.Nodes with execution stats. func (m execNodeTraceMetadata) annotateExplain( - plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, + plan *explain.Plan, flowInfos []flowInfo, spans []tracingpb.RecordedSpan, makeDeterministic bool, ) { statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic) @@ -489,8 +494,12 @@ func (m execNodeTraceMetadata) annotateExplain( var nodeStats exec.ExecutionStats incomplete := false - for i := range components { - stats := statsMap[components[i]] + var nodes util.FastIntSet + for _, c := range components { + if c.Type == execinfrapb.ComponentID_PROCESSOR { + nodes.Add(int(c.NodeID)) + } + stats := statsMap[c] if stats == nil { incomplete = true break @@ -503,6 +512,9 @@ func (m execNodeTraceMetadata) annotateExplain( // incomplete results. In the future, we may consider an incomplete flag // if we want to show them with a warning. if !incomplete { + for i, ok := nodes.Next(0); ok; i, ok = nodes.Next(i + 1) { + nodeStats.Nodes = append(nodeStats.Nodes, fmt.Sprintf("n%d", i)) + } n.Annotate(exec.ExecutionStatsID, &nodeStats) } } diff --git a/pkg/sql/logictest/testdata/logic_test/dist_vectorize b/pkg/sql/logictest/testdata/logic_test/dist_vectorize index da9e077cd4ed..21cd30465751 100644 --- a/pkg/sql/logictest/testdata/logic_test/dist_vectorize +++ b/pkg/sql/logictest/testdata/logic_test/dist_vectorize @@ -57,9 +57,11 @@ distribution: vectorized: · • group (scalar) +│ cluster nodes: │ actual row count: 1 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -78,12 +80,14 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -92,6 +96,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze b/pkg/sql/logictest/testdata/logic_test/explain_analyze index cc8534537cb0..1e44a1c9e85d 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze @@ -15,6 +15,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -34,6 +35,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B @@ -55,6 +57,7 @@ vectorized: · • hash join (inner) │ columns: (k, v, a, b) +│ cluster nodes: │ actual row count: 2 │ estimated row count: 990 (missing stats) │ equality: (v) = (a) @@ -62,6 +65,7 @@ vectorized: │ ├── • scan │ columns: (k, v) +│ cluster nodes: │ actual row count: 4 │ KV rows read: 4 │ KV bytes read: 32 B @@ -71,6 +75,7 @@ vectorized: │ └── • scan columns: (a, b) + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans index 1c1997e071b6..525dd98a2d36 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans @@ -66,17 +66,20 @@ distribution: vectorized: · • group +│ cluster nodes: │ actual row count: 5 │ group by: k │ ordered: +k │ └── • merge join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -85,6 +88,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -104,19 +108,23 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 5 │ order: +w │ └── • distinct + │ cluster nodes: │ actual row count: 5 │ distinct on: w │ └── • hash join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (w) │ left cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -125,6 +133,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -144,12 +153,15 @@ distribution: vectorized: · • cross join +│ cluster nodes: │ actual row count: 25 │ ├── • ordinality +│ │ cluster nodes: │ │ actual row count: 5 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -158,9 +170,11 @@ vectorized: │ spans: FULL SCAN │ └── • ordinality + │ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -200,9 +214,11 @@ distribution: vectorized: · • window +│ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -223,6 +239,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -249,6 +266,7 @@ vectorized: • root │ ├── • insert +│ │ cluster nodes: │ │ actual row count: 1 │ │ into: child(c, p) │ │ @@ -264,9 +282,11 @@ vectorized: │ │ exec mode: one row │ │ │ └── • group (scalar) +│ │ cluster nodes: │ │ actual row count: 1 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 1 │ KV rows read: 1 │ KV bytes read: 8 B @@ -278,9 +298,11 @@ vectorized: └── • constraint-check │ └── • error if rows + │ cluster nodes: │ actual row count: 0 │ └── • lookup join (anti) + │ cluster nodes: │ actual row count: 0 │ KV rows read: 1 │ KV bytes read: 8 B @@ -289,10 +311,12 @@ vectorized: │ equality cols are key │ └── • filter + │ cluster nodes: │ actual row count: 1 │ filter: column2 IS NOT NULL │ └── • scan buffer + cluster nodes: actual row count: 1 label: buffer 1 · diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial index c4844f3de938..92af0447fe00 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial @@ -34,25 +34,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 4 KV bytes read: 32 B @@ -102,25 +107,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -146,25 +156,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables index edcd216cbc54..367741095fb8 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables +++ b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables @@ -414,4 +414,3 @@ SHOW CREATE ALL TABLES ---- create_statement CREATE SEQUENCE public.s1 MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 123 START 1; - diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_local b/pkg/sql/logictest/testdata/logic_test/vectorize_local index 1a5a3cda1cee..b33346a74d6e 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_local +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_local @@ -44,6 +44,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 2,001 KV rows read: 2,001 KV bytes read: 16 KiB @@ -62,6 +63,7 @@ distribution: vectorized: · • lookup join +│ cluster nodes: │ actual row count: 2 │ KV rows read: 1 │ KV bytes read: 8 B @@ -69,6 +71,7 @@ vectorized: │ equality: (b) = (b) │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -87,10 +90,12 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 2 │ equality: (a) = (b) │ ├── • scan +│ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B @@ -99,6 +104,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index df460c0ea00a..6466077eb31c 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -331,6 +331,9 @@ func (e *emitter) joinNodeName(algo string, joinType descpb.JoinType) string { func (e *emitter) emitNodeAttributes(n *Node) error { if stats, ok := n.annotations[exec.ExecutionStatsID]; ok { s := stats.(*exec.ExecutionStats) + if len(s.Nodes) > 0 { + e.ob.AddNonDeterministicField("cluster nodes", strings.Join(s.Nodes, ", ")) + } if s.RowCount.HasValue() { e.ob.AddField("actual row count", humanizeutil.Count(s.RowCount.Value())) } diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index 86a2c406f2f3..4a13417b9cdb 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -105,6 +105,15 @@ func (ob *OutputBuilder) AddField(key, value string) { ob.entries = append(ob.entries, entry{field: key, fieldVal: value}) } +// AddNonDeterministicField adds an information field under the current node, +// but hides the information if the MakeDeterministic flag is set. +func (ob *OutputBuilder) AddNonDeterministicField(key, value string) { + if ob.flags.MakeDeterministic { + value = "" + } + ob.AddField(key, value) +} + // Attr adds an information field under the current node. func (ob *OutputBuilder) Attr(key string, value interface{}) { ob.AddField(key, fmt.Sprint(value)) diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index d18179869a62..9be03805d27c 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -304,6 +304,9 @@ type ExecutionStats struct { KVBytesRead optional.Uint KVRowsRead optional.Uint + + // Nodes on which this operator was executed. + Nodes []string } // BuildPlanForExplainFn builds an execution plan against the given From 878bf45cccf50d5adcb234475763f86d25e59b54 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Thu, 18 Feb 2021 13:09:27 -0500 Subject: [PATCH 2/2] sql: use SQLInstanceID in ComponentStats Switch to using SQLInstanceID instead of NodeID in ComponentStats. Eventually, the same change should happen in the exec protos (e.g. StreamEndpointSpec). Release note: None --- pkg/sql/colflow/BUILD.bazel | 1 + pkg/sql/colflow/vectorized_flow.go | 5 +- pkg/sql/distsql_physical_planner.go | 3 +- pkg/sql/execinfra/flow_context.go | 9 +- pkg/sql/execinfrapb/component_stats.go | 34 +++-- pkg/sql/execinfrapb/component_stats.pb.go | 169 +++++++++++----------- pkg/sql/execinfrapb/component_stats.proto | 9 +- pkg/sql/execinfrapb/flow_diagram.go | 5 +- pkg/sql/execstats/BUILD.bazel | 1 + pkg/sql/execstats/traceanalyzer.go | 68 +++++---- pkg/sql/execstats/traceanalyzer_test.go | 8 +- pkg/sql/execstats/utils_test.go | 9 +- pkg/sql/flowinfra/flow.go | 3 +- pkg/sql/instrumentation.go | 2 +- 14 files changed, 168 insertions(+), 158 deletions(-) diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 64fa1d7c8603..8ffd605db4c6 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/colflow", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/col/coldata", "//pkg/col/coldataext", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 5dd4d32f3817..283df6e9cdc6 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -834,7 +835,7 @@ func (s *vectorizedFlowCreator) setupInput( // Note: we can't use flowCtx.StreamComponentID because the stream does // not originate from this node (we are the target node). compID := execinfrapb.StreamComponentID( - inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID, + base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID, ) op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency) if err != nil { @@ -961,7 +962,7 @@ func (s *vectorizedFlowCreator) setupOutput( // flow-level span. span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID) span.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID), + Component: execinfrapb.FlowComponentID(base.SQLInstanceID(outputStream.OriginNodeID), flowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 6e22c1c7eef6..b24db1eaa35e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -16,6 +16,7 @@ import ( "reflect" "sort" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" @@ -2744,7 +2745,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( processors := make(execComponents, len(plan.ResultRouters)) for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( - plan.Processors[resultProcIdx].Node, + base.SQLInstanceID(plan.Processors[resultProcIdx].Node), execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, int32(resultProcIdx), ) diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index b144768751eb..cace95667d84 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -109,15 +108,11 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec { // ProcessorComponentID returns a ComponentID for the given processor in this // flow. func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID { - // TODO(radu): the component stats should store SQLInstanceID instead. - nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) - return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID) + return execinfrapb.ProcessorComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, procID) } // StreamComponentID returns a ComponentID for the given stream in this flow. // The stream must originate from the node associated with this FlowCtx. func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID { - // TODO(radu): the component stats should store SQLInstanceID instead. - originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) - return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID) + return execinfrapb.StreamComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, streamID) } diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 6e8f8a5b3bd3..75f059bf5fdf 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -26,31 +26,35 @@ import ( ) // ProcessorComponentID returns a ComponentID for the given processor in a flow. -func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID { +func ProcessorComponentID( + instanceID base.SQLInstanceID, flowID FlowID, processorID int32, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_PROCESSOR, - ID: processorID, - NodeID: nodeID, + FlowID: flowID, + Type: ComponentID_PROCESSOR, + ID: processorID, + SQLInstanceID: instanceID, } } // StreamComponentID returns a ComponentID for the given stream in a flow. -func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID { +func StreamComponentID( + originInstanceID base.SQLInstanceID, flowID FlowID, streamID StreamID, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_STREAM, - ID: int32(streamID), - NodeID: originNodeID, + FlowID: flowID, + Type: ComponentID_STREAM, + ID: int32(streamID), + SQLInstanceID: originInstanceID, } } // FlowComponentID returns a ComponentID for the given flow. -func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID { +func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_FLOW, - NodeID: nodeID, + FlowID: flowID, + Type: ComponentID_FLOW, + SQLInstanceID: instanceID, } } diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index 3ac9af753875..f8f28058d193 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -8,7 +8,7 @@ import fmt "fmt" import math "math" import optional "github.com/cockroachdb/cockroach/pkg/util/optional" -import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" +import github_com_cockroachdb_cockroach_pkg_base "github.com/cockroachdb/cockroach/pkg/base" import io "io" @@ -69,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{0, 0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -81,16 +81,16 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` - // NodeID of the node this component is associated with. For cross-node + // SQLInstanceID of the node this component is associated with. For cross-node // streams, this is the *origin* node for the stream. - NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` + SQLInstanceID github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID `protobuf:"varint,4,opt,name=sql_instance_id,json=sqlInstanceId,casttype=github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID" json:"sql_instance_id"` } func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,7 +137,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{1} + return fileDescriptor_component_stats_011c4899fda43780, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -174,7 +174,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{2} + return fileDescriptor_component_stats_011c4899fda43780, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -218,7 +218,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{3} + return fileDescriptor_component_stats_011c4899fda43780, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -256,7 +256,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{4} + return fileDescriptor_component_stats_011c4899fda43780, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -297,7 +297,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{5} + return fileDescriptor_component_stats_011c4899fda43780, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -336,7 +336,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{6} + return fileDescriptor_component_stats_011c4899fda43780, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -373,7 +373,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() {} func (*OutputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{7} + return fileDescriptor_component_stats_011c4899fda43780, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -407,7 +407,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{8} + return fileDescriptor_component_stats_011c4899fda43780, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -475,7 +475,7 @@ func (m *ComponentID) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintComponentStats(dAtA, i, uint64(m.ID)) dAtA[i] = 0x20 i++ - i = encodeVarintComponentStats(dAtA, i, uint64(m.NodeID)) + i = encodeVarintComponentStats(dAtA, i, uint64(m.SQLInstanceID)) return i, nil } @@ -878,7 +878,7 @@ func (m *ComponentID) Size() (n int) { n += 1 + l + sovComponentStats(uint64(l)) n += 1 + sovComponentStats(uint64(m.Type)) n += 1 + sovComponentStats(uint64(m.ID)) - n += 1 + sovComponentStats(uint64(m.NodeID)) + n += 1 + sovComponentStats(uint64(m.SQLInstanceID)) return n } @@ -1128,9 +1128,9 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SQLInstanceID", wireType) } - m.NodeID = 0 + m.SQLInstanceID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowComponentStats @@ -1140,7 +1140,7 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + m.SQLInstanceID |= (github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID(b) & 0x7F) << shift if b < 0x80 { break } @@ -2543,70 +2543,71 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_e22770c51d19c14e) -} - -var fileDescriptor_component_stats_e22770c51d19c14e = []byte{ - // 970 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, - 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, - 0xa1, 0x4a, 0x05, 0x95, 0x4a, 0xbd, 0xe5, 0x60, 0x0c, 0x69, 0x88, 0x63, 0xe3, 0x2e, 0xd8, 0x95, - 0x72, 0x28, 0x5a, 0x76, 0xc7, 0x78, 0xb5, 0x2f, 0xb3, 0xde, 0x9d, 0xc5, 0xeb, 0x7e, 0x8a, 0x9e, - 0xda, 0xcf, 0xd1, 0x6b, 0x3e, 0x81, 0x8f, 0x39, 0x46, 0x3d, 0x58, 0x0d, 0xee, 0x57, 0xe8, 0xa5, - 0xa7, 0x6a, 0x66, 0x5f, 0xd8, 0xb8, 0x42, 0x61, 0xab, 0xde, 0x46, 0xcb, 0xf3, 0xff, 0xcd, 0xf3, - 0x3e, 0xc0, 0x53, 0xef, 0xd2, 0xec, 0xe0, 0x00, 0xab, 0xba, 0x7d, 0xee, 0x2a, 0xce, 0xac, 0xa3, - 0x12, 0xcb, 0x21, 0x36, 0xb6, 0xe9, 0xd4, 0xa3, 0x0a, 0xf5, 0xda, 0x8e, 0x4b, 0x28, 0x41, 0x75, - 0x95, 0xa8, 0x86, 0x4b, 0x14, 0xf5, 0xa2, 0xed, 0x5d, 0x9a, 0x6d, 0x4d, 0xf7, 0xa8, 0x77, 0x69, - 0xba, 0xbe, 0xbd, 0xf3, 0x78, 0x4e, 0xe6, 0x84, 0x1b, 0x75, 0xd8, 0x29, 0xb4, 0xdf, 0xf9, 0xd4, - 0xa7, 0xba, 0xd9, 0x21, 0x0e, 0xd5, 0x89, 0xad, 0xac, 0x0e, 0xe1, 0xaf, 0xcd, 0x37, 0x22, 0x54, - 0x0e, 0xe2, 0x7b, 0x86, 0x7d, 0xf4, 0x35, 0x94, 0xce, 0x4d, 0x72, 0x35, 0xd5, 0xb5, 0xba, 0xb0, - 0x27, 0xb4, 0x1e, 0xf4, 0xea, 0x37, 0xb7, 0xbb, 0xb9, 0xdf, 0x6f, 0x77, 0xa5, 0xe7, 0x26, 0xb9, - 0x1a, 0xf6, 0x97, 0xc9, 0x49, 0x96, 0x98, 0xe1, 0x50, 0x43, 0x7d, 0x28, 0xd0, 0x6b, 0x07, 0xd7, - 0xc5, 0x3d, 0xa1, 0x55, 0xed, 0x7e, 0xd9, 0x5e, 0xe7, 0x5f, 0x3b, 0x75, 0x4f, 0x7b, 0x72, 0xed, - 0xe0, 0x5e, 0x81, 0xb1, 0x65, 0xae, 0x46, 0x3b, 0x20, 0xea, 0x5a, 0x3d, 0xbf, 0x27, 0xb4, 0x8a, - 0x3d, 0x60, 0xdf, 0x97, 0xb7, 0xbb, 0xe2, 0xb0, 0x2f, 0x8b, 0xba, 0x86, 0x7e, 0x84, 0x92, 0x4d, - 0x34, 0xcc, 0x9c, 0x2a, 0x70, 0x83, 0x41, 0x64, 0x20, 0x1d, 0x13, 0x0d, 0x0f, 0xfb, 0x7f, 0xdf, - 0xee, 0x7e, 0x33, 0xd7, 0xe9, 0x85, 0x3f, 0x6b, 0xab, 0xc4, 0xea, 0x24, 0x0e, 0x68, 0xb3, 0xd5, - 0xb9, 0xe3, 0x18, 0xf3, 0x0e, 0x3f, 0x39, 0xb3, 0x76, 0x28, 0x93, 0x25, 0x46, 0x1d, 0x6a, 0xcd, - 0x6f, 0xa1, 0xc0, 0xfc, 0x41, 0x65, 0x28, 0x9e, 0x1e, 0x8f, 0x07, 0x93, 0x5a, 0x0e, 0x6d, 0x43, - 0xf9, 0x44, 0x1e, 0x1d, 0x0c, 0xc6, 0xe3, 0x91, 0x5c, 0x13, 0x10, 0x80, 0x34, 0x9e, 0xc8, 0x83, - 0xfd, 0xa3, 0x9a, 0x88, 0xb6, 0xa0, 0xf0, 0xfc, 0xd5, 0xe8, 0x87, 0x5a, 0xbe, 0xf9, 0xa6, 0x00, - 0xd5, 0x24, 0xa8, 0x31, 0xab, 0x11, 0x1a, 0x42, 0x39, 0x29, 0x1b, 0xcf, 0x60, 0xa5, 0xfb, 0x74, - 0xa3, 0x8c, 0x44, 0xc9, 0x58, 0xa9, 0xd1, 0x00, 0x24, 0x1b, 0xd3, 0xa9, 0x1b, 0xf0, 0xcc, 0x56, - 0xba, 0xad, 0xf5, 0x9c, 0x63, 0x4c, 0xaf, 0x88, 0x6b, 0xc8, 0x01, 0x77, 0x22, 0x42, 0x15, 0x6d, - 0x4c, 0xe5, 0x20, 0xc6, 0xd0, 0x80, 0x27, 0x77, 0x13, 0xcc, 0xe4, 0x5f, 0x98, 0x49, 0x80, 0x9e, - 0x81, 0x68, 0x2c, 0x78, 0xfa, 0x2b, 0xdd, 0x27, 0xeb, 0x11, 0x87, 0x67, 0xa1, 0x36, 0x29, 0xe1, - 0xe1, 0x99, 0x2c, 0x1a, 0x0b, 0xf4, 0x0c, 0x0a, 0xac, 0xb5, 0xeb, 0x45, 0x0e, 0xf8, 0x7c, 0x3d, - 0x60, 0x10, 0x60, 0x35, 0x7d, 0x3d, 0x97, 0xa1, 0x03, 0x90, 0x88, 0x4f, 0x1d, 0x9f, 0xd6, 0xa5, - 0x8f, 0xe5, 0x74, 0xc4, 0xed, 0xd2, 0x88, 0x48, 0x8a, 0x7a, 0x20, 0xe9, 0xb6, 0xe3, 0x53, 0xaf, - 0x5e, 0xda, 0xcb, 0xb7, 0x2a, 0xdd, 0x2f, 0xd6, 0x43, 0x86, 0xf6, 0x7d, 0x46, 0xa8, 0x44, 0x2f, - 0x00, 0xf8, 0x7c, 0xf0, 0x89, 0xac, 0x6f, 0x7d, 0x2c, 0x1a, 0x36, 0x2a, 0x69, 0x4c, 0xf9, 0x3c, - 0xfe, 0xd0, 0xfc, 0x45, 0x00, 0x58, 0x5d, 0x83, 0x7a, 0x00, 0xb6, 0x6f, 0x4d, 0xa9, 0xef, 0x98, - 0xd8, 0x8b, 0x3a, 0xe7, 0xb3, 0x14, 0x98, 0x4d, 0x71, 0x3b, 0x19, 0xde, 0x53, 0xdd, 0xa6, 0x31, - 0xd2, 0xf6, 0xad, 0x09, 0x57, 0xa1, 0x3e, 0x94, 0xaf, 0x14, 0x9d, 0x4e, 0xa9, 0x6e, 0xe1, 0xa8, - 0x69, 0x9e, 0xac, 0x45, 0xf4, 0x7d, 0x57, 0x61, 0xc7, 0x08, 0xb3, 0xc5, 0x94, 0x13, 0xdd, 0xc2, - 0xcd, 0xf7, 0x79, 0xa8, 0x7e, 0xd8, 0x50, 0x68, 0x1f, 0x4a, 0xa6, 0x42, 0xb1, 0xad, 0x5e, 0x47, - 0x9e, 0x6d, 0x8c, 0x8d, 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x35, 0x3c, 0xd6, 0xb0, 0x87, 0x5d, 0x5d, - 0x31, 0xf5, 0x9f, 0xb8, 0x49, 0x08, 0xcc, 0x67, 0x03, 0x7e, 0x72, 0x0f, 0xc2, 0xd9, 0xaf, 0xe0, - 0x61, 0x98, 0xfd, 0xa9, 0x8b, 0x55, 0xac, 0x2f, 0xb0, 0x16, 0xb5, 0xfb, 0x46, 0x65, 0xa8, 0x86, - 0x5a, 0x39, 0x92, 0xa2, 0x97, 0x50, 0x9d, 0x5d, 0xd3, 0x34, 0xac, 0xb8, 0x39, 0x6c, 0x9b, 0x4b, - 0x13, 0xd6, 0x09, 0x3c, 0xb2, 0xb0, 0xe7, 0x29, 0xf3, 0x34, 0x4e, 0xda, 0x1c, 0x57, 0x8b, 0xd5, - 0x31, 0xb1, 0xf9, 0xa7, 0x90, 0xd4, 0x38, 0x9a, 0x76, 0xd4, 0x87, 0x4a, 0x14, 0xbe, 0xb7, 0xda, - 0x5d, 0x1b, 0xe1, 0x21, 0xd4, 0x8d, 0xd9, 0xd2, 0xea, 0x01, 0x84, 0x61, 0x73, 0x88, 0x98, 0xa1, - 0x8d, 0xb9, 0x8c, 0x33, 0x5e, 0xc0, 0x76, 0x12, 0x2e, 0xc7, 0xe4, 0x37, 0xc7, 0x3c, 0x88, 0x95, - 0x8c, 0xd4, 0xfc, 0x4d, 0x84, 0x52, 0xb4, 0x91, 0x56, 0x9e, 0xb9, 0x58, 0xd1, 0x32, 0x0d, 0x58, - 0x54, 0x0c, 0x45, 0x4b, 0xe5, 0x88, 0x43, 0xc4, 0xcc, 0x39, 0xe2, 0x94, 0x97, 0x50, 0x32, 0x16, - 0x19, 0xfb, 0xb6, 0x1a, 0xbf, 0x78, 0x87, 0x67, 0xac, 0x59, 0x65, 0xc9, 0x58, 0xf0, 0xa6, 0x3d, - 0x81, 0x87, 0x2a, 0xb1, 0x29, 0xb6, 0x57, 0xb3, 0x50, 0xc8, 0x36, 0x0b, 0xd5, 0x95, 0x9e, 0x8f, - 0xff, 0x5f, 0x02, 0x94, 0x93, 0x25, 0xcc, 0xc6, 0x96, 0x2d, 0xe0, 0x90, 0x9c, 0x71, 0xf6, 0xb7, - 0x98, 0x92, 0x7b, 0x39, 0x82, 0x47, 0x96, 0x12, 0x4c, 0x15, 0xd3, 0x24, 0xaa, 0x42, 0xb1, 0x36, - 0xb5, 0xb0, 0x95, 0x25, 0x7b, 0x0f, 0x2d, 0x25, 0xd8, 0x8f, 0xc5, 0x47, 0xd8, 0x42, 0xdf, 0x03, - 0xfa, 0x10, 0xa8, 0xe9, 0x9e, 0x91, 0xa5, 0x4f, 0x6a, 0x69, 0x62, 0x5f, 0xf7, 0x8c, 0xe6, 0xaf, - 0x02, 0x54, 0x52, 0x6f, 0x07, 0xab, 0x35, 0x5b, 0xc8, 0x33, 0x85, 0xaa, 0x17, 0xd9, 0x36, 0x32, - 0x5b, 0xe4, 0xbd, 0x50, 0x76, 0x6f, 0xad, 0x8b, 0xff, 0x65, 0xad, 0x37, 0x27, 0x50, 0x4e, 0xde, - 0x11, 0xf4, 0x1d, 0x6c, 0xb3, 0xc8, 0x2d, 0x6c, 0x4d, 0x7d, 0xd6, 0xe8, 0x59, 0x1c, 0xab, 0x58, - 0x4a, 0x70, 0x84, 0xad, 0x53, 0xa6, 0xeb, 0x7d, 0x75, 0xf3, 0xbe, 0x91, 0xbb, 0x59, 0x36, 0x84, - 0xb7, 0xcb, 0x86, 0xf0, 0x6e, 0xd9, 0x10, 0xfe, 0x58, 0x36, 0x84, 0x9f, 0xef, 0x1a, 0xb9, 0xb7, - 0x77, 0x8d, 0xdc, 0xbb, 0xbb, 0x46, 0xee, 0x75, 0x25, 0xf5, 0x67, 0xf4, 0x9f, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xce, 0xc1, 0x00, 0xbc, 0x9e, 0x0a, 0x00, 0x00, + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_011c4899fda43780) +} + +var fileDescriptor_component_stats_011c4899fda43780 = []byte{ + // 982 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0xe2, 0x46, + 0x18, 0xc7, 0xb1, 0x21, 0x26, 0x3c, 0x2c, 0x84, 0x9d, 0xee, 0x01, 0x45, 0x2d, 0xc9, 0xd2, 0xae, + 0x84, 0x2a, 0x15, 0x54, 0x0e, 0xed, 0x69, 0x0f, 0x21, 0xb0, 0x5d, 0x36, 0xc9, 0x92, 0x35, 0x24, + 0x95, 0xf6, 0x62, 0x19, 0x7b, 0x42, 0x46, 0xf8, 0x0d, 0x7b, 0x4c, 0x9c, 0x7e, 0x8a, 0x9e, 0xda, + 0xcf, 0xd1, 0x6b, 0x3f, 0x41, 0x8e, 0x7b, 0xe8, 0x61, 0xd5, 0x43, 0xd4, 0x25, 0xfd, 0x0a, 0xbd, + 0xf4, 0x54, 0xcd, 0xd8, 0x18, 0x27, 0x15, 0x5a, 0x5c, 0xf5, 0x36, 0xd8, 0xcf, 0xff, 0x37, 0xcf, + 0xbb, 0x81, 0x67, 0xde, 0xcc, 0x68, 0xe1, 0x00, 0x6b, 0xc4, 0xba, 0x70, 0x55, 0x67, 0xdc, 0xd2, + 0x6c, 0xd3, 0xb1, 0x2d, 0x6c, 0x51, 0xc5, 0xa3, 0x2a, 0xf5, 0x9a, 0x8e, 0x6b, 0x53, 0x1b, 0x55, + 0x35, 0x5b, 0x9b, 0xba, 0xb6, 0xaa, 0x5d, 0x36, 0xbd, 0x99, 0xd1, 0xd4, 0x89, 0x47, 0xbd, 0x99, + 0xe1, 0xfa, 0xd6, 0xee, 0x93, 0x89, 0x3d, 0xb1, 0xb9, 0x51, 0x8b, 0x9d, 0x42, 0xfb, 0xdd, 0x4f, + 0x7d, 0x4a, 0x8c, 0x96, 0xed, 0x50, 0x62, 0x5b, 0xea, 0xea, 0x10, 0xbe, 0xad, 0xff, 0x26, 0x42, + 0xf1, 0x70, 0x79, 0x4f, 0xbf, 0x8b, 0xbe, 0x86, 0xfc, 0x85, 0x61, 0x5f, 0x29, 0x44, 0xaf, 0x0a, + 0xfb, 0x42, 0xe3, 0x51, 0xa7, 0x7a, 0x73, 0xbb, 0x97, 0xf9, 0xfd, 0x76, 0x4f, 0x7a, 0x61, 0xd8, + 0x57, 0xfd, 0xee, 0x22, 0x3e, 0xc9, 0x12, 0x33, 0xec, 0xeb, 0xa8, 0x0b, 0x39, 0x7a, 0xed, 0xe0, + 0xaa, 0xb8, 0x2f, 0x34, 0xca, 0xed, 0x2f, 0x9b, 0xeb, 0xfc, 0x6b, 0x26, 0xee, 0x69, 0x8e, 0xae, + 0x1d, 0xdc, 0xc9, 0x31, 0xb6, 0xcc, 0xd5, 0x68, 0x17, 0x44, 0xa2, 0x57, 0xb3, 0xfb, 0x42, 0x63, + 0xab, 0x03, 0xec, 0xf9, 0xe2, 0x76, 0x4f, 0xec, 0x77, 0x65, 0x91, 0xe8, 0x28, 0x80, 0x1d, 0x6f, + 0x66, 0x28, 0xc4, 0xf2, 0xa8, 0x6a, 0x69, 0x98, 0x39, 0x97, 0xe3, 0x86, 0xa7, 0x91, 0x61, 0x69, + 0xf8, 0xe6, 0xb8, 0x1f, 0xbd, 0xed, 0x77, 0xff, 0xbe, 0xdd, 0xfb, 0x76, 0x42, 0xe8, 0xa5, 0x3f, + 0x6e, 0x6a, 0xb6, 0xd9, 0x8a, 0xfd, 0xd1, 0xc7, 0xab, 0x73, 0xcb, 0x99, 0x4e, 0x5a, 0x63, 0xd5, + 0xc3, 0xcd, 0x7b, 0x52, 0xb9, 0xe4, 0xcd, 0x8c, 0xf8, 0xa7, 0x5e, 0xff, 0x06, 0x72, 0xcc, 0x53, + 0x54, 0x80, 0xad, 0xb3, 0xd7, 0xc3, 0xde, 0xa8, 0x92, 0x41, 0x25, 0x28, 0x9c, 0xca, 0x83, 0xc3, + 0xde, 0x70, 0x38, 0x90, 0x2b, 0x02, 0x02, 0x90, 0x86, 0x23, 0xb9, 0x77, 0x70, 0x52, 0x11, 0xd1, + 0x36, 0xe4, 0x5e, 0x1c, 0x0f, 0xbe, 0xaf, 0x64, 0xeb, 0xbf, 0xe6, 0xa0, 0x1c, 0x87, 0x3b, 0x64, + 0xd5, 0x43, 0x7d, 0x28, 0xc4, 0x05, 0xe5, 0xb9, 0x2d, 0xb6, 0x9f, 0x6d, 0x94, 0xab, 0x28, 0x4d, + 0x2b, 0x35, 0xea, 0x81, 0x64, 0x61, 0xaa, 0xb8, 0x01, 0xcf, 0x79, 0xb1, 0xdd, 0x58, 0xcf, 0x79, + 0x8d, 0xe9, 0x95, 0xed, 0x4e, 0xe5, 0x80, 0x3b, 0x11, 0xa1, 0xb6, 0x2c, 0x4c, 0xe5, 0x60, 0x89, + 0xa1, 0x01, 0x4f, 0xfb, 0x26, 0x98, 0xd1, 0xbf, 0x30, 0xa3, 0x00, 0x3d, 0x07, 0x71, 0x3a, 0xe7, + 0x05, 0x29, 0xb6, 0x9f, 0xae, 0x47, 0x1c, 0x9d, 0x87, 0xda, 0xb8, 0xb8, 0x47, 0xe7, 0xb2, 0x38, + 0x9d, 0xa3, 0xe7, 0x90, 0x63, 0x4d, 0x5f, 0xdd, 0xe2, 0x80, 0xcf, 0xd7, 0x03, 0x7a, 0x01, 0xd6, + 0x92, 0xd7, 0x73, 0x19, 0x3a, 0x04, 0xc9, 0xf6, 0xa9, 0xe3, 0xd3, 0xaa, 0xf4, 0xb1, 0x9c, 0x0e, + 0xb8, 0x5d, 0x12, 0x11, 0x49, 0x51, 0x07, 0x24, 0x62, 0x39, 0x3e, 0xf5, 0xaa, 0xf9, 0xfd, 0x6c, + 0xa3, 0xd8, 0xfe, 0x62, 0x3d, 0xa4, 0x6f, 0x3d, 0x64, 0x84, 0x4a, 0xf4, 0x12, 0x80, 0x4f, 0x0e, + 0x9f, 0xd5, 0xea, 0xf6, 0xc7, 0xa2, 0x61, 0x43, 0x94, 0xc4, 0x14, 0x2e, 0x96, 0x0f, 0xea, 0x3f, + 0x09, 0x00, 0xab, 0x6b, 0x50, 0x07, 0xc0, 0xf2, 0x4d, 0x85, 0xfa, 0x8e, 0x81, 0xbd, 0xa8, 0x73, + 0x3e, 0x4b, 0x80, 0xd9, 0x7c, 0x37, 0xe3, 0xb1, 0x3e, 0x23, 0x16, 0x5d, 0x22, 0x2d, 0xdf, 0x1c, + 0x71, 0x15, 0xea, 0x42, 0xe1, 0x4a, 0x25, 0x54, 0xa1, 0xc4, 0xc4, 0x51, 0xd3, 0x3c, 0x5d, 0x8b, + 0xe8, 0xfa, 0xae, 0xca, 0x8e, 0x11, 0x66, 0x9b, 0x29, 0x47, 0xc4, 0xc4, 0xf5, 0x0f, 0x59, 0x28, + 0xdf, 0x6f, 0x28, 0x74, 0x00, 0x79, 0x43, 0xa5, 0xd8, 0xd2, 0xae, 0x23, 0xcf, 0x36, 0xc6, 0x2e, + 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x2d, 0x3c, 0xd1, 0xb1, 0x87, 0x5d, 0xa2, 0x1a, 0xe4, 0x07, 0x6e, + 0x12, 0x02, 0xb3, 0xe9, 0x80, 0x9f, 0x3c, 0x80, 0x70, 0xf6, 0x31, 0xec, 0x84, 0xd9, 0x57, 0x5c, + 0xac, 0x61, 0x32, 0xc7, 0x7a, 0xd4, 0xee, 0x1b, 0x95, 0xa1, 0x1c, 0x6a, 0xe5, 0x48, 0x8a, 0x5e, + 0x41, 0x79, 0x7c, 0x4d, 0x93, 0xb0, 0xad, 0xcd, 0x61, 0x25, 0x2e, 0x8d, 0x59, 0xa7, 0xf0, 0xd8, + 0xc4, 0x9e, 0xa7, 0x4e, 0x92, 0x38, 0x69, 0x73, 0x5c, 0x65, 0xa9, 0x5e, 0x12, 0xeb, 0x7f, 0x0a, + 0x71, 0x8d, 0xa3, 0x69, 0x47, 0x5d, 0x28, 0x46, 0xe1, 0x7b, 0xab, 0xdd, 0xb5, 0x11, 0x1e, 0x42, + 0xdd, 0x90, 0x2d, 0xad, 0x0e, 0x40, 0x18, 0x36, 0x87, 0x88, 0x29, 0xda, 0x98, 0xcb, 0x38, 0xe3, + 0x25, 0x94, 0xe2, 0x70, 0x39, 0x26, 0xbb, 0x39, 0xe6, 0xd1, 0x52, 0xc9, 0x48, 0xf5, 0x5f, 0x44, + 0xc8, 0x47, 0x1b, 0x69, 0xe5, 0x99, 0x8b, 0x55, 0x3d, 0xd5, 0x80, 0x45, 0xc5, 0x50, 0xf5, 0x44, + 0x8e, 0x38, 0x44, 0x4c, 0x9d, 0x23, 0x4e, 0x79, 0x05, 0xf9, 0xe9, 0x3c, 0x65, 0xdf, 0x96, 0xa3, + 0x7d, 0x2a, 0x1d, 0x9d, 0xb3, 0x66, 0x95, 0xa5, 0xe9, 0x9c, 0x37, 0xed, 0x29, 0xec, 0x68, 0xb6, + 0x45, 0xb1, 0xb5, 0x9a, 0x85, 0x5c, 0xba, 0x59, 0x28, 0xaf, 0xf4, 0x7c, 0xfc, 0xff, 0x12, 0xa0, + 0x10, 0x2f, 0x61, 0x36, 0xb6, 0x6c, 0x01, 0x87, 0xe4, 0x94, 0xb3, 0xbf, 0xcd, 0x94, 0xdc, 0xcb, + 0x01, 0x3c, 0x36, 0xd5, 0x40, 0x51, 0x0d, 0xc3, 0xd6, 0x54, 0x8a, 0x75, 0xc5, 0xc4, 0x66, 0x9a, + 0xec, 0xed, 0x98, 0x6a, 0x70, 0xb0, 0x14, 0x9f, 0x60, 0x13, 0xbd, 0x01, 0x74, 0x1f, 0xa8, 0x13, + 0x6f, 0x9a, 0xa6, 0x4f, 0x2a, 0x49, 0x62, 0x97, 0x78, 0xd3, 0xfa, 0xcf, 0x02, 0x14, 0x13, 0xdf, + 0x0e, 0x56, 0x6b, 0xb6, 0x90, 0xc7, 0x2a, 0xd5, 0x2e, 0xd3, 0x6d, 0x64, 0xb6, 0xc8, 0x3b, 0xa1, + 0xec, 0xc1, 0x5a, 0x17, 0xff, 0xcb, 0x5a, 0xaf, 0x8f, 0xa0, 0x10, 0x7f, 0x47, 0xd0, 0x77, 0x50, + 0x62, 0x91, 0x9b, 0xd8, 0x54, 0x7c, 0xd6, 0xe8, 0x69, 0x1c, 0x2b, 0x9a, 0x6a, 0x70, 0x82, 0xcd, + 0x33, 0xa6, 0xeb, 0x7c, 0x75, 0xf3, 0xa1, 0x96, 0xb9, 0x59, 0xd4, 0x84, 0x77, 0x8b, 0x9a, 0xf0, + 0x7e, 0x51, 0x13, 0xfe, 0x58, 0xd4, 0x84, 0x1f, 0xef, 0x6a, 0x99, 0x77, 0x77, 0xb5, 0xcc, 0xfb, + 0xbb, 0x5a, 0xe6, 0x6d, 0x31, 0xf1, 0x37, 0xf5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xaa, 0xf0, + 0x57, 0x37, 0xb8, 0x0a, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index a02f9f920403..8b55abca2799 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -48,11 +48,12 @@ message ComponentID { optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; - // NodeID of the node this component is associated with. For cross-node + // SQLInstanceID of the node this component is associated with. For cross-node // streams, this is the *origin* node for the stream. - optional int32 node_id = 4 [(gogoproto.nullable) = false, - (gogoproto.customname) = "NodeID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + optional int32 sql_instance_id = 4 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SQLInstanceID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"]; } // ComponentStats contains statistics for an execution component. A component is diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 77d542825778..ff3ec8deeec4 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -20,6 +20,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -600,14 +601,14 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) { for i := range d.Processors { p := &d.Processors[i] nodeID := d.nodeIDs[p.NodeIdx] - component := ProcessorComponentID(nodeID, d.flowID, p.processorID) + component := ProcessorComponentID(base.SQLInstanceID(nodeID), d.flowID, p.processorID) if compStats := statsMap[component]; compStats != nil { p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...) } } for i := range d.Edges { originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx] - component := StreamComponentID(originNodeID, d.flowID, d.Edges[i].streamID) + component := StreamComponentID(base.SQLInstanceID(originNodeID), d.flowID, d.Edges[i].streamID) if compStats := statsMap[component]; compStats != nil { d.Edges[i].Stats = compStats.StatsForQueryPlan() } diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 79e34ce05b9d..49c28ae7c807 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/roachpb", "//pkg/sql/execinfrapb", "//pkg/util/tracing/tracingpb", diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index b927c477c7d0..68a58fcc67ef 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -13,6 +13,7 @@ package execstats import ( "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -20,13 +21,13 @@ import ( ) type processorStats struct { - // TODO(radu): this field redundant with stats.Component.NodeID. + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. nodeID roachpb.NodeID stats *execinfrapb.ComponentStats } type streamStats struct { - // TODO(radu): this field redundant with stats.Component.NodeID. + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. originNodeID roachpb.NodeID destinationNodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -53,7 +54,7 @@ type FlowsMetadata struct { // flowStats maps a node ID to flow level stats extracted from a trace. Note // that the key is not a FlowID because the same FlowID is used across // nodes. - flowStats map[roachpb.NodeID]*flowStats + flowStats map[base.SQLInstanceID]*flowStats } // NewFlowsMetadata creates a FlowsMetadata for the given physical plan @@ -62,12 +63,12 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta a := &FlowsMetadata{ processorStats: make(map[execinfrapb.ProcessorID]*processorStats), streamStats: make(map[execinfrapb.StreamID]*streamStats), - flowStats: make(map[roachpb.NodeID]*flowStats), + flowStats: make(map[base.SQLInstanceID]*flowStats), } // Annotate the maps with physical plan information. for nodeID, flow := range flows { - a.flowStats[nodeID] = &flowStats{} + a.flowStats[base.SQLInstanceID(nodeID)] = &flowStats{} for _, proc := range flow.Processors { a.processorStats[execinfrapb.ProcessorID(proc.ProcessorID)] = &processorStats{nodeID: nodeID} for _, output := range proc.Output { @@ -89,13 +90,13 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta // TODO(asubiotto): Flatten this struct, we're currently allocating a map per // stat. type NodeLevelStats struct { - NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64 - MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 - KVBytesReadGroupedByNode map[roachpb.NodeID]int64 - KVRowsReadGroupedByNode map[roachpb.NodeID]int64 - KVTimeGroupedByNode map[roachpb.NodeID]time.Duration - NetworkMessagesGroupedByNode map[roachpb.NodeID]int64 - ContentionTimeGroupedByNode map[roachpb.NodeID]time.Duration + NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64 + MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64 + KVBytesReadGroupedByNode map[base.SQLInstanceID]int64 + KVRowsReadGroupedByNode map[base.SQLInstanceID]int64 + KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration + NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 + ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the @@ -164,12 +165,12 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist streamStats.stats = componentStats case execinfrapb.ComponentID_FLOW: - flowStats := a.flowStats[component.NodeID] + flowStats := a.flowStats[component.SQLInstanceID] if flowStats == nil { return errors.Errorf( "trace has span for flow %s on node %s but the flow does not exist in the physical plan", component.FlowID, - component.NodeID, + component.SQLInstanceID, ) } flowStats.stats = append(flowStats.stats, componentStats) @@ -186,13 +187,13 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist func (a *TraceAnalyzer) ProcessStats() error { // Process node level stats. a.nodeLevelStats = NodeLevelStats{ - NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64), - MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), - KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), - NetworkMessagesGroupedByNode: make(map[roachpb.NodeID]int64), - ContentionTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), + NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64), + MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64), + KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), + NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), + ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), } var errs error @@ -201,10 +202,11 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } - a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) - a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) - a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value() - a.nodeLevelStats.ContentionTimeGroupedByNode[stats.nodeID] += stats.stats.KV.ContentionTime.Value() + instanceID := base.SQLInstanceID(stats.nodeID) + a.nodeLevelStats.KVBytesReadGroupedByNode[instanceID] += int64(stats.stats.KV.BytesRead.Value()) + a.nodeLevelStats.KVRowsReadGroupedByNode[instanceID] += int64(stats.stats.KV.TuplesRead.Value()) + a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.stats.KV.KVTime.Value() + a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.stats.KV.ContentionTime.Value() } // Process streamStats. @@ -212,13 +214,14 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } + originInstanceID := base.SQLInstanceID(stats.originNodeID) // Set networkBytesSentGroupedByNode. bytes, err := getNetworkBytesFromComponentStats(stats.stats) if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating network bytes sent")) } else { - a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes + a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes } // The row execution flow attaches this stat to a stream stat with the @@ -230,8 +233,9 @@ func (a *TraceAnalyzer) ProcessStats() error { // removed, getting maxMemUsage from streamStats should be removed as // well. if stats.stats.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage + memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()) + if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage } } @@ -239,12 +243,12 @@ func (a *TraceAnalyzer) ProcessStats() error { if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages")) } else { - a.nodeLevelStats.NetworkMessagesGroupedByNode[stats.originNodeID] += numMessages + a.nodeLevelStats.NetworkMessagesGroupedByNode[originInstanceID] += numMessages } } // Process flowStats. - for nodeID, stats := range a.flowStats { + for instanceID, stats := range a.flowStats { if stats.stats == nil { continue } @@ -253,8 +257,8 @@ func (a *TraceAnalyzer) ProcessStats() error { // span, so we need to check flow stats for max memory usage. for _, v := range stats.stats { if v.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] = memUsage + if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] = memUsage } } } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 4480a1db1ee8..32898e11e866 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -194,8 +194,8 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} - n1 := roachpb.NodeID(1) - n2 := roachpb.NodeID(2) + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) a.AddComponentStats( &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( @@ -291,8 +291,8 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { // Artificially inject component stats directly into the FlowsMetadata (in // the non-testing setting the stats come from the trace). var f1, f2 execstats.FlowsMetadata - n1 := roachpb.NodeID(1) - n2 := roachpb.NodeID(2) + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) f1.AddComponentStats( &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index d25fd92c3870..c9488a054e5b 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -11,6 +11,7 @@ package execstats import ( + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" ) @@ -27,7 +28,7 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: processorStat := &processorStats{ - nodeID: stats.Component.NodeID, + nodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.processorStats == nil { @@ -36,7 +37,7 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ - originNodeID: stats.Component.NodeID, + originNodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.streamStats == nil { @@ -47,8 +48,8 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { flowStat := &flowStats{} flowStat.stats = append(flowStat.stats, stats) if m.flowStats == nil { - m.flowStats = make(map[roachpb.NodeID]*flowStats) + m.flowStats = make(map[base.SQLInstanceID]*flowStats) } - m.flowStats[stats.Component.NodeID] = flowStat + m.flowStats[stats.Component.SQLInstanceID] = flowStat } } diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 3eb52b00aae3..7e8a1903b209 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -435,7 +434,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) { // information over. if sp := tracing.SpanFromContext(ctx); sp != nil { sp.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(roachpb.NodeID(f.NodeID.SQLInstanceID()), f.FlowCtx.ID), + Component: execinfrapb.FlowComponentID(f.NodeID.SQLInstanceID(), f.FlowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index d22e60901389..af62c46dc1d6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -497,7 +497,7 @@ func (m execNodeTraceMetadata) annotateExplain( var nodes util.FastIntSet for _, c := range components { if c.Type == execinfrapb.ComponentID_PROCESSOR { - nodes.Add(int(c.NodeID)) + nodes.Add(int(c.SQLInstanceID)) } stats := statsMap[c] if stats == nil {