From b74c4e4dd5c0de416d401880a52c1f03695b288e Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Mar 2020 01:43:54 -0400 Subject: [PATCH] colexec: improve test coverage for joiners All Int64-specific merge join and hash join test cases are now duplicated for Decimals and Bytes types. Release note: None Release justification: test-only change --- pkg/sql/colexec/hashjoiner_test.go | 38 +++++----- pkg/sql/colexec/mergejoiner_test.go | 108 ++++++++++++++-------------- pkg/sql/colexec/utils_test.go | 94 +++++++++++++++++++++++- 3 files changed, 167 insertions(+), 73 deletions(-) diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 1621a4ffb828..d545567d4f4f 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -973,24 +973,26 @@ func TestHashJoiner(t *testing.T) { } for _, tcs := range [][]joinTestCase{hjTestCases, mjTestCases} { for _, tc := range tcs { - runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) { - spec := createSpecForHashJoiner(tc) - args := NewColOperatorArgs{ - Spec: spec, - Inputs: sources, - StreamingMemAccount: testMemAcc, - } - args.TestingKnobs.UseStreamingMemAccountForBuffering = true - args.TestingKnobs.DiskSpillingDisabled = true - result, err := NewColOperator(ctx, flowCtx, args) - if err != nil { - return nil, err - } - if hj, ok := result.Op.(*hashJoiner); ok { - hj.outputBatchSize = outputBatchSize - } - return result.Op, nil - }) + for _, tc := range tc.mutateTypes() { + runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) { + spec := createSpecForHashJoiner(tc) + args := NewColOperatorArgs{ + Spec: spec, + Inputs: sources, + StreamingMemAccount: testMemAcc, + } + args.TestingKnobs.UseStreamingMemAccountForBuffering = true + args.TestingKnobs.DiskSpillingDisabled = true + result, err := NewColOperator(ctx, flowCtx, args) + if err != nil { + return nil, err + } + if hj, ok := result.Op.(*hashJoiner); ok { + 
hj.outputBatchSize = outputBatchSize + } + return result.Op, nil + }) + } } } } diff --git a/pkg/sql/colexec/mergejoiner_test.go b/pkg/sql/colexec/mergejoiner_test.go index a2a1b0317ee9..42a93da21ee0 100644 --- a/pkg/sql/colexec/mergejoiner_test.go +++ b/pkg/sql/colexec/mergejoiner_test.go @@ -1500,62 +1500,66 @@ func TestMergeJoiner(t *testing.T) { monitors []*mon.BytesMonitor ) for _, tc := range mjTestCases { - tc.init() + for _, tc := range tc.mutateTypes() { + tc.init() - // We use a custom verifier function so that we can get the merge join op - // to use a custom output batch size per test, to exercise more cases. - var mergeJoinVerifier verifierFn = func(output *opTestOutput) error { - if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok { - mj.initWithOutputBatchSize(tc.outputBatchSize) + // We use a custom verifier function so that we can get the merge join op + // to use a custom output batch size per test, to exercise more cases. + var mergeJoinVerifier verifierFn = func(output *opTestOutput) error { + if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok { + mj.initWithOutputBatchSize(tc.outputBatchSize) + } else { + // When we have an inner join with ON expression, a filter operator + // will be put on top of the merge join, so to make life easier, we'll + // just ignore the requested output batch size. + output.input.Init() + } + verify := output.Verify + if _, isFullOuter := output.input.(*mergeJoinFullOuterOp); isFullOuter { + // FULL OUTER JOIN doesn't guarantee any ordering on its output (since + // it is ambiguous), so we're comparing the outputs as sets. + verify = output.VerifyAnyOrder + } + + return verify() + } + + var runner testRunner + if tc.skipAllNullsInjection { + // We're omitting all nulls injection test. See comments for each such + // test case. 
+ runner = runTestsWithoutAllNullsInjection } else { - // When we have an inner join with ON expression, a filter operator - // will be put on top of the merge join, so to make life easier, we'll - // just ignore the requested output batch size. - output.input.Init() + runner = runTestsWithTyps } - verify := output.Verify - if _, isFullOuter := output.input.(*mergeJoinFullOuterOp); isFullOuter { - // FULL OUTER JOIN doesn't guarantee any ordering on its output (since - // it is ambiguous), so we're comparing the outputs as sets. - verify = output.VerifyAnyOrder + // We test all cases with the default memory limit (regular scenario) and a + // limit of 1 byte (to force the buffered groups to spill to disk). + for _, memoryLimit := range []int64{1, defaultMemoryLimit} { + t.Run(fmt.Sprintf("MemoryLimit=%s/%s", humanizeutil.IBytes(memoryLimit), tc.description), func(t *testing.T) { + runner(t, []tuples{tc.leftTuples, tc.rightTuples}, + [][]coltypes.T{tc.leftTypes, tc.rightTypes}, + tc.expected, mergeJoinVerifier, + func(input []Operator) (Operator, error) { + spec := createSpecForMergeJoiner(tc) + args := NewColOperatorArgs{ + Spec: spec, + Inputs: input, + StreamingMemAccount: testMemAcc, + DiskQueueCfg: queueCfg, + FDSemaphore: NewTestingSemaphore(mjFDLimit), + } + args.TestingKnobs.UseStreamingMemAccountForBuffering = true + flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit + result, err := NewColOperator(ctx, flowCtx, args) + if err != nil { + return nil, err + } + accounts = append(accounts, result.OpAccounts...) + monitors = append(monitors, result.OpMonitors...) + return result.Op, nil + }) + }) } - - return verify() - } - - var runner testRunner - if tc.skipAllNullsInjection { - // We're omitting all nulls injection test. See comments for each such - // test case. 
- runner = runTestsWithoutAllNullsInjection - } else { - runner = runTestsWithTyps - } - // We test all cases with the default memory limit (regular scenario) and a - // limit of 1 byte (to force the buffered groups to spill to disk). - for _, memoryLimit := range []int64{1, defaultMemoryLimit} { - t.Run(fmt.Sprintf("MemoryLimit=%s/%s", humanizeutil.IBytes(memoryLimit), tc.description), func(t *testing.T) { - runner(t, []tuples{tc.leftTuples, tc.rightTuples}, nil /* typs */, tc.expected, mergeJoinVerifier, - func(input []Operator) (Operator, error) { - spec := createSpecForMergeJoiner(tc) - args := NewColOperatorArgs{ - Spec: spec, - Inputs: input, - StreamingMemAccount: testMemAcc, - DiskQueueCfg: queueCfg, - FDSemaphore: NewTestingSemaphore(mjFDLimit), - } - args.TestingKnobs.UseStreamingMemAccountForBuffering = true - flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit - result, err := NewColOperator(ctx, flowCtx, args) - if err != nil { - return nil, err - } - accounts = append(accounts, result.OpAccounts...) - monitors = append(monitors, result.OpMonitors...) - return result.Op, nil - }) - }) } } for _, acc := range accounts { diff --git a/pkg/sql/colexec/utils_test.go b/pkg/sql/colexec/utils_test.go index 71a19e1140e3..90855e9f239d 100644 --- a/pkg/sql/colexec/utils_test.go +++ b/pkg/sql/colexec/utils_test.go @@ -48,6 +48,10 @@ func (t tuple) String() string { } if d, ok := t[i].(apd.Decimal); ok { sb.WriteString(d.String()) + } else if d, ok := t[i].(*apd.Decimal); ok { + sb.WriteString(d.String()) + } else if d, ok := t[i].([]byte); ok { + sb.WriteString(string(d)) } else { sb.WriteString(fmt.Sprintf("%v", t[i])) } @@ -121,9 +125,39 @@ func (t tuple) less(other tuple) bool { return false } +func (t tuple) clone() tuple { + b := make(tuple, len(t)) + for i := range b { + b[i] = t[i] + } + + return b +} + // tuples represents a table with any-type columns. 
type tuples []tuple +func (t tuples) clone() tuples { + b := make(tuples, len(t)) + for i := range b { + b[i] = t[i].clone() + } + return b +} + +func (t tuples) String() string { + var sb strings.Builder + sb.WriteString("[") + for i := range t { + if i != 0 { + sb.WriteString(", ") + } + sb.WriteString(t[i].String()) + } + sb.WriteString("]") + return sb.String() +} + // sort returns a copy of sorted tuples. func (t tuples) sort() tuples { b := make(tuples, len(t)) @@ -1313,19 +1347,19 @@ func (c *chunkingBatchSource) reset() { type joinTestCase struct { description string joinType sqlbase.JoinType - leftTuples []tuple + leftTuples tuples leftTypes []coltypes.T leftOutCols []uint32 leftEqCols []uint32 leftDirections []execinfrapb.Ordering_Column_Direction - rightTuples []tuple + rightTuples tuples rightTypes []coltypes.T rightOutCols []uint32 rightEqCols []uint32 rightDirections []execinfrapb.Ordering_Column_Direction leftEqColsAreKey bool rightEqColsAreKey bool - expected []tuple + expected tuples outputBatchSize int skipAllNullsInjection bool onExpr execinfrapb.Expression @@ -1351,6 +1385,60 @@ func (tc *joinTestCase) init() { } } +// mutateTypes returns a slice of joinTestCases with varied types. Assumes +// the input is made up of just int64s; otherwise only the original is returned. +func (tc *joinTestCase) mutateTypes() []joinTestCase { + ret := []joinTestCase{*tc} + + for _, typ := range []coltypes.T{coltypes.Decimal, coltypes.Bytes} { + if typ == coltypes.Bytes { + // Skip test cases with ON conditions for now, since those expect + // numeric inputs. 
+ if !tc.onExpr.Empty() { + continue + } + } + newTc := *tc + newTc.leftTypes = make([]coltypes.T, len(tc.leftTypes)) + newTc.rightTypes = make([]coltypes.T, len(tc.rightTypes)) + copy(newTc.leftTypes, tc.leftTypes) + copy(newTc.rightTypes, tc.rightTypes) + for _, typs := range [][]coltypes.T{newTc.leftTypes, newTc.rightTypes} { + for i := range typs { + if typs[i] != coltypes.Int64 { + // We currently can only mutate test cases that are made up of int64 + // only. + return ret + } + typs[i] = typ + } + } + newTc.leftTuples = tc.leftTuples.clone() + newTc.rightTuples = tc.rightTuples.clone() + newTc.expected = tc.expected.clone() + + for _, tups := range []tuples{newTc.leftTuples, newTc.rightTuples, newTc.expected} { + for i := range tups { + for j := range tups[i] { + if tups[i][j] == nil { + continue + } + switch typ { + case coltypes.Decimal: + var d apd.Decimal + _, _ = d.SetFloat64(float64(tups[i][j].(int))) + tups[i][j] = d + case coltypes.Bytes: + tups[i][j] = fmt.Sprintf("%.10d", tups[i][j].(int)) + } + } + } + } + ret = append(ret, newTc) + } + return ret +} + type sortTestCase struct { description string tuples tuples