From e77de23369e3b6d4d508b51d791141cf05fedbaf Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Fri, 15 Dec 2017 15:56:04 -0500 Subject: [PATCH] sql/distsqlrun: add BenchmarkMergeJoiner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adjust BenchmarkHashJoiner to remove the projection and only use a single column so that the benchmark focuses on "speed of light" of the processor itself. name time/op HashJoiner/rows=0-8 2.90µs ± 1% HashJoiner/rows=4-8 7.50µs ± 1% HashJoiner/rows=16-8 18.0µs ± 1% HashJoiner/rows=256-8 217µs ± 1% HashJoiner/rows=4096-8 3.39ms ± 1% HashJoiner/rows=65536-8 64.4ms ± 4% MergeJoiner/rows=0-8 3.15µs ± 0% MergeJoiner/rows=4-8 6.50µs ± 1% MergeJoiner/rows=16-8 14.9µs ± 0% MergeJoiner/rows=256-8 170µs ± 1% MergeJoiner/rows=4096-8 2.64ms ± 0% MergeJoiner/rows=65536-8 44.4ms ± 1% name speed HashJoiner/rows=0-8 HashJoiner/rows=4-8 4.27MB/s ± 1% HashJoiner/rows=16-8 7.09MB/s ± 1% HashJoiner/rows=256-8 9.43MB/s ± 1% HashJoiner/rows=4096-8 9.66MB/s ± 1% HashJoiner/rows=65536-8 8.14MB/s ± 4% MergeJoiner/rows=0-8 MergeJoiner/rows=4-8 4.93MB/s ± 0% MergeJoiner/rows=16-8 8.61MB/s ± 0% MergeJoiner/rows=256-8 12.0MB/s ± 1% MergeJoiner/rows=4096-8 12.4MB/s ± 0% MergeJoiner/rows=65536-8 11.8MB/s ± 1% Release note: None --- pkg/sql/distsqlrun/hashjoiner_test.go | 26 +++++++-------- pkg/sql/distsqlrun/mergejoiner_test.go | 46 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/pkg/sql/distsqlrun/hashjoiner_test.go b/pkg/sql/distsqlrun/hashjoiner_test.go index dc071578738d..522ec2e61580 100644 --- a/pkg/sql/distsqlrun/hashjoiner_test.go +++ b/pkg/sql/distsqlrun/hashjoiner_test.go @@ -860,33 +860,31 @@ func BenchmarkHashJoiner(b *testing.B) { ctx := context.Background() evalCtx := tree.MakeTestingEvalContext() defer evalCtx.Stop(ctx) - flowCtx := FlowCtx{ + flowCtx := &FlowCtx{ Settings: cluster.MakeTestingClusterSettings(), EvalCtx: evalCtx, } - spec := HashJoinerSpec{ + spec := &HashJoinerSpec{ LeftEqColumns: []uint32{0}, RightEqColumns: []uint32{0}, Type: JoinType_INNER, + // Implicit @1 = @2 constraint. } - post := PostProcessSpec{Projection: true, OutputColumns: []uint32{0}} + post := &PostProcessSpec{} - const numCols = 4 - for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} { - b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) { - types := make([]sqlbase.ColumnType, numCols) - for i := 0; i < numCols; i++ { - types[i] = intType - } - rows := makeIntRows(inputSize, numCols) - leftInput := NewRepeatableRowSource(types, rows) - rightInput := NewRepeatableRowSource(types, rows) + const numCols = 1 + for _, numRows := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} { + b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) { + rows := makeIntRows(numRows, numCols) + leftInput := NewRepeatableRowSource(oneIntCol, rows) + rightInput := NewRepeatableRowSource(oneIntCol, rows) + b.SetBytes(int64(8 * numRows * numCols)) b.ResetTimer() for i := 0; i < b.N; i++ { // TODO(asubiotto): Get rid of uncleared state between // hashJoiner Run()s to omit instantiation time from benchmarks. - h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, &RowDisposer{}) + h, err := newHashJoiner(flowCtx, spec, leftInput, rightInput, post, &RowDisposer{}) if err != nil { b.Fatal(err) } diff --git a/pkg/sql/distsqlrun/mergejoiner_test.go b/pkg/sql/distsqlrun/mergejoiner_test.go index 52af7a625ad1..48cc87d7c088 100644 --- a/pkg/sql/distsqlrun/mergejoiner_test.go +++ b/pkg/sql/distsqlrun/mergejoiner_test.go @@ -15,6 +15,7 @@ package distsqlrun import ( + "fmt" "testing" "golang.org/x/net/context" @@ -553,3 +554,48 @@ func TestConsumerClosed(t *testing.T) { }) } } + +func BenchmarkMergeJoiner(b *testing.B) { + ctx := context.Background() + evalCtx := tree.MakeTestingEvalContext() + defer evalCtx.Stop(ctx) + flowCtx := &FlowCtx{ + Settings: cluster.MakeTestingClusterSettings(), + EvalCtx: evalCtx, + } + + spec := &MergeJoinerSpec{ + LeftOrdering: convertToSpecOrdering( + sqlbase.ColumnOrdering{ + {ColIdx: 0, Direction: encoding.Ascending}, + }), + RightOrdering: convertToSpecOrdering( + sqlbase.ColumnOrdering{ + {ColIdx: 0, Direction: encoding.Ascending}, + }), + Type: JoinType_INNER, + // Implicit @1 = @2 constraint. + } + post := &PostProcessSpec{} + disposer := &RowDisposer{} + + const numCols = 1 + for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} { + b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) { + rows := makeIntRows(inputSize, numCols) + leftInput := NewRepeatableRowSource(oneIntCol, rows) + rightInput := NewRepeatableRowSource(oneIntCol, rows) + b.SetBytes(int64(8 * inputSize * numCols)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m, err := newMergeJoiner(flowCtx, spec, leftInput, rightInput, post, disposer) + if err != nil { + b.Fatal(err) + } + m.Run(ctx, nil) + leftInput.Reset() + rightInput.Reset() + } + }) + } +}