colflow: fix the shutdown for good
This commit fixes a long-standing bug in the distributed vectorized
query shutdown where, after a graceful completion of the flow on one
node, we might get an error on another node, resulting in the ungraceful
termination of the query. This was caused by the fact that on remote
nodes the last outbox to exit would cancel the flow context; however,
when instantiating the `FlowStream` RPC, the outboxes used a child
context of the flow context, so the "graceful" cancellation of the flow
context would cause the inbox to observe an ungraceful termination of
the gRPC stream. As a result, the whole query could fail with a
"context canceled" error.
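
To make the problematic context wiring concrete, here is a minimal
standalone sketch (not CockroachDB code): Go propagates cancellation
from a parent context to every context derived from it, so an RPC
issued on a child of the flow context cannot survive even a "graceful"
cancellation of the flow context.

package main

import (
    "context"
    "fmt"
)

func main() {
    flowCtx, cancelFlow := context.WithCancel(context.Background())
    // The outbox derives a child context from the flow context.
    outboxCtx, cancelOutbox := context.WithCancel(flowCtx)
    defer cancelOutbox()

    // Pretend the FlowStream RPC was issued with the outbox context.
    rpcCtx := outboxCtx

    // "Graceful" flow shutdown on the remote node.
    cancelFlow()

    // The RPC context is canceled too, so the inbox observes the stream
    // terminating with "context canceled" even though the query succeeded.
    fmt.Println(rpcCtx.Err()) // context canceled
}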

I believe this bug was introduced by me over two years ago because
I didn't fully understand how the shutdown should work; in particular,
I was missing the fact that when an inbox observes the flow context
cancellation, it should terminate the `FlowStream` RPC ungracefully in
order to propagate the ungracefulness to the other side of the stream.
This shortcoming was fixed in the previous commit.

Another possible bug was caused by the outbox canceling its own context
in the case of a graceful shutdown. As mentioned above, the `FlowStream`
RPC was issued using the outbox context, so there was a possibility of
a race between the `CloseSend` call being delivered to the inbox
(graceful termination) and the context of the RPC being canceled
(ungraceful termination).
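
A standalone sketch of that race (the names are illustrative, not real
gRPC or CockroachDB APIs): the inbox effectively selects between the
graceful end-of-stream signal and the cancellation of the RPC's
context, and with the old wiring a graceful outbox exit fired both.

package main

import (
    "context"
    "fmt"
    "time"
)

// simulateInbox stands in for the inbox side of the stream: it reports
// whichever termination signal it happens to observe first.
func simulateInbox(rpcCtx context.Context, closeSend <-chan struct{}) string {
    select {
    case <-closeSend:
        return "graceful termination (CloseSend delivered)"
    case <-rpcCtx.Done():
        return "ungraceful termination (RPC context canceled)"
    }
}

func main() {
    rpcCtx, cancelOutbox := context.WithCancel(context.Background())
    closeSend := make(chan struct{})

    // On a graceful exit the old code effectively issued both signals,
    // with no ordering guarantee by the time they reach the inbox.
    go func() { close(closeSend) }()
    go cancelOutbox()

    time.Sleep(10 * time.Millisecond)
    fmt.Println(simulateInbox(rpcCtx, closeSend))
}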

Both of these problems are now fixed, and the shutdown protocol is now
as follows:
- on the gateway node, we keep canceling the flow context at the very
end of the query execution. It doesn't matter whether the query resulted
in an error or not, and doing so allows us to ensure that everything
exits on the gateway node. This behavior was already present.
- due to the fix in the previous commit, that flow context cancellation
ungracefully terminates all still-open gRPC streams of the `FlowStream`
RPC for which the gateway node is the inbox host.
- the outboxes on the remote nodes observe the ungraceful termination
and cancel the flow contexts of their hosts. This, in turn, triggers the
propagation of the ungraceful termination on other gRPC streams, and so
on.
- whenever an outbox exits gracefully, it cancels its own context, but
the gRPC stream uses the flow context, so the stream stays alive, as
sketched after this list. I debated a bit whether we want to keep this
outbox context cancellation in the case of a graceful completion and
decided to keep it to minimize the scope of changes.
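
For comparison, a minimal sketch of the fixed wiring, under the
assumption (reflected in the diff below) that the outbox captures the
flow context before deriving its own child from it:

package main

import (
    "context"
    "fmt"
)

func main() {
    flowCtx, cancelFlow := context.WithCancel(context.Background())
    defer cancelFlow()

    // The outbox still derives a child context for the components
    // rooted in it...
    outboxCtx, cancelOutbox := context.WithCancel(flowCtx)

    // ...but the FlowStream RPC is now issued with the flow context.
    rpcCtx := flowCtx

    // A graceful outbox exit cancels only the outbox context.
    cancelOutbox()

    fmt.Println(outboxCtx.Err()) // context canceled
    fmt.Println(rpcCtx.Err())    // <nil>: the gRPC stream stays alive
}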

Release note (bug fix): Previously, in extremely rare cases, CockroachDB
could return a spurious "context canceled" error for a query that
actually succeeded. This has now been fixed.
yuzefovich committed Dec 24, 2021
1 parent 57881a3 commit 83e644d
Showing 3 changed files with 9 additions and 16 deletions.
8 changes: 7 additions & 1 deletion pkg/sql/colflow/colrpc/outbox.go
@@ -152,6 +152,7 @@ func (o *Outbox) Run(
     flowCtxCancel context.CancelFunc,
     connectionTimeout time.Duration,
 ) {
+    flowCtx := ctx
     // Derive a child context so that we can cancel all components rooted in
     // this outbox.
     var outboxCtxCancel context.CancelFunc
@@ -182,7 +183,12 @@ func (o *Outbox) Run(
     }
 
     client := execinfrapb.NewDistSQLClient(conn)
-    stream, err = client.FlowStream(ctx)
+    // We use the flow context for the RPC so that when outbox context is
+    // canceled in case of a graceful shutdown, the gRPC stream keeps on
+    // running. If, however, the flow context is canceled, then the
+    // termination of the whole query is ungraceful, so we're ok with the
+    // gRPC stream being ungracefully shutdown too.
+    stream, err = client.FlowStream(flowCtx)
     if err != nil {
         log.Warningf(
             ctx,
15 changes: 0 additions & 15 deletions pkg/sql/colflow/vectorized_flow.go
@@ -461,10 +461,6 @@ type vectorizedFlowCreator struct {
     numOutboxes       int32
     materializerAdded bool
 
-    // numOutboxesExited is an atomic that keeps track of how many outboxes have exited.
-    // When numOutboxesExited equals numOutboxes, the cancellation function for the flow
-    // is called.
-    numOutboxesExited int32
     // numOutboxesDrained is an atomic that keeps track of how many outboxes have
     // been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is
     // added to a flow-level span.
@@ -676,17 +672,6 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
             flowCtxCancel,
             flowinfra.SettingFlowStreamTimeout.Get(&flowCtx.Cfg.Settings.SV),
         )
-        // When the last Outbox on this node exits, we want to make sure that
-        // everything is shutdown; namely, we need to call cancelFn if:
-        // - it is the last Outbox
-        // - there is no root materializer on this node (if it were, it would take
-        // care of the cancellation itself)
-        // - cancelFn is non-nil (it can be nil in tests).
-        // Calling cancelFn will cancel the context that all infrastructure on this
-        // node is listening on, so it will shut everything down.
-        if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && flowCtxCancel != nil {
-            flowCtxCancel()
-        }
     }
     s.accumulateAsyncComponent(run)
     return outbox, nil
2 changes: 2 additions & 0 deletions pkg/sql/flowinfra/outbox.go
@@ -249,6 +249,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
         log.Infof(ctx, "outbox: calling FlowStream")
     }
     // The context used here escapes, so it has to be a background context.
+    // TODO(yuzefovich): the usage of the TODO context here is suspicious.
+    // Investigate this.
     m.stream, err = client.FlowStream(context.TODO())
     if err != nil {
         if log.V(1) {
