From 48dd74ca479e0cd154277c79d1dfb787479d8ea1 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 17 Dec 2021 09:48:45 -0800 Subject: [PATCH] Revert "colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC" This reverts commit 2203baea4d66c93a3c5631b97c1b1b4072c8418e. --- pkg/sql/colflow/colrpc/colrpc_test.go | 89 ++++++++----------- pkg/sql/colflow/colrpc/inbox.go | 77 +++++----------- pkg/sql/colflow/colrpc/inbox_test.go | 2 +- pkg/sql/colflow/vectorized_flow.go | 29 ++---- .../colflow/vectorized_flow_shutdown_test.go | 2 +- pkg/sql/colflow/vectorized_flow_test.go | 6 +- 6 files changed, 67 insertions(+), 138 deletions(-) diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 74e588da9770..2ef627cec6d8 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -116,10 +116,25 @@ func makeMockFlowStreamRPCLayer() mockFlowStreamRPCLayer { // will call doneFn if non-nil once the handler returns. func handleStream( ctx context.Context, inbox *Inbox, stream flowStreamServer, doneFn func(), +) chan error { + return handleStreamWithFlowCtxDone(ctx, inbox, stream, nil /* flowCtxDone */, doneFn) +} + +// handleStreamWithFlowCtxDone is the same as handleStream but also takes in +// an optional Done channel for the flow context of the inbox host. +func handleStreamWithFlowCtxDone( + ctx context.Context, + inbox *Inbox, + stream flowStreamServer, + flowCtxDone <-chan struct{}, + doneFn func(), ) chan error { handleStreamErrCh := make(chan error, 1) + if flowCtxDone == nil { + flowCtxDone = make(<-chan struct{}) + } go func() { - handleStreamErrCh <- inbox.RunWithStream(ctx, stream) + handleStreamErrCh <- inbox.RunWithStream(ctx, stream, flowCtxDone) if doneFn != nil { doneFn() } @@ -148,12 +163,9 @@ func TestOutboxInbox(t *testing.T) { // streamCtxCancel models a scenario in which the Outbox host cancels // the flow. streamCtxCancel - // flowCtxCancel models a scenario in which the flow context of the - // Inbox host is canceled which is an ungraceful shutdown. - flowCtxCancel - // readerCtxCancel models a scenario in which the consumer of the Inbox - // cancels the context while the host's flow context is not canceled. - // This is considered a graceful termination. + // readerCtxCancel models a scenario in which the Inbox host cancels the + // flow. This is considered a graceful termination, and the flow context + // isn't canceled. readerCtxCancel // transportBreaks models a scenario in which the transport breaks. transportBreaks @@ -163,19 +175,16 @@ func TestOutboxInbox(t *testing.T) { cancellationScenarioName string ) switch randVal := rng.Float64(); { - case randVal <= 0.2: + case randVal <= 0.25: cancellationScenario = noCancel cancellationScenarioName = "noCancel" - case randVal <= 0.4: + case randVal <= 0.50: cancellationScenario = streamCtxCancel cancellationScenarioName = "streamCtxCancel" - case randVal <= 0.6: - cancellationScenario = flowCtxCancel - cancellationScenarioName = "flowCtxCancel" - case randVal <= 0.8: + case randVal <= 0.75: cancellationScenario = readerCtxCancel cancellationScenarioName = "readerCtxCancel" - default: + case randVal <= 1: cancellationScenario = transportBreaks cancellationScenarioName = "transportBreaks" } @@ -204,10 +213,10 @@ func TestOutboxInbox(t *testing.T) { inputBuffer = colexecop.NewBatchBuffer() // Generate some random behavior before passing the random number // generator to be used in the Outbox goroutine (to avoid races). - // These sleep variables enable a sleep for up to five milliseconds - // with a .5 probability before cancellation. - sleepBeforeCancellation = rng.Float64() <= 0.5 - sleepTime = time.Microsecond * time.Duration(rng.Intn(5000)) + // These sleep variables enable a sleep for up to half a millisecond + // with a .25 probability before cancellation. + sleepBeforeCancellation = rng.Float64() <= 0.25 + sleepTime = time.Microsecond * time.Duration(rng.Intn(500)) // stopwatch is used to measure how long it takes for the outbox to // exit once the transport broke. stopwatch = timeutil.NewStopWatch() @@ -290,8 +299,7 @@ func TestOutboxInbox(t *testing.T) { wg.Done() }() - inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background()) - readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx) + readerCtx, readerCancelFn := context.WithCancel(context.Background()) wg.Add(1) go func() { if sleepBeforeCancellation { @@ -301,8 +309,6 @@ func TestOutboxInbox(t *testing.T) { case noCancel: case streamCtxCancel: streamCancelFn() - case flowCtxCancel: - inboxFlowCtxCancelFn() case readerCtxCancel: readerCancelFn() case transportBreaks: @@ -315,10 +321,7 @@ func TestOutboxInbox(t *testing.T) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(readerCtx) - inbox, err := NewInboxWithFlowCtxDone( - colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), - typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(), - ) + inbox, err := NewInbox(colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(t, err) streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) @@ -417,28 +420,10 @@ func TestOutboxInbox(t *testing.T) { // which prompts the watchdog goroutine of the outbox to cancel the // flow. require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1) - case flowCtxCancel: - // If the flow context of the Inbox host gets canceled, it is - // treated as an ungraceful termination of the stream, so we expect - // an error from the stream handler. - require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError)) - // The Inbox propagates this cancellation on its host. - require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) - - // In the production setup, the watchdog goroutine of the outbox - // would receive non-io.EOF error indicating an ungraceful - // completion of the FlowStream RPC call which would prompt the - // outbox to cancel the whole flow. - // - // However, because we're using a mock server, the propagation - // doesn't take place, so the flow context on the outbox side should - // not be canceled. - require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) - require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) case readerCtxCancel: - // If the reader context gets canceled while the inbox's host flow - // context doesn't, it is treated as a graceful termination of the - // stream, so we expect no error from the stream handler. + // If the reader context gets canceled, it is treated as a graceful + // termination of the stream, so we expect no error from the stream + // handler. require.Nil(t, streamHandlerErr) // The Inbox should still propagate this error upwards. require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) @@ -520,17 +505,17 @@ func TestInboxHostCtxCancellation(t *testing.T) { inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background()) inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(inboxHostCtx) - inbox, err := NewInboxWithFlowCtxDone( - colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), - typs, execinfrapb.StreamID(0), inboxHostCtx.Done(), - ) + inbox, err := NewInbox(colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) require.NoError(t, err) // Spawn up the stream handler (a separate goroutine) for the server side // of the FlowStream RPC. serverStreamNotification := <-mockServer.InboundStreams serverStream := serverStreamNotification.Stream - streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) + streamHandlerErrCh := handleStreamWithFlowCtxDone( + serverStream.Context(), inbox, serverStream, + inboxHostCtx.Done(), func() { close(serverStreamNotification.Donec) }, + ) // Here is the meat of the test - the inbox is never initialized, and, // instead, the inbox host's flow context is canceled after some delay. diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index f12406809952..6962014a357b 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -78,9 +78,6 @@ type Inbox struct { // goroutine should exit while waiting for a stream. timeoutCh chan error - // flowCtxDone is the Done() channel of the flow context of the Inbox host. - flowCtxDone <-chan struct{} - // errCh is the channel that RunWithStream will block on, waiting until the // Inbox does not need a stream any more. An error will only be sent on this // channel in the event of a cancellation or a non-io.EOF error originating @@ -158,33 +155,16 @@ func NewInbox( return i, nil } -// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow -// context is available. -func NewInboxWithFlowCtxDone( - allocator *colmem.Allocator, - typs []*types.T, - streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, -) (*Inbox, error) { - i, err := NewInbox(allocator, typs, streamID) - if err != nil { - return nil, err - } - i.flowCtxDone = flowCtxDone - return i, nil -} - // NewInboxWithAdmissionControl creates a new Inbox that does admission // control on responses received from DistSQL. func NewInboxWithAdmissionControl( allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, admissionQ *admission.WorkQueue, admissionInfo admission.WorkInfo, ) (*Inbox, error) { - i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone) + i, err := NewInbox(allocator, typs, streamID) if err != nil { return nil, err } @@ -205,22 +185,15 @@ func (i *Inbox) close() { } } -// checkFlowCtxCancellation returns an error if the flow context has already -// been canceled. -func (i *Inbox) checkFlowCtxCancellation() error { - select { - case <-i.flowCtxDone: - return cancelchecker.QueryCanceledError - default: - return nil - } -} - // RunWithStream sets the Inbox's stream and waits until either streamCtx is -// canceled, the Inbox's host cancels the flow context, a caller of Next cancels -// the context passed into Init, or any error is encountered on the stream by -// the Next goroutine. -func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error { +// canceled, a caller of Next cancels the context passed into Init, or any error +// is encountered on the stream by the Next goroutine. +// +// flowCtxDone is listened on only during the setup of the handler, before the +// readerCtx is received. This is needed in case Inbox.Init is never called. +func (i *Inbox) RunWithStream( + streamCtx context.Context, stream flowStreamServer, flowCtxDone <-chan struct{}, +) error { streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID) log.VEvent(streamCtx, 2, "Inbox handling stream") defer log.VEvent(streamCtx, 2, "Inbox exited stream handler") @@ -236,7 +209,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer log.VEvent(streamCtx, 2, "Inbox reader arrived") case <-streamCtx.Done(): return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)") - case <-i.flowCtxDone: + case <-flowCtxDone: // The flow context of the inbox host has been canceled. This can occur // e.g. when the query is canceled, or when another stream encountered // an unrecoverable error forcing it to shutdown the flow. @@ -246,21 +219,18 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer // Now wait for one of the events described in the method comment. If a // cancellation is encountered, nothing special must be done to cancel the // reader goroutine as returning from the handler will close the stream. + // + // Note that we don't listen for cancellation on flowCtxDone because + // readerCtx must be the child of the flow context. select { case err := <-i.errCh: // nil will be read from errCh when the channel is closed. return err - case <-i.flowCtxDone: - // The flow context of the inbox host has been canceled. This can occur - // e.g. when the query is canceled, or when another stream encountered - // an unrecoverable error forcing it to shutdown the flow. - return cancelchecker.QueryCanceledError case <-readerCtx.Done(): - // readerCtx is canceled, but we don't know whether it was because the - // flow context was canceled or for other reason. In the former case we - // have an ungraceful shutdown whereas in the latter case we have a - // graceful one. - return i.checkFlowCtxCancellation() + // The reader canceled the stream meaning that it no longer needs any + // more data from the outbox. This is a graceful termination, so we + // return nil. + return nil case <-streamCtx.Done(): // The client canceled the stream. return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)") @@ -290,15 +260,12 @@ func (i *Inbox) Init(ctx context.Context) { case err := <-i.timeoutCh: i.errCh <- errors.Wrap(err, "remote stream arrived too late") return err - case <-i.flowCtxDone: - i.errCh <- cancelchecker.QueryCanceledError - return cancelchecker.QueryCanceledError case <-i.Ctx.Done(): - if err := i.checkFlowCtxCancellation(); err != nil { - // This is an ungraceful termination because the flow context - // has been canceled. - i.errCh <- err - } + // Our reader canceled the context meaning that it no longer needs + // any more data from the outbox. This is a graceful termination, so + // we don't send any error on errCh and only return an error. This + // will close the inbox (making the stream handler exit gracefully) + // and will stop the current goroutine from proceeding further. return i.Ctx.Err() } diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 03c0d4cdec65..c271e8840a37 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -70,7 +70,7 @@ func TestInboxCancellation(t *testing.T) { err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) }) require.True(t, testutils.IsError(err, "context canceled"), err) // Now, the remote stream arrives. - err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}) + err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}, make(<-chan struct{})) // We expect no error from the stream handler since we canceled it // ourselves (a graceful termination). require.Nil(t, err) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 80153b37f17f..fb8da7f20d16 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -453,8 +453,6 @@ type flowCreatorHelper interface { // addFlowCoordinator adds the FlowCoordinator to the flow. This is only // done on the gateway node. addFlowCoordinator(coordinator *FlowCoordinator) - // getCtxDone returns done channel of the context of this flow. - getFlowCtxDone() <-chan struct{} // getCancelFlowFn returns a flow cancellation function. getCancelFlowFn() context.CancelFunc } @@ -473,13 +471,8 @@ type remoteComponentCreator interface { typs []*types.T, getStats func() []*execinfrapb.ComponentStats, ) (*colrpc.Outbox, error) - newInbox( - allocator *colmem.Allocator, - typs []*types.T, - streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, - admissionOpts admissionOptions, - ) (*colrpc.Inbox, error) + newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + admissionOpts admissionOptions) (*colrpc.Inbox, error) } type vectorizedRemoteComponentCreator struct{} @@ -497,13 +490,10 @@ func (vectorizedRemoteComponentCreator) newInbox( allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, - flowCtxDone <-chan struct{}, admissionOpts admissionOptions, ) (*colrpc.Inbox, error) { return colrpc.NewInboxWithAdmissionControl( - allocator, typs, streamID, flowCtxDone, - admissionOpts.admissionQ, admissionOpts.admissionInfo, - ) + allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo) } // vectorizedFlowCreator performs all the setup of vectorized flows. Depending @@ -855,7 +845,6 @@ func (s *vectorizedFlowCreator) setupInput( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID, - s.flowCreatorHelper.getFlowCtxDone(), admissionOptions{ admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ, admissionInfo: s.admissionInfo, @@ -1223,9 +1212,9 @@ func (s vectorizedInboundStreamHandler) Run( ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, _ *execinfrapb.ProducerMessage, - _ *flowinfra.FlowBase, + f *flowinfra.FlowBase, ) error { - return s.RunWithStream(ctx, stream) + return s.RunWithStream(ctx, stream, f.GetCtxDone()) } // Timeout is part of the flowinfra.InboundStreamHandler interface. @@ -1289,10 +1278,6 @@ func (r *vectorizedFlowCreatorHelper) addFlowCoordinator(f *FlowCoordinator) { r.f.SetProcessors(r.processors) } -func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { - return r.f.GetCtxDone() -} - func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return r.f.GetCancelFlowFn() } @@ -1348,10 +1333,6 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {} func (r *noopFlowCreatorHelper) addFlowCoordinator(coordinator *FlowCoordinator) {} -func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { - return nil -} - func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return nil } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index d08308dfef3a..7c1714494425 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -289,7 +289,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { doneFn := func() { close(serverStreamNotification.Donec) } wg.Add(1) go func(id int, stream execinfrapb.DistSQL_FlowStreamServer, doneFn func()) { - handleStreamErrCh[id] <- inbox.RunWithStream(stream.Context(), stream) + handleStreamErrCh[id] <- inbox.RunWithStream(stream.Context(), stream, make(<-chan struct{})) doneFn() wg.Done() }(id, serverStream, doneFn) diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 177a3f20b92b..2bb6ca1a785c 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -52,11 +52,7 @@ func (c callbackRemoteComponentCreator) newOutbox( } func (c callbackRemoteComponentCreator) newInbox( - allocator *colmem.Allocator, - typs []*types.T, - streamID execinfrapb.StreamID, - _ <-chan struct{}, - _ admissionOptions, + allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ admissionOptions, ) (*colrpc.Inbox, error) { return c.newInboxFn(allocator, typs, streamID) }