Skip to content

Commit

Permalink
sql/distsqlrun: add BenchmarkMergeJoiner
Browse files Browse the repository at this point in the history
Adjust BenchmarkHashJoiner to remove the projection and only use a
single column so that the benchmark focuses on "speed of light" of the
processor itself.

name                      time/op
HashJoiner/rows=0-8         2.90µs ± 1%
HashJoiner/rows=4-8         7.50µs ± 1%
HashJoiner/rows=16-8        18.0µs ± 1%
HashJoiner/rows=256-8        217µs ± 1%
HashJoiner/rows=4096-8      3.39ms ± 1%
HashJoiner/rows=65536-8     64.4ms ± 4%
MergeJoiner/rows=0-8        3.15µs ± 0%
MergeJoiner/rows=4-8        6.50µs ± 1%
MergeJoiner/rows=16-8       14.9µs ± 0%
MergeJoiner/rows=256-8       170µs ± 1%
MergeJoiner/rows=4096-8     2.64ms ± 0%
MergeJoiner/rows=65536-8    44.4ms ± 1%

name                      speed
HashJoiner/rows=0-8
HashJoiner/rows=4-8       4.27MB/s ± 1%
HashJoiner/rows=16-8      7.09MB/s ± 1%
HashJoiner/rows=256-8     9.43MB/s ± 1%
HashJoiner/rows=4096-8    9.66MB/s ± 1%
HashJoiner/rows=65536-8   8.14MB/s ± 4%
MergeJoiner/rows=0-8
MergeJoiner/rows=4-8      4.93MB/s ± 0%
MergeJoiner/rows=16-8     8.61MB/s ± 0%
MergeJoiner/rows=256-8    12.0MB/s ± 1%
MergeJoiner/rows=4096-8   12.4MB/s ± 0%
MergeJoiner/rows=65536-8  11.8MB/s ± 1%

Release note: None
  • Loading branch information
petermattis committed Dec 18, 2017
1 parent 091705b commit e77de23
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
26 changes: 12 additions & 14 deletions pkg/sql/distsqlrun/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,33 +860,31 @@ func BenchmarkHashJoiner(b *testing.B) {
ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)
flowCtx := FlowCtx{
flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

spec := HashJoinerSpec{
spec := &HashJoinerSpec{
LeftEqColumns: []uint32{0},
RightEqColumns: []uint32{0},
Type: JoinType_INNER,
// Implicit @1 = @2 constraint.
}
post := PostProcessSpec{Projection: true, OutputColumns: []uint32{0}}
post := &PostProcessSpec{}

const numCols = 4
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) {
types := make([]sqlbase.ColumnType, numCols)
for i := 0; i < numCols; i++ {
types[i] = intType
}
rows := makeIntRows(inputSize, numCols)
leftInput := NewRepeatableRowSource(types, rows)
rightInput := NewRepeatableRowSource(types, rows)
const numCols = 1
for _, numRows := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) {
rows := makeIntRows(numRows, numCols)
leftInput := NewRepeatableRowSource(oneIntCol, rows)
rightInput := NewRepeatableRowSource(oneIntCol, rows)
b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
// TODO(asubiotto): Get rid of uncleared state between
// hashJoiner Run()s to omit instantiation time from benchmarks.
h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, &RowDisposer{})
h, err := newHashJoiner(flowCtx, spec, leftInput, rightInput, post, &RowDisposer{})
if err != nil {
b.Fatal(err)
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/distsqlrun/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package distsqlrun

import (
"fmt"
"testing"

"golang.org/x/net/context"
Expand Down Expand Up @@ -553,3 +554,48 @@ func TestConsumerClosed(t *testing.T) {
})
}
}

func BenchmarkMergeJoiner(b *testing.B) {
ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)
flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

spec := &MergeJoinerSpec{
LeftOrdering: convertToSpecOrdering(
sqlbase.ColumnOrdering{
{ColIdx: 0, Direction: encoding.Ascending},
}),
RightOrdering: convertToSpecOrdering(
sqlbase.ColumnOrdering{
{ColIdx: 0, Direction: encoding.Ascending},
}),
Type: JoinType_INNER,
// Implicit @1 = @2 constraint.
}
post := &PostProcessSpec{}
disposer := &RowDisposer{}

const numCols = 1
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) {
rows := makeIntRows(inputSize, numCols)
leftInput := NewRepeatableRowSource(oneIntCol, rows)
rightInput := NewRepeatableRowSource(oneIntCol, rows)
b.SetBytes(int64(8 * inputSize * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
m, err := newMergeJoiner(flowCtx, spec, leftInput, rightInput, post, disposer)
if err != nil {
b.Fatal(err)
}
m.Run(ctx, nil)
leftInput.Reset()
rightInput.Reset()
}
})
}
}

0 comments on commit e77de23

Please sign in to comment.