Skip to content

Commit

Permalink
Merge #106459
Browse files Browse the repository at this point in the history
106459: colexec: close operators at the end of colexec tests r=yuzefovich,michae2 a=michae2

Fixes: #106119

**colcontainer: close PDQ during TestPartitionedDiskQueueSimulatedExternal**

Close the `PartitionedDiskQueue` at the end of `TestPartitionedDiskQueueSimulatedExternal/HashJoin`.

Release note: None

---

**colexecutils: close SpillingBuffer during TestSpillingBuffer**

Release note: None

---

**colexec: make some operators implement colexecop.Closer interface**

This commit makes the following adjustments that fix "leaked files" in
the disk-spilling tests:
- `orderedDistinct` now implements `colexecop.Closer` (needed for
`TestExternalDistinct`)
- `orderedAggregator` and `distinctChainOps` now implement
`colexecop.Closer` (needed for `TestExternalHashAggregator`)
- `TestCrossJoiner` is adjusted to explicitly close all closers when
non-empty ON expression is used (in this case we have selection and
projection operators on top of the cross joiner which currently don't
implement the `Closer` interface).

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Sep 8, 2023
2 parents 52f0a7e + 2cf55aa commit 0fd3da5
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/sql/colcontainer/partitionedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,5 +310,8 @@ func TestPartitionedDiskQueueSimulatedExternal(t *testing.T) {
}

joinRepartition(0, 0, numRepartitions, 0)

require.NoError(t, p.Close(ctx))
countingFS.assertOpenFDs(t, sem, 0, 0)
})
}
23 changes: 23 additions & 0 deletions pkg/sql/colexec/colexecbase/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
func OrderedDistinctColsToOperators(
input colexecop.Operator, distinctCols []uint32, typs []*types.T, nullsAreDistinct bool,
) (colexecop.ResettableOperator, []bool) {
var inputToClose colexecop.Closer
if c, ok := input.(colexecop.Closer); ok {
inputToClose = c
}
distinctCol := make([]bool, coldata.BatchSize())
// zero the boolean column on every iteration.
input = &fnOp{
Expand All @@ -55,16 +59,26 @@ func OrderedDistinctColsToOperators(
}
distinctChain := &distinctChainOps{
ResettableOperator: r,
inputToClose: inputToClose,
}
return distinctChain, distinctCol
}

type distinctChainOps struct {
colexecop.ResettableOperator
colexecop.NonExplainable
inputToClose colexecop.Closer
}

var _ colexecop.ResettableOperator = &distinctChainOps{}
var _ colexecop.ClosableOperator = &distinctChainOps{}

func (d *distinctChainOps) Close(ctx context.Context) error {
if d.inputToClose != nil {
return d.inputToClose.Close(ctx)
}
return nil
}

// NewOrderedDistinct creates a new ordered distinct operator on the given
// input columns with the given types.
Expand Down Expand Up @@ -104,6 +118,7 @@ type orderedDistinct struct {
}

var _ colexecop.ResettableOperator = &orderedDistinct{}
var _ colexecop.ClosableOperator = &orderedDistinct{}

// Init implements the colexecop.Operator interface.
func (d *orderedDistinct) Init(ctx context.Context) {
Expand Down Expand Up @@ -140,6 +155,14 @@ func (d *orderedDistinct) Reset(ctx context.Context) {
d.distinctChain.Reset(ctx)
}

// Close implements the colexecop.Closer interface.
func (d *orderedDistinct) Close(ctx context.Context) error {
if c, ok := d.Input.(colexecop.Closer); ok {
return c.Close(ctx)
}
return nil
}

// UpsertDistinctHelper is a utility that helps distinct operators emit errors
// when they observe duplicate tuples. This behavior is needed by UPSERT
// operations.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecutils/spilling_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestSpillingBuffer(t *testing.T) {
spillingQueueUnlimitedAllocator, memoryLimit, queueCfg,
colexecop.NewTestingSemaphore(2), typs, testDiskAcc, testMemAcc, colsToStore...,
)
defer buf.Close(ctx)
if setInMemTuplesLimit {
buf.testingKnobs.maxTuplesStoredInMemory = numBatches * inputBatchSize / 2
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ func TestCrossJoiner(t *testing.T) {
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)

// When we have non-empty ON expression, we will plan additional operators
// on top of the cross joiner (selection and projection ops). Those
// operators currently don't implement the colexecop.Closer interface, so
// the closers aren't automatically closed by the RunTests harness (i.e.
// closeIfCloser stops early), so we need to close all closers explicitly.
// (The alternative would be to make all these selection and projection
// operators implement the interface, but it doesn't seem worth it.)
var onExprToClose colexecop.Closers
defer func() {
for _, c := range onExprToClose {
require.NoError(t, c.Close(ctx))
}
}()

for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
for _, tc := range getCJTestCases() {
Expand All @@ -403,6 +417,9 @@ func TestCrossJoiner(t *testing.T) {
if err != nil {
return nil, err
}
if !tc.onExpr.Empty() {
onExprToClose = append(onExprToClose, result.ToClose...)
}
return result.Root, nil
})
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
type orderedAggregator struct {
colexecop.OneInputNode
colexecop.InitHelper
colexecop.CloserHelper

state orderedAggregatorState

Expand Down Expand Up @@ -415,5 +416,14 @@ func (a *orderedAggregator) Reset(ctx context.Context) {
}

func (a *orderedAggregator) Close(ctx context.Context) error {
return a.toClose.Close(ctx)
if !a.CloserHelper.Close() {
return nil
}
retErr := a.toClose.Close(ctx)
if c, ok := a.Input.(colexecop.Closer); ok {
if err := c.Close(ctx); err != nil {
retErr = err
}
}
return retErr
}

0 comments on commit 0fd3da5

Please sign in to comment.