From ecd6403efb02fac4e46458a4341e3f3204298c6e Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 8 Apr 2022 22:43:59 -0700 Subject: [PATCH] colexec: fix recent regression with cancellation of inboxes Recent change 773d9ca171edcaf2f5dd0795481c5f673aad30b5 fixed the way inboxes handle regular query errors so that now the gRPC streams are not broken whenever a query error is encountered. However, that change introduced a regression - it is now possible that the inbox handler goroutine (the one instantiated to handle FlowStream gRPC call) never exits when the inbox is an input to the parallel unordered synchronizer. In particular, the following sequence of events can happen: 1. the reader goroutine of the inbox receives an error from the corresponding outbox 2. this error is propagated to one of the input goroutines of the unordered synchronizer via a panic. Notably, this is considered a "graceful" termination from the perspective of the gRPC stream handling, so the handler goroutine is not notified of this error, and the inbox is not closed. It is expected that the inbox will be drained which will close the handler goroutine. 3. however, the synchronizer input goroutine currently simply receives the error, propagates it to the coordinator goroutine, and exits, without performing the draining. Thus, we get into such a state that the inbox is never drained, so the handler goroutine will stay alive forever. In particular, this will block `Flow.Wait` calls, and `Flow.Cleanup` will never be called. This could leak, for example, to leaking file descriptors used by the temporary disk storage in the vectorized engine. This fix is quite simple - instead of exiting in step 3, the synchronizer's input goroutine should just proceed to draining, and only exit once draining is performed. I believe this was always the intention, but it didn't really matter until the fix in 773d9ca171edcaf2f5dd0795481c5f673aad30b5 because draining of the inbox was a noop since the gRPC stream was prematurely broken. Release note: None --- .../colexec/parallel_unordered_synchronizer.go | 11 ++++++++++- .../parallel_unordered_synchronizer_test.go | 17 ++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 3d561c2b7555..eb650a84a273 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -260,7 +260,13 @@ func (s *ParallelUnorderedSynchronizer) init() { continue } sendErr(err) - return + // After we encounter an error, we proceed to draining. + // If this is a context cancellation, we'll realize that + // in the select below, so the drained meta will be + // ignored, for all other errors the drained meta will + // be sent to the coordinator goroutine. + s.setState(parallelUnorderedSynchronizerStateDraining) + continue } msg.b = s.batches[inputIdx] if s.batches[inputIdx].Length() != 0 { @@ -341,6 +347,9 @@ func (s *ParallelUnorderedSynchronizer) Next() coldata.Batch { // is safe to retrieve the next batch. Since Next has been called, we can // reuse memory instead of making safe copies of batches returned. s.notifyInputToReadNextBatch(s.lastReadInputIdx) + case parallelUnorderedSynchronizerStateDraining: + // One of the inputs has just encountered an error. We do nothing + // here and will read that error from the errCh below. default: colexecerror.InternalError(errors.AssertionFailedf("unhandled state in ParallelUnorderedSynchronizer Next goroutine: %d", state)) } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index 85c058cf8146..b7c3c4e676ac 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -216,6 +216,16 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) { colmem.NewAllocator(ctx, &acc, coldata.StandardColumnFactory), ) } + // Also add a metadata source to each input. + for i := 0; i < len(inputs); i++ { + inputs[i].MetadataSources = colexecop.MetadataSources{colexectestutils.CallbackMetadataSource{ + DrainMetaCb: func() []execinfrapb.ProducerMetadata { + // Note that we don't care about the type of metadata, so we can + // just use an empty metadata object. + return []execinfrapb.ProducerMetadata{{}} + }, + }} + } var wg sync.WaitGroup s := NewParallelUnorderedSynchronizer(inputs, &wg) @@ -228,7 +238,12 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) { // Loop until we get an error. } // The caller must call DrainMeta on error. - require.Zero(t, len(s.DrainMeta())) + // + // Ensure that all inputs, including the one that encountered an error, + // properly drain their metadata sources. Notably, the error itself should + // not be propagated as metadata (i.e. we don't want it to be duplicated), + // but each input should produce a single metadata object. + require.Equal(t, len(inputs), len(s.DrainMeta())) // This is the crux of the test: assert that all inputs have finished. require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs))) }