diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 5da8fbe186bd..6d3e85e29316 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -430,10 +430,10 @@ func streamPartition( execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) - return &eventStream{ + return tree.MakeStreamingValueGenerator(&eventStream{ streamID: streamID, spec: spec, execCfg: execCfg, mon: evalCtx.Mon, - }, nil + }), nil } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 5f7a70273fd0..82176394ec93 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -120,6 +120,17 @@ func newProjectSetProcessor( return ps, nil } +// MustBeStreaming implements the execinfra.Processor interface. +func (ps *projectSetProcessor) MustBeStreaming() bool { + // If we have a single streaming generator, then the processor is such too. + for _, gen := range ps.gens { + if tree.IsStreamingValueGenerator(gen) { + return true + } + } + return false +} + // Start is part of the RowSource interface. func (ps *projectSetProcessor) Start(ctx context.Context) { ctx = ps.StartInternal(ctx, projectSetProcName) diff --git a/pkg/sql/sem/tree/generators.go b/pkg/sql/sem/tree/generators.go index 6ddea25cc37c..9fa94d8f755b 100644 --- a/pkg/sql/sem/tree/generators.go +++ b/pkg/sql/sem/tree/generators.go @@ -70,3 +70,22 @@ type GeneratorFactory func(ctx *EvalContext, args Datums) (ValueGenerator, error // ValueGenerators that gives implementations the ability to see the builtin's // arguments before evaluation, as Exprs. type GeneratorWithExprsFactory func(ctx *EvalContext, args Exprs) (ValueGenerator, error) + +// streamingValueGenerator is a marker-type indicating that the wrapped +// generator is of "streaming" nature, thus, projectSet processor must be +// streaming too. +type streamingValueGenerator struct { + ValueGenerator +} + +// MakeStreamingValueGenerator marks the generator as "streaming". +func MakeStreamingValueGenerator(gen ValueGenerator) ValueGenerator { + return streamingValueGenerator{ValueGenerator: gen} +} + +// IsStreamingValueGenerator returns whether the generator is of the "streaming" +// nature. +func IsStreamingValueGenerator(gen ValueGenerator) bool { + _, ok := gen.(streamingValueGenerator) + return ok +}