Skip to content

Commit

Permalink
Merge #25873
Browse files Browse the repository at this point in the history
25873: sql: use distsql for local distinct r=jordanlewis a=arjunravinarayan

Use the distsql distinct processor for processing local sql queries. This is a work-in-progress to demonstrate and begin a discussion around the shims used.

This PR is rebased on top of #25860, so only evaluate the last commit. The main interfaces on which I'd like discussion are RowSourceToPlanNode and PlanNodeToRowSource. The rest of this PR is exporting changes (2nd commit), some planning plumbing, and code removal in local distinct.

Co-authored-by: Arjun Narayan <[email protected]>
  • Loading branch information
craig[bot] and Arjun Narayan committed Jun 12, 2018
2 parents 50416d0 + c74fb21 commit 529f626
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 157 deletions.
153 changes: 39 additions & 114 deletions pkg/sql/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
package sql

import (
"bytes"
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/pkg/errors"
)

// distinctNode de-duplicates rows returned by a wrapped planNode.
Expand All @@ -41,7 +41,7 @@ type distinctNode struct {
// the DISTINCT ON (<exprs>) clause.
distinctOnColIdxs util.FastIntSet

run distinctRun
run *rowSourceToPlanNode
}

// distinct constructs a distinctNode.
Expand Down Expand Up @@ -187,132 +187,57 @@ func (p *planner) distinct(
return plan, d, nil
}

// distinctRun contains the run-time state of distinctNode during local execution.
type distinctRun struct {
// Encoding of the columnsInOrder columns for the previous row.
prefixSeen []byte
prefixMemAcc mon.BoundAccount

// Encoding of the non-columnInOrder columns for rows sharing the same
// prefixSeen value.
suffixSeen map[string]struct{}
suffixMemAcc mon.BoundAccount
}

func (n *distinctNode) startExec(params runParams) error {
n.run.prefixMemAcc = params.EvalContext().Mon.MakeBoundAccount()
n.run.suffixMemAcc = params.EvalContext().Mon.MakeBoundAccount()
n.run.suffixSeen = make(map[string]struct{})
return nil
}

func (n *distinctNode) Next(params runParams) (bool, error) {
ctx := params.ctx
flowCtx := &distsqlrun.FlowCtx{
EvalCtx: *params.EvalContext(),
}

for {
if err := params.p.cancelChecker.Check(); err != nil {
return false, err
}
cols := make([]int, len(planColumns(n.plan)))
for i := range cols {
cols[i] = i
}

next, err := n.plan.Next(params)
if !next {
return false, err
}
spec := createDistinctSpec(n, cols)

// Detect duplicates
prefix, suffix, err := n.encodeDistinctOnVals(n.plan.Values())
if err != nil {
return false, err
}
input, err := makePlanNodeToRowSource(n.plan, params)
if err != nil {
return err
}
if len(spec.DistinctColumns) == 0 {
return errors.New("cannot initialize a distinctNode with 0 columns")
}

if !bytes.Equal(prefix, n.run.prefixSeen) {
// The prefix of the row which is ordered differs from the last row;
// reset our seen set.
if len(n.run.suffixSeen) > 0 {
n.run.suffixMemAcc.Clear(ctx)
n.run.suffixSeen = make(map[string]struct{})
}
if err := n.run.prefixMemAcc.Resize(
ctx, int64(len(n.run.prefixSeen)), int64(len(prefix))); err != nil {
return false, err
}
n.run.prefixSeen = prefix
if suffix != nil {
if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, string(suffix)); err != nil {
return false, err
}
}
return true, nil
}
post := &distsqlrun.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.

// The prefix of the row is the same as the last row; check
// to see if the suffix which is not ordered has been seen.
if suffix != nil {
sKey := string(suffix)
if _, ok := n.run.suffixSeen[sKey]; !ok {
if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, sKey); err != nil {
return false, err
}
return true, nil
}
}
proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output)
if err != nil {
return err
}
}

func (n *distinctNode) Values() tree.Datums {
// We return only the required columns set during planning.
// These columns are always at the beginning of the child row since
// we _append_ additional DISTINCT ON columns.
// See planner.distinct.
return n.plan.Values()
}
n.run = makeRowSourceToPlanNode(proc)

func (n *distinctNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.run.prefixSeen = nil
n.run.prefixMemAcc.Close(ctx)
n.run.suffixSeen = nil
n.run.suffixMemAcc.Close(ctx)
}
n.run.source.Start(params.ctx)

func (n *distinctNode) addSuffixSeen(
ctx context.Context, acc *mon.BoundAccount, sKey string,
) error {
sz := int64(len(sKey))
if err := acc.Grow(ctx, sz); err != nil {
return err
}
n.run.suffixSeen[sKey] = struct{}{}
return nil
}

// TODO(irfansharif): This can be refactored away to use
// sqlbase.EncodeDatums([]byte, tree.Datums)
func (n *distinctNode) encodeDistinctOnVals(values tree.Datums) ([]byte, []byte, error) {
var prefix, suffix []byte
var err error
for i, val := range values {
// Only encode DISTINCT ON expressions/columns (if applicable).
if !n.distinctOnColIdxs.Empty() && !n.distinctOnColIdxs.Contains(i) {
continue
}
func (n *distinctNode) Next(params runParams) (bool, error) {
return n.run.Next(params)
}

if n.columnsInOrder != nil && n.columnsInOrder[i] {
if prefix == nil {
prefix = make([]byte, 0, 100)
}
prefix, err = sqlbase.EncodeDatum(prefix, val)
} else {
if suffix == nil {
suffix = make([]byte, 0, 100)
}
suffix, err = sqlbase.EncodeDatum(suffix, val)
}
if err != nil {
break
}
func (n *distinctNode) Values() tree.Datums {
return n.run.Values()
}

func (n *distinctNode) Close(ctx context.Context) {
if n.run != nil {
n.run.Close(ctx)
} else {
// If we haven't gotten around to initializing n.run yet, then we still
// need to propagate the close message to our inputs - do so directly.
n.plan.Close(ctx)
}
return prefix, suffix, err
}

// projectChildPropsToOnExprs takes the physical props (e.g. ordering info,
Expand Down
37 changes: 21 additions & 16 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
}

// getTypesForPlanResult returns the types of the elements in the result streams
// of a plan that corresponds to a given planNode. If planToSreamColMap is nil,
// of a plan that corresponds to a given planNode. If planToStreamColMap is nil,
// a 1-1 mapping is assumed.
func getTypesForPlanResult(node planNode, planToStreamColMap []int) ([]sqlbase.ColumnType, error) {
nodeColumns := planColumns(node)
Expand Down Expand Up @@ -2220,42 +2220,47 @@ func (dsp *DistSQLPlanner) createPlanForValues(
}, nil
}

func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *planningCtx, n *distinctNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
if err != nil {
return physicalPlan{}, err
}
currentResultRouters := plan.ResultRouters
func createDistinctSpec(n *distinctNode, cols []int) *distsqlrun.DistinctSpec {
var orderedColumns []uint32
for i, o := range n.columnsInOrder {
if o {
orderedColumns = append(orderedColumns, uint32(plan.planToStreamColMap[i]))
orderedColumns = append(orderedColumns, uint32(cols[i]))
}
}

var distinctColumns []uint32
if !n.distinctOnColIdxs.Empty() {
for planCol, streamCol := range plan.planToStreamColMap {
for planCol, streamCol := range cols {
if streamCol != -1 && n.distinctOnColIdxs.Contains(planCol) {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
}
} else {
// If no distinct columns were specified, run distinct on the entire row.
for planCol := range planColumns(n) {
if streamCol := plan.planToStreamColMap[planCol]; streamCol != -1 {
if streamCol := cols[planCol]; streamCol != -1 {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
}
}

return &distsqlrun.DistinctSpec{
OrderedColumns: orderedColumns,
DistinctColumns: distinctColumns,
}
}

func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *planningCtx, n *distinctNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
if err != nil {
return physicalPlan{}, err
}
currentResultRouters := plan.ResultRouters

distinctSpec := distsqlrun.ProcessorCoreUnion{
Distinct: &distsqlrun.DistinctSpec{
OrderedColumns: orderedColumns,
DistinctColumns: distinctColumns,
},
Distinct: createDistinctSpec(n, plan.planToStreamColMap),
}

if len(currentResultRouters) == 1 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ type RowSource interface {
ConsumerClosed()
}

// RowSourcedProcessor is the union of RowSource and Processor.
type RowSourcedProcessor interface {
RowSource
Run(_ context.Context, wg *sync.WaitGroup)
}

// Run reads records from the source and outputs them to the receiver, properly
// draining the source of metadata and closing both the source and receiver.
//
Expand Down
Loading

0 comments on commit 529f626

Please sign in to comment.