Skip to content

Commit

Permalink
distsqlrun: export distinct processor
Browse files Browse the repository at this point in the history
Prerequisite before embedding it in the planNodes in Local SQL.

Release note: None
  • Loading branch information
Arjun Narayan authored and jordanlewis committed Jun 9, 2018
1 parent 7ece796 commit 4088cdd
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions pkg/sql/distsqlrun/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pkg/errors"
)

type distinct struct {
type Distinct struct {
processorBase

input RowSource
Expand All @@ -41,19 +41,19 @@ type distinct struct {
scratch []byte
}

// sortedDistinct is a specialized distinct that can be used when all of the
// SortedDistinct is a specialized distinct that can be used when all of the
// distinct columns are also ordered.
type sortedDistinct struct {
distinct
type SortedDistinct struct {
Distinct
}

var _ Processor = &distinct{}
var _ RowSource = &distinct{}
var _ Processor = &Distinct{}
var _ RowSource = &Distinct{}

const distinctProcName = "distinct"

var _ Processor = &sortedDistinct{}
var _ RowSource = &sortedDistinct{}
var _ Processor = &SortedDistinct{}
var _ RowSource = &SortedDistinct{}

const sortedDistinctProcName = "sorted distinct"

Expand Down Expand Up @@ -82,7 +82,7 @@ func newDistinct(
distinctCols.Add(int(col))
}

d := &distinct{
d := &Distinct{
input: input,
orderedCols: spec.OrderedColumns,
distinctCols: distinctCols,
Expand All @@ -106,22 +106,22 @@ func newDistinct(

if allSorted {
// We can use the faster sortedDistinct processor.
return &sortedDistinct{
distinct: *d,
return &SortedDistinct{
Distinct: *d,
}, nil
}

return d, nil
}

// Start is part of the RowSource interface.
func (d *distinct) Start(ctx context.Context) context.Context {
func (d *Distinct) Start(ctx context.Context) context.Context {
d.input.Start(ctx)
return d.startInternal(ctx, distinctProcName)
}

// Run is part of the processor interface.
func (d *distinct) Run(ctx context.Context, wg *sync.WaitGroup) {
func (d *Distinct) Run(ctx context.Context, wg *sync.WaitGroup) {
if d.out.output == nil {
panic("distinct output not initialized for emitting rows")
}
Expand All @@ -133,13 +133,13 @@ func (d *distinct) Run(ctx context.Context, wg *sync.WaitGroup) {
}

// Start is part of the RowSource interface.
func (d *sortedDistinct) Start(ctx context.Context) context.Context {
func (d *SortedDistinct) Start(ctx context.Context) context.Context {
d.input.Start(ctx)
return d.startInternal(ctx, sortedDistinctProcName)
}

// Run is part of the processor interface.
func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) {
func (d *SortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) {
if d.out.output == nil {
panic("distinct output not initialized for emitting rows")
}
Expand All @@ -150,7 +150,7 @@ func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) {
}
}

func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) {
func (d *Distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) {
if !d.haveLastGroupKey {
return false, nil
}
Expand All @@ -167,7 +167,7 @@ func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) {

// encode appends the encoding of non-ordered columns, which we use as a key in
// our 'seen' set.
func (d *distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, error) {
func (d *Distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, error) {
var err error
for i, datum := range row {
// Ignore columns that are not in the distinctCols, as if we are
Expand All @@ -192,14 +192,14 @@ func (d *distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, err
return appendTo, nil
}

func (d *distinct) close() {
func (d *Distinct) close() {
// Need to close the mem accounting while the context is still valid.
d.memAcc.Close(d.ctx)
d.internalClose()
}

// Next is part of the RowSource interface.
func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
func (d *Distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
for d.state == stateRunning {
row, meta := d.input.Next()
if meta != nil {
Expand Down Expand Up @@ -266,7 +266,7 @@ func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
//
// sortedDistinct is simpler than distinct. All it has to do is keep track
// of the last row it saw, emitting if the new row is different.
func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
func (d *SortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
for d.state == stateRunning {
row, meta := d.input.Next()
if meta != nil {
Expand Down Expand Up @@ -296,12 +296,12 @@ func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
}

// ConsumerDone is part of the RowSource interface.
func (d *distinct) ConsumerDone() {
func (d *Distinct) ConsumerDone() {
d.input.ConsumerDone()
}

// ConsumerClosed is part of the RowSource interface.
func (d *distinct) ConsumerClosed() {
func (d *Distinct) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
d.close()
}

0 comments on commit 4088cdd

Please sign in to comment.