Skip to content

Commit

Permalink
colflow: clean up cancel flow fn retrieval
Browse files Browse the repository at this point in the history
Not sure why but in the flow coordinator we were using a function that
returns a flow cancellation function. I believe the lazy evaluation is
not needed because `FlowBase.Setup` is called before we perform the
vectorized flow setup, so the proper cancellation function is available
when we need it.

Release note: None
  • Loading branch information
yuzefovich committed May 19, 2021
1 parent 242b68b commit 8369afa
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
10 changes: 4 additions & 6 deletions pkg/sql/colflow/flow_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ type FlowCoordinator struct {
row rowenc.EncDatumRow
meta *execinfrapb.ProducerMetadata

// cancelFlow will return a function to cancel the context of the flow. It
// is a function in order to be lazily evaluated, since the context
// cancellation function is only available after the flow is Start()'ed.
cancelFlow func() context.CancelFunc
// cancelFlow cancels the context of the flow.
cancelFlow context.CancelFunc
}

var flowCoordinatorPool = sync.Pool{
Expand All @@ -57,7 +55,7 @@ func NewFlowCoordinator(
processorID int32,
input execinfra.RowSource,
output execinfra.RowReceiver,
cancelFlow func() context.CancelFunc,
cancelFlow context.CancelFunc,
) *FlowCoordinator {
f := flowCoordinatorPool.Get().(*FlowCoordinator)
f.input = input
Expand Down Expand Up @@ -155,7 +153,7 @@ func (f *FlowCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

func (f *FlowCoordinator) close() {
if f.InternalClose() {
f.cancelFlow()()
f.cancelFlow()
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func (s *vectorizedFlowCreator) setupOutput(
pspec.ProcessorID,
input,
s.syncFlowConsumer,
s.getCancelFlowFn,
s.getCancelFlowFn(),
)
// The flow coordinator is a root of its operator chain.
s.opChains = append(s.opChains, f)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
1, /* processorID */
materializer,
nil, /* output */
func() context.CancelFunc { return cancelLocal },
cancelLocal,
)
coordinator.Start(ctxLocal)

Expand Down

0 comments on commit 8369afa

Please sign in to comment.