colflow: fix the shutdown for good

This commit fixes a long-standing bug in the shutdown of distributed
vectorized queries: when the flow completed gracefully on one node, we
might get an error on another node, resulting in the ungraceful
termination of the query. This was caused by the fact that on remote
nodes the last outbox to exit would cancel the flow context; however,
when instantiating the `FlowStream` RPC, the outboxes used a child
context of the flow context, so the "graceful" cancellation of the flow
context caused the inbox to observe an ungraceful termination of the
gRPC stream. As a result, the whole query could fail with a "context
canceled" error.
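
To make the mechanism concrete, here is a minimal standalone Go sketch
of the faulty context hierarchy (the names `flowCtx` and `streamCtx`
are illustrative, not the actual outbox code): canceling the parent
flow context necessarily cancels the RPC's child context, so the other
side of the stream sees an error even though the shutdown was graceful.

package main

import (
	"context"
	"fmt"
)

func main() {
	// The remote node's flow context.
	flowCtx, cancelFlow := context.WithCancel(context.Background())

	// Before this fix, the outbox opened the FlowStream RPC using a
	// child of the flow context.
	streamCtx, cancelStream := context.WithCancel(flowCtx)
	defer cancelStream()

	// On graceful completion, the last outbox to exit canceled the
	// flow context...
	cancelFlow()

	// ...and cancellation propagates to child contexts, so the gRPC
	// stream died too and the inbox host observed an ungraceful
	// termination.
	fmt.Println(streamCtx.Err()) // prints "context canceled"
}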

I believe I introduced this bug over two years ago because I didn't
fully understand how the shutdown should work; in particular, I was
missing the fact that when an inbox observes the flow context
cancellation, it should terminate the `FlowStream` RPC ungracefully in
order to propagate the ungracefulness to the other side of the stream.
That shortcoming was fixed in the previous commit.

The shutdown protocol is now as follows (a toy sketch of the full
cascade follows the list):
- On the gateway node we keep canceling the flow context at the very
end of query execution. It doesn't matter whether the query resulted
in an error or not; doing so ensures that everything exits on the
gateway node. This behavior is already present.
- Due to the fix in the previous commit, that flow context cancellation
ungracefully terminates all still-open gRPC streams of the `FlowStream`
RPC for which the gateway node is the inbox host.
- The outboxes on the remote nodes observe the ungraceful termination
and cancel the flow contexts of their hosts. This, in turn, triggers
propagation of the ungraceful termination on other gRPC streams, and
so on.
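
Here is a toy, self-contained Go simulation of that cascade (three
hosts modeled by plain contexts, with a broken `FlowStream` stream
modeled as the upstream host's flow context being canceled; none of
these names exist in the actual codebase):

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	// Hosts 0 (the gateway), 1, and 2, each with its own flow context.
	ctxs := make([]context.Context, 3)
	cancels := make([]context.CancelFunc, 3)
	for i := range ctxs {
		ctxs[i], cancels[i] = context.WithCancel(context.Background())
	}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		i := i // capture the loop variable
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The outbox on host i+1 observes the ungraceful break of
			// its stream into host i and cancels its own host's flow
			// context, continuing the cascade.
			<-ctxs[i].Done()
			fmt.Printf("host %d: stream broke, canceling local flow context\n", i+1)
			cancels[i+1]()
		}()
	}

	// The gateway cancels its flow context at the very end of query
	// execution, whether or not the query resulted in an error.
	cancels[0]()
	wg.Wait()
	<-ctxs[2].Done() // the ungraceful termination reached the last host
}

Note that the cascade needs no extra coordination: each hop reuses the
failure of an already-open stream as the shutdown signal.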

Release note (bug fix): Previously, in extremely rare cases,
CockroachDB could return a spurious "context canceled" error for
a query that actually succeeded. This has now been fixed.
yuzefovich committed Dec 21, 2021
1 parent 798dadb commit 4100446
Showing 9 changed files with 105 additions and 221 deletions.
104 changes: 32 additions & 72 deletions pkg/sql/colflow/colrpc/colrpc_test.go
@@ -151,10 +151,6 @@ func TestOutboxInbox(t *testing.T) {
 		// flowCtxCancel models a scenario in which the flow context of the
 		// Inbox host is canceled which is an ungraceful shutdown.
 		flowCtxCancel
-		// readerCtxCancel models a scenario in which the consumer of the Inbox
-		// cancels the context while the host's flow context is not canceled.
-		// This is considered a graceful termination.
-		readerCtxCancel
 		// transportBreaks models a scenario in which the transport breaks.
 		transportBreaks
 	)
@@ -163,18 +159,15 @@ func TestOutboxInbox(t *testing.T) {
 		cancellationScenarioName string
 	)
 	switch randVal := rng.Float64(); {
-	case randVal <= 0.2:
+	case randVal <= 0.25:
 		cancellationScenario = noCancel
 		cancellationScenarioName = "noCancel"
-	case randVal <= 0.4:
+	case randVal <= 0.5:
 		cancellationScenario = streamCtxCancel
 		cancellationScenarioName = "streamCtxCancel"
-	case randVal <= 0.6:
+	case randVal <= 0.75:
 		cancellationScenario = flowCtxCancel
 		cancellationScenarioName = "flowCtxCancel"
-	case randVal <= 0.8:
-		cancellationScenario = readerCtxCancel
-		cancellationScenarioName = "readerCtxCancel"
 	default:
 		cancellationScenario = transportBreaks
 		cancellationScenarioName = "transportBreaks"
@@ -247,34 +240,25 @@ func TestOutboxInbox(t *testing.T) {
 
 	var (
 		flowCtxCanceled uint32
-		// Because the outboxCtx must be a child of the flow context, we
-		// assume that if flowCtxCanceled is non-zero, then
-		// outboxCtxCanceled is too and don't check that explicitly.
-		outboxCtxCanceled uint32
-		wg                sync.WaitGroup
+		wg              sync.WaitGroup
 	)
 	wg.Add(1)
 	go func() {
-		flowCtx, flowCtxCancelFn := context.WithCancel(context.Background())
+		outboxFlowCtx, outboxFlowCtxCancelFn := context.WithCancel(context.Background())
 		flowCtxCancel := func() {
 			atomic.StoreUint32(&flowCtxCanceled, 1)
-			flowCtxCancelFn()
-		}
-		outboxCtx, outboxCtxCancelFn := context.WithCancel(flowCtx)
-		outboxCtxCancel := func() {
-			atomic.StoreUint32(&outboxCtxCanceled, 1)
-			outboxCtxCancelFn()
+			outboxFlowCtxCancelFn()
 		}
 
 		inputMemAcc := testMemMonitor.MakeBoundAccount()
-		defer inputMemAcc.Close(outboxCtx)
+		defer inputMemAcc.Close(outboxFlowCtx)
 		input := coldatatestutils.NewRandomDataOp(
-			colmem.NewAllocator(outboxCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args,
+			colmem.NewAllocator(outboxFlowCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args,
 		)
 		outboxMemAcc := testMemMonitor.MakeBoundAccount()
-		defer outboxMemAcc.Close(outboxCtx)
+		defer outboxMemAcc.Close(outboxFlowCtx)
 		outbox, err := NewOutbox(
-			colmem.NewAllocator(outboxCtx, &outboxMemAcc, coldata.StandardColumnFactory),
+			colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory),
 			colexecargs.OpWithMetaInfo{Root: input}, typs, nil, /* getStats */
 		)
 		require.NoError(t, err)
@@ -285,13 +269,12 @@ func TestOutboxInbox(t *testing.T) {
 		// to create a context of the node on which the outbox runs and keep
 		// it different from the streamCtx. This matters in
 		// 'transportBreaks' scenario.
-		outbox.runnerCtx = outboxCtx
-		outbox.runWithStream(streamCtx, clientStream, flowCtxCancel, outboxCtxCancel)
+		outbox.runnerCtx = outboxFlowCtx
+		outbox.runWithStream(streamCtx, clientStream, flowCtxCancel)
 		wg.Done()
 	}()
 
 	inboxFlowCtx, inboxFlowCtxCancelFn := context.WithCancel(context.Background())
-	readerCtx, readerCancelFn := context.WithCancel(inboxFlowCtx)
 	wg.Add(1)
 	go func() {
 		if sleepBeforeCancellation {
@@ -303,8 +286,6 @@ func TestOutboxInbox(t *testing.T) {
 			streamCancelFn()
 		case flowCtxCancel:
 			inboxFlowCtxCancelFn()
-		case readerCtxCancel:
-			readerCancelFn()
 		case transportBreaks:
 			err := conn.Close() // nolint:grpcconnclose
 			require.NoError(t, err)
@@ -314,9 +295,9 @@ func TestOutboxInbox(t *testing.T) {
 	}()
 
 	inboxMemAcc := testMemMonitor.MakeBoundAccount()
-	defer inboxMemAcc.Close(readerCtx)
-	inbox, err := NewInboxWithFlowCtxDone(
-		colmem.NewAllocator(readerCtx, &inboxMemAcc, coldata.StandardColumnFactory),
+	defer inboxMemAcc.Close(inboxFlowCtx)
+	inbox, err := NewInbox(
+		colmem.NewAllocator(inboxFlowCtx, &inboxMemAcc, coldata.StandardColumnFactory),
 		typs, execinfrapb.StreamID(0), inboxFlowCtx.Done(),
 	)
 	require.NoError(t, err)
@@ -326,19 +307,19 @@ func TestOutboxInbox(t *testing.T) {
 	// Use a deselector op to verify that the Outbox gets rid of the selection
 	// vector.
 	deselectorMemAcc := testMemMonitor.MakeBoundAccount()
-	defer deselectorMemAcc.Close(readerCtx)
+	defer deselectorMemAcc.Close(inboxFlowCtx)
 	inputBatches := colexecutils.NewDeselectorOp(
-		colmem.NewAllocator(readerCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs,
+		colmem.NewAllocator(inboxFlowCtx, &deselectorMemAcc, coldata.StandardColumnFactory), inputBuffer, typs,
 	)
-	inputBatches.Init(readerCtx)
+	inputBatches.Init(inboxFlowCtx)
 	outputBatches := colexecop.NewBatchBuffer()
 	var readerErr error
 	for {
 		var outputBatch coldata.Batch
 		if err := colexecerror.CatchVectorizedRuntimeError(func() {
 			// Note that it is ok that we call Init on every iteration - it
 			// is a noop every time except for the first one.
-			inbox.Init(readerCtx)
+			inbox.Init(inboxFlowCtx)
 			outputBatch = inbox.Next()
 		}); err != nil {
 			readerErr = err
@@ -382,7 +363,6 @@ func TestOutboxInbox(t *testing.T) {
 		// Verify that the Outbox terminated gracefully (did not cancel the
 		// flow context).
 		require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
-		require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
 		// And the Inbox did as well.
 		require.NoError(t, streamHandlerErr)
 		require.NoError(t, readerErr)
@@ -444,20 +424,6 @@ func TestOutboxInbox(t *testing.T) {
 		// doesn't take place, so the flow context on the outbox side should
 		// not be canceled.
 		require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
-		require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
-	case readerCtxCancel:
-		// If the reader context gets canceled while the inbox's host flow
-		// context doesn't, it is treated as a graceful termination of the
-		// stream, so we expect no error from the stream handler.
-		require.Nil(t, streamHandlerErr)
-		// The Inbox should still propagate this error upwards.
-		require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
-
-		// The cancellation should have been communicated to the Outbox,
-		// resulting in the watchdog goroutine canceling the outbox context
-		// (but not the flow).
-		require.True(t, atomic.LoadUint32(&flowCtxCanceled) == 0)
-		require.True(t, atomic.LoadUint32(&outboxCtxCanceled) == 1)
 	case transportBreaks:
 		// If the transport breaks, the scenario is very similar to
 		// streamCtxCancel. gRPC will cancel the stream handler's context.
@@ -496,14 +462,11 @@ func TestInboxHostCtxCancellation(t *testing.T) {
 	}()
 
 	// Simulate the "remote" node with a separate context.
-	outboxHostCtx, outboxHostCtxCancel := context.WithCancel(context.Background())
-	// Derive a separate context for the outbox itself (this is what is done in
-	// Outbox.Run).
-	outboxCtx, outboxCtxCancel := context.WithCancel(outboxHostCtx)
+	outboxFlowCtx, outboxFlowCtxCancel := context.WithCancel(context.Background())
 
 	// Initiate the FlowStream RPC from the outbox.
 	client := execinfrapb.NewDistSQLClient(conn)
-	clientStream, err := client.FlowStream(outboxCtx)
+	clientStream, err := client.FlowStream(outboxFlowCtx)
 	require.NoError(t, err)
 
 	// Create and run the outbox.
@@ -513,24 +476,24 @@ func TestInboxHostCtxCancellation(t *testing.T) {
 	typs := []*types.T{}
 	outboxInput := colexecutils.NewFixedNumTuplesNoInputOp(testAllocator, 1 /* numTuples */, nil /* opToInitialize */)
 	outboxMemAcc := testMemMonitor.MakeBoundAccount()
-	defer outboxMemAcc.Close(outboxHostCtx)
+	defer outboxMemAcc.Close(outboxFlowCtx)
 	outbox, err := NewOutbox(
-		colmem.NewAllocator(outboxHostCtx, &outboxMemAcc, coldata.StandardColumnFactory),
+		colmem.NewAllocator(outboxFlowCtx, &outboxMemAcc, coldata.StandardColumnFactory),
 		colexecargs.OpWithMetaInfo{Root: outboxInput}, typs, nil, /* getStats */
 	)
 	require.NoError(t, err)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		outbox.runWithStream(outboxCtx, clientStream, outboxHostCtxCancel, outboxCtxCancel)
+		outbox.runWithStream(outboxFlowCtx, clientStream, outboxFlowCtxCancel)
 		wg.Done()
 	}()
 
 	// Create the inbox on the "local" node (simulated by a separate context).
 	inboxHostCtx, inboxHostCtxCancel := context.WithCancel(context.Background())
 	inboxMemAcc := testMemMonitor.MakeBoundAccount()
 	defer inboxMemAcc.Close(inboxHostCtx)
-	inbox, err := NewInboxWithFlowCtxDone(
+	inbox, err := NewInbox(
 		colmem.NewAllocator(inboxHostCtx, &inboxMemAcc, coldata.StandardColumnFactory),
 		typs, execinfrapb.StreamID(0), inboxHostCtx.Done(),
 	)
@@ -704,20 +667,19 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
 
 			inboxMemAcc := testMemMonitor.MakeBoundAccount()
 			defer inboxMemAcc.Close(ctx)
-			inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+			inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done())
 			require.NoError(t, err)
 
 			var (
-				flowCanceled, outboxCanceled uint32
-				wg                           sync.WaitGroup
+				flowCanceled uint32
+				wg           sync.WaitGroup
 			)
 			wg.Add(1)
 			go func() {
 				outbox.runWithStream(
 					ctx,
 					clientStream,
 					func() { atomic.StoreUint32(&flowCanceled, 1) },
-					func() { atomic.StoreUint32(&outboxCanceled, 1) },
 				)
 				wg.Done()
 			}()
@@ -729,10 +691,8 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
 
 			wg.Wait()
 			require.NoError(t, <-streamHanderErrCh)
-			// Require that the outbox did not cancel the flow and did cancel
-			// the outbox since this is a graceful drain.
+			// Require that the outbox did not cancel the flow.
 			require.True(t, atomic.LoadUint32(&flowCanceled) == 0)
-			require.True(t, atomic.LoadUint32(&outboxCanceled) == 1)
 
 			// Verify that we received the expected metadata.
 			if tc.verifyExpectedMetadata != nil {
@@ -787,13 +747,13 @@ func BenchmarkOutboxInbox(b *testing.B) {
 
 	inboxMemAcc := testMemMonitor.MakeBoundAccount()
 	defer inboxMemAcc.Close(ctx)
-	inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0))
+	inbox, err := NewInbox(colmem.NewAllocator(ctx, &inboxMemAcc, coldata.StandardColumnFactory), typs, execinfrapb.StreamID(0), ctx.Done())
 	require.NoError(b, err)
 
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */, nil /* outboxCtxCancel */)
+		outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */)
 		wg.Done()
 	}()
@@ -912,7 +872,7 @@ func TestInboxCtxStreamIDTagging(t *testing.T) {
 
 	typs := []*types.T{types.Int}
 
-	inbox, err := NewInbox(testAllocator, typs, streamID)
+	inbox, err := NewInbox(testAllocator, typs, streamID, ctx.Done())
 	require.NoError(t, err)
 
 	ctxExtract := make(chan struct{})