From 8369afa893138baf268b72b319143ffde89373c2 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sat, 15 May 2021 11:22:48 -0700 Subject: [PATCH] colflow: clean up cancel flow fn retrieval Not sure why but in the flow coordinator we were using a function that returns a flow cancellation function. I believe the lazy evaluation is not needed because `FlowBase.Setup` is called before we perform the vectorized flow setup, so the proper cancellation function is available when we need it. Release note: None --- pkg/sql/colflow/flow_coordinator.go | 10 ++++------ pkg/sql/colflow/vectorized_flow.go | 2 +- pkg/sql/colflow/vectorized_flow_shutdown_test.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 1b2adb06426b..952d088bf484 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -36,10 +36,8 @@ type FlowCoordinator struct { row rowenc.EncDatumRow meta *execinfrapb.ProducerMetadata - // cancelFlow will return a function to cancel the context of the flow. It - // is a function in order to be lazily evaluated, since the context - // cancellation function is only available after the flow is Start()'ed. - cancelFlow func() context.CancelFunc + // cancelFlow cancels the context of the flow. + cancelFlow context.CancelFunc } var flowCoordinatorPool = sync.Pool{ @@ -57,7 +55,7 @@ func NewFlowCoordinator( processorID int32, input execinfra.RowSource, output execinfra.RowReceiver, - cancelFlow func() context.CancelFunc, + cancelFlow context.CancelFunc, ) *FlowCoordinator { f := flowCoordinatorPool.Get().(*FlowCoordinator) f.input = input @@ -155,7 +153,7 @@ func (f *FlowCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad func (f *FlowCoordinator) close() { if f.InternalClose() { - f.cancelFlow()() + f.cancelFlow() } } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index abba1b32f07f..1c2a5d1b0052 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -1009,7 +1009,7 @@ func (s *vectorizedFlowCreator) setupOutput( pspec.ProcessorID, input, s.syncFlowConsumer, - s.getCancelFlowFn, + s.getCancelFlowFn(), ) // The flow coordinator is a root of its operator chain. s.opChains = append(s.opChains, f) diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index da247a455ff8..c2bc520d03ab 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -364,7 +364,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { 1, /* processorID */ materializer, nil, /* output */ - func() context.CancelFunc { return cancelLocal }, + cancelLocal, ) coordinator.Start(ctxLocal)