From 83e644d081f87147b9f848269bb084501c299a93 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 21 Dec 2021 10:22:08 -0800
Subject: [PATCH] colflow: fix the shutdown for good

This commit fixes a long-standing bug in the distributed vectorized
query shutdown where, in the case of a graceful completion of the flow
on one node, we might get an error on another node, resulting in the
ungraceful termination of the query.

This was caused by the fact that on remote nodes the last outbox to
exit would cancel the flow context; however, when instantiating the
`FlowStream` RPC, the outboxes used a child context of the flow
context, so the "graceful" cancellation of the flow context would cause
the inbox to get an ungraceful termination of the gRPC stream. As a
result, the whole query could get a "context canceled" error.

I believe this bug was introduced by me over two years ago because I
didn't fully understand how the shutdown should work, and in particular
I was missing the fact that when an inbox observes the flow context
cancellation, it should terminate the `FlowStream` RPC ungracefully in
order to propagate the ungracefulness to the other side of the stream.
This shortcoming was fixed in the previous commit.

Another possible bug was caused by the outbox canceling its own context
in the case of a graceful shutdown. As mentioned above, the
`FlowStream` RPC was issued using the outbox context, so there was a
possibility of a race between the `CloseSend` call being delivered to
the inbox (graceful termination) and the context of the RPC being
canceled (ungraceful termination).

Both of these problems are now fixed, and the shutdown protocol is now
as follows:
- On the gateway node we keep on canceling the flow context at the very
  end of the query execution. It doesn't matter whether the query
  resulted in an error or not, and doing so allows us to ensure that
  everything exits on the gateway node. This behavior is already
  present.
- Due to the fix in the previous commit, that flow context cancellation
  ungracefully terminates all still-open gRPC streams for the
  `FlowStream` RPC for which the gateway node is the inbox host.
- The outboxes on the remote nodes get the ungraceful termination and
  cancel the flow context of their hosts. This, in turn, triggers the
  propagation of the ungraceful termination on other gRPC streams, and
  so on.
- Whenever an outbox exits gracefully, it cancels its own context, but
  the gRPC stream uses the flow context, so the stream is still alive.

I debated a bit whether we want to keep this outbox context
cancellation in the case of a graceful completion and decided to keep
it to minimize the scope of the changes.

Release note (bug fix): Previously, CockroachDB could return a spurious
"context canceled" error for a query that actually succeeded in
extremely rare cases, and this has now been fixed.
---
 pkg/sql/colflow/colrpc/outbox.go   |  8 +++++++-
 pkg/sql/colflow/vectorized_flow.go | 15 ---------------
 pkg/sql/flowinfra/outbox.go        |  2 ++
 3 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go
index 1bd10f212bd4..9b097e05a3ba 100644
--- a/pkg/sql/colflow/colrpc/outbox.go
+++ b/pkg/sql/colflow/colrpc/outbox.go
@@ -152,6 +152,7 @@ func (o *Outbox) Run(
 	flowCtxCancel context.CancelFunc,
 	connectionTimeout time.Duration,
 ) {
+	flowCtx := ctx
 	// Derive a child context so that we can cancel all components rooted in
 	// this outbox.
 	var outboxCtxCancel context.CancelFunc
@@ -182,7 +183,12 @@ func (o *Outbox) Run(
 		}
 
 		client := execinfrapb.NewDistSQLClient(conn)
-		stream, err = client.FlowStream(ctx)
+		// We use the flow context for the RPC so that when outbox context is
+		// canceled in case of a graceful shutdown, the gRPC stream keeps on
+		// running. If, however, the flow context is canceled, then the
+		// termination of the whole query is ungraceful, so we're ok with the
+		// gRPC stream being ungracefully shutdown too.
+		stream, err = client.FlowStream(flowCtx)
 		if err != nil {
 			log.Warningf(
 				ctx,
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 8ea8e11f81a7..bcfc4043f77c 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -461,10 +461,6 @@ type vectorizedFlowCreator struct {
 	numOutboxes       int32
 	materializerAdded bool
 
-	// numOutboxesExited is an atomic that keeps track of how many outboxes have exited.
-	// When numOutboxesExited equals numOutboxes, the cancellation function for the flow
-	// is called.
-	numOutboxesExited int32
 	// numOutboxesDrained is an atomic that keeps track of how many outboxes have
 	// been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is
 	// added to a flow-level span.
@@ -676,17 +672,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
 			flowCtxCancel,
 			flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV),
 		)
-		// When the last Outbox on this node exits, we want to make sure that
-		// everything is shutdown; namely, we need to call cancelFn if:
-		// - it is the last Outbox
-		// - there is no root materializer on this node (if it were, it would take
-		// care of the cancellation itself)
-		// - cancelFn is non-nil (it can be nil in tests).
-		// Calling cancelFn will cancel the context that all infrastructure on this
-		// node is listening on, so it will shut everything down.
-		if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && flowCtxCancel != nil {
-			flowCtxCancel()
-		}
 	}
 	s.accumulateAsyncComponent(run)
 	return outbox, nil
diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go
index cc5e1d212ec0..3593900cbde4 100644
--- a/pkg/sql/flowinfra/outbox.go
+++ b/pkg/sql/flowinfra/outbox.go
@@ -249,6 +249,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
 		log.Infof(ctx, "outbox: calling FlowStream")
 	}
 	// The context used here escapes, so it has to be a background context.
+	// TODO(yuzefovich): the usage of the TODO context here is suspicious.
+	// Investigate this.
	m.stream, err = client.FlowStream(context.TODO())
 	if err != nil {
 		if log.V(1) {
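
For illustration, the context split at the heart of the fix (the outbox's own
components hang off a cancelable child context, while the long-lived gRPC
stream stays tied to the flow context) can be sketched as a small
self-contained Go program. This is only a sketch of the general pattern, not
CockroachDB code; runOutbox and everything inside it are hypothetical
stand-ins for Outbox.Run and the FlowStream RPC.

package main

import (
	"context"
	"fmt"
)

// runOutbox is a hypothetical stand-in for an outbox's Run method. It shows
// the context split used by the fix: the stream is tied to the flow context,
// while the outbox's own components are rooted in a cancelable child context.
func runOutbox(ctx context.Context) {
	// Keep a handle on the flow context before deriving the child context;
	// the stream will use this one.
	flowCtx := ctx

	// Derive a child context so that shutting down the outbox gracefully does
	// not touch the flow context (and hence does not kill the stream).
	outboxCtx, outboxCtxCancel := context.WithCancel(flowCtx)

	// Stand-in for the gRPC stream: it only observes flow context cancellation.
	streamDone := flowCtx.Done()

	// Graceful completion: cancel only the outbox-rooted context.
	outboxCtxCancel()
	<-outboxCtx.Done()
	fmt.Println("outbox components shut down:", outboxCtx.Err())

	select {
	case <-streamDone:
		fmt.Println("stream torn down: the flow context was canceled (ungraceful)")
	default:
		fmt.Println("stream still alive: graceful outbox shutdown left the flow context intact")
	}
}

func main() {
	// The flow context is canceled by the flow (e.g. on the gateway at the
	// very end of query execution), not by an individual outbox.
	flowCtx, flowCtxCancel := context.WithCancel(context.Background())
	defer flowCtxCancel()
	runOutbox(flowCtx)
}

In the patch itself this corresponds to saving flowCtx := ctx before deriving
the outbox context and then issuing client.FlowStream(flowCtx), so a graceful
outbox exit that cancels its own context no longer races with CloseSend being
delivered to the inbox.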