Skip to content

Commit

Permalink
Merge pull request #73959 from yuzefovich/backport21.1-73887
Browse files Browse the repository at this point in the history
release-21.1: colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC
  • Loading branch information
yuzefovich authored Dec 28, 2021
2 parents 7b787af + 83e644d commit 697d711
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 77 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util/cancelchecker",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
81 changes: 61 additions & 20 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -146,9 +147,12 @@ func TestOutboxInbox(t *testing.T) {
// streamCtxCancel models a scenario in which the Outbox host cancels
// the flow.
streamCtxCancel
// readerCtxCancel models a scenario in which the Inbox host cancels the
// flow. This is considered a graceful termination, and the flow context
// isn't canceled.
// flowCtxCancel models a scenario in which the flow context of the
// Inbox host is canceled which is an ungraceful shutdown.
flowCtxCancel
// readerCtxCancel models a scenario in which the consumer of the Inbox
// cancels the context while the host's flow context is not canceled.
// This is considered a graceful termination.
readerCtxCancel
// transportBreaks models a scenario in which the transport breaks.
transportBreaks
Expand All @@ -158,16 +162,19 @@ func TestOutboxInbox(t *testing.T) {
cancellationScenarioName string
)
switch randVal := rng.Float64(); {
case randVal <= 0.25:
case randVal <= 0.2:
cancellationScenario = noCancel
cancellationScenarioName = "noCancel"
case randVal <= 0.50:
case randVal <= 0.4:
cancellationScenario = streamCtxCancel
cancellationScenarioName = "streamCtxCancel"
case randVal <= 0.75:
case randVal <= 0.6:
cancellationScenario = flowCtxCancel
cancellationScenarioName = "flowCtxCancel"
case randVal <= 0.8:
cancellationScenario = readerCtxCancel
cancellationScenarioName = "readerCtxCancel"
case randVal <= 1:
default:
cancellationScenario = transportBreaks
cancellationScenarioName = "transportBreaks"
}
Expand Down Expand Up @@ -196,10 +203,10 @@ func TestOutboxInbox(t *testing.T) {
inputBuffer = colexecop.NewBatchBuffer()
// Generate some random behavior before passing the random number
// generator to be used in the Outbox goroutine (to avoid races).
// These sleep variables enable a sleep for up to half a millisecond
// with a .25 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.25
sleepTime = time.Microsecond * time.Duration(rng.Intn(500))
// These sleep variables enable a sleep for up to five milliseconds
// with a .5 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.5
sleepTime = time.Microsecond * time.Duration(rng.Intn(5000))
// stopwatch is used to measure how long it takes for the outbox to
// exit once the transport broke.
stopwatch = timeutil.NewStopWatch()
Expand Down Expand Up @@ -250,9 +257,14 @@ func TestOutboxInbox(t *testing.T) {
)
require.NoError(t, err)

inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
defer inboxMemAcc.Close(readerCtx)
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
)
require.NoError(t, err)

streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
Expand Down Expand Up @@ -283,7 +295,6 @@ func TestOutboxInbox(t *testing.T) {
wg.Done()
}()

readerCtx, readerCancelFn := context.WithCancel(ctx)
wg.Add(1)
go func() {
if sleepBeforeCancellation {
Expand All @@ -293,6 +304,8 @@ func TestOutboxInbox(t *testing.T) {
case noCancel:
case streamCtxCancel:
streamCancelFn()
case flowCtxCancel:
inboxFlowCtxCancelFn()
case readerCtxCancel:
readerCancelFn()
case transportBreaks:
Expand Down Expand Up @@ -396,10 +409,38 @@ func TestOutboxInbox(t *testing.T) {
// which prompts the watchdog goroutine of the outbox to cancel the
// flow.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1)
case flowCtxCancel:
// If the flow context of the Inbox host gets canceled, it is
// treated as an ungraceful termination of the stream, so we expect
// an error from the stream handler.
require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
// The Inbox propagates this cancellation on its host. Depending on
// when the cancellation is noticed by the reader, a different error
// is used, so we allow for both of them.
//
// QueryCanceledError is used when the flow ctx cancellation is
// observed before the stream arrived whereas wrapped
// context.Canceled error is used when the inbox handler goroutine
// notices the cancellation first and ungracefully shuts down the
// stream.
ok := errors.Is(readerErr, cancelchecker.QueryCanceledError) ||
testutils.IsError(readerErr, "context canceled")
require.True(t, ok, readerErr)

// In the production setup, the watchdog goroutine of the outbox
// would receive non-io.EOF error indicating an ungraceful
// completion of the FlowStream RPC call which would prompt the
// outbox to cancel the whole flow.
//
// However, because we're using a mock server, the propagation
// doesn't take place, so the flow context on the outbox side should
// not be canceled.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
case readerCtxCancel:
// If the reader context gets canceled, it is treated as a graceful
// termination of the stream, so we expect no error from the stream
// handler.
// If the reader context gets canceled while the inbox's host flow
// context doesn't, it is treated as a graceful termination of the
// stream, so we expect no error from the stream handler.
require.Nil(t, streamHandlerErr)
// The Inbox should still propagate this error upwards.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
Expand Down Expand Up @@ -560,7 +601,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)

var (
Expand Down Expand Up @@ -642,7 +683,7 @@ func BenchmarkOutboxInbox(b *testing.B) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(b, err)

var wg sync.WaitGroup
Expand Down Expand Up @@ -766,7 +807,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) {

typs := []*types.T{types.Int}

inbox, err := NewInbox(ctx, testAllocator, typs, streamID)
inbox, err := NewInbox(testAllocator, typs, streamID)
require.NoError(t, err)

ctxExtract := make(chan struct{})
Expand Down
80 changes: 54 additions & 26 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Inbox struct {
// goroutine should exit while waiting for a stream.
timeoutCh chan error

// flowCtxDone is the Done() channel of the flow context of the Inbox host.
flowCtxDone <-chan struct{}

// errCh is the channel that RunWithStream will block on, waiting until the
// Inbox does not need a stream any more. An error will only be sent on this
// channel in the event of a cancellation or a non-io.EOF error originating
Expand All @@ -99,14 +102,6 @@ type Inbox struct {
// only the Next/DrainMeta goroutine may access it.
stream flowStreamServer

// flowCtx is a temporary field that captures a flow's context during
// initialization. This is so that RunWithStream can listen for cancellation
// even in the case in which Next is not called (e.g. in cases where the Inbox
// is the left side of a HashJoiner). The best solution for this problem would
// be to refactor Operator.Init to accept a context since that must be called
// regardless of whether or not Next is called.
flowCtx context.Context

// statsAtomics are the execution statistics that need to be atomically
// accessed. This is necessary since Get*() methods can be called from
// different goroutine than Next().
Expand Down Expand Up @@ -134,7 +129,7 @@ var _ colexecop.Operator = &Inbox{}

// NewInbox creates a new Inbox.
func NewInbox(
ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
) (*Inbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
Expand All @@ -154,13 +149,28 @@ func NewInbox(
contextCh: make(chan context.Context, 1),
timeoutCh: make(chan error, 1),
errCh: make(chan error, 1),
flowCtx: ctx,
deserializationStopWatch: timeutil.NewStopWatch(),
}
i.scratch.data = make([]*array.Data, len(typs))
return i, nil
}

// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow
// context is available.
func NewInboxWithFlowCtxDone(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
) (*Inbox, error) {
i, err := NewInbox(allocator, typs, streamID)
if err != nil {
return nil, err
}
i.flowCtxDone = flowCtxDone
return i, nil
}

// maybeInit calls Inbox.init if the inbox is not initialized and returns an
// error if the initialization was not successful. Usually this is because the
// given context is canceled before the remote stream arrives.
Expand Down Expand Up @@ -188,12 +198,15 @@ func (i *Inbox) init(ctx context.Context) error {
i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err)
return err
case <-ctx.Done():
// Our reader canceled the context meaning that it no longer needs
// any more data from the outbox. This is a graceful termination, so
// we don't send any error on errCh and only return an error. This
// will close the inbox (making the stream handler exit gracefully)
// and will stop the current goroutine from proceeding further.
return ctx.Err()
// errToThrow is propagated to the reader of the Inbox.
errToThrow := ctx.Err()
if err := i.checkFlowCtxCancellation(); err != nil {
// This is an ungraceful termination because the flow context has
// been canceled.
i.errCh <- err
errToThrow = err
}
return errToThrow
}

if i.ctxInterceptorFn != nil {
Expand All @@ -215,9 +228,21 @@ func (i *Inbox) close() {
}
}

// checkFlowCtxCancellation returns an error if the flow context has already
// been canceled.
func (i *Inbox) checkFlowCtxCancellation() error {
select {
case <-i.flowCtxDone:
return cancelchecker.QueryCanceledError
default:
return nil
}
}

// RunWithStream sets the Inbox's stream and waits until either streamCtx is
// canceled, a caller of Next cancels the first context passed into Next, or any
// error is encountered on the stream by the Next goroutine.
// canceled, the Inbox's host cancels the flow context, a caller of Next cancels
// the context passed into Init, or any error is encountered on the stream by
// the Next goroutine.
func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error {
streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID)
log.VEvent(streamCtx, 2, "Inbox handling stream")
Expand All @@ -234,7 +259,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
log.VEvent(streamCtx, 2, "Inbox reader arrived")
case <-streamCtx.Done():
return fmt.Errorf("%s: streamCtx while waiting for reader (remote client canceled)", streamCtx.Err())
case <-i.flowCtx.Done():
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
Expand All @@ -244,18 +269,21 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
// Now wait for one of the events described in the method comment. If a
// cancellation is encountered, nothing special must be done to cancel the
// reader goroutine as returning from the handler will close the stream.
//
// Note that we don't listen for cancellation on flowCtx.Done() because
// readerCtx must be the child of the flow context.
select {
case err := <-i.errCh:
// nil will be read from errCh when the channel is closed.
return err
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
return cancelchecker.QueryCanceledError
case <-readerCtx.Done():
// The reader canceled the stream meaning that it no longer needs any
// more data from the outbox. This is a graceful termination, so we
// return nil.
return nil
// readerCtx is canceled, but we don't know whether it was because the
// flow context was canceled or for other reason. In the former case we
// have an ungraceful shutdown whereas in the latter case we have a
// graceful one.
return i.checkFlowCtxCancellation()
case <-streamCtx.Done():
// The client canceled the stream.
return fmt.Errorf("%s: streamCtx in Inbox stream handler (remote client canceled)", streamCtx.Err())
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colflow/colrpc/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestInboxCancellation(t *testing.T) {

typs := []*types.T{types.Int}
t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) {
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)
ctx, cancelFn := context.WithCancel(context.Background())
// Cancel the context.
Expand All @@ -78,7 +78,7 @@ func TestInboxCancellation(t *testing.T) {

t.Run("DuringRecv", func(t *testing.T) {
rpcLayer := makeMockFlowStreamRPCLayer()
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)
ctx, cancelFn := context.WithCancel(context.Background())

Expand Down Expand Up @@ -111,7 +111,7 @@ func TestInboxCancellation(t *testing.T) {

t.Run("StreamHandlerWaitingForReader", func(t *testing.T) {
rpcLayer := makeMockFlowStreamRPCLayer()
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)

ctx, cancelFn := context.WithCancel(context.Background())
Expand All @@ -129,7 +129,7 @@ func TestInboxCancellation(t *testing.T) {
func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
defer leaktest.AfterTest(t)()

inbox, err := NewInbox(context.Background(), testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
require.NoError(t, err)

rpcLayer := makeMockFlowStreamRPCLayer()
Expand All @@ -156,7 +156,7 @@ func TestInboxTimeout(t *testing.T) {

ctx := context.Background()

inbox, err := NewInbox(ctx, testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
require.NoError(t, err)

var (
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestInboxShutdown(t *testing.T) {
inboxCtx, inboxCancel := context.WithCancel(context.Background())
inboxMemAccount := testMemMonitor.MakeBoundAccount()
defer inboxMemAccount.Close(inboxCtx)
inbox, err := NewInbox(context.Background(), colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)
c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (o *Outbox) Run(
flowCtxCancel context.CancelFunc,
connectionTimeout time.Duration,
) {
flowCtx := ctx
// Derive a child context so that we can cancel all components rooted in
// this outbox.
var outboxCtxCancel context.CancelFunc
Expand Down Expand Up @@ -182,7 +183,12 @@ func (o *Outbox) Run(
}

client := execinfrapb.NewDistSQLClient(conn)
stream, err = client.FlowStream(ctx)
// We use the flow context for the RPC so that when outbox context is
// canceled in case of a graceful shutdown, the gRPC stream keeps on
// running. If, however, the flow context is canceled, then the
// termination of the whole query is ungraceful, so we're ok with the
// gRPC stream being ungracefully shutdown too.
stream, err = client.FlowStream(flowCtx)
if err != nil {
log.Warningf(
ctx,
Expand Down
Loading

0 comments on commit 697d711

Please sign in to comment.