diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index be2106cb3629..bd5e4512e76f 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -214,6 +214,10 @@ type FlowBase struct { // goroutines. startedGoroutines bool + // headProcStarted tracks whether Start was called on the "head" processor + // in Run. + headProcStarted bool + // inboundStreams are streams that receive data from other hosts; this map // is to be passed to FlowRegistry.RegisterFlow. This map is populated in // Flow.Setup(), so it is safe to lookup into concurrently later. @@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) { } f.resumeCtx = ctx log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc) + f.headProcStarted = true headProc.Run(ctx, headOutput) } @@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { // ConsumerClosed on the source (i.e. the "head" processor). // // The method is only called if: +// - the flow is local (pausable portals currently don't support DistSQL) // - there is exactly 1 processor in the flow that runs in its own goroutine // (which is always the case for pausable portal model at this time) +// - Start was called on that processor (ConsumerClosed is only valid to be +// called after Start) // - that single processor implements execinfra.RowSource interface (those // processors that don't implement it shouldn't be running through pausable // portal model). // // Otherwise, this method is a noop. func (f *FlowBase) ConsumerClosedOnHeadProc() { + if !f.IsLocal() { + return + } if len(f.processors) != 1 { return } + if !f.headProcStarted { + return + } rs, ok := f.processors[0].(execinfra.RowSource) if !ok { return