Skip to content

Commit

Permalink
Merge #46669
Browse files Browse the repository at this point in the history
46669: colexec: improve test coverage for joiners r=jordanlewis a=jordanlewis

All Int64-specific merge join and hash join test cases are now
duplicated for Decimals and Bytes types.

Release note: None
Release justification: test-only change

Co-authored-by: Jordan Lewis <[email protected]>
  • Loading branch information
craig[bot] and jordanlewis committed Mar 27, 2020
2 parents 8f793a0 + b74c4e4 commit c395c44
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 73 deletions.
38 changes: 20 additions & 18 deletions pkg/sql/colexec/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,24 +973,26 @@ func TestHashJoiner(t *testing.T) {
}
for _, tcs := range [][]joinTestCase{hjTestCases, mjTestCases} {
for _, tc := range tcs {
runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) {
spec := createSpecForHashJoiner(tc)
args := NewColOperatorArgs{
Spec: spec,
Inputs: sources,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
args.TestingKnobs.DiskSpillingDisabled = true
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
}
if hj, ok := result.Op.(*hashJoiner); ok {
hj.outputBatchSize = outputBatchSize
}
return result.Op, nil
})
for _, tc := range tc.mutateTypes() {
runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) {
spec := createSpecForHashJoiner(tc)
args := NewColOperatorArgs{
Spec: spec,
Inputs: sources,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
args.TestingKnobs.DiskSpillingDisabled = true
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
}
if hj, ok := result.Op.(*hashJoiner); ok {
hj.outputBatchSize = outputBatchSize
}
return result.Op, nil
})
}
}
}
}
Expand Down
108 changes: 56 additions & 52 deletions pkg/sql/colexec/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,62 +1500,66 @@ func TestMergeJoiner(t *testing.T) {
monitors []*mon.BytesMonitor
)
for _, tc := range mjTestCases {
tc.init()
for _, tc := range tc.mutateTypes() {
tc.init()

// We use a custom verifier function so that we can get the merge join op
// to use a custom output batch size per test, to exercise more cases.
var mergeJoinVerifier verifierFn = func(output *opTestOutput) error {
if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok {
mj.initWithOutputBatchSize(tc.outputBatchSize)
// We use a custom verifier function so that we can get the merge join op
// to use a custom output batch size per test, to exercise more cases.
var mergeJoinVerifier verifierFn = func(output *opTestOutput) error {
if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok {
mj.initWithOutputBatchSize(tc.outputBatchSize)
} else {
// When we have an inner join with ON expression, a filter operator
// will be put on top of the merge join, so to make life easier, we'll
// just ignore the requested output batch size.
output.input.Init()
}
verify := output.Verify
if _, isFullOuter := output.input.(*mergeJoinFullOuterOp); isFullOuter {
// FULL OUTER JOIN doesn't guarantee any ordering on its output (since
// it is ambiguous), so we're comparing the outputs as sets.
verify = output.VerifyAnyOrder
}

return verify()
}

var runner testRunner
if tc.skipAllNullsInjection {
// We're omitting all nulls injection test. See comments for each such
// test case.
runner = runTestsWithoutAllNullsInjection
} else {
// When we have an inner join with ON expression, a filter operator
// will be put on top of the merge join, so to make life easier, we'll
// just ignore the requested output batch size.
output.input.Init()
runner = runTestsWithTyps
}
verify := output.Verify
if _, isFullOuter := output.input.(*mergeJoinFullOuterOp); isFullOuter {
// FULL OUTER JOIN doesn't guarantee any ordering on its output (since
// it is ambiguous), so we're comparing the outputs as sets.
verify = output.VerifyAnyOrder
// We test all cases with the default memory limit (regular scenario) and a
// limit of 1 byte (to force the buffered groups to spill to disk).
for _, memoryLimit := range []int64{1, defaultMemoryLimit} {
t.Run(fmt.Sprintf("MemoryLimit=%s/%s", humanizeutil.IBytes(memoryLimit), tc.description), func(t *testing.T) {
runner(t, []tuples{tc.leftTuples, tc.rightTuples},
[][]coltypes.T{tc.leftTypes, tc.rightTypes},
tc.expected, mergeJoinVerifier,
func(input []Operator) (Operator, error) {
spec := createSpecForMergeJoiner(tc)
args := NewColOperatorArgs{
Spec: spec,
Inputs: input,
StreamingMemAccount: testMemAcc,
DiskQueueCfg: queueCfg,
FDSemaphore: NewTestingSemaphore(mjFDLimit),
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
}
accounts = append(accounts, result.OpAccounts...)
monitors = append(monitors, result.OpMonitors...)
return result.Op, nil
})
})
}

return verify()
}

var runner testRunner
if tc.skipAllNullsInjection {
// We're omitting all nulls injection test. See comments for each such
// test case.
runner = runTestsWithoutAllNullsInjection
} else {
runner = runTestsWithTyps
}
// We test all cases with the default memory limit (regular scenario) and a
// limit of 1 byte (to force the buffered groups to spill to disk).
for _, memoryLimit := range []int64{1, defaultMemoryLimit} {
t.Run(fmt.Sprintf("MemoryLimit=%s/%s", humanizeutil.IBytes(memoryLimit), tc.description), func(t *testing.T) {
runner(t, []tuples{tc.leftTuples, tc.rightTuples}, nil /* typs */, tc.expected, mergeJoinVerifier,
func(input []Operator) (Operator, error) {
spec := createSpecForMergeJoiner(tc)
args := NewColOperatorArgs{
Spec: spec,
Inputs: input,
StreamingMemAccount: testMemAcc,
DiskQueueCfg: queueCfg,
FDSemaphore: NewTestingSemaphore(mjFDLimit),
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
}
accounts = append(accounts, result.OpAccounts...)
monitors = append(monitors, result.OpMonitors...)
return result.Op, nil
})
})
}
}
for _, acc := range accounts {
Expand Down
94 changes: 91 additions & 3 deletions pkg/sql/colexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (t tuple) String() string {
}
if d, ok := t[i].(apd.Decimal); ok {
sb.WriteString(d.String())
} else if d, ok := t[i].(*apd.Decimal); ok {
sb.WriteString(d.String())
} else if d, ok := t[i].([]byte); ok {
sb.WriteString(string(d))
} else {
sb.WriteString(fmt.Sprintf("%v", t[i]))
}
Expand Down Expand Up @@ -121,9 +125,39 @@ func (t tuple) less(other tuple) bool {
return false
}

// clone returns a shallow copy of the tuple: the returned tuple has a
// fresh backing array, but the datums themselves are shared with t.
func (t tuple) clone() tuple {
	// Built-in copy lowers to memmove and replaces the hand-rolled
	// element-by-element loop.
	b := make(tuple, len(t))
	copy(b, t)
	return b
}

// tuples represents a table with any-type columns.
type tuples []tuple

// clone returns a deep copy of t: both the outer slice and every
// contained tuple get freshly allocated backing storage.
func (t tuples) clone() tuples {
	cloned := make(tuples, 0, len(t))
	for _, tup := range t {
		cloned = append(cloned, tup.clone())
	}
	return cloned
}

// String renders the tuples as a bracketed, comma-separated list,
// e.g. "[<tuple>, <tuple>]", delegating per-tuple formatting to
// tuple.String.
func (t tuples) String() string {
	parts := make([]string, len(t))
	for i, tup := range t {
		parts[i] = tup.String()
	}
	return "[" + strings.Join(parts, ", ") + "]"
}

// sort returns a copy of sorted tuples.
func (t tuples) sort() tuples {
b := make(tuples, len(t))
Expand Down Expand Up @@ -1313,19 +1347,19 @@ func (c *chunkingBatchSource) reset() {
type joinTestCase struct {
description string
joinType sqlbase.JoinType
leftTuples []tuple
leftTuples tuples
leftTypes []coltypes.T
leftOutCols []uint32
leftEqCols []uint32
leftDirections []execinfrapb.Ordering_Column_Direction
rightTuples []tuple
rightTuples tuples
rightTypes []coltypes.T
rightOutCols []uint32
rightEqCols []uint32
rightDirections []execinfrapb.Ordering_Column_Direction
leftEqColsAreKey bool
rightEqColsAreKey bool
expected []tuple
expected tuples
outputBatchSize int
skipAllNullsInjection bool
onExpr execinfrapb.Expression
Expand All @@ -1351,6 +1385,60 @@ func (tc *joinTestCase) init() {
}
}

// mutateTypes returns a slice of joinTestCases with varied types. The first
// element is the unmodified input case; each additional case replaces every
// Int64 column with Decimal or Bytes, respectively, converting the tuple
// datums to match. Assumes the input is made up of just int64s: if any
// column has a different type, only the unmodified case is returned.
func (tc *joinTestCase) mutateTypes() []joinTestCase {
	ret := []joinTestCase{*tc}

	for _, typ := range []coltypes.T{coltypes.Decimal, coltypes.Bytes} {
		if typ == coltypes.Bytes {
			// Skip test cases with ON conditions for now, since those expect
			// numeric inputs.
			if !tc.onExpr.Empty() {
				continue
			}
		}
		newTc := *tc
		newTc.leftTypes = make([]coltypes.T, len(tc.leftTypes))
		newTc.rightTypes = make([]coltypes.T, len(tc.rightTypes))
		copy(newTc.leftTypes, tc.leftTypes)
		copy(newTc.rightTypes, tc.rightTypes)
		for _, typs := range [][]coltypes.T{newTc.leftTypes, newTc.rightTypes} {
			for i := range typs {
				if typs[i] != coltypes.Int64 {
					// We currently can only mutate test cases that are made up of int64
					// only.
					return ret
				}
				typs[i] = typ
			}
		}
		// Deep-copy the tuples so the in-place datum rewrites below don't
		// mutate the original test case (ret[0] aliases tc's tuples).
		newTc.leftTuples = tc.leftTuples.clone()
		newTc.rightTuples = tc.rightTuples.clone()
		newTc.expected = tc.expected.clone()

		for _, tups := range []tuples{newTc.leftTuples, newTc.rightTuples, newTc.expected} {
			for i := range tups {
				for j := range tups[i] {
					// NULLs stay NULLs regardless of the column type.
					if tups[i][j] == nil {
						continue
					}
					switch typ {
					case coltypes.Decimal:
						// Use SetInt64 rather than SetFloat64 so that values
						// larger than 2^53 convert exactly (a float64 round
						// trip would silently lose precision) and no error
						// return needs to be discarded.
						var d apd.Decimal
						d.SetInt64(int64(tups[i][j].(int)))
						tups[i][j] = d
					case coltypes.Bytes:
						// Zero-pad to 10 digits so that lexicographic byte
						// ordering matches the numeric ordering of the
						// original int64 values.
						tups[i][j] = fmt.Sprintf("%.10d", tups[i][j].(int))
					}
				}
			}
		}
		ret = append(ret, newTc)
	}
	return ret
}

type sortTestCase struct {
description string
tuples tuples
Expand Down

0 comments on commit c395c44

Please sign in to comment.