diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index f0df7903bf64..89c9be6da902 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -151,10 +151,6 @@ func TestOutboxInbox(t *testing.T) { // flowCtxCancel models a scenario in which the flow context of the // Inbox host is canceled which is an ungraceful shutdown. flowCtxCancel - // readerCtxCancel models a scenario in which the consumer of the Inbox - // cancels the context while the host's flow context is not canceled. - // This is considered a graceful termination. - readerCtxCancel // transportBreaks models a scenario in which the transport breaks. transportBreaks ) @@ -163,18 +159,15 @@ func TestOutboxInbox(t *testing.T) { cancellationScenarioName string ) switch randVal := rng.Float64(); { - case randVal <= 0.2: + case randVal <= 0.25: cancellationScenario = noCancel cancellationScenarioName = "noCancel" - case randVal <= 0.4: + case randVal <= 0.5: cancellationScenario = streamCtxCancel cancellationScenarioName = "streamCtxCancel" - case randVal <= 0.6: + case randVal <= 0.75: cancellationScenario = flowCtxCancel cancellationScenarioName = "flowCtxCancel" - case randVal <= 0.8: - cancellationScenario = readerCtxCancel - cancellationScenarioName = "readerCtxCancel" default: cancellationScenario = transportBreaks cancellationScenarioName = "transportBreaks" @@ -247,34 +240,25 @@ func TestOutboxInbox(t *testing.T) { var ( flowCtxCanceled uint32 - // Because the outboxCtx must be a child of the flow context, we - // assume that if flowCtxCanceled is non-zero, then - // outboxCtxCanceled is too and don't check that explicitly. - outboxCtxCanceled uint32 - wg sync.WaitGroup + wg sync.WaitGroup ) wg.Add(1) go func() { - flowCtx, flowCtxCancelFn := context.WithCancel(context.Background()) + outboxFlowCtx, outboxFlowCtxCancelFn := context.WithCancel(context.Background()) flowCtxCancel := func() { atomic.StoreUint32(&flowCtxCanceled, 1) - flowCtxCancelFn() - } - outboxCtx, outboxCtxCancelFn := context.WithCancel(flowCtx) - outboxCtxCancel := func() { - atomic.StoreUint32(&outboxCtxCanceled, 1) - outboxCtxCancelFn() + outboxFlowCtxCancelFn() } inputMemAcc := testMemMonitor.MakeBoundAccount() - defer inputMemAcc.Close(outboxCtx) + defer inputMemAcc.Close(outboxFlowCtx) input := coldatatestutils.NewRandomDataOp( - colmem.NewAllocator(outboxCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args, + colmem.NewAllocator(outboxFlowCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args, ) outboxMemAcc := testMemMonitor.MakeBoundAccount() - defer outboxMemAcc.Close(outboxCtx) + defer outboxMemAcc.Close(outboxFlowCtx) outbox, err := NewOutbox( - colmem.NewAllocator(outboxCtx, &outboxMemAcc, coldata.StandardColumnFactory), + colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory), colexecargs.OpWithMetaInfo{Root: input}, typs, nil, /* getStats */ ) require.NoError(t, err) @@ -285,13 +269,12 @@ func TestOutboxInbox(t *testing.T) { // to create a context of the node on which the outbox runs and keep // it different from the streamCtx. This matters in // 'transportBreaks' scenario. - outbox.runnerCtx = outboxCtx - outbox.runWithStream(streamCtx, clientStream, flowCtxCancel, outboxCtxCancel) + outbox.runnerCtx = outboxFlowCtx + outbox.runWithStream(streamCtx, clientStream, flowCtxCancel) wg.Done() }() inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background()) - readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx) wg.Add(1) go func() { if sleepBeforeCancellation { @@ -303,8 +286,6 @@ func TestOutboxInbox(t *testing.T) { streamCancelFn() case flowCtxCancel: inboxFlowCtxCancelFn() - case readerCtxCancel: - readerCancelFn() case transportBreaks: err := conn.Close() // nolint:grpcconnclose require.NoError(t, err) @@ -314,9 +295,9 @@ func TestOutboxInbox(t *testing.T) { }() inboxMemAcc := testMemMonitor.MakeBoundAccount() - defer inboxMemAcc.Close(readerCtx) - inbox, err := NewInboxWithFlowCtxDone( - colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), + defer inboxMemAcc.Close(inboxFlowCtx) + inbox, err := NewInbox( + colmem.NewAllocator(inboxFlowCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(), ) require.NoError(t, err) @@ -326,11 +307,11 @@ func TestOutboxInbox(t *testing.T) { // Use a deselector op to verify that the Outbox gets rid of the selection // vector. deselectorMemAcc := testMemMonitor.MakeBoundAccount() - defer deselectorMemAcc.Close(readerCtx) + defer deselectorMemAcc.Close(inboxFlowCtx) inputBatches := colexecutils.NewDeselectorOp( - colmem.NewAllocator(readerCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs, + colmem.NewAllocator(inboxFlowCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs, ) - inputBatches.Init(readerCtx) + inputBatches.Init(inboxFlowCtx) outputBatches := colexecop.NewBatchBuffer() var readerErr error for { @@ -338,7 +319,7 @@ func TestOutboxInbox(t *testing.T) { if err := colexecerror.CatchVectorizedRuntimeError(func() { // Note that it is ok that we call Init on every iteration - it // is a noop every time except for the first one. - inbox.Init(readerCtx) + inbox.Init(inboxFlowCtx) outputBatch = inbox.Next() }); err != nil { readerErr = err @@ -382,7 +363,6 @@ func TestOutboxInbox(t *testing.T) { // Verify that the Outbox terminated gracefully (did not cancel the // flow context). require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) // And the Inbox did as well. require.NoError(t, streamHandlerErr) require.NoError(t, readerErr) @@ -444,20 +424,6 @@ func TestOutboxInbox(t *testing.T) { // doesn't take place, so the flow context on the outbox side should // not be canceled. require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) - case readerCtxCancel: - // If the reader context gets canceled while the inbox's host flow - // context doesn't, it is treated as a graceful termination of the - // stream, so we expect no error from the stream handler. - require.Nil(t, streamHandlerErr) - // The Inbox should still propagate this error upwards. - require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) - - // The cancellation should have been communicated to the Outbox, - // resulting in the watchdog goroutine canceling the outbox context - // (but not the flow). - require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) case transportBreaks: // If the transport breaks, the scenario is very similar to // streamCtxCancel. gRPC will cancel the stream handler's context. @@ -496,14 +462,11 @@ func TestInboxHostCtxCancellation(t *testing.T) { }() // Simulate the "remote" node with a separate context. - outboxHostCtx, outboxHostCtxCancel := context.WithCancel(context.Background()) - // Derive a separate context for the outbox itself (this is what is done in - // Outbox.Run). - outboxCtx, outboxCtxCancel := context.WithCancel(outboxHostCtx) + outboxFlowCtx, outboxFlowCtxCancel := context.WithCancel(context.Background()) // Initiate the FlowStream RPC from the outbox. client := execinfrapb.NewDistSQLClient(conn) - clientStream, err := client.FlowStream(outboxCtx) + clientStream, err := client.FlowStream(outboxFlowCtx) require.NoError(t, err) // Create and run the outbox. @@ -513,16 +476,16 @@ func TestInboxHostCtxCancellation(t *testing.T) { typs := []*types.T{} outboxInput := colexecutils.NewFixedNumTuplesNoInputOp(testAllocator, 1 /* numTuples */, nil /* opToInitialize */) outboxMemAcc := testMemMonitor.MakeBoundAccount() - defer outboxMemAcc.Close(outboxHostCtx) + defer outboxMemAcc.Close(outboxFlowCtx) outbox, err := NewOutbox( - colmem.NewAllocator(outboxHostCtx, &outboxMemAcc, coldata.StandardColumnFactory), + colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory), colexecargs.OpWithMetaInfo{Root: outboxInput}, typs, nil, /* getStats */ ) require.NoError(t, err) var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(outboxCtx, clientStream, outboxHostCtxCancel, outboxCtxCancel) + outbox.runWithStream(outboxFlowCtx, clientStream, outboxFlowCtxCancel) wg.Done() }() @@ -530,7 +493,7 @@ func TestInboxHostCtxCancellation(t *testing.T) { inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background()) inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(inboxHostCtx) - inbox, err := NewInboxWithFlowCtxDone( + inbox, err := NewInbox( colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxHostCtx.Done(), ) @@ -704,12 +667,12 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) var ( - flowCanceled, outboxCanceled uint32 - wg sync.WaitGroup + flowCanceled uint32 + wg sync.WaitGroup ) wg.Add(1) go func() { @@ -717,7 +680,6 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { ctx, clientStream, func() { atomic.StoreUint32(&flowCanceled, 1) }, - func() { atomic.StoreUint32(&outboxCanceled, 1) }, ) wg.Done() }() @@ -729,10 +691,8 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { wg.Wait() require.NoError(t, <-streamHanderErrCh) - // Require that the outbox did not cancel the flow and did cancel - // the outbox since this is a graceful drain. + // Require that the outbox did not cancel the flow. require.True(t, atomic.LoadUint32(&flowCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCanceled) == 1) // Verify that we received the expected metadata. if tc.verifyExpectedMetadata != nil { @@ -787,13 +747,13 @@ func BenchmarkOutboxInbox(b *testing.B) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(b, err) var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */) wg.Done() }() @@ -912,7 +872,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) { typs := []*types.T{types.Int} - inbox, err := NewInbox(testAllocator, typs, streamID) + inbox, err := NewInbox(testAllocator, typs, streamID, ctx.Done()) require.NoError(t, err) ctxExtract := make(chan struct{}) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index ad90a99cbafb..7fb81de05795 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -48,10 +48,8 @@ type flowStreamServer interface { // to the inbox. Next may be called before RunWithStream, it will just block // until the stream is made available or its context is canceled. Note that // ownership of the stream is passed from the RunWithStream goroutine to the -// Next goroutine. In exchange, the RunWithStream goroutine receives the first -// context passed into Next and listens for cancellation. Returning from -// RunWithStream (or more specifically, the RPC handler) will unblock Next by -// closing the stream. +// Next goroutine. Returning from RunWithStream (or more specifically, the RPC +// handler) will unblock Next by closing the stream. type Inbox struct { colexecop.ZeroInputNode colexecop.InitHelper @@ -69,10 +67,9 @@ type Inbox struct { // streamCh is the channel over which the stream is passed from the stream // handler to the reader goroutine. streamCh chan flowStreamServer - // contextCh is the channel over which the reader goroutine passes the - // context to the stream handler so that it can listen for context - // cancellation. - contextCh chan context.Context + // waitForReaderCh is the channel which the reader goroutine closes when it + // arrives. This allows for the handler goroutine to wait for the reader. + waitForReaderCh chan struct{} // timeoutCh is the channel over which an error will be sent if the reader // goroutine should exit while waiting for a stream. @@ -132,7 +129,10 @@ var _ colexecop.Operator = &Inbox{} // NewInbox creates a new Inbox. func NewInbox( - allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, ) (*Inbox, error) { c, err := colserde.NewArrowBatchConverter(typs) if err != nil { @@ -149,8 +149,9 @@ func NewInbox( serializer: s, streamID: streamID, streamCh: make(chan flowStreamServer, 1), - contextCh: make(chan context.Context, 1), + waitForReaderCh: make(chan struct{}), timeoutCh: make(chan error, 1), + flowCtxDone: flowCtxDone, errCh: make(chan error, 1), deserializationStopWatch: timeutil.NewStopWatch(), } @@ -158,22 +159,6 @@ func NewInbox( return i, nil } -// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow -// context is available. -func NewInboxWithFlowCtxDone( - allocator *colmem.Allocator, - typs []*types.T, - streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, -) (*Inbox, error) { - i, err := NewInbox(allocator, typs, streamID) - if err != nil { - return nil, err - } - i.flowCtxDone = flowCtxDone - return i, nil -} - // NewInboxWithAdmissionControl creates a new Inbox that does admission // control on responses received from DistSQL. func NewInboxWithAdmissionControl( @@ -184,7 +169,7 @@ func NewInboxWithAdmissionControl( admissionQ *admission.WorkQueue, admissionInfo admission.WorkInfo, ) (*Inbox, error) { - i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone) + i, err := NewInbox(allocator, typs, streamID, flowCtxDone) if err != nil { return nil, err } @@ -205,34 +190,22 @@ func (i *Inbox) close() { } } -// checkFlowCtxCancellation returns an error if the flow context has already -// been canceled. -func (i *Inbox) checkFlowCtxCancellation() error { - select { - case <-i.flowCtxDone: - return cancelchecker.QueryCanceledError - default: - return nil - } -} - // RunWithStream sets the Inbox's stream and waits until either streamCtx is -// canceled, the Inbox's host cancels the flow context, a caller of Next cancels -// the context passed into Init, or any error is encountered on the stream by -// the Next goroutine. +// canceled, the Inbox's host cancels the flow context, or any error is +// encountered on the stream by the Next goroutine (which includes io.EOF that +// indicates a graceful shutdown - CloseSend() called by the outbox). func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error { streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID) log.VEvent(streamCtx, 2, "Inbox handling stream") defer log.VEvent(streamCtx, 2, "Inbox exited stream handler") - // Pass the stream to the reader goroutine (non-blocking) and get the - // context to listen for cancellation. + // Pass the stream to the reader goroutine (non-blocking) and wait for the + // reader to arrive. i.streamCh <- stream - var readerCtx context.Context select { case err := <-i.errCh: // nil will be read from errCh when the channel is closed. return err - case readerCtx = <-i.contextCh: + case <-i.waitForReaderCh: log.VEvent(streamCtx, 2, "Inbox reader arrived") case <-streamCtx.Done(): return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)") @@ -255,12 +228,6 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer // e.g. when the query is canceled, or when another stream encountered // an unrecoverable error forcing it to shutdown the flow. return cancelchecker.QueryCanceledError - case <-readerCtx.Done(): - // readerCtx is canceled, but we don't know whether it was because the - // flow context was canceled or for other reason. In the former case we - // have an ungraceful shutdown whereas in the latter case we have a - // graceful one. - return i.checkFlowCtxCancellation() case <-streamCtx.Done(): // The client canceled the stream. return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)") @@ -293,22 +260,12 @@ func (i *Inbox) Init(ctx context.Context) { case <-i.flowCtxDone: i.errCh <- cancelchecker.QueryCanceledError return cancelchecker.QueryCanceledError - case <-i.Ctx.Done(): - // errToThrow is propagated to the reader of the Inbox. - errToThrow := i.Ctx.Err() - if err := i.checkFlowCtxCancellation(); err != nil { - // This is an ungraceful termination because the flow context - // has been canceled. - i.errCh <- err - errToThrow = err - } - return errToThrow } if i.ctxInterceptorFn != nil { i.ctxInterceptorFn(i.Ctx) } - i.contextCh <- i.Ctx + close(i.waitForReaderCh) return nil }(); err != nil { // An error occurred while initializing the Inbox and is likely caused @@ -321,8 +278,8 @@ func (i *Inbox) Init(ctx context.Context) { } // Next returns the next batch. It will block until there is data available. -// The Inbox will exit when either the context passed in Init() is canceled or -// when DrainMeta goroutine tells it to do so. +// The Inbox will exit when either the flow context is canceled or when +// DrainMeta goroutine tells it to do so. func (i *Inbox) Next() coldata.Batch { if i.done { return coldata.ZeroBatch diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 03c0d4cdec65..95c8a238a797 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -61,26 +61,24 @@ func TestInboxCancellation(t *testing.T) { typs := []*types.T{types.Int} t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) { - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) // Cancel the context. cancelFn() - // Init should encounter an error if the context is canceled. + // Init should encounter an error if the flow context is canceled. err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) }) - require.True(t, testutils.IsError(err, "context canceled"), err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) // Now, the remote stream arrives. err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}) - // We expect no error from the stream handler since we canceled it - // ourselves (a graceful termination). - require.Nil(t, err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) }) t.Run("DuringRecv", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) // Setup reader and stream. go func() { @@ -98,12 +96,10 @@ func TestInboxCancellation(t *testing.T) { // Now wait for the Inbox to call Recv on the stream. <-recvCalled - // Cancel the context. + // Cancel the flow context. cancelFn() err = <-streamHandlerErrCh - // Reader context cancellation is a graceful termination, so no error - // should be returned. - require.Nil(t, err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) // The mock RPC layer does not unblock the Recv for us on the server side, // so manually send an io.EOF to the reader goroutine. @@ -112,15 +108,14 @@ func TestInboxCancellation(t *testing.T) { t.Run("StreamHandlerWaitingForReader", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) - ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) cancelFn() // A stream arrives but there is no reader. - err = <-handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.client.csChan) }) - require.True(t, testutils.IsError(err, "while waiting for reader"), err) + err = <-handleStream(context.Background(), inbox, rpcLayer.server, func() { close(rpcLayer.client.csChan) }) + require.True(t, testutils.IsError(err, "query execution canceled"), err) }) } @@ -130,7 +125,7 @@ func TestInboxCancellation(t *testing.T) { func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) { defer leaktest.AfterTest(t)() - inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0), nil /* flowCtxDone */) require.NoError(t, err) rpcLayer := makeMockFlowStreamRPCLayer() @@ -160,7 +155,7 @@ func TestInboxTimeout(t *testing.T) { ctx := context.Background() - inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) var ( @@ -266,7 +261,7 @@ func TestInboxShutdown(t *testing.T) { inboxCtx, inboxCancel := context.WithCancel(context.Background()) inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(inboxCtx) - inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxCtx.Done()) require.NoError(t, err) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) @@ -434,7 +429,7 @@ func TestInboxShutdown(t *testing.T) { for i, errCh := range errChans { for err := <-errCh; err != nil; err = <-errCh { - if !testutils.IsError(err, "context canceled|artificial timeout") { + if !testutils.IsError(err, "context canceled|artificial timeout|query execution canceled") { // Error to keep on draining errors but mark this test as failed. t.Errorf("unexpected error %v from %s goroutine", err, goroutines[goroutineIndices[i]].name) } diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index d593e2e10595..0ea552671aa4 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -128,8 +128,8 @@ func (o *Outbox) close(ctx context.Context) { // coldata.Batches over the stream after sending a header with the provided flow // and stream ID. Note that an extra goroutine is spawned so that Recv may be // called concurrently wrt the Send goroutine to listen for drain signals. -// If an io.EOF is received while sending, the outbox will cancel all components -// from the same tree as the outbox. +// If an io.EOF is received while sending, this indicates the graceful +// completion of the FlowStream RPC, so the outbox simply exits. // If non-io.EOF is received while sending, the outbox will call flowCtxCancel // to shutdown all parts of the flow on this node. // If an error is encountered that cannot be sent over the stream, the error @@ -140,8 +140,8 @@ func (o *Outbox) close(ctx context.Context) { // metadata sources, send the metadata, and then call CloseSend on the // stream. The Outbox will wait until its Recv goroutine receives a non-nil // error to not leak resources. -// 2) A cancellation happened. This can come from the provided context or the -// remote reader. Refer to tests for expected behavior. +// 2) A cancellation happened. This can come from the provided flow context or +// the remote reader. Refer to tests for expected behavior. // 3) A drain signal was received from the server (consumer). In this case, the // Outbox goes through the same steps as 1). func (o *Outbox) Run( @@ -153,14 +153,6 @@ func (o *Outbox) Run( flowCtxCancel context.CancelFunc, connectionTimeout time.Duration, ) { - // Derive a child context so that we can cancel all components rooted in - // this outbox. - var outboxCtxCancel context.CancelFunc - ctx, outboxCtxCancel = context.WithCancel(ctx) - // Calling outboxCtxCancel is not strictly necessary, but we do it just to - // be safe. - defer outboxCtxCancel() - ctx, o.span = execinfra.ProcessorSpan(ctx, "outbox") if o.span != nil { defer o.span.Finish() @@ -213,23 +205,20 @@ func (o *Outbox) Run( } log.VEvent(ctx, 2, "Outbox starting normal operation") - o.runWithStream(ctx, stream, flowCtxCancel, outboxCtxCancel) + o.runWithStream(ctx, stream, flowCtxCancel) log.VEvent(ctx, 2, "Outbox exiting") } -// handleStreamErr is a utility method used to handle an error when calling -// a method on a flowStreamClient. If err is an io.EOF, outboxCtxCancel is -// called, for all other errors flowCtxCancel is. The given error is logged with -// the associated opName. +// handleStreamErr is a utility method used to handle an error when calling a +// method on a flowStreamClient. If err is an io.EOF, it is a graceful +// completion of the FlowStream RPC, so nothing is done, for all other errors +// flowCtxCancel is called. The given error is logged with the associated +// opName. func handleStreamErr( - ctx context.Context, - opName redact.SafeString, - err error, - flowCtxCancel, outboxCtxCancel context.CancelFunc, + ctx context.Context, opName redact.SafeString, err error, flowCtxCancel context.CancelFunc, ) { if err == io.EOF { log.VEventf(ctx, 2, "Outbox calling outboxCtxCancel after %s EOF", opName) - outboxCtxCancel() } else { log.VEventf(ctx, 1, "Outbox calling flowCtxCancel after %s connection error: %+v", opName, err) flowCtxCancel() @@ -261,9 +250,9 @@ func (o *Outbox) moveToDraining(ctx context.Context, reason redact.RedactableStr // NOTE: if non-io.EOF error is encountered (indicating ungraceful shutdown // of the stream), flowCtxCancel will be called. If an io.EOF is encountered // (indicating a graceful shutdown initiated by the remote Inbox), -// outboxCtxCancel will be called. +// flowCtxCancel will **not** be called. func (o *Outbox) sendBatches( - ctx context.Context, stream flowStreamClient, flowCtxCancel, outboxCtxCancel context.CancelFunc, + ctx context.Context, stream flowStreamClient, flowCtxCancel context.CancelFunc, ) (terminatedGracefully bool, errToSend error) { if o.runnerCtx == nil { // In the non-testing path, runnerCtx has been set in Run() method; @@ -313,7 +302,7 @@ func (o *Outbox) sendBatches( // soon as the message is written to the control buffer. The message is // marshaled (bytes are copied) before writing. if err := stream.Send(o.scratch.msg); err != nil { - handleStreamErr(ctx, "Send (batches)", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "Send (batches)", err, flowCtxCancel) return } } @@ -362,16 +351,13 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT // runWithStream should be called after sending the ProducerHeader on the // stream. It implements the behavior described in Run. func (o *Outbox) runWithStream( - ctx context.Context, stream flowStreamClient, flowCtxCancel, outboxCtxCancel context.CancelFunc, + ctx context.Context, stream flowStreamClient, flowCtxCancel context.CancelFunc, ) { - // Cancellation functions might be nil in some tests, but we'll make them - // noops for convenience. + // flowCtxCancel might be nil in some tests, but we'll make it a noop for + // convenience. if flowCtxCancel == nil { flowCtxCancel = func() {} } - if outboxCtxCancel == nil { - outboxCtxCancel = func() {} - } waitCh := make(chan struct{}) go func() { // This goroutine's job is to listen continually on the stream from the @@ -384,14 +370,13 @@ func (o *Outbox) runWithStream( // forever after a connection is closed, since it wouldn't notice a // closed connection until it tried to Send over that connection. // - // Similarly, if an io.EOF error is received, it indicates that the - // server side of FlowStream RPC (the inbox) has exited gracefully, so - // the inbox doesn't need anything else from this outbox, and this - // goroutine will shut down the tree of operators rooted in this outbox. + // If an io.EOF error is received, it indicates that the server side of + // FlowStream RPC (the inbox) has exited gracefully, so nothing needs to + // be done on the outbox side anymore. for { msg, err := stream.Recv() if err != nil { - handleStreamErr(ctx, "watchdog Recv", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "watchdog Recv", err, flowCtxCancel) break } switch { @@ -405,7 +390,7 @@ func (o *Outbox) runWithStream( close(waitCh) }() - terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel) + terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel) if terminatedGracefully || errToSend != nil { var reason redact.RedactableString if errToSend != nil { @@ -415,14 +400,14 @@ func (o *Outbox) runWithStream( } o.moveToDraining(ctx, reason) if err := o.sendMetadata(ctx, stream, errToSend); err != nil { - handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel) } else { // Close the stream. Note that if this block isn't reached, the stream // is unusable. // The receiver goroutine will read from the stream until any error // is returned (most likely an io.EOF). if err := stream.CloseSend(); err != nil { - handleStreamErr(ctx, "CloseSend", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "CloseSend", err, flowCtxCancel) } } } diff --git a/pkg/sql/colflow/colrpc/outbox_test.go b/pkg/sql/colflow/colrpc/outbox_test.go index ec9bf627c2b4..186d015ea4ff 100644 --- a/pkg/sql/colflow/colrpc/outbox_test.go +++ b/pkg/sql/colflow/colrpc/outbox_test.go @@ -52,13 +52,13 @@ func TestOutboxCatchesPanics(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) wg.Done() }() inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) streamHandlerErrCh := handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) }) @@ -132,7 +132,7 @@ func TestOutboxDrainsMetadataSources(t *testing.T) { // Close the csChan to unblock the Recv goroutine (we don't need it for this // test). close(rpcLayer.client.csChan) - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) require.True(t, atomic.LoadUint32(sourceDrained) == 1) }) @@ -149,7 +149,7 @@ func TestOutboxDrainsMetadataSources(t *testing.T) { require.NoError(t, err) close(rpcLayer.client.csChan) - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) require.True(t, atomic.LoadUint32(sourceDrained) == 1) }) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 80153b37f17f..fdb4f4137bf9 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -537,10 +537,6 @@ type vectorizedFlowCreator struct { // numOutboxes counts how many colrpc.Outbox'es have been set up on this // node. It must be accessed atomically. numOutboxes int32 - // numOutboxesExited is an atomic that keeps track of how many outboxes have - // exited. When numOutboxesExited equals numOutboxes, the cancellation - // function for the flow is called on the non-gateway nodes. - numOutboxesExited int32 // numOutboxesDrained is an atomic that keeps track of how many outboxes // have been drained. When numOutboxesDrained equals numOutboxes, flow-level // metadata is added to a flow-level span on the non-gateway nodes. @@ -696,17 +692,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( flowCtxCancel, flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV), ) - // When the last Outbox on this node exits, we want to make sure that - // everything is shutdown; namely, we need to call cancelFn if: - // - it is the last Outbox - // - the node is not the gateway (there is a flow coordinator on the - // gateway that will take care of the cancellation itself) - // - cancelFn is non-nil (it can be nil in tests). - // Calling cancelFn will cancel the context that all infrastructure on this - // node is listening on, so it will shut everything down. - if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode && flowCtxCancel != nil { - flowCtxCancel() - } } s.accumulateAsyncComponent(run) return outbox, nil diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index d08308dfef3a..27e15ceba220 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -218,7 +218,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { for i := 0; i < numInboxes; i++ { inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxLocal) - inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID)) + inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), ctxLocal.Done()) require.NoError(t, err) inboxes = append(inboxes, inbox) synchronizerInputs = append( @@ -330,7 +330,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { // Add another "remote" node to the flow. inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxAnotherRemote) - inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID)) + inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), ctxAnotherRemote.Done()) require.NoError(t, err) inboxes = append(inboxes, inbox) outboxMemAccount := testMemMonitor.MakeBoundAccount() @@ -449,9 +449,9 @@ func TestVectorizedFlowShutdown(t *testing.T) { for i := range inboxes { err = <-handleStreamErrCh[i] - // We either should get no error or a context cancellation error. + // We either should get no error or a cancellation error. if err != nil { - require.True(t, testutils.IsError(err, "context canceled"), err) + require.True(t, testutils.IsError(err, "canceled"), err) } } wg.Wait() diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 177a3f20b92b..563581a379f7 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -211,7 +211,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { return colrpc.NewOutbox(allocator, input, typs, nil /* getStats */) }, newInboxFn: func(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) { - inbox, err := colrpc.NewInbox(allocator, typs, streamID) + inbox, err := colrpc.NewInbox(allocator, typs, streamID, nil /* flowCtxDone */) inboxToNumInputTypes[inbox] = typs return inbox, err }, diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index cdf2e3a20dfd..f0e61ab5518b 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -232,6 +232,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error { log.Infof(ctx, "outbox: calling FlowStream") } // The context used here escapes, so it has to be a background context. + // TODO(yuzefovich): the usage of the TODO context here is suspicious. + // Investigate this. m.stream, err = client.FlowStream(context.TODO()) if err != nil { if log.V(1) {