Skip to content

Commit

Permalink
execinfra: fix starting of processors
Browse files Browse the repository at this point in the history
Almost all processors embed `ProcessorBase` and call `StartInternal` on
it. That function derives a new context and starts a tracing span if the
tracing is enabled on the parent ctx and the derived context is
returned. Previously, in all callsites the inputs to the processors
would get `Start`ed before `StartInternal` is called, but this doesn't
make sense: the consumer's ctx and span should be encompassing the
producer's (input's) ctx and span, so this commit switches all callsites
to the following layout
```
  ctx = p.StartInternal(ctx)
  input.Start(ctx) // if there are any inputs
```

Release justification: bug fix.

Release note: None
  • Loading branch information
yuzefovich committed Mar 1, 2021
1 parent 8352297 commit 1032aea
Show file tree
Hide file tree
Showing 24 changed files with 33 additions and 105 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ func newRestoreDataProcessor(

// Start is part of the RowSource interface.
func (rd *restoreDataProcessor) Start(ctx context.Context) {
rd.input.Start(ctx)
ctx = rd.StartInternal(ctx, restoreDataProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
rd.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,11 +906,10 @@ func newChangeFrontierProcessor(

// Start is part of the RowSource interface.
func (cf *changeFrontier) Start(ctx context.Context) {
cf.input.Start(ctx)

// StartInternal called at the beginning of the function because there are
// early returns if errors are detected.
ctx = cf.StartInternal(ctx, changeFrontierProcName)
cf.input.Start(ctx)

// Pass a nil oracle because this sink is only used to emit resolved timestamps
// but the oracle is only used when emitting row updates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ func newStreamIngestionFrontierProcessor(

// Start is part of the RowSource interface.
func (sf *streamIngestionFrontier) Start(ctx context.Context) {
sf.input.Start(ctx)
ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
sf.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,8 @@ func (m *Materializer) Child(nth int, verbose bool) execinfra.OpNode {

// Start is part of the execinfra.RowSource interface.
func (m *Materializer) Start(ctx context.Context) {
m.drainHelper.Start(ctx)
ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
m.drainHelper.Start(ctx)
// We can encounter an expected error during Init (e.g. an operator
// attempts to allocate a batch, but the memory budget limit has been
// reached), so we need to wrap it with a catcher.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/execinfra/metadata_test_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,8 @@ func (mtr *MetadataTestReceiver) checkRowNumMetadata() *execinfrapb.ProducerMeta

// Start is part of the RowSource interface.
func (mtr *MetadataTestReceiver) Start(ctx context.Context) {
mtr.input.Start(ctx)
ctx = mtr.StartInternal(ctx, metadataTestReceiverProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
mtr.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/execinfra/metadata_test_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,8 @@ func NewMetadataTestSender(

// Start is part of the RowSource interface.
func (mts *MetadataTestSender) Start(ctx context.Context) {
mts.input.Start(ctx)
ctx = mts.StartInternal(ctx, metadataTestSenderProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
mts.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.
// It is likely that this method is called from RowSource.Start implementation,
// and the recommended layout is the following:
// ctx = pb.StartInternal(ctx, name)
// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb
// < other initialization >
// so that the caller doesn't mistakenly use old ctx object.
func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (ag *orderedAggregator) Start(ctx context.Context) {
}

func (ag *aggregatorBase) start(ctx context.Context, procName string) {
ag.input.Start(ctx)
ctx = ag.StartInternal(ctx, procName)
ag.input.Start(ctx)
ag.cancelChecker = cancelchecker.NewCancelChecker(ctx)
ag.runningState = aggAccumulating
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func newBulkRowWriterProcessor(

// Start is part of the RowSource interface.
func (sp *bulkRowWriter) Start(ctx context.Context) {
sp.input.Start(ctx)
ctx = sp.StartInternal(ctx, "bulkRowWriter")
sp.input.Start(ctx)
err := sp.work(ctx)
sp.MoveToDraining(err)
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/countrows.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ func newCountAggregator(
}

func (ag *countAggregator) Start(ctx context.Context) {
ag.input.Start(ctx)
ctx = ag.StartInternal(ctx, countRowsProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ag.input.Start(ctx)
}

func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
Expand Down
12 changes: 2 additions & 10 deletions pkg/sql/rowexec/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,14 @@ func newDistinct(

// Start is part of the RowSource interface.
func (d *distinct) Start(ctx context.Context) {
d.input.Start(ctx)
ctx = d.StartInternal(ctx, distinctProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
d.input.Start(ctx)
}

// Start is part of the RowSource interface.
func (d *sortedDistinct) Start(ctx context.Context) {
d.input.Start(ctx)
ctx = d.StartInternal(ctx, sortedDistinctProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
d.input.Start(ctx)
}

func (d *distinct) matchLastGroupKey(row rowenc.EncDatumRow) (bool, error) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ func newFiltererProcessor(

// Start is part of the RowSource interface.
func (f *filtererProcessor) Start(ctx context.Context) {
f.input.Start(ctx)
ctx = f.StartInternal(ctx, filtererProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
f.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func newHashJoiner(

// Start is part of the RowSource interface.
func (h *hashJoiner) Start(ctx context.Context) {
ctx = h.StartInternal(ctx, hashJoinerProcName)
h.leftSource.Start(ctx)
h.rightSource.Start(ctx)
ctx = h.StartInternal(ctx, hashJoinerProcName)
h.cancelChecker = cancelchecker.NewCancelChecker(ctx)
h.runningState = hjBuilding
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/inverted_filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,8 @@ func (ifr *invertedFilterer) emitRow() (

// Start is part of the RowSource interface.
func (ifr *invertedFilterer) Start(ctx context.Context) {
ifr.input.Start(ctx)
ctx = ifr.StartInternal(ctx, invertedFiltererProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ifr.input.Start(ctx)
ifr.runningState = ifrReadingInput
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/inverted_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,12 +734,8 @@ func (ij *invertedJoiner) transformToTableRow(indexRow rowenc.EncDatumRow) {

// Start is part of the RowSource interface.
func (ij *invertedJoiner) Start(ctx context.Context) {
ij.input.Start(ctx)
ctx = ij.StartInternal(ctx, invertedJoinerProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ij.input.Start(ctx)
ij.runningState = ijReadingInput
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,8 @@ func (jr *joinReader) emitRow() (

// Start is part of the RowSource interface.
func (jr *joinReader) Start(ctx context.Context) {
jr.input.Start(ctx)
ctx = jr.StartInternal(ctx, joinReaderProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
jr.input.Start(ctx)
jr.runningState = jrReadingInput
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func newMergeJoiner(

// Start is part of the RowSource interface.
func (m *mergeJoiner) Start(ctx context.Context) {
m.streamMerger.start(ctx)
ctx = m.StartInternal(ctx, mergeJoinerProcName)
m.streamMerger.start(ctx)
m.cancelChecker = cancelchecker.NewCancelChecker(ctx)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ func newNoopProcessor(

// Start is part of the RowSource interface.
func (n *noopProcessor) Start(ctx context.Context) {
n.input.Start(ctx)
ctx = n.StartInternal(ctx, noopProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
n.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/ordinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,8 @@ func newOrdinalityProcessor(

// Start is part of the RowSource interface.
func (o *ordinalityProcessor) Start(ctx context.Context) {
o.input.Start(ctx)
ctx = o.StartInternal(ctx, ordinalityProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
o.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/project_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func newProjectSetProcessor(

// Start is part of the RowSource interface.
func (ps *projectSetProcessor) Start(ctx context.Context) {
ps.input.Start(ctx)
ctx = ps.StartInternal(ctx, projectSetProcName)
ps.input.Start(ctx)
ps.cancelChecker = cancelchecker.NewCancelChecker(ctx)
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,14 @@ func (s *sampleAggregator) pushTrailingMeta(ctx context.Context) {

// Run is part of the Processor interface.
func (s *sampleAggregator) Run(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, sampleAggregatorProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
s.input.Start(ctx)

earlyExit, err := s.mainLoop(s.Ctx)
earlyExit, err := s.mainLoop(ctx)
if err != nil {
execinfra.DrainAndClose(s.Ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
execinfra.DrainAndClose(ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
} else if !earlyExit {
s.pushTrailingMeta(s.Ctx)
s.pushTrailingMeta(ctx)
s.input.ConsumerClosed()
s.Out.Close()
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/rowexec/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,14 @@ func (s *samplerProcessor) pushTrailingMeta(ctx context.Context) {

// Run is part of the Processor interface.
func (s *samplerProcessor) Run(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, samplerProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
s.input.Start(ctx)

earlyExit, err := s.mainLoop(s.Ctx)
earlyExit, err := s.mainLoop(ctx)
if err != nil {
execinfra.DrainAndClose(s.Ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
execinfra.DrainAndClose(ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
} else if !earlyExit {
s.pushTrailingMeta(s.Ctx)
s.pushTrailingMeta(ctx)
s.input.ConsumerClosed()
s.Out.Close()
}
Expand Down
14 changes: 3 additions & 11 deletions pkg/sql/rowexec/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,8 @@ func newSortAllProcessor(

// Start is part of the RowSource interface.
func (s *sortAllProcessor) Start(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, sortAllProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
s.input.Start(ctx)

valid, err := s.fill()
if !valid || err != nil {
Expand Down Expand Up @@ -343,8 +339,8 @@ func newSortTopKProcessor(

// Start is part of the RowSource interface.
func (s *sortTopKProcessor) Start(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, sortTopKProcName)
s.input.Start(ctx)

// The execution loop for the SortTopK processor is similar to that of the
// SortAll processor; the difference is that we push rows into a max-heap
Expand Down Expand Up @@ -526,12 +522,8 @@ func (s *sortChunksProcessor) fill() (bool, error) {

// Start is part of the RowSource interface.
func (s *sortChunksProcessor) Start(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, sortChunksProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
s.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func newWindower(

// Start is part of the RowSource interface.
func (w *windower) Start(ctx context.Context) {
w.input.Start(ctx)
ctx = w.StartInternal(ctx, windowerProcName)
w.input.Start(ctx)
w.cancelChecker = cancelchecker.NewCancelChecker(ctx)
w.runningState = windowerAccumulating
}
Expand Down

0 comments on commit 1032aea

Please sign in to comment.