From 410044692eaec04d622548869bac20e6c99cd921 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 20 Dec 2021 18:54:40 -0800
Subject: [PATCH] colflow: fix the shutdown for good

This commit fixes a long-standing bug in the distributed vectorized
query shutdown where, in case of a graceful completion of the flow on
one node, we might get an error on another node, resulting in the
ungraceful termination of the query.

This was caused by the fact that, on remote nodes, the last outbox to
exit would cancel the flow context; however, when instantiating the
`FlowStream` RPC, the outboxes used a child context of the flow
context, so even a "graceful" cancellation of the flow context would
cause the inbox to get an ungraceful termination of the gRPC stream. As
a result, the whole query could get a "context canceled" error.

I believe this bug was introduced by me over two years ago because I
didn't fully understand how the shutdown should work, and in particular
I was missing the fact that when an inbox observes the flow context
cancellation, it should terminate the `FlowStream` RPC ungracefully in
order to propagate the ungracefulness to the other side of the stream.
This shortcoming was fixed in the previous commit.

The shutdown protocol is now as follows:
- on the gateway node we keep on canceling the flow context at the very
end of the query execution. It doesn't matter whether the query
resulted in an error or not, and doing so allows us to ensure that
everything exits on the gateway node. This behavior is already present.
- due to the fix in the previous commit, that flow context cancellation
ungracefully terminates all still-open gRPC streams for the
`FlowStream` RPC for which the gateway node is the inbox host.
- the outboxes on the remote nodes get the ungraceful termination and
cancel the flow contexts of their hosts. This, in turn, triggers the
propagation of the ungraceful termination on other gRPC streams, and so
on.

Release note (bug fix): Previously, in extremely rare cases,
CockroachDB could return a spurious "context canceled" error for a
query that actually succeeded. This has now been fixed.
---
 pkg/sql/colflow/colrpc/colrpc_test.go         | 104 ++++++------
 pkg/sql/colflow/colrpc/inbox.go               |  85 ++++----------
 pkg/sql/colflow/colrpc/inbox_test.go          |  39 +++----
 pkg/sql/colflow/colrpc/outbox.go              |  63 ++++-------
 pkg/sql/colflow/colrpc/outbox_test.go         |   8 +-
 pkg/sql/colflow/vectorized_flow.go            |  15 ---
 .../colflow/vectorized_flow_shutdown_test.go  |   8 +-
 pkg/sql/colflow/vectorized_flow_test.go       |   2 +-
 pkg/sql/flowinfra/outbox.go                   |   2 +
 9 files changed, 105 insertions(+), 221 deletions(-)

diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go
index f0df7903bf64..89c9be6da902 100644
--- a/pkg/sql/colflow/colrpc/colrpc_test.go
+++ b/pkg/sql/colflow/colrpc/colrpc_test.go
@@ -151,10 +151,6 @@ func TestOutboxInbox(t *testing.T) {
 		// flowCtxCancel models a scenario in which the flow context of the
 		// Inbox host is canceled which is an ungraceful shutdown.
 		flowCtxCancel
-		// readerCtxCancel models a scenario in which the consumer of the Inbox
-		// cancels the context while the host's flow context is not canceled.
-		// This is considered a graceful termination.
-		readerCtxCancel
 		// transportBreaks models a scenario in which the transport breaks.
transportBreaks ) @@ -163,18 +159,15 @@ func TestOutboxInbox(t *testing.T) { cancellationScenarioName string ) switch randVal := rng.Float64(); { - case randVal <= 0.2: + case randVal <= 0.25: cancellationScenario = noCancel cancellationScenarioName = "noCancel" - case randVal <= 0.4: + case randVal <= 0.5: cancellationScenario = streamCtxCancel cancellationScenarioName = "streamCtxCancel" - case randVal <= 0.6: + case randVal <= 0.75: cancellationScenario = flowCtxCancel cancellationScenarioName = "flowCtxCancel" - case randVal <= 0.8: - cancellationScenario = readerCtxCancel - cancellationScenarioName = "readerCtxCancel" default: cancellationScenario = transportBreaks cancellationScenarioName = "transportBreaks" @@ -247,34 +240,25 @@ func TestOutboxInbox(t *testing.T) { var ( flowCtxCanceled uint32 - // Because the outboxCtx must be a child of the flow context, we - // assume that if flowCtxCanceled is non-zero, then - // outboxCtxCanceled is too and don't check that explicitly. - outboxCtxCanceled uint32 - wg sync.WaitGroup + wg sync.WaitGroup ) wg.Add(1) go func() { - flowCtx, flowCtxCancelFn := context.WithCancel(context.Background()) + outboxFlowCtx, outboxFlowCtxCancelFn := context.WithCancel(context.Background()) flowCtxCancel := func() { atomic.StoreUint32(&flowCtxCanceled, 1) - flowCtxCancelFn() - } - outboxCtx, outboxCtxCancelFn := context.WithCancel(flowCtx) - outboxCtxCancel := func() { - atomic.StoreUint32(&outboxCtxCanceled, 1) - outboxCtxCancelFn() + outboxFlowCtxCancelFn() } inputMemAcc := testMemMonitor.MakeBoundAccount() - defer inputMemAcc.Close(outboxCtx) + defer inputMemAcc.Close(outboxFlowCtx) input := coldatatestutils.NewRandomDataOp( - colmem.NewAllocator(outboxCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args, + colmem.NewAllocator(outboxFlowCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args, ) outboxMemAcc := testMemMonitor.MakeBoundAccount() - defer outboxMemAcc.Close(outboxCtx) + defer outboxMemAcc.Close(outboxFlowCtx) outbox, err := NewOutbox( - colmem.NewAllocator(outboxCtx, &outboxMemAcc, coldata.StandardColumnFactory), + colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory), colexecargs.OpWithMetaInfo{Root: input}, typs, nil, /* getStats */ ) require.NoError(t, err) @@ -285,13 +269,12 @@ func TestOutboxInbox(t *testing.T) { // to create a context of the node on which the outbox runs and keep // it different from the streamCtx. This matters in // 'transportBreaks' scenario. 
- outbox.runnerCtx = outboxCtx - outbox.runWithStream(streamCtx, clientStream, flowCtxCancel, outboxCtxCancel) + outbox.runnerCtx = outboxFlowCtx + outbox.runWithStream(streamCtx, clientStream, flowCtxCancel) wg.Done() }() inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background()) - readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx) wg.Add(1) go func() { if sleepBeforeCancellation { @@ -303,8 +286,6 @@ func TestOutboxInbox(t *testing.T) { streamCancelFn() case flowCtxCancel: inboxFlowCtxCancelFn() - case readerCtxCancel: - readerCancelFn() case transportBreaks: err := conn.Close() // nolint:grpcconnclose require.NoError(t, err) @@ -314,9 +295,9 @@ func TestOutboxInbox(t *testing.T) { }() inboxMemAcc := testMemMonitor.MakeBoundAccount() - defer inboxMemAcc.Close(readerCtx) - inbox, err := NewInboxWithFlowCtxDone( - colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), + defer inboxMemAcc.Close(inboxFlowCtx) + inbox, err := NewInbox( + colmem.NewAllocator(inboxFlowCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(), ) require.NoError(t, err) @@ -326,11 +307,11 @@ func TestOutboxInbox(t *testing.T) { // Use a deselector op to verify that the Outbox gets rid of the selection // vector. deselectorMemAcc := testMemMonitor.MakeBoundAccount() - defer deselectorMemAcc.Close(readerCtx) + defer deselectorMemAcc.Close(inboxFlowCtx) inputBatches := colexecutils.NewDeselectorOp( - colmem.NewAllocator(readerCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs, + colmem.NewAllocator(inboxFlowCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs, ) - inputBatches.Init(readerCtx) + inputBatches.Init(inboxFlowCtx) outputBatches := colexecop.NewBatchBuffer() var readerErr error for { @@ -338,7 +319,7 @@ func TestOutboxInbox(t *testing.T) { if err := colexecerror.CatchVectorizedRuntimeError(func() { // Note that it is ok that we call Init on every iteration - it // is a noop every time except for the first one. - inbox.Init(readerCtx) + inbox.Init(inboxFlowCtx) outputBatch = inbox.Next() }); err != nil { readerErr = err @@ -382,7 +363,6 @@ func TestOutboxInbox(t *testing.T) { // Verify that the Outbox terminated gracefully (did not cancel the // flow context). require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) // And the Inbox did as well. require.NoError(t, streamHandlerErr) require.NoError(t, readerErr) @@ -444,20 +424,6 @@ func TestOutboxInbox(t *testing.T) { // doesn't take place, so the flow context on the outbox side should // not be canceled. require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) - case readerCtxCancel: - // If the reader context gets canceled while the inbox's host flow - // context doesn't, it is treated as a graceful termination of the - // stream, so we expect no error from the stream handler. - require.Nil(t, streamHandlerErr) - // The Inbox should still propagate this error upwards. - require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) - - // The cancellation should have been communicated to the Outbox, - // resulting in the watchdog goroutine canceling the outbox context - // (but not the flow). 
- require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) case transportBreaks: // If the transport breaks, the scenario is very similar to // streamCtxCancel. gRPC will cancel the stream handler's context. @@ -496,14 +462,11 @@ func TestInboxHostCtxCancellation(t *testing.T) { }() // Simulate the "remote" node with a separate context. - outboxHostCtx, outboxHostCtxCancel := context.WithCancel(context.Background()) - // Derive a separate context for the outbox itself (this is what is done in - // Outbox.Run). - outboxCtx, outboxCtxCancel := context.WithCancel(outboxHostCtx) + outboxFlowCtx, outboxFlowCtxCancel := context.WithCancel(context.Background()) // Initiate the FlowStream RPC from the outbox. client := execinfrapb.NewDistSQLClient(conn) - clientStream, err := client.FlowStream(outboxCtx) + clientStream, err := client.FlowStream(outboxFlowCtx) require.NoError(t, err) // Create and run the outbox. @@ -513,16 +476,16 @@ func TestInboxHostCtxCancellation(t *testing.T) { typs := []*types.T{} outboxInput := colexecutils.NewFixedNumTuplesNoInputOp(testAllocator, 1 /* numTuples */, nil /* opToInitialize */) outboxMemAcc := testMemMonitor.MakeBoundAccount() - defer outboxMemAcc.Close(outboxHostCtx) + defer outboxMemAcc.Close(outboxFlowCtx) outbox, err := NewOutbox( - colmem.NewAllocator(outboxHostCtx, &outboxMemAcc, coldata.StandardColumnFactory), + colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory), colexecargs.OpWithMetaInfo{Root: outboxInput}, typs, nil, /* getStats */ ) require.NoError(t, err) var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(outboxCtx, clientStream, outboxHostCtxCancel, outboxCtxCancel) + outbox.runWithStream(outboxFlowCtx, clientStream, outboxFlowCtxCancel) wg.Done() }() @@ -530,7 +493,7 @@ func TestInboxHostCtxCancellation(t *testing.T) { inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background()) inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(inboxHostCtx) - inbox, err := NewInboxWithFlowCtxDone( + inbox, err := NewInbox( colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxHostCtx.Done(), ) @@ -704,12 +667,12 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) var ( - flowCanceled, outboxCanceled uint32 - wg sync.WaitGroup + flowCanceled uint32 + wg sync.WaitGroup ) wg.Add(1) go func() { @@ -717,7 +680,6 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { ctx, clientStream, func() { atomic.StoreUint32(&flowCanceled, 1) }, - func() { atomic.StoreUint32(&outboxCanceled, 1) }, ) wg.Done() }() @@ -729,10 +691,8 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { wg.Wait() require.NoError(t, <-streamHanderErrCh) - // Require that the outbox did not cancel the flow and did cancel - // the outbox since this is a graceful drain. + // Require that the outbox did not cancel the flow. require.True(t, atomic.LoadUint32(&flowCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCanceled) == 1) // Verify that we received the expected metadata. 
if tc.verifyExpectedMetadata != nil { @@ -787,13 +747,13 @@ func BenchmarkOutboxInbox(b *testing.B) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(b, err) var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */) wg.Done() }() @@ -912,7 +872,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) { typs := []*types.T{types.Int} - inbox, err := NewInbox(testAllocator, typs, streamID) + inbox, err := NewInbox(testAllocator, typs, streamID, ctx.Done()) require.NoError(t, err) ctxExtract := make(chan struct{}) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index ad90a99cbafb..7fb81de05795 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -48,10 +48,8 @@ type flowStreamServer interface { // to the inbox. Next may be called before RunWithStream, it will just block // until the stream is made available or its context is canceled. Note that // ownership of the stream is passed from the RunWithStream goroutine to the -// Next goroutine. In exchange, the RunWithStream goroutine receives the first -// context passed into Next and listens for cancellation. Returning from -// RunWithStream (or more specifically, the RPC handler) will unblock Next by -// closing the stream. +// Next goroutine. Returning from RunWithStream (or more specifically, the RPC +// handler) will unblock Next by closing the stream. type Inbox struct { colexecop.ZeroInputNode colexecop.InitHelper @@ -69,10 +67,9 @@ type Inbox struct { // streamCh is the channel over which the stream is passed from the stream // handler to the reader goroutine. streamCh chan flowStreamServer - // contextCh is the channel over which the reader goroutine passes the - // context to the stream handler so that it can listen for context - // cancellation. - contextCh chan context.Context + // waitForReaderCh is the channel which the reader goroutine closes when it + // arrives. This allows for the handler goroutine to wait for the reader. + waitForReaderCh chan struct{} // timeoutCh is the channel over which an error will be sent if the reader // goroutine should exit while waiting for a stream. @@ -132,7 +129,10 @@ var _ colexecop.Operator = &Inbox{} // NewInbox creates a new Inbox. func NewInbox( - allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, ) (*Inbox, error) { c, err := colserde.NewArrowBatchConverter(typs) if err != nil { @@ -149,8 +149,9 @@ func NewInbox( serializer: s, streamID: streamID, streamCh: make(chan flowStreamServer, 1), - contextCh: make(chan context.Context, 1), + waitForReaderCh: make(chan struct{}), timeoutCh: make(chan error, 1), + flowCtxDone: flowCtxDone, errCh: make(chan error, 1), deserializationStopWatch: timeutil.NewStopWatch(), } @@ -158,22 +159,6 @@ func NewInbox( return i, nil } -// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow -// context is available. 
-func NewInboxWithFlowCtxDone( - allocator *colmem.Allocator, - typs []*types.T, - streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, -) (*Inbox, error) { - i, err := NewInbox(allocator, typs, streamID) - if err != nil { - return nil, err - } - i.flowCtxDone = flowCtxDone - return i, nil -} - // NewInboxWithAdmissionControl creates a new Inbox that does admission // control on responses received from DistSQL. func NewInboxWithAdmissionControl( @@ -184,7 +169,7 @@ func NewInboxWithAdmissionControl( admissionQ *admission.WorkQueue, admissionInfo admission.WorkInfo, ) (*Inbox, error) { - i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone) + i, err := NewInbox(allocator, typs, streamID, flowCtxDone) if err != nil { return nil, err } @@ -205,34 +190,22 @@ func (i *Inbox) close() { } } -// checkFlowCtxCancellation returns an error if the flow context has already -// been canceled. -func (i *Inbox) checkFlowCtxCancellation() error { - select { - case <-i.flowCtxDone: - return cancelchecker.QueryCanceledError - default: - return nil - } -} - // RunWithStream sets the Inbox's stream and waits until either streamCtx is -// canceled, the Inbox's host cancels the flow context, a caller of Next cancels -// the context passed into Init, or any error is encountered on the stream by -// the Next goroutine. +// canceled, the Inbox's host cancels the flow context, or any error is +// encountered on the stream by the Next goroutine (which includes io.EOF that +// indicates a graceful shutdown - CloseSend() called by the outbox). func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error { streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID) log.VEvent(streamCtx, 2, "Inbox handling stream") defer log.VEvent(streamCtx, 2, "Inbox exited stream handler") - // Pass the stream to the reader goroutine (non-blocking) and get the - // context to listen for cancellation. + // Pass the stream to the reader goroutine (non-blocking) and wait for the + // reader to arrive. i.streamCh <- stream - var readerCtx context.Context select { case err := <-i.errCh: // nil will be read from errCh when the channel is closed. return err - case readerCtx = <-i.contextCh: + case <-i.waitForReaderCh: log.VEvent(streamCtx, 2, "Inbox reader arrived") case <-streamCtx.Done(): return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)") @@ -255,12 +228,6 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer // e.g. when the query is canceled, or when another stream encountered // an unrecoverable error forcing it to shutdown the flow. return cancelchecker.QueryCanceledError - case <-readerCtx.Done(): - // readerCtx is canceled, but we don't know whether it was because the - // flow context was canceled or for other reason. In the former case we - // have an ungraceful shutdown whereas in the latter case we have a - // graceful one. - return i.checkFlowCtxCancellation() case <-streamCtx.Done(): // The client canceled the stream. return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)") @@ -293,22 +260,12 @@ func (i *Inbox) Init(ctx context.Context) { case <-i.flowCtxDone: i.errCh <- cancelchecker.QueryCanceledError return cancelchecker.QueryCanceledError - case <-i.Ctx.Done(): - // errToThrow is propagated to the reader of the Inbox. 
- errToThrow := i.Ctx.Err() - if err := i.checkFlowCtxCancellation(); err != nil { - // This is an ungraceful termination because the flow context - // has been canceled. - i.errCh <- err - errToThrow = err - } - return errToThrow } if i.ctxInterceptorFn != nil { i.ctxInterceptorFn(i.Ctx) } - i.contextCh <- i.Ctx + close(i.waitForReaderCh) return nil }(); err != nil { // An error occurred while initializing the Inbox and is likely caused @@ -321,8 +278,8 @@ func (i *Inbox) Init(ctx context.Context) { } // Next returns the next batch. It will block until there is data available. -// The Inbox will exit when either the context passed in Init() is canceled or -// when DrainMeta goroutine tells it to do so. +// The Inbox will exit when either the flow context is canceled or when +// DrainMeta goroutine tells it to do so. func (i *Inbox) Next() coldata.Batch { if i.done { return coldata.ZeroBatch diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 03c0d4cdec65..95c8a238a797 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -61,26 +61,24 @@ func TestInboxCancellation(t *testing.T) { typs := []*types.T{types.Int} t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) { - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) // Cancel the context. cancelFn() - // Init should encounter an error if the context is canceled. + // Init should encounter an error if the flow context is canceled. err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) }) - require.True(t, testutils.IsError(err, "context canceled"), err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) // Now, the remote stream arrives. err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}) - // We expect no error from the stream handler since we canceled it - // ourselves (a graceful termination). - require.Nil(t, err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) }) t.Run("DuringRecv", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) // Setup reader and stream. go func() { @@ -98,12 +96,10 @@ func TestInboxCancellation(t *testing.T) { // Now wait for the Inbox to call Recv on the stream. <-recvCalled - // Cancel the context. + // Cancel the flow context. cancelFn() err = <-streamHandlerErrCh - // Reader context cancellation is a graceful termination, so no error - // should be returned. - require.Nil(t, err) + require.True(t, testutils.IsError(err, "query execution canceled"), err) // The mock RPC layer does not unblock the Recv for us on the server side, // so manually send an io.EOF to the reader goroutine. 
@@ -112,15 +108,14 @@ func TestInboxCancellation(t *testing.T) { t.Run("StreamHandlerWaitingForReader", func(t *testing.T) { rpcLayer := makeMockFlowStreamRPCLayer() - inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0)) - require.NoError(t, err) - ctx, cancelFn := context.WithCancel(context.Background()) + inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0), ctx.Done()) + require.NoError(t, err) cancelFn() // A stream arrives but there is no reader. - err = <-handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.client.csChan) }) - require.True(t, testutils.IsError(err, "while waiting for reader"), err) + err = <-handleStream(context.Background(), inbox, rpcLayer.server, func() { close(rpcLayer.client.csChan) }) + require.True(t, testutils.IsError(err, "query execution canceled"), err) }) } @@ -130,7 +125,7 @@ func TestInboxCancellation(t *testing.T) { func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) { defer leaktest.AfterTest(t)() - inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0), nil /* flowCtxDone */) require.NoError(t, err) rpcLayer := makeMockFlowStreamRPCLayer() @@ -160,7 +155,7 @@ func TestInboxTimeout(t *testing.T) { ctx := context.Background() - inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0)) + inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) var ( @@ -266,7 +261,7 @@ func TestInboxShutdown(t *testing.T) { inboxCtx, inboxCancel := context.WithCancel(context.Background()) inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(inboxCtx) - inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), inboxCtx.Done()) require.NoError(t, err) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) @@ -434,7 +429,7 @@ func TestInboxShutdown(t *testing.T) { for i, errCh := range errChans { for err := <-errCh; err != nil; err = <-errCh { - if !testutils.IsError(err, "context canceled|artificial timeout") { + if !testutils.IsError(err, "context canceled|artificial timeout|query execution canceled") { // Error to keep on draining errors but mark this test as failed. t.Errorf("unexpected error %v from %s goroutine", err, goroutines[goroutineIndices[i]].name) } diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index d593e2e10595..0ea552671aa4 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -128,8 +128,8 @@ func (o *Outbox) close(ctx context.Context) { // coldata.Batches over the stream after sending a header with the provided flow // and stream ID. Note that an extra goroutine is spawned so that Recv may be // called concurrently wrt the Send goroutine to listen for drain signals. -// If an io.EOF is received while sending, the outbox will cancel all components -// from the same tree as the outbox. +// If an io.EOF is received while sending, this indicates the graceful +// completion of the FlowStream RPC, so the outbox simply exits. // If non-io.EOF is received while sending, the outbox will call flowCtxCancel // to shutdown all parts of the flow on this node. 
 // If an error is encountered that cannot be sent over the stream, the error
@@ -140,8 +140,8 @@ func (o *Outbox) close(ctx context.Context) {
 // metadata sources, send the metadata, and then call CloseSend on the
 // stream. The Outbox will wait until its Recv goroutine receives a non-nil
 // error to not leak resources.
-// 2) A cancellation happened. This can come from the provided context or the
-// remote reader. Refer to tests for expected behavior.
+// 2) A cancellation happened. This can come from the provided flow context or
+// the remote reader. Refer to tests for expected behavior.
 // 3) A drain signal was received from the server (consumer). In this case, the
 // Outbox goes through the same steps as 1).
 func (o *Outbox) Run(
@@ -153,14 +153,6 @@ func (o *Outbox) Run(
 	flowCtxCancel context.CancelFunc,
 	connectionTimeout time.Duration,
 ) {
-	// Derive a child context so that we can cancel all components rooted in
-	// this outbox.
-	var outboxCtxCancel context.CancelFunc
-	ctx, outboxCtxCancel = context.WithCancel(ctx)
-	// Calling outboxCtxCancel is not strictly necessary, but we do it just to
-	// be safe.
-	defer outboxCtxCancel()
-
 	ctx, o.span = execinfra.ProcessorSpan(ctx, "outbox")
 	if o.span != nil {
 		defer o.span.Finish()
@@ -213,23 +205,20 @@ func (o *Outbox) Run(
 	}
 
 	log.VEvent(ctx, 2, "Outbox starting normal operation")
-	o.runWithStream(ctx, stream, flowCtxCancel, outboxCtxCancel)
+	o.runWithStream(ctx, stream, flowCtxCancel)
 	log.VEvent(ctx, 2, "Outbox exiting")
 }
 
-// handleStreamErr is a utility method used to handle an error when calling
-// a method on a flowStreamClient. If err is an io.EOF, outboxCtxCancel is
-// called, for all other errors flowCtxCancel is. The given error is logged with
-// the associated opName.
+// handleStreamErr is a utility method used to handle an error when calling a
+// method on a flowStreamClient. If err is an io.EOF, it is a graceful
+// completion of the FlowStream RPC, so nothing is done; for all other errors,
+// flowCtxCancel is called. The given error is logged with the associated
+// opName.
 func handleStreamErr(
-	ctx context.Context,
-	opName redact.SafeString,
-	err error,
-	flowCtxCancel, outboxCtxCancel context.CancelFunc,
+	ctx context.Context, opName redact.SafeString, err error, flowCtxCancel context.CancelFunc,
 ) {
 	if err == io.EOF {
-		log.VEventf(ctx, 2, "Outbox calling outboxCtxCancel after %s EOF", opName)
-		outboxCtxCancel()
+		log.VEventf(ctx, 2, "Outbox ignoring %s EOF", opName)
 	} else {
 		log.VEventf(ctx, 1, "Outbox calling flowCtxCancel after %s connection error: %+v", opName, err)
 		flowCtxCancel()
@@ -261,9 +250,9 @@ func (o *Outbox) moveToDraining(ctx context.Context, reason redact.RedactableStr
 // NOTE: if non-io.EOF error is encountered (indicating ungraceful shutdown
 // of the stream), flowCtxCancel will be called. If an io.EOF is encountered
 // (indicating a graceful shutdown initiated by the remote Inbox),
-// outboxCtxCancel will be called.
+// flowCtxCancel will **not** be called.
 func (o *Outbox) sendBatches(
-	ctx context.Context, stream flowStreamClient, flowCtxCancel, outboxCtxCancel context.CancelFunc,
+	ctx context.Context, stream flowStreamClient, flowCtxCancel context.CancelFunc,
 ) (terminatedGracefully bool, errToSend error) {
 	if o.runnerCtx == nil {
 		// In the non-testing path, runnerCtx has been set in Run() method;
@@ -313,7 +302,7 @@ func (o *Outbox) sendBatches(
 		// soon as the message is written to the control buffer. The message is
 		// marshaled (bytes are copied) before writing.
if err := stream.Send(o.scratch.msg); err != nil { - handleStreamErr(ctx, "Send (batches)", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "Send (batches)", err, flowCtxCancel) return } } @@ -362,16 +351,13 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT // runWithStream should be called after sending the ProducerHeader on the // stream. It implements the behavior described in Run. func (o *Outbox) runWithStream( - ctx context.Context, stream flowStreamClient, flowCtxCancel, outboxCtxCancel context.CancelFunc, + ctx context.Context, stream flowStreamClient, flowCtxCancel context.CancelFunc, ) { - // Cancellation functions might be nil in some tests, but we'll make them - // noops for convenience. + // flowCtxCancel might be nil in some tests, but we'll make it a noop for + // convenience. if flowCtxCancel == nil { flowCtxCancel = func() {} } - if outboxCtxCancel == nil { - outboxCtxCancel = func() {} - } waitCh := make(chan struct{}) go func() { // This goroutine's job is to listen continually on the stream from the @@ -384,14 +370,13 @@ func (o *Outbox) runWithStream( // forever after a connection is closed, since it wouldn't notice a // closed connection until it tried to Send over that connection. // - // Similarly, if an io.EOF error is received, it indicates that the - // server side of FlowStream RPC (the inbox) has exited gracefully, so - // the inbox doesn't need anything else from this outbox, and this - // goroutine will shut down the tree of operators rooted in this outbox. + // If an io.EOF error is received, it indicates that the server side of + // FlowStream RPC (the inbox) has exited gracefully, so nothing needs to + // be done on the outbox side anymore. for { msg, err := stream.Recv() if err != nil { - handleStreamErr(ctx, "watchdog Recv", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "watchdog Recv", err, flowCtxCancel) break } switch { @@ -405,7 +390,7 @@ func (o *Outbox) runWithStream( close(waitCh) }() - terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel) + terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel) if terminatedGracefully || errToSend != nil { var reason redact.RedactableString if errToSend != nil { @@ -415,14 +400,14 @@ func (o *Outbox) runWithStream( } o.moveToDraining(ctx, reason) if err := o.sendMetadata(ctx, stream, errToSend); err != nil { - handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel) } else { // Close the stream. Note that if this block isn't reached, the stream // is unusable. // The receiver goroutine will read from the stream until any error // is returned (most likely an io.EOF). 
if err := stream.CloseSend(); err != nil { - handleStreamErr(ctx, "CloseSend", err, flowCtxCancel, outboxCtxCancel) + handleStreamErr(ctx, "CloseSend", err, flowCtxCancel) } } } diff --git a/pkg/sql/colflow/colrpc/outbox_test.go b/pkg/sql/colflow/colrpc/outbox_test.go index ec9bf627c2b4..186d015ea4ff 100644 --- a/pkg/sql/colflow/colrpc/outbox_test.go +++ b/pkg/sql/colflow/colrpc/outbox_test.go @@ -52,13 +52,13 @@ func TestOutboxCatchesPanics(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) wg.Done() }() inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctx) - inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done()) require.NoError(t, err) streamHandlerErrCh := handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) }) @@ -132,7 +132,7 @@ func TestOutboxDrainsMetadataSources(t *testing.T) { // Close the csChan to unblock the Recv goroutine (we don't need it for this // test). close(rpcLayer.client.csChan) - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) require.True(t, atomic.LoadUint32(sourceDrained) == 1) }) @@ -149,7 +149,7 @@ func TestOutboxDrainsMetadataSources(t *testing.T) { require.NoError(t, err) close(rpcLayer.client.csChan) - outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */, nil /* outboxCtxCancel */) + outbox.runWithStream(ctx, rpcLayer.client, nil /* flowCtxCancel */) require.True(t, atomic.LoadUint32(sourceDrained) == 1) }) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 80153b37f17f..fdb4f4137bf9 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -537,10 +537,6 @@ type vectorizedFlowCreator struct { // numOutboxes counts how many colrpc.Outbox'es have been set up on this // node. It must be accessed atomically. numOutboxes int32 - // numOutboxesExited is an atomic that keeps track of how many outboxes have - // exited. When numOutboxesExited equals numOutboxes, the cancellation - // function for the flow is called on the non-gateway nodes. - numOutboxesExited int32 // numOutboxesDrained is an atomic that keeps track of how many outboxes // have been drained. When numOutboxesDrained equals numOutboxes, flow-level // metadata is added to a flow-level span on the non-gateway nodes. @@ -696,17 +692,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( flowCtxCancel, flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV), ) - // When the last Outbox on this node exits, we want to make sure that - // everything is shutdown; namely, we need to call cancelFn if: - // - it is the last Outbox - // - the node is not the gateway (there is a flow coordinator on the - // gateway that will take care of the cancellation itself) - // - cancelFn is non-nil (it can be nil in tests). - // Calling cancelFn will cancel the context that all infrastructure on this - // node is listening on, so it will shut everything down. 
- if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode && flowCtxCancel != nil { - flowCtxCancel() - } } s.accumulateAsyncComponent(run) return outbox, nil diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index d08308dfef3a..27e15ceba220 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -218,7 +218,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { for i := 0; i < numInboxes; i++ { inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxLocal) - inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID)) + inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), ctxLocal.Done()) require.NoError(t, err) inboxes = append(inboxes, inbox) synchronizerInputs = append( @@ -330,7 +330,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { // Add another "remote" node to the flow. inboxMemAccount := testMemMonitor.MakeBoundAccount() defer inboxMemAccount.Close(ctxAnotherRemote) - inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID)) + inbox, err := colrpc.NewInbox(colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory), typs, execinfrapb.StreamID(streamID), ctxAnotherRemote.Done()) require.NoError(t, err) inboxes = append(inboxes, inbox) outboxMemAccount := testMemMonitor.MakeBoundAccount() @@ -449,9 +449,9 @@ func TestVectorizedFlowShutdown(t *testing.T) { for i := range inboxes { err = <-handleStreamErrCh[i] - // We either should get no error or a context cancellation error. + // We either should get no error or a cancellation error. if err != nil { - require.True(t, testutils.IsError(err, "context canceled"), err) + require.True(t, testutils.IsError(err, "canceled"), err) } } wg.Wait() diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 177a3f20b92b..563581a379f7 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -211,7 +211,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { return colrpc.NewOutbox(allocator, input, typs, nil /* getStats */) }, newInboxFn: func(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) { - inbox, err := colrpc.NewInbox(allocator, typs, streamID) + inbox, err := colrpc.NewInbox(allocator, typs, streamID, nil /* flowCtxDone */) inboxToNumInputTypes[inbox] = typs return inbox, err }, diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index cdf2e3a20dfd..f0e61ab5518b 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -232,6 +232,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error { log.Infof(ctx, "outbox: calling FlowStream") } // The context used here escapes, so it has to be a background context. + // TODO(yuzefovich): the usage of the TODO context here is suspicious. + // Investigate this. m.stream, err = client.FlowStream(context.TODO()) if err != nil { if log.V(1) {