diff --git a/pkg/sql/colcontainer/partitionedqueue_test.go b/pkg/sql/colcontainer/partitionedqueue_test.go index 3efbe2d9d6de..ec1aa78db95f 100644 --- a/pkg/sql/colcontainer/partitionedqueue_test.go +++ b/pkg/sql/colcontainer/partitionedqueue_test.go @@ -310,5 +310,8 @@ func TestPartitionedDiskQueueSimulatedExternal(t *testing.T) { } joinRepartition(0, 0, numRepartitions, 0) + + require.NoError(t, p.Close(ctx)) + countingFS.assertOpenFDs(t, sem, 0, 0) }) } diff --git a/pkg/sql/colexec/colexecbase/distinct.go b/pkg/sql/colexec/colexecbase/distinct.go index ba0510f4cba6..b516c0d4a090 100644 --- a/pkg/sql/colexec/colexecbase/distinct.go +++ b/pkg/sql/colexec/colexecbase/distinct.go @@ -29,6 +29,10 @@ import ( func OrderedDistinctColsToOperators( input colexecop.Operator, distinctCols []uint32, typs []*types.T, nullsAreDistinct bool, ) (colexecop.ResettableOperator, []bool) { + var inputToClose colexecop.Closer + if c, ok := input.(colexecop.Closer); ok { + inputToClose = c + } distinctCol := make([]bool, coldata.BatchSize()) // zero the boolean column on every iteration. input = &fnOp{ @@ -55,6 +59,7 @@ func OrderedDistinctColsToOperators( } distinctChain := &distinctChainOps{ ResettableOperator: r, + inputToClose: inputToClose, } return distinctChain, distinctCol } @@ -62,9 +67,18 @@ func OrderedDistinctColsToOperators( type distinctChainOps struct { colexecop.ResettableOperator colexecop.NonExplainable + inputToClose colexecop.Closer } var _ colexecop.ResettableOperator = &distinctChainOps{} +var _ colexecop.ClosableOperator = &distinctChainOps{} + +func (d *distinctChainOps) Close(ctx context.Context) error { + if d.inputToClose != nil { + return d.inputToClose.Close(ctx) + } + return nil +} // NewOrderedDistinct creates a new ordered distinct operator on the given // input columns with the given types. @@ -104,6 +118,7 @@ type orderedDistinct struct { } var _ colexecop.ResettableOperator = &orderedDistinct{} +var _ colexecop.ClosableOperator = &orderedDistinct{} // Init implements the colexecop.Operator interface. func (d *orderedDistinct) Init(ctx context.Context) { @@ -140,6 +155,14 @@ func (d *orderedDistinct) Reset(ctx context.Context) { d.distinctChain.Reset(ctx) } +// Close implements the colexecop.Closer interface. +func (d *orderedDistinct) Close(ctx context.Context) error { + if c, ok := d.Input.(colexecop.Closer); ok { + return c.Close(ctx) + } + return nil +} + // UpsertDistinctHelper is a utility that helps distinct operators emit errors // when they observe duplicate tuples. This behavior is needed by UPSERT // operations. diff --git a/pkg/sql/colexec/colexecutils/spilling_buffer_test.go b/pkg/sql/colexec/colexecutils/spilling_buffer_test.go index 07cc0978ce43..3488ff82fcbc 100644 --- a/pkg/sql/colexec/colexecutils/spilling_buffer_test.go +++ b/pkg/sql/colexec/colexecutils/spilling_buffer_test.go @@ -122,6 +122,7 @@ func TestSpillingBuffer(t *testing.T) { spillingQueueUnlimitedAllocator, memoryLimit, queueCfg, colexecop.NewTestingSemaphore(2), typs, testDiskAcc, testMemAcc, colsToStore..., ) + defer buf.Close(ctx) if setInMemTuplesLimit { buf.testingKnobs.maxTuplesStoredInMemory = numBatches * inputBatchSize / 2 } diff --git a/pkg/sql/colexec/crossjoiner_test.go b/pkg/sql/colexec/crossjoiner_test.go index ffbc4417d143..b9eeabdead9c 100644 --- a/pkg/sql/colexec/crossjoiner_test.go +++ b/pkg/sql/colexec/crossjoiner_test.go @@ -384,6 +384,20 @@ func TestCrossJoiner(t *testing.T) { var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + // When we have non-empty ON expression, we will plan additional operators + // on top of the cross joiner (selection and projection ops). Those + // operators currently don't implement the colexecop.Closer interface, so + // the closers aren't automatically closed by the RunTests harness (i.e. + // closeIfCloser stops early), so we need to close all closers explicitly. + // (The alternative would be to make all these selection and projection + // operators implement the interface, but it doesn't seem worth it.) + var onExprToClose colexecop.Closers + defer func() { + for _, c := range onExprToClose { + require.NoError(t, c.Close(ctx)) + } + }() + for _, spillForced := range []bool{false, true} { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced for _, tc := range getCJTestCases() { @@ -403,6 +417,9 @@ func TestCrossJoiner(t *testing.T) { if err != nil { return nil, err } + if !tc.onExpr.Empty() { + onExprToClose = append(onExprToClose, result.ToClose...) + } return result.Root, nil }) } diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index 9987852bb421..548f367eb6be 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -83,6 +83,7 @@ const ( type orderedAggregator struct { colexecop.OneInputNode colexecop.InitHelper + colexecop.CloserHelper state orderedAggregatorState @@ -415,5 +416,14 @@ func (a *orderedAggregator) Reset(ctx context.Context) { } func (a *orderedAggregator) Close(ctx context.Context) error { - return a.toClose.Close(ctx) + if !a.CloserHelper.Close() { + return nil + } + retErr := a.toClose.Close(ctx) + if c, ok := a.Input.(colexecop.Closer); ok { + if err := c.Close(ctx); err != nil { + retErr = err + } + } + return retErr }