diff --git a/pkg/sql/colflow/colrpc/BUILD.bazel b/pkg/sql/colflow/colrpc/BUILD.bazel index 80d08ac5a502..31a4722c6f66 100644 --- a/pkg/sql/colflow/colrpc/BUILD.bazel +++ b/pkg/sql/colflow/colrpc/BUILD.bazel @@ -57,6 +57,7 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/types", "//pkg/testutils", + "//pkg/util/cancelchecker", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 62810a878482..a60199ad62c9 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -146,9 +147,12 @@ func TestOutboxInbox(t *testing.T) { // streamCtxCancel models a scenario in which the Outbox host cancels // the flow. streamCtxCancel - // readerCtxCancel models a scenario in which the Inbox host cancels the - // flow. This is considered a graceful termination, and the flow context - // isn't canceled. + // flowCtxCancel models a scenario in which the flow context of the + // Inbox host is canceled which is an ungraceful shutdown. + flowCtxCancel + // readerCtxCancel models a scenario in which the consumer of the Inbox + // cancels the context while the host's flow context is not canceled. + // This is considered a graceful termination. readerCtxCancel // transportBreaks models a scenario in which the transport breaks. transportBreaks @@ -158,16 +162,19 @@ func TestOutboxInbox(t *testing.T) { cancellationScenarioName string ) switch randVal := rng.Float64(); { - case randVal <= 0.25: + case randVal <= 0.2: cancellationScenario = noCancel cancellationScenarioName = "noCancel" - case randVal <= 0.50: + case randVal <= 0.4: cancellationScenario = streamCtxCancel cancellationScenarioName = "streamCtxCancel" - case randVal <= 0.75: + case randVal <= 0.6: + cancellationScenario = flowCtxCancel + cancellationScenarioName = "flowCtxCancel" + case randVal <= 0.8: cancellationScenario = readerCtxCancel cancellationScenarioName = "readerCtxCancel" - case randVal <= 1: + default: cancellationScenario = transportBreaks cancellationScenarioName = "transportBreaks" } @@ -196,10 +203,10 @@ func TestOutboxInbox(t *testing.T) { inputBuffer = colexecop.NewBatchBuffer() // Generate some random behavior before passing the random number // generator to be used in the Outbox goroutine (to avoid races). - // These sleep variables enable a sleep for up to half a millisecond - // with a .25 probability before cancellation. - sleepBeforeCancellation = rng.Float64() <= 0.25 - sleepTime = time.Microsecond * time.Duration(rng.Intn(500)) + // These sleep variables enable a sleep for up to five milliseconds + // with a .5 probability before cancellation. + sleepBeforeCancellation = rng.Float64() <= 0.5 + sleepTime = time.Microsecond * time.Duration(rng.Intn(5000)) // stopwatch is used to measure how long it takes for the outbox to // exit once the transport broke. stopwatch = timeutil.NewStopWatch() @@ -250,9 +257,14 @@ func TestOutboxInbox(t *testing.T) { ) require.NoError(t, err) + inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background()) + readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx) inboxMemAcc := testMemMonitor.MakeBoundAccount() - defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + defer inboxMemAcc.Close(readerCtx) + inbox, err := NewInboxWithFlowCtxDone( + colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), + typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(), + ) require.NoError(t, err) streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) @@ -283,7 +295,6 @@ func TestOutboxInbox(t *testing.T) { wg.Done() }() - readerCtx, readerCancelFn := context.WithCancel(ctx) wg.Add(1) go func() { if sleepBeforeCancellation { @@ -293,6 +304,8 @@ func TestOutboxInbox(t *testing.T) { case noCancel: case streamCtxCancel: streamCancelFn() + case flowCtxCancel: + inboxFlowCtxCancelFn() case readerCtxCancel: readerCancelFn() case transportBreaks: @@ -396,10 +409,38 @@ func TestOutboxInbox(t *testing.T) { // which prompts the watchdog goroutine of the outbox to cancel the // flow. require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1) + case flowCtxCancel: + // If the flow context of the Inbox host gets canceled, it is + // treated as an ungraceful termination of the stream, so we expect + // an error from the stream handler. + require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError)) + // The Inbox propagates this cancellation on its host. Depending on + // when the cancellation is noticed by the reader, a different error + // is used, so we allow for both of them. + // + // QueryCanceledError is used when the flow ctx cancellation is + // observed before the stream arrived whereas wrapped + // context.Canceled error is used when the inbox handler goroutine + // notices the cancellation first and ungracefully shuts down the + // stream. + ok := errors.Is(readerErr, cancelchecker.QueryCanceledError) || + testutils.IsError(readerErr, "context canceled") + require.True(t, ok, readerErr) + + // In the production setup, the watchdog goroutine of the outbox + // would receive non-io.EOF error indicating an ungraceful + // completion of the FlowStream RPC call which would prompt the + // outbox to cancel the whole flow. + // + // However, because we're using a mock server, the propagation + // doesn't take place, so the flow context on the outbox side should + // not be canceled. + require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) + require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) case readerCtxCancel: - // If the reader context gets canceled, it is treated as a graceful - // termination of the stream, so we expect no error from the stream - // handler. + // If the reader context gets canceled while the inbox's host flow + // context doesn't, it is treated as a graceful termination of the + // stream, so we expect no error from the stream handler. require.Nil(t, streamHandlerErr) // The Inbox should still propagate this error upwards. require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) @@ -560,7 +601,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(t, err) var ( @@ -642,7 +683,7 @@ func BenchmarkOutboxInbox(b *testing.B) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(b, err) var wg sync.WaitGroup @@ -766,7 +807,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) { typs := []*types.T{types.Int} - inbox, err := NewInbox(ctx, testAllocator, typs, streamID) + inbox, err := NewInbox(testAllocator, typs, streamID) require.NoError(t, err) ctxExtract := make(chan struct{}) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index fcc9149e8f01..79842dec0716 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -75,6 +75,9 @@ type Inbox struct { // goroutine should exit while waiting for a stream. timeoutCh chan error + // flowCtxDone is the Done() channel of the flow context of the Inbox host. + flowCtxDone <-chan struct{} + // errCh is the channel that RunWithStream will block on, waiting until the // Inbox does not need a stream any more. An error will only be sent on this // channel in the event of a cancellation or a non-io.EOF error originating @@ -99,14 +102,6 @@ type Inbox struct { // only the Next/DrainMeta goroutine may access it. stream flowStreamServer - // flowCtx is a temporary field that captures a flow's context during - // initialization. This is so that RunWithStream can listen for cancellation - // even in the case in which Next is not called (e.g. in cases where the Inbox - // is the left side of a HashJoiner). The best solution for this problem would - // be to refactor Operator.Init to accept a context since that must be called - // regardless of whether or not Next is called. - flowCtx context.Context - // statsAtomics are the execution statistics that need to be atomically // accessed. This is necessary since Get*() methods can be called from // different goroutine than Next(). @@ -134,7 +129,7 @@ var _ colexecop.Operator = &Inbox{} // NewInbox creates a new Inbox. func NewInbox( - ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, ) (*Inbox, error) { c, err := colserde.NewArrowBatchConverter(typs) if err != nil { @@ -154,13 +149,28 @@ func NewInbox( contextCh: make(chan context.Context, 1), timeoutCh: make(chan error, 1), errCh: make(chan error, 1), - flowCtx: ctx, deserializationStopWatch: timeutil.NewStopWatch(), } i.scratch.data = make([]*array.Data, len(typs)) return i, nil } +// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow +// context is available. +func NewInboxWithFlowCtxDone( + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, +) (*Inbox, error) { + i, err := NewInbox(allocator, typs, streamID) + if err != nil { + return nil, err + } + i.flowCtxDone = flowCtxDone + return i, nil +} + // maybeInit calls Inbox.init if the inbox is not initialized and returns an // error if the initialization was not successful. Usually this is because the // given context is canceled before the remote stream arrives. @@ -188,12 +198,15 @@ func (i *Inbox) init(ctx context.Context) error { i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err) return err case <-ctx.Done(): - // Our reader canceled the context meaning that it no longer needs - // any more data from the outbox. This is a graceful termination, so - // we don't send any error on errCh and only return an error. This - // will close the inbox (making the stream handler exit gracefully) - // and will stop the current goroutine from proceeding further. - return ctx.Err() + // errToThrow is propagated to the reader of the Inbox. + errToThrow := ctx.Err() + if err := i.checkFlowCtxCancellation(); err != nil { + // This is an ungraceful termination because the flow context has + // been canceled. + i.errCh <- err + errToThrow = err + } + return errToThrow } if i.ctxInterceptorFn != nil { @@ -215,9 +228,21 @@ func (i *Inbox) close() { } } +// checkFlowCtxCancellation returns an error if the flow context has already +// been canceled. +func (i *Inbox) checkFlowCtxCancellation() error { + select { + case <-i.flowCtxDone: + return cancelchecker.QueryCanceledError + default: + return nil + } +} + // RunWithStream sets the Inbox's stream and waits until either streamCtx is -// canceled, a caller of Next cancels the first context passed into Next, or any -// error is encountered on the stream by the Next goroutine. +// canceled, the Inbox's host cancels the flow context, a caller of Next cancels +// the context passed into Init, or any error is encountered on the stream by +// the Next goroutine. func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error { streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID) log.VEvent(streamCtx, 2, "Inbox handling stream") @@ -234,7 +259,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer log.VEvent(streamCtx, 2, "Inbox reader arrived") case <-streamCtx.Done(): return fmt.Errorf("%s: streamCtx while waiting for reader (remote client canceled)", streamCtx.Err()) - case <-i.flowCtx.Done(): + case <-i.flowCtxDone: // The flow context of the inbox host has been canceled. This can occur // e.g. when the query is canceled, or when another stream encountered // an unrecoverable error forcing it to shutdown the flow. @@ -244,18 +269,21 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer // Now wait for one of the events described in the method comment. If a // cancellation is encountered, nothing special must be done to cancel the // reader goroutine as returning from the handler will close the stream. - // - // Note that we don't listen for cancellation on flowCtx.Done() because - // readerCtx must be the child of the flow context. select { case err := <-i.errCh: // nil will be read from errCh when the channel is closed. return err + case <-i.flowCtxDone: + // The flow context of the inbox host has been canceled. This can occur + // e.g. when the query is canceled, or when another stream encountered + // an unrecoverable error forcing it to shutdown the flow. + return cancelchecker.QueryCanceledError case <-readerCtx.Done(): - // The reader canceled the stream meaning that it no longer needs any - // more data from the outbox. This is a graceful termination, so we - // return nil. - return nil + // readerCtx is canceled, but we don't know whether it was because the + // flow context was canceled or for other reason. In the former case we + // have an ungraceful shutdown whereas in the latter case we have a + // graceful one. + return i.checkFlowCtxCancellation() case <-streamCtx.Done(): // The client canceled the stream. return fmt.Errorf("%s: streamCtx in Inbox stream handler (remote client canceled)", streamCtx.Err()) diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 44f70e51dad2..f0e216b39015 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -61,7 +61,7 @@ func TestInboxCancellation(t *testing.T) { typs := []*types.T{types.Int} t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) { - inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) // Cancel the context. @@ -78,7 +78,7 @@ func TestInboxCancellation(t *testing.T) { t.Run("DuringRecv", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) @@ -111,7 +111,7 @@ func TestInboxCancellation(t *testing.T) { t.Run("StreamHandlerWaitingForReader", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) @@ -129,7 +129,7 @@ func TestInboxCancellation(t *testing.T) { func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) { defer leaktest.AfterTest(t)() - inbox, err := NewInbox(context.Background(), testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) require.NoError(t, err) rpcLayer := makeMockFlowStreamRPCLayer() @@ -156,7 +156,7 @@ func TestInboxTimeout(t *testing.T) { ctx := context.Background() - inbox, err := NewInbox(ctx, testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) require.NoError(t, err) var ( @@ -259,7 +259,7 @@ func TestInboxShutdown(t *testing.T) { inboxCtx, inboxCancel := context.WithCancel(context.Background()) inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(inboxCtx) - inbox, err := NewInbox(context.Background(), colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(t, err) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 1bd10f212bd4..9b097e05a3ba 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -152,6 +152,7 @@ func (o *Outbox) Run( flowCtxCancel context.CancelFunc, connectionTimeout time.Duration, ) { + flowCtx := ctx // Derive a child context so that we can cancel all components rooted in // this outbox. var outboxCtxCancel context.CancelFunc @@ -182,7 +183,12 @@ func (o *Outbox) Run( } client := execinfrapb.NewDistSQLClient(conn) - stream, err = client.FlowStream(ctx) + // We use the flow context for the RPC so that when outbox context is + // canceled in case of a graceful shutdown, the gRPC stream keeps on + // running. If, however, the flow context is canceled, then the + // termination of the whole query is ungraceful, so we're ok with the + // gRPC stream being ungracefully shutdown too. + stream, err = client.FlowStream(flowCtx) if err != nil { log.Warningf( ctx, diff --git a/pkg/sql/colflow/colrpc/outbox_test.go b/pkg/sql/colflow/colrpc/outbox_test.go index 59dbbcca9266..179945a716ea 100644 --- a/pkg/sql/colflow/colrpc/outbox_test.go +++ b/pkg/sql/colflow/colrpc/outbox_test.go @@ -55,7 +55,7 @@ func TestOutboxCatchesPanics(t *testing.T) { inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctx) - inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(t, err) streamHandlerErrCh := handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) }) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 1f8b574848b4..bcfc4043f77c 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -380,6 +380,8 @@ type flowCreatorHelper interface { accumulateAsyncComponent(runFn) // addMaterializer adds a materializer to the flow. addMaterializer(*colexec.Materializer) + // getCtxDone returns done channel of the context of this flow. + getFlowCtxDone() <-chan struct{} // getCancelFlowFn returns a flow cancellation function. getCancelFlowFn() context.CancelFunc } @@ -405,7 +407,12 @@ type remoteComponentCreator interface { metadataSources []execinfrapb.MetadataSource, toClose []colexecop.Closer, ) (*colrpc.Outbox, error) - newInbox(ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) + newInbox( + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, + ) (*colrpc.Inbox, error) } type vectorizedRemoteComponentCreator struct{} @@ -422,9 +429,12 @@ func (vectorizedRemoteComponentCreator) newOutbox( } func (vectorizedRemoteComponentCreator) newInbox( - ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, ) (*colrpc.Inbox, error) { - return colrpc.NewInbox(ctx, allocator, typs, streamID) + return colrpc.NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone) } // vectorizedFlowCreator performs all the setup of vectorized flows. Depending @@ -451,10 +461,6 @@ type vectorizedFlowCreator struct { numOutboxes int32 materializerAdded bool - // numOutboxesExited is an atomic that keeps track of how many outboxes have exited. - // When numOutboxesExited equals numOutboxes, the cancellation function for the flow - // is called. - numOutboxesExited int32 // numOutboxesDrained is an atomic that keeps track of how many outboxes have // been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is // added to a flow-level span. @@ -666,17 +672,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( flowCtxCancel, flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV), ) - // When the last Outbox on this node exits, we want to make sure that - // everything is shutdown; namely, we need to call cancelFn if: - // - it is the last Outbox - // - there is no root materializer on this node (if it were, it would take - // care of the cancellation itself) - // - cancelFn is non-nil (it can be nil in tests). - // Calling cancelFn will cancel the context that all infrastructure on this - // node is listening on, so it will shut everything down. - if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && flowCtxCancel != nil { - flowCtxCancel() - } } s.accumulateAsyncComponent(run) return outbox, nil @@ -843,7 +838,8 @@ func (s *vectorizedFlowCreator) setupInput( } inbox, err := s.remoteComponentCreator.newInbox( - ctx, colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID, + colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), + input.ColumnTypes, inputStream.StreamID, s.flowCreatorHelper.getFlowCtxDone(), ) if err != nil { @@ -1332,6 +1328,10 @@ func (r *vectorizedFlowCreatorHelper) addMaterializer(m *colexec.Materializer) { r.f.SetProcessors(r.processors) } +func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { + return r.f.GetCtxDone() +} + func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return r.f.GetCancelFlowFn() } @@ -1387,6 +1387,10 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {} func (r *noopFlowCreatorHelper) addMaterializer(*colexec.Materializer) {} +func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { + return nil +} + func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return nil } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 40400e338b19..b206866c1fa8 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -210,7 +210,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxLocal) inbox, err := colrpc.NewInbox( - ctxLocal, colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), @@ -325,7 +324,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxAnotherRemote) inbox, err := colrpc.NewInbox( - ctxAnotherRemote, colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 7de96b67df56..600afb4a0fb8 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -52,7 +52,7 @@ func (c callbackRemoteComponentCreator) newOutbox( } func (c callbackRemoteComponentCreator) newInbox( - ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ <-chan struct{}, ) (*colrpc.Inbox, error) { return c.newInboxFn(allocator, typs, streamID) } @@ -207,7 +207,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { return colrpc.NewOutbox(allocator, op, typs, nil /* getStats */, sources, nil /* toClose */) }, newInboxFn: func(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) { - inbox, err := colrpc.NewInbox(context.Background(), allocator, typs, streamID) + inbox, err := colrpc.NewInbox(allocator, typs, streamID) inboxToNumInputTypes[inbox] = typs return inbox, err }, diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index cc5e1d212ec0..3593900cbde4 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -249,6 +249,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error { log.Infof(ctx, "outbox: calling FlowStream") } // The context used here escapes, so it has to be a background context. + // TODO(yuzefovich): the usage of the TODO context here is suspicious. + // Investigate this. m.stream, err = client.FlowStream(context.TODO()) if err != nil { if log.V(1) {