sql: add nodes for each EXPLAIN ANALYZE operator
Show the cluster nodes involved in the execution of each operator.

Note that this info does not show up in the non-analyze EXPLAIN. It is
technically much more challenging to do that because of the indirect
way we do distsql planning. Once we have the new DistSQL exec factory,
we will be able to add it.

To implement this, we make execution set `ComponentID.NodeID` in all
cases.
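
Concretely, this means the `ProcessorComponentID` and `StreamComponentID` helpers now take the node explicitly (see the `component_stats.go` diff below). A minimal sketch of the new call shape; the flow, node, and processor/stream IDs here are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
	flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()}

	// A processor component is keyed by the node it runs on.
	proc := execinfrapb.ProcessorComponentID(
		roachpb.NodeID(2), flowID, 3, /* processorID */
	)

	// For a stream, the node is the stream's *origin* node.
	stream := execinfrapb.StreamComponentID(
		roachpb.NodeID(2), flowID, execinfrapb.StreamID(7),
	)

	fmt.Println(proc.NodeID, stream.NodeID) // 2 2
}
```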

Unfortunately, there is not much we can do to test this currently
(other than manual testing). I will investigate making the
"deterministic" flag more fine-grained, so that we can hide truly
non-deterministic values (like timings) separately from those that
just vary with the configuration (query distribution).

Example:
```
movr> EXPLAIN ANALYZE SELECT * FROM rides JOIN vehicles ON rides.vehicle_id = vehicles.id;
                    info
--------------------------------------------
  planning time: 158µs
  execution time: 7ms
  distribution: full
  vectorized: true

  • hash join
  │ cluster nodes: n1, n2, n3
  │ actual row count: 500
  │ equality: (vehicle_id) = (id)
  │
  ├── • scan
  │     cluster nodes: n1, n2, n3
  │     actual row count: 500
  │     KV rows read: 500
  │     KV bytes read: 86 KiB
  │     missing stats
  │     table: rides@primary
  │     spans: FULL SCAN
  │
  └── • scan
        cluster nodes: n1, n2, n3
        actual row count: 15
        KV rows read: 15
        KV bytes read: 2.3 KiB
        missing stats
        table: vehicles@primary
        spans: FULL SCAN
```

Release note (sql change): EXPLAIN ANALYZE now includes the nodes that
were involved in the execution of each operator in the tree.
RaduBerinde committed Feb 16, 2021
1 parent aa8f949 commit 1a74e20
Showing 22 changed files with 172 additions and 63 deletions.
7 changes: 5 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
```diff
@@ -831,9 +831,12 @@ func (s *vectorizedFlowCreator) setupInput(
 			s.addStreamEndpoint(inputStream.StreamID, inbox, s.waitGroup)
 			op := colexecbase.Operator(inbox)
 			if s.recordingStats {
-				op, err = s.wrapWithNetworkVectorizedStatsCollector(
-					inbox, flowCtx.StreamComponentID(inputStream.StreamID), latency,
+				// Note: we can't use flowCtx.StreamComponentID because the stream does
+				// not originate from this node (we are the target node).
+				compID := execinfrapb.StreamComponentID(
+					inputStream.OriginNodeID, flowCtx.ID, inputStream.StreamID,
 				)
+				op, err = s.wrapWithNetworkVectorizedStatsCollector(inbox, compID, latency)
 				if err != nil {
 					return nil, nil, nil, err
 				}
```

1 change: 1 addition & 0 deletions pkg/sql/distsql/inbound_test.go
```diff
@@ -92,6 +92,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) {
 			NodeDialer: nodedialer.New(rpcContext, staticAddressResolver(ln.Addr())),
 			Stopper:    outboxStopper,
 		},
+		NodeID: base.NewSQLIDContainer(1, nil /* nodeID */),
 	}
 
 	streamID := execinfrapb.StreamID(1)
```

5 changes: 3 additions & 2 deletions pkg/sql/distsql_physical_planner.go
```diff
@@ -2724,10 +2724,11 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
 
 	if planCtx.traceMetadata != nil {
 		processors := make(execComponents, len(plan.ResultRouters))
-		for i := range plan.ResultRouters {
+		for i, resultProcIdx := range plan.ResultRouters {
 			processors[i] = execinfrapb.ProcessorComponentID(
+				plan.Processors[resultProcIdx].Node,
 				execinfrapb.FlowID{UUID: planCtx.infra.FlowID},
-				int32(plan.ResultRouters[i]),
+				int32(resultProcIdx),
 			)
 		}
 		planCtx.traceMetadata.associateNodeWithComponents(node, processors)
```

10 changes: 8 additions & 2 deletions pkg/sql/execinfra/flow_context.go
```diff
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -108,10 +109,15 @@ func (ctx *FlowCtx) Codec() keys.SQLCodec {
 // ProcessorComponentID returns a ComponentID for the given processor in this
 // flow.
 func (ctx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.ComponentID {
-	return execinfrapb.ProcessorComponentID(ctx.ID, procID)
+	// TODO(radu): the component stats should store SQLInstanceID instead.
+	nodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID())
+	return execinfrapb.ProcessorComponentID(nodeID, ctx.ID, procID)
 }
 
 // StreamComponentID returns a ComponentID for the given stream in this flow.
+// The stream must originate from the node associated with this FlowCtx.
 func (ctx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
-	return execinfrapb.StreamComponentID(ctx.ID, streamID)
+	// TODO(radu): the component stats should store SQLInstanceID instead.
+	originNodeID := roachpb.NodeID(ctx.NodeID.SQLInstanceID())
+	return execinfrapb.StreamComponentID(originNodeID, ctx.ID, streamID)
 }
```

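Both methods funnel through the same SQL-instance-to-node conversion. A self-contained sketch of that glue, reusing the `base.NewSQLIDContainer` call that appears in the `inbound_test.go` change above (the instance ID value is illustrative):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// A SQLIDContainer holding SQL instance ID 1 (no separate KV node ID).
	ids := base.NewSQLIDContainer(1, nil /* nodeID */)

	// FlowCtx converts the instance ID to a roachpb.NodeID for now; the
	// TODOs note that the component stats should eventually store the
	// SQLInstanceID directly.
	nodeID := roachpb.NodeID(ids.SQLInstanceID())
	fmt.Println(nodeID) // 1
}
```
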
6 changes: 4 additions & 2 deletions pkg/sql/execinfrapb/component_stats.go
```diff
@@ -26,20 +26,22 @@ import (
 )
 
 // ProcessorComponentID returns a ComponentID for the given processor in a flow.
-func ProcessorComponentID(flowID FlowID, processorID int32) ComponentID {
+func ProcessorComponentID(nodeID roachpb.NodeID, flowID FlowID, processorID int32) ComponentID {
 	return ComponentID{
 		FlowID: flowID,
 		Type:   ComponentID_PROCESSOR,
 		ID:     processorID,
+		NodeID: nodeID,
 	}
 }
 
 // StreamComponentID returns a ComponentID for the given stream in a flow.
-func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID {
+func StreamComponentID(originNodeID roachpb.NodeID, flowID FlowID, streamID StreamID) ComponentID {
 	return ComponentID{
 		FlowID: flowID,
 		Type:   ComponentID_STREAM,
 		ID:     int32(streamID),
+		NodeID: originNodeID,
 	}
 }
```

30 changes: 14 additions & 16 deletions pkg/sql/execinfrapb/component_stats.pb.go

Some generated files are not rendered by default.

6 changes: 2 additions & 4 deletions pkg/sql/execinfrapb/component_stats.proto
```diff
@@ -48,10 +48,8 @@ message ComponentID {
   optional int32 id = 3 [(gogoproto.nullable) = false,
                          (gogoproto.customname) = "ID"];
 
-  // NodeID of the node this component ran on.
-  // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe
-  // a flow (since flows on different nodes still have the same FlowID). Use
-  // this for processors/streams as well.
+  // NodeID of the node this component is associated with. For cross-node
+  // streams, this is the *origin* node for the stream.
   optional int32 node_id = 4 [(gogoproto.nullable) = false,
                               (gogoproto.customname) = "NodeID",
                               (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
```

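The practical effect of populating `node_id` for processors and streams is that components from different nodes no longer collide as map keys (the stats maps in `flow_diagram.go` below are keyed by `ComponentID`). A small sketch, with invented IDs:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
	flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()}
	// Two component IDs that differ only in their node are now distinct
	// values, and therefore distinct map keys in the stats maps.
	a := execinfrapb.ProcessorComponentID(roachpb.NodeID(1), flowID, 5)
	b := execinfrapb.ProcessorComponentID(roachpb.NodeID(2), flowID, 5)
	fmt.Println(a == b) // false
}
```
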
41 changes: 25 additions & 16 deletions pkg/sql/execinfrapb/flow_diagram.go
```diff
@@ -575,8 +575,9 @@ type diagramData struct {
 	Processors []diagramProcessor `json:"processors"`
 	Edges      []diagramEdge      `json:"edges"`
 
-	flags  DiagramFlags
-	flowID FlowID
+	flags   DiagramFlags
+	flowID  FlowID
+	nodeIDs []roachpb.NodeID
 }
 
 var _ FlowDiagram = &diagramData{}
@@ -595,27 +596,36 @@ func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) {
 	statsMap := ExtractStatsFromSpans(spans, d.flags.MakeDeterministic)
 	for i := range d.Processors {
 		p := &d.Processors[i]
-		component := ProcessorComponentID(d.flowID, p.processorID)
+		nodeID := d.nodeIDs[p.NodeIdx]
+		component := ProcessorComponentID(nodeID, d.flowID, p.processorID)
 		if compStats := statsMap[component]; compStats != nil {
 			p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...)
 		}
 	}
 	for i := range d.Edges {
-		component := StreamComponentID(d.flowID, d.Edges[i].streamID)
+		originNodeID := d.nodeIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx]
+		component := StreamComponentID(originNodeID, d.flowID, d.Edges[i].streamID)
 		if compStats := statsMap[component]; compStats != nil {
 			d.Edges[i].Stats = compStats.StatsForQueryPlan()
 		}
 	}
 }
 
+// generateDiagramData generates the diagram data, given a list of flows (one
+// per node). The nodeIDs list corresponds 1-1 to the flows list.
 func generateDiagramData(
-	sql string, flows []FlowSpec, nodeNames []string, flags DiagramFlags,
+	sql string, flows []FlowSpec, nodeIDs []roachpb.NodeID, flags DiagramFlags,
 ) (FlowDiagram, error) {
 	d := &diagramData{
-		SQL:       sql,
-		NodeNames: nodeNames,
-		flags:     flags,
+		SQL:     sql,
+		nodeIDs: nodeIDs,
+		flags:   flags,
 	}
+	d.NodeNames = make([]string, len(nodeIDs))
+	for i := range d.NodeNames {
+		d.NodeNames[i] = nodeIDs[i].String()
+	}
 
 	if len(flows) > 0 {
 		d.flowID = flows[0].FlowID
 		for i := 1; i < len(flows); i++ {
@@ -746,21 +756,20 @@ func GeneratePlanDiagram(
 ) (FlowDiagram, error) {
 	// We sort the flows by node because we want the diagram data to be
 	// deterministic.
-	nodeIDs := make([]int, 0, len(flows))
+	nodeIDs := make([]roachpb.NodeID, 0, len(flows))
 	for n := range flows {
-		nodeIDs = append(nodeIDs, int(n))
+		nodeIDs = append(nodeIDs, n)
 	}
-	sort.Ints(nodeIDs)
+	sort.Slice(nodeIDs, func(i, j int) bool {
+		return nodeIDs[i] < nodeIDs[j]
+	})
 
 	flowSlice := make([]FlowSpec, len(nodeIDs))
-	nodeNames := make([]string, len(nodeIDs))
-	for i, nVal := range nodeIDs {
-		n := roachpb.NodeID(nVal)
+	for i, n := range nodeIDs {
 		flowSlice[i] = *flows[n]
-		nodeNames[i] = n.String()
 	}
 
-	return generateDiagramData(sql, flowSlice, nodeNames, flags)
+	return generateDiagramData(sql, flowSlice, nodeIDs, flags)
 }
 
 // GeneratePlanDiagramURL generates the json data for a flow diagram and a
```

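A side note on the last hunk: once `nodeIDs` is a typed `[]roachpb.NodeID` rather than `[]int`, `sort.Ints` no longer applies, hence the switch to `sort.Slice` with an explicit comparison. In isolation (invented node IDs):

```go
package main

import (
	"fmt"
	"sort"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// sort.Ints only accepts []int; a typed ID slice needs sort.Slice.
	nodeIDs := []roachpb.NodeID{3, 1, 2}
	sort.Slice(nodeIDs, func(i, j int) bool {
		return nodeIDs[i] < nodeIDs[j]
	})
	fmt.Println(nodeIDs) // [1 2 3]
}
```
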
2 changes: 2 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
```diff
@@ -20,11 +20,13 @@ import (
 )
 
 type processorStats struct {
+	// TODO(radu): this field is redundant with stats.Component.NodeID.
 	nodeID roachpb.NodeID
 	stats  *execinfrapb.ComponentStats
 }
 
 type streamStats struct {
+	// TODO(radu): this field is redundant with stats.Component.NodeID.
 	originNodeID      roachpb.NodeID
 	destinationNodeID roachpb.NodeID
 	stats             *execinfrapb.ComponentStats
```

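The TODOs above note that these per-struct node fields now duplicate `stats.Component.NodeID`. A hedged sketch of reading the node from the component key instead; the `nodeOf` helper is hypothetical, not part of this commit:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// nodeOf is a hypothetical helper: it reads the node from the embedded
// ComponentID instead of a separately tracked nodeID field.
func nodeOf(stats *execinfrapb.ComponentStats) roachpb.NodeID {
	return stats.Component.NodeID
}

func main() {
	flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()}
	stats := &execinfrapb.ComponentStats{
		Component: execinfrapb.ProcessorComponentID(roachpb.NodeID(1), flowID, 42),
	}
	fmt.Println(nodeOf(stats)) // 1
}
```
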
12 changes: 8 additions & 4 deletions pkg/sql/execstats/traceanalyzer_test.go
```diff
@@ -194,10 +194,12 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
 		cumulativeContentionTime = node1ContentionTime + node2ContentionTime
 	)
 	a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}}
+	n1 := roachpb.NodeID(1)
+	n2 := roachpb.NodeID(2)
 	a.AddComponentStats(
-		1, /* nodeID */
 		&execinfrapb.ComponentStats{
 			Component: execinfrapb.ProcessorComponentID(
+				n1,
 				execinfrapb.FlowID{UUID: uuid.MakeV4()},
 				1, /* processorID */
 			),
@@ -209,9 +211,9 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
 	)
 
 	a.AddComponentStats(
-		2, /* nodeID */
 		&execinfrapb.ComponentStats{
 			Component: execinfrapb.ProcessorComponentID(
+				n2,
 				execinfrapb.FlowID{UUID: uuid.MakeV4()},
 				2, /* processorID */
 			),
@@ -289,10 +291,12 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) {
 	// Artificially inject component stats directly into the FlowsMetadata (in
 	// the non-testing setting the stats come from the trace).
 	var f1, f2 execstats.FlowsMetadata
+	n1 := roachpb.NodeID(1)
+	n2 := roachpb.NodeID(2)
 	f1.AddComponentStats(
-		1, /* nodeID */
 		&execinfrapb.ComponentStats{
 			Component: execinfrapb.ProcessorComponentID(
+				n1,
 				execinfrapb.FlowID{UUID: uuid.MakeV4()},
 				1, /* processorID */
 			),
@@ -302,9 +306,9 @@ func TestGetQueryLevelStatsAccumulates(t *testing.T) {
 		},
 	)
 	f2.AddComponentStats(
-		2, /* nodeID */
 		&execinfrapb.ComponentStats{
 			Component: execinfrapb.ProcessorComponentID(
+				n2,
 				execinfrapb.FlowID{UUID: uuid.MakeV4()},
 				2, /* processorID */
 			),
```

