From b06375dac3327d9cd3a8db00e0570211d25016fb Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 18 Sep 2023 06:08:50 -0700 Subject: [PATCH] flowinfra: fix recently introduced minor bug This commit fixes a minor bug introduced in #110625. In particular, that PR made so that we now unconditionally call `ConsumerClosed` on the "head" processor to make sure that resources are properly closed in the pausable portal model. However, `ConsumerClosed` is only valid to be called only if `Start` was called, so this commit fixes that. Additionally, to de-risk #110625 further we now only call that method for local flows since pausable portals currently disable DistSQL. Epic: None Release note: None --- pkg/sql/flowinfra/flow.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index be2106cb3629..bd5e4512e76f 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -214,6 +214,10 @@ type FlowBase struct { // goroutines. startedGoroutines bool + // headProcStarted tracks whether Start was called on the "head" processor + // in Run. + headProcStarted bool + // inboundStreams are streams that receive data from other hosts; this map // is to be passed to FlowRegistry.RegisterFlow. This map is populated in // Flow.Setup(), so it is safe to lookup into concurrently later. @@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) { } f.resumeCtx = ctx log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc) + f.headProcStarted = true headProc.Run(ctx, headOutput) } @@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { // ConsumerClosed on the source (i.e. the "head" processor). // // The method is only called if: +// - the flow is local (pausable portals currently don't support DistSQL) // - there is exactly 1 processor in the flow that runs in its own goroutine // (which is always the case for pausable portal model at this time) +// - Start was called on that processor (ConsumerClosed is only valid to be +// called after Start) // - that single processor implements execinfra.RowSource interface (those // processors that don't implement it shouldn't be running through pausable // portal model). // // Otherwise, this method is a noop. func (f *FlowBase) ConsumerClosedOnHeadProc() { + if !f.IsLocal() { + return + } if len(f.processors) != 1 { return } + if !f.headProcStarted { + return + } rs, ok := f.processors[0].(execinfra.RowSource) if !ok { return