diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 0edc40f75edd..bcc6c7a895af 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -158,11 +158,6 @@ type splitAndScatterProcessor struct { var _ execinfra.Processor = &splitAndScatterProcessor{} -// OutputTypes implements the execinfra.Processor interface. -func (ssp *splitAndScatterProcessor) OutputTypes() []*types.T { - return splitAndScatterOutputTypes -} - func newSplitAndScatterProcessor( flowCtx *execinfra.FlowCtx, processorID int32, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 986d1fc04068..c58b38657831 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -49,7 +49,6 @@ import ( type changeAggregator struct { execinfra.ProcessorBase - execinfra.StreamingProcessor flowCtx *execinfra.FlowCtx spec execinfrapb.ChangeAggregatorSpec @@ -190,6 +189,11 @@ func newChangeAggregatorProcessor( return ca, nil } +// MustBeStreaming implements the execinfra.Processor interface. +func (ca *changeAggregator) MustBeStreaming() bool { + return true +} + // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { ctx = ca.StartInternal(ctx, changeAggregatorProcName) @@ -834,7 +838,6 @@ const ( type changeFrontier struct { execinfra.ProcessorBase - execinfra.StreamingProcessor flowCtx *execinfra.FlowCtx spec execinfrapb.ChangeFrontierSpec @@ -967,6 +970,11 @@ func newChangeFrontierProcessor( return cf, nil } +// MustBeStreaming implements the execinfra.Processor interface. +func (cf *changeFrontier) MustBeStreaming() bool { + return true +} + // Start is part of the RowSource interface. func (cf *changeFrontier) Start(ctx context.Context) { // StartInternal called at the beginning of the function because there are diff --git a/pkg/ccl/importccl/exportcsv.go b/pkg/ccl/importccl/exportcsv.go index cece1cf12820..06e42364ee5d 100644 --- a/pkg/ccl/importccl/exportcsv.go +++ b/pkg/ccl/importccl/exportcsv.go @@ -164,6 +164,10 @@ func (sp *csvWriter) OutputTypes() []*types.T { return res } +func (sp *csvWriter) MustBeStreaming() bool { + return false +} + func (sp *csvWriter) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "csvWriter") defer span.Finish() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 8fcac23eafad..9d2aff6bd1aa 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -30,7 +30,6 @@ const streamIngestionFrontierProcName = `ingestfntr` type streamIngestionFrontier struct { execinfra.ProcessorBase - execinfra.StreamingProcessor flowCtx *execinfra.FlowCtx spec execinfrapb.StreamIngestionFrontierSpec @@ -86,6 +85,11 @@ func newStreamIngestionFrontierProcessor( return sf, nil } +// MustBeStreaming implements the execinfra.Processor interface. +func (sf *streamIngestionFrontier) MustBeStreaming() bool { + return true +} + // Start is part of the RowSource interface. func (sf *streamIngestionFrontier) Start(ctx context.Context) { ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index de69eabc1280..db32fa46a765 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -118,8 +118,12 @@ func wrapRowSources( return nil, releasables, err } + proc, isProcessor := toWrap.(execinfra.Processor) + if !isProcessor { + return nil, nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap) + } var c *colexec.Columnarizer - if _, mustBeStreaming := toWrap.(execinfra.StreamingProcessor); mustBeStreaming { + if proc.MustBeStreaming() { c, err = colexec.NewStreamingColumnarizer( ctx, colmem.NewAllocator(ctx, args.StreamingMemAccount, factory), flowCtx, args.Spec.ProcessorID, toWrap, ) diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index dccb24d535d7..40c01409cbc8 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -163,7 +163,7 @@ type RowSource interface { // RowSourcedProcessor is the union of RowSource and Processor. type RowSourcedProcessor interface { RowSource - Run(context.Context) + Processor } // Run reads records from the source and outputs them to the receiver, properly diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 77a99e226f20..b51378f04109 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -35,6 +35,11 @@ type Processor interface { // through an output router). OutputTypes() []*types.T + // MustBeStreaming indicates whether this processor is of "streaming" nature + // and is expected to emit the output one row at a time (in both row-by-row + // and the vectorized engines). + MustBeStreaming() bool + // Run is the main loop of the processor. Run(context.Context) } @@ -505,6 +510,11 @@ type ProcessorBase struct { curInputToDrain int } +// MustBeStreaming implements the Processor interface. +func (pb *ProcessorBase) MustBeStreaming() bool { + return false +} + // Reset resets this ProcessorBase, retaining allocated memory in slices. func (pb *ProcessorBase) Reset() { pb.Out.Reset() @@ -975,7 +985,6 @@ func NewLimitedMonitor( // these objects at creation time. type LocalProcessor interface { RowSourcedProcessor - StreamingProcessor // InitWithOutput initializes this processor. InitWithOutput(flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error // SetInput initializes this LocalProcessor with an input RowSource. Not all @@ -983,10 +992,3 @@ type LocalProcessor interface { // LocalProcessor expects to get its data from another RowSource. SetInput(ctx context.Context, input RowSource) error } - -// StreamingProcessor is a marker interface that indicates that the processor is -// of "streaming" nature and is expected to emit the output one tuple at a time -// (in both row-by-row and the vectorized engines). -type StreamingProcessor interface { - mustBeStreaming() -} diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index effee1407265..b71bc6d608d4 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -28,7 +28,6 @@ type metadataForwarder interface { type planNodeToRowSource struct { execinfra.ProcessorBase - execinfra.StreamingProcessor input execinfra.RowSource @@ -71,6 +70,14 @@ func makePlanNodeToRowSource( var _ execinfra.LocalProcessor = &planNodeToRowSource{} +// MustBeStreaming implements the execinfra.Processor interface. +func (p *planNodeToRowSource) MustBeStreaming() bool { + // hookFnNode is special because it might be blocked forever if we decide to + // buffer its output. + _, isHookFnNode := p.node.(*hookFnNode) + return isHookFnNode +} + // InitWithOutput implements the LocalProcessor interface. func (p *planNodeToRowSource) InitWithOutput( flowCtx *execinfra.FlowCtx, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go index 5904312cfaf5..746fcad23ce2 100644 --- a/pkg/sql/rowexec/backfiller.go +++ b/pkg/sql/rowexec/backfiller.go @@ -73,13 +73,18 @@ type backfiller struct { processorID int32 } -// OutputTypes is part of the processor interface. +// OutputTypes is part of the execinfra.Processor interface. func (*backfiller) OutputTypes() []*types.T { // No output types. return nil } -// Run is part of the Processor interface. +// MustBeStreaming is part of the execinfra.Processor interface. +func (*backfiller) MustBeStreaming() bool { + return false +} + +// Run is part of the execinfra.Processor interface. func (b *backfiller) Run(ctx context.Context) { opName := fmt.Sprintf("%sBackfiller", b.name) ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID)) diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index ce9843527322..231603956022 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -104,6 +104,10 @@ func (ib *indexBackfiller) OutputTypes() []*types.T { return nil } +func (ib *indexBackfiller) MustBeStreaming() bool { + return false +} + // indexEntryBatch represents a "batch" of index entries which are constructed // and sent for ingestion. Breaking up the index entries into these batches // serves for better progress reporting as explained in the ingestIndexEntries