Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: close disk resources in tests #106459

Merged
merged 3 commits into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/sql/colcontainer/partitionedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,5 +310,8 @@ func TestPartitionedDiskQueueSimulatedExternal(t *testing.T) {
}

joinRepartition(0, 0, numRepartitions, 0)

require.NoError(t, p.Close(ctx))
countingFS.assertOpenFDs(t, sem, 0, 0)
})
}
23 changes: 23 additions & 0 deletions pkg/sql/colexec/colexecbase/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
func OrderedDistinctColsToOperators(
input colexecop.Operator, distinctCols []uint32, typs []*types.T, nullsAreDistinct bool,
) (colexecop.ResettableOperator, []bool) {
var inputToClose colexecop.Closer
if c, ok := input.(colexecop.Closer); ok {
inputToClose = c
}
distinctCol := make([]bool, coldata.BatchSize())
// zero the boolean column on every iteration.
input = &fnOp{
Expand All @@ -55,16 +59,26 @@ func OrderedDistinctColsToOperators(
}
distinctChain := &distinctChainOps{
ResettableOperator: r,
inputToClose: inputToClose,
}
return distinctChain, distinctCol
}

type distinctChainOps struct {
colexecop.ResettableOperator
colexecop.NonExplainable
inputToClose colexecop.Closer
}

var _ colexecop.ResettableOperator = &distinctChainOps{}
var _ colexecop.ClosableOperator = &distinctChainOps{}

func (d *distinctChainOps) Close(ctx context.Context) error {
if d.inputToClose != nil {
return d.inputToClose.Close(ctx)
}
return nil
}

// NewOrderedDistinct creates a new ordered distinct operator on the given
// input columns with the given types.
Expand Down Expand Up @@ -104,6 +118,7 @@ type orderedDistinct struct {
}

var _ colexecop.ResettableOperator = &orderedDistinct{}
var _ colexecop.ClosableOperator = &orderedDistinct{}

// Init implements the colexecop.Operator interface.
func (d *orderedDistinct) Init(ctx context.Context) {
Expand Down Expand Up @@ -140,6 +155,14 @@ func (d *orderedDistinct) Reset(ctx context.Context) {
d.distinctChain.Reset(ctx)
}

// Close implements the colexecop.Closer interface.
func (d *orderedDistinct) Close(ctx context.Context) error {
if c, ok := d.Input.(colexecop.Closer); ok {
return c.Close(ctx)
}
return nil
}

// UpsertDistinctHelper is a utility that helps distinct operators emit errors
// when they observe duplicate tuples. This behavior is needed by UPSERT
// operations.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecutils/spilling_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestSpillingBuffer(t *testing.T) {
spillingQueueUnlimitedAllocator, memoryLimit, queueCfg,
colexecop.NewTestingSemaphore(2), typs, testDiskAcc, testMemAcc, colsToStore...,
)
defer buf.Close(ctx)
if setInMemTuplesLimit {
buf.testingKnobs.maxTuplesStoredInMemory = numBatches * inputBatchSize / 2
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ func TestCrossJoiner(t *testing.T) {
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)

// When we have non-empty ON expression, we will plan additional operators
// on top of the cross joiner (selection and projection ops). Those
// operators currently don't implement the colexecop.Closer interface, so
// the closers aren't automatically closed by the RunTests harness (i.e.
// closeIfCloser stops early), so we need to close all closers explicitly.
// (The alternative would be to make all these selection and projection
// operators implement the interface, but it doesn't seem worth it.)
var onExprToClose colexecop.Closers
defer func() {
for _, c := range onExprToClose {
require.NoError(t, c.Close(ctx))
}
}()

for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
for _, tc := range getCJTestCases() {
Expand All @@ -403,6 +417,9 @@ func TestCrossJoiner(t *testing.T) {
if err != nil {
return nil, err
}
if !tc.onExpr.Empty() {
onExprToClose = append(onExprToClose, result.ToClose...)
}
return result.Root, nil
})
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
type orderedAggregator struct {
colexecop.OneInputNode
colexecop.InitHelper
colexecop.CloserHelper

state orderedAggregatorState

Expand Down Expand Up @@ -415,5 +416,14 @@ func (a *orderedAggregator) Reset(ctx context.Context) {
}

func (a *orderedAggregator) Close(ctx context.Context) error {
return a.toClose.Close(ctx)
if !a.CloserHelper.Close() {
return nil
}
retErr := a.toClose.Close(ctx)
if c, ok := a.Input.(colexecop.Closer); ok {
if err := c.Close(ctx); err != nil {
retErr = err
}
}
return retErr
}