diff --git a/pkg/sql/colexec/disk_spiller.go b/pkg/sql/colexec/disk_spiller.go index 2e3ad36f3dd8..1b0dfdbd3885 100644 --- a/pkg/sql/colexec/disk_spiller.go +++ b/pkg/sql/colexec/disk_spiller.go @@ -239,8 +239,17 @@ func (d *diskSpillerBase) Close(ctx context.Context) error { return nil } var retErr error + for _, input := range d.inputs { + if c, ok := input.(colexecop.Closer); ok { + if err := c.Close(ctx); err != nil { + retErr = err + } + } + } if c, ok := d.inMemoryOp.(colexecop.Closer); ok { - retErr = c.Close(ctx) + if err := c.Close(ctx); err != nil { + retErr = err + } } if c, ok := d.diskBackedOp.(colexecop.Closer); ok { if err := c.Close(ctx); err != nil { diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 0b34d35da753..f35dd55d7d98 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -91,14 +91,8 @@ func TestExternalDistinct(t *testing.T) { require.Equal(t, numExpectedClosers, len(closers)) return distinct, err }) - if tc.errorOnDup == "" || tc.noError { - // We don't check that all FDs were released if an error is - // expected to be returned because our utility closeIfCloser() - // doesn't handle multiple closers (which is always the case for - // the external distinct). - for i, sem := range semsToCheck { - require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) - } + for i, sem := range semsToCheck { + require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } } } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 3d561c2b7555..3151abe2362f 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -477,9 +477,10 @@ func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error { // Note that at this point we know that the input goroutines won't be // spawned up (our consumer won't call Next/DrainMeta after calling Close), // so it is safe to close all closers from this goroutine. - for _, span := range s.tracingSpans { + for i, span := range s.tracingSpans { if span != nil { span.Finish() + s.tracingSpans[i] = nil } } var lastErr error diff --git a/pkg/sql/colexec/serial_unordered_synchronizer.go b/pkg/sql/colexec/serial_unordered_synchronizer.go index 02288277f4c6..1bd385848512 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer.go @@ -120,6 +120,7 @@ func (s *SerialUnorderedSynchronizer) Close(context.Context) error { } if s.span != nil { s.span.Finish() + s.span = nil } return lastErr } diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 86e75d750093..9ff092181c3d 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -143,7 +143,9 @@ type BufferingInMemoryOperator interface { type Closer interface { // Close releases the resources associated with this Closer. If this Closer // is an Operator, the implementation of Close must be safe to execute even - // if Operator.Init wasn't called. + // if Operator.Init wasn't called. Multiple calls to Close() are allowed, + // and most of the implementations should make all calls except for the + // first one no-ops. // // Unless the Closer derives its own context with a separate tracing span, // the argument context rather than the one from Init() must be used