Skip to content

Commit

Permalink
Revert "colexec: close all Closers during colexectestutils.RunTests"
Browse files Browse the repository at this point in the history
This reverts commit bb4e8f6.
  • Loading branch information
yuzefovich committed Sep 8, 2023
1 parent bb4e8f6 commit 06691b7
Show file tree
Hide file tree
Showing 34 changed files with 142 additions and 143 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func TestAggregators(t *testing.T) {
verifier = colexectestutils.UnorderedVerifier
}
colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier,
func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
func(input []colexecop.Operator) (colexecop.Operator, error) {
args := &colexecagg.NewAggregatorArgs{
Allocator: testAllocator,
MemAccount: testMemAcc,
Expand All @@ -837,7 +837,7 @@ func TestAggregators(t *testing.T) {
OutputTypes: outputTypes,
}
args.TestingKnobs.HashTableNumBuckets = uint32(1 + rng.Intn(7))
return agg.new(ctx, args), nil, nil
return agg.new(ctx, args), nil
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/and_or_projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,17 @@ func TestAndOrOps(t *testing.T) {
[][]*types.T{{types.Bool, types.Bool}},
tc.expected,
colexectestutils.OrderedVerifier,
func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
projOp, closers, err := colexectestutils.CreateTestProjectingOperator(
func(input []colexecop.Operator) (colexecop.Operator, error) {
projOp, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, input[0], []*types.T{types.Bool, types.Bool},
fmt.Sprintf("@1 %s @2", test.operation), testMemAcc,
)
if err != nil {
return nil, nil, err
return nil, err
}
// We will project out the first two columns in order
// to have test cases be less verbose.
return colexecbase.NewSimpleProjectOp(projOp, 3 /* numInputCols */, []uint32{2}), closers, nil
return colexecbase.NewSimpleProjectOp(projOp, 3 /* numInputCols */, []uint32{2}), nil
})
}
})
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/colexec/builtin_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestBasicBuiltinFunctions(t *testing.T) {
for _, tc := range testCases {
log.Infof(ctx, "%s", tc.desc)
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.inputTuples}, tc.outputTuples, colexectestutils.OrderedVerifier,
func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
func(input []colexecop.Operator) (colexecop.Operator, error) {
return colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, input[0], tc.inputTypes, tc.expr, testMemAcc,
)
Expand Down Expand Up @@ -168,11 +168,10 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b

typs := []*types.T{types.Int}
source := colexecop.NewRepeatableBatchSource(testAllocator, batch, typs)
op, closers, err := colexectestutils.CreateTestProjectingOperator(
op, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, source, typs, "abs(@1)" /* projectingExpr */, testMemAcc,
)
require.NoError(b, err)
require.Nil(b, closers)
op.Init(ctx)

b.SetBytes(int64(8 * coldata.BatchSize()))
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ func TestCaseOp(t *testing.T) {
inputTypes: []*types.T{types.Int},
},
} {
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
caseOp, closers, err := colexectestutils.CreateTestProjectingOperator(
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) {
caseOp, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc,
)
if err != nil {
return nil, nil, err
return nil, err
}
// We will project out the input columns in order to have test
// cases be less verbose.
return colexecbase.NewSimpleProjectOp(caseOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), closers, nil
return colexecbase.NewSimpleProjectOp(caseOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/coalesce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ func TestCoalesceBasic(t *testing.T) {
runTests(
t, testAllocator, []colexectestutils.Tuples{tc.tuples}, [][]*types.T{tc.inputTypes},
tc.expected, colexectestutils.OrderedVerifier,
func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
coalesceOp, closers, err := colexectestutils.CreateTestProjectingOperator(
func(inputs []colexecop.Operator) (colexecop.Operator, error) {
coalesceOp, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc,
)
if err != nil {
return nil, nil, err
return nil, err
}
// We will project out the input columns in order to have test
// cases be less verbose.
return colexecbase.NewSimpleProjectOp(coalesceOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), closers, nil
return colexecbase.NewSimpleProjectOp(coalesceOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecbase/ordinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func BenchmarkOrdinality(b *testing.B) {

func createTestOrdinalityOperator(
ctx context.Context, flowCtx *execinfra.FlowCtx, input colexecop.Operator, inputTypes []*types.T,
) (colexecop.Operator, colexecop.Closers, error) {
) (colexecop.Operator, error) {
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}},
Core: execinfrapb.ProcessorCoreUnion{
Expand All @@ -123,5 +123,5 @@ func createTestOrdinalityOperator(
if err != nil {
return nil, err
}
return result.Root, result.ToClose, nil
return result.Root, nil
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecdisk/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func createDiskBackedSorter(
diskQueueCfg colcontainer.DiskQueueCfg,
testingSemaphore semaphore.Semaphore,
monitorRegistry *colexecargs.MonitorRegistry,
) (colexecop.Operator, colexecop.Closers, error) {
) (colexecop.Operator, []colexecop.Closer, error) {
sorterSpec := &execinfrapb.SorterSpec{
OutputOrdering: execinfrapb.Ordering{Columns: ordCols},
OrderingMatchLen: uint32(matchLen),
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/colexectestutils/proj_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ func CreateTestProjectingOperator(
inputTypes []*types.T,
projectingExpr string,
testMemAcc *mon.BoundAccount,
) (colexecop.Operator, colexecop.Closers, error) {
) (colexecop.Operator, error) {
expr, err := parser.ParseExpr(projectingExpr)
if err != nil {
return nil, nil, err
return nil, err
}
p := &MockTypeContext{Typs: inputTypes}
semaCtx := tree.MakeSemaContext()
semaCtx.IVarContainer = p
typedExpr, err := tree.TypeCheck(ctx, expr, &semaCtx, types.Any)
if err != nil {
return nil, nil, err
return nil, err
}
renderExprs := make([]execinfrapb.Expression, len(inputTypes)+1)
for i := range inputTypes {
Expand All @@ -100,7 +100,7 @@ func CreateTestProjectingOperator(
}
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, nil, err
return nil, err
}
return result.Root, result.ToClose, nil
return result.Root, nil
}
48 changes: 24 additions & 24 deletions pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func maybeHasNulls(b coldata.Batch) bool {

// TestRunner is the signature of RunTestsWithTyps that can be used to
// substitute it with RunTestsWithoutAllNullsInjection when applicable.
type TestRunner func(*testing.T, *colmem.Allocator, []Tuples, [][]*types.T, Tuples, VerifierType, func([]colexecop.Operator) (colexecop.Operator, colexecop.Closers, error))
type TestRunner func(*testing.T, *colmem.Allocator, []Tuples, [][]*types.T, Tuples, VerifierType, func([]colexecop.Operator) (colexecop.Operator, error))

// RunTests is a helper that automatically runs your tests with varied batch
// sizes and with and without a random selection vector.
Expand All @@ -320,7 +320,7 @@ func RunTests(
tups []Tuples,
expected Tuples,
verifier VerifierType,
constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error),
constructor func(inputs []colexecop.Operator) (colexecop.Operator, error),
) {
RunTestsWithTyps(t, allocator, tups, nil /* typs */, expected, verifier, constructor)
}
Expand All @@ -337,7 +337,7 @@ func RunTestsWithTyps(
typs [][]*types.T,
expected Tuples,
verifier VerifierType,
constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error),
constructor func(inputs []colexecop.Operator) (colexecop.Operator, error),
) {
RunTestsWithOrderedCols(t, allocator, tups, typs, expected, verifier, nil /* orderedCols */, constructor)
}
Expand All @@ -353,7 +353,7 @@ func RunTestsWithOrderedCols(
expected Tuples,
verifier VerifierType,
orderedCols []uint32,
constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error),
constructor func(inputs []colexecop.Operator) (colexecop.Operator, error),
) {
RunTestsWithoutAllNullsInjectionWithErrorHandler(t, allocator, tups, typs, expected, verifier, constructor, func(err error) { t.Fatal(err) }, orderedCols)

Expand All @@ -377,7 +377,7 @@ func RunTestsWithOrderedCols(
}
}
}
opConstructor := func(injectAllNulls bool) (colexecop.Operator, colexecop.Closers) {
opConstructor := func(injectAllNulls bool) colexecop.Operator {
inputSources := make([]colexecop.Operator, len(tups))
var inputTypes []*types.T
for i, tup := range tups {
Expand All @@ -388,17 +388,15 @@ func RunTestsWithOrderedCols(
input.injectAllNulls = injectAllNulls
inputSources[i] = input
}
op, closers, err := constructor(inputSources)
op, err := constructor(inputSources)
if err != nil {
t.Fatal(err)
}
op.Init(ctx)
return op, closers
return op
}
originalOp, originalClosers := opConstructor(false /* injectAllNulls */)
opWithNulls, withNullsClosers := opConstructor(true /* injectAllNulls */)
defer closeClosers(t, originalClosers)
defer closeClosers(t, withNullsClosers)
originalOp := opConstructor(false /* injectAllNulls */)
opWithNulls := opConstructor(true /* injectAllNulls */)
foundDifference := false
for {
originalBatch := originalOp.Next()
Expand Down Expand Up @@ -431,16 +429,18 @@ func RunTestsWithOrderedCols(
"non-nulls in the input tuples, we expect for all nulls injection to "+
"change the output")
}
closeIfCloser(t, originalOp)
closeIfCloser(t, opWithNulls)
}
}

// closeClosers is a testing utility function that closes a set of
// colexecop.Closers created for a test.
// closeIfCloser is a testing utility function that checks whether op is a
// colexecop.Closer and closes it if so.
//
// RunTests harness needs to do that once it is done with the op(s). In non-test
// RunTests harness needs to do that once it is done with op. In non-test
// setting, the closing happens at the end of the query execution.
func closeClosers(t *testing.T, closers colexecop.Closers) {
for _, c := range closers {
func closeIfCloser(t *testing.T, op colexecop.Operator) {
if c, ok := op.(colexecop.Closer); ok {
if err := c.Close(context.Background()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func RunTestsWithoutAllNullsInjection(
typs [][]*types.T,
expected Tuples,
verifier VerifierType,
constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error),
constructor func(inputs []colexecop.Operator) (colexecop.Operator, error),
) {
RunTestsWithoutAllNullsInjectionWithErrorHandler(t, allocator, tups, typs, expected, verifier, constructor, func(err error) { t.Fatal(err) }, nil /* orderedCols */)
}
Expand All @@ -495,7 +495,7 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
typs [][]*types.T,
expected Tuples,
verifier VerifierType,
constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error),
constructor func(inputs []colexecop.Operator) (colexecop.Operator, error),
errorHandler func(error),
orderedCols []uint32,
) {
Expand All @@ -521,11 +521,10 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
}
}
RunTestsWithFn(t, allocator, tups, typs, func(t *testing.T, inputs []colexecop.Operator) {
op, closers, err := constructor(inputs)
op, err := constructor(inputs)
if err != nil {
t.Fatal(err)
}
defer closeClosers(t, closers)
out := NewOpTestOutput(op, expected)
if len(typs) > 0 {
out.typs = typs[0]
Expand All @@ -541,6 +540,7 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
errorHandler(err)
}
}
closeIfCloser(t, op)
})

if !skipVerifySelAndNullsResets {
Expand All @@ -564,12 +564,12 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
}
inputSources[i] = NewOpTestInput(allocator, 1 /* batchSize */, tup, inputTypes)
}
op, closers, err := constructor(inputSources)
op, err := constructor(inputSources)
if err != nil {
t.Fatal(err)
}
// We might short-circuit, so defer the closing of the operator.
defer closeClosers(t, closers)
defer closeIfCloser(t, op)
op.Init(ctx)
// NOTE: this test makes sense only if the operator returns two
// non-zero length batches (if not, we short-circuit the test since
Expand Down Expand Up @@ -640,18 +640,18 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
input.injectRandomNulls = true
inputSources[i] = input
}
op, closers, err := constructor(inputSources)
op, err := constructor(inputSources)
if err != nil {
t.Fatal(err)
}
defer closeClosers(t, closers)
if err = colexecerror.CatchVectorizedRuntimeError(func() {
op.Init(ctx)
for b := op.Next(); b.Length() > 0; b = op.Next() {
}
}); err != nil {
errorHandler(err)
}
closeIfCloser(t, op)
}
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/sql/colexec/colexecwindow/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,8 +1034,9 @@ func TestWindowFunctions(t *testing.T) {
},
} {
log.Infof(ctx, "spillForced=%t/%s", spillForced, tc.windowerSpec.WindowFns[0].Func.String())
var toClose []colexecop.Closers
var semsToCheck []semaphore.Semaphore
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, error) {
tc.init()
ct := make([]*types.T, len(tc.tuples[0]))
for i := range ct {
Expand Down Expand Up @@ -1078,8 +1079,14 @@ func TestWindowFunctions(t *testing.T) {
}
semsToCheck = append(semsToCheck, sem)
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
return result.Root, result.ToClose, err
toClose = append(toClose, result.ToClose)
return result.Root, err
})
// Close all closers manually (in production this is done on the
// flow cleanup).
for _, c := range toClose {
require.NoError(t, c.Close(ctx))
}
for i, sem := range semsToCheck {
require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func TestCount(t *testing.T) {
for _, tc := range tcs {
// The tuples consisting of all nulls still count as separate rows, so if
// we replace all values with nulls, we should get the same output.
colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
return NewCountOp(testAllocator, input[0]), nil, nil
colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return NewCountOp(testAllocator, input[0]), nil
})
}
}
7 changes: 3 additions & 4 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestCrossJoiner(t *testing.T) {
for _, tc := range getCJTestCases() {
for _, tc := range tc.mutateTypes() {
log.Infof(ctx, "spillForced=%t", spillForced)
runHashJoinTestCase(t, tc, nil /* rng */, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) {
runHashJoinTestCase(t, tc, nil /* rng */, func(sources []colexecop.Operator) (colexecop.Operator, error) {
spec := createSpecForHashJoiner(tc)
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Expand All @@ -401,9 +401,9 @@ func TestCrossJoiner(t *testing.T) {
}
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, nil, err
return nil, err
}
return result.Root, result.ToClose, nil
return result.Root, nil
})
}
}
Expand Down Expand Up @@ -476,7 +476,6 @@ func BenchmarkCrossJoiner(b *testing.B) {
args.Inputs[1].Root = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows)
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
require.NoError(b, err)
require.Nil(b, result.ToClose)
cj := result.Root
cj.Init(ctx)
for b := cj.Next(); b.Length() > 0; b = cj.Next() {
Expand Down
Loading

0 comments on commit 06691b7

Please sign in to comment.