Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: colrpc: propagate the flow cancellation as ungraceful for FlowStream RPC #73959

Merged
merged 3 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util/cancelchecker",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
81 changes: 61 additions & 20 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -146,9 +147,12 @@ func TestOutboxInbox(t *testing.T) {
// streamCtxCancel models a scenario in which the Outbox host cancels
// the flow.
streamCtxCancel
// readerCtxCancel models a scenario in which the Inbox host cancels the
// flow. This is considered a graceful termination, and the flow context
// isn't canceled.
// flowCtxCancel models a scenario in which the flow context of the
// Inbox host is canceled which is an ungraceful shutdown.
flowCtxCancel
// readerCtxCancel models a scenario in which the consumer of the Inbox
// cancels the context while the host's flow context is not canceled.
// This is considered a graceful termination.
readerCtxCancel
// transportBreaks models a scenario in which the transport breaks.
transportBreaks
Expand All @@ -158,16 +162,19 @@ func TestOutboxInbox(t *testing.T) {
cancellationScenarioName string
)
switch randVal := rng.Float64(); {
case randVal <= 0.25:
case randVal <= 0.2:
cancellationScenario = noCancel
cancellationScenarioName = "noCancel"
case randVal <= 0.50:
case randVal <= 0.4:
cancellationScenario = streamCtxCancel
cancellationScenarioName = "streamCtxCancel"
case randVal <= 0.75:
case randVal <= 0.6:
cancellationScenario = flowCtxCancel
cancellationScenarioName = "flowCtxCancel"
case randVal <= 0.8:
cancellationScenario = readerCtxCancel
cancellationScenarioName = "readerCtxCancel"
case randVal <= 1:
default:
cancellationScenario = transportBreaks
cancellationScenarioName = "transportBreaks"
}
Expand Down Expand Up @@ -196,10 +203,10 @@ func TestOutboxInbox(t *testing.T) {
inputBuffer = colexecop.NewBatchBuffer()
// Generate some random behavior before passing the random number
// generator to be used in the Outbox goroutine (to avoid races).
// These sleep variables enable a sleep for up to half a millisecond
// with a .25 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.25
sleepTime = time.Microsecond * time.Duration(rng.Intn(500))
// These sleep variables enable a sleep for up to five milliseconds
// with a .5 probability before cancellation.
sleepBeforeCancellation = rng.Float64() <= 0.5
sleepTime = time.Microsecond * time.Duration(rng.Intn(5000))
// stopwatch is used to measure how long it takes for the outbox to
// exit once the transport broke.
stopwatch = timeutil.NewStopWatch()
Expand Down Expand Up @@ -250,9 +257,14 @@ func TestOutboxInbox(t *testing.T) {
)
require.NoError(t, err)

inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
defer inboxMemAcc.Close(readerCtx)
inbox, err := NewInboxWithFlowCtxDone(
colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
)
require.NoError(t, err)

streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) })
Expand Down Expand Up @@ -283,7 +295,6 @@ func TestOutboxInbox(t *testing.T) {
wg.Done()
}()

readerCtx, readerCancelFn := context.WithCancel(ctx)
wg.Add(1)
go func() {
if sleepBeforeCancellation {
Expand All @@ -293,6 +304,8 @@ func TestOutboxInbox(t *testing.T) {
case noCancel:
case streamCtxCancel:
streamCancelFn()
case flowCtxCancel:
inboxFlowCtxCancelFn()
case readerCtxCancel:
readerCancelFn()
case transportBreaks:
Expand Down Expand Up @@ -396,10 +409,38 @@ func TestOutboxInbox(t *testing.T) {
// which prompts the watchdog goroutine of the outbox to cancel the
// flow.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 1)
case flowCtxCancel:
// If the flow context of the Inbox host gets canceled, it is
// treated as an ungraceful termination of the stream, so we expect
// an error from the stream handler.
require.True(t, errors.Is(streamHandlerErr, cancelchecker.QueryCanceledError))
// The Inbox propagates this cancellation on its host. Depending on
// when the cancellation is noticed by the reader, a different error
// is used, so we allow for both of them.
//
// QueryCanceledError is used when the flow ctx cancellation is
// observed before the stream arrived whereas wrapped
// context.Canceled error is used when the inbox handler goroutine
// notices the cancellation first and ungracefully shuts down the
// stream.
ok := errors.Is(readerErr, cancelchecker.QueryCanceledError) ||
testutils.IsError(readerErr, "context canceled")
require.True(t, ok, readerErr)

// In the production setup, the watchdog goroutine of the outbox
// would receive non-io.EOF error indicating an ungraceful
// completion of the FlowStream RPC call which would prompt the
// outbox to cancel the whole flow.
//
// However, because we're using a mock server, the propagation
// doesn't take place, so the flow context on the outbox side should
// not be canceled.
require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
case readerCtxCancel:
// If the reader context gets canceled, it is treated as a graceful
// termination of the stream, so we expect no error from the stream
// handler.
// If the reader context gets canceled while the inbox's host flow
// context doesn't, it is treated as a graceful termination of the
// stream, so we expect no error from the stream handler.
require.Nil(t, streamHandlerErr)
// The Inbox should still propagate this error upwards.
require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
Expand Down Expand Up @@ -560,7 +601,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)

var (
Expand Down Expand Up @@ -642,7 +683,7 @@ func BenchmarkOutboxInbox(b *testing.B) {

inboxMemAcc := testMemMonitor.MakeBoundAccount()
defer inboxMemAcc.Close(ctx)
inbox, err := NewInbox(ctx, colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(b, err)

var wg sync.WaitGroup
Expand Down Expand Up @@ -766,7 +807,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) {

typs := []*types.T{types.Int}

inbox, err := NewInbox(ctx, testAllocator, typs, streamID)
inbox, err := NewInbox(testAllocator, typs, streamID)
require.NoError(t, err)

ctxExtract := make(chan struct{})
Expand Down
80 changes: 54 additions & 26 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Inbox struct {
// goroutine should exit while waiting for a stream.
timeoutCh chan error

// flowCtxDone is the Done() channel of the flow context of the Inbox host.
flowCtxDone <-chan struct{}

// errCh is the channel that RunWithStream will block on, waiting until the
// Inbox does not need a stream any more. An error will only be sent on this
// channel in the event of a cancellation or a non-io.EOF error originating
Expand All @@ -99,14 +102,6 @@ type Inbox struct {
// only the Next/DrainMeta goroutine may access it.
stream flowStreamServer

// flowCtx is a temporary field that captures a flow's context during
// initialization. This is so that RunWithStream can listen for cancellation
// even in the case in which Next is not called (e.g. in cases where the Inbox
// is the left side of a HashJoiner). The best solution for this problem would
// be to refactor Operator.Init to accept a context since that must be called
// regardless of whether or not Next is called.
flowCtx context.Context

// statsAtomics are the execution statistics that need to be atomically
// accessed. This is necessary since Get*() methods can be called from
// different goroutine than Next().
Expand Down Expand Up @@ -134,7 +129,7 @@ var _ colexecop.Operator = &Inbox{}

// NewInbox creates a new Inbox.
func NewInbox(
ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
) (*Inbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
Expand All @@ -154,13 +149,28 @@ func NewInbox(
contextCh: make(chan context.Context, 1),
timeoutCh: make(chan error, 1),
errCh: make(chan error, 1),
flowCtx: ctx,
deserializationStopWatch: timeutil.NewStopWatch(),
}
i.scratch.data = make([]*array.Data, len(typs))
return i, nil
}

// NewInboxWithFlowCtxDone creates a new Inbox when the done channel of the flow
// context is available.
func NewInboxWithFlowCtxDone(
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
flowCtxDone <-chan struct{},
) (*Inbox, error) {
i, err := NewInbox(allocator, typs, streamID)
if err != nil {
return nil, err
}
i.flowCtxDone = flowCtxDone
return i, nil
}

// maybeInit calls Inbox.init if the inbox is not initialized and returns an
// error if the initialization was not successful. Usually this is because the
// given context is canceled before the remote stream arrives.
Expand Down Expand Up @@ -188,12 +198,15 @@ func (i *Inbox) init(ctx context.Context) error {
i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err)
return err
case <-ctx.Done():
// Our reader canceled the context meaning that it no longer needs
// any more data from the outbox. This is a graceful termination, so
// we don't send any error on errCh and only return an error. This
// will close the inbox (making the stream handler exit gracefully)
// and will stop the current goroutine from proceeding further.
return ctx.Err()
// errToThrow is propagated to the reader of the Inbox.
errToThrow := ctx.Err()
if err := i.checkFlowCtxCancellation(); err != nil {
// This is an ungraceful termination because the flow context has
// been canceled.
i.errCh <- err
errToThrow = err
}
return errToThrow
}

if i.ctxInterceptorFn != nil {
Expand All @@ -215,9 +228,21 @@ func (i *Inbox) close() {
}
}

// checkFlowCtxCancellation returns an error if the flow context has already
// been canceled.
func (i *Inbox) checkFlowCtxCancellation() error {
select {
case <-i.flowCtxDone:
return cancelchecker.QueryCanceledError
default:
return nil
}
}

// RunWithStream sets the Inbox's stream and waits until either streamCtx is
// canceled, a caller of Next cancels the first context passed into Next, or any
// error is encountered on the stream by the Next goroutine.
// canceled, the Inbox's host cancels the flow context, a caller of Next cancels
// the context passed into Init, or any error is encountered on the stream by
// the Next goroutine.
func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer) error {
streamCtx = logtags.AddTag(streamCtx, "streamID", i.streamID)
log.VEvent(streamCtx, 2, "Inbox handling stream")
Expand All @@ -234,7 +259,7 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
log.VEvent(streamCtx, 2, "Inbox reader arrived")
case <-streamCtx.Done():
return fmt.Errorf("%s: streamCtx while waiting for reader (remote client canceled)", streamCtx.Err())
case <-i.flowCtx.Done():
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
Expand All @@ -244,18 +269,21 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
// Now wait for one of the events described in the method comment. If a
// cancellation is encountered, nothing special must be done to cancel the
// reader goroutine as returning from the handler will close the stream.
//
// Note that we don't listen for cancellation on flowCtx.Done() because
// readerCtx must be the child of the flow context.
select {
case err := <-i.errCh:
// nil will be read from errCh when the channel is closed.
return err
case <-i.flowCtxDone:
// The flow context of the inbox host has been canceled. This can occur
// e.g. when the query is canceled, or when another stream encountered
// an unrecoverable error forcing it to shutdown the flow.
return cancelchecker.QueryCanceledError
case <-readerCtx.Done():
// The reader canceled the stream meaning that it no longer needs any
// more data from the outbox. This is a graceful termination, so we
// return nil.
return nil
// readerCtx is canceled, but we don't know whether it was because the
// flow context was canceled or for other reason. In the former case we
// have an ungraceful shutdown whereas in the latter case we have a
// graceful one.
return i.checkFlowCtxCancellation()
case <-streamCtx.Done():
// The client canceled the stream.
return fmt.Errorf("%s: streamCtx in Inbox stream handler (remote client canceled)", streamCtx.Err())
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colflow/colrpc/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestInboxCancellation(t *testing.T) {

typs := []*types.T{types.Int}
t.Run("ReaderWaitingForStreamHandler", func(t *testing.T) {
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)
ctx, cancelFn := context.WithCancel(context.Background())
// Cancel the context.
Expand All @@ -78,7 +78,7 @@ func TestInboxCancellation(t *testing.T) {

t.Run("DuringRecv", func(t *testing.T) {
rpcLayer := makeMockFlowStreamRPCLayer()
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)
ctx, cancelFn := context.WithCancel(context.Background())

Expand Down Expand Up @@ -111,7 +111,7 @@ func TestInboxCancellation(t *testing.T) {

t.Run("StreamHandlerWaitingForReader", func(t *testing.T) {
rpcLayer := makeMockFlowStreamRPCLayer()
inbox, err := NewInbox(context.Background(), testAllocator, typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, typs, execinfrapb.StreamID(0))
require.NoError(t, err)

ctx, cancelFn := context.WithCancel(context.Background())
Expand All @@ -129,7 +129,7 @@ func TestInboxCancellation(t *testing.T) {
func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
defer leaktest.AfterTest(t)()

inbox, err := NewInbox(context.Background(), testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
require.NoError(t, err)

rpcLayer := makeMockFlowStreamRPCLayer()
Expand All @@ -156,7 +156,7 @@ func TestInboxTimeout(t *testing.T) {

ctx := context.Background()

inbox, err := NewInbox(ctx, testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
inbox, err := NewInbox(testAllocator, []*types.T{types.Int}, execinfrapb.StreamID(0))
require.NoError(t, err)

var (
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestInboxShutdown(t *testing.T) {
inboxCtx, inboxCancel := context.WithCancel(context.Background())
inboxMemAccount := testMemMonitor.MakeBoundAccount()
defer inboxMemAccount.Close(inboxCtx)
inbox, err := NewInbox(context.Background(), colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
inbox, err := NewInbox(colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
require.NoError(t, err)
c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (o *Outbox) Run(
flowCtxCancel context.CancelFunc,
connectionTimeout time.Duration,
) {
flowCtx := ctx
// Derive a child context so that we can cancel all components rooted in
// this outbox.
var outboxCtxCancel context.CancelFunc
Expand Down Expand Up @@ -182,7 +183,12 @@ func (o *Outbox) Run(
}

client := execinfrapb.NewDistSQLClient(conn)
stream, err = client.FlowStream(ctx)
// We use the flow context for the RPC so that when outbox context is
// canceled in case of a graceful shutdown, the gRPC stream keeps on
// running. If, however, the flow context is canceled, then the
// termination of the whole query is ungraceful, so we're ok with the
// gRPC stream being ungracefully shutdown too.
stream, err = client.FlowStream(flowCtx)
if err != nil {
log.Warningf(
ctx,
Expand Down
Loading