Skip to content

Commit

Permalink
sql: mark planNodeToRowSource as streaming intelligently
Browse files Browse the repository at this point in the history
Previously, out of an abundance of caution (and some laziness), we marked
all `planNodeToRowSource` processors as being of a "streaming" nature. This
marker influences whether we wrap it with a streaming or buffering
columnarizer into the vectorized flow. However, doing so is unnecessary
in most cases and kills some of the benefits of the vectorized model.
The only special planNode is `hookFnNode`, which must be streaming; all
others are safe to have buffering around them. This commit implements
that idea. This required adding another method to the `Processor` interface.

Release note: None
  • Loading branch information
yuzefovich committed Apr 20, 2021
1 parent defd143 commit 7b67857
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 21 deletions.
5 changes: 0 additions & 5 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ type splitAndScatterProcessor struct {

var _ execinfra.Processor = &splitAndScatterProcessor{}

// OutputTypes implements the execinfra.Processor interface. It returns
// splitAndScatterOutputTypes as is, without making a copy.
func (ssp *splitAndScatterProcessor) OutputTypes() []*types.T {
return splitAndScatterOutputTypes
}

func newSplitAndScatterProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
Expand Down
12 changes: 10 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (

type changeAggregator struct {
execinfra.ProcessorBase
execinfra.StreamingProcessor

flowCtx *execinfra.FlowCtx
spec execinfrapb.ChangeAggregatorSpec
Expand Down Expand Up @@ -190,6 +189,11 @@ func newChangeAggregatorProcessor(
return ca, nil
}

// MustBeStreaming implements the execinfra.Processor interface. The
// changeAggregator always reports true, i.e. it is expected to emit its output
// one row at a time.
func (ca *changeAggregator) MustBeStreaming() bool {
return true
}

// Start is part of the RowSource interface.
func (ca *changeAggregator) Start(ctx context.Context) {
ctx = ca.StartInternal(ctx, changeAggregatorProcName)
Expand Down Expand Up @@ -834,7 +838,6 @@ const (

type changeFrontier struct {
execinfra.ProcessorBase
execinfra.StreamingProcessor

flowCtx *execinfra.FlowCtx
spec execinfrapb.ChangeFrontierSpec
Expand Down Expand Up @@ -967,6 +970,11 @@ func newChangeFrontierProcessor(
return cf, nil
}

// MustBeStreaming implements the execinfra.Processor interface. The
// changeFrontier always reports true, i.e. it is expected to emit its output
// one row at a time.
func (cf *changeFrontier) MustBeStreaming() bool {
return true
}

// Start is part of the RowSource interface.
func (cf *changeFrontier) Start(ctx context.Context) {
// StartInternal called at the beginning of the function because there are
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (sp *csvWriter) OutputTypes() []*types.T {
return res
}

// MustBeStreaming implements the execinfra.Processor interface. The csvWriter
// always reports false, i.e. it does not require its output to be emitted one
// row at a time.
func (sp *csvWriter) MustBeStreaming() bool {
return false
}

func (sp *csvWriter) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
defer span.Finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const streamIngestionFrontierProcName = `ingestfntr`

type streamIngestionFrontier struct {
execinfra.ProcessorBase
execinfra.StreamingProcessor

flowCtx *execinfra.FlowCtx
spec execinfrapb.StreamIngestionFrontierSpec
Expand Down Expand Up @@ -86,6 +85,11 @@ func newStreamIngestionFrontierProcessor(
return sf, nil
}

// MustBeStreaming implements the execinfra.Processor interface. The
// streamIngestionFrontier always reports true, i.e. it is expected to emit its
// output one row at a time.
func (sf *streamIngestionFrontier) MustBeStreaming() bool {
return true
}

// Start is part of the RowSource interface.
func (sf *streamIngestionFrontier) Start(ctx context.Context) {
ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ func wrapRowSources(
return nil, releasables, err
}

proc, isProcessor := toWrap.(execinfra.Processor)
if !isProcessor {
return nil, nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap)
}
var c *colexec.Columnarizer
if _, mustBeStreaming := toWrap.(execinfra.StreamingProcessor); mustBeStreaming {
if proc.MustBeStreaming() {
c, err = colexec.NewStreamingColumnarizer(
ctx, colmem.NewAllocator(ctx, args.StreamingMemAccount, factory), flowCtx, args.Spec.ProcessorID, toWrap,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type RowSource interface {
// RowSourcedProcessor is the union of RowSource and Processor.
type RowSourcedProcessor interface {
RowSource
Run(context.Context)
Processor
}

// Run reads records from the source and outputs them to the receiver, properly
Expand Down
18 changes: 10 additions & 8 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type Processor interface {
// through an output router).
OutputTypes() []*types.T

// MustBeStreaming indicates whether this processor is of "streaming" nature
// and is expected to emit the output one row at a time (in both row-by-row
// and the vectorized engines).
MustBeStreaming() bool

// Run is the main loop of the processor.
Run(context.Context)
}
Expand Down Expand Up @@ -505,6 +510,11 @@ type ProcessorBase struct {
curInputToDrain int
}

// MustBeStreaming implements the Processor interface. ProcessorBase reports
// false; a type that embeds ProcessorBase and does need streaming behavior
// declares its own MustBeStreaming method, which takes precedence over this
// promoted one.
func (pb *ProcessorBase) MustBeStreaming() bool {
return false
}

// Reset resets this ProcessorBase, retaining allocated memory in slices.
func (pb *ProcessorBase) Reset() {
pb.Out.Reset()
Expand Down Expand Up @@ -975,18 +985,10 @@ func NewLimitedMonitor(
// these objects at creation time.
type LocalProcessor interface {
RowSourcedProcessor
StreamingProcessor
// InitWithOutput initializes this processor.
InitWithOutput(flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
// SetInput initializes this LocalProcessor with an input RowSource. Not all
// LocalProcessors need inputs, but this needs to be called if a
// LocalProcessor expects to get its data from another RowSource.
SetInput(ctx context.Context, input RowSource) error
}

// StreamingProcessor is a marker interface that indicates that the processor is
// of "streaming" nature and is expected to emit the output one tuple at a time
// (in both row-by-row and the vectorized engines).
type StreamingProcessor interface {
// mustBeStreaming carries no behavior of its own; it exists only so that a
// type has to opt in explicitly in order to satisfy this interface.
mustBeStreaming()
}
9 changes: 8 additions & 1 deletion pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type metadataForwarder interface {

type planNodeToRowSource struct {
execinfra.ProcessorBase
execinfra.StreamingProcessor

input execinfra.RowSource

Expand Down Expand Up @@ -71,6 +70,14 @@ func makePlanNodeToRowSource(

var _ execinfra.LocalProcessor = &planNodeToRowSource{}

// MustBeStreaming implements the execinfra.Processor interface. It reports
// true only when the wrapped planNode is a *hookFnNode and false for every
// other node.
func (p *planNodeToRowSource) MustBeStreaming() bool {
	switch p.node.(type) {
	case *hookFnNode:
		// Buffering the output of a hookFnNode might leave it blocked
		// forever, so it has to be streamed.
		return true
	default:
		return false
	}
}

// InitWithOutput implements the LocalProcessor interface.
func (p *planNodeToRowSource) InitWithOutput(
flowCtx *execinfra.FlowCtx, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver,
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/rowexec/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ type backfiller struct {
processorID int32
}

// OutputTypes is part of the processor interface.
// OutputTypes is part of the execinfra.Processor interface.
func (*backfiller) OutputTypes() []*types.T {
// No output types.
return nil
}

// Run is part of the Processor interface.
// MustBeStreaming is part of the execinfra.Processor interface.
func (*backfiller) MustBeStreaming() bool {
return false
}

// Run is part of the execinfra.Processor interface.
func (b *backfiller) Run(ctx context.Context) {
opName := fmt.Sprintf("%sBackfiller", b.name)
ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID))
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (ib *indexBackfiller) OutputTypes() []*types.T {
return nil
}

// MustBeStreaming is part of the execinfra.Processor interface. The
// indexBackfiller always reports false, i.e. it does not require its output to
// be emitted one row at a time.
func (ib *indexBackfiller) MustBeStreaming() bool {
return false
}

// indexEntryBatch represents a "batch" of index entries which are constructed
// and sent for ingestion. Breaking up the index entries into these batches
// serves for better progress reporting as explained in the ingestIndexEntries
Expand Down

0 comments on commit 7b67857

Please sign in to comment.