Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/distsqlrun: add benchmarks for aggregator and mergeJoiner #20759

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions pkg/sql/distsqlrun/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,90 @@ func TestAggregator(t *testing.T) {
})
}
}

func BenchmarkAggregation(b *testing.B) {
const numCols = 1
const numRows = 1000

aggFuncs := []AggregatorSpec_Func{
AggregatorSpec_IDENT,
AggregatorSpec_AVG,
AggregatorSpec_COUNT,
AggregatorSpec_MAX,
AggregatorSpec_MIN,
AggregatorSpec_STDDEV,
AggregatorSpec_SUM,
AggregatorSpec_SUM_INT,
AggregatorSpec_VARIANCE,
AggregatorSpec_XOR_AGG,
}

ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)

flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

for _, aggFunc := range aggFuncs {
b.Run(aggFunc.String(), func(b *testing.B) {
spec := &AggregatorSpec{
Aggregations: []AggregatorSpec_Aggregation{
{
Func: aggFunc,
ColIdx: []uint32{0},
},
},
}
post := &PostProcessSpec{}
disposer := &RowDisposer{}
input := NewRepeatableRowSource(oneIntCol, makeIntRows(numRows, numCols))

b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
d, err := newAggregator(flowCtx, spec, input, post, disposer)
if err != nil {
b.Fatal(err)
}
d.Run(ctx, nil)
input.Reset()
}
b.StopTimer()
})
}
}

// BenchmarkGrouping measures the cost of grouping rows by a single column
// with no aggregation functions applied. Throughput is reported in bytes/sec
// assuming 8 bytes per int datum.
func BenchmarkGrouping(b *testing.B) {
	const numCols = 1
	const numRows = 1000

	ctx := context.Background()
	evalCtx := tree.MakeTestingEvalContext()
	defer evalCtx.Stop(ctx)

	flowCtx := &FlowCtx{
		Settings: cluster.MakeTestingClusterSettings(),
		EvalCtx:  evalCtx,
	}

	spec := &AggregatorSpec{GroupCols: []uint32{0}}
	post := &PostProcessSpec{}
	output := &RowDisposer{}
	// The same rows are replayed on every iteration via Reset.
	input := NewRepeatableRowSource(oneIntCol, makeIntRows(numRows, numCols))

	b.SetBytes(int64(8 * numRows * numCols))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		agg, err := newAggregator(flowCtx, spec, input, post, output)
		if err != nil {
			b.Fatal(err)
		}
		agg.Run(ctx, nil)
		input.Reset()
	}
	b.StopTimer()
}
22 changes: 15 additions & 7 deletions pkg/sql/distsqlrun/algebraic_set_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand All @@ -30,6 +31,9 @@ import (
type algebraicSetOp struct {
processorBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource
opType AlgebraicSetOpSpec_SetOpType
ordering Ordering
Expand All @@ -47,6 +51,7 @@ func newAlgebraicSetOp(
output RowReceiver,
) (*algebraicSetOp, error) {
e := &algebraicSetOp{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
ordering: spec.Ordering,
Expand Down Expand Up @@ -99,6 +104,8 @@ func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) {
defer e.leftSource.ConsumerDone()
defer e.rightSource.ConsumerDone()

e.evalCtx = e.flowCtx.NewEvalCtx()

switch e.opType {
case AlgebraicSetOpSpec_Except_all:
err := e.exceptAll(ctx)
Expand All @@ -122,11 +129,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering),
)

leftRows, err := leftGroup.advanceGroup()
leftRows, err := leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err := rightGroup.advanceGroup()
rightRows, err := rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -160,6 +167,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering), convertToColumnOrdering(e.ordering),
false, /* nullEquality */
e.datumAlloc,
e.evalCtx,
)
if err != nil {
return err
Expand Down Expand Up @@ -195,11 +203,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
}
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
Expand All @@ -214,13 +222,13 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
return err
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
}
if cmp > 0 {
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if len(rightRows) == 0 {
break
}
Expand All @@ -244,7 +252,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {

// Emit all remaining rows.
for {
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
// Emit all left rows until completion/error.
if err != nil || len(leftRows) == 0 {
return err
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/distsqlrun/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,9 @@ func BenchmarkDistinct(b *testing.B) {
DistinctColumns: []uint32{0},
}
post := &PostProcessSpec{}
input := NewRepeatableRowSource(oneIntCol, makeIntRows(numRows, numCols))

types := make([]sqlbase.ColumnType, numCols)
for i := 0; i < numCols; i++ {
types[i] = intType
}
input := NewRepeatableRowSource(types, makeIntRows(numRows, numCols))
b.SetBytes(8 * numRows * numCols)
b.ResetTimer()
for i := 0; i < b.N; i++ {
d, err := newDistinct(flowCtx, spec, input, post, &RowDisposer{})
Expand All @@ -201,4 +198,5 @@ func BenchmarkDistinct(b *testing.B) {
d.Run(ctx, nil)
input.Reset()
}
b.StopTimer()
}
26 changes: 12 additions & 14 deletions pkg/sql/distsqlrun/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,33 +860,31 @@ func BenchmarkHashJoiner(b *testing.B) {
ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext()
defer evalCtx.Stop(ctx)
flowCtx := FlowCtx{
flowCtx := &FlowCtx{
Settings: cluster.MakeTestingClusterSettings(),
EvalCtx: evalCtx,
}

spec := HashJoinerSpec{
spec := &HashJoinerSpec{
LeftEqColumns: []uint32{0},
RightEqColumns: []uint32{0},
Type: JoinType_INNER,
// Implicit @1 = @2 constraint.
}
post := PostProcessSpec{Projection: true, OutputColumns: []uint32{0}}
post := &PostProcessSpec{}

const numCols = 4
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) {
types := make([]sqlbase.ColumnType, numCols)
for i := 0; i < numCols; i++ {
types[i] = intType
}
rows := makeIntRows(inputSize, numCols)
leftInput := NewRepeatableRowSource(types, rows)
rightInput := NewRepeatableRowSource(types, rows)
const numCols = 1
for _, numRows := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) {
rows := makeIntRows(numRows, numCols)
leftInput := NewRepeatableRowSource(oneIntCol, rows)
rightInput := NewRepeatableRowSource(oneIntCol, rows)
b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
// TODO(asubiotto): Get rid of uncleared state between
// hashJoiner Run()s to omit instantiation time from benchmarks.
h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, &RowDisposer{})
h, err := newHashJoiner(flowCtx, spec, leftInput, rightInput, post, &RowDisposer{})
if err != nil {
b.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/distsqlrun/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand All @@ -32,6 +33,9 @@ import (
type mergeJoiner struct {
joinerBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource

streamMerger streamMerger
Expand All @@ -54,6 +58,7 @@ func newMergeJoiner(
}

m := &mergeJoiner{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
}
Expand Down Expand Up @@ -90,6 +95,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
log.VEventf(ctx, 2, "starting merge joiner run")

cancelChecker := sqlbase.NewCancelChecker(ctx)
m.evalCtx = m.flowCtx.NewEvalCtx()

for {
moreBatches, err := m.outputBatch(ctx, cancelChecker)
Expand All @@ -112,7 +118,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
func (m *mergeJoiner) outputBatch(
ctx context.Context, cancelChecker *sqlbase.CancelChecker,
) (bool, error) {
leftRows, rightRows, err := m.streamMerger.NextBatch()
leftRows, rightRows, err := m.streamMerger.NextBatch(m.evalCtx)
if err != nil {
return false, err
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/distsqlrun/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package distsqlrun

import (
"fmt"
"testing"

"golang.org/x/net/context"
Expand Down Expand Up @@ -553,3 +554,48 @@ func TestConsumerClosed(t *testing.T) {
})
}
}

// BenchmarkMergeJoiner measures an INNER merge join of two identical
// single-int-column inputs across a range of input sizes. Throughput is
// reported in bytes/sec assuming 8 bytes per int datum.
func BenchmarkMergeJoiner(b *testing.B) {
	ctx := context.Background()
	evalCtx := tree.MakeTestingEvalContext()
	defer evalCtx.Stop(ctx)
	flowCtx := &FlowCtx{
		Settings: cluster.MakeTestingClusterSettings(),
		EvalCtx:  evalCtx,
	}

	spec := &MergeJoinerSpec{
		LeftOrdering: convertToSpecOrdering(
			sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}},
		),
		RightOrdering: convertToSpecOrdering(
			sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}},
		),
		Type: JoinType_INNER,
		// Implicit @1 = @2 constraint.
	}
	post := &PostProcessSpec{}
	output := &RowDisposer{}

	const numCols = 1
	for _, sz := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
		b.Run(fmt.Sprintf("InputSize=%d", sz), func(b *testing.B) {
			// Both sides consume the same backing rows; each side replays
			// them on every iteration via Reset.
			rows := makeIntRows(sz, numCols)
			left := NewRepeatableRowSource(oneIntCol, rows)
			right := NewRepeatableRowSource(oneIntCol, rows)
			b.SetBytes(int64(8 * sz * numCols))
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				mj, err := newMergeJoiner(flowCtx, spec, left, right, post, output)
				if err != nil {
					b.Fatal(err)
				}
				mj.Run(ctx, nil)
				left.Reset()
				right.Reset()
			}
		})
	}
}
20 changes: 15 additions & 5 deletions pkg/sql/distsqlrun/stream_group_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func (s *streamGroupAccumulator) peekAtCurrentGroup() (sqlbase.EncDatumRow, erro
// advanceGroup returns all rows of the current group and advances the internal
// state to the next group, so that a subsequent peekAtCurrentGroup() will
// return the first row of the next group.
func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
func (s *streamGroupAccumulator) advanceGroup(
evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, error) {
if s.srcConsumed {
// If src has been exhausted, then we also must have advanced away from the
// last group.
return nil, nil
}
// TODO(radu): plumb EvalContext
evalCtx := &tree.EvalContext{}

for {
row, err := s.src.NextRow()
Expand All @@ -91,6 +91,9 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
}

if len(s.curGroup) == 0 {
if s.curGroup == nil {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
continue
}
Expand All @@ -107,8 +110,15 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
s.curGroup[0].String(s.types), row.String(s.types),
)
} else {
ret := s.curGroup
s.curGroup = []sqlbase.EncDatumRow{row}
n := len(s.curGroup)
ret := s.curGroup[:n:n]
// The curGroup slice possibly has additional space at the end of it. Use
// it if possible to avoid an allocation.
s.curGroup = s.curGroup[n:]
if cap(s.curGroup) == 0 {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
return ret, nil
}
}
Expand Down
Loading