Skip to content

Commit

Permalink
colexec: fix recent regression with cancellation of inboxes
Browse files Browse the repository at this point in the history
Recent change 773d9ca fixed the way
inboxes handle regular query errors so that now the gRPC streams are not
broken whenever a query error is encountered. However, that change
introduced a regression - it is now possible that the inbox handler
goroutine (the one instantiated to handle FlowStream gRPC call) never
exits when the inbox is an input to the parallel unordered synchronizer.

In particular, the following sequence of events can happen:
1. the reader goroutine of the inbox receives an error from the
corresponding outbox
2. this error is propagated to one of the input goroutines of the
unordered synchronizer via a panic. Notably, this is considered
a "graceful" termination from the perspective of the gRPC stream
handling, so the handler goroutine is not notified of this error, and
the inbox is not closed. It is expected that the inbox will be drained
which will close the handler goroutine.
3. however, the synchronizer input goroutine currently simply receives
the error, propagates it to the coordinator goroutine, and exits,
without performing the draining.

Thus, we get into such a state that the inbox is never drained, so the
handler goroutine will stay alive forever. In particular, this will
block `Flow.Wait` calls, and `Flow.Cleanup` will never be called. This
could leak, for example, to leaking file descriptors used by the
temporary disk storage in the vectorized engine.

This fix is quite simple - instead of exiting in step 3, the
synchronizer's input goroutine should just proceed to draining, and only
exit once draining is performed. I believe this was always the
intention, but it didn't really matter until the fix in
773d9ca because draining of the inbox
was a noop since the gRPC stream was prematurely broken.

Release note: None
  • Loading branch information
yuzefovich committed Apr 11, 2022
1 parent efe653c commit ecd6403
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
11 changes: 10 additions & 1 deletion pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ func (s *ParallelUnorderedSynchronizer) init() {
continue
}
sendErr(err)
return
// After we encounter an error, we proceed to draining.
// If this is a context cancellation, we'll realize that
// in the select below, so the drained meta will be
// ignored, for all other errors the drained meta will
// be sent to the coordinator goroutine.
s.setState(parallelUnorderedSynchronizerStateDraining)
continue
}
msg.b = s.batches[inputIdx]
if s.batches[inputIdx].Length() != 0 {
Expand Down Expand Up @@ -341,6 +347,9 @@ func (s *ParallelUnorderedSynchronizer) Next() coldata.Batch {
// is safe to retrieve the next batch. Since Next has been called, we can
// reuse memory instead of making safe copies of batches returned.
s.notifyInputToReadNextBatch(s.lastReadInputIdx)
case parallelUnorderedSynchronizerStateDraining:
// One of the inputs has just encountered an error. We do nothing
// here and will read that error from the errCh below.
default:
colexecerror.InternalError(errors.AssertionFailedf("unhandled state in ParallelUnorderedSynchronizer Next goroutine: %d", state))
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
colmem.NewAllocator(ctx, &acc, coldata.StandardColumnFactory),
)
}
// Also add a metadata source to each input.
for i := 0; i < len(inputs); i++ {
inputs[i].MetadataSources = colexecop.MetadataSources{colexectestutils.CallbackMetadataSource{
DrainMetaCb: func() []execinfrapb.ProducerMetadata {
// Note that we don't care about the type of metadata, so we can
// just use an empty metadata object.
return []execinfrapb.ProducerMetadata{{}}
},
}}
}

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(inputs, &wg)
Expand All @@ -228,7 +238,12 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
// Loop until we get an error.
}
// The caller must call DrainMeta on error.
require.Zero(t, len(s.DrainMeta()))
//
// Ensure that all inputs, including the one that encountered an error,
// properly drain their metadata sources. Notably, the error itself should
// not be propagated as metadata (i.e. we don't want it to be duplicated),
// but each input should produce a single metadata object.
require.Equal(t, len(inputs), len(s.DrainMeta()))
// This is the crux of the test: assert that all inputs have finished.
require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
}
Expand Down

0 comments on commit ecd6403

Please sign in to comment.