diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 64fa1d7c8603..8ffd605db4c6 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/colflow", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/col/coldata", "//pkg/col/coldataext", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 1c4a872af752..283df6e9cdc6 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -831,9 +832,12 @@ func (s *vectorizedFlowCreator) setupInput( s.addStreamEndpoint(inputStream.StreamID, inbox, s.waitGroup) op := colexecbase.Operator(inbox) if s.recordingStats { - op, err = s.wrapWithNetworkVectorizedStatsCollector( - inbox, flowCtx.StreamComponentID(inputStream.StreamID), latency, + // Note: we can't use flowCtx.StreamComponentID because the stream does + // not originate from this node (we are the target node). + compID := execinfrapb.StreamComponentID( + base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID, ) + op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency) if err != nil { return nil, nil, nil, err } @@ -958,7 +962,7 @@ func (s *vectorizedFlowCreator) setupOutput( // flow-level span. span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID) span.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID), + Component: execinfrapb.FlowComponentID(base.SQLInstanceID(outputStream.OriginNodeID), flowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/distsql/inbound_test.go b/pkg/sql/distsql/inbound_test.go index 412194b224e2..ace463bb8223 100644 --- a/pkg/sql/distsql/inbound_test.go +++ b/pkg/sql/distsql/inbound_test.go @@ -92,6 +92,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, staticAddressResolver(ln.Addr())), Stopper: outboxStopper, }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(1) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 926980f1e41c..b24db1eaa35e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -16,6 +16,7 @@ import ( "reflect" "sort" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" @@ -2742,10 +2743,11 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if planCtx.traceMetadata != nil { processors := make(execComponents, len(plan.ResultRouters)) - for i := range plan.ResultRouters { + for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( + base.SQLInstanceID(plan.Processors[resultProcIdx].Node), execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, - int32(plan.ResultRouters[i]), + int32(resultProcIdx), ) } planCtx.traceMetadata.associateNodeWithComponents(node, processors) diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index 
4a1eaebac855..cace95667d84 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -108,10 +108,11 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec { // ProcessorComponentID returns a ComponentID for the given processor in this // flow. func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID { - return execinfrapb.ProcessorComponentID(ctx.ID, procID) + return execinfrapb.ProcessorComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, procID) } // StreamComponentID returns a ComponentID for the given stream in this flow. +// The stream must originate from the node associated with this FlowCtx. func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID { - return execinfrapb.StreamComponentID(ctx.ID, streamID) + return execinfrapb.StreamComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, streamID) } diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 4f7b554d5a5e..75f059bf5fdf 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -26,29 +26,35 @@ import ( ) // ProcessorComponentID returns a ComponentID for the given processor in a flow. -func ProcessorComponentID(flowID FlowID, processorID int32) ComponentID { +func ProcessorComponentID( + instanceID base.SQLInstanceID, flowID FlowID, processorID int32, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_PROCESSOR, - ID: processorID, + FlowID: flowID, + Type: ComponentID_PROCESSOR, + ID: processorID, + SQLInstanceID: instanceID, } } // StreamComponentID returns a ComponentID for the given stream in a flow. -func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID { +func StreamComponentID( + originInstanceID base.SQLInstanceID, flowID FlowID, streamID StreamID, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_STREAM, - ID: int32(streamID), + FlowID: flowID, + Type: ComponentID_STREAM, + ID: int32(streamID), + SQLInstanceID: originInstanceID, } } // FlowComponentID returns a ComponentID for the given flow. 
-func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID { +func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_FLOW, - NodeID: nodeID, + FlowID: flowID, + Type: ComponentID_FLOW, + SQLInstanceID: instanceID, } } diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index 5ae32c27e5b3..f8f28058d193 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -8,7 +8,7 @@ import fmt "fmt" import math "math" import optional "github.com/cockroachdb/cockroach/pkg/util/optional" -import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" +import github_com_cockroachdb_cockroach_pkg_base "github.com/cockroachdb/cockroach/pkg/base" import io "io" @@ -69,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0, 0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -81,18 +81,16 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. - NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` + // SQLInstanceID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. 
+ SQLInstanceID github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID `protobuf:"varint,4,opt,name=sql_instance_id,json=sqlInstanceId,casttype=github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID" json:"sql_instance_id"` } func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -139,7 +137,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{1} + return fileDescriptor_component_stats_011c4899fda43780, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -176,7 +174,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{2} + return fileDescriptor_component_stats_011c4899fda43780, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -220,7 +218,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{3} + return fileDescriptor_component_stats_011c4899fda43780, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +256,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{4} + return fileDescriptor_component_stats_011c4899fda43780, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -299,7 +297,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{5} + return fileDescriptor_component_stats_011c4899fda43780, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -338,7 +336,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{6} + return fileDescriptor_component_stats_011c4899fda43780, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -375,7 +373,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() 
{} func (*OutputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{7} + return fileDescriptor_component_stats_011c4899fda43780, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -409,7 +407,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_1e4b47cb6b511195, []int{8} + return fileDescriptor_component_stats_011c4899fda43780, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -477,7 +475,7 @@ func (m *ComponentID) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintComponentStats(dAtA, i, uint64(m.ID)) dAtA[i] = 0x20 i++ - i = encodeVarintComponentStats(dAtA, i, uint64(m.NodeID)) + i = encodeVarintComponentStats(dAtA, i, uint64(m.SQLInstanceID)) return i, nil } @@ -880,7 +878,7 @@ func (m *ComponentID) Size() (n int) { n += 1 + l + sovComponentStats(uint64(l)) n += 1 + sovComponentStats(uint64(m.Type)) n += 1 + sovComponentStats(uint64(m.ID)) - n += 1 + sovComponentStats(uint64(m.NodeID)) + n += 1 + sovComponentStats(uint64(m.SQLInstanceID)) return n } @@ -1130,9 +1128,9 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SQLInstanceID", wireType) } - m.NodeID = 0 + m.SQLInstanceID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowComponentStats @@ -1142,7 +1140,7 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + m.SQLInstanceID |= (github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID(b) & 0x7F) << shift if b < 0x80 { break } @@ -2545,70 +2543,71 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_1e4b47cb6b511195) -} - -var fileDescriptor_component_stats_1e4b47cb6b511195 = []byte{ - // 970 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, - 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, - 0xa1, 0x4a, 0x05, 0x95, 0x4a, 0xbd, 0xe5, 0x60, 0x0c, 0x69, 0x88, 0x63, 0xe3, 0x2e, 0xd8, 0x95, - 0x72, 0x28, 0x5a, 0x76, 0xc7, 0x78, 0xb5, 0x2f, 0xb3, 0xde, 0x9d, 0xc5, 0xeb, 0x7e, 0x8a, 0x9e, - 0xda, 0xcf, 0xd1, 0x6b, 0x3e, 0x81, 0x8f, 0x39, 0x46, 0x3d, 0x58, 0x0d, 0xee, 0x57, 0xe8, 0xa5, - 0xa7, 0x6a, 0x66, 0x5f, 0xd8, 0xb8, 0x42, 0x61, 0xab, 0xde, 0x46, 0xcb, 0xf3, 0xff, 0xcd, 0xf3, - 0x3e, 0xc0, 0x53, 0xef, 0xd2, 0xec, 0xe0, 0x00, 0xab, 0xba, 0x7d, 0xee, 0x2a, 0xce, 0xac, 0xa3, - 0x12, 0xcb, 0x21, 0x36, 0xb6, 0xe9, 0xd4, 0xa3, 0x0a, 0xf5, 0xda, 0x8e, 0x4b, 0x28, 0x41, 0x75, - 0x95, 0xa8, 0x86, 0x4b, 0x14, 0xf5, 0xa2, 0xed, 0x5d, 0x9a, 0x6d, 0x4d, 0xf7, 0xa8, 0x77, 0x69, - 0xba, 0xbe, 0xbd, 0xf3, 0x78, 0x4e, 0xe6, 0x84, 0x1b, 0x75, 0xd8, 0x29, 0xb4, 0xdf, 0xf9, 0xd4, - 0xa7, 0xba, 0xd9, 0x21, 0x0e, 0xd5, 0x89, 0xad, 0xac, 0x0e, 0xe1, 0xaf, 0xcd, 0x37, 0x22, 0x54, - 0x0e, 0xe2, 0x7b, 0x86, 0x7d, 0xf4, 0x35, 0x94, 0xce, 0x4d, 0x72, 0x35, 0xd5, 0xb5, 0xba, 0xb0, - 0x27, 0xb4, 0x1e, 0xf4, 0xea, 0x37, 0xb7, 0xbb, 0xb9, 0xdf, 0x6f, 0x77, 0xa5, 0xe7, 0x26, 0xb9, - 0x1a, 
0xf6, 0x97, 0xc9, 0x49, 0x96, 0x98, 0xe1, 0x50, 0x43, 0x7d, 0x28, 0xd0, 0x6b, 0x07, 0xd7, - 0xc5, 0x3d, 0xa1, 0x55, 0xed, 0x7e, 0xd9, 0x5e, 0xe7, 0x5f, 0x3b, 0x75, 0x4f, 0x7b, 0x72, 0xed, - 0xe0, 0x5e, 0x81, 0xb1, 0x65, 0xae, 0x46, 0x3b, 0x20, 0xea, 0x5a, 0x3d, 0xbf, 0x27, 0xb4, 0x8a, - 0x3d, 0x60, 0xdf, 0x97, 0xb7, 0xbb, 0xe2, 0xb0, 0x2f, 0x8b, 0xba, 0x86, 0x7e, 0x84, 0x92, 0x4d, - 0x34, 0xcc, 0x9c, 0x2a, 0x70, 0x83, 0x41, 0x64, 0x20, 0x1d, 0x13, 0x0d, 0x0f, 0xfb, 0x7f, 0xdf, - 0xee, 0x7e, 0x33, 0xd7, 0xe9, 0x85, 0x3f, 0x6b, 0xab, 0xc4, 0xea, 0x24, 0x0e, 0x68, 0xb3, 0xd5, - 0xb9, 0xe3, 0x18, 0xf3, 0x0e, 0x3f, 0x39, 0xb3, 0x76, 0x28, 0x93, 0x25, 0x46, 0x1d, 0x6a, 0xcd, - 0x6f, 0xa1, 0xc0, 0xfc, 0x41, 0x65, 0x28, 0x9e, 0x1e, 0x8f, 0x07, 0x93, 0x5a, 0x0e, 0x6d, 0x43, - 0xf9, 0x44, 0x1e, 0x1d, 0x0c, 0xc6, 0xe3, 0x91, 0x5c, 0x13, 0x10, 0x80, 0x34, 0x9e, 0xc8, 0x83, - 0xfd, 0xa3, 0x9a, 0x88, 0xb6, 0xa0, 0xf0, 0xfc, 0xd5, 0xe8, 0x87, 0x5a, 0xbe, 0xf9, 0xa6, 0x00, - 0xd5, 0x24, 0xa8, 0x31, 0xab, 0x11, 0x1a, 0x42, 0x39, 0x29, 0x1b, 0xcf, 0x60, 0xa5, 0xfb, 0x74, - 0xa3, 0x8c, 0x44, 0xc9, 0x58, 0xa9, 0xd1, 0x00, 0x24, 0x1b, 0xd3, 0xa9, 0x1b, 0xf0, 0xcc, 0x56, - 0xba, 0xad, 0xf5, 0x9c, 0x63, 0x4c, 0xaf, 0x88, 0x6b, 0xc8, 0x01, 0x77, 0x22, 0x42, 0x15, 0x6d, - 0x4c, 0xe5, 0x20, 0xc6, 0xd0, 0x80, 0x27, 0x77, 0x13, 0xcc, 0xe4, 0x5f, 0x98, 0x49, 0x80, 0x9e, - 0x81, 0x68, 0x2c, 0x78, 0xfa, 0x2b, 0xdd, 0x27, 0xeb, 0x11, 0x87, 0x67, 0xa1, 0x36, 0x29, 0xe1, - 0xe1, 0x99, 0x2c, 0x1a, 0x0b, 0xf4, 0x0c, 0x0a, 0xac, 0xb5, 0xeb, 0x45, 0x0e, 0xf8, 0x7c, 0x3d, - 0x60, 0x10, 0x60, 0x35, 0x7d, 0x3d, 0x97, 0xa1, 0x03, 0x90, 0x88, 0x4f, 0x1d, 0x9f, 0xd6, 0xa5, - 0x8f, 0xe5, 0x74, 0xc4, 0xed, 0xd2, 0x88, 0x48, 0x8a, 0x7a, 0x20, 0xe9, 0xb6, 0xe3, 0x53, 0xaf, - 0x5e, 0xda, 0xcb, 0xb7, 0x2a, 0xdd, 0x2f, 0xd6, 0x43, 0x86, 0xf6, 0x7d, 0x46, 0xa8, 0x44, 0x2f, - 0x00, 0xf8, 0x7c, 0xf0, 0x89, 0xac, 0x6f, 0x7d, 0x2c, 0x1a, 0x36, 0x2a, 0x69, 0x4c, 0xf9, 0x3c, - 0xfe, 0xd0, 0xfc, 0x45, 0x00, 0x58, 0x5d, 0x83, 0x7a, 0x00, 0xb6, 0x6f, 0x4d, 0xa9, 0xef, 0x98, - 0xd8, 0x8b, 0x3a, 0xe7, 0xb3, 0x14, 0x98, 0x4d, 0x71, 0x3b, 0x19, 0xde, 0x53, 0xdd, 0xa6, 0x31, - 0xd2, 0xf6, 0xad, 0x09, 0x57, 0xa1, 0x3e, 0x94, 0xaf, 0x14, 0x9d, 0x4e, 0xa9, 0x6e, 0xe1, 0xa8, - 0x69, 0x9e, 0xac, 0x45, 0xf4, 0x7d, 0x57, 0x61, 0xc7, 0x08, 0xb3, 0xc5, 0x94, 0x13, 0xdd, 0xc2, - 0xcd, 0xf7, 0x79, 0xa8, 0x7e, 0xd8, 0x50, 0x68, 0x1f, 0x4a, 0xa6, 0x42, 0xb1, 0xad, 0x5e, 0x47, - 0x9e, 0x6d, 0x8c, 0x8d, 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x35, 0x3c, 0xd6, 0xb0, 0x87, 0x5d, 0x5d, - 0x31, 0xf5, 0x9f, 0xb8, 0x49, 0x08, 0xcc, 0x67, 0x03, 0x7e, 0x72, 0x0f, 0xc2, 0xd9, 0xaf, 0xe0, - 0x61, 0x98, 0xfd, 0xa9, 0x8b, 0x55, 0xac, 0x2f, 0xb0, 0x16, 0xb5, 0xfb, 0x46, 0x65, 0xa8, 0x86, - 0x5a, 0x39, 0x92, 0xa2, 0x97, 0x50, 0x9d, 0x5d, 0xd3, 0x34, 0xac, 0xb8, 0x39, 0x6c, 0x9b, 0x4b, - 0x13, 0xd6, 0x09, 0x3c, 0xb2, 0xb0, 0xe7, 0x29, 0xf3, 0x34, 0x4e, 0xda, 0x1c, 0x57, 0x8b, 0xd5, - 0x31, 0xb1, 0xf9, 0xa7, 0x90, 0xd4, 0x38, 0x9a, 0x76, 0xd4, 0x87, 0x4a, 0x14, 0xbe, 0xb7, 0xda, - 0x5d, 0x1b, 0xe1, 0x21, 0xd4, 0x8d, 0xd9, 0xd2, 0xea, 0x01, 0x84, 0x61, 0x73, 0x88, 0x98, 0xa1, - 0x8d, 0xb9, 0x8c, 0x33, 0x5e, 0xc0, 0x76, 0x12, 0x2e, 0xc7, 0xe4, 0x37, 0xc7, 0x3c, 0x88, 0x95, - 0x8c, 0xd4, 0xfc, 0x4d, 0x84, 0x52, 0xb4, 0x91, 0x56, 0x9e, 0xb9, 0x58, 0xd1, 0x32, 0x0d, 0x58, - 0x54, 0x0c, 0x45, 0x4b, 0xe5, 0x88, 0x43, 0xc4, 0xcc, 0x39, 0xe2, 0x94, 0x97, 0x50, 0x32, 0x16, - 0x19, 0xfb, 0xb6, 0x1a, 0xbf, 0x78, 0x87, 0x67, 0xac, 0x59, 0x65, 0xc9, 0x58, 0xf0, 0xa6, 0x3d, - 0x81, 0x87, 0x2a, 0xb1, 0x29, 
0xb6, 0x57, 0xb3, 0x50, 0xc8, 0x36, 0x0b, 0xd5, 0x95, 0x9e, 0x8f, - 0xff, 0x5f, 0x02, 0x94, 0x93, 0x25, 0xcc, 0xc6, 0x96, 0x2d, 0xe0, 0x90, 0x9c, 0x71, 0xf6, 0xb7, - 0x98, 0x92, 0x7b, 0x39, 0x82, 0x47, 0x96, 0x12, 0x4c, 0x15, 0xd3, 0x24, 0xaa, 0x42, 0xb1, 0x36, - 0xb5, 0xb0, 0x95, 0x25, 0x7b, 0x0f, 0x2d, 0x25, 0xd8, 0x8f, 0xc5, 0x47, 0xd8, 0x42, 0xdf, 0x03, - 0xfa, 0x10, 0xa8, 0xe9, 0x9e, 0x91, 0xa5, 0x4f, 0x6a, 0x69, 0x62, 0x5f, 0xf7, 0x8c, 0xe6, 0xaf, - 0x02, 0x54, 0x52, 0x6f, 0x07, 0xab, 0x35, 0x5b, 0xc8, 0x33, 0x85, 0xaa, 0x17, 0xd9, 0x36, 0x32, - 0x5b, 0xe4, 0xbd, 0x50, 0x76, 0x6f, 0xad, 0x8b, 0xff, 0x65, 0xad, 0x37, 0x27, 0x50, 0x4e, 0xde, - 0x11, 0xf4, 0x1d, 0x6c, 0xb3, 0xc8, 0x2d, 0x6c, 0x4d, 0x7d, 0xd6, 0xe8, 0x59, 0x1c, 0xab, 0x58, - 0x4a, 0x70, 0x84, 0xad, 0x53, 0xa6, 0xeb, 0x7d, 0x75, 0xf3, 0xbe, 0x91, 0xbb, 0x59, 0x36, 0x84, - 0xb7, 0xcb, 0x86, 0xf0, 0x6e, 0xd9, 0x10, 0xfe, 0x58, 0x36, 0x84, 0x9f, 0xef, 0x1a, 0xb9, 0xb7, - 0x77, 0x8d, 0xdc, 0xbb, 0xbb, 0x46, 0xee, 0x75, 0x25, 0xf5, 0x67, 0xf4, 0x9f, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xce, 0xc1, 0x00, 0xbc, 0x9e, 0x0a, 0x00, 0x00, + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_011c4899fda43780) +} + +var fileDescriptor_component_stats_011c4899fda43780 = []byte{ + // 982 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0xe2, 0x46, + 0x18, 0xc7, 0xb1, 0x21, 0x26, 0x3c, 0x2c, 0x84, 0x9d, 0xee, 0x01, 0x45, 0x2d, 0xc9, 0xd2, 0xae, + 0x84, 0x2a, 0x15, 0x54, 0x0e, 0xed, 0x69, 0x0f, 0x21, 0xb0, 0x5d, 0x36, 0xc9, 0x92, 0x35, 0x24, + 0x95, 0xf6, 0x62, 0x19, 0x7b, 0x42, 0x46, 0xf8, 0x0d, 0x7b, 0x4c, 0x9c, 0x7e, 0x8a, 0x9e, 0xda, + 0xcf, 0xd1, 0x6b, 0x3f, 0x41, 0x8e, 0x7b, 0xe8, 0x61, 0xd5, 0x43, 0xd4, 0x25, 0xfd, 0x0a, 0xbd, + 0xf4, 0x54, 0xcd, 0xd8, 0x18, 0x27, 0x15, 0x5a, 0x5c, 0xf5, 0x36, 0xd8, 0xcf, 0xff, 0x37, 0xcf, + 0xbb, 0x81, 0x67, 0xde, 0xcc, 0x68, 0xe1, 0x00, 0x6b, 0xc4, 0xba, 0x70, 0x55, 0x67, 0xdc, 0xd2, + 0x6c, 0xd3, 0xb1, 0x2d, 0x6c, 0x51, 0xc5, 0xa3, 0x2a, 0xf5, 0x9a, 0x8e, 0x6b, 0x53, 0x1b, 0x55, + 0x35, 0x5b, 0x9b, 0xba, 0xb6, 0xaa, 0x5d, 0x36, 0xbd, 0x99, 0xd1, 0xd4, 0x89, 0x47, 0xbd, 0x99, + 0xe1, 0xfa, 0xd6, 0xee, 0x93, 0x89, 0x3d, 0xb1, 0xb9, 0x51, 0x8b, 0x9d, 0x42, 0xfb, 0xdd, 0x4f, + 0x7d, 0x4a, 0x8c, 0x96, 0xed, 0x50, 0x62, 0x5b, 0xea, 0xea, 0x10, 0xbe, 0xad, 0xff, 0x26, 0x42, + 0xf1, 0x70, 0x79, 0x4f, 0xbf, 0x8b, 0xbe, 0x86, 0xfc, 0x85, 0x61, 0x5f, 0x29, 0x44, 0xaf, 0x0a, + 0xfb, 0x42, 0xe3, 0x51, 0xa7, 0x7a, 0x73, 0xbb, 0x97, 0xf9, 0xfd, 0x76, 0x4f, 0x7a, 0x61, 0xd8, + 0x57, 0xfd, 0xee, 0x22, 0x3e, 0xc9, 0x12, 0x33, 0xec, 0xeb, 0xa8, 0x0b, 0x39, 0x7a, 0xed, 0xe0, + 0xaa, 0xb8, 0x2f, 0x34, 0xca, 0xed, 0x2f, 0x9b, 0xeb, 0xfc, 0x6b, 0x26, 0xee, 0x69, 0x8e, 0xae, + 0x1d, 0xdc, 0xc9, 0x31, 0xb6, 0xcc, 0xd5, 0x68, 0x17, 0x44, 0xa2, 0x57, 0xb3, 0xfb, 0x42, 0x63, + 0xab, 0x03, 0xec, 0xf9, 0xe2, 0x76, 0x4f, 0xec, 0x77, 0x65, 0x91, 0xe8, 0x28, 0x80, 0x1d, 0x6f, + 0x66, 0x28, 0xc4, 0xf2, 0xa8, 0x6a, 0x69, 0x98, 0x39, 0x97, 0xe3, 0x86, 0xa7, 0x91, 0x61, 0x69, + 0xf8, 0xe6, 0xb8, 0x1f, 0xbd, 0xed, 0x77, 0xff, 0xbe, 0xdd, 0xfb, 0x76, 0x42, 0xe8, 0xa5, 0x3f, + 0x6e, 0x6a, 0xb6, 0xd9, 0x8a, 0xfd, 0xd1, 0xc7, 0xab, 0x73, 0xcb, 0x99, 0x4e, 0x5a, 0x63, 0xd5, + 0xc3, 0xcd, 0x7b, 0x52, 0xb9, 0xe4, 0xcd, 0x8c, 0xf8, 0xa7, 0x5e, 0xff, 0x06, 0x72, 0xcc, 0x53, + 0x54, 0x80, 0xad, 0xb3, 0xd7, 0xc3, 0xde, 0xa8, 0x92, 0x41, 0x25, 0x28, 0x9c, 0xca, 0x83, 0xc3, + 0xde, 0x70, 0x38, 0x90, 0x2b, 0x02, 0x02, 0x90, 0x86, 0x23, 
0xb9, 0x77, 0x70, 0x52, 0x11, 0xd1, + 0x36, 0xe4, 0x5e, 0x1c, 0x0f, 0xbe, 0xaf, 0x64, 0xeb, 0xbf, 0xe6, 0xa0, 0x1c, 0x87, 0x3b, 0x64, + 0xd5, 0x43, 0x7d, 0x28, 0xc4, 0x05, 0xe5, 0xb9, 0x2d, 0xb6, 0x9f, 0x6d, 0x94, 0xab, 0x28, 0x4d, + 0x2b, 0x35, 0xea, 0x81, 0x64, 0x61, 0xaa, 0xb8, 0x01, 0xcf, 0x79, 0xb1, 0xdd, 0x58, 0xcf, 0x79, + 0x8d, 0xe9, 0x95, 0xed, 0x4e, 0xe5, 0x80, 0x3b, 0x11, 0xa1, 0xb6, 0x2c, 0x4c, 0xe5, 0x60, 0x89, + 0xa1, 0x01, 0x4f, 0xfb, 0x26, 0x98, 0xd1, 0xbf, 0x30, 0xa3, 0x00, 0x3d, 0x07, 0x71, 0x3a, 0xe7, + 0x05, 0x29, 0xb6, 0x9f, 0xae, 0x47, 0x1c, 0x9d, 0x87, 0xda, 0xb8, 0xb8, 0x47, 0xe7, 0xb2, 0x38, + 0x9d, 0xa3, 0xe7, 0x90, 0x63, 0x4d, 0x5f, 0xdd, 0xe2, 0x80, 0xcf, 0xd7, 0x03, 0x7a, 0x01, 0xd6, + 0x92, 0xd7, 0x73, 0x19, 0x3a, 0x04, 0xc9, 0xf6, 0xa9, 0xe3, 0xd3, 0xaa, 0xf4, 0xb1, 0x9c, 0x0e, + 0xb8, 0x5d, 0x12, 0x11, 0x49, 0x51, 0x07, 0x24, 0x62, 0x39, 0x3e, 0xf5, 0xaa, 0xf9, 0xfd, 0x6c, + 0xa3, 0xd8, 0xfe, 0x62, 0x3d, 0xa4, 0x6f, 0x3d, 0x64, 0x84, 0x4a, 0xf4, 0x12, 0x80, 0x4f, 0x0e, + 0x9f, 0xd5, 0xea, 0xf6, 0xc7, 0xa2, 0x61, 0x43, 0x94, 0xc4, 0x14, 0x2e, 0x96, 0x0f, 0xea, 0x3f, + 0x09, 0x00, 0xab, 0x6b, 0x50, 0x07, 0xc0, 0xf2, 0x4d, 0x85, 0xfa, 0x8e, 0x81, 0xbd, 0xa8, 0x73, + 0x3e, 0x4b, 0x80, 0xd9, 0x7c, 0x37, 0xe3, 0xb1, 0x3e, 0x23, 0x16, 0x5d, 0x22, 0x2d, 0xdf, 0x1c, + 0x71, 0x15, 0xea, 0x42, 0xe1, 0x4a, 0x25, 0x54, 0xa1, 0xc4, 0xc4, 0x51, 0xd3, 0x3c, 0x5d, 0x8b, + 0xe8, 0xfa, 0xae, 0xca, 0x8e, 0x11, 0x66, 0x9b, 0x29, 0x47, 0xc4, 0xc4, 0xf5, 0x0f, 0x59, 0x28, + 0xdf, 0x6f, 0x28, 0x74, 0x00, 0x79, 0x43, 0xa5, 0xd8, 0xd2, 0xae, 0x23, 0xcf, 0x36, 0xc6, 0x2e, + 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x2d, 0x3c, 0xd1, 0xb1, 0x87, 0x5d, 0xa2, 0x1a, 0xe4, 0x07, 0x6e, + 0x12, 0x02, 0xb3, 0xe9, 0x80, 0x9f, 0x3c, 0x80, 0x70, 0xf6, 0x31, 0xec, 0x84, 0xd9, 0x57, 0x5c, + 0xac, 0x61, 0x32, 0xc7, 0x7a, 0xd4, 0xee, 0x1b, 0x95, 0xa1, 0x1c, 0x6a, 0xe5, 0x48, 0x8a, 0x5e, + 0x41, 0x79, 0x7c, 0x4d, 0x93, 0xb0, 0xad, 0xcd, 0x61, 0x25, 0x2e, 0x8d, 0x59, 0xa7, 0xf0, 0xd8, + 0xc4, 0x9e, 0xa7, 0x4e, 0x92, 0x38, 0x69, 0x73, 0x5c, 0x65, 0xa9, 0x5e, 0x12, 0xeb, 0x7f, 0x0a, + 0x71, 0x8d, 0xa3, 0x69, 0x47, 0x5d, 0x28, 0x46, 0xe1, 0x7b, 0xab, 0xdd, 0xb5, 0x11, 0x1e, 0x42, + 0xdd, 0x90, 0x2d, 0xad, 0x0e, 0x40, 0x18, 0x36, 0x87, 0x88, 0x29, 0xda, 0x98, 0xcb, 0x38, 0xe3, + 0x25, 0x94, 0xe2, 0x70, 0x39, 0x26, 0xbb, 0x39, 0xe6, 0xd1, 0x52, 0xc9, 0x48, 0xf5, 0x5f, 0x44, + 0xc8, 0x47, 0x1b, 0x69, 0xe5, 0x99, 0x8b, 0x55, 0x3d, 0xd5, 0x80, 0x45, 0xc5, 0x50, 0xf5, 0x44, + 0x8e, 0x38, 0x44, 0x4c, 0x9d, 0x23, 0x4e, 0x79, 0x05, 0xf9, 0xe9, 0x3c, 0x65, 0xdf, 0x96, 0xa3, + 0x7d, 0x2a, 0x1d, 0x9d, 0xb3, 0x66, 0x95, 0xa5, 0xe9, 0x9c, 0x37, 0xed, 0x29, 0xec, 0x68, 0xb6, + 0x45, 0xb1, 0xb5, 0x9a, 0x85, 0x5c, 0xba, 0x59, 0x28, 0xaf, 0xf4, 0x7c, 0xfc, 0xff, 0x12, 0xa0, + 0x10, 0x2f, 0x61, 0x36, 0xb6, 0x6c, 0x01, 0x87, 0xe4, 0x94, 0xb3, 0xbf, 0xcd, 0x94, 0xdc, 0xcb, + 0x01, 0x3c, 0x36, 0xd5, 0x40, 0x51, 0x0d, 0xc3, 0xd6, 0x54, 0x8a, 0x75, 0xc5, 0xc4, 0x66, 0x9a, + 0xec, 0xed, 0x98, 0x6a, 0x70, 0xb0, 0x14, 0x9f, 0x60, 0x13, 0xbd, 0x01, 0x74, 0x1f, 0xa8, 0x13, + 0x6f, 0x9a, 0xa6, 0x4f, 0x2a, 0x49, 0x62, 0x97, 0x78, 0xd3, 0xfa, 0xcf, 0x02, 0x14, 0x13, 0xdf, + 0x0e, 0x56, 0x6b, 0xb6, 0x90, 0xc7, 0x2a, 0xd5, 0x2e, 0xd3, 0x6d, 0x64, 0xb6, 0xc8, 0x3b, 0xa1, + 0xec, 0xc1, 0x5a, 0x17, 0xff, 0xcb, 0x5a, 0xaf, 0x8f, 0xa0, 0x10, 0x7f, 0x47, 0xd0, 0x77, 0x50, + 0x62, 0x91, 0x9b, 0xd8, 0x54, 0x7c, 0xd6, 0xe8, 0x69, 0x1c, 0x2b, 0x9a, 0x6a, 0x70, 0x82, 0xcd, + 0x33, 0xa6, 0xeb, 0x7c, 0x75, 0xf3, 0xa1, 0x96, 0xb9, 0x59, 0xd4, 0x84, 0x77, 0x8b, 
0x9a, 0xf0, + 0x7e, 0x51, 0x13, 0xfe, 0x58, 0xd4, 0x84, 0x1f, 0xef, 0x6a, 0x99, 0x77, 0x77, 0xb5, 0xcc, 0xfb, + 0xbb, 0x5a, 0xe6, 0x6d, 0x31, 0xf1, 0x37, 0xf5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xaa, 0xf0, + 0x57, 0x37, 0xb8, 0x0a, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index de63f1f2d7dd..8b55abca2799 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -48,13 +48,12 @@ message ComponentID { optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; - // NodeID of the node this component ran on. - // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe - // a flow (since flows on different nodes still have the same FlowID). Use - // this for processors/streams as well. - optional int32 node_id = 4 [(gogoproto.nullable) = false, - (gogoproto.customname) = "NodeID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // SQLInstanceID of the node this component is associated with. For cross-node + // streams, this is the *origin* node for the stream. + optional int32 sql_instance_id = 4 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SQLInstanceID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"]; } // ComponentStats contains statistics for an execution component. A component is diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index c6a2b1026575..ff3ec8deeec4 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -20,6 +20,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -578,8 +579,9 @@ type diagramData struct { Processors []diagramProcessor `json:"processors"` Edges []diagramEdge `json:"edges"` - flags DiagramFlags - flowID FlowID + flags DiagramFlags + flowID FlowID + nodeIDs []roachpb.NodeID } var _ FlowDiagram = &diagramData{} @@ -598,27 +600,36 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) { statsMap := ExtractStatsFromSpans(spans, d.flags.MakeDeterministic) for i := range d.Processors { p := &d.Processors[i] - component := ProcessorComponentID(d.flowID, p.processorID) + nodeID := d.nodeIDs[p.NodeIdx] + component := ProcessorComponentID(base.SQLInstanceID(nodeID), d.flowID, p.processorID) if compStats := statsMap[component]; compStats != nil { p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...) } } for i := range d.Edges { - component := StreamComponentID(d.flowID, d.Edges[i].streamID) + originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx] + component := StreamComponentID(base.SQLInstanceID(originNodeID), d.flowID, d.Edges[i].streamID) if compStats := statsMap[component]; compStats != nil { d.Edges[i].Stats = compStats.StatsForQueryPlan() } } } +// generateDiagramData generates the diagram data, given a list of flows (one +// per node). The nodeIDs list corresponds 1-1 to the flows list. 
func generateDiagramData( - sql string, flows []FlowSpec, nodeNames []string, flags DiagramFlags, + sql string, flows []FlowSpec, nodeIDs []roachpb.NodeID, flags DiagramFlags, ) (FlowDiagram, error) { d := &diagramData{ - SQL: sql, - NodeNames: nodeNames, - flags: flags, + SQL: sql, + nodeIDs: nodeIDs, + flags: flags, } + d.NodeNames = make([]string, len(nodeIDs)) + for i := range d.NodeNames { + d.NodeNames[i] = nodeIDs[i].String() + } + if len(flows) > 0 { d.flowID = flows[0].FlowID for i := 1; i < len(flows); i++ { @@ -749,21 +760,20 @@ func GeneratePlanDiagram( ) (FlowDiagram, error) { // We sort the flows by node because we want the diagram data to be // deterministic. - nodeIDs := make([]int, 0, len(flows)) + nodeIDs := make([]roachpb.NodeID, 0, len(flows)) for n := range flows { - nodeIDs = append(nodeIDs, int(n)) + nodeIDs = append(nodeIDs, n) } - sort.Ints(nodeIDs) + sort.Slice(nodeIDs, func(i, j int) bool { + return nodeIDs[i] < nodeIDs[j] + }) flowSlice := make([]FlowSpec, len(nodeIDs)) - nodeNames := make([]string, len(nodeIDs)) - for i, nVal := range nodeIDs { - n := roachpb.NodeID(nVal) + for i, n := range nodeIDs { flowSlice[i] = *flows[n] - nodeNames[i] = n.String() } - return generateDiagramData(sql, flowSlice, nodeNames, flags) + return generateDiagramData(sql, flowSlice, nodeIDs, flags) } // GeneratePlanDiagramURL generates the json data for a flow diagram and a diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 79e34ce05b9d..49c28ae7c807 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/roachpb", "//pkg/sql/execinfrapb", "//pkg/util/tracing/tracingpb", diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 232415de34d5..68a58fcc67ef 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -13,6 +13,7 @@ package execstats import ( "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -20,11 +21,13 @@ import ( ) type processorStats struct { + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. nodeID roachpb.NodeID stats *execinfrapb.ComponentStats } type streamStats struct { + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. originNodeID roachpb.NodeID destinationNodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -51,7 +54,7 @@ type FlowsMetadata struct { // flowStats maps a node ID to flow level stats extracted from a trace. Note // that the key is not a FlowID because the same FlowID is used across // nodes. - flowStats map[roachpb.NodeID]*flowStats + flowStats map[base.SQLInstanceID]*flowStats } // NewFlowsMetadata creates a FlowsMetadata for the given physical plan @@ -60,12 +63,12 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta a := &FlowsMetadata{ processorStats: make(map[execinfrapb.ProcessorID]*processorStats), streamStats: make(map[execinfrapb.StreamID]*streamStats), - flowStats: make(map[roachpb.NodeID]*flowStats), + flowStats: make(map[base.SQLInstanceID]*flowStats), } // Annotate the maps with physical plan information. 
for nodeID, flow := range flows { - a.flowStats[nodeID] = &flowStats{} + a.flowStats[base.SQLInstanceID(nodeID)] = &flowStats{} for _, proc := range flow.Processors { a.processorStats[execinfrapb.ProcessorID(proc.ProcessorID)] = &processorStats{nodeID: nodeID} for _, output := range proc.Output { @@ -87,13 +90,13 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta // TODO(asubiotto): Flatten this struct, we're currently allocating a map per // stat. type NodeLevelStats struct { - NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64 - MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 - KVBytesReadGroupedByNode map[roachpb.NodeID]int64 - KVRowsReadGroupedByNode map[roachpb.NodeID]int64 - KVTimeGroupedByNode map[roachpb.NodeID]time.Duration - NetworkMessagesGroupedByNode map[roachpb.NodeID]int64 - ContentionTimeGroupedByNode map[roachpb.NodeID]time.Duration + NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64 + MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64 + KVBytesReadGroupedByNode map[base.SQLInstanceID]int64 + KVRowsReadGroupedByNode map[base.SQLInstanceID]int64 + KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration + NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 + ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the @@ -162,12 +165,12 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist streamStats.stats = componentStats case execinfrapb.ComponentID_FLOW: - flowStats := a.flowStats[component.NodeID] + flowStats := a.flowStats[component.SQLInstanceID] if flowStats == nil { return errors.Errorf( "trace has span for flow %s on node %s but the flow does not exist in the physical plan", component.FlowID, - component.NodeID, + component.SQLInstanceID, ) } flowStats.stats = append(flowStats.stats, componentStats) @@ -184,13 +187,13 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist func (a *TraceAnalyzer) ProcessStats() error { // Process node level stats. 
a.nodeLevelStats = NodeLevelStats{ - NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64), - MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), - KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), - NetworkMessagesGroupedByNode: make(map[roachpb.NodeID]int64), - ContentionTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), + NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64), + MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64), + KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), + NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), + ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), } var errs error @@ -199,10 +202,11 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } - a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) - a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) - a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value() - a.nodeLevelStats.ContentionTimeGroupedByNode[stats.nodeID] += stats.stats.KV.ContentionTime.Value() + instanceID := base.SQLInstanceID(stats.nodeID) + a.nodeLevelStats.KVBytesReadGroupedByNode[instanceID] += int64(stats.stats.KV.BytesRead.Value()) + a.nodeLevelStats.KVRowsReadGroupedByNode[instanceID] += int64(stats.stats.KV.TuplesRead.Value()) + a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.stats.KV.KVTime.Value() + a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.stats.KV.ContentionTime.Value() } // Process streamStats. @@ -210,13 +214,14 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } + originInstanceID := base.SQLInstanceID(stats.originNodeID) // Set networkBytesSentGroupedByNode. bytes, err := getNetworkBytesFromComponentStats(stats.stats) if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating network bytes sent")) } else { - a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes + a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes } // The row execution flow attaches this stat to a stream stat with the @@ -228,8 +233,9 @@ func (a *TraceAnalyzer) ProcessStats() error { // removed, getting maxMemUsage from streamStats should be removed as // well. 
if stats.stats.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage + memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()) + if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage } } @@ -237,12 +243,12 @@ func (a *TraceAnalyzer) ProcessStats() error { if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages")) } else { - a.nodeLevelStats.NetworkMessagesGroupedByNode[stats.originNodeID] += numMessages + a.nodeLevelStats.NetworkMessagesGroupedByNode[originInstanceID] += numMessages } } // Process flowStats. - for nodeID, stats := range a.flowStats { + for instanceID, stats := range a.flowStats { if stats.stats == nil { continue } @@ -251,8 +257,8 @@ func (a *TraceAnalyzer) ProcessStats() error { // span, so we need to check flow stats for max memory usage. for _, v := range stats.stats { if v.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] = memUsage + if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] = memUsage } } } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 758ead28c526..32898e11e866 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -194,10 +194,12 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) a.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -209,9 +211,9 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { ) a.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), @@ -289,10 +291,12 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { // Artificially inject component stats directly into the FlowsMetadata (in // the non-testing setting the stats come from the trace). 
var f1, f2 execstats.FlowsMetadata + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) f1.AddComponentStats( - 1, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n1, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 1, /* processorID */ ), @@ -302,9 +306,9 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { }, ) f2.AddComponentStats( - 2, /* nodeID */ &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( + n2, execinfrapb.FlowID{UUID: uuid.MakeV4()}, 2, /* processorID */ ), diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index 9519591726f8..c9488a054e5b 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -11,27 +11,24 @@ package execstats import ( + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" ) // AddComponentStats modifies TraceAnalyzer internal state to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. -func (a *TraceAnalyzer) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { - a.FlowsMetadata.AddComponentStats(nodeID, stats) +func (a *TraceAnalyzer) AddComponentStats(stats *execinfrapb.ComponentStats) { + a.FlowsMetadata.AddComponentStats(stats) } // AddComponentStats modifies FlowsMetadata to add stats for the // processor/stream/flow specified in stats.ComponentID and the given node ID. -func (m *FlowsMetadata) AddComponentStats( - nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, -) { +func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: processorStat := &processorStats{ - nodeID: nodeID, + nodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.processorStats == nil { @@ -40,7 +37,7 @@ func (m *FlowsMetadata) AddComponentStats( m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ - originNodeID: nodeID, + originNodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.streamStats == nil { @@ -51,8 +48,8 @@ func (m *FlowsMetadata) AddComponentStats( flowStat := &flowStats{} flowStat.stats = append(flowStat.stats, stats) if m.flowStats == nil { - m.flowStats = make(map[roachpb.NodeID]*flowStats) + m.flowStats = make(map[base.SQLInstanceID]*flowStats) } - m.flowStats[nodeID] = flowStat + m.flowStats[stats.Component.SQLInstanceID] = flowStat } } diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 3eb52b00aae3..7e8a1903b209 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -435,7 +434,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) { // information over. 
if sp := tracing.SpanFromContext(ctx); sp != nil { sp.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(roachpb.NodeID(f.NodeID.SQLInstanceID()), f.FlowCtx.ID), + Component: execinfrapb.FlowComponentID(f.NodeID.SQLInstanceID(), f.FlowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/flowinfra/outbox_test.go b/pkg/sql/flowinfra/outbox_test.go index 944678aefa4e..9916601cab91 100644 --- a/pkg/sql/flowinfra/outbox_test.go +++ b/pkg/sql/flowinfra/outbox_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -71,6 +72,7 @@ func TestOutbox(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -227,6 +229,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) @@ -301,6 +304,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -376,6 +380,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), Stopper: stopper, }, + NodeID: base.TestingIDContainer, }) outbox.Init(rowenc.OneIntCol) // In a RunSyncFlow call, the outbox runs under the call's context. @@ -437,6 +442,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -490,6 +496,7 @@ func TestOutboxUnblocksProducers(t *testing.T) { // a nil nodeDialer will always fail to connect. NodeDialer: nil, }, + NodeID: base.TestingIDContainer, } streamID := execinfrapb.StreamID(42) var outbox *Outbox @@ -560,6 +567,7 @@ func BenchmarkOutbox(b *testing.B) { Stopper: stopper, NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, + NodeID: base.TestingIDContainer, } outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.MakeIntCols(numCols)) diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 26b77b26be0c..af62c46dc1d6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -224,7 +224,12 @@ func (ih *instrumentationHelper) Finish( } if ih.traceMetadata != nil && ih.explainPlan != nil { - ih.traceMetadata.annotateExplain(ih.explainPlan, trace, cfg.TestingKnobs.DeterministicExplainAnalyze) + ih.traceMetadata.annotateExplain( + ih.explainPlan, + p.curPlan.distSQLFlowInfos, + trace, + cfg.TestingKnobs.DeterministicExplainAnalyze, + ) } // TODO(radu): this should be unified with other stmt stats accesses. 
@@ -478,7 +483,7 @@ func (m execNodeTraceMetadata) associateNodeWithComponents( // annotateExplain aggregates the statistics in the trace and annotates // explain.Nodes with execution stats. func (m execNodeTraceMetadata) annotateExplain( - plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, + plan *explain.Plan, flowInfos []flowInfo, spans []tracingpb.RecordedSpan, makeDeterministic bool, ) { statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic) @@ -489,8 +494,12 @@ func (m execNodeTraceMetadata) annotateExplain( var nodeStats exec.ExecutionStats incomplete := false - for i := range components { - stats := statsMap[components[i]] + var nodes util.FastIntSet + for _, c := range components { + if c.Type == execinfrapb.ComponentID_PROCESSOR { + nodes.Add(int(c.SQLInstanceID)) + } + stats := statsMap[c] if stats == nil { incomplete = true break @@ -503,6 +512,9 @@ func (m execNodeTraceMetadata) annotateExplain( // incomplete results. In the future, we may consider an incomplete flag // if we want to show them with a warning. if !incomplete { + for i, ok := nodes.Next(0); ok; i, ok = nodes.Next(i + 1) { + nodeStats.Nodes = append(nodeStats.Nodes, fmt.Sprintf("n%d", i)) + } n.Annotate(exec.ExecutionStatsID, &nodeStats) } } diff --git a/pkg/sql/logictest/testdata/logic_test/dist_vectorize b/pkg/sql/logictest/testdata/logic_test/dist_vectorize index da9e077cd4ed..21cd30465751 100644 --- a/pkg/sql/logictest/testdata/logic_test/dist_vectorize +++ b/pkg/sql/logictest/testdata/logic_test/dist_vectorize @@ -57,9 +57,11 @@ distribution: vectorized: · • group (scalar) +│ cluster nodes: │ actual row count: 1 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -78,12 +80,14 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -92,6 +96,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze b/pkg/sql/logictest/testdata/logic_test/explain_analyze index cc8534537cb0..1e44a1c9e85d 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze @@ -15,6 +15,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -34,6 +35,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B @@ -55,6 +57,7 @@ vectorized: · • hash join (inner) │ columns: (k, v, a, b) +│ cluster nodes: │ actual row count: 2 │ estimated row count: 990 (missing stats) │ equality: (v) = (a) @@ -62,6 +65,7 @@ vectorized: │ ├── • scan │ columns: (k, v) +│ cluster nodes: │ actual row count: 4 │ KV rows read: 4 │ KV bytes read: 32 B @@ -71,6 +75,7 @@ vectorized: │ └── • scan columns: (a, b) + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans index 1c1997e071b6..525dd98a2d36 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans @@ -66,17 +66,20 @@ distribution: vectorized: · • group +│ cluster nodes: │ actual row count: 5 │ group by: k │ 
ordered: +k │ └── • merge join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -85,6 +88,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -104,19 +108,23 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 5 │ order: +w │ └── • distinct + │ cluster nodes: │ actual row count: 5 │ distinct on: w │ └── • hash join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (w) │ left cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -125,6 +133,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -144,12 +153,15 @@ distribution: vectorized: · • cross join +│ cluster nodes: │ actual row count: 25 │ ├── • ordinality +│ │ cluster nodes: │ │ actual row count: 5 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -158,9 +170,11 @@ vectorized: │ spans: FULL SCAN │ └── • ordinality + │ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -200,9 +214,11 @@ distribution: vectorized: · • window +│ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -223,6 +239,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -249,6 +266,7 @@ vectorized: • root │ ├── • insert +│ │ cluster nodes: │ │ actual row count: 1 │ │ into: child(c, p) │ │ @@ -264,9 +282,11 @@ vectorized: │ │ exec mode: one row │ │ │ └── • group (scalar) +│ │ cluster nodes: │ │ actual row count: 1 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 1 │ KV rows read: 1 │ KV bytes read: 8 B @@ -278,9 +298,11 @@ vectorized: └── • constraint-check │ └── • error if rows + │ cluster nodes: │ actual row count: 0 │ └── • lookup join (anti) + │ cluster nodes: │ actual row count: 0 │ KV rows read: 1 │ KV bytes read: 8 B @@ -289,10 +311,12 @@ vectorized: │ equality cols are key │ └── • filter + │ cluster nodes: │ actual row count: 1 │ filter: column2 IS NOT NULL │ └── • scan buffer + cluster nodes: actual row count: 1 label: buffer 1 · diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial index c4844f3de938..92af0447fe00 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial @@ -34,25 +34,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 4 KV bytes read: 32 B @@ -102,25 +107,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: 
st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -146,25 +156,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables index edcd216cbc54..367741095fb8 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_create_all_tables +++ b/pkg/sql/logictest/testdata/logic_test/show_create_all_tables @@ -414,4 +414,3 @@ SHOW CREATE ALL TABLES ---- create_statement CREATE SEQUENCE public.s1 MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 123 START 1; - diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_local b/pkg/sql/logictest/testdata/logic_test/vectorize_local index 1a5a3cda1cee..b33346a74d6e 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_local +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_local @@ -44,6 +44,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 2,001 KV rows read: 2,001 KV bytes read: 16 KiB @@ -62,6 +63,7 @@ distribution: vectorized: · • lookup join +│ cluster nodes: │ actual row count: 2 │ KV rows read: 1 │ KV bytes read: 8 B @@ -69,6 +71,7 @@ vectorized: │ equality: (b) = (b) │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -87,10 +90,12 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 2 │ equality: (a) = (b) │ ├── • scan +│ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B @@ -99,6 +104,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index df460c0ea00a..6466077eb31c 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -331,6 +331,9 @@ func (e *emitter) joinNodeName(algo string, joinType descpb.JoinType) string { func (e *emitter) emitNodeAttributes(n *Node) error { if stats, ok := n.annotations[exec.ExecutionStatsID]; ok { s := stats.(*exec.ExecutionStats) + if len(s.Nodes) > 0 { + e.ob.AddNonDeterministicField("cluster nodes", strings.Join(s.Nodes, ", ")) + } if s.RowCount.HasValue() { e.ob.AddField("actual row count", humanizeutil.Count(s.RowCount.Value())) } diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index 86a2c406f2f3..4a13417b9cdb 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -105,6 +105,15 @@ func (ob *OutputBuilder) AddField(key, value string) { ob.entries = append(ob.entries, entry{field: key, fieldVal: value}) } +// AddNonDeterministicField adds an 
information field under the current node, +// but hides the information if the MakeDeterministic flag is set. +func (ob *OutputBuilder) AddNonDeterministicField(key, value string) { + if ob.flags.MakeDeterministic { + value = "" + } + ob.AddField(key, value) +} + // Attr adds an information field under the current node. func (ob *OutputBuilder) Attr(key string, value interface{}) { ob.AddField(key, fmt.Sprint(value)) diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index d18179869a62..9be03805d27c 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -304,6 +304,9 @@ type ExecutionStats struct { KVBytesRead optional.Uint KVRowsRead optional.Uint + + // Nodes on which this operator was executed. + Nodes []string } // BuildPlanForExplainFn builds an execution plan against the given
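
For reference, a minimal sketch (not part of the patch) of how the reworked ComponentID constructors are called after this change. The flow ID, stream ID, processor ID, and instance number below are made up for illustration; the constructor signatures and the origin-node convention for streams come from the patch above.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
	flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()}
	// The SQL instance that runs the processor / originates the stream
	// (value chosen arbitrarily for this sketch).
	origin := base.SQLInstanceID(1)

	// Processor 3 of this flow, running on instance 1.
	proc := execinfrapb.ProcessorComponentID(origin, flowID, 3 /* processorID */)

	// Stream 42, keyed by its *origin* instance. On the inbox (target) side the
	// caller cannot use flowCtx.StreamComponentID and instead builds the ID from
	// the stream's OriginNodeID, as setupInput does in the patch above.
	stream := execinfrapb.StreamComponentID(origin, flowID, execinfrapb.StreamID(42))

	// The flow itself on instance 1 (FlowIDs repeat across nodes, so the
	// instance ID is what disambiguates flows of the same query).
	flow := execinfrapb.FlowComponentID(origin, flowID)

	fmt.Println(proc, stream, flow)
}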