From 38616797515ae7d9821964fb6c307a9d44a4c4e5 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 15 Dec 2021 12:24:14 -0800
Subject: [PATCH 1/3] colrpc: propagate the flow cancellation as ungraceful
 for FlowStream RPC

This commit fixes an oversight in the cancellation protocol of the
vectorized inbox/outbox communication. Previously, when the flow
context of the inbox host was canceled (indicating that the whole query
should be canceled), we would propagate it as a graceful completion of
the `FlowStream` RPC, which would result in the outbox canceling only
its own subtree on the remote node. However, what we ought to do is to
propagate such cancellation as an ungraceful RPC completion so that the
outbox would also cancel the flow context of its own host. In some rare
cases the old behavior could result in some flows being stuck forever
(until a node is restarted) because they would get blocked on producing
the data when their consumer had already exited.

The behavior in this fix is what we already have in place for the
row-by-row engine (see `processInboundStreamHelper` in
`flowinfra/inbound.go`).

Release note (bug fix): A bug in the ungraceful shutdown of distributed
queries that in some rare cases could leave flows stuck forever has
been fixed. "Ungraceful" here means a shutdown caused by a
`statement_timeout` (most likely) or by a node crash.
---
 pkg/sql/colflow/colrpc/BUILD.bazel           |  1 +
 pkg/sql/colflow/colrpc/colrpc_test.go        | 71 +++++++++++++----
 pkg/sql/colflow/colrpc/inbox.go              | 75 ++++++++++++-------
 pkg/sql/colflow/colrpc/inbox_test.go         | 12 +--
 pkg/sql/colflow/colrpc/outbox_test.go        |  2 +-
 pkg/sql/colflow/vectorized_flow.go           | 27 ++++++-
 .../colflow/vectorized_flow_shutdown_test.go |  2 -
 pkg/sql/colflow/vectorized_flow_test.go      |  4 +-
 8 files changed, 134 insertions(+), 60 deletions(-)

diff --git a/pkg/sql/colflow/colrpc/BUILD.bazel b/pkg/sql/colflow/colrpc/BUILD.bazel
index 80d08ac5a502..31a4722c6f66 100644
--- a/pkg/sql/colflow/colrpc/BUILD.bazel
+++ b/pkg/sql/colflow/colrpc/BUILD.bazel
@@ -57,6 +57,7 @@ go_test(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/types",
         "//pkg/testutils",
+        "//pkg/util/cancelchecker",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go
index 62810a878482..61b7bb6e89e8 100644
--- a/pkg/sql/colflow/colrpc/colrpc_test.go
+++ b/pkg/sql/colflow/colrpc/colrpc_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -146,9 +147,12 @@ func TestOutboxInbox(t *testing.T) {
 		// streamCtxCancel models a scenario in which the Outbox host cancels
 		// the flow.
 		streamCtxCancel
-		// readerCtxCancel models a scenario in which the Inbox host cancels the
-		// flow. This is considered a graceful termination, and the flow context
-		// isn't canceled.
+		// flowCtxCancel models a scenario in which the flow context of the
+		// Inbox host is canceled, which is an ungraceful shutdown.
+		flowCtxCancel
+		// readerCtxCancel models a scenario in which the consumer of the Inbox
+		// cancels the context while the host's flow context is not canceled.
+		// This is considered a graceful termination.
 		readerCtxCancel
 		// transportBreaks models a scenario in which the transport breaks.
 		transportBreaks
@@ -158,16 +162,19 @@ func TestOutboxInbox(t *testing.T) {
 			cancellationScenarioName string
 		)
 		switch randVal := rng.Float64(); {
-		case randVal <= 0.25:
+		case randVal <= 0.2:
 			cancellationScenario = noCancel
 			cancellationScenarioName = "noCancel"
-		case randVal <= 0.50:
+		case randVal <= 0.4:
 			cancellationScenario = streamCtxCancel
 			cancellationScenarioName = "streamCtxCancel"
-		case randVal <= 0.75:
+		case randVal <= 0.6:
+			cancellationScenario = flowCtxCancel
+			cancellationScenarioName = "flowCtxCancel"
+		case randVal <= 0.8:
 			cancellationScenario = readerCtxCancel
 			cancellationScenarioName = "readerCtxCancel"
-		case randVal <= 1:
+		default:
 			cancellationScenario = transportBreaks
 			cancellationScenarioName = "transportBreaks"
 		}
@@ -196,10 +203,10 @@ func TestOutboxInbox(t *testing.T) {
 			inputBuffer = colexecop.NewBatchBuffer()
 			// Generate some random behavior before passing the random number
 			// generator to be used in the Outbox goroutine (to avoid races).
-			// These sleep variables enable a sleep for up to half a millisecond
-			// with a .25 probability before cancellation.
-			sleepBeforeCancellation = rng.Float64() <= 0.25
-			sleepTime = time.Microsecond * time.Duration(rng.Intn(500))
+			// These sleep variables enable a sleep for up to five milliseconds
+			// with a .5 probability before cancellation.
+			sleepBeforeCancellation = rng.Float64() <= 0.5
+			sleepTime = time.Microsecond * time.Duration(rng.Intn(5000))
 			// stopwatch is used to measure how long it takes for the outbox to
 			// exit once the transport broke.
 			stopwatch = timeutil.NewStopWatch()
@@ -250,9 +257,14 @@ func TestOutboxInbox(t *testing.T) {
 			)
 			require.NoError(t, err)

+			inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
+			readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
 			inboxMemAcc := testMemMonitor.MakeBoundAccount()
-			defer inboxMemAcc.Close(ctx)
-			inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+			defer inboxMemAcc.Close(readerCtx)
+			inbox, err := NewInboxWithFlowCtxDone(
+				colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
+				typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
+			)
 			require.NoError(t, err)

 			streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
@@ -283,7 +295,6 @@ func TestOutboxInbox(t *testing.T) {
 				wg.Done()
 			}()

-			readerCtx, readerCancelFn := context.WithCancel(ctx)
 			wg.Add(1)
 			go func() {
 				if sleepBeforeCancellation {
@@ -293,6 +304,8 @@ func TestOutboxInbox(t *testing.T) {
 				case noCancel:
 				case streamCtxCancel:
 					streamCancelFn()
+				case flowCtxCancel:
+					inboxFlowCtxCancelFn()
 				case readerCtxCancel:
 					readerCancelFn()
 				case transportBreaks:
@@ -396,10 +409,28 @@ func TestOutboxInbox(t *testing.T) {
 				// which prompts the watchdog goroutine of the outbox to cancel the
 				// flow.
 				require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1)
+			case flowCtxCancel:
+				// If the flow context of the Inbox host gets canceled, it is
+				// treated as an ungraceful termination of the stream, so we expect
+				// an error from the stream handler.
+				require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
+				// The Inbox propagates this cancellation on its host.
+				require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
+
+				// In the production setup, the watchdog goroutine of the outbox
+				// would receive a non-io.EOF error indicating an ungraceful
+				// completion of the FlowStream RPC call, which would prompt the
+				// outbox to cancel the whole flow.
+				//
+				// However, because we're using a mock server, the propagation
+				// doesn't take place, so the flow context on the outbox side should
+				// not be canceled.
+				require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
+				require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
 			case readerCtxCancel:
-				// If the reader context gets canceled, it is treated as a graceful
-				// termination of the stream, so we expect no error from the stream
-				// handler.
+				// If the reader context gets canceled while the inbox's host flow
+				// context doesn't, it is treated as a graceful termination of the
+				// stream, so we expect no error from the stream handler.
 				require.Nil(t, streamHandlerErr)

 				// The Inbox should still propagate this error upwards.
 				require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
@@ -560,7 +591,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {

 			inboxMemAcc := testMemMonitor.MakeBoundAccount()
 			defer inboxMemAcc.Close(ctx)
-			inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+			inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
 			require.NoError(t, err)

 			var (
@@ -642,7 +673,7 @@ func BenchmarkOutboxInbox(b *testing.B) {

 	inboxMemAcc := testMemMonitor.MakeBoundAccount()
 	defer inboxMemAcc.Close(ctx)
-	inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+	inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
 	require.NoError(b, err)

 	var wg sync.WaitGroup
@@ -766,7 +797,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) {

 	typs := []*types.T{types.Int}

-	inbox, err := NewInbox(ctx, testAllocator, typs, streamID)
+	inbox, err := NewInbox(testAllocator, typs, streamID)
 	require.NoError(t, err)

 	ctxExtract := make(chan struct{})
diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go
index fcc9149e8f01..d228aee72afa 100644
--- a/pkg/sql/colflow/colrpc/inbox.go
+++ b/pkg/sql/colflow/colrpc/inbox.go
@@ -75,6 +75,9 @@ type Inbox struct {
 	// goroutine should exit while waiting for a stream.
 	timeoutCh chan error

+	// flowCtxDone is the Done() channel of the flow context of the Inbox host.
+	flowCtxDone <-chan struct{}
+
 	// errCh is the channel that RunWithStream will block on, waiting until the
 	// Inbox does not need a stream any more. An error will only be sent on this
 	// channel in the event of a cancellation or a non-io.EOF error originating
@@ -99,14 +102,6 @@ type Inbox struct {
 	// only the Next/DrainMeta goroutine may access it.
 	stream flowStreamServer

-	// flowCtx is a temporary field that captures a flow's context during
-	// initialization. This is so that RunWithStream can listen for cancellation
-	// even in the case in which Next is not called (e.g. in cases where the Inbox
-	// is the left side of a HashJoiner). The best solution for this problem would
-	// be to refactor Operator.Init to accept a context since that must be called
-	// regardless of whether or not Next is called.
-	flowCtx context.Context
-
 	// statsAtomics are the execution statistics that need to be atomically
 	// accessed. This is necessary since Get*() methods can be called from
 	// different goroutine than Next().
@@ -134,7 +129,7 @@ var _ colexecop.Operator = &Inbox{}

 // NewInbox creates a new Inbox.
 func NewInbox(
-	ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
+	allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
 ) (*Inbox, error) {
 	c, err := colserde.NewArrowBatchConverter(typs)
 	if err != nil {
@@ -154,13 +149,28 @@ func NewInbox(
 		contextCh: make(chan context.Context, 1),
 		timeoutCh: make(chan error, 1),
 		errCh: make(chan error, 1),
-		flowCtx: ctx,
 		deserializationStopWatch: timeutil.NewStopWatch(),
 	}
 	i.scratch.data = make([]*array.Data, len(typs))
 	return i, nil
 }

+// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow
+// context is available.
+func NewInboxWithFlowCtxDone(
+	allocator *colmem.Allocator,
+	typs []*types.T,
+	streamID execinfrapb.StreamID,
+	flowCtxDone <-chan struct{},
+) (*Inbox, error) {
+	i, err := NewInbox(allocator, typs, streamID)
+	if err != nil {
+		return nil, err
+	}
+	i.flowCtxDone = flowCtxDone
+	return i, nil
+}
+
 // maybeInit calls Inbox.init if the inbox is not initialized and returns an
 // error if the initialization was not successful. Usually this is because the
 // given context is canceled before the remote stream arrives.
@@ -188,11 +198,11 @@ func (i *Inbox) init(ctx context.Context) error {
 		i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err)
 		return err
 	case <-ctx.Done():
-		// Our reader canceled the context meaning that it no longer needs
-		// any more data from the outbox. This is a graceful termination, so
-		// we don't send any error on errCh and only return an error. This
-		// will close the inbox (making the stream handler exit gracefully)
-		// and will stop the current goroutine from proceeding further.
+		if err := i.checkFlowCtxCancellation(); err != nil {
+			// This is an ungraceful termination because the flow context has
+			// been canceled.
+			i.errCh <- err
+		}
 		return ctx.Err()
 	}

@@ -215,9 +225,21 @@ func (i *Inbox) close() {
 	}
 }

+// checkFlowCtxCancellation returns an error if the flow context has already
+// been canceled.
+func (i *Inbox) checkFlowCtxCancellation() error {
+	select {
+	case <-i.flowCtxDone:
+		return cancelchecker.QueryCanceledError
+	default:
+		return nil
+	}
+}
+
 // RunWithStream sets the Inbox's stream and waits until either streamCtx is
-// canceled, a caller of Next cancels the first context passed into Next, or any
-// error is encountered on the stream by the Next goroutine.
+// canceled, the Inbox's host cancels the flow context, a caller of Next cancels
+// the context passed into Init, or any error is encountered on the stream by
+// the Next goroutine.
 func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error {
 	streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID)
 	log.VEvent(streamCtx, 2, "Inbox handling stream")
@@ -234,7 +256,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
 		log.VEvent(streamCtx, 2, "Inbox reader arrived")
 	case <-streamCtx.Done():
 		return fmt.Errorf("%s: streamCtx while waiting for reader (remote client canceled)", streamCtx.Err())
-	case <-i.flowCtx.Done():
+	case <-i.flowCtxDone:
 		// The flow context of the inbox host has been canceled. This can occur
 		// e.g. when the query is canceled, or when another stream encountered
 		// an unrecoverable error forcing it to shutdown the flow.
@@ -244,18 +266,21 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
 	// Now wait for one of the events described in the method comment. If a
 	// cancellation is encountered, nothing special must be done to cancel the
 	// reader goroutine as returning from the handler will close the stream.
-	//
-	// Note that we don't listen for cancellation on flowCtx.Done() because
-	// readerCtx must be the child of the flow context.
 	select {
 	case err := <-i.errCh:
 		// nil will be read from errCh when the channel is closed.
 		return err
+	case <-i.flowCtxDone:
+		// The flow context of the inbox host has been canceled. This can occur
+		// e.g. when the query is canceled, or when another stream encountered
+		// an unrecoverable error forcing it to shut down the flow.
+		return cancelchecker.QueryCanceledError
 	case <-readerCtx.Done():
-		// The reader canceled the stream meaning that it no longer needs any
-		// more data from the outbox. This is a graceful termination, so we
-		// return nil.
-		return nil
+		// readerCtx is canceled, but we don't know whether it was because the
+		// flow context was canceled or for some other reason. In the former
+		// case we have an ungraceful shutdown, whereas in the latter case we
+		// have a graceful one.
+		return i.checkFlowCtxCancellation()
 	case <-streamCtx.Done():
 		// The client canceled the stream.
 		return fmt.Errorf("%s: streamCtx in Inbox stream handler (remote client canceled)", streamCtx.Err())
diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go
index 44f70e51dad2..f0e216b39015 100644
--- a/pkg/sql/colflow/colrpc/inbox_test.go
+++ b/pkg/sql/colflow/colrpc/inbox_test.go
@@ -61,7 +61,7 @@ func TestInboxCancellation(t *testing.T) {

 	typs := []*types.T{types.Int}
 	t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) {
-		inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
+		inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
 		require.NoError(t, err)
 		ctx, cancelFn := context.WithCancel(context.Background())
 		// Cancel the context.
@@ -78,7 +78,7 @@ func TestInboxCancellation(t *testing.T) {

 	t.Run("DuringRecv", func(t *testing.T) {
 		rpcLayer := makeMockFlowStreamRPCLayer()
-		inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
+		inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
 		require.NoError(t, err)
 		ctx, cancelFn := context.WithCancel(context.Background())

@@ -111,7 +111,7 @@ func TestInboxCancellation(t *testing.T) {

 	t.Run("StreamHandlerWaitingForReader", func(t *testing.T) {
 		rpcLayer := makeMockFlowStreamRPCLayer()
-		inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
+		inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
 		require.NoError(t, err)

 		ctx, cancelFn := context.WithCancel(context.Background())
@@ -129,7 +129,7 @@ func TestInboxCancellation(t *testing.T) {
 func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
 	defer leaktest.AfterTest(t)()

-	inbox, err := NewInbox(context.Background(), testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
+	inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
 	require.NoError(t, err)

 	rpcLayer := makeMockFlowStreamRPCLayer()
@@ -156,7 +156,7 @@ func TestInboxTimeout(t *testing.T) {

 	ctx := context.Background()

-	inbox, err := NewInbox(ctx, testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
+	inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
 	require.NoError(t, err)

 	var (
@@ -259,7 +259,7 @@ func TestInboxShutdown(t *testing.T) {
 				inboxCtx, inboxCancel := context.WithCancel(context.Background())
 				inboxMemAccount := testMemMonitor.MakeBoundAccount()
 				defer inboxMemAccount.Close(inboxCtx)
-				inbox, err := NewInbox(context.Background(), colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+				inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
 				require.NoError(t, err)
 				c, err := colserde.NewArrowBatchConverter(typs)
 				require.NoError(t, err)
diff --git a/pkg/sql/colflow/colrpc/outbox_test.go b/pkg/sql/colflow/colrpc/outbox_test.go
index 59dbbcca9266..179945a716ea 100644
--- a/pkg/sql/colflow/colrpc/outbox_test.go
+++ b/pkg/sql/colflow/colrpc/outbox_test.go
@@ -55,7 +55,7 @@ func TestOutboxCatchesPanics(t *testing.T) {

 		inboxMemAccount := testMemMonitor.MakeBoundAccount()
 		defer inboxMemAccount.Close(ctx)
-		inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+		inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
 		require.NoError(t, err)

 		streamHandlerErrCh := handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) })
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 1f8b574848b4..8ea8e11f81a7 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -380,6 +380,8 @@ type flowCreatorHelper interface {
 	accumulateAsyncComponent(runFn)
 	// addMaterializer adds a materializer to the flow.
 	addMaterializer(*colexec.Materializer)
+	// getFlowCtxDone returns the Done() channel of the flow context.
+	getFlowCtxDone() <-chan struct{}
 	// getCancelFlowFn returns a flow cancellation function.
 	getCancelFlowFn() context.CancelFunc
 }

@@ -405,7 +407,12 @@ type remoteComponentCreator interface {
 		metadataSources []execinfrapb.MetadataSource,
 		toClose []colexecop.Closer,
 	) (*colrpc.Outbox, error)
-	newInbox(ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error)
+	newInbox(
+		allocator *colmem.Allocator,
+		typs []*types.T,
+		streamID execinfrapb.StreamID,
+		flowCtxDone <-chan struct{},
+	) (*colrpc.Inbox, error)
 }

 type vectorizedRemoteComponentCreator struct{}
@@ -422,9 +429,12 @@ func (vectorizedRemoteComponentCreator) newOutbox(
 }

 func (vectorizedRemoteComponentCreator) newInbox(
-	ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
+	allocator *colmem.Allocator,
+	typs []*types.T,
+	streamID execinfrapb.StreamID,
+	flowCtxDone <-chan struct{},
 ) (*colrpc.Inbox, error) {
-	return colrpc.NewInbox(ctx, allocator, typs, streamID)
+	return colrpc.NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone)
 }

 // vectorizedFlowCreator performs all the setup of vectorized flows. Depending
@@ -843,7 +853,8 @@ func (s *vectorizedFlowCreator) setupInput(
 	}

 	inbox, err := s.remoteComponentCreator.newInbox(
-		ctx, colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID,
+		colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory),
+		input.ColumnTypes, inputStream.StreamID, s.flowCreatorHelper.getFlowCtxDone(),
 	)

 	if err != nil {
@@ -1332,6 +1343,10 @@ func (r *vectorizedFlowCreatorHelper) addMaterializer(m *colexec.Materializer) {
 	r.f.SetProcessors(r.processors)
 }

+func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
+	return r.f.GetCtxDone()
+}
+
 func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
 	return r.f.GetCancelFlowFn()
 }
@@ -1387,6 +1402,10 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {}

 func (r *noopFlowCreatorHelper) addMaterializer(*colexec.Materializer) {}

+func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
+	return nil
+}
+
 func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
 	return nil
 }
diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
index 40400e338b19..b206866c1fa8 100644
--- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go
+++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
@@ -210,7 +210,6 @@ func TestVectorizedFlowShutdown(t *testing.T) {
 			inboxMemAccount := testMemMonitor.MakeBoundAccount()
 			defer inboxMemAccount.Close(ctxLocal)
 			inbox, err := colrpc.NewInbox(
-				ctxLocal,
 				colmem.NewAllocator(ctxLocal, &inboxMemAccount, testColumnFactory),
 				typs,
 				execinfrapb.StreamID(streamID),
@@ -325,7 +324,6 @@ func TestVectorizedFlowShutdown(t *testing.T) {
 					inboxMemAccount := testMemMonitor.MakeBoundAccount()
 					defer inboxMemAccount.Close(ctxAnotherRemote)
 					inbox, err := colrpc.NewInbox(
-						ctxAnotherRemote,
 						colmem.NewAllocator(ctxAnotherRemote, &inboxMemAccount, testColumnFactory),
 						typs,
 						execinfrapb.StreamID(streamID),
diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go
index 7de96b67df56..600afb4a0fb8 100644
--- a/pkg/sql/colflow/vectorized_flow_test.go
+++ b/pkg/sql/colflow/vectorized_flow_test.go
@@ -52,7 +52,7 @@ func (c callbackRemoteComponentCreator) newOutbox(
 }

 func (c callbackRemoteComponentCreator) newInbox(
-	ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
+	allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ <-chan struct{},
 ) (*colrpc.Inbox, error) {
 	return c.newInboxFn(allocator, typs, streamID)
 }

@@ -207,7 +207,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
 			return colrpc.NewOutbox(allocator, op, typs, nil /* getStats */, sources, nil /* toClose */)
 		},
 		newInboxFn: func(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) {
-			inbox, err := colrpc.NewInbox(context.Background(), allocator, typs, streamID)
+			inbox, err := colrpc.NewInbox(allocator, typs, streamID)
 			inboxToNumInputTypes[inbox] = typs
 			return inbox, err
 		},

From 57881a3da40f13a68087f0f277762b988303b531 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Fri, 17 Dec 2021 10:09:25 -0800
Subject: [PATCH 2/3] colrpc: deflake TestOutboxInbox

In the `flowCtxCancel` scenario there are two possible errors that
might be returned to the reader depending on the exact sequence of
events:
- `QueryCanceledError` is used when the flow ctx cancellation is
  observed before the stream arrived
- a wrapped `context.Canceled` error is used when the inbox handler
  goroutine notices the cancellation first and ungracefully shuts down
  the stream.

Previously, we assumed that only the latter could occur; now we allow
for either.

Release note: None
---
 pkg/sql/colflow/colrpc/colrpc_test.go | 14 ++++++++++++--
 pkg/sql/colflow/colrpc/inbox.go       |  5 ++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go
index 61b7bb6e89e8..a60199ad62c9 100644
--- a/pkg/sql/colflow/colrpc/colrpc_test.go
+++ b/pkg/sql/colflow/colrpc/colrpc_test.go
@@ -414,8 +414,18 @@ func TestOutboxInbox(t *testing.T) {
 				// treated as an ungraceful termination of the stream, so we expect
 				// an error from the stream handler.
 				require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
-				// The Inbox propagates this cancellation on its host.
-				require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
+				// The Inbox propagates this cancellation on its host. Depending on
+				// when the cancellation is noticed by the reader, a different error
+				// is used, so we allow for both of them.
+				//
+				// QueryCanceledError is used when the flow ctx cancellation is
+				// observed before the stream arrived, whereas a wrapped
+				// context.Canceled error is used when the inbox handler goroutine
+				// notices the cancellation first and ungracefully shuts down the
+				// stream.
+				ok := errors.Is(readerErr, cancelchecker.QueryCanceledError) ||
+					testutils.IsError(readerErr, "context canceled")
+				require.True(t, ok, readerErr)

 				// In the production setup, the watchdog goroutine of the outbox
 				// would receive a non-io.EOF error indicating an ungraceful
diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go
index d228aee72afa..79842dec0716 100644
--- a/pkg/sql/colflow/colrpc/inbox.go
+++ b/pkg/sql/colflow/colrpc/inbox.go
@@ -198,12 +198,15 @@ func (i *Inbox) init(ctx context.Context) error {
 		i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err)
 		return err
 	case <-ctx.Done():
+		// errToThrow is propagated to the reader of the Inbox.
+		errToThrow := ctx.Err()
 		if err := i.checkFlowCtxCancellation(); err != nil {
 			// This is an ungraceful termination because the flow context has
 			// been canceled.
 			i.errCh <- err
+			errToThrow = err
 		}
-		return ctx.Err()
+		return errToThrow
 	}

From 83e644d081f87147b9f848269bb084501c299a93 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 21 Dec 2021 10:22:08 -0800
Subject: [PATCH 3/3] colflow: fix the shutdown for good

This commit fixes a long-standing bug in the distributed vectorized
query shutdown where, in case of a graceful completion of the flow on
one node, we might get an error on another node resulting in the
ungraceful termination of the query. This was caused by the fact that
on remote nodes the last outbox to exit would cancel the flow context;
however, when instantiating the `FlowStream` RPC the outboxes used a
child context of the flow context, so even a "graceful" cancellation of
the flow context would cause the inbox to get an ungraceful termination
of the gRPC stream. As a result, the whole query could get a "context
canceled" error.

I believe this bug was introduced by me over two years ago because I
didn't fully understand how the shutdown should work, and in particular
I was missing the fact that when an inbox observes the flow context
cancellation, it should terminate the `FlowStream` RPC ungracefully in
order to propagate the ungracefulness to the other side of the stream.
This shortcoming was fixed in the previous commit.

Another possible bug was caused by the outbox canceling its own context
in case of a graceful shutdown. As mentioned above, the `FlowStream`
RPC was issued using the outbox context, so there was a possibility of
a race between the `CloseSend` call being delivered to the inbox
(graceful termination) and the context of the RPC being canceled
(ungraceful termination).

Both of these problems are now fixed, and the shutdown protocol is now
as follows:
- on the gateway node we keep on canceling the flow context at the very
  end of the query execution. It doesn't matter whether the query
  resulted in an error or not, and doing so allows us to ensure that
  everything exits on the gateway node. This behavior is already
  present.
- due to the fix in the previous commit, that flow context cancellation
  terminates ungracefully all still-open gRPC streams of `FlowStream`
  RPCs for which the gateway node is the inbox host.
- the outboxes on the remote nodes get the ungraceful termination and
  cancel the flow contexts of their hosts. This, in turn, would trigger
  propagation of the ungraceful termination on other gRPC streams, etc.
- whenever an outbox exits gracefully, it cancels its own context, but
  the gRPC stream uses the flow context, so the stream is still alive.

I debated a bit whether we want to keep this outbox context
cancellation in case of a graceful completion and decided to keep it to
minimize the scope of changes.

Release note (bug fix): Previously, in extremely rare cases,
CockroachDB could return a spurious "context canceled" error for a
query that actually succeeded; this has now been fixed.
---
 pkg/sql/colflow/colrpc/outbox.go   |  8 +++++++-
 pkg/sql/colflow/vectorized_flow.go | 15 ---------------
 pkg/sql/flowinfra/outbox.go        |  2 ++
 3 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go
index 1bd10f212bd4..9b097e05a3ba 100644
--- a/pkg/sql/colflow/colrpc/outbox.go
+++ b/pkg/sql/colflow/colrpc/outbox.go
@@ -152,6 +152,7 @@ func (o *Outbox) Run(
 	flowCtxCancel context.CancelFunc,
 	connectionTimeout time.Duration,
 ) {
+	flowCtx := ctx
 	// Derive a child context so that we can cancel all components rooted in
 	// this outbox.
 	var outboxCtxCancel context.CancelFunc
@@ -182,7 +183,12 @@ func (o *Outbox) Run(
 	}

 	client := execinfrapb.NewDistSQLClient(conn)
-	stream, err = client.FlowStream(ctx)
+	// We use the flow context for the RPC so that when the outbox context is
+	// canceled in case of a graceful shutdown, the gRPC stream keeps on
+	// running. If, however, the flow context is canceled, then the
+	// termination of the whole query is ungraceful, so we're ok with the
+	// gRPC stream being ungracefully shut down too.
+	stream, err = client.FlowStream(flowCtx)
 	if err != nil {
 		log.Warningf(
 			ctx,
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 8ea8e11f81a7..bcfc4043f77c 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -461,10 +461,6 @@ type vectorizedFlowCreator struct {
 	numOutboxes int32
 	materializerAdded bool

-	// numOutboxesExited is an atomic that keeps track of how many outboxes have exited.
-	// When numOutboxesExited equals numOutboxes, the cancellation function for the flow
-	// is called.
-	numOutboxesExited int32
 	// numOutboxesDrained is an atomic that keeps track of how many outboxes have
 	// been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is
 	// added to a flow-level span.
@@ -676,17 +672,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
 			flowCtxCancel,
 			flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV),
 		)
-		// When the last Outbox on this node exits, we want to make sure that
-		// everything is shutdown; namely, we need to call cancelFn if:
-		// - it is the last Outbox
-		// - there is no root materializer on this node (if it were, it would take
-		//   care of the cancellation itself)
-		// - cancelFn is non-nil (it can be nil in tests).
-		// Calling cancelFn will cancel the context that all infrastructure on this
-		// node is listening on, so it will shut everything down.
-		if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && flowCtxCancel != nil {
-			flowCtxCancel()
-		}
 	}
 	s.accumulateAsyncComponent(run)
 	return outbox, nil
diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go
index cc5e1d212ec0..3593900cbde4 100644
--- a/pkg/sql/flowinfra/outbox.go
+++ b/pkg/sql/flowinfra/outbox.go
@@ -249,6 +249,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
 		log.Infof(ctx, "outbox: calling FlowStream")
 	}
 	// The context used here escapes, so it has to be a background context.
+	// TODO(yuzefovich): the usage of the TODO context here is suspicious.
+	// Investigate this.
 	m.stream, err = client.FlowStream(context.TODO())
 	if err != nil {
 		if log.V(1) {
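
Note: to make the inbox-side cancellation semantics from the first patch
concrete, here is a minimal, self-contained Go sketch. It is not CockroachDB
code: `waitForReader` and `errQueryCanceled` are hypothetical stand-ins for
the final select in `Inbox.RunWithStream` and for
`cancelchecker.QueryCanceledError`, but the shape of the selects mirrors the
ones added in `inbox.go`.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errQueryCanceled is an illustrative stand-in for
// cancelchecker.QueryCanceledError.
var errQueryCanceled = errors.New("query canceled")

// waitForReader mimics the final select of Inbox.RunWithStream: a nil return
// is a graceful close of the FlowStream RPC, while a non-nil return is an
// ungraceful one that tells the remote outbox to cancel its host's flow
// context.
func waitForReader(flowCtxDone <-chan struct{}, readerCtx context.Context) error {
	select {
	case <-flowCtxDone:
		// The whole flow is shutting down: terminate the RPC ungracefully.
		return errQueryCanceled
	case <-readerCtx.Done():
		// readerCtx is a child of the flow context, so we must disambiguate:
		// if the flow context is also done, this is still ungraceful.
		select {
		case <-flowCtxDone:
			return errQueryCanceled
		default:
			// Only the reader is done: it simply needs no more data.
			return nil
		}
	}
}

func main() {
	flowCtx, flowCancel := context.WithCancel(context.Background())
	readerCtx, readerCancel := context.WithCancel(flowCtx)

	readerCancel()
	fmt.Println(waitForReader(flowCtx.Done(), readerCtx)) // <nil>: graceful

	flowCancel()
	fmt.Println(waitForReader(flowCtx.Done(), readerCtx)) // query canceled: ungraceful
}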
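
The deflake in the second patch hinges on a race between two observers of the
same cancellation. A small sketch of the disambiguation added to `Inbox.init`,
under the same assumptions as above (`errQueryCanceled` standing in for
`cancelchecker.QueryCanceledError`, `readerError` a hypothetical name):

package main

import (
	"context"
	"errors"
	"fmt"
)

var errQueryCanceled = errors.New("query canceled")

// readerError mirrors the ctx.Done() arm of Inbox.init after the second
// patch: if the flow-context cancellation has already been observed, the
// reader gets the explicit query-canceled error; otherwise it gets the plain
// context error. Which one surfaces depends on goroutine scheduling, which is
// why the test must accept either.
func readerError(readerCtx context.Context, flowCtxDone <-chan struct{}) error {
	select {
	case <-flowCtxDone:
		return errQueryCanceled
	default:
		return readerCtx.Err() // context.Canceled once readerCtx is done
	}
}

func main() {
	flowCtx, flowCancel := context.WithCancel(context.Background())
	readerCtx, readerCancel := context.WithCancel(flowCtx)
	defer readerCancel()

	flowCancel()
	err := readerError(readerCtx, flowCtx.Done())
	// The assertion from the deflaked test accepts both outcomes.
	fmt.Println(errors.Is(err, errQueryCanceled) || errors.Is(err, context.Canceled))
}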
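
And for the third patch, a sketch of why the outbox issues the `FlowStream`
RPC on the flow context rather than on its own child context. Again
illustrative only: `startStream` is a hypothetical stand-in for
`client.FlowStream`, reduced to the context the RPC would observe.

package main

import (
	"context"
	"fmt"
)

// startStream stands in for client.FlowStream, reduced to the one property
// that matters here: the RPC lives exactly as long as the context it was
// started with.
func startStream(rpcCtx context.Context) <-chan struct{} {
	return rpcCtx.Done()
}

func main() {
	flowCtx, flowCancel := context.WithCancel(context.Background())
	defer flowCancel()

	// As in the third patch: the outbox derives a child context for the
	// components rooted in it, but issues the RPC on the flow context.
	outboxCtx, outboxCancel := context.WithCancel(flowCtx)
	_ = outboxCtx

	streamDone := startStream(flowCtx) // flowCtx, not outboxCtx

	// A graceful outbox shutdown cancels only the outbox's subtree; the gRPC
	// stream stays alive, so CloseSend can still be delivered to the inbox.
	outboxCancel()
	select {
	case <-streamDone:
		fmt.Println("stream torn down (ungraceful)")
	default:
		fmt.Println("stream still alive after graceful outbox shutdown")
	}
}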