Skip to content

Commit

Permalink
sql: use distsql for local distinct
Browse files Browse the repository at this point in the history
Use the distsql distinct processor for processing local sql
queries.

Closes #23901.

Release note: none
  • Loading branch information
Arjun Narayan authored and jordanlewis committed Jun 11, 2018
1 parent 4088cdd commit c74fb21
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 135 deletions.
153 changes: 39 additions & 114 deletions pkg/sql/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
package sql

import (
"bytes"
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/pkg/errors"
)

// distinctNode de-duplicates rows returned by a wrapped planNode.
Expand All @@ -41,7 +41,7 @@ type distinctNode struct {
// the DISTINCT ON (<exprs>) clause.
distinctOnColIdxs util.FastIntSet

run distinctRun
run *rowSourceToPlanNode
}

// distinct constructs a distinctNode.
Expand Down Expand Up @@ -187,132 +187,57 @@ func (p *planner) distinct(
return plan, d, nil
}

// distinctRun contains the run-time state of distinctNode during local execution.
type distinctRun struct {
// Encoding of the columnsInOrder columns for the previous row.
prefixSeen []byte
prefixMemAcc mon.BoundAccount

// Encoding of the non-columnInOrder columns for rows sharing the same
// prefixSeen value.
suffixSeen map[string]struct{}
suffixMemAcc mon.BoundAccount
}

func (n *distinctNode) startExec(params runParams) error {
n.run.prefixMemAcc = params.EvalContext().Mon.MakeBoundAccount()
n.run.suffixMemAcc = params.EvalContext().Mon.MakeBoundAccount()
n.run.suffixSeen = make(map[string]struct{})
return nil
}

func (n *distinctNode) Next(params runParams) (bool, error) {
ctx := params.ctx
flowCtx := &distsqlrun.FlowCtx{
EvalCtx: *params.EvalContext(),
}

for {
if err := params.p.cancelChecker.Check(); err != nil {
return false, err
}
cols := make([]int, len(planColumns(n.plan)))
for i := range cols {
cols[i] = i
}

next, err := n.plan.Next(params)
if !next {
return false, err
}
spec := createDistinctSpec(n, cols)

// Detect duplicates
prefix, suffix, err := n.encodeDistinctOnVals(n.plan.Values())
if err != nil {
return false, err
}
input, err := makePlanNodeToRowSource(n.plan, params)
if err != nil {
return err
}
if len(spec.DistinctColumns) == 0 {
return errors.New("cannot initialize a distinctNode with 0 columns")
}

if !bytes.Equal(prefix, n.run.prefixSeen) {
// The prefix of the row which is ordered differs from the last row;
// reset our seen set.
if len(n.run.suffixSeen) > 0 {
n.run.suffixMemAcc.Clear(ctx)
n.run.suffixSeen = make(map[string]struct{})
}
if err := n.run.prefixMemAcc.Resize(
ctx, int64(len(n.run.prefixSeen)), int64(len(prefix))); err != nil {
return false, err
}
n.run.prefixSeen = prefix
if suffix != nil {
if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, string(suffix)); err != nil {
return false, err
}
}
return true, nil
}
post := &distsqlrun.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.

// The prefix of the row is the same as the last row; check
// to see if the suffix which is not ordered has been seen.
if suffix != nil {
sKey := string(suffix)
if _, ok := n.run.suffixSeen[sKey]; !ok {
if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, sKey); err != nil {
return false, err
}
return true, nil
}
}
proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output)
if err != nil {
return err
}
}

func (n *distinctNode) Values() tree.Datums {
// We return only the required columns set during planning.
// These columns are always at the beginning of the child row since
// we _append_ additional DISTINCT ON columns.
// See planner.distinct.
return n.plan.Values()
}
n.run = makeRowSourceToPlanNode(proc)

func (n *distinctNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.run.prefixSeen = nil
n.run.prefixMemAcc.Close(ctx)
n.run.suffixSeen = nil
n.run.suffixMemAcc.Close(ctx)
}
n.run.source.Start(params.ctx)

func (n *distinctNode) addSuffixSeen(
ctx context.Context, acc *mon.BoundAccount, sKey string,
) error {
sz := int64(len(sKey))
if err := acc.Grow(ctx, sz); err != nil {
return err
}
n.run.suffixSeen[sKey] = struct{}{}
return nil
}

// TODO(irfansharif): This can be refactored away to use
// sqlbase.EncodeDatums([]byte, tree.Datums)
func (n *distinctNode) encodeDistinctOnVals(values tree.Datums) ([]byte, []byte, error) {
var prefix, suffix []byte
var err error
for i, val := range values {
// Only encode DISTINCT ON expressions/columns (if applicable).
if !n.distinctOnColIdxs.Empty() && !n.distinctOnColIdxs.Contains(i) {
continue
}
// Next advances to the next row by delegating to n.run, the
// rowSourceToPlanNode adapter stored on the node.
// NOTE(review): n.run is a pointer that Close treats as possibly nil, so this
// presumably must only be called after startExec has assigned it - confirm
// against callers.
func (n *distinctNode) Next(params runParams) (bool, error) {
return n.run.Next(params)
}

if n.columnsInOrder != nil && n.columnsInOrder[i] {
if prefix == nil {
prefix = make([]byte, 0, 100)
}
prefix, err = sqlbase.EncodeDatum(prefix, val)
} else {
if suffix == nil {
suffix = make([]byte, 0, 100)
}
suffix, err = sqlbase.EncodeDatum(suffix, val)
}
if err != nil {
break
}
// Values returns the current row as reported by n.run, the
// rowSourceToPlanNode adapter stored on the node.
// NOTE(review): like Next, this dereferences n.run without a nil check, so it
// presumably must not be called before startExec - confirm against callers.
func (n *distinctNode) Values() tree.Datums {
return n.run.Values()
}

func (n *distinctNode) Close(ctx context.Context) {
if n.run != nil {
n.run.Close(ctx)
} else {
// If we haven't gotten around to initializing n.run yet, then we still
// need to propagate the close message to our inputs - do so directly.
n.plan.Close(ctx)
}
return prefix, suffix, err
}

// projectChildPropsToOnExprs takes the physical props (e.g. ordering info,
Expand Down
37 changes: 21 additions & 16 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
}

// getTypesForPlanResult returns the types of the elements in the result streams
// of a plan that corresponds to a given planNode. If planToSreamColMap is nil,
// of a plan that corresponds to a given planNode. If planToStreamColMap is nil,
// a 1-1 mapping is assumed.
func getTypesForPlanResult(node planNode, planToStreamColMap []int) ([]sqlbase.ColumnType, error) {
nodeColumns := planColumns(node)
Expand Down Expand Up @@ -2220,42 +2220,47 @@ func (dsp *DistSQLPlanner) createPlanForValues(
}, nil
}

func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *planningCtx, n *distinctNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
if err != nil {
return physicalPlan{}, err
}
currentResultRouters := plan.ResultRouters
// createDistinctSpec builds the distsqlrun.DistinctSpec for a distinctNode.
//
// cols maps each plan column index of n to its column index in the input
// stream. An entry of -1 is skipped when collecting distinct columns.
//
// OrderedColumns receives the stream columns for which n.columnsInOrder is
// set. DistinctColumns receives the stream columns named by
// n.distinctOnColIdxs or, when that set is empty, every column of n that has
// a stream column.
func createDistinctSpec(n *distinctNode, cols []int) *distsqlrun.DistinctSpec {
var orderedColumns []uint32
for i, o := range n.columnsInOrder {
if o {
orderedColumns = append(orderedColumns, uint32(plan.planToStreamColMap[i]))
orderedColumns = append(orderedColumns, uint32(cols[i]))
}
}

var distinctColumns []uint32
if !n.distinctOnColIdxs.Empty() {
for planCol, streamCol := range plan.planToStreamColMap {
for planCol, streamCol := range cols {
if streamCol != -1 && n.distinctOnColIdxs.Contains(planCol) {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
}
} else {
// If no distinct columns were specified, run distinct on the entire row.
for planCol := range planColumns(n) {
if streamCol := plan.planToStreamColMap[planCol]; streamCol != -1 {
if streamCol := cols[planCol]; streamCol != -1 {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
}
}

return &distsqlrun.DistinctSpec{
OrderedColumns: orderedColumns,
DistinctColumns: distinctColumns,
}
}

func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *planningCtx, n *distinctNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
if err != nil {
return physicalPlan{}, err
}
currentResultRouters := plan.ResultRouters

distinctSpec := distsqlrun.ProcessorCoreUnion{
Distinct: &distsqlrun.DistinctSpec{
OrderedColumns: orderedColumns,
DistinctColumns: distinctColumns,
},
Distinct: createDistinctSpec(n, plan.planToStreamColMap),
}

if len(currentResultRouters) == 1 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ type RowSource interface {
ConsumerClosed()
}

// RowSourcedProcessor is the union of RowSource and Processor. It is the
// return type of NewDistinct, so a caller can consume the distinct processor
// directly through its RowSource methods.
type RowSourcedProcessor interface {
// RowSource is embedded so the value can be read like any other row source.
RowSource
// Run is the processor entry point.
// NOTE(review): the Processor interface is declared outside this excerpt;
// confirm this signature matches it.
Run(_ context.Context, wg *sync.WaitGroup)
}

// Run reads records from the source and outputs them to the receiver, properly
// draining the source of metadata and closing both the source and receiver.
//
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/distsqlrun/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
)

// Distinct is the physical processor implementation of the DISTINCT relational operator.
type Distinct struct {
processorBase

Expand Down Expand Up @@ -57,14 +58,15 @@ var _ RowSource = &SortedDistinct{}

const sortedDistinctProcName = "sorted distinct"

func newDistinct(
// NewDistinct instantiates a new Distinct processor.
func NewDistinct(
flowCtx *FlowCtx,
processorID int32,
spec *DistinctSpec,
input RowSource,
post *PostProcessSpec,
output RowReceiver,
) (Processor, error) {
) (RowSourcedProcessor, error) {
if len(spec.DistinctColumns) == 0 {
return nil, errors.New("programming error: 0 distinct columns specified for distinct processor")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestDistinct(t *testing.T) {
EvalCtx: evalCtx,
}

d, err := newDistinct(&flowCtx, 0 /* processorID */, &ds, in, &PostProcessSpec{}, out)
d, err := NewDistinct(&flowCtx, 0 /* processorID */, &ds, in, &PostProcessSpec{}, out)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func benchmarkDistinct(b *testing.B, orderedColumns []uint32) {
b.SetBytes(int64(8 * numRows * numCols))
b.ResetTimer()
for i := 0; i < b.N; i++ {
d, err := newDistinct(flowCtx, 0 /* processorID */, spec, input, post, &RowDisposer{})
d, err := NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, &RowDisposer{})
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func newProcessor(
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newDistinct(flowCtx, processorID, core.Distinct, inputs[0], post, outputs[0])
return NewDistinct(flowCtx, processorID, core.Distinct, inputs[0], post, outputs[0])
}
if core.Aggregator != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
Expand Down
Loading

0 comments on commit c74fb21

Please sign in to comment.