[DO NOT MERGE] distsql: row batching #20621

Closed (wants to merge 2 commits)
pkg/sql/distsqlrun/aggregator.go: 179 changes (104 additions, 75 deletions)
@@ -90,6 +90,7 @@ func GetAggregateInfo(
 // aggregator's output schema is comprised of what is specified by the
 // accompanying SELECT expressions.
 type aggregator struct {
+	Batching bool
 	processorBase

 	flowCtx *FlowCtx
@@ -172,19 +173,21 @@ func newAggregator(
 	return ag, nil
 }

+func (ag *aggregator) Close(ctx context.Context) {
+	for _, f := range ag.funcs {
+		for _, aggFunc := range f.buckets {
+			aggFunc.Close(ctx)
+		}
+	}
+	ag.bucketsAcc.Close(ctx)
+}
+
 // Run is part of the processor interface.
 func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
 	if wg != nil {
 		defer wg.Done()
 	}
-	defer ag.bucketsAcc.Close(ctx)
-	defer func() {
-		for _, f := range ag.funcs {
-			for _, aggFunc := range f.buckets {
-				aggFunc.Close(ctx)
-			}
-		}
-	}()
+	defer ag.Close(ctx)

 	ctx = log.WithLogTag(ctx, "Agg", nil)
 	ctx, span := processorSpan(ctx, "aggregator")
@@ -201,7 +204,11 @@ func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
 	}

 	log.VEvent(ctx, 1, "accumulation complete")
+	ag.RenderResults(ctx)

+}
+
+func (ag *aggregator) RenderResults(ctx context.Context) {
 	// Queries like `SELECT MAX(n) FROM t` expect a row of NULLs if nothing was
 	// aggregated.
 	if len(ag.buckets) < 1 && len(ag.groupCols) == 0 {
Expand Down Expand Up @@ -245,6 +252,31 @@ func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
// If an error is returned, both the input and the output have been properly
// closed, and the error has also been forwarded to the output.
func (ag *aggregator) accumulateRows(ctx context.Context) (err error) {
var scratch []byte
if ag.Batching {
rowChan := ag.input.(*RowChannel)
for {
batch, meta := rowChan.NextBatch()
for _, row := range batch {
if cont, err := ag.NextRow(ctx, scratch, row, meta); err != nil || !cont {
return err
}
}
if batch == nil {
return nil
}
}
return nil
}
for {
row, meta := ag.input.Next()
if cont, err := ag.NextRow(ctx, scratch, row, meta); err != nil || !cont {
return err
}
}
}

func (ag *aggregator) NextRow(ctx context.Context, scratch []byte, row sqlbase.EncDatumRow, meta ProducerMetadata) (cont bool, err error) {
cleanupRequired := true
defer func() {
if err != nil {
@@ -254,85 +286,82 @@ func (ag *aggregator) accumulateRows(ctx context.Context) (err error) {
 			}
 		}
 	}()

-	var scratch []byte
-	for {
-		row, meta := ag.input.Next()
-		if !meta.Empty() {
-			if meta.Err != nil {
-				return meta.Err
-			}
-			if !emitHelper(ctx, &ag.out, nil /* row */, meta, ag.input) {
-				// TODO(andrei): here, because we're passing metadata through, we have
-				// an opportunity to find out that the consumer doesn't need the data
-				// any more. If the producer doesn't push any metadata, then there's no
-				// opportunity to find this out until the accumulation phase is done. We
-				// should have a way to periodically peek at the state of the
-				// RowReceiver that's hiding behind the ProcOutputHelper.
-				cleanupRequired = false
-				return errors.Errorf("consumer stopped before it received rows")
-			}
-			continue
-		}
-		if row == nil {
-			return nil
-		}
-
-		// The encoding computed here determines which bucket the non-grouping
-		// datums are accumulated to.
-		encoded, err := ag.encode(scratch, row)
-		if err != nil {
-			return err
-		}
-
-		if _, ok := ag.buckets[string(encoded)]; !ok {
-			if err := ag.bucketsAcc.Grow(ctx, int64(len(encoded))); err != nil {
-				return err
-			}
-			ag.buckets[string(encoded)] = struct{}{}
-		}
-
-		// Feed the func holders for this bucket the non-grouping datums.
-		for i, a := range ag.aggregations {
-			if a.FilterColIdx != nil {
-				col := *a.FilterColIdx
-				if err := row[col].EnsureDecoded(&ag.inputTypes[col], &ag.datumAlloc); err != nil {
-					return err
-				}
-				if row[*a.FilterColIdx].Datum != tree.DBoolTrue {
-					// This row doesn't contribute to this aggregation.
-					continue
-				}
-			}
-			// Extract the corresponding arguments from the row to feed into the
-			// aggregate function.
-			// Most functions require at most one argument thus we separate
-			// the first argument and allocation of (if applicable) a variadic
-			// collection of arguments thereafter.
-			var firstArg tree.Datum
-			var otherArgs tree.Datums
-			if len(a.ColIdx) > 1 {
-				otherArgs = make(tree.Datums, len(a.ColIdx)-1)
-			}
-			isFirstArg := true
-			for j, c := range a.ColIdx {
-				if err := row[c].EnsureDecoded(&ag.inputTypes[c], &ag.datumAlloc); err != nil {
-					return err
-				}
-				if isFirstArg {
-					firstArg = row[c].Datum
-					isFirstArg = false
-					continue
-				}
-				otherArgs[j-1] = row[c].Datum
-			}
-
-			if err := ag.funcs[i].add(ctx, encoded, firstArg, otherArgs); err != nil {
-				return err
-			}
-		}
-
-		scratch = encoded[:0]
-	}
-}
+	if !meta.Empty() {
+		if meta.Err != nil {
+			return false, meta.Err
+		}
+		if !emitHelper(ctx, &ag.out, nil /* row */, meta, ag.input) {
+			// TODO(andrei): here, because we're passing metadata through, we have
+			// an opportunity to find out that the consumer doesn't need the data
+			// any more. If the producer doesn't push any metadata, then there's no
+			// opportunity to find this out until the accumulation phase is done. We
+			// should have a way to periodically peek at the state of the
+			// RowReceiver that's hiding behind the ProcOutputHelper.
+			cleanupRequired = false
+			return false, errors.Errorf("consumer stopped before it received rows")
+		}
+		return true, nil
+	}
+	if row == nil {
+		return false, nil
+	}
+
+	// The encoding computed here determines which bucket the non-grouping
+	// datums are accumulated to.
+	encoded, err := ag.encode(scratch, row)
+	if err != nil {
+		return false, err
+	}
+
+	if _, ok := ag.buckets[string(encoded)]; !ok {
+		if err := ag.bucketsAcc.Grow(ctx, int64(len(encoded))); err != nil {
+			return false, err
+		}
+		ag.buckets[string(encoded)] = struct{}{}
+	}
+
+	// Feed the func holders for this bucket the non-grouping datums.
+	for i, a := range ag.aggregations {
+		if a.FilterColIdx != nil {
+			col := *a.FilterColIdx
+			if err := row[col].EnsureDecoded(&ag.inputTypes[col], &ag.datumAlloc); err != nil {
+				return false, err
+			}
+			if row[*a.FilterColIdx].Datum != tree.DBoolTrue {
+				// This row doesn't contribute to this aggregation.
+				continue
+			}
+		}
+		// Extract the corresponding arguments from the row to feed into the
+		// aggregate function.
+		// Most functions require at most one argument thus we separate
+		// the first argument and allocation of (if applicable) a variadic
+		// collection of arguments thereafter.
+		var firstArg tree.Datum
+		var otherArgs tree.Datums
+		if len(a.ColIdx) > 1 {
+			otherArgs = make(tree.Datums, len(a.ColIdx)-1)
+		}
+		isFirstArg := true
+		for j, c := range a.ColIdx {
+			if err := row[c].EnsureDecoded(&ag.inputTypes[c], &ag.datumAlloc); err != nil {
+				return false, err
+			}
+			if isFirstArg {
+				firstArg = row[c].Datum
+				isFirstArg = false
+				continue
+			}
+			otherArgs[j-1] = row[c].Datum
+		}
+
+		if err := ag.funcs[i].add(ctx, encoded, firstArg, otherArgs); err != nil {
+			return false, err
+		}
+	}
+
+	scratch = encoded[:0]
+	return true, nil
+}

type aggregateFuncHolder struct {
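
Editorial note on the accumulateRows change above: when ag.Batching is set, the input is drained one whole batch at a time via NextBatch(), each row is fed through NextRow, and a nil batch signals end of stream. The standalone toy below mirrors only that consumption shape; the types are simplified stand-ins invented for illustration, not CockroachDB code.

package main

import "fmt"

// Simplified stand-ins for sqlbase.EncDatumRow and RowBatch.
type encDatumRow []int
type rowBatch []encDatumRow

// nextBatch simulates RowChannel.NextBatch: it yields batches until the
// source is exhausted, then returns nil.
func nextBatch(src *[]rowBatch) rowBatch {
	if len(*src) == 0 {
		return nil
	}
	b := (*src)[0]
	*src = (*src)[1:]
	return b
}

func main() {
	src := []rowBatch{{{1}, {2}}, {{3}}}
	for {
		batch := nextBatch(&src)
		// Ranging over a nil batch executes zero times, so the nil check
		// after the loop doubles as the termination condition, matching
		// the Batching branch in the PR.
		for _, row := range batch {
			fmt.Println("accumulate", row)
		}
		if batch == nil {
			break
		}
	}
}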
pkg/sql/distsqlrun/base.go: 101 changes (93 additions, 8 deletions)
@@ -91,6 +91,12 @@ type RowReceiver interface {
 	ProducerDone()
 }

+type RowBatchReceiver interface {
+	RowReceiver
+	// TODO(asubiotto): Comment. Same semantics as RowReceiver.Push.
+	PushBatch(batch RowBatch) ConsumerStatus
+}
+
// CancellableRowReceiver is a special type of a RowReceiver that can be set to
// canceled asynchronously (i.e. concurrently or after Push()es and ProducerDone()s).
// Once canceled, subsequent Push()es return ConsumerClosed. Implemented by distSQLReceiver
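
One thing the diff does not include is a batching producer. As a minimal sketch of how one might fill and push batches against RowBatchReceiver, assuming this package's RowBatch, RowBatchSize, ConsumerStatus, and NeedMoreRows declarations (this is illustrative, not code from the PR):

// pushAll is a hypothetical helper: it accumulates rows into batches of at
// most RowBatchSize and pushes each full batch, stopping early if the
// consumer no longer wants rows.
func pushAll(dst RowBatchReceiver, rows []sqlbase.EncDatumRow) ConsumerStatus {
	batch := make(RowBatch, 0, RowBatchSize)
	for _, r := range rows {
		batch = append(batch, r)
		if len(batch) == RowBatchSize {
			if status := dst.PushBatch(batch); status != NeedMoreRows {
				return status
			}
			batch = make(RowBatch, 0, RowBatchSize)
		}
	}
	// Flush the final partial batch, if any.
	if len(batch) > 0 {
		if status := dst.PushBatch(batch); status != NeedMoreRows {
			return status
		}
	}
	return NeedMoreRows
}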
@@ -147,6 +153,12 @@ type RowSource interface {
 	ConsumerClosed()
 }

+type RowBatchSource interface {
+	RowSource
+	// TODO(asubiotto): Comment. Should have the same semantics as Next().
+	NextBatch() (RowBatch, ProducerMetadata)
+}
+
// DrainAndForwardMetadata calls src.ConsumerDone() (thus asking src for
// draining metadata) and then forwards all the metadata to dst.
//
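
Editorial note: accumulateRows above reaches for a concrete *RowChannel cast to get at NextBatch(). With RowBatchSource in place, a consumer could instead detect batching support dynamically. A hedged sketch, assuming this package's interfaces and not part of the PR:

// consume is a hypothetical consumer that prefers whole batches when its
// input implements RowBatchSource, and falls back to row-at-a-time otherwise.
func consume(src RowSource) {
	if bs, ok := src.(RowBatchSource); ok {
		for {
			batch, meta := bs.NextBatch()
			if batch == nil && meta.Empty() {
				return // end of stream
			}
			// ... process batch and meta ...
		}
	}
	for {
		row, meta := src.Next()
		if row == nil && meta.Empty() {
			return // end of stream
		}
		// ... process row and meta ...
	}
}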
@@ -278,6 +290,8 @@ type RowChannelMsg struct {
 	// Only one of these fields will be set.
 	Row sqlbase.EncDatumRow
 	Meta ProducerMetadata
+	// TODO(asubiotto): Eventually Batch will replace Row.
+	Batch RowBatch
 }

 // ProducerMetadata represents a metadata record flowing through a DistSQL flow.
@@ -292,6 +306,24 @@ type ProducerMetadata struct {
 	TraceData []tracing.RecordedSpan
 }

+// RowBatchSize is the maximum size of a RowBatch. This number is based on the
+// batch size of the kv fetcher.
+const RowBatchSize = 10000
+
+// TODO(asubiotto): Would this make more sense in pkg/sql/sqlbase?
+// TODO(asubiotto): Inefficient representation for now. Might be better to
+// be a [][]byte field or a []byte with row offsets stored alongside and a map
+// of index to any special (non-value) encodings.
+type RowBatch []sqlbase.EncDatumRow
+
+// TODO(asubiotto): For processors that don't work on batches of rows and will
+// be calling Next(), we will want to "unbatch" the rows in the RowSource.
+// CAREFUL: We might be breaking ordering guarantees.
+
+// TODO(asubiotto): Write a RowBatchIterator. It kind of bugs me how similar
+// this is to the memRowContainer. It's probably too heavyweight for this. Might
+// want to investigate this.
+
 // Empty returns true if none of the fields in metadata are populated.
 func (meta ProducerMetadata) Empty() bool {
 	return meta.Ranges == nil && meta.Err == nil && meta.TraceData == nil
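
The representation TODO above floats a flatter layout for RowBatch. Purely for illustration, such a layout might pair one contiguous buffer with per-row offsets; the type and field names below are hypothetical, not PR code:

// flatRowBatch is a hypothetical alternative to []sqlbase.EncDatumRow:
// all rows' encoded datums live in one buffer, with per-row start offsets.
type flatRowBatch struct {
	data    []byte // encoded rows, concatenated
	offsets []int  // offsets[i] is where row i starts; len(offsets) == row count
}

// rowBytes returns the encoded bytes of row i.
func (b *flatRowBatch) rowBytes(i int) []byte {
	end := len(b.data)
	if i+1 < len(b.offsets) {
		end = b.offsets[i+1]
	}
	return b.data[b.offsets[i]:end]
}

The trade-off being weighed in the TODO: a single allocation and better cache locality per batch, at the cost of needing a side map for any special (non-value) encodings and re-slicing on access.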
@@ -311,10 +343,17 @@ type RowChannel struct {
 	// consumerStatus is an atomic that signals whether the RowChannel is only
 	// accepting draining metadata or is no longer accepting any rows via Push.
 	consumerStatus ConsumerStatus
+
+	// pendingBatch is a RowBatch that has been read from the channel but not
+	// yet returned through Next() or NextBatch(). It is only used when a
+	// consumer calls Next() instead of NextBatch() while the RowChannel's
+	// producer is pushing RowBatches and the consumer needs to be fed one row
+	// at a time.
+	// TODO(asubiotto): Remove once rows are no longer sent one by one.
+	pendingBatch RowBatch
 }

 var _ RowReceiver = &RowChannel{}
 var _ RowSource = &RowChannel{}
+var _ RowBatchReceiver = &RowChannel{}
+var _ RowBatchSource = &RowChannel{}

 // InitWithBufSize initializes the RowChannel with a given buffer size.
 func (rc *RowChannel) InitWithBufSize(types []sqlbase.ColumnType, chanBufSize int) {
@@ -330,15 +369,29 @@ func (rc *RowChannel) Init(types []sqlbase.ColumnType) {

 // Push is part of the RowReceiver interface.
 func (rc *RowChannel) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
+	return rc.pushMsg(RowChannelMsg{Row: row, Meta: meta})
+}
+
+// PushBatch is part of the RowBatchReceiver interface.
+func (rc *RowChannel) PushBatch(batch RowBatch) ConsumerStatus {
+	return rc.pushMsg(RowChannelMsg{Batch: batch})
+}
+
+func (rc *RowChannel) pushMsg(msg RowChannelMsg) ConsumerStatus {
 	consumerStatus := ConsumerStatus(
 		atomic.LoadUint32((*uint32)(&rc.consumerStatus)))
 	switch consumerStatus {
 	case NeedMoreRows:
-		rc.dataChan <- RowChannelMsg{Row: row, Meta: meta}
+		rc.dataChan <- msg
 	case DrainRequested:
 		// If we're draining, only forward metadata.
-		if !meta.Empty() {
-			rc.dataChan <- RowChannelMsg{Meta: meta}
+		if msg.Row != nil {
+			msg.Row = nil
+		} else if msg.Batch != nil {
+			msg.Batch = nil
+		}
+		if !msg.Meta.Empty() {
+			rc.dataChan <- msg
 		}
 	case ConsumerClosed:
 		// If the consumer is gone, swallow all the rows.
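
To make the drain path above concrete: once a consumer has moved the channel to DrainRequested, a pushed row or batch is stripped from the message and only messages still carrying metadata reach dataChan. A compressed restatement of that branch, illustrative only and relying on the documented invariant that a RowChannelMsg sets at most one of Row, Batch, and Meta:

// forwardWhileDraining is a hypothetical restatement of the DrainRequested
// branch of pushMsg. Because at most one field of the message is set,
// nil-ing both data fields and then testing Meta is equivalent to the
// if/else-if chain in the diff.
func forwardWhileDraining(rc *RowChannel, msg RowChannelMsg) {
	msg.Row, msg.Batch = nil, nil
	if !msg.Meta.Empty() {
		rc.dataChan <- msg
	}
}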
@@ -356,14 +409,46 @@ func (rc *RowChannel) Types() []sqlbase.ColumnType {
 	return rc.types
 }

-// Next is part of the RowSource interface.
+// Next is part of the RowSource interface. This implementation of Next() is
+// not thread-safe.
 func (rc *RowChannel) Next() (sqlbase.EncDatumRow, ProducerMetadata) {
+	if len(rc.pendingBatch) != 0 {
+		result := rc.pendingBatch[0]
+		rc.pendingBatch = rc.pendingBatch[1:]
+		return result, ProducerMetadata{}
+	}
+	d := rc.nextMsg()
+	if d.Batch != nil {
+		rc.pendingBatch = d.Batch
+		return rc.Next()
+	}
+	return d.Row, d.Meta
+}
+
+// NextBatch is part of the RowBatchSource interface.
+func (rc *RowChannel) NextBatch() (RowBatch, ProducerMetadata) {
+	if len(rc.pendingBatch) != 0 {
+		result := rc.pendingBatch
+		rc.pendingBatch = nil
+		return result, ProducerMetadata{}
+	}
+	d := rc.nextMsg()
+	if d.Batch != nil {
+		return d.Batch, d.Meta
+	}
+	if d.Row != nil {
+		return RowBatch{d.Row}, d.Meta
+	}
+	return nil, d.Meta
+}
+
+func (rc *RowChannel) nextMsg() RowChannelMsg {
 	d, ok := <-rc.C
 	if !ok {
 		// No more rows.
-		return nil, ProducerMetadata{}
+		return RowChannelMsg{Meta: ProducerMetadata{}}
 	}
-	return d.Row, d.Meta
+	return d
 }

 // ConsumerDone is part of the RowSource interface.
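
A note on the unbatching above: Next() peels one row per call off pendingBatch, refilling it whenever a whole batch arrives on the channel, so row-at-a-time consumers keep working against a batching producer. The self-contained toy below models just that pendingBatch logic; the types are simplified inventions, not the real RowChannel.

package main

import "fmt"

type row string
type rowBatch []row

// channel is a toy stand-in for RowChannel: msgs simulates dataChan contents
// and pending mirrors pendingBatch.
type channel struct {
	msgs    []rowBatch
	pending rowBatch
}

// next returns one row at a time, splitting batches as needed (mirrors Next).
func (c *channel) next() (row, bool) {
	if len(c.pending) != 0 {
		r := c.pending[0]
		c.pending = c.pending[1:]
		return r, true
	}
	if len(c.msgs) == 0 {
		return "", false // channel closed, no more rows
	}
	c.pending = c.msgs[0]
	c.msgs = c.msgs[1:]
	return c.next() // same recursive refill step as the PR's Next()
}

func main() {
	c := &channel{msgs: []rowBatch{{"a", "b"}, {"c"}}}
	for r, ok := c.next(); ok; r, ok = c.next() {
		fmt.Println(r) // prints a, b, c: one row per call, batches split transparently
	}
}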