Skip to content

Commit

Permalink
sql: use SQLInstanceID in ComponentStats
Browse files Browse the repository at this point in the history
Switch to using SQLInstanceID instead of NodeID in ComponentStats.
Eventually, the same change should happen in the exec protos (e.g.
StreamEndpointSpec).

Release note: None
  • Loading branch information
RaduBerinde committed Feb 18, 2021
1 parent a24fa86 commit 878bf45
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 158 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/colflow",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/rpc/nodedialer",
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand Down Expand Up @@ -834,7 +835,7 @@ func (s *vectorizedFlowCreator) setupInput(
// Note: we can't use flowCtx.StreamComponentID because the stream does
// not originate from this node (we are the target node).
compID := execinfrapb.StreamComponentID(
inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID,
base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID,
)
op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency)
if err != nil {
Expand Down Expand Up @@ -961,7 +962,7 @@ func (s *vectorizedFlowCreator) setupOutput(
// flow-level span.
span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID)
span.SetSpanStats(&execinfrapb.ComponentStats{
Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID),
Component: execinfrapb.FlowComponentID(base.SQLInstanceID(outputStream.OriginNodeID), flowCtx.ID),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())),
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"reflect"
"sort"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -2744,7 +2745,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
processors := make(execComponents, len(plan.ResultRouters))
for i, resultProcIdx := range plan.ResultRouters {
processors[i] = execinfrapb.ProcessorComponentID(
plan.Processors[resultProcIdx].Node,
base.SQLInstanceID(plan.Processors[resultProcIdx].Node),
execinfrapb.FlowID{UUID: planCtx.infra.FlowID},
int32(resultProcIdx),
)
Expand Down
9 changes: 2 additions & 7 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -109,15 +108,11 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec {
// ProcessorComponentID returns a ComponentID for the given processor in this
// flow.
func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID {
// TODO(radu): the component stats should store SQLInstanceID instead.
nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID())
return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID)
return execinfrapb.ProcessorComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, procID)
}

// StreamComponentID returns a ComponentID for the given stream in this flow.
// The stream must originate from the node associated with this FlowCtx.
func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
// TODO(radu): the component stats should store SQLInstanceID instead.
originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID())
return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID)
return execinfrapb.StreamComponentID(ctx.NodeID.SQLInstanceID(), ctx.ID, streamID)
}
34 changes: 19 additions & 15 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand All @@ -26,31 +26,35 @@ import (
)

// ProcessorComponentID returns a ComponentID for the given processor in a flow.
func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID {
func ProcessorComponentID(
instanceID base.SQLInstanceID, flowID FlowID, processorID int32,
) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_PROCESSOR,
ID: processorID,
NodeID: nodeID,
FlowID: flowID,
Type: ComponentID_PROCESSOR,
ID: processorID,
SQLInstanceID: instanceID,
}
}

// StreamComponentID returns a ComponentID for the given stream in a flow.
func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID {
func StreamComponentID(
originInstanceID base.SQLInstanceID, flowID FlowID, streamID StreamID,
) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_STREAM,
ID: int32(streamID),
NodeID: originNodeID,
FlowID: flowID,
Type: ComponentID_STREAM,
ID: int32(streamID),
SQLInstanceID: originInstanceID,
}
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID {
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
NodeID: nodeID,
FlowID: flowID,
Type: ComponentID_FLOW,
SQLInstanceID: instanceID,
}
}

Expand Down
Loading

0 comments on commit 878bf45

Please sign in to comment.