diff --git a/pkg/sql/distsqlrun/algebraic_set_op.go b/pkg/sql/distsqlrun/algebraic_set_op.go
index c05bf703ae60..c4c425b6e0f6 100644
--- a/pkg/sql/distsqlrun/algebraic_set_op.go
+++ b/pkg/sql/distsqlrun/algebraic_set_op.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -30,6 +31,9 @@ import (
 type algebraicSetOp struct {
 	processorBase
 
+	flowCtx *FlowCtx
+	evalCtx *tree.EvalContext
+
 	leftSource, rightSource RowSource
 	opType                  AlgebraicSetOpSpec_SetOpType
 	ordering                Ordering
@@ -47,6 +51,7 @@ func newAlgebraicSetOp(
 	output RowReceiver,
 ) (*algebraicSetOp, error) {
 	e := &algebraicSetOp{
+		flowCtx:     flowCtx,
 		leftSource:  leftSource,
 		rightSource: rightSource,
 		ordering:    spec.Ordering,
@@ -99,6 +104,8 @@ func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) {
 	defer e.leftSource.ConsumerDone()
 	defer e.rightSource.ConsumerDone()
 
+	e.evalCtx = e.flowCtx.NewEvalCtx()
+
 	switch e.opType {
 	case AlgebraicSetOpSpec_Except_all:
 		err := e.exceptAll(ctx)
@@ -122,11 +129,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
 		convertToColumnOrdering(e.ordering),
 	)
 
-	leftRows, err := leftGroup.advanceGroup()
+	leftRows, err := leftGroup.advanceGroup(e.evalCtx)
 	if err != nil {
 		return err
 	}
-	rightRows, err := rightGroup.advanceGroup()
+	rightRows, err := rightGroup.advanceGroup(e.evalCtx)
 	if err != nil {
 		return err
 	}
@@ -160,6 +167,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
 			convertToColumnOrdering(e.ordering),
 			convertToColumnOrdering(e.ordering),
 			false, /* nullEquality */
 			e.datumAlloc,
+			e.evalCtx,
 		)
 		if err != nil {
 			return err
 		}
@@ -195,11 +203,11 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
 					}
 				}
 			}
-			leftRows, err = leftGroup.advanceGroup()
+			leftRows, err = leftGroup.advanceGroup(e.evalCtx)
 			if err != nil {
 				return err
 			}
-			rightRows, err = rightGroup.advanceGroup()
+			rightRows, err = rightGroup.advanceGroup(e.evalCtx)
 			if err != nil {
 				return err
 			}
@@ -214,13 +222,13 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
 					return err
 				}
 			}
-			leftRows, err = leftGroup.advanceGroup()
+			leftRows, err = leftGroup.advanceGroup(e.evalCtx)
 			if err != nil {
 				return err
 			}
 		}
 		if cmp > 0 {
-			rightRows, err = rightGroup.advanceGroup()
+			rightRows, err = rightGroup.advanceGroup(e.evalCtx)
 			if len(rightRows) == 0 {
 				break
 			}
@@ -244,7 +252,7 @@ func (e *algebraicSetOp) exceptAll(ctx context.Context) error {
 
 	// Emit all remaining rows.
 	for {
-		leftRows, err = leftGroup.advanceGroup()
+		leftRows, err = leftGroup.advanceGroup(e.evalCtx)
 		// Emit all left rows until completion/error.
 		if err != nil || len(leftRows) == 0 {
 			return err
diff --git a/pkg/sql/distsqlrun/mergejoiner.go b/pkg/sql/distsqlrun/mergejoiner.go
index fdcffb2c09c9..12fc0045367f 100644
--- a/pkg/sql/distsqlrun/mergejoiner.go
+++ b/pkg/sql/distsqlrun/mergejoiner.go
@@ -20,6 +20,7 @@ import (
 
 	"golang.org/x/net/context"
 
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -32,6 +33,9 @@ import (
 type mergeJoiner struct {
 	joinerBase
 
+	flowCtx *FlowCtx
+	evalCtx *tree.EvalContext
+
 	leftSource, rightSource RowSource
 
 	streamMerger streamMerger
@@ -54,6 +58,7 @@ func newMergeJoiner(
 	}
 
 	m := &mergeJoiner{
+		flowCtx:     flowCtx,
 		leftSource:  leftSource,
 		rightSource: rightSource,
 	}
@@ -90,6 +95,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
 	log.VEventf(ctx, 2, "starting merge joiner run")
 
 	cancelChecker := sqlbase.NewCancelChecker(ctx)
+	m.evalCtx = m.flowCtx.NewEvalCtx()
 
 	for {
 		moreBatches, err := m.outputBatch(ctx, cancelChecker)
@@ -112,7 +118,7 @@ func (m *mergeJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
 func (m *mergeJoiner) outputBatch(
 	ctx context.Context, cancelChecker *sqlbase.CancelChecker,
 ) (bool, error) {
-	leftRows, rightRows, err := m.streamMerger.NextBatch()
+	leftRows, rightRows, err := m.streamMerger.NextBatch(m.evalCtx)
 	if err != nil {
 		return false, err
 	}
diff --git a/pkg/sql/distsqlrun/stream_group_accumulator.go b/pkg/sql/distsqlrun/stream_group_accumulator.go
index 24b025d82e2e..41db0c965287 100644
--- a/pkg/sql/distsqlrun/stream_group_accumulator.go
+++ b/pkg/sql/distsqlrun/stream_group_accumulator.go
@@ -71,14 +71,14 @@ func (s *streamGroupAccumulator) peekAtCurrentGroup() (sqlbase.EncDatumRow, erro
 // advanceGroup returns all rows of the current group and advances the internal
 // state to the next group, so that a subsequent peekAtCurrentGroup() will
 // return the first row of the next group.
-func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
+func (s *streamGroupAccumulator) advanceGroup(
+	evalCtx *tree.EvalContext,
+) ([]sqlbase.EncDatumRow, error) {
 	if s.srcConsumed {
 		// If src has been exhausted, then we also must have advanced away from the
 		// last group.
 		return nil, nil
 	}
-	// TODO(radu): plumb EvalContext
-	evalCtx := &tree.EvalContext{}
 
 	for {
 		row, err := s.src.NextRow()
@@ -91,6 +91,9 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
 		}
 
 		if len(s.curGroup) == 0 {
+			if s.curGroup == nil {
+				s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
+			}
 			s.curGroup = append(s.curGroup, row)
 			continue
 		}
@@ -107,8 +110,15 @@ func (s *streamGroupAccumulator) advanceGroup() ([]sqlbase.EncDatumRow, error) {
 				s.curGroup[0].String(s.types), row.String(s.types),
 			)
 		} else {
-			ret := s.curGroup
-			s.curGroup = []sqlbase.EncDatumRow{row}
+			n := len(s.curGroup)
+			ret := s.curGroup[:n:n]
+			// The curGroup slice possibly has additional space at the end of it. Use
+			// it if possible to avoid an allocation.
+			s.curGroup = s.curGroup[n:]
+			if cap(s.curGroup) == 0 {
+				s.curGroup = make([]sqlbase.EncDatumRow, 0, 64)
+			}
+			s.curGroup = append(s.curGroup, row)
 			return ret, nil
 		}
 	}
diff --git a/pkg/sql/distsqlrun/stream_merger.go b/pkg/sql/distsqlrun/stream_merger.go
index b49a5a615b8b..0dbad59e3eff 100644
--- a/pkg/sql/distsqlrun/stream_merger.go
+++ b/pkg/sql/distsqlrun/stream_merger.go
@@ -38,7 +38,9 @@ type streamMerger struct {
 // NextBatch returns a set of rows from the left stream and a set of rows from
 // the right stream, all matching on the equality columns. One of the sets can
 // be empty.
-func (sm *streamMerger) NextBatch() ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, error) {
+func (sm *streamMerger) NextBatch(
+	evalCtx *tree.EvalContext,
+) ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, error) {
 	lrow, err := sm.left.peekAtCurrentGroup()
 	if err != nil {
 		return nil, nil, err
@@ -52,20 +54,21 @@ func (sm *streamMerger) NextBatch() ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRo
 	}
 
 	cmp, err := CompareEncDatumRowForMerge(
-		sm.left.types, lrow, rrow, sm.left.ordering, sm.right.ordering, sm.nullEquality, &sm.datumAlloc,
+		sm.left.types, lrow, rrow, sm.left.ordering, sm.right.ordering,
+		sm.nullEquality, &sm.datumAlloc, evalCtx,
 	)
 	if err != nil {
 		return nil, nil, err
 	}
 	var leftGroup, rightGroup []sqlbase.EncDatumRow
 	if cmp <= 0 {
-		leftGroup, err = sm.left.advanceGroup()
+		leftGroup, err = sm.left.advanceGroup(evalCtx)
 		if err != nil {
 			return nil, nil, err
 		}
 	}
 	if cmp >= 0 {
-		rightGroup, err = sm.right.advanceGroup()
+		rightGroup, err = sm.right.advanceGroup(evalCtx)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -89,6 +92,7 @@ func CompareEncDatumRowForMerge(
 	lhs, rhs sqlbase.EncDatumRow,
 	leftOrdering, rightOrdering sqlbase.ColumnOrdering,
 	nullEquality bool,
 	da *sqlbase.DatumAlloc,
+	evalCtx *tree.EvalContext,
 ) (int, error) {
 	if lhs == nil && rhs == nil {
 		return 0, nil
 	}
@@ -105,9 +109,6 @@ func CompareEncDatumRowForMerge(
 		)
 	}
 
-	// TODO(radu): plumb EvalContext
-	evalCtx := &tree.EvalContext{}
-
 	for i, ord := range leftOrdering {
 		lIdx := ord.ColIdx
 		rIdx := rightOrdering[i].ColIdx
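Note on the stream_group_accumulator.go change above: the finished group is returned through a full slice expression (s.curGroup[:n:n]) so that the leftover capacity of the backing array can be reused for the next group without ever overwriting rows the caller still holds. The following is a minimal, self-contained Go sketch of that pattern; it is not part of the patch, and the row, accumulator, and flushAndStart names are invented for illustration.

package main

import "fmt"

// row stands in for sqlbase.EncDatumRow in this sketch.
type row []int

// accumulator mimics how streamGroupAccumulator manages curGroup.
type accumulator struct {
	cur []row
}

// flushAndStart returns the accumulated group and starts a new one with next.
// The full slice expression cur[:n:n] caps the returned slice at n, so appends
// to the reused backing array can never alias rows already handed out; any
// spare capacity beyond n is reused to avoid a fresh allocation.
func (a *accumulator) flushAndStart(next row) []row {
	n := len(a.cur)
	ret := a.cur[:n:n] // len == cap == n: ret cannot grow into the reused tail
	a.cur = a.cur[n:]  // empty slice over whatever capacity is left
	if cap(a.cur) == 0 {
		a.cur = make([]row, 0, 64)
	}
	a.cur = append(a.cur, next)
	return ret
}

func main() {
	a := accumulator{cur: make([]row, 0, 4)}
	a.cur = append(a.cur, row{1}, row{1})

	prev := a.flushAndStart(row{2})
	fmt.Println(len(prev), cap(prev), len(a.cur)) // 2 2 1
}

Capping the returned slice is what makes the reuse safe: a later append to the accumulator writes only past index n of the old backing array (or into a freshly allocated one), never into the group the caller received.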