Skip to content

Commit

Permalink
Revert "colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC"
Browse files Browse the repository at this point in the history

This reverts commit 2203bae.
  • Loading branch information
yuzefovich committed Dec 17, 2021
1 parent 9557b42 commit 48dd74c
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 138 deletions.
89 changes: 37 additions & 52 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,25 @@ func makeMockFlowStreamRPCLayer() mockFlowStreamRPCLayer {
// will call doneFn if non-nil once the handler returns.
func handleStream(
ctx context.Context, inbox *Inbox, stream flowStreamServer, doneFn func(),
) chan error {
return handleStreamWithFlowCtxDone(ctx, inbox, stream, nil /* flowCtxDone */, doneFn)
}

// handleStreamWithFlowCtxDone is the same as handleStream but also takes in
// an optional Done channel for the flow context of the inbox host.
func handleStreamWithFlowCtxDone(
ctx context.Context,
inbox *Inbox,
stream flowStreamServer,
flowCtxDone <-chan struct{},
doneFn func(),
) chan error {
handleStreamErrCh := make(chan error, 1)
if flowCtxDone == nil {
flowCtxDone = make(<-chan struct{})
}
go func() {
handleStreamErrCh <- inbox.RunWithStream(ctx, stream)
handleStreamErrCh <- inbox.RunWithStream(ctx, stream, flowCtxDone)
if doneFn != nil {
doneFn()
}
Expand Down Expand Up @@ -148,12 +163,9 @@ func TestOutboxInbox(t *testing.T) {
// streamCtxCancel models a scenario in which the Outbox host cancels
// the flow.
streamCtxCancel
// flowCtxCancel models a scenario in which the flow context of the
// Inbox host is canceled which is an ungraceful shutdown.
flowCtxCancel
// readerCtxCancel models a scenario in which the consumer of the Inbox
// cancels the context while the host's flow context is not canceled.
// This is considered a graceful termination.
// readerCtxCancel models a scenario in which the Inbox host cancels the
// flow. This is considered a graceful termination, and the flow context
// isn't canceled.
readerCtxCancel
// transportBreaks models a scenario in which the transport breaks.
transportBreaks
Expand All @@ -163,19 +175,16 @@ func TestOutboxInbox(t *testing.T) {
cancellationScenarioName string
)
switch randVal := rng.Float64(); {
case randVal <= 0.2:
case randVal <= 0.25:
cancellationScenario = noCancel
cancellationScenarioName = "noCancel"
case randVal <= 0.4:
case randVal <= 0.50:
cancellationScenario = streamCtxCancel
cancellationScenarioName = "streamCtxCancel"
case randVal <= 0.6:
cancellationScenario = flowCtxCancel
cancellationScenarioName = "flowCtxCancel"
case randVal <= 0.8:
case randVal <= 0.75:
cancellationScenario = readerCtxCancel
cancellationScenarioName = "readerCtxCancel"
default:
case randVal <= 1:
cancellationScenario = transportBreaks
cancellationScenarioName = "transportBreaks"
}
Expand Down Expand Up @@ -204,10 +213,10 @@ func TestOutboxInbox(t *testing.T) {
inputBuffer = colexecop.NewBatchBuffer()
// Generate some random behavior before passing the random number
// generator to be used in the Outbox goroutine (to avoid races).
// These sleep variables enable a sleep for up to five milliseconds
// with a .5 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.5
sleepTime = time.Microsecond * time.Duration(rng.Intn(5000))
// These sleep variables enable a sleep for up to half a millisecond
// with a .25 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.25
sleepTime = time.Microsecond * time.Duration(rng.Intn(500))
// stopwatch is used to measure how long it takes for the outbox to
// exit once the transport broke.
stopwatch = timeutil.NewStopWatch()
Expand Down Expand Up @@ -290,8 +299,7 @@ func TestOutboxInbox(t *testing.T) {
wg.Done()
}()

inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
readerCtx, readerCancelFn := context.WithCancel(context.Background())
wg.Add(1)
go func() {
if sleepBeforeCancellation {
Expand All @@ -301,8 +309,6 @@ func TestOutboxInbox(t *testing.T) {
case noCancel:
case streamCtxCancel:
streamCancelFn()
case flowCtxCancel:
inboxFlowCtxCancelFn()
case readerCtxCancel:
readerCancelFn()
case transportBreaks:
Expand All @@ -315,10 +321,7 @@ func TestOutboxInbox(t *testing.T) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(readerCtx)
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
)
inbox, err := NewInbox(colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)

streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
Expand Down Expand Up @@ -417,28 +420,10 @@ func TestOutboxInbox(t *testing.T) {
// which prompts the watchdog goroutine of the outbox to cancel the
// flow.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1)
case flowCtxCancel:
// If the flow context of the Inbox host gets canceled, it is
// treated as an ungraceful termination of the stream, so we expect
// an error from the stream handler.
require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
// The Inbox propagates this cancellation on its host.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)

// In the production setup, the watchdog goroutine of the outbox
// would receive non-io.EOF error indicating an ungraceful
// completion of the FlowStream RPC call which would prompt the
// outbox to cancel the whole flow.
//
// However, because we're using a mock server, the propagation
// doesn't take place, so the flow context on the outbox side should
// not be canceled.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
case readerCtxCancel:
// If the reader context gets canceled while the inbox's host flow
// context doesn't, it is treated as a graceful termination of the
// stream, so we expect no error from the stream handler.
// If the reader context gets canceled, it is treated as a graceful
// termination of the stream, so we expect no error from the stream
// handler.
require.Nil(t, streamHandlerErr)
// The Inbox should still propagate this error upwards.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
Expand Down Expand Up @@ -520,17 +505,17 @@ func TestInboxHostCtxCancellation(t *testing.T) {
inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background())
inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(inboxHostCtx)
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxHostCtx.Done(),
)
inbox, err := NewInbox(colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)

// Spawn up the stream handler (a separate goroutine) for the server side
// of the FlowStream RPC.
serverStreamNotification := <-mockServer.InboundStreams
serverStream := serverStreamNotification.Stream
streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
streamHandlerErrCh := handleStreamWithFlowCtxDone(
serverStream.Context(), inbox, serverStream,
inboxHostCtx.Done(), func() { close(serverStreamNotification.Donec) },
)

// Here is the meat of the test - the inbox is never initialized, and,
// instead, the inbox host's flow context is canceled after some delay.
Expand Down
77 changes: 22 additions & 55 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ type Inbox struct {
// goroutine should exit while waiting for a stream.
timeoutCh chan error

// flowCtxDone is the Done() channel of the flow context of the Inbox host.
flowCtxDone <-chan struct{}

// errCh is the channel that RunWithStream will block on, waiting until the
// Inbox does not need a stream any more. An error will only be sent on this
// channel in the event of a cancellation or a non-io.EOF error originating
Expand Down Expand Up @@ -158,33 +155,16 @@ func NewInbox(
return i, nil
}

// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow
// context is available.
func NewInboxWithFlowCtxDone(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
) (*Inbox, error) {
i, err := NewInbox(allocator, typs, streamID)
if err != nil {
return nil, err
}
i.flowCtxDone = flowCtxDone
return i, nil
}

// NewInboxWithAdmissionControl creates a new Inbox that does admission
// control on responses received from DistSQL.
func NewInboxWithAdmissionControl(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionQ *admission.WorkQueue,
admissionInfo admission.WorkInfo,
) (*Inbox, error) {
i, err := NewInboxWithFlowCtxDone(allocator, typs, streamID, flowCtxDone)
i, err := NewInbox(allocator, typs, streamID)
if err != nil {
return nil, err
}
Expand All @@ -205,22 +185,15 @@ func (i *Inbox) close() {
}
}

// checkFlowCtxCancellation returns an error if the flow context has already
// been canceled.
func (i *Inbox) checkFlowCtxCancellation() error {
select {
case <-i.flowCtxDone:
return cancelchecker.QueryCanceledError
default:
return nil
}
}

// RunWithStream sets the Inbox's stream and waits until either streamCtx is
// canceled, the Inbox's host cancels the flow context, a caller of Next cancels
// the context passed into Init, or any error is encountered on the stream by
// the Next goroutine.
func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error {
// canceled, a caller of Next cancels the context passed into Init, or any error
// is encountered on the stream by the Next goroutine.
//
// flowCtxDone is listened on only during the setup of the handler, before the
// readerCtx is received. This is needed in case Inbox.Init is never called.
func (i *Inbox) RunWithStream(
streamCtx context.Context, stream flowStreamServer, flowCtxDone <-chan struct{},
) error {
streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID)
log.VEvent(streamCtx, 2, "Inbox handling stream")
defer log.VEvent(streamCtx, 2, "Inbox exited stream handler")
Expand All @@ -236,7 +209,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
log.VEvent(streamCtx, 2, "Inbox reader arrived")
case <-streamCtx.Done():
return errors.Wrap(streamCtx.Err(), "streamCtx error while waiting for reader (remote client canceled)")
case <-i.flowCtxDone:
case <-flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
Expand All @@ -246,21 +219,18 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
// Now wait for one of the events described in the method comment. If a
// cancellation is encountered, nothing special must be done to cancel the
// reader goroutine as returning from the handler will close the stream.
//
// Note that we don't listen for cancellation on flowCtxDone because
// readerCtx must be the child of the flow context.
select {
case err := <-i.errCh:
// nil will be read from errCh when the channel is closed.
return err
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
return cancelchecker.QueryCanceledError
case <-readerCtx.Done():
// readerCtx is canceled, but we don't know whether it was because the
// flow context was canceled or for other reason. In the former case we
// have an ungraceful shutdown whereas in the latter case we have a
// graceful one.
return i.checkFlowCtxCancellation()
// The reader canceled the stream meaning that it no longer needs any
// more data from the outbox. This is a graceful termination, so we
// return nil.
return nil
case <-streamCtx.Done():
// The client canceled the stream.
return errors.Wrap(streamCtx.Err(), "streamCtx error in Inbox stream handler (remote client canceled)")
Expand Down Expand Up @@ -290,15 +260,12 @@ func (i *Inbox) Init(ctx context.Context) {
case err := <-i.timeoutCh:
i.errCh <- errors.Wrap(err, "remote stream arrived too late")
return err
case <-i.flowCtxDone:
i.errCh <- cancelchecker.QueryCanceledError
return cancelchecker.QueryCanceledError
case <-i.Ctx.Done():
if err := i.checkFlowCtxCancellation(); err != nil {
// This is an ungraceful termination because the flow context
// has been canceled.
i.errCh <- err
}
// Our reader canceled the context meaning that it no longer needs
// any more data from the outbox. This is a graceful termination, so
// we don't send any error on errCh and only return an error. This
// will close the inbox (making the stream handler exit gracefully)
// and will stop the current goroutine from proceeding further.
return i.Ctx.Err()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/colrpc/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestInboxCancellation(t *testing.T) {
err = colexecerror.CatchVectorizedRuntimeError(func() { inbox.Init(ctx) })
require.True(t, testutils.IsError(err, "context canceled"), err)
// Now, the remote stream arrives.
err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{})
err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{}, make(<-chan struct{}))
// We expect no error from the stream handler since we canceled it
// ourselves (a graceful termination).
require.Nil(t, err)
Expand Down
29 changes: 5 additions & 24 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,6 @@ type flowCreatorHelper interface {
// addFlowCoordinator adds the FlowCoordinator to the flow. This is only
// done on the gateway node.
addFlowCoordinator(coordinator *FlowCoordinator)
// getFlowCtxDone returns the Done channel of the context of this flow.
getFlowCtxDone() <-chan struct{}
// getCancelFlowFn returns a flow cancellation function.
getCancelFlowFn() context.CancelFunc
}
Expand All @@ -473,13 +471,8 @@ type remoteComponentCreator interface {
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
) (*colrpc.Outbox, error)
newInbox(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionOpts admissionOptions,
) (*colrpc.Inbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
admissionOpts admissionOptions) (*colrpc.Inbox, error)
}

type vectorizedRemoteComponentCreator struct{}
Expand All @@ -497,13 +490,10 @@ func (vectorizedRemoteComponentCreator) newInbox(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
admissionOpts admissionOptions,
) (*colrpc.Inbox, error) {
return colrpc.NewInboxWithAdmissionControl(
allocator, typs, streamID, flowCtxDone,
admissionOpts.admissionQ, admissionOpts.admissionInfo,
)
allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo)
}

// vectorizedFlowCreator performs all the setup of vectorized flows. Depending
Expand Down Expand Up @@ -855,7 +845,6 @@ func (s *vectorizedFlowCreator) setupInput(
colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
input.ColumnTypes,
inputStream.StreamID,
s.flowCreatorHelper.getFlowCtxDone(),
admissionOptions{
admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ,
admissionInfo: s.admissionInfo,
Expand Down Expand Up @@ -1223,9 +1212,9 @@ func (s vectorizedInboundStreamHandler) Run(
ctx context.Context,
stream execinfrapb.DistSQL_FlowStreamServer,
_ *execinfrapb.ProducerMessage,
_ *flowinfra.FlowBase,
f *flowinfra.FlowBase,
) error {
return s.RunWithStream(ctx, stream)
return s.RunWithStream(ctx, stream, f.GetCtxDone())
}

// Timeout is part of the flowinfra.InboundStreamHandler interface.
Expand Down Expand Up @@ -1289,10 +1278,6 @@ func (r *vectorizedFlowCreatorHelper) addFlowCoordinator(f *FlowCoordinator) {
r.f.SetProcessors(r.processors)
}

func (r *vectorizedFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
return r.f.GetCtxDone()
}

func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return r.f.GetCancelFlowFn()
}
Expand Down Expand Up @@ -1348,10 +1333,6 @@ func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {}

func (r *noopFlowCreatorHelper) addFlowCoordinator(coordinator *FlowCoordinator) {}

func (r *noopFlowCreatorHelper) getFlowCtxDone() <-chan struct{} {
return nil
}

func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return nil
}
Expand Down
Loading

0 comments on commit 48dd74c

Please sign in to comment.