Skip to content

Commit

Permalink
execinfra: correctly propagate processorID for LocalProcessors
Browse files Browse the repository at this point in the history
Previously, this was incorrectly hard-coded as zero. The impact of this
seems minor (I believe this would only make it so that we could
incorrectly attribute `ComponentStats` object of `planNodeToRowSource`
to the wrong processor), but I think it still deserves to be backported.

Release note: None
  • Loading branch information
yuzefovich committed Mar 15, 2023
1 parent 5660a05 commit c4ca22d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ func NewLimitedMonitorNoFlowCtx(
type LocalProcessor interface {
RowSourcedProcessor
// InitWithOutput initializes this processor.
InitWithOutput(flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
InitWithOutput(flowCtx *FlowCtx, processorID int32, post *execinfrapb.PostProcessSpec, output RowReceiver) error
// SetInput initializes this LocalProcessor with an input RowSource. Not all
// LocalProcessors need inputs, but this needs to be called if a
// LocalProcessor expects to get its data from another RowSource.
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (p *planNodeToRowSource) MustBeStreaming() bool {

// InitWithOutput implements the LocalProcessor interface.
func (p *planNodeToRowSource) InitWithOutput(
flowCtx *execinfra.FlowCtx, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver,
flowCtx *execinfra.FlowCtx,
processorID int32,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) error {
if err := p.InitWithEvalCtx(
p,
Expand All @@ -111,7 +114,7 @@ func (p *planNodeToRowSource) InitWithOutput(
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
processorID,
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func NewProcessor(
return nil, err
}
processor := localProcessors[core.LocalPlanNode.RowSourceIdx]
if err := processor.InitWithOutput(flowCtx, post, outputs[0]); err != nil {
if err := processor.InitWithOutput(flowCtx, processorID, post, outputs[0]); err != nil {
return nil, err
}
if numInputs == 1 {
Expand Down

0 comments on commit c4ca22d

Please sign in to comment.