diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index eebc29ac9fca..17a91211fe24 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -824,7 +824,7 @@ func TestAggregators(t *testing.T) { verifier = colexectestutils.UnorderedVerifier } colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { args := &colexecagg.NewAggregatorArgs{ Allocator: testAllocator, MemAccount: testMemAcc, @@ -837,7 +837,7 @@ func TestAggregators(t *testing.T) { OutputTypes: outputTypes, } args.TestingKnobs.HashTableNumBuckets = uint32(1 + rng.Intn(7)) - return agg.new(ctx, args), nil, nil + return agg.new(ctx, args), nil }) } } diff --git a/pkg/sql/colexec/and_or_projection_test.go b/pkg/sql/colexec/and_or_projection_test.go index 99ace34d2fd9..b0c21a001872 100644 --- a/pkg/sql/colexec/and_or_projection_test.go +++ b/pkg/sql/colexec/and_or_projection_test.go @@ -200,17 +200,17 @@ func TestAndOrOps(t *testing.T) { [][]*types.T{{types.Bool, types.Bool}}, tc.expected, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - projOp, closers, err := colexectestutils.CreateTestProjectingOperator( + func(input []colexecop.Operator) (colexecop.Operator, error) { + projOp, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, input[0], []*types.T{types.Bool, types.Bool}, fmt.Sprintf("@1 %s @2", test.operation), testMemAcc, ) if err != nil { - return nil, nil, err + return nil, err } // We will project out the first two columns in order // to have test cases be less verbose. - return colexecbase.NewSimpleProjectOp(projOp, 3 /* numInputCols */, []uint32{2}), closers, nil + return colexecbase.NewSimpleProjectOp(projOp, 3 /* numInputCols */, []uint32{2}), nil }) } }) diff --git a/pkg/sql/colexec/builtin_funcs_test.go b/pkg/sql/colexec/builtin_funcs_test.go index 77acd5bd01c1..99ad9863cf58 100644 --- a/pkg/sql/colexec/builtin_funcs_test.go +++ b/pkg/sql/colexec/builtin_funcs_test.go @@ -115,7 +115,7 @@ func TestBasicBuiltinFunctions(t *testing.T) { for _, tc := range testCases { log.Infof(ctx, "%s", tc.desc) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.inputTuples}, tc.outputTuples, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { return colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, input[0], tc.inputTypes, tc.expr, testMemAcc, ) @@ -168,11 +168,10 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b typs := []*types.T{types.Int} source := colexecop.NewRepeatableBatchSource(testAllocator, batch, typs) - op, closers, err := colexectestutils.CreateTestProjectingOperator( + op, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, source, typs, "abs(@1)" /* projectingExpr */, testMemAcc, ) require.NoError(b, err) - require.Nil(b, closers) op.Init(ctx) b.SetBytes(int64(8 * coldata.BatchSize())) diff --git a/pkg/sql/colexec/case_test.go b/pkg/sql/colexec/case_test.go index dc9bcab3f5d4..6b26124d2317 100644 --- a/pkg/sql/colexec/case_test.go +++ b/pkg/sql/colexec/case_test.go @@ -84,16 +84,16 @@ func TestCaseOp(t *testing.T) { inputTypes: []*types.T{types.Int}, }, } { - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - caseOp, closers, err := colexectestutils.CreateTestProjectingOperator( + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) { + caseOp, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc, ) if err != nil { - return nil, nil, err + return nil, err } // We will project out the input columns in order to have test // cases be less verbose. - return colexecbase.NewSimpleProjectOp(caseOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), closers, nil + return colexecbase.NewSimpleProjectOp(caseOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil }) } } diff --git a/pkg/sql/colexec/coalesce_test.go b/pkg/sql/colexec/coalesce_test.go index bf36e4f698dd..bae0a6e260cf 100644 --- a/pkg/sql/colexec/coalesce_test.go +++ b/pkg/sql/colexec/coalesce_test.go @@ -85,16 +85,16 @@ func TestCoalesceBasic(t *testing.T) { runTests( t, testAllocator, []colexectestutils.Tuples{tc.tuples}, [][]*types.T{tc.inputTypes}, tc.expected, colexectestutils.OrderedVerifier, - func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - coalesceOp, closers, err := colexectestutils.CreateTestProjectingOperator( + func(inputs []colexecop.Operator) (colexecop.Operator, error) { + coalesceOp, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc, ) if err != nil { - return nil, nil, err + return nil, err } // We will project out the input columns in order to have test // cases be less verbose. - return colexecbase.NewSimpleProjectOp(coalesceOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), closers, nil + return colexecbase.NewSimpleProjectOp(coalesceOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil }) } } diff --git a/pkg/sql/colexec/colexecbase/ordinality_test.go b/pkg/sql/colexec/colexecbase/ordinality_test.go index 4cd1f2f4c476..48bfebdf283e 100644 --- a/pkg/sql/colexec/colexecbase/ordinality_test.go +++ b/pkg/sql/colexec/colexecbase/ordinality_test.go @@ -106,7 +106,7 @@ func BenchmarkOrdinality(b *testing.B) { func createTestOrdinalityOperator( ctx context.Context, flowCtx *execinfra.FlowCtx, input colexecop.Operator, inputTypes []*types.T, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, error) { spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, Core: execinfrapb.ProcessorCoreUnion{ @@ -123,5 +123,5 @@ func createTestOrdinalityOperator( if err != nil { return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil } diff --git a/pkg/sql/colexec/colexecdisk/external_sort_test.go b/pkg/sql/colexec/colexecdisk/external_sort_test.go index 5a1dd90318b1..d0e661339149 100644 --- a/pkg/sql/colexec/colexecdisk/external_sort_test.go +++ b/pkg/sql/colexec/colexecdisk/external_sort_test.go @@ -206,7 +206,7 @@ func createDiskBackedSorter( diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), diff --git a/pkg/sql/colexec/colexectestutils/proj_utils.go b/pkg/sql/colexec/colexectestutils/proj_utils.go index 010402ee686e..6630e10c8279 100644 --- a/pkg/sql/colexec/colexectestutils/proj_utils.go +++ b/pkg/sql/colexec/colexectestutils/proj_utils.go @@ -66,17 +66,17 @@ func CreateTestProjectingOperator( inputTypes []*types.T, projectingExpr string, testMemAcc *mon.BoundAccount, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, error) { expr, err := parser.ParseExpr(projectingExpr) if err != nil { - return nil, nil, err + return nil, err } p := &MockTypeContext{Typs: inputTypes} semaCtx := tree.MakeSemaContext() semaCtx.IVarContainer = p typedExpr, err := tree.TypeCheck(ctx, expr, &semaCtx, types.Any) if err != nil { - return nil, nil, err + return nil, err } renderExprs := make([]execinfrapb.Expression, len(inputTypes)+1) for i := range inputTypes { @@ -100,7 +100,7 @@ func CreateTestProjectingOperator( } result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) if err != nil { - return nil, nil, err + return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil } diff --git a/pkg/sql/colexec/colexectestutils/utils.go b/pkg/sql/colexec/colexectestutils/utils.go index 83a860aa7dd7..6984f85774b0 100644 --- a/pkg/sql/colexec/colexectestutils/utils.go +++ b/pkg/sql/colexec/colexectestutils/utils.go @@ -306,7 +306,7 @@ func maybeHasNulls(b coldata.Batch) bool { // TestRunner is the signature of RunTestsWithTyps that can be used to // substitute it with RunTestsWithoutAllNullsInjection when applicable. -type TestRunner func(*testing.T, *colmem.Allocator, []Tuples, [][]*types.T, Tuples, VerifierType, func([]colexecop.Operator) (colexecop.Operator, colexecop.Closers, error)) +type TestRunner func(*testing.T, *colmem.Allocator, []Tuples, [][]*types.T, Tuples, VerifierType, func([]colexecop.Operator) (colexecop.Operator, error)) // RunTests is a helper that automatically runs your tests with varied batch // sizes and with and without a random selection vector. @@ -320,7 +320,7 @@ func RunTests( tups []Tuples, expected Tuples, verifier VerifierType, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), ) { RunTestsWithTyps(t, allocator, tups, nil /* typs */, expected, verifier, constructor) } @@ -337,7 +337,7 @@ func RunTestsWithTyps( typs [][]*types.T, expected Tuples, verifier VerifierType, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), ) { RunTestsWithOrderedCols(t, allocator, tups, typs, expected, verifier, nil /* orderedCols */, constructor) } @@ -353,7 +353,7 @@ func RunTestsWithOrderedCols( expected Tuples, verifier VerifierType, orderedCols []uint32, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), ) { RunTestsWithoutAllNullsInjectionWithErrorHandler(t, allocator, tups, typs, expected, verifier, constructor, func(err error) { t.Fatal(err) }, orderedCols) @@ -377,7 +377,7 @@ func RunTestsWithOrderedCols( } } } - opConstructor := func(injectAllNulls bool) (colexecop.Operator, colexecop.Closers) { + opConstructor := func(injectAllNulls bool) colexecop.Operator { inputSources := make([]colexecop.Operator, len(tups)) var inputTypes []*types.T for i, tup := range tups { @@ -388,17 +388,15 @@ func RunTestsWithOrderedCols( input.injectAllNulls = injectAllNulls inputSources[i] = input } - op, closers, err := constructor(inputSources) + op, err := constructor(inputSources) if err != nil { t.Fatal(err) } op.Init(ctx) - return op, closers + return op } - originalOp, originalClosers := opConstructor(false /* injectAllNulls */) - opWithNulls, withNullsClosers := opConstructor(true /* injectAllNulls */) - defer closeClosers(t, originalClosers) - defer closeClosers(t, withNullsClosers) + originalOp := opConstructor(false /* injectAllNulls */) + opWithNulls := opConstructor(true /* injectAllNulls */) foundDifference := false for { originalBatch := originalOp.Next() @@ -431,16 +429,18 @@ func RunTestsWithOrderedCols( "non-nulls in the input tuples, we expect for all nulls injection to "+ "change the output") } + closeIfCloser(t, originalOp) + closeIfCloser(t, opWithNulls) } } -// closeClosers is a testing utility function that closes a set of -// colexecop.Closers created for a test. +// closeIfCloser is a testing utility function that checks whether op is a +// colexecop.Closer and closes it if so. // -// RunTests harness needs to do that once it is done with the op(s). In non-test +// RunTests harness needs to do that once it is done with op. In non-test // setting, the closing happens at the end of the query execution. -func closeClosers(t *testing.T, closers colexecop.Closers) { - for _, c := range closers { +func closeIfCloser(t *testing.T, op colexecop.Operator) { + if c, ok := op.(colexecop.Closer); ok { if err := c.Close(context.Background()); err != nil { t.Fatal(err) } @@ -473,7 +473,7 @@ func RunTestsWithoutAllNullsInjection( typs [][]*types.T, expected Tuples, verifier VerifierType, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), ) { RunTestsWithoutAllNullsInjectionWithErrorHandler(t, allocator, tups, typs, expected, verifier, constructor, func(err error) { t.Fatal(err) }, nil /* orderedCols */) } @@ -495,7 +495,7 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( typs [][]*types.T, expected Tuples, verifier VerifierType, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), errorHandler func(error), orderedCols []uint32, ) { @@ -521,11 +521,10 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( } } RunTestsWithFn(t, allocator, tups, typs, func(t *testing.T, inputs []colexecop.Operator) { - op, closers, err := constructor(inputs) + op, err := constructor(inputs) if err != nil { t.Fatal(err) } - defer closeClosers(t, closers) out := NewOpTestOutput(op, expected) if len(typs) > 0 { out.typs = typs[0] @@ -541,6 +540,7 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( errorHandler(err) } } + closeIfCloser(t, op) }) if !skipVerifySelAndNullsResets { @@ -564,12 +564,12 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( } inputSources[i] = NewOpTestInput(allocator, 1 /* batchSize */, tup, inputTypes) } - op, closers, err := constructor(inputSources) + op, err := constructor(inputSources) if err != nil { t.Fatal(err) } // We might short-circuit, so defer the closing of the operator. - defer closeClosers(t, closers) + defer closeIfCloser(t, op) op.Init(ctx) // NOTE: this test makes sense only if the operator returns two // non-zero length batches (if not, we short-circuit the test since @@ -640,11 +640,10 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( input.injectRandomNulls = true inputSources[i] = input } - op, closers, err := constructor(inputSources) + op, err := constructor(inputSources) if err != nil { t.Fatal(err) } - defer closeClosers(t, closers) if err = colexecerror.CatchVectorizedRuntimeError(func() { op.Init(ctx) for b := op.Next(); b.Length() > 0; b = op.Next() { @@ -652,6 +651,7 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler( }); err != nil { errorHandler(err) } + closeIfCloser(t, op) } } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 2e97b1a70cb4..eb993dbaf381 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -1034,8 +1034,9 @@ func TestWindowFunctions(t *testing.T) { }, } { log.Infof(ctx, "spillForced=%t/%s", spillForced, tc.windowerSpec.WindowFns[0].Func.String()) + var toClose []colexecop.Closers var semsToCheck []semaphore.Semaphore - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, error) { tc.init() ct := make([]*types.T, len(tc.tuples[0])) for i := range ct { @@ -1078,8 +1079,14 @@ func TestWindowFunctions(t *testing.T) { } semsToCheck = append(semsToCheck, sem) result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + toClose = append(toClose, result.ToClose) + return result.Root, err }) + // Close all closers manually (in production this is done on the + // flow cleanup). + for _, c := range toClose { + require.NoError(t, c.Close(ctx)) + } for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } diff --git a/pkg/sql/colexec/count_test.go b/pkg/sql/colexec/count_test.go index 1ede8edb68be..5497aba9ab36 100644 --- a/pkg/sql/colexec/count_test.go +++ b/pkg/sql/colexec/count_test.go @@ -38,8 +38,8 @@ func TestCount(t *testing.T) { for _, tc := range tcs { // The tuples consisting of all nulls still count as separate rows, so if // we replace all values with nulls, we should get the same output. - colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewCountOp(testAllocator, input[0]), nil, nil + colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewCountOp(testAllocator, input[0]), nil }) } } diff --git a/pkg/sql/colexec/crossjoiner_test.go b/pkg/sql/colexec/crossjoiner_test.go index 7f5af77d5e19..ffbc4417d143 100644 --- a/pkg/sql/colexec/crossjoiner_test.go +++ b/pkg/sql/colexec/crossjoiner_test.go @@ -389,7 +389,7 @@ func TestCrossJoiner(t *testing.T) { for _, tc := range getCJTestCases() { for _, tc := range tc.mutateTypes() { log.Infof(ctx, "spillForced=%t", spillForced) - runHashJoinTestCase(t, tc, nil /* rng */, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + runHashJoinTestCase(t, tc, nil /* rng */, func(sources []colexecop.Operator) (colexecop.Operator, error) { spec := createSpecForHashJoiner(tc) args := &colexecargs.NewColOperatorArgs{ Spec: spec, @@ -401,9 +401,9 @@ func TestCrossJoiner(t *testing.T) { } result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) if err != nil { - return nil, nil, err + return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil }) } } @@ -476,7 +476,6 @@ func BenchmarkCrossJoiner(b *testing.B) { args.Inputs[1].Root = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows) result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) require.NoError(b, err) - require.Nil(b, result.ToClose) cj := result.Root cj.Init(ctx) for b := cj.Next(); b.Length() > 0; b = cj.Next() { diff --git a/pkg/sql/colexec/default_agg_test.go b/pkg/sql/colexec/default_agg_test.go index 65daaf3bb172..f1bb1966825c 100644 --- a/pkg/sql/colexec/default_agg_test.go +++ b/pkg/sql/colexec/default_agg_test.go @@ -147,7 +147,7 @@ func TestDefaultAggregateFunc(t *testing.T) { context.Background(), &evalCtx, &semaCtx, tc.spec.Aggregations, tc.typs, ) require.NoError(t, err) - colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { return agg.new(context.Background(), &colexecagg.NewAggregatorArgs{ Allocator: testAllocator, MemAccount: testMemAcc, @@ -158,7 +158,7 @@ func TestDefaultAggregateFunc(t *testing.T) { Constructors: constructors, ConstArguments: constArguments, OutputTypes: outputTypes, - }), nil, nil + }), nil }) }) } diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index cc3c59cf5521..75f022f69413 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -333,7 +333,7 @@ func mustParseJSON(s string) json.JSON { func (tc *distinctTestCase) runTests( t *testing.T, verifier colexectestutils.VerifierType, - constructor func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + constructor func(inputs []colexecop.Operator) (colexecop.Operator, error), ) { if tc.errorOnDup == "" { colexectestutils.RunTestsWithTyps( @@ -354,7 +354,7 @@ func (tc *distinctTestCase) runTests( // lower bound because in some cases we will reset the operator chain // for a second run, and the constructor won't get called then. var numConstructorCalls int - instrumentedConstructor := func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + instrumentedConstructor := func(inputs []colexecop.Operator) (colexecop.Operator, error) { numConstructorCalls++ return constructor(inputs) } @@ -391,12 +391,12 @@ func TestDistinct(t *testing.T) { rng, _ := randutil.NewTestRand() for _, tc := range distinctTestCases { log.Infof(context.Background(), "unordered") - tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { ud := NewUnorderedDistinct( testAllocator, input[0], tc.distinctCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, ) ud.(*UnorderedDistinct).hashTableNumBuckets = uint32(1 + rng.Intn(7)) - return ud, nil, nil + return ud, nil }) if tc.isOrderedOnDistinctCols { for numOrderedCols := 1; numOrderedCols < len(tc.distinctCols); numOrderedCols++ { @@ -405,16 +405,15 @@ func TestDistinct(t *testing.T) { for i, j := range rng.Perm(len(tc.distinctCols))[:numOrderedCols] { orderedCols[i] = tc.distinctCols[j] } - tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - op, err := newPartiallyOrderedDistinct( + tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return newPartiallyOrderedDistinct( testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, ) - return op, nil, err }) } log.Info(context.Background(), "ordered") - tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return colexecbase.NewOrderedDistinct(input[0], tc.distinctCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup), nil, nil + tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return colexecbase.NewOrderedDistinct(input[0], tc.distinctCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup), nil }) } } @@ -444,10 +443,10 @@ func TestUnorderedDistinctRandom(t *testing.T) { } tups, expected := generateRandomDataForUnorderedDistinct(rng, nTuples, nCols, newTupleProbability) colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tups}, [][]*types.T{typs}, expected, colexectestutils.UnorderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { ud := NewUnorderedDistinct(testAllocator, input[0], distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */) ud.(*UnorderedDistinct).hashTableNumBuckets = uint32(1 + rng.Intn(7)) - return ud, nil, nil + return ud, nil }, ) } diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 65ffd6df5ff3..3e5298c8d455 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -79,7 +79,7 @@ func TestExternalDistinct(t *testing.T) { // disk-backed sort must also be added as Closers. numExpectedClosers += 2 } - tc.runTests(t, verifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + tc.runTests(t, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. A panic will happen if a sorter requests // more than this number of file descriptors. @@ -91,7 +91,7 @@ func TestExternalDistinct(t *testing.T) { &monitorRegistry, ) require.Equal(t, numExpectedClosers, len(closers)) - return distinct, closers, err + return distinct, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -180,7 +180,7 @@ func TestExternalDistinctSpilling(t *testing.T) { // tups and expected are in an arbitrary order, so we use an unordered // verifier. colexectestutils.UnorderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { // Since we're giving very low memory limit to the operator, in // order to make the test run faster, we'll use an unlimited number // of file descriptors. @@ -198,7 +198,7 @@ func TestExternalDistinctSpilling(t *testing.T) { numExpectedClosers := 4 require.Equal(t, numExpectedClosers, len(closers)) numRuns++ - return distinct, closers, nil + return distinct, nil }, ) for i, sem := range semsToCheck { @@ -334,7 +334,7 @@ func createExternalDistinct( spillingCallbackFn func(), numForcedRepartitions int, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { distinctSpec := &execinfrapb.DistinctSpec{ DistinctColumns: distinctCols, NullsAreDistinct: nullsAreDistinct, diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index e3cf95ae8ab0..dc1d6e8d4f0b 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -126,7 +126,7 @@ func TestExternalHashAggregator(t *testing.T) { numExpectedClosers = 1 } var semsToCheck []semaphore.Semaphore - colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { // ehaNumRequiredFDs is the minimum number of file descriptors // that are needed for the machinery of the external aggregator // (plus 1 is needed for the in-memory hash aggregator in order @@ -155,7 +155,7 @@ func TestExternalHashAggregator(t *testing.T) { _, isHashAgg := MaybeUnwrapInvariantsChecker(op).(*hashAggregator) require.True(t, isHashAgg) } - return op, closers, err + return op, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -234,7 +234,7 @@ func createExternalHashAggregator( testingSemaphore semaphore.Semaphore, numForcedRepartitions int, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: newAggArgs.InputTypes}}, Core: execinfrapb.ProcessorCoreUnion{ diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index 9bd87f7ff996..a1cf1c503742 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -77,7 +77,7 @@ func TestExternalHashJoiner(t *testing.T) { // allNullsInjection test for now. tc.skipAllNullsInjection = true } - runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) { sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) semsToCheck = append(semsToCheck, sem) spec := createSpecForHashJoiner(tc) @@ -91,7 +91,7 @@ func TestExternalHashJoiner(t *testing.T) { // - 1 for the external hash joiner // - 2 for each of the external sorts (4 total here). require.Equal(t, 6, len(closers)) - return hjOp, closers, err + return hjOp, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -297,7 +297,7 @@ func createDiskBackedHashJoiner( delegateFDAcquisitions bool, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { args := &colexecargs.NewColOperatorArgs{ Spec: spec, Inputs: colexectestutils.MakeInputs(sources), diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index c9217f933c7a..5ad2a9e29487 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -81,7 +81,7 @@ func TestExternalSort(t *testing.T) { [][]*types.T{tc.typs}, tc.expected, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. A panic will happen if a sorter requests // more than this number of file descriptors. @@ -102,7 +102,7 @@ func TestExternalSort(t *testing.T) { // Check that the sort as well as the disk spiller were // added as Closers. require.Equal(t, 2, len(closers)) - return sorter, closers, err + return sorter, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -190,7 +190,7 @@ func TestExternalSortRandomized(t *testing.T) { []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { sem := colexecop.NewTestingSemaphore(colexecop.ExternalSorterMinPartitions) semsToCheck = append(semsToCheck, sem) sorter, closers, err := createDiskBackedSorter( @@ -202,7 +202,7 @@ func TestExternalSortRandomized(t *testing.T) { // TODO(asubiotto): Explicitly Close when testing.T is passed into // this constructor and we do a substring match. require.Equal(t, 2, len(closers)) - return sorter, closers, err + return sorter, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -312,7 +312,7 @@ func createDiskBackedSorter( diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), diff --git a/pkg/sql/colexec/hash_aggregator_test.go b/pkg/sql/colexec/hash_aggregator_test.go index 677edec77c82..d4dc72e04939 100644 --- a/pkg/sql/colexec/hash_aggregator_test.go +++ b/pkg/sql/colexec/hash_aggregator_test.go @@ -431,7 +431,7 @@ func TestHashAggregator(t *testing.T) { verifier = colexectestutils.PartialOrderedVerifier } colexectestutils.RunTestsWithOrderedCols(t, testAllocator, []colexectestutils.Tuples{tc.input}, typs, tc.expected, verifier, tc.orderedCols, - func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(sources []colexecop.Operator) (colexecop.Operator, error) { args := &colexecagg.NewAggregatorArgs{ Allocator: testAllocator, MemAccount: testMemAcc, @@ -453,7 +453,7 @@ func TestHashAggregator(t *testing.T) { MaxOutputBatchMemSize: math.MaxInt64, }, nil, /* newSpillingQueueArgs */ - ), nil, nil + ), nil }) } } diff --git a/pkg/sql/colexec/hash_group_joiner_test.go b/pkg/sql/colexec/hash_group_joiner_test.go index 1bff1b129cff..d02d4d98c904 100644 --- a/pkg/sql/colexec/hash_group_joiner_test.go +++ b/pkg/sql/colexec/hash_group_joiner_test.go @@ -157,7 +157,7 @@ func TestHashGroupJoiner(t *testing.T) { var spilled bool colexectestutils.RunTests( t, testAllocator, []colexectestutils.Tuples{tc.jtc.leftTuples, tc.jtc.rightTuples}, tc.atc.expected, colexectestutils.UnorderedVerifier, - func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(inputs []colexecop.Operator) (colexecop.Operator, error) { hgjOp, closers, err := createDiskBackedHashGroupJoiner( ctx, flowCtx, tc, inputs, func() { spilled = true }, queueCfg, &monitorRegistry, ) @@ -170,7 +170,7 @@ func TestHashGroupJoiner(t *testing.T) { // - 1: for the external hash aggregator // - 1: for the disk spiller around the hash group joiner. require.Equal(t, 10, len(closers)) - return hgjOp, closers, err + return hgjOp, err }, ) @@ -187,7 +187,7 @@ func createDiskBackedHashGroupJoiner( spillingCallbackFn func(), diskQueueCfg colcontainer.DiskQueueCfg, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, colexecop.Closers, error) { +) (colexecop.Operator, []colexecop.Closer, error) { tc.jtc.init() hjSpec := createSpecForHashJoiner(&tc.jtc) tc.atc.unorderedInput = true diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 29eab204ff85..025240d96aa7 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -983,7 +983,7 @@ func runHashJoinTestCase( t *testing.T, tc *joinTestCase, rng *rand.Rand, - hjOpConstructor func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error), + hjOpConstructor func(sources []colexecop.Operator) (colexecop.Operator, error), ) { tc.init() verifier := colexectestutils.OrderedVerifier @@ -1025,7 +1025,7 @@ func TestHashJoiner(t *testing.T) { for _, tcs := range [][]*joinTestCase{getHJTestCases(), getMJTestCases()} { for _, tc := range tcs { for _, tc := range tc.mutateTypes() { - runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) { spec := createSpecForHashJoiner(tc) args := &colexecargs.NewColOperatorArgs{ Spec: spec, @@ -1036,9 +1036,9 @@ func TestHashJoiner(t *testing.T) { args.TestingKnobs.DiskSpillingDisabled = true result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) if err != nil { - return nil, nil, err + return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil }) } } @@ -1218,7 +1218,6 @@ func TestHashJoinerProjection(t *testing.T) { args.TestingKnobs.DiskSpillingDisabled = true hjOp, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) require.NoError(t, err) - require.Nil(t, hjOp.ToClose) hjOp.Root.Init(ctx) for b := hjOp.Root.Next(); b.Length() > 0; b = hjOp.Root.Next() { // The output types should be {Int64, Int64, Bool, Decimal, Float64, Bytes} diff --git a/pkg/sql/colexec/if_expr_test.go b/pkg/sql/colexec/if_expr_test.go index 659867c797f6..9fc4d75fb2bc 100644 --- a/pkg/sql/colexec/if_expr_test.go +++ b/pkg/sql/colexec/if_expr_test.go @@ -76,16 +76,16 @@ func TestIfExprBasic(t *testing.T) { colexectestutils.RunTestsWithoutAllNullsInjectionWithErrorHandler( t, testAllocator, []colexectestutils.Tuples{tc.tuples}, [][]*types.T{tc.inputTypes}, tc.expected, colexectestutils.OrderedVerifier, - func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - ifExprOp, closers, err := colexectestutils.CreateTestProjectingOperator( + func(inputs []colexecop.Operator) (colexecop.Operator, error) { + ifExprOp, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc, ) if err != nil { - return nil, nil, err + return nil, err } // We will project out the input columns in order to have test // cases be less verbose. - return colexecbase.NewSimpleProjectOp(ifExprOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), closers, nil + return colexecbase.NewSimpleProjectOp(ifExprOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil }, // Random nulls injection can lead to a division by zero error in // some test cases, so we want to skip it. diff --git a/pkg/sql/colexec/is_null_ops_test.go b/pkg/sql/colexec/is_null_ops_test.go index 115ef1be9911..a108cc3d3967 100644 --- a/pkg/sql/colexec/is_null_ops_test.go +++ b/pkg/sql/colexec/is_null_ops_test.go @@ -124,7 +124,7 @@ func TestIsNullProjOp(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) - opConstructor := func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + opConstructor := func(input []colexecop.Operator) (colexecop.Operator, error) { return colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, input[0], []*types.T{types.Int}, fmt.Sprintf("@1 %s", c.projExpr), testMemAcc, @@ -231,7 +231,7 @@ func TestIsNullSelOp(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) - opConstructor := func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + opConstructor := func(sources []colexecop.Operator) (colexecop.Operator, error) { typs := []*types.T{types.Int} spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, @@ -249,9 +249,9 @@ func TestIsNullSelOp(t *testing.T) { } result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) if err != nil { - return nil, nil, err + return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil } colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, c.outputTuples, colexectestutils.OrderedVerifier, opConstructor) } diff --git a/pkg/sql/colexec/limit_test.go b/pkg/sql/colexec/limit_test.go index 644e30d73e14..1c0460d0e817 100644 --- a/pkg/sql/colexec/limit_test.go +++ b/pkg/sql/colexec/limit_test.go @@ -67,8 +67,8 @@ func TestLimit(t *testing.T) { for _, tc := range tcs { // The tuples consisting of all nulls still count as separate rows, so if // we replace all values with nulls, we should get the same output. - colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewLimitOp(input[0], tc.limit), nil, nil + colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewLimitOp(input[0], tc.limit), nil }) } } diff --git a/pkg/sql/colexec/mergejoiner_test.go b/pkg/sql/colexec/mergejoiner_test.go index 03b726328d5a..bbfcceaf5efd 100644 --- a/pkg/sql/colexec/mergejoiner_test.go +++ b/pkg/sql/colexec/mergejoiner_test.go @@ -1699,7 +1699,7 @@ func TestMergeJoiner(t *testing.T) { runner(t, testAllocator, []colexectestutils.Tuples{tc.leftTuples, tc.rightTuples}, [][]*types.T{tc.leftTypes, tc.rightTypes}, tc.expected, verifier, - func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(sources []colexecop.Operator) (colexecop.Operator, error) { spec := createSpecForMergeJoiner(tc) args := &colexecargs.NewColOperatorArgs{ Spec: spec, @@ -1712,9 +1712,9 @@ func TestMergeJoiner(t *testing.T) { flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) if err != nil { - return nil, nil, err + return nil, err } - return result.Root, result.ToClose, nil + return result.Root, nil }) } } diff --git a/pkg/sql/colexec/not_expr_ops_test.go b/pkg/sql/colexec/not_expr_ops_test.go index 8034c1c2987c..0a58a8e802dc 100644 --- a/pkg/sql/colexec/not_expr_ops_test.go +++ b/pkg/sql/colexec/not_expr_ops_test.go @@ -83,9 +83,8 @@ func TestNotExprProjOp(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) - opConstructor := func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - op, err := NewNotExprProjOp(types.BoolFamily, testAllocator, input[0], 0, 1) - return op, nil, err + opConstructor := func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewNotExprProjOp(types.BoolFamily, testAllocator, input[0], 0, 1) } colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, [][]*types.T{{types.Bool}}, c.outputTuples, colexectestutils.OrderedVerifier, opConstructor) } @@ -157,9 +156,8 @@ func TestNotExprSelOp(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) - opConstructor := func(sources []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - op, err := NewNotExprSelOp(types.BoolFamily, sources[0], 0) - return op, nil, err + opConstructor := func(sources []colexecop.Operator) (colexecop.Operator, error) { + return NewNotExprSelOp(types.BoolFamily, sources[0], 0) } colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, [][]*types.T{{types.Bool}}, c.outputTuples, colexectestutils.OrderedVerifier, opConstructor) } @@ -189,8 +187,8 @@ func TestNotNullProjOp(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) - opConstructor := func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return newNotNullProjOp(testAllocator, input[0], 0, 1), nil, nil + opConstructor := func(input []colexecop.Operator) (colexecop.Operator, error) { + return newNotNullProjOp(testAllocator, input[0], 0, 1), nil } colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, [][]*types.T{{types.Bool}}, c.outputTuples, colexectestutils.OrderedVerifier, opConstructor) } diff --git a/pkg/sql/colexec/offset_test.go b/pkg/sql/colexec/offset_test.go index baedb060c3d8..62bf3fc3fe32 100644 --- a/pkg/sql/colexec/offset_test.go +++ b/pkg/sql/colexec/offset_test.go @@ -60,8 +60,8 @@ func TestOffset(t *testing.T) { for _, tc := range tcs { // The tuples consisting of all nulls still count as separate rows, so if // we replace all values with nulls, we should get the same output. - colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewOffsetOp(input[0], tc.offset), nil, nil + colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, nil, tc.expected, colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewOffsetOp(input[0], tc.offset), nil }) } } diff --git a/pkg/sql/colexec/ordered_synchronizer_test.go b/pkg/sql/colexec/ordered_synchronizer_test.go index 45de89a05563..737bfbd58d6e 100644 --- a/pkg/sql/colexec/ordered_synchronizer_test.go +++ b/pkg/sql/colexec/ordered_synchronizer_test.go @@ -147,9 +147,8 @@ func TestOrderedSync(t *testing.T) { for i := range typs { typs[i] = types.Int } - colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - op := NewOrderedSynchronizer(&execinfra.FlowCtx{Gateway: true}, 0 /* processorID */, testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */) - return op, colexecop.Closers{op}, nil + colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) { + return NewOrderedSynchronizer(&execinfra.FlowCtx{Gateway: true}, 0 /* processorID */, testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil }) } } diff --git a/pkg/sql/colexec/proj_utils_test.go b/pkg/sql/colexec/proj_utils_test.go index 5f01d154b5fe..4797fdaed65e 100644 --- a/pkg/sql/colexec/proj_utils_test.go +++ b/pkg/sql/colexec/proj_utils_test.go @@ -68,11 +68,10 @@ func assertProjOpAgainstRowByRow( ctx := context.Background() input := execinfra.NewRepeatableRowSource(inputTypes, inputRows) columnarizer := NewBufferingColumnarizerForTests(testAllocator, flowCtx, 1 /* processorID */, input) - projOp, closers, err := colexectestutils.CreateTestProjectingOperator( + projOp, err := colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, columnarizer, inputTypes, projExpr, testMemAcc, ) require.NoError(t, err) - require.Nil(t, closers) // We will project out all input columns while keeping only the output // column of the projection operator. op := colexecbase.NewSimpleProjectOp(projOp, len(inputTypes)+1, []uint32{uint32(len(inputTypes))}) diff --git a/pkg/sql/colexec/select_in_test.go b/pkg/sql/colexec/select_in_test.go index b55b92e63646..60eba19f7d0a 100644 --- a/pkg/sql/colexec/select_in_test.go +++ b/pkg/sql/colexec/select_in_test.go @@ -74,7 +74,7 @@ func TestSelectInInt64(t *testing.T) { for _, c := range testCases { log.Infof(context.Background(), "%s", c.desc) - opConstructor := func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + opConstructor := func(input []colexecop.Operator) (colexecop.Operator, error) { op := selectInOpInt64{ OneInputHelper: colexecop.MakeOneInputHelper(input[0]), colIdx: 0, @@ -82,7 +82,7 @@ func TestSelectInInt64(t *testing.T) { negate: c.negate, hasNulls: c.hasNulls, } - return &op, nil, nil + return &op, nil } if !c.hasNulls || !c.negate { colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, c.outputTuples, colexectestutils.OrderedVerifier, opConstructor) @@ -217,7 +217,7 @@ func TestProjectInInt64(t *testing.T) { for _, c := range testCases { log.Infof(ctx, "%s", c.desc) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{c.inputTuples}, c.outputTuples, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(input []colexecop.Operator) (colexecop.Operator, error) { return colexectestutils.CreateTestProjectingOperator( ctx, flowCtx, input[0], []*types.T{types.Int}, fmt.Sprintf("@1 %s", c.inClause), testMemAcc, diff --git a/pkg/sql/colexec/sort_chunks_test.go b/pkg/sql/colexec/sort_chunks_test.go index 65d4beb395ec..2428dce424ce 100644 --- a/pkg/sql/colexec/sort_chunks_test.go +++ b/pkg/sql/colexec/sort_chunks_test.go @@ -188,8 +188,8 @@ func TestSortChunks(t *testing.T) { defer log.Scope(t).Close(t) for _, tc := range sortChunksTestCases { - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil, nil + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -230,8 +230,8 @@ func TestSortChunksRandomized(t *testing.T) { copy(expected, tups) sort.Slice(expected, less(expected, ordCols)) - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil, nil + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil }) } } diff --git a/pkg/sql/colexec/sort_test.go b/pkg/sql/colexec/sort_test.go index d0c9f890a4d6..252130cf3661 100644 --- a/pkg/sql/colexec/sort_test.go +++ b/pkg/sql/colexec/sort_test.go @@ -145,8 +145,8 @@ func TestSort(t *testing.T) { defer log.Scope(t).Close(t) for _, tc := range sortAllTestCases { colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, [][]*types.T{tc.typs}, tc.expected, colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewSorter(testAllocator, input[0], tc.typs, tc.ordCols, execinfra.DefaultMemoryLimit), nil, nil + func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewSorter(testAllocator, input[0], tc.typs, tc.ordCols, execinfra.DefaultMemoryLimit), nil }) } } @@ -176,11 +176,11 @@ func TestSortRandomized(t *testing.T) { if topK { expected = expected[:k] } - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { if topK { - return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, matchLen, uint64(k), execinfra.DefaultMemoryLimit), nil, nil + return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, matchLen, uint64(k), execinfra.DefaultMemoryLimit), nil } - return NewSorter(testAllocator, input[0], typs[:nCols], ordCols, execinfra.DefaultMemoryLimit), nil, nil + return NewSorter(testAllocator, input[0], typs[:nCols], ordCols, execinfra.DefaultMemoryLimit), nil }) } } diff --git a/pkg/sql/colexec/sorttopk_test.go b/pkg/sql/colexec/sorttopk_test.go index 9440f8230fd4..61d3bad21743 100644 --- a/pkg/sql/colexec/sorttopk_test.go +++ b/pkg/sql/colexec/sorttopk_test.go @@ -113,8 +113,8 @@ func TestTopKSorter(t *testing.T) { for _, tc := range topKSortTestCases { log.Infof(context.Background(), "%s", tc.description) - colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewTopKSorter(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, tc.k, execinfra.DefaultMemoryLimit), nil, nil + colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewTopKSorter(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, tc.k, execinfra.DefaultMemoryLimit), nil }) } } @@ -157,8 +157,8 @@ func TestTopKSortRandomized(t *testing.T) { log.Infof(ctx, "%s", name) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected[:k], colexectestutils.OrderedVerifier, - func(input []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { - return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, matchLen, uint64(k), execinfra.DefaultMemoryLimit), nil, nil + func(input []colexecop.Operator) (colexecop.Operator, error) { + return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, matchLen, uint64(k), execinfra.DefaultMemoryLimit), nil }) } } diff --git a/pkg/sql/colexec/values_test.go b/pkg/sql/colexec/values_test.go index b55d3b23aaba..26da1cc780fb 100644 --- a/pkg/sql/colexec/values_test.go +++ b/pkg/sql/colexec/values_test.go @@ -84,12 +84,12 @@ func TestValues(t *testing.T) { } colexectestutils.RunTests(t, testAllocator, nil, expected, colexectestutils.OrderedVerifier, - func(inputs []colexecop.Operator) (colexecop.Operator, colexecop.Closers, error) { + func(inputs []colexecop.Operator) (colexecop.Operator, error) { spec, err := execinfra.GenerateValuesSpec(colTypes, rows) if err != nil { - return nil, nil, err + return nil, err } - return NewValuesOp(testAllocator, &spec, math.MaxInt64), nil, nil + return NewValuesOp(testAllocator, &spec, math.MaxInt64), nil }) } }