Skip to content

Commit

Permalink
Merge pull request #20759 from petermattis/pmattis/distsql-benchmarks
Browse files Browse the repository at this point in the history
sql/distsqlrun: add benchmarks for aggregator and mergeJoiner
  • Loading branch information
petermattis authored Dec 20, 2017
2 parents 90078d2 + bda143f commit 533e942
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 39 deletions.
87 changes: 87 additions & 0 deletions pkg/sql/distsqlrun/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,90 @@ func TestAggregator(t *testing.T) {
})
}
}

func BenchmarkAggregation(b *testing.B) {
const numCols = 1
const numRows = 1000

aggFuncs := []AggregatorSpec_Func{
AggregatorSpec_IDENT,
AggregatorSpec_AVG,
AggregatorSpec_COUNT,
AggregatorSpec_MAX,
AggregatorSpec_MIN,
AggregatorSpec_STDDEV,
AggregatorSpec_SUM,
AggregatorSpec_SUM_INT,
AggregatorSpec_VARIANCE,
AggregatorSpec_XOR_AGG,
}

ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)

flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

for _, aggFunc := range aggFuncs {
b.Run(aggFunc.String(), func(b *testing.B) {
spec := &AggregatorSpec{
Aggregations: []AggregatorSpec_Aggregation{
{
Func: aggFunc,
ColIdx: []uint32{0},
},
},
}
post := &PostProcessSpec{}
disposer := &RowDisposer{}
input := NewRepeatableRowSource(oneIntCol, makeIntRows(numRows, numCols))

b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
d, err := newAggregator(flowCtx, spec, input, post, disposer)
if err != nil {
b.Fatal(err)
}
d.Run(ctx, nil)
input.Reset()
}
b.StopTimer()
})
}
}

// BenchmarkGrouping measures the aggregator processor when it only groups on
// column 0 and computes no aggregations. Every iteration constructs a fresh
// aggregator and runs it over the same numRows x numCols input.
func BenchmarkGrouping(b *testing.B) {
	const (
		numCols = 1
		numRows = 1000
		// Throughput is reported assuming 8 bytes per input datum.
		bytesPerRun = 8 * numRows * numCols
	)

	ctx := context.Background()
	evalCtx := tree.MakeTestingEvalContext()
	defer evalCtx.Stop(ctx)

	flowCtx := &FlowCtx{
		Settings: cluster.MakeTestingClusterSettings(),
		EvalCtx:  evalCtx,
	}

	// Group on the first (and only) column; no aggregate functions.
	spec := &AggregatorSpec{GroupCols: []uint32{0}}
	post := &PostProcessSpec{}
	disposer := &RowDisposer{}

	rows := makeIntRows(numRows, numCols)
	input := NewRepeatableRowSource(oneIntCol, rows)

	b.SetBytes(bytesPerRun)
	// Everything above is setup; only the construct+run loop is timed.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		grouper, err := newAggregator(flowCtx, spec, input, post, disposer)
		if err != nil {
			b.Fatal(err)
		}
		grouper.Run(ctx, nil)
		// Rewind the source so the next iteration sees the same rows.
		input.Reset()
	}
	b.StopTimer()
}
22 changes: 15 additions & 7 deletions pkg/sql/distsqlrun/algebraic_set_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand All @@ -30,6 +31,9 @@ import (
type algebraicSetOp struct {
processorBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource
opType AlgebraicSetOpSpec_SetOpType
ordering Ordering
Expand All @@ -47,6 +51,7 @@ func newAlgebraicSetOp(
output RowReceiver,
) (*algebraicSetOp, error) {
e := &algebraicSetOp{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
ordering: spec.Ordering,
Expand Down Expand Up @@ -99,6 +104,8 @@ func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) {
defer e.leftSource.ConsumerDone()
defer e.rightSource.ConsumerDone()

e.evalCtx = e.flowCtx.NewEvalCtx()

switch e.opType {
case AlgebraicSetOpSpec_Except_all:
err := e.exceptAll(ctx)
Expand All @@ -122,11 +129,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering),
)

leftRows, err := leftGroup.advanceGroup()
leftRows, err := leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err := rightGroup.advanceGroup()
rightRows, err := rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -160,6 +167,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering), convertToColumnOrdering(e.ordering),
false, /* nullEquality */
e.datumAlloc,
e.evalCtx,
)
if err != nil {
return err
Expand Down Expand Up @@ -195,11 +203,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
}
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
Expand All @@ -214,13 +222,13 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
return err
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
}
if cmp > 0 {
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if len(rightRows) == 0 {
break
}
Expand All @@ -244,7 +252,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {

// Emit all remaining rows.
for {
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
// Emit all left rows until completion/error.
if err != nil || len(leftRows) == 0 {
return err
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/distsqlrun/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,9 @@ func BenchmarkDistinct(b *testing.B) {
DistinctColumns: []uint32{0},
}
post := &PostProcessSpec{}
input := NewRepeatableRowSource(oneIntCol, makeIntRows(numRows, numCols))

types := make([]sqlbase.ColumnType, numCols)
for i := 0; i < numCols; i++ {
types[i] = intType
}
input := NewRepeatableRowSource(types, makeIntRows(numRows, numCols))
b.SetBytes(8 * numRows * numCols)
b.ResetTimer()
for i := 0; i < b.N; i++ {
d, err := newDistinct(flowCtx, spec, input, post, &RowDisposer{})
Expand All @@ -201,4 +198,5 @@ func BenchmarkDistinct(b *testing.B) {
d.Run(ctx, nil)
input.Reset()
}
b.StopTimer()
}
26 changes: 12 additions & 14 deletions pkg/sql/distsqlrun/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,33 +860,31 @@ func BenchmarkHashJoiner(b *testing.B) {
ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)
flowCtx := FlowCtx{
flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

spec := HashJoinerSpec{
spec := &HashJoinerSpec{
LeftEqColumns: []uint32{0},
RightEqColumns: []uint32{0},
Type: JoinType_INNER,
// Implicit @1 = @2 constraint.
}
post := PostProcessSpec{Projection: true, OutputColumns: []uint32{0}}
post := &PostProcessSpec{}

const numCols = 4
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) {
types := make([]sqlbase.ColumnType, numCols)
for i := 0; i < numCols; i++ {
types[i] = intType
}
rows := makeIntRows(inputSize, numCols)
leftInput := NewRepeatableRowSource(types, rows)
rightInput := NewRepeatableRowSource(types, rows)
const numCols = 1
for _, numRows := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) {
rows := makeIntRows(numRows, numCols)
leftInput := NewRepeatableRowSource(oneIntCol, rows)
rightInput := NewRepeatableRowSource(oneIntCol, rows)
b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
// TODO(asubiotto): Get rid of uncleared state between
// hashJoiner Run()s to omit instantiation time from benchmarks.
h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, &RowDisposer{})
h, err := newHashJoiner(flowCtx, spec, leftInput, rightInput, post, &RowDisposer{})
if err != nil {
b.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/distsqlrun/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand All @@ -32,6 +33,9 @@ import (
type mergeJoiner struct {
joinerBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource

streamMerger streamMerger
Expand All @@ -54,6 +58,7 @@ func newMergeJoiner(
}

m := &mergeJoiner{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
}
Expand Down Expand Up @@ -90,6 +95,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
log.VEventf(ctx, 2, "starting merge joiner run")

cancelChecker := sqlbase.NewCancelChecker(ctx)
m.evalCtx = m.flowCtx.NewEvalCtx()

for {
moreBatches, err := m.outputBatch(ctx, cancelChecker)
Expand All @@ -112,7 +118,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
func (m *mergeJoiner) outputBatch(
ctx context.Context, cancelChecker *sqlbase.CancelChecker,
) (bool, error) {
leftRows, rightRows, err := m.streamMerger.NextBatch()
leftRows, rightRows, err := m.streamMerger.NextBatch(m.evalCtx)
if err != nil {
return false, err
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/distsqlrun/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package distsqlrun

import (
"fmt"
"testing"

"golang.org/x/net/context"
Expand Down Expand Up @@ -553,3 +554,48 @@ func TestConsumerClosed(t *testing.T) {
})
}
}

// BenchmarkMergeJoiner measures an inner merge join of a single-column input
// against itself, with both sides ordered ascending on column 0. One
// sub-benchmark is run per input size, and every iteration constructs a fresh
// merge joiner and runs it over the same left and right row sources.
func BenchmarkMergeJoiner(b *testing.B) {
	const numCols = 1

	ctx := context.Background()
	evalCtx := tree.MakeTestingEvalContext()
	defer evalCtx.Stop(ctx)

	flowCtx := &FlowCtx{
		Settings: cluster.MakeTestingClusterSettings(),
		EvalCtx:  evalCtx,
	}

	// ascOnFirstCol builds a fresh "column 0, ascending" ordering on each call
	// so the left and right sides do not share a ColumnOrdering value.
	ascOnFirstCol := func() sqlbase.ColumnOrdering {
		return sqlbase.ColumnOrdering{
			{ColIdx: 0, Direction: encoding.Ascending},
		}
	}

	spec := &MergeJoinerSpec{
		LeftOrdering:  convertToSpecOrdering(ascOnFirstCol()),
		RightOrdering: convertToSpecOrdering(ascOnFirstCol()),
		Type:          JoinType_INNER,
		// Implicit @1 = @2 constraint.
	}
	post := &PostProcessSpec{}
	disposer := &RowDisposer{}

	inputSizes := []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16}
	for _, size := range inputSizes {
		b.Run(fmt.Sprintf("InputSize=%d", size), func(b *testing.B) {
			// Both sides of the join are fed from the same generated rows.
			rows := makeIntRows(size, numCols)
			left := NewRepeatableRowSource(oneIntCol, rows)
			right := NewRepeatableRowSource(oneIntCol, rows)

			// Throughput is reported assuming 8 bytes per input datum.
			b.SetBytes(int64(8 * size * numCols))
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				joiner, err := newMergeJoiner(flowCtx, spec, left, right, post, disposer)
				if err != nil {
					b.Fatal(err)
				}
				joiner.Run(ctx, nil)
				// Rewind both sources so the next iteration sees the same rows.
				left.Reset()
				right.Reset()
			}
		})
	}
}
20 changes: 15 additions & 5 deletions pkg/sql/distsqlrun/stream_group_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func (s *streamGroupAccumulator) peekAtCurrentGroup() (sqlbase.EncDatumRow, erro
// advanceGroup returns all rows of the current group and advances the internal
// state to the next group, so that a subsequent peekAtCurrentGroup() will
// return the first row of the next group.
func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
func (s *streamGroupAccumulator) advanceGroup(
evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, error) {
if s.srcConsumed {
// If src has been exhausted, then we also must have advanced away from the
// last group.
return nil, nil
}
// TODO(radu): plumb EvalContext
evalCtx := &tree.EvalContext{}

for {
row, err := s.src.NextRow()
Expand All @@ -91,6 +91,9 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
}

if len(s.curGroup) == 0 {
if s.curGroup == nil {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
continue
}
Expand All @@ -107,8 +110,15 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
s.curGroup[0].String(s.types), row.String(s.types),
)
} else {
ret := s.curGroup
s.curGroup = []sqlbase.EncDatumRow{row}
n := len(s.curGroup)
ret := s.curGroup[:n:n]
// The curGroup slice possibly has additional space at the end of it. Use
// it if possible to avoid an allocation.
s.curGroup = s.curGroup[n:]
if cap(s.curGroup) == 0 {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
return ret, nil
}
}
Expand Down
Loading

0 comments on commit 533e942

Please sign in to comment.