colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC
This commit fixes an oversight in the cancellation protocol of the
vectorized inbox/outbox communication. Previously, when the flow context
of the inbox host was canceled (indicating that the whole query should
be canceled), we would propagate it as a graceful completion of the
`FlowStream` RPC, which would result in the outbox canceling only its
own subtree on the remote node. However, we ought to propagate such a
cancellation as an ungraceful RPC completion so that the outbox also
cancels the flow context of its own host.
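
For illustration, here is a minimal, self-contained sketch of the
distinction this fix relies on. It is not the actual CockroachDB code:
terminationError and errQueryCanceled are hypothetical stand-ins for
the inbox stream handler's logic and `cancelchecker.QueryCanceledError`.
When the reader's context is done, the handler reports a graceful
completion only if the host's flow context is still alive, and an
ungraceful one otherwise.

package main

import (
    "context"
    "errors"
    "fmt"
)

// errQueryCanceled stands in for cancelchecker.QueryCanceledError; returning
// any non-nil error from the stream handler is what completes the FlowStream
// RPC ungracefully.
var errQueryCanceled = errors.New("query execution canceled")

// terminationError models the decision the inbox's stream handler makes once
// the reader's context is done: if the host's flow context has also been
// canceled, the shutdown is ungraceful and must surface as an RPC error so
// that the outbox cancels the flow context of its own host; otherwise the
// reader simply no longer needs data and the handler exits gracefully.
func terminationError(flowCtx, readerCtx context.Context) error {
    <-readerCtx.Done()
    select {
    case <-flowCtx.Done():
        return errQueryCanceled // ungraceful: the whole query is being canceled
    default:
        return nil // graceful: only the local consumer is done
    }
}

func main() {
    flowCtx, cancelFlow := context.WithCancel(context.Background())
    readerCtx, cancelReader := context.WithCancel(flowCtx)

    cancelReader()
    fmt.Println("reader-only cancellation:", terminationError(flowCtx, readerCtx)) // <nil>

    cancelFlow()
    fmt.Println("flow cancellation:", terminationError(flowCtx, readerCtx)) // query execution canceled
}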

In some rare cases the old behavior could result in flows being stuck
forever (until the node is restarted) because they would get blocked on
producing data after their consumer had already exited.

The behavior in this fix is what we already have in place for the
row-by-row engine (see `processInboundStreamHelper` in
`flowinfra/inbound.go`).

Release note (bug fix): A bug in the ungraceful shutdown of distributed
queries, which in some rare cases could leave them stuck, has been
fixed. "Ungraceful" here means a shutdown caused by `statement_timeout`
(most likely) or by a node crash.
yuzefovich committed Dec 16, 2021
1 parent 0f69631 commit 2203bae
Showing 6 changed files with 138 additions and 67 deletions.
89 changes: 52 additions & 37 deletions pkg/sql/colflow/colrpc/colrpc_test.go
@@ -116,25 +116,10 @@ func makeMockFlowStreamRPCLayer() mockFlowStreamRPCLayer {
// will call doneFn if non-nil once the handler returns.
func handleStream(
ctx context.Context, inbox *Inbox, stream flowStreamServer, doneFn func(),
) chan error {
return handleStreamWithFlowCtxDone(ctx, inbox, stream, nil /* flowCtxDone */, doneFn)
}

// handleStreamWithFlowCtxDone is the same as handleStream but also takes in
// an optional Done channel for the flow context of the inbox host.
func handleStreamWithFlowCtxDone(
ctx context.Context,
inbox *Inbox,
stream flowStreamServer,
flowCtxDone <-chan struct{},
doneFn func(),
) chan error {
handleStreamErrCh := make(chan error, 1)
if flowCtxDone == nil {
flowCtxDone = make(<-chan struct{})
}
go func() {
handleStreamErrCh <- inbox.RunWithStream(ctx, stream, flowCtxDone)
handleStreamErrCh <- inbox.RunWithStream(ctx, stream)
if doneFn != nil {
doneFn()
}
@@ -163,9 +148,12 @@ func TestOutboxInbox(t *testing.T) {
// streamCtxCancel models a scenario in which the Outbox host cancels
// the flow.
streamCtxCancel
// readerCtxCancel models a scenario in which the Inbox host cancels the
// flow. This is considered a graceful termination, and the flow context
// isn't canceled.
// flowCtxCancel models a scenario in which the flow context of the
// Inbox host is canceled which is an ungraceful shutdown.
flowCtxCancel
// readerCtxCancel models a scenario in which the consumer of the Inbox
// cancels the context while the host's flow context is not canceled.
// This is considered a graceful termination.
readerCtxCancel
// transportBreaks models a scenario in which the transport breaks.
transportBreaks
@@ -175,16 +163,19 @@ func TestOutboxInbox(t *testing.T) {
cancellationScenarioName string
)
switch randVal := rng.Float64(); {
case randVal <= 0.25:
case randVal <= 0.2:
cancellationScenario = noCancel
cancellationScenarioName = "noCancel"
case randVal <= 0.50:
case randVal <= 0.4:
cancellationScenario = streamCtxCancel
cancellationScenarioName = "streamCtxCancel"
case randVal <= 0.75:
case randVal <= 0.6:
cancellationScenario = flowCtxCancel
cancellationScenarioName = "flowCtxCancel"
case randVal <= 0.8:
cancellationScenario = readerCtxCancel
cancellationScenarioName = "readerCtxCancel"
case randVal <= 1:
default:
cancellationScenario = transportBreaks
cancellationScenarioName = "transportBreaks"
}
@@ -213,10 +204,10 @@ func TestOutboxInbox(t *testing.T) {
inputBuffer = colexecop.NewBatchBuffer()
// Generate some random behavior before passing the random number
// generator to be used in the Outbox goroutine (to avoid races).
// These sleep variables enable a sleep for up to half a millisecond
// with a .25 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.25
sleepTime = time.Microsecond * time.Duration(rng.Intn(500))
// These sleep variables enable a sleep for up to five milliseconds
// with a .5 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.5
sleepTime = time.Microsecond * time.Duration(rng.Intn(5000))
// stopwatch is used to measure how long it takes for the outbox to
// exit once the transport broke.
stopwatch = timeutil.NewStopWatch()
@@ -299,7 +290,8 @@ func TestOutboxInbox(t *testing.T) {
wg.Done()
}()

readerCtx, readerCancelFn := context.WithCancel(context.Background())
inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
wg.Add(1)
go func() {
if sleepBeforeCancellation {
@@ -309,6 +301,8 @@ func TestOutboxInbox(t *testing.T) {
case noCancel:
case streamCtxCancel:
streamCancelFn()
case flowCtxCancel:
inboxFlowCtxCancelFn()
case readerCtxCancel:
readerCancelFn()
case transportBreaks:
@@ -321,7 +315,10 @@ func TestOutboxInbox(t *testing.T) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(readerCtx)
inbox, err := NewInbox(colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
)
require.NoError(t, err)

streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
@@ -420,10 +417,28 @@ func TestOutboxInbox(t *testing.T) {
// which prompts the watchdog goroutine of the outbox to cancel the
// flow.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1)
case flowCtxCancel:
// If the flow context of the Inbox host gets canceled, it is
// treated as an ungraceful termination of the stream, so we expect
// an error from the stream handler.
require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
// The Inbox propagates this cancellation on its host.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)

// In the production setup, the watchdog goroutine of the outbox
// would receive a non-io.EOF error indicating an ungraceful
// completion of the FlowStream RPC call which would prompt the
// outbox to cancel the whole flow.
//
// However, because we're using a mock server, the propagation
// doesn't take place, so the flow context on the outbox side should
// not be canceled.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
case readerCtxCancel:
// If the reader context gets canceled, it is treated as a graceful
// termination of the stream, so we expect no error from the stream
// handler.
// If the reader context gets canceled while the inbox's host flow
// context doesn't, it is treated as a graceful termination of the
// stream, so we expect no error from the stream handler.
require.Nil(t, streamHandlerErr)
// The Inbox should still propagate this error upwards.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
@@ -505,17 +520,17 @@ func TestInboxHostCtxCancellation(t *testing.T) {
inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background())
inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(inboxHostCtx)
inbox, err := NewInbox(colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxHostCtx.Done(),
)
require.NoError(t, err)

// Spawn up the stream handler (a separate goroutine) for the server side
// of the FlowStream RPC.
serverStreamNotification := <-mockServer.InboundStreams
serverStream := serverStreamNotification.Stream
streamHandlerErrCh := handleStreamWithFlowCtxDone(
serverStream.Context(), inbox, serverStream,
inboxHostCtx.Done(), func() { close(serverStreamNotification.Donec) },
)
streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })

// Here is the meat of the test - the inbox is never initialized, and,
// instead, the inbox host's flow context is canceled after some delay.
77 changes: 55 additions & 22 deletions pkg/sql/colflow/colrpc/inbox.go
@@ -78,6 +78,9 @@ type Inbox struct {
// goroutine should exit while waiting for a stream.
timeoutCh chan error

// flowCtxDone is the Done() channel of the flow context of the Inbox host.
flowCtxDone <-chan struct{}

// errCh is the channel that RunWithStream will block on, waiting until the
// Inbox does not need a stream any more. An error will only be sent on this
// channel in the event of a cancellation or a non-io.EOF error originating
@@ -155,16 +158,33 @@ func NewInbox(
return i, nil
}

// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow
// context is available.
func NewInboxWithFlowCtxDone(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
) (*Inbox, error) {
i, err := NewInbox(allocator, typs, streamID)
if err != nil {
return nil, err
}
i.flowCtxDone = flowCtxDone
return i, nil
}

// NewInboxWithAdmissionControl creates a new Inbox that does admission
// control on responses received from DistSQL.
func NewInboxWithAdmissionControl(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionQ *admission.WorkQueue,
admissionInfo admission.WorkInfo,
) (*Inbox, error) {
i, err := NewInbox(allocator, typs, streamID)
i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone)
if err != nil {
return nil, err
}
@@ -185,15 +205,22 @@ func (i *Inbox) close() {
}
}

// checkFlowCtxCancellation returns an error if the flow context has already
// been canceled.
func (i *Inbox) checkFlowCtxCancellation() error {
select {
case <-i.flowCtxDone:
return cancelchecker.QueryCanceledError
default:
return nil
}
}

// RunWithStream sets the Inbox's stream and waits until either streamCtx is
// canceled, a caller of Next cancels the context passed into Init, or any error
// is encountered on the stream by the Next goroutine.
//
// flowCtxDone is listened on only during the setup of the handler, before the
// readerCtx is received. This is needed in case Inbox.Init is never called.
func (i *Inbox) RunWithStream(
streamCtx context.Context, stream flowStreamServer, flowCtxDone <-chan struct{},
) error {
// canceled, the Inbox's host cancels the flow context, a caller of Next cancels
// the context passed into Init, or any error is encountered on the stream by
// the Next goroutine.
func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error {
streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID)
log.VEvent(streamCtx, 2, "Inbox handling stream")
defer log.VEvent(streamCtx, 2, "Inbox exited stream handler")
@@ -209,7 +236,7 @@ func (i *Inbox) RunWithStream(
log.VEvent(streamCtx, 2, "Inbox reader arrived")
case <-streamCtx.Done():
return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)")
case <-flowCtxDone:
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
@@ -219,18 +246,21 @@ func (i *Inbox) RunWithStream(
// Now wait for one of the events described in the method comment. If a
// cancellation is encountered, nothing special must be done to cancel the
// reader goroutine as returning from the handler will close the stream.
//
// Note that we don't listen for cancellation on flowCtxDone because
// readerCtx must be the child of the flow context.
select {
case err := <-i.errCh:
// nil will be read from errCh when the channel is closed.
return err
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
return cancelchecker.QueryCanceledError
case <-readerCtx.Done():
// The reader canceled the stream meaning that it no longer needs any
// more data from the outbox. This is a graceful termination, so we
// return nil.
return nil
// readerCtx is canceled, but we don't know whether it was because the
// flow context was canceled or for some other reason. In the former case we
// have an ungraceful shutdown whereas in the latter case we have a
// graceful one.
return i.checkFlowCtxCancellation()
case <-streamCtx.Done():
// The client canceled the stream.
return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)")
@@ -260,12 +290,15 @@ func (i *Inbox) Init(ctx context.Context) {
case err := <-i.timeoutCh:
i.errCh <- errors.Wrap(err, "remote stream arrived too late")
return err
case <-i.flowCtxDone:
i.errCh <- cancelchecker.QueryCanceledError
return cancelchecker.QueryCanceledError
case <-i.Ctx.Done():
// Our reader canceled the context meaning that it no longer needs
// any more data from the outbox. This is a graceful termination, so
// we don't send any error on errCh and only return an error. This
// will close the inbox (making the stream handler exit gracefully)
// and will stop the current goroutine from proceeding further.
if err := i.checkFlowCtxCancellation(); err != nil {
// This is an ungraceful termination because the flow context
// has been canceled.
i.errCh <- err
}
return i.Ctx.Err()
}

2 changes: 1 addition & 1 deletion pkg/sql/colflow/colrpc/inbox_test.go
@@ -70,7 +70,7 @@ func TestInboxCancellation(t *testing.T) {
err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) })
require.True(t, testutils.IsError(err, "context canceled"), err)
// Now, the remote stream arrives.
err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}, make(<-chan struct{}))
err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{})
// We expect no error from the stream handler since we canceled it
// ourselves (a graceful termination).
require.Nil(t, err)
29 changes: 24 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
@@ -453,6 +453,8 @@ type flowCreatorHelper interface {
// addFlowCoordinator adds the FlowCoordinator to the flow. This is only
// done on the gateway node.
addFlowCoordinator(coordinator *FlowCoordinator)
// getFlowCtxDone returns the Done channel of the context of this flow.
getFlowCtxDone() <-chan struct{}
// getCancelFlowFn returns a flow cancellation function.
getCancelFlowFn() context.CancelFunc
}
@@ -471,8 +473,13 @@ type remoteComponentCreator interface {
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
) (*colrpc.Outbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
admissionOpts admissionOptions) (*colrpc.Inbox, error)
newInbox(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionOpts admissionOptions,
) (*colrpc.Inbox, error)
}

type vectorizedRemoteComponentCreator struct{}
@@ -490,10 +497,13 @@ func (vectorizedRemoteComponentCreator) newInbox(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionOpts admissionOptions,
) (*colrpc.Inbox, error) {
return colrpc.NewInboxWithAdmissionControl(
allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo)
allocator, typs, streamID, flowCtxDone,
admissionOpts.admissionQ, admissionOpts.admissionInfo,
)
}

// vectorizedFlowCreator performs all the setup of vectorized flows. Depending
@@ -845,6 +855,7 @@ func (s *vectorizedFlowCreator) setupInput(
colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
input.ColumnTypes,
inputStream.StreamID,
s.flowCreatorHelper.getFlowCtxDone(),
admissionOptions{
admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ,
admissionInfo: s.admissionInfo,
@@ -1212,9 +1223,9 @@ func (s vectorizedInboundStreamHandler) Run(
ctx context.Context,
stream execinfrapb.DistSQL_FlowStreamServer,
_ *execinfrapb.ProducerMessage,
f *flowinfra.FlowBase,
_ *flowinfra.FlowBase,
) error {
return s.RunWithStream(ctx, stream, f.GetCtxDone())
return s.RunWithStream(ctx, stream)
}

// Timeout is part of the flowinfra.InboundStreamHandler interface.
@@ -1278,6 +1289,10 @@ func (r *vectorizedFlowCreatorHelper) addFlowCoordinator(f *FlowCoordinator) {
r.f.SetProcessors(r.processors)
}

func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
return r.f.GetCtxDone()
}

func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return r.f.GetCancelFlowFn()
}
@@ -1333,6 +1348,10 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {}

func (r *noopFlowCreatorHelper) addFlowCoordinator(coordinator *FlowCoordinator) {}

func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
return nil
}

func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return nil
}