From 2203baea4d66c93a3c5631b97c1b1b4072c8418e Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 15 Dec 2021 12:24:14 -0800 Subject: [PATCH] colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC This commit fixes an oversight in the cancellation protocol of the vectorized inbox/outbox communication. Previously, when the flow context of the inbox host was canceled (indicating that the whole query should be canceled), we would propagate it as a graceful completion of the `FlowStream` RPC, which would result in the outbox canceling only its own subtree on the remote node. However, what we ought to do is to propagate such a cancellation as an ungraceful RPC completion so that the outbox would also cancel the flow context of its own host. In some rare cases the old behavior could result in some flows being stuck forever (until a node is restarted) because they would get blocked on producing the data when their consumer had already exited. The behavior in this fix is what we already have in place for the row-by-row engine (see `processInboundStreamHelper` in `flowinfra/inbound.go`). Release note (bug fix): A bug with the ungraceful shutdown of distributed queries that occurred in some rare cases has been fixed. "Ungraceful" here means that the shutdown is caused by the `statement_timeout` (most likely) or by a node crash. 
--- pkg/sql/colflow/colrpc/colrpc_test.go | 89 +++++++++++-------- pkg/sql/colflow/colrpc/inbox.go | 77 +++++++++++----- pkg/sql/colflow/colrpc/inbox_test.go | 2 +- pkg/sql/colflow/vectorized_flow.go | 29 ++++-- .../colflow/vectorized_flow_shutdown_test.go | 2 +- pkg/sql/colflow/vectorized_flow_test.go | 6 +- 6 files changed, 138 insertions(+), 67 deletions(-) diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 2ef627cec6d8..74e588da9770 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -116,25 +116,10 @@ func makeMockFlowStreamRPCLayer() mockFlowStreamRPCLayer { // will call doneFn if non-nil once the handler returns. func handleStream( ctx context.Context, inbox *Inbox, stream flowStreamServer, doneFn func(), -) chan error { - return handleStreamWithFlowCtxDone(ctx, inbox, stream, nil /* flowCtxDone */, doneFn) -} - -// handleStreamWithFlowCtxDone is the same as handleStream but also takes in -// an optional Done channel for the flow context of the inbox host. -func handleStreamWithFlowCtxDone( - ctx context.Context, - inbox *Inbox, - stream flowStreamServer, - flowCtxDone <-chan struct{}, - doneFn func(), ) chan error { handleStreamErrCh := make(chan error, 1) - if flowCtxDone == nil { - flowCtxDone = make(<-chan struct{}) - } go func() { - handleStreamErrCh <- inbox.RunWithStream(ctx, stream, flowCtxDone) + handleStreamErrCh <- inbox.RunWithStream(ctx, stream) if doneFn != nil { doneFn() } @@ -163,9 +148,12 @@ func TestOutboxInbox(t *testing.T) { // streamCtxCancel models a scenario in which the Outbox host cancels // the flow. streamCtxCancel - // readerCtxCancel models a scenario in which the Inbox host cancels the - // flow. This is considered a graceful termination, and the flow context - // isn't canceled. + // flowCtxCancel models a scenario in which the flow context of the + // Inbox host is canceled which is an ungraceful shutdown. 
+ flowCtxCancel + // readerCtxCancel models a scenario in which the consumer of the Inbox + // cancels the context while the host's flow context is not canceled. + // This is considered a graceful termination. readerCtxCancel // transportBreaks models a scenario in which the transport breaks. transportBreaks @@ -175,16 +163,19 @@ func TestOutboxInbox(t *testing.T) { cancellationScenarioName string ) switch randVal := rng.Float64(); { - case randVal <= 0.25: + case randVal <= 0.2: cancellationScenario = noCancel cancellationScenarioName = "noCancel" - case randVal <= 0.50: + case randVal <= 0.4: cancellationScenario = streamCtxCancel cancellationScenarioName = "streamCtxCancel" - case randVal <= 0.75: + case randVal <= 0.6: + cancellationScenario = flowCtxCancel + cancellationScenarioName = "flowCtxCancel" + case randVal <= 0.8: cancellationScenario = readerCtxCancel cancellationScenarioName = "readerCtxCancel" - case randVal <= 1: + default: cancellationScenario = transportBreaks cancellationScenarioName = "transportBreaks" } @@ -213,10 +204,10 @@ func TestOutboxInbox(t *testing.T) { inputBuffer = colexecop.NewBatchBuffer() // Generate some random behavior before passing the random number // generator to be used in the Outbox goroutine (to avoid races). - // These sleep variables enable a sleep for up to half a millisecond - // with a .25 probability before cancellation. - sleepBeforeCancellation = rng.Float64() <= 0.25 - sleepTime = time.Microsecond * time.Duration(rng.Intn(500)) + // These sleep variables enable a sleep for up to five milliseconds + // with a .5 probability before cancellation. + sleepBeforeCancellation = rng.Float64() <= 0.5 + sleepTime = time.Microsecond * time.Duration(rng.Intn(5000)) // stopwatch is used to measure how long it takes for the outbox to // exit once the transport broke. 
stopwatch = timeutil.NewStopWatch() @@ -299,7 +290,8 @@ func TestOutboxInbox(t *testing.T) { wg.Done() }() - readerCtx, readerCancelFn := context.WithCancel(context.Background()) + inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background()) + readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx) wg.Add(1) go func() { if sleepBeforeCancellation { @@ -309,6 +301,8 @@ func TestOutboxInbox(t *testing.T) { case noCancel: case streamCtxCancel: streamCancelFn() + case flowCtxCancel: + inboxFlowCtxCancelFn() case readerCtxCancel: readerCancelFn() case transportBreaks: @@ -321,7 +315,10 @@ func TestOutboxInbox(t *testing.T) { inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(readerCtx) - inbox, err := NewInbox(colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInboxWithFlowCtxDone( + colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), + typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(), + ) require.NoError(t, err) streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) @@ -420,10 +417,28 @@ func TestOutboxInbox(t *testing.T) { // which prompts the watchdog goroutine of the outbox to cancel the // flow. require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1) + case flowCtxCancel: + // If the flow context of the Inbox host gets canceled, it is + // treated as an ungraceful termination of the stream, so we expect + // an error from the stream handler. + require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError)) + // The Inbox propagates this cancellation on its host. 
+ require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) + + // In the production setup, the watchdog goroutine of the outbox + // would receive non-io.EOF error indicating an ungraceful + // completion of the FlowStream RPC call which would prompt the + // outbox to cancel the whole flow. + // + // However, because we're using a mock server, the propagation + // doesn't take place, so the flow context on the outbox side should + // not be canceled. + require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0) + require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1) case readerCtxCancel: - // If the reader context gets canceled, it is treated as a graceful - // termination of the stream, so we expect no error from the stream - // handler. + // If the reader context gets canceled while the inbox's host flow + // context doesn't, it is treated as a graceful termination of the + // stream, so we expect no error from the stream handler. require.Nil(t, streamHandlerErr) // The Inbox should still propagate this error upwards. require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr) @@ -505,17 +520,17 @@ func TestInboxHostCtxCancellation(t *testing.T) { inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background()) inboxMemAcc := testMemMonitor.MakeBoundAccount() defer inboxMemAcc.Close(inboxHostCtx) - inbox, err := NewInbox(colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0)) + inbox, err := NewInboxWithFlowCtxDone( + colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), + typs, execinfrapb.StreamID(0), inboxHostCtx.Done(), + ) require.NoError(t, err) // Spawn up the stream handler (a separate goroutine) for the server side // of the FlowStream RPC. 
serverStreamNotification := <-mockServer.InboundStreams serverStream := serverStreamNotification.Stream - streamHandlerErrCh := handleStreamWithFlowCtxDone( - serverStream.Context(), inbox, serverStream, - inboxHostCtx.Done(), func() { close(serverStreamNotification.Donec) }, - ) + streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) // Here is the meat of the test - the inbox is never initialized, and, // instead, the inbox host's flow context is canceled after some delay. diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 6962014a357b..f12406809952 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -78,6 +78,9 @@ type Inbox struct { // goroutine should exit while waiting for a stream. timeoutCh chan error + // flowCtxDone is the Done() channel of the flow context of the Inbox host. + flowCtxDone <-chan struct{} + // errCh is the channel that RunWithStream will block on, waiting until the // Inbox does not need a stream any more. An error will only be sent on this // channel in the event of a cancellation or a non-io.EOF error originating @@ -155,16 +158,33 @@ func NewInbox( return i, nil } +// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow +// context is available. +func NewInboxWithFlowCtxDone( + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, +) (*Inbox, error) { + i, err := NewInbox(allocator, typs, streamID) + if err != nil { + return nil, err + } + i.flowCtxDone = flowCtxDone + return i, nil +} + // NewInboxWithAdmissionControl creates a new Inbox that does admission // control on responses received from DistSQL. 
func NewInboxWithAdmissionControl( allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, admissionQ *admission.WorkQueue, admissionInfo admission.WorkInfo, ) (*Inbox, error) { - i, err := NewInbox(allocator, typs, streamID) + i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone) if err != nil { return nil, err } @@ -185,15 +205,22 @@ func (i *Inbox) close() { } } +// checkFlowCtxCancellation returns an error if the flow context has already +// been canceled. +func (i *Inbox) checkFlowCtxCancellation() error { + select { + case <-i.flowCtxDone: + return cancelchecker.QueryCanceledError + default: + return nil + } +} + // RunWithStream sets the Inbox's stream and waits until either streamCtx is -// canceled, a caller of Next cancels the context passed into Init, or any error -// is encountered on the stream by the Next goroutine. -// -// flowCtxDone is listened on only during the setup of the handler, before the -// readerCtx is received. This is needed in case Inbox.Init is never called. -func (i *Inbox) RunWithStream( - streamCtx context.Context, stream flowStreamServer, flowCtxDone <-chan struct{}, -) error { +// canceled, the Inbox's host cancels the flow context, a caller of Next cancels +// the context passed into Init, or any error is encountered on the stream by +// the Next goroutine. +func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error { streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID) log.VEvent(streamCtx, 2, "Inbox handling stream") defer log.VEvent(streamCtx, 2, "Inbox exited stream handler") @@ -209,7 +236,7 @@ func (i *Inbox) RunWithStream( log.VEvent(streamCtx, 2, "Inbox reader arrived") case <-streamCtx.Done(): return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)") - case <-flowCtxDone: + case <-i.flowCtxDone: // The flow context of the inbox host has been canceled. 
This can occur // e.g. when the query is canceled, or when another stream encountered // an unrecoverable error forcing it to shutdown the flow. @@ -219,18 +246,21 @@ func (i *Inbox) RunWithStream( // Now wait for one of the events described in the method comment. If a // cancellation is encountered, nothing special must be done to cancel the // reader goroutine as returning from the handler will close the stream. - // - // Note that we don't listen for cancellation on flowCtxDone because - // readerCtx must be the child of the flow context. select { case err := <-i.errCh: // nil will be read from errCh when the channel is closed. return err + case <-i.flowCtxDone: + // The flow context of the inbox host has been canceled. This can occur + // e.g. when the query is canceled, or when another stream encountered + // an unrecoverable error forcing it to shut down the flow. + return cancelchecker.QueryCanceledError case <-readerCtx.Done(): - // The reader canceled the stream meaning that it no longer needs any - // more data from the outbox. This is a graceful termination, so we - // return nil. - return nil + // readerCtx is canceled, but we don't know whether it was because the + // flow context was canceled or for another reason. In the former case we + // have an ungraceful shutdown whereas in the latter case we have a + // graceful one. + return i.checkFlowCtxCancellation() case <-streamCtx.Done(): // The client canceled the stream. return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)") @@ -260,12 +290,15 @@ func (i *Inbox) Init(ctx context.Context) { case err := <-i.timeoutCh: i.errCh <- errors.Wrap(err, "remote stream arrived too late") return err + case <-i.flowCtxDone: + i.errCh <- cancelchecker.QueryCanceledError + return cancelchecker.QueryCanceledError case <-i.Ctx.Done(): - // Our reader canceled the context meaning that it no longer needs - // any more data from the outbox. 
This is a graceful termination, so - // we don't send any error on errCh and only return an error. This - // will close the inbox (making the stream handler exit gracefully) - // and will stop the current goroutine from proceeding further. + if err := i.checkFlowCtxCancellation(); err != nil { + // This is an ungraceful termination because the flow context + // has been canceled. + i.errCh <- err + } return i.Ctx.Err() } diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index c271e8840a37..03c0d4cdec65 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -70,7 +70,7 @@ func TestInboxCancellation(t *testing.T) { err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) }) require.True(t, testutils.IsError(err, "context canceled"), err) // Now, the remote stream arrives. - err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}, make(<-chan struct{})) + err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}) // We expect no error from the stream handler since we canceled it // ourselves (a graceful termination). require.Nil(t, err) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index fb8da7f20d16..80153b37f17f 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -453,6 +453,8 @@ type flowCreatorHelper interface { // addFlowCoordinator adds the FlowCoordinator to the flow. This is only // done on the gateway node. addFlowCoordinator(coordinator *FlowCoordinator) + // getFlowCtxDone returns the done channel of the context of this flow. + getFlowCtxDone() <-chan struct{} // getCancelFlowFn returns a flow cancellation function. 
getCancelFlowFn() context.CancelFunc } @@ -471,8 +473,13 @@ type remoteComponentCreator interface { typs []*types.T, getStats func() []*execinfrapb.ComponentStats, ) (*colrpc.Outbox, error) - newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, - admissionOpts admissionOptions) (*colrpc.Inbox, error) + newInbox( + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, + admissionOpts admissionOptions, + ) (*colrpc.Inbox, error) } type vectorizedRemoteComponentCreator struct{} @@ -490,10 +497,13 @@ func (vectorizedRemoteComponentCreator) newInbox( allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + flowCtxDone <-chan struct{}, admissionOpts admissionOptions, ) (*colrpc.Inbox, error) { return colrpc.NewInboxWithAdmissionControl( - allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo) + allocator, typs, streamID, flowCtxDone, + admissionOpts.admissionQ, admissionOpts.admissionInfo, + ) } // vectorizedFlowCreator performs all the setup of vectorized flows. Depending @@ -845,6 +855,7 @@ func (s *vectorizedFlowCreator) setupInput( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID, + s.flowCreatorHelper.getFlowCtxDone(), admissionOptions{ admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ, admissionInfo: s.admissionInfo, @@ -1212,9 +1223,9 @@ func (s vectorizedInboundStreamHandler) Run( ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, _ *execinfrapb.ProducerMessage, - f *flowinfra.FlowBase, + _ *flowinfra.FlowBase, ) error { - return s.RunWithStream(ctx, stream, f.GetCtxDone()) + return s.RunWithStream(ctx, stream) } // Timeout is part of the flowinfra.InboundStreamHandler interface. 
@@ -1278,6 +1289,10 @@ func (r *vectorizedFlowCreatorHelper) addFlowCoordinator(f *FlowCoordinator) { r.f.SetProcessors(r.processors) } +func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { + return r.f.GetCtxDone() +} + func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return r.f.GetCancelFlowFn() } @@ -1333,6 +1348,10 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {} func (r *noopFlowCreatorHelper) addFlowCoordinator(coordinator *FlowCoordinator) {} +func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} { + return nil +} + func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return nil } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 7c1714494425..d08308dfef3a 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -289,7 +289,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { doneFn := func() { close(serverStreamNotification.Donec) } wg.Add(1) go func(id int, stream execinfrapb.DistSQL_FlowStreamServer, doneFn func()) { - handleStreamErrCh[id] <- inbox.RunWithStream(stream.Context(), stream, make(<-chan struct{})) + handleStreamErrCh[id] <- inbox.RunWithStream(stream.Context(), stream) doneFn() wg.Done() }(id, serverStream, doneFn) diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 2bb6ca1a785c..177a3f20b92b 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -52,7 +52,11 @@ func (c callbackRemoteComponentCreator) newOutbox( } func (c callbackRemoteComponentCreator) newInbox( - allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ admissionOptions, + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + _ <-chan struct{}, + _ admissionOptions, ) (*colrpc.Inbox, error) { return 
c.newInboxFn(allocator, typs, streamID) }