Skip to content

Commit

Permalink
rowexec: mark eventStream as a "streaming" value gen for projectSet proc
Browse files Browse the repository at this point in the history
Currently, `projectSet` processor is reported as not of "streaming"
nature. This means that if we have a vectorized flow, and the project
set processor is wrapped into it, the columnarizer on top of the
processor will buffer rows internally. This is done so that the operator
on top of the columnarizer has enough things to process within a single
batch.

However, some value generators (like event stream) are of "streaming"
nature, and we want to avoid all delays in propagating their rows. Thus,
this commit introduces a way for a value generator to opt in to be of
"streaming" nature, and then marks the event stream generator as such.

Release note: None
  • Loading branch information
yuzefovich committed Apr 4, 2022
1 parent bb2c29c commit 9bb5d30
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ func streamPartition(

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)

return &eventStream{
return tree.MakeStreamingValueGenerator(&eventStream{
streamID: streamID,
spec: spec,
execCfg: execCfg,
mon: evalCtx.Mon,
}, nil
}), nil
}
11 changes: 11 additions & 0 deletions pkg/sql/rowexec/project_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ func newProjectSetProcessor(
return ps, nil
}

// MustBeStreaming implements the execinfra.Processor interface.
func (ps *projectSetProcessor) MustBeStreaming() bool {
// If we have a single streaming generator, then the processor is such too.
for _, gen := range ps.gens {
if tree.IsStreamingValueGenerator(gen) {
return true
}
}
return false
}

// Start is part of the RowSource interface.
func (ps *projectSetProcessor) Start(ctx context.Context) {
ctx = ps.StartInternal(ctx, projectSetProcName)
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/sem/tree/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,22 @@ type GeneratorFactory func(ctx *EvalContext, args Datums) (ValueGenerator, error
// ValueGenerators that gives implementations the ability to see the builtin's
// arguments before evaluation, as Exprs.
type GeneratorWithExprsFactory func(ctx *EvalContext, args Exprs) (ValueGenerator, error)

// streamingValueGenerator is a marker-type indicating that the wrapped
// generator is of "streaming" nature, thus, projectSet processor must be
// streaming too.
type streamingValueGenerator struct {
ValueGenerator
}

// MakeStreamingValueGenerator marks the generator as "streaming".
func MakeStreamingValueGenerator(gen ValueGenerator) ValueGenerator {
return streamingValueGenerator{ValueGenerator: gen}
}

// IsStreamingValueGenerator returns whether the generator is of the "streaming"
// nature.
func IsStreamingValueGenerator(gen ValueGenerator) bool {
_, ok := gen.(streamingValueGenerator)
return ok
}

0 comments on commit 9bb5d30

Please sign in to comment.