diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 64fa1d7c8603..8ffd605db4c6 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/colflow", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/col/coldata", "//pkg/col/coldataext", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 5dd4d32f3817..283df6e9cdc6 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -834,7 +835,7 @@ func (s *vectorizedFlowCreator) setupInput( // Note: we can't use flowCtx.StreamComponentID because the stream does // not originate from this node (we are the target node). compID := execinfrapb.StreamComponentID( - inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID, + base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID, ) op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency) if err != nil { @@ -961,7 +962,7 @@ func (s *vectorizedFlowCreator) setupOutput( // flow-level span. span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID) span.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID), + Component: execinfrapb.FlowComponentID(base.SQLInstanceID(outputStream.OriginNodeID), flowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 6e22c1c7eef6..b24db1eaa35e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -16,6 +16,7 @@ import ( "reflect" "sort" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" @@ -2744,7 +2745,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( processors := make(execComponents, len(plan.ResultRouters)) for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( - plan.Processors[resultProcIdx].Node, + base.SQLInstanceID(plan.Processors[resultProcIdx].Node), execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, int32(resultProcIdx), ) diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index b144768751eb..cace95667d84 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -109,15 +108,11 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec { // ProcessorComponentID returns a ComponentID for the given processor in this // flow. func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID { - // TODO(radu): the component stats should store SQLInstanceID instead. - nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) - return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID) + return execinfrapb.ProcessorComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, procID) } // StreamComponentID returns a ComponentID for the given stream in this flow. // The stream must originate from the node associated with this FlowCtx. func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID { - // TODO(radu): the component stats should store SQLInstanceID instead. - originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID()) - return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID) + return execinfrapb.StreamComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, streamID) } diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 6e8f8a5b3bd3..75f059bf5fdf 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -26,31 +26,35 @@ import ( ) // ProcessorComponentID returns a ComponentID for the given processor in a flow. -func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID { +func ProcessorComponentID( + instanceID base.SQLInstanceID, flowID FlowID, processorID int32, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_PROCESSOR, - ID: processorID, - NodeID: nodeID, + FlowID: flowID, + Type: ComponentID_PROCESSOR, + ID: processorID, + SQLInstanceID: instanceID, } } // StreamComponentID returns a ComponentID for the given stream in a flow. -func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID { +func StreamComponentID( + originInstanceID base.SQLInstanceID, flowID FlowID, streamID StreamID, +) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_STREAM, - ID: int32(streamID), - NodeID: originNodeID, + FlowID: flowID, + Type: ComponentID_STREAM, + ID: int32(streamID), + SQLInstanceID: originInstanceID, } } // FlowComponentID returns a ComponentID for the given flow. -func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID { +func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID { return ComponentID{ - FlowID: flowID, - Type: ComponentID_FLOW, - NodeID: nodeID, + FlowID: flowID, + Type: ComponentID_FLOW, + SQLInstanceID: instanceID, } } diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index 3ac9af753875..f8f28058d193 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -8,7 +8,7 @@ import fmt "fmt" import math "math" import optional "github.com/cockroachdb/cockroach/pkg/util/optional" -import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" +import github_com_cockroachdb_cockroach_pkg_base "github.com/cockroachdb/cockroach/pkg/base" import io "io" @@ -69,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{0, 0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -81,16 +81,16 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` - // NodeID of the node this component is associated with. For cross-node + // SQLInstanceID of the node this component is associated with. For cross-node // streams, this is the *origin* node for the stream. - NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` + SQLInstanceID github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID `protobuf:"varint,4,opt,name=sql_instance_id,json=sqlInstanceId,casttype=github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID" json:"sql_instance_id"` } func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{0} + return fileDescriptor_component_stats_011c4899fda43780, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,7 +137,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{1} + return fileDescriptor_component_stats_011c4899fda43780, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -174,7 +174,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{2} + return fileDescriptor_component_stats_011c4899fda43780, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -218,7 +218,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{3} + return fileDescriptor_component_stats_011c4899fda43780, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -256,7 +256,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{4} + return fileDescriptor_component_stats_011c4899fda43780, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -297,7 +297,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{5} + return fileDescriptor_component_stats_011c4899fda43780, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -336,7 +336,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{6} + return fileDescriptor_component_stats_011c4899fda43780, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -373,7 +373,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() {} func (*OutputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{7} + return fileDescriptor_component_stats_011c4899fda43780, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -407,7 +407,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_e22770c51d19c14e, []int{8} + return fileDescriptor_component_stats_011c4899fda43780, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -475,7 +475,7 @@ func (m *ComponentID) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintComponentStats(dAtA, i, uint64(m.ID)) dAtA[i] = 0x20 i++ - i = encodeVarintComponentStats(dAtA, i, uint64(m.NodeID)) + i = encodeVarintComponentStats(dAtA, i, uint64(m.SQLInstanceID)) return i, nil } @@ -878,7 +878,7 @@ func (m *ComponentID) Size() (n int) { n += 1 + l + sovComponentStats(uint64(l)) n += 1 + sovComponentStats(uint64(m.Type)) n += 1 + sovComponentStats(uint64(m.ID)) - n += 1 + sovComponentStats(uint64(m.NodeID)) + n += 1 + sovComponentStats(uint64(m.SQLInstanceID)) return n } @@ -1128,9 +1128,9 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SQLInstanceID", wireType) } - m.NodeID = 0 + m.SQLInstanceID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowComponentStats @@ -1140,7 +1140,7 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + m.SQLInstanceID |= (github_com_cockroachdb_cockroach_pkg_base.SQLInstanceID(b) & 0x7F) << shift if b < 0x80 { break } @@ -2543,70 +2543,71 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_e22770c51d19c14e) -} - -var fileDescriptor_component_stats_e22770c51d19c14e = []byte{ - // 970 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, - 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, - 0xa1, 0x4a, 0x05, 0x95, 0x4a, 0xbd, 0xe5, 0x60, 0x0c, 0x69, 0x88, 0x63, 0xe3, 0x2e, 0xd8, 0x95, - 0x72, 0x28, 0x5a, 0x76, 0xc7, 0x78, 0xb5, 0x2f, 0xb3, 0xde, 0x9d, 0xc5, 0xeb, 0x7e, 0x8a, 0x9e, - 0xda, 0xcf, 0xd1, 0x6b, 0x3e, 0x81, 0x8f, 0x39, 0x46, 0x3d, 0x58, 0x0d, 0xee, 0x57, 0xe8, 0xa5, - 0xa7, 0x6a, 0x66, 0x5f, 0xd8, 0xb8, 0x42, 0x61, 0xab, 0xde, 0x46, 0xcb, 0xf3, 0xff, 0xcd, 0xf3, - 0x3e, 0xc0, 0x53, 0xef, 0xd2, 0xec, 0xe0, 0x00, 0xab, 0xba, 0x7d, 0xee, 0x2a, 0xce, 0xac, 0xa3, - 0x12, 0xcb, 0x21, 0x36, 0xb6, 0xe9, 0xd4, 0xa3, 0x0a, 0xf5, 0xda, 0x8e, 0x4b, 0x28, 0x41, 0x75, - 0x95, 0xa8, 0x86, 0x4b, 0x14, 0xf5, 0xa2, 0xed, 0x5d, 0x9a, 0x6d, 0x4d, 0xf7, 0xa8, 0x77, 0x69, - 0xba, 0xbe, 0xbd, 0xf3, 0x78, 0x4e, 0xe6, 0x84, 0x1b, 0x75, 0xd8, 0x29, 0xb4, 0xdf, 0xf9, 0xd4, - 0xa7, 0xba, 0xd9, 0x21, 0x0e, 0xd5, 0x89, 0xad, 0xac, 0x0e, 0xe1, 0xaf, 0xcd, 0x37, 0x22, 0x54, - 0x0e, 0xe2, 0x7b, 0x86, 0x7d, 0xf4, 0x35, 0x94, 0xce, 0x4d, 0x72, 0x35, 0xd5, 0xb5, 0xba, 0xb0, - 0x27, 0xb4, 0x1e, 0xf4, 0xea, 0x37, 0xb7, 0xbb, 0xb9, 0xdf, 0x6f, 0x77, 0xa5, 0xe7, 0x26, 0xb9, - 0x1a, 0xf6, 0x97, 0xc9, 0x49, 0x96, 0x98, 0xe1, 0x50, 0x43, 0x7d, 0x28, 0xd0, 0x6b, 0x07, 0xd7, - 0xc5, 0x3d, 0xa1, 0x55, 0xed, 0x7e, 0xd9, 0x5e, 0xe7, 0x5f, 0x3b, 0x75, 0x4f, 0x7b, 0x72, 0xed, - 0xe0, 0x5e, 0x81, 0xb1, 0x65, 0xae, 0x46, 0x3b, 0x20, 0xea, 0x5a, 0x3d, 0xbf, 0x27, 0xb4, 0x8a, - 0x3d, 0x60, 0xdf, 0x97, 0xb7, 0xbb, 0xe2, 0xb0, 0x2f, 0x8b, 0xba, 0x86, 0x7e, 0x84, 0x92, 0x4d, - 0x34, 0xcc, 0x9c, 0x2a, 0x70, 0x83, 0x41, 0x64, 0x20, 0x1d, 0x13, 0x0d, 0x0f, 0xfb, 0x7f, 0xdf, - 0xee, 0x7e, 0x33, 0xd7, 0xe9, 0x85, 0x3f, 0x6b, 0xab, 0xc4, 0xea, 0x24, 0x0e, 0x68, 0xb3, 0xd5, - 0xb9, 0xe3, 0x18, 0xf3, 0x0e, 0x3f, 0x39, 0xb3, 0x76, 0x28, 0x93, 0x25, 0x46, 0x1d, 0x6a, 0xcd, - 0x6f, 0xa1, 0xc0, 0xfc, 0x41, 0x65, 0x28, 0x9e, 0x1e, 0x8f, 0x07, 0x93, 0x5a, 0x0e, 0x6d, 0x43, - 0xf9, 0x44, 0x1e, 0x1d, 0x0c, 0xc6, 0xe3, 0x91, 0x5c, 0x13, 0x10, 0x80, 0x34, 0x9e, 0xc8, 0x83, - 0xfd, 0xa3, 0x9a, 0x88, 0xb6, 0xa0, 0xf0, 0xfc, 0xd5, 0xe8, 0x87, 0x5a, 0xbe, 0xf9, 0xa6, 0x00, - 0xd5, 0x24, 0xa8, 0x31, 0xab, 0x11, 0x1a, 0x42, 0x39, 0x29, 0x1b, 0xcf, 0x60, 0xa5, 0xfb, 0x74, - 0xa3, 0x8c, 0x44, 0xc9, 0x58, 0xa9, 0xd1, 0x00, 0x24, 0x1b, 0xd3, 0xa9, 0x1b, 0xf0, 0xcc, 0x56, - 0xba, 0xad, 0xf5, 0x9c, 0x63, 0x4c, 0xaf, 0x88, 0x6b, 0xc8, 0x01, 0x77, 0x22, 0x42, 0x15, 0x6d, - 0x4c, 0xe5, 0x20, 0xc6, 0xd0, 0x80, 0x27, 0x77, 0x13, 0xcc, 0xe4, 0x5f, 0x98, 0x49, 0x80, 0x9e, - 0x81, 0x68, 0x2c, 0x78, 0xfa, 0x2b, 0xdd, 0x27, 0xeb, 0x11, 0x87, 0x67, 0xa1, 0x36, 0x29, 0xe1, - 0xe1, 0x99, 0x2c, 0x1a, 0x0b, 0xf4, 0x0c, 0x0a, 0xac, 0xb5, 0xeb, 0x45, 0x0e, 0xf8, 0x7c, 0x3d, - 0x60, 0x10, 0x60, 0x35, 0x7d, 0x3d, 0x97, 0xa1, 0x03, 0x90, 0x88, 0x4f, 0x1d, 0x9f, 0xd6, 0xa5, - 0x8f, 0xe5, 0x74, 0xc4, 0xed, 0xd2, 0x88, 0x48, 0x8a, 0x7a, 0x20, 0xe9, 0xb6, 0xe3, 0x53, 0xaf, - 0x5e, 0xda, 0xcb, 0xb7, 0x2a, 0xdd, 0x2f, 0xd6, 0x43, 0x86, 0xf6, 0x7d, 0x46, 0xa8, 0x44, 0x2f, - 0x00, 0xf8, 0x7c, 0xf0, 0x89, 0xac, 0x6f, 0x7d, 0x2c, 0x1a, 0x36, 0x2a, 0x69, 0x4c, 0xf9, 0x3c, - 0xfe, 0xd0, 0xfc, 0x45, 0x00, 0x58, 0x5d, 0x83, 0x7a, 0x00, 0xb6, 0x6f, 0x4d, 0xa9, 0xef, 0x98, - 0xd8, 0x8b, 0x3a, 0xe7, 0xb3, 0x14, 0x98, 0x4d, 0x71, 0x3b, 0x19, 0xde, 0x53, 0xdd, 0xa6, 0x31, - 0xd2, 0xf6, 0xad, 0x09, 0x57, 0xa1, 0x3e, 0x94, 0xaf, 0x14, 0x9d, 0x4e, 0xa9, 0x6e, 0xe1, 0xa8, - 0x69, 0x9e, 0xac, 0x45, 0xf4, 0x7d, 0x57, 0x61, 0xc7, 0x08, 0xb3, 0xc5, 0x94, 0x13, 0xdd, 0xc2, - 0xcd, 0xf7, 0x79, 0xa8, 0x7e, 0xd8, 0x50, 0x68, 0x1f, 0x4a, 0xa6, 0x42, 0xb1, 0xad, 0x5e, 0x47, - 0x9e, 0x6d, 0x8c, 0x8d, 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x35, 0x3c, 0xd6, 0xb0, 0x87, 0x5d, 0x5d, - 0x31, 0xf5, 0x9f, 0xb8, 0x49, 0x08, 0xcc, 0x67, 0x03, 0x7e, 0x72, 0x0f, 0xc2, 0xd9, 0xaf, 0xe0, - 0x61, 0x98, 0xfd, 0xa9, 0x8b, 0x55, 0xac, 0x2f, 0xb0, 0x16, 0xb5, 0xfb, 0x46, 0x65, 0xa8, 0x86, - 0x5a, 0x39, 0x92, 0xa2, 0x97, 0x50, 0x9d, 0x5d, 0xd3, 0x34, 0xac, 0xb8, 0x39, 0x6c, 0x9b, 0x4b, - 0x13, 0xd6, 0x09, 0x3c, 0xb2, 0xb0, 0xe7, 0x29, 0xf3, 0x34, 0x4e, 0xda, 0x1c, 0x57, 0x8b, 0xd5, - 0x31, 0xb1, 0xf9, 0xa7, 0x90, 0xd4, 0x38, 0x9a, 0x76, 0xd4, 0x87, 0x4a, 0x14, 0xbe, 0xb7, 0xda, - 0x5d, 0x1b, 0xe1, 0x21, 0xd4, 0x8d, 0xd9, 0xd2, 0xea, 0x01, 0x84, 0x61, 0x73, 0x88, 0x98, 0xa1, - 0x8d, 0xb9, 0x8c, 0x33, 0x5e, 0xc0, 0x76, 0x12, 0x2e, 0xc7, 0xe4, 0x37, 0xc7, 0x3c, 0x88, 0x95, - 0x8c, 0xd4, 0xfc, 0x4d, 0x84, 0x52, 0xb4, 0x91, 0x56, 0x9e, 0xb9, 0x58, 0xd1, 0x32, 0x0d, 0x58, - 0x54, 0x0c, 0x45, 0x4b, 0xe5, 0x88, 0x43, 0xc4, 0xcc, 0x39, 0xe2, 0x94, 0x97, 0x50, 0x32, 0x16, - 0x19, 0xfb, 0xb6, 0x1a, 0xbf, 0x78, 0x87, 0x67, 0xac, 0x59, 0x65, 0xc9, 0x58, 0xf0, 0xa6, 0x3d, - 0x81, 0x87, 0x2a, 0xb1, 0x29, 0xb6, 0x57, 0xb3, 0x50, 0xc8, 0x36, 0x0b, 0xd5, 0x95, 0x9e, 0x8f, - 0xff, 0x5f, 0x02, 0x94, 0x93, 0x25, 0xcc, 0xc6, 0x96, 0x2d, 0xe0, 0x90, 0x9c, 0x71, 0xf6, 0xb7, - 0x98, 0x92, 0x7b, 0x39, 0x82, 0x47, 0x96, 0x12, 0x4c, 0x15, 0xd3, 0x24, 0xaa, 0x42, 0xb1, 0x36, - 0xb5, 0xb0, 0x95, 0x25, 0x7b, 0x0f, 0x2d, 0x25, 0xd8, 0x8f, 0xc5, 0x47, 0xd8, 0x42, 0xdf, 0x03, - 0xfa, 0x10, 0xa8, 0xe9, 0x9e, 0x91, 0xa5, 0x4f, 0x6a, 0x69, 0x62, 0x5f, 0xf7, 0x8c, 0xe6, 0xaf, - 0x02, 0x54, 0x52, 0x6f, 0x07, 0xab, 0x35, 0x5b, 0xc8, 0x33, 0x85, 0xaa, 0x17, 0xd9, 0x36, 0x32, - 0x5b, 0xe4, 0xbd, 0x50, 0x76, 0x6f, 0xad, 0x8b, 0xff, 0x65, 0xad, 0x37, 0x27, 0x50, 0x4e, 0xde, - 0x11, 0xf4, 0x1d, 0x6c, 0xb3, 0xc8, 0x2d, 0x6c, 0x4d, 0x7d, 0xd6, 0xe8, 0x59, 0x1c, 0xab, 0x58, - 0x4a, 0x70, 0x84, 0xad, 0x53, 0xa6, 0xeb, 0x7d, 0x75, 0xf3, 0xbe, 0x91, 0xbb, 0x59, 0x36, 0x84, - 0xb7, 0xcb, 0x86, 0xf0, 0x6e, 0xd9, 0x10, 0xfe, 0x58, 0x36, 0x84, 0x9f, 0xef, 0x1a, 0xb9, 0xb7, - 0x77, 0x8d, 0xdc, 0xbb, 0xbb, 0x46, 0xee, 0x75, 0x25, 0xf5, 0x67, 0xf4, 0x9f, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xce, 0xc1, 0x00, 0xbc, 0x9e, 0x0a, 0x00, 0x00, + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_011c4899fda43780) +} + +var fileDescriptor_component_stats_011c4899fda43780 = []byte{ + // 982 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0xe2, 0x46, + 0x18, 0xc7, 0xb1, 0x21, 0x26, 0x3c, 0x2c, 0x84, 0x9d, 0xee, 0x01, 0x45, 0x2d, 0xc9, 0xd2, 0xae, + 0x84, 0x2a, 0x15, 0x54, 0x0e, 0xed, 0x69, 0x0f, 0x21, 0xb0, 0x5d, 0x36, 0xc9, 0x92, 0x35, 0x24, + 0x95, 0xf6, 0x62, 0x19, 0x7b, 0x42, 0x46, 0xf8, 0x0d, 0x7b, 0x4c, 0x9c, 0x7e, 0x8a, 0x9e, 0xda, + 0xcf, 0xd1, 0x6b, 0x3f, 0x41, 0x8e, 0x7b, 0xe8, 0x61, 0xd5, 0x43, 0xd4, 0x25, 0xfd, 0x0a, 0xbd, + 0xf4, 0x54, 0xcd, 0xd8, 0x18, 0x27, 0x15, 0x5a, 0x5c, 0xf5, 0x36, 0xd8, 0xcf, 0xff, 0x37, 0xcf, + 0xbb, 0x81, 0x67, 0xde, 0xcc, 0x68, 0xe1, 0x00, 0x6b, 0xc4, 0xba, 0x70, 0x55, 0x67, 0xdc, 0xd2, + 0x6c, 0xd3, 0xb1, 0x2d, 0x6c, 0x51, 0xc5, 0xa3, 0x2a, 0xf5, 0x9a, 0x8e, 0x6b, 0x53, 0x1b, 0x55, + 0x35, 0x5b, 0x9b, 0xba, 0xb6, 0xaa, 0x5d, 0x36, 0xbd, 0x99, 0xd1, 0xd4, 0x89, 0x47, 0xbd, 0x99, + 0xe1, 0xfa, 0xd6, 0xee, 0x93, 0x89, 0x3d, 0xb1, 0xb9, 0x51, 0x8b, 0x9d, 0x42, 0xfb, 0xdd, 0x4f, + 0x7d, 0x4a, 0x8c, 0x96, 0xed, 0x50, 0x62, 0x5b, 0xea, 0xea, 0x10, 0xbe, 0xad, 0xff, 0x26, 0x42, + 0xf1, 0x70, 0x79, 0x4f, 0xbf, 0x8b, 0xbe, 0x86, 0xfc, 0x85, 0x61, 0x5f, 0x29, 0x44, 0xaf, 0x0a, + 0xfb, 0x42, 0xe3, 0x51, 0xa7, 0x7a, 0x73, 0xbb, 0x97, 0xf9, 0xfd, 0x76, 0x4f, 0x7a, 0x61, 0xd8, + 0x57, 0xfd, 0xee, 0x22, 0x3e, 0xc9, 0x12, 0x33, 0xec, 0xeb, 0xa8, 0x0b, 0x39, 0x7a, 0xed, 0xe0, + 0xaa, 0xb8, 0x2f, 0x34, 0xca, 0xed, 0x2f, 0x9b, 0xeb, 0xfc, 0x6b, 0x26, 0xee, 0x69, 0x8e, 0xae, + 0x1d, 0xdc, 0xc9, 0x31, 0xb6, 0xcc, 0xd5, 0x68, 0x17, 0x44, 0xa2, 0x57, 0xb3, 0xfb, 0x42, 0x63, + 0xab, 0x03, 0xec, 0xf9, 0xe2, 0x76, 0x4f, 0xec, 0x77, 0x65, 0x91, 0xe8, 0x28, 0x80, 0x1d, 0x6f, + 0x66, 0x28, 0xc4, 0xf2, 0xa8, 0x6a, 0x69, 0x98, 0x39, 0x97, 0xe3, 0x86, 0xa7, 0x91, 0x61, 0x69, + 0xf8, 0xe6, 0xb8, 0x1f, 0xbd, 0xed, 0x77, 0xff, 0xbe, 0xdd, 0xfb, 0x76, 0x42, 0xe8, 0xa5, 0x3f, + 0x6e, 0x6a, 0xb6, 0xd9, 0x8a, 0xfd, 0xd1, 0xc7, 0xab, 0x73, 0xcb, 0x99, 0x4e, 0x5a, 0x63, 0xd5, + 0xc3, 0xcd, 0x7b, 0x52, 0xb9, 0xe4, 0xcd, 0x8c, 0xf8, 0xa7, 0x5e, 0xff, 0x06, 0x72, 0xcc, 0x53, + 0x54, 0x80, 0xad, 0xb3, 0xd7, 0xc3, 0xde, 0xa8, 0x92, 0x41, 0x25, 0x28, 0x9c, 0xca, 0x83, 0xc3, + 0xde, 0x70, 0x38, 0x90, 0x2b, 0x02, 0x02, 0x90, 0x86, 0x23, 0xb9, 0x77, 0x70, 0x52, 0x11, 0xd1, + 0x36, 0xe4, 0x5e, 0x1c, 0x0f, 0xbe, 0xaf, 0x64, 0xeb, 0xbf, 0xe6, 0xa0, 0x1c, 0x87, 0x3b, 0x64, + 0xd5, 0x43, 0x7d, 0x28, 0xc4, 0x05, 0xe5, 0xb9, 0x2d, 0xb6, 0x9f, 0x6d, 0x94, 0xab, 0x28, 0x4d, + 0x2b, 0x35, 0xea, 0x81, 0x64, 0x61, 0xaa, 0xb8, 0x01, 0xcf, 0x79, 0xb1, 0xdd, 0x58, 0xcf, 0x79, + 0x8d, 0xe9, 0x95, 0xed, 0x4e, 0xe5, 0x80, 0x3b, 0x11, 0xa1, 0xb6, 0x2c, 0x4c, 0xe5, 0x60, 0x89, + 0xa1, 0x01, 0x4f, 0xfb, 0x26, 0x98, 0xd1, 0xbf, 0x30, 0xa3, 0x00, 0x3d, 0x07, 0x71, 0x3a, 0xe7, + 0x05, 0x29, 0xb6, 0x9f, 0xae, 0x47, 0x1c, 0x9d, 0x87, 0xda, 0xb8, 0xb8, 0x47, 0xe7, 0xb2, 0x38, + 0x9d, 0xa3, 0xe7, 0x90, 0x63, 0x4d, 0x5f, 0xdd, 0xe2, 0x80, 0xcf, 0xd7, 0x03, 0x7a, 0x01, 0xd6, + 0x92, 0xd7, 0x73, 0x19, 0x3a, 0x04, 0xc9, 0xf6, 0xa9, 0xe3, 0xd3, 0xaa, 0xf4, 0xb1, 0x9c, 0x0e, + 0xb8, 0x5d, 0x12, 0x11, 0x49, 0x51, 0x07, 0x24, 0x62, 0x39, 0x3e, 0xf5, 0xaa, 0xf9, 0xfd, 0x6c, + 0xa3, 0xd8, 0xfe, 0x62, 0x3d, 0xa4, 0x6f, 0x3d, 0x64, 0x84, 0x4a, 0xf4, 0x12, 0x80, 0x4f, 0x0e, + 0x9f, 0xd5, 0xea, 0xf6, 0xc7, 0xa2, 0x61, 0x43, 0x94, 0xc4, 0x14, 0x2e, 0x96, 0x0f, 0xea, 0x3f, + 0x09, 0x00, 0xab, 0x6b, 0x50, 0x07, 0xc0, 0xf2, 0x4d, 0x85, 0xfa, 0x8e, 0x81, 0xbd, 0xa8, 0x73, + 0x3e, 0x4b, 0x80, 0xd9, 0x7c, 0x37, 0xe3, 0xb1, 0x3e, 0x23, 0x16, 0x5d, 0x22, 0x2d, 0xdf, 0x1c, + 0x71, 0x15, 0xea, 0x42, 0xe1, 0x4a, 0x25, 0x54, 0xa1, 0xc4, 0xc4, 0x51, 0xd3, 0x3c, 0x5d, 0x8b, + 0xe8, 0xfa, 0xae, 0xca, 0x8e, 0x11, 0x66, 0x9b, 0x29, 0x47, 0xc4, 0xc4, 0xf5, 0x0f, 0x59, 0x28, + 0xdf, 0x6f, 0x28, 0x74, 0x00, 0x79, 0x43, 0xa5, 0xd8, 0xd2, 0xae, 0x23, 0xcf, 0x36, 0xc6, 0x2e, + 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x2d, 0x3c, 0xd1, 0xb1, 0x87, 0x5d, 0xa2, 0x1a, 0xe4, 0x07, 0x6e, + 0x12, 0x02, 0xb3, 0xe9, 0x80, 0x9f, 0x3c, 0x80, 0x70, 0xf6, 0x31, 0xec, 0x84, 0xd9, 0x57, 0x5c, + 0xac, 0x61, 0x32, 0xc7, 0x7a, 0xd4, 0xee, 0x1b, 0x95, 0xa1, 0x1c, 0x6a, 0xe5, 0x48, 0x8a, 0x5e, + 0x41, 0x79, 0x7c, 0x4d, 0x93, 0xb0, 0xad, 0xcd, 0x61, 0x25, 0x2e, 0x8d, 0x59, 0xa7, 0xf0, 0xd8, + 0xc4, 0x9e, 0xa7, 0x4e, 0x92, 0x38, 0x69, 0x73, 0x5c, 0x65, 0xa9, 0x5e, 0x12, 0xeb, 0x7f, 0x0a, + 0x71, 0x8d, 0xa3, 0x69, 0x47, 0x5d, 0x28, 0x46, 0xe1, 0x7b, 0xab, 0xdd, 0xb5, 0x11, 0x1e, 0x42, + 0xdd, 0x90, 0x2d, 0xad, 0x0e, 0x40, 0x18, 0x36, 0x87, 0x88, 0x29, 0xda, 0x98, 0xcb, 0x38, 0xe3, + 0x25, 0x94, 0xe2, 0x70, 0x39, 0x26, 0xbb, 0x39, 0xe6, 0xd1, 0x52, 0xc9, 0x48, 0xf5, 0x5f, 0x44, + 0xc8, 0x47, 0x1b, 0x69, 0xe5, 0x99, 0x8b, 0x55, 0x3d, 0xd5, 0x80, 0x45, 0xc5, 0x50, 0xf5, 0x44, + 0x8e, 0x38, 0x44, 0x4c, 0x9d, 0x23, 0x4e, 0x79, 0x05, 0xf9, 0xe9, 0x3c, 0x65, 0xdf, 0x96, 0xa3, + 0x7d, 0x2a, 0x1d, 0x9d, 0xb3, 0x66, 0x95, 0xa5, 0xe9, 0x9c, 0x37, 0xed, 0x29, 0xec, 0x68, 0xb6, + 0x45, 0xb1, 0xb5, 0x9a, 0x85, 0x5c, 0xba, 0x59, 0x28, 0xaf, 0xf4, 0x7c, 0xfc, 0xff, 0x12, 0xa0, + 0x10, 0x2f, 0x61, 0x36, 0xb6, 0x6c, 0x01, 0x87, 0xe4, 0x94, 0xb3, 0xbf, 0xcd, 0x94, 0xdc, 0xcb, + 0x01, 0x3c, 0x36, 0xd5, 0x40, 0x51, 0x0d, 0xc3, 0xd6, 0x54, 0x8a, 0x75, 0xc5, 0xc4, 0x66, 0x9a, + 0xec, 0xed, 0x98, 0x6a, 0x70, 0xb0, 0x14, 0x9f, 0x60, 0x13, 0xbd, 0x01, 0x74, 0x1f, 0xa8, 0x13, + 0x6f, 0x9a, 0xa6, 0x4f, 0x2a, 0x49, 0x62, 0x97, 0x78, 0xd3, 0xfa, 0xcf, 0x02, 0x14, 0x13, 0xdf, + 0x0e, 0x56, 0x6b, 0xb6, 0x90, 0xc7, 0x2a, 0xd5, 0x2e, 0xd3, 0x6d, 0x64, 0xb6, 0xc8, 0x3b, 0xa1, + 0xec, 0xc1, 0x5a, 0x17, 0xff, 0xcb, 0x5a, 0xaf, 0x8f, 0xa0, 0x10, 0x7f, 0x47, 0xd0, 0x77, 0x50, + 0x62, 0x91, 0x9b, 0xd8, 0x54, 0x7c, 0xd6, 0xe8, 0x69, 0x1c, 0x2b, 0x9a, 0x6a, 0x70, 0x82, 0xcd, + 0x33, 0xa6, 0xeb, 0x7c, 0x75, 0xf3, 0xa1, 0x96, 0xb9, 0x59, 0xd4, 0x84, 0x77, 0x8b, 0x9a, 0xf0, + 0x7e, 0x51, 0x13, 0xfe, 0x58, 0xd4, 0x84, 0x1f, 0xef, 0x6a, 0x99, 0x77, 0x77, 0xb5, 0xcc, 0xfb, + 0xbb, 0x5a, 0xe6, 0x6d, 0x31, 0xf1, 0x37, 0xf5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xaa, 0xf0, + 0x57, 0x37, 0xb8, 0x0a, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index a02f9f920403..8b55abca2799 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -48,11 +48,12 @@ message ComponentID { optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; - // NodeID of the node this component is associated with. For cross-node + // SQLInstanceID of the node this component is associated with. For cross-node // streams, this is the *origin* node for the stream. - optional int32 node_id = 4 [(gogoproto.nullable) = false, - (gogoproto.customname) = "NodeID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + optional int32 sql_instance_id = 4 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SQLInstanceID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"]; } // ComponentStats contains statistics for an execution component. A component is diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 77d542825778..ff3ec8deeec4 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -20,6 +20,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -600,14 +601,14 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) { for i := range d.Processors { p := &d.Processors[i] nodeID := d.nodeIDs[p.NodeIdx] - component := ProcessorComponentID(nodeID, d.flowID, p.processorID) + component := ProcessorComponentID(base.SQLInstanceID(nodeID), d.flowID, p.processorID) if compStats := statsMap[component]; compStats != nil { p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...) } } for i := range d.Edges { originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx] - component := StreamComponentID(originNodeID, d.flowID, d.Edges[i].streamID) + component := StreamComponentID(base.SQLInstanceID(originNodeID), d.flowID, d.Edges[i].streamID) if compStats := statsMap[component]; compStats != nil { d.Edges[i].Stats = compStats.StatsForQueryPlan() } diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 79e34ce05b9d..49c28ae7c807 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/roachpb", "//pkg/sql/execinfrapb", "//pkg/util/tracing/tracingpb", diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index b927c477c7d0..68a58fcc67ef 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -13,6 +13,7 @@ package execstats import ( "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -20,13 +21,13 @@ import ( ) type processorStats struct { - // TODO(radu): this field redundant with stats.Component.NodeID. + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. nodeID roachpb.NodeID stats *execinfrapb.ComponentStats } type streamStats struct { - // TODO(radu): this field redundant with stats.Component.NodeID. + // TODO(radu): this field redundant with stats.Component.SQLInstanceID. originNodeID roachpb.NodeID destinationNodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -53,7 +54,7 @@ type FlowsMetadata struct { // flowStats maps a node ID to flow level stats extracted from a trace. Note // that the key is not a FlowID because the same FlowID is used across // nodes. - flowStats map[roachpb.NodeID]*flowStats + flowStats map[base.SQLInstanceID]*flowStats } // NewFlowsMetadata creates a FlowsMetadata for the given physical plan @@ -62,12 +63,12 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta a := &FlowsMetadata{ processorStats: make(map[execinfrapb.ProcessorID]*processorStats), streamStats: make(map[execinfrapb.StreamID]*streamStats), - flowStats: make(map[roachpb.NodeID]*flowStats), + flowStats: make(map[base.SQLInstanceID]*flowStats), } // Annotate the maps with physical plan information. for nodeID, flow := range flows { - a.flowStats[nodeID] = &flowStats{} + a.flowStats[base.SQLInstanceID(nodeID)] = &flowStats{} for _, proc := range flow.Processors { a.processorStats[execinfrapb.ProcessorID(proc.ProcessorID)] = &processorStats{nodeID: nodeID} for _, output := range proc.Output { @@ -89,13 +90,13 @@ func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMeta // TODO(asubiotto): Flatten this struct, we're currently allocating a map per // stat. type NodeLevelStats struct { - NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64 - MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 - KVBytesReadGroupedByNode map[roachpb.NodeID]int64 - KVRowsReadGroupedByNode map[roachpb.NodeID]int64 - KVTimeGroupedByNode map[roachpb.NodeID]time.Duration - NetworkMessagesGroupedByNode map[roachpb.NodeID]int64 - ContentionTimeGroupedByNode map[roachpb.NodeID]time.Duration + NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64 + MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64 + KVBytesReadGroupedByNode map[base.SQLInstanceID]int64 + KVRowsReadGroupedByNode map[base.SQLInstanceID]int64 + KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration + NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 + ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the @@ -164,12 +165,12 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist streamStats.stats = componentStats case execinfrapb.ComponentID_FLOW: - flowStats := a.flowStats[component.NodeID] + flowStats := a.flowStats[component.SQLInstanceID] if flowStats == nil { return errors.Errorf( "trace has span for flow %s on node %s but the flow does not exist in the physical plan", component.FlowID, - component.NodeID, + component.SQLInstanceID, ) } flowStats.stats = append(flowStats.stats, componentStats) @@ -186,13 +187,13 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist func (a *TraceAnalyzer) ProcessStats() error { // Process node level stats. a.nodeLevelStats = NodeLevelStats{ - NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64), - MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), - KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), - KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), - NetworkMessagesGroupedByNode: make(map[roachpb.NodeID]int64), - ContentionTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), + NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64), + MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64), + KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64), + KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), + NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), + ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), } var errs error @@ -201,10 +202,11 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } - a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) - a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) - a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value() - a.nodeLevelStats.ContentionTimeGroupedByNode[stats.nodeID] += stats.stats.KV.ContentionTime.Value() + instanceID := base.SQLInstanceID(stats.nodeID) + a.nodeLevelStats.KVBytesReadGroupedByNode[instanceID] += int64(stats.stats.KV.BytesRead.Value()) + a.nodeLevelStats.KVRowsReadGroupedByNode[instanceID] += int64(stats.stats.KV.TuplesRead.Value()) + a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.stats.KV.KVTime.Value() + a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.stats.KV.ContentionTime.Value() } // Process streamStats. @@ -212,13 +214,14 @@ func (a *TraceAnalyzer) ProcessStats() error { if stats.stats == nil { continue } + originInstanceID := base.SQLInstanceID(stats.originNodeID) // Set networkBytesSentGroupedByNode. bytes, err := getNetworkBytesFromComponentStats(stats.stats) if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating network bytes sent")) } else { - a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes + a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes } // The row execution flow attaches this stat to a stream stat with the @@ -230,8 +233,9 @@ func (a *TraceAnalyzer) ProcessStats() error { // removed, getting maxMemUsage from streamStats should be removed as // well. if stats.stats.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage + memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()) + if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage } } @@ -239,12 +243,12 @@ func (a *TraceAnalyzer) ProcessStats() error { if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages")) } else { - a.nodeLevelStats.NetworkMessagesGroupedByNode[stats.originNodeID] += numMessages + a.nodeLevelStats.NetworkMessagesGroupedByNode[originInstanceID] += numMessages } } // Process flowStats. - for nodeID, stats := range a.flowStats { + for instanceID, stats := range a.flowStats { if stats.stats == nil { continue } @@ -253,8 +257,8 @@ func (a *TraceAnalyzer) ProcessStats() error { // span, so we need to check flow stats for max memory usage. for _, v := range stats.stats { if v.FlowStats.MaxMemUsage.HasValue() { - if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] = memUsage + if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] { + a.nodeLevelStats.MaxMemoryUsageGroupedByNode[instanceID] = memUsage } } } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 4480a1db1ee8..32898e11e866 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -194,8 +194,8 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} - n1 := roachpb.NodeID(1) - n2 := roachpb.NodeID(2) + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) a.AddComponentStats( &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( @@ -291,8 +291,8 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) { // Artificially inject component stats directly into the FlowsMetadata (in // the non-testing setting the stats come from the trace). var f1, f2 execstats.FlowsMetadata - n1 := roachpb.NodeID(1) - n2 := roachpb.NodeID(2) + n1 := base.SQLInstanceID(1) + n2 := base.SQLInstanceID(2) f1.AddComponentStats( &execinfrapb.ComponentStats{ Component: execinfrapb.ProcessorComponentID( diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index d25fd92c3870..c9488a054e5b 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -11,6 +11,7 @@ package execstats import ( + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" ) @@ -27,7 +28,7 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: processorStat := &processorStats{ - nodeID: stats.Component.NodeID, + nodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.processorStats == nil { @@ -36,7 +37,7 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ - originNodeID: stats.Component.NodeID, + originNodeID: roachpb.NodeID(stats.Component.SQLInstanceID), stats: stats, } if m.streamStats == nil { @@ -47,8 +48,8 @@ func (m *FlowsMetadata) AddComponentStats(stats *execinfrapb.ComponentStats) { flowStat := &flowStats{} flowStat.stats = append(flowStat.stats, stats) if m.flowStats == nil { - m.flowStats = make(map[roachpb.NodeID]*flowStats) + m.flowStats = make(map[base.SQLInstanceID]*flowStats) } - m.flowStats[stats.Component.NodeID] = flowStat + m.flowStats[stats.Component.SQLInstanceID] = flowStat } } diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 3eb52b00aae3..7e8a1903b209 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -435,7 +434,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) { // information over. if sp := tracing.SpanFromContext(ctx); sp != nil { sp.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.FlowComponentID(roachpb.NodeID(f.NodeID.SQLInstanceID()), f.FlowCtx.ID), + Component: execinfrapb.FlowComponentID(f.NodeID.SQLInstanceID(), f.FlowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.EvalCtx.Mon.MaximumBytes())), }, diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index d22e60901389..af62c46dc1d6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -497,7 +497,7 @@ func (m execNodeTraceMetadata) annotateExplain( var nodes util.FastIntSet for _, c := range components { if c.Type == execinfrapb.ComponentID_PROCESSOR { - nodes.Add(int(c.NodeID)) + nodes.Add(int(c.SQLInstanceID)) } stats := statsMap[c] if stats == nil {