sql/distsqlrun: avoid repeated EvalContext allocation (again)
Plumb EvalContext into streamMerger and streamGroupAccumulator in order
to avoid allocating it on every call to advanceGroup.

name                      old time/op    new time/op     delta
MergeJoiner/rows=0-8        3.15µs ± 0%     3.29µs ± 1%    +4.54%  (p=0.000 n=10+9)
MergeJoiner/rows=4-8        6.50µs ± 1%     6.01µs ± 1%    -7.62%  (p=0.000 n=9+8)
MergeJoiner/rows=16-8       14.9µs ± 0%      9.2µs ± 1%   -38.01%  (p=0.000 n=8+9)
MergeJoiner/rows=256-8       170µs ± 1%       71µs ± 2%   -58.19%  (p=0.000 n=10+10)
MergeJoiner/rows=4096-8     2.64ms ± 0%     1.05ms ± 0%   -60.42%  (p=0.000 n=8+9)
MergeJoiner/rows=65536-8    44.4ms ± 1%     17.2ms ± 1%   -61.22%  (p=0.000 n=9+10)

name                      old speed      new speed       delta
MergeJoiner/rows=4-8      4.93MB/s ± 0%   5.33MB/s ± 1%    +8.15%  (p=0.000 n=8+8)
MergeJoiner/rows=16-8     8.61MB/s ± 0%  13.89MB/s ± 1%   +61.32%  (p=0.000 n=8+9)
MergeJoiner/rows=256-8    12.0MB/s ± 1%   28.8MB/s ± 2%  +139.20%  (p=0.000 n=10+10)
MergeJoiner/rows=4096-8   12.4MB/s ± 0%   31.3MB/s ± 0%  +152.68%  (p=0.000 n=8+9)
MergeJoiner/rows=65536-8  11.8MB/s ± 1%   30.4MB/s ± 1%  +157.84%  (p=0.000 n=9+10)

Release note (performance change): Speed up EXCEPT queries and merge joins by
avoiding an unnecessary per-group allocation.
petermattis committed Dec 18, 2017
1 parent e77de23 commit bda143f
Showing 4 changed files with 45 additions and 20 deletions.
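
The change is purely about where the EvalContext gets allocated: once per processor run instead of once per advanceGroup call. Below is a minimal sketch of that pattern using stand-in types (not the real distsqlrun/tree types); the caller builds the context once and threads it through.

package main

import "fmt"

// evalContext stands in for tree.EvalContext; assume it is non-trivial to
// allocate but safe to reuse across calls within one processor run.
type evalContext struct{ scratch []byte }

type groupAccumulator struct{ pending [][]string }

// advanceGroup takes the context as an argument rather than allocating a
// fresh one internally on every call (the pattern this commit removes).
func (g *groupAccumulator) advanceGroup(evalCtx *evalContext) []string {
	if len(g.pending) == 0 {
		return nil
	}
	_ = evalCtx // would be used for datum comparisons in the real code
	group := g.pending[0]
	g.pending = g.pending[1:]
	return group
}

func main() {
	acc := &groupAccumulator{pending: [][]string{{"a", "a"}, {"b"}}}
	evalCtx := &evalContext{scratch: make([]byte, 64)} // allocated once per run
	for group := acc.advanceGroup(evalCtx); group != nil; group = acc.advanceGroup(evalCtx) {
		fmt.Println(group)
	}
}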
22 changes: 15 additions & 7 deletions pkg/sql/distsqlrun/algebraic_set_op.go
@@ -18,6 +18,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -30,6 +31,9 @@ import (
type algebraicSetOp struct {
processorBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource
opType AlgebraicSetOpSpec_SetOpType
ordering Ordering
@@ -47,6 +51,7 @@ func newAlgebraicSetOp(
output RowReceiver,
) (*algebraicSetOp, error) {
e := &algebraicSetOp{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
ordering: spec.Ordering,
@@ -99,6 +104,8 @@ func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) {
defer e.leftSource.ConsumerDone()
defer e.rightSource.ConsumerDone()

e.evalCtx = e.flowCtx.NewEvalCtx()

switch e.opType {
case AlgebraicSetOpSpec_Except_all:
err := e.exceptAll(ctx)
@@ -122,11 +129,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering),
)

leftRows, err := leftGroup.advanceGroup()
leftRows, err := leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err := rightGroup.advanceGroup()
rightRows, err := rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
@@ -160,6 +167,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
convertToColumnOrdering(e.ordering), convertToColumnOrdering(e.ordering),
false, /* nullEquality */
e.datumAlloc,
e.evalCtx,
)
if err != nil {
return err
@@ -195,11 +203,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
}
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
@@ -214,13 +222,13 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
return err
}
}
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
if err != nil {
return err
}
}
if cmp > 0 {
rightRows, err = rightGroup.advanceGroup()
rightRows, err = rightGroup.advanceGroup(e.evalCtx)
if len(rightRows) == 0 {
break
}
@@ -244,7 +252,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {

// Emit all remaining rows.
for {
leftRows, err = leftGroup.advanceGroup()
leftRows, err = leftGroup.advanceGroup(e.evalCtx)
// Emit all left rows until completion/error.
if err != nil || len(leftRows) == 0 {
return err
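
For orientation, here is a self-contained sketch of EXCEPT ALL over two already-sorted integer inputs. This shows the multiset semantics that exceptAll implements, not the exact group-at-a-time loop above, and uses plain ints rather than EncDatumRows.

package main

import "fmt"

// exceptAll returns, for sorted inputs, every left value with one occurrence
// removed per matching right value (standard EXCEPT ALL multiset semantics).
func exceptAll(left, right []int) []int {
	var out []int
	i, j := 0, 0
	for i < len(left) {
		switch {
		case j >= len(right) || left[i] < right[j]:
			out = append(out, left[i]) // no match on the right: emit
			i++
		case left[i] > right[j]:
			j++ // right value sorts first: skip it
		default:
			i++ // equal: cancel one occurrence from each side
			j++
		}
	}
	return out
}

func main() {
	fmt.Println(exceptAll([]int{1, 1, 2, 3}, []int{1, 3, 4})) // [1 2]
}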
8 changes: 7 additions & 1 deletion pkg/sql/distsqlrun/mergejoiner.go
@@ -20,6 +20,7 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -32,6 +33,9 @@ import (
type mergeJoiner struct {
joinerBase

flowCtx *FlowCtx
evalCtx *tree.EvalContext

leftSource, rightSource RowSource

streamMerger streamMerger
@@ -54,6 +58,7 @@ func newMergeJoiner(
}

m := &mergeJoiner{
flowCtx: flowCtx,
leftSource: leftSource,
rightSource: rightSource,
}
@@ -90,6 +95,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
log.VEventf(ctx, 2, "starting merge joiner run")

cancelChecker := sqlbase.NewCancelChecker(ctx)
m.evalCtx = m.flowCtx.NewEvalCtx()

for {
moreBatches, err := m.outputBatch(ctx, cancelChecker)
@@ -112,7 +118,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
func (m *mergeJoiner) outputBatch(
ctx context.Context, cancelChecker *sqlbase.CancelChecker,
) (bool, error) {
leftRows, rightRows, err := m.streamMerger.NextBatch()
leftRows, rightRows, err := m.streamMerger.NextBatch(m.evalCtx)
if err != nil {
return false, err
}
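
Each pair of matching batches returned by NextBatch is then rendered into joined rows. Roughly, and ignoring outer-join NULL-extension and the ON condition handled by the real joiner, the per-batch work amounts to a cross product, as in this sketch:

package main

import "fmt"

// crossJoin pairs every row of the left batch with every row of the right
// batch, which is approximately what outputBatch does with each matching
// group pair in the diff above.
func crossJoin(left, right []string) [][2]string {
	out := make([][2]string, 0, len(left)*len(right))
	for _, l := range left {
		for _, r := range right {
			out = append(out, [2]string{l, r})
		}
	}
	return out
}

func main() {
	fmt.Println(crossJoin([]string{"l1", "l2"}, []string{"r1"})) // [[l1 r1] [l2 r1]]
}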
20 changes: 15 additions & 5 deletions pkg/sql/distsqlrun/stream_group_accumulator.go
@@ -71,14 +71,14 @@ func (s *streamGroupAccumulator) peekAtCurrentGroup() (sqlbase.EncDatumRow, error) {
// advanceGroup returns all rows of the current group and advances the internal
// state to the next group, so that a subsequent peekAtCurrentGroup() will
// return the first row of the next group.
func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
func (s *streamGroupAccumulator) advanceGroup(
evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, error) {
if s.srcConsumed {
// If src has been exhausted, then we also must have advanced away from the
// last group.
return nil, nil
}
// TODO(radu): plumb EvalContext
evalCtx := &tree.EvalContext{}

for {
row, err := s.src.NextRow()
@@ -91,6 +91,9 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
}

if len(s.curGroup) == 0 {
if s.curGroup == nil {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
continue
}
Expand All @@ -107,8 +110,15 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
s.curGroup[0].String(s.types), row.String(s.types),
)
} else {
ret := s.curGroup
s.curGroup = []sqlbase.EncDatumRow{row}
n := len(s.curGroup)
ret := s.curGroup[:n:n]
// The curGroup slice possibly has additional space at the end of it. Use
// it if possible to avoid an allocation.
s.curGroup = s.curGroup[n:]
if cap(s.curGroup) == 0 {
s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
}
s.curGroup = append(s.curGroup, row)
return ret, nil
}
}
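
The new curGroup handling above returns the accumulated group capped at its length (a three-index slice) and starts the next group in whatever spare capacity remains, falling back to a fresh 64-element buffer only when none is left. A standalone sketch of that reuse trick with a stand-in element type:

package main

import "fmt"

// nextGroup returns the accumulated group and prepares buf for the next one,
// mirroring the curGroup handling above: cap the returned slice at its length
// so later appends cannot overwrite it, then keep accumulating in the
// leftover capacity.
func nextGroup(buf []int, firstRowOfNextGroup int) (group, newBuf []int) {
	n := len(buf)
	group = buf[:n:n] // three-index slice: len == cap == n
	newBuf = buf[n:]  // reuse the spare capacity, if any
	if cap(newBuf) == 0 {
		newBuf = make([]int, 0, 64) // 64 mirrors the hint used in the diff
	}
	newBuf = append(newBuf, firstRowOfNextGroup)
	return group, newBuf
}

func main() {
	buf := make([]int, 0, 8)
	buf = append(buf, 1, 1, 1)
	group, buf := nextGroup(buf, 2)
	fmt.Println(group, buf) // [1 1 1] [2]
}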
15 changes: 8 additions & 7 deletions pkg/sql/distsqlrun/stream_merger.go
@@ -38,7 +38,9 @@ type streamMerger struct {
// NextBatch returns a set of rows from the left stream and a set of rows from
// the right stream, all matching on the equality columns. One of the sets can
// be empty.
func (sm *streamMerger) NextBatch() ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, error) {
func (sm *streamMerger) NextBatch(
evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, error) {
lrow, err := sm.left.peekAtCurrentGroup()
if err != nil {
return nil, nil, err
@@ -52,20 +54,21 @@ }
}

cmp, err := CompareEncDatumRowForMerge(
sm.left.types, lrow, rrow, sm.left.ordering, sm.right.ordering, sm.nullEquality, &sm.datumAlloc,
sm.left.types, lrow, rrow, sm.left.ordering, sm.right.ordering,
sm.nullEquality, &sm.datumAlloc, evalCtx,
)
if err != nil {
return nil, nil, err
}
var leftGroup, rightGroup []sqlbase.EncDatumRow
if cmp <= 0 {
leftGroup, err = sm.left.advanceGroup()
leftGroup, err = sm.left.advanceGroup(evalCtx)
if err != nil {
return nil, nil, err
}
}
if cmp >= 0 {
rightGroup, err = sm.right.advanceGroup()
rightGroup, err = sm.right.advanceGroup(evalCtx)
if err != nil {
return nil, nil, err
}
@@ -89,6 +92,7 @@ func CompareEncDatumRowForMerge(
leftOrdering, rightOrdering sqlbase.ColumnOrdering,
nullEquality bool,
da *sqlbase.DatumAlloc,
evalCtx *tree.EvalContext,
) (int, error) {
if lhs == nil && rhs == nil {
return 0, nil
@@ -105,9 +109,6 @@
)
}

// TODO(radu): plumb EvalContext
evalCtx := &tree.EvalContext{}

for i, ord := range leftOrdering {
lIdx := ord.ColIdx
rIdx := rightOrdering[i].ColIdx
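
Putting the pieces together, NextBatch peeks at the current group on each side, compares on the ordering columns, and advances whichever side or sides sort first. A rough standalone version over integer keys (stand-in types, no error handling, an exhausted side treated as sorting last):

package main

import "fmt"

// group is a stand-in for a run of rows sharing the same ordering key.
type group struct {
	key  int
	rows []string
}

// nextBatch mirrors the shape of streamMerger.NextBatch above: compare the
// current group key on each side and advance whichever side(s) sort first,
// returning the (possibly empty) matching groups.
func nextBatch(left, right *[]group) (l, r []string, ok bool) {
	if len(*left) == 0 && len(*right) == 0 {
		return nil, nil, false
	}
	cmp := 0
	switch {
	case len(*left) == 0:
		cmp = 1
	case len(*right) == 0:
		cmp = -1
	case (*left)[0].key < (*right)[0].key:
		cmp = -1
	case (*left)[0].key > (*right)[0].key:
		cmp = 1
	}
	if cmp <= 0 {
		l, *left = (*left)[0].rows, (*left)[1:]
	}
	if cmp >= 0 {
		r, *right = (*right)[0].rows, (*right)[1:]
	}
	return l, r, true
}

func main() {
	left := []group{{1, []string{"l1"}}, {3, []string{"l3"}}}
	right := []group{{1, []string{"r1"}}, {2, []string{"r2"}}}
	for {
		l, r, ok := nextBatch(&left, &right)
		if !ok {
			break
		}
		fmt.Println(l, r) // [l1] [r1], then [] [r2], then [l3] []
	}
}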
