Skip to content

Commit

Permalink
colexec: introduce batches with dynamic capacity
Browse files Browse the repository at this point in the history
This commit introduces `DynamicBatchSizeHelper` which is a utility
struct that helps operators achieve dynamic batch size behavior. The
contract is that those operators that want such behavior are required to
call `ResetMaybeReallocate` method which might allocate a new batch (it
uses an exponential capacity growth until `coldata.BatchSize()` and also
supports a minimum capacity argument). Most notably, the helper is now
used by `cFetcher` as well as several other operators that can be the
"sources of batch origination" (inboxes, ordered synchronizers,
columnarizers) and a few others. Operators that are expected to take a
long time to produce a single batch (like joiners and aggregators) don't
exhibit the dynamic batch size behavior.

The main operator for which I'm not sure whether we want the dynamic
batch size behavior is `routerOutputOp`.

Release note: None
  • Loading branch information
yuzefovich committed Aug 7, 2020
1 parent b121cb6 commit 3a93872
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 202 deletions.
3 changes: 3 additions & 0 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ const defaultBatchSize = 1024
var batchSize int64 = defaultBatchSize

// BatchSize reports the maximum number of tuples that fit in a column batch.
// The value is obtained with an atomic load of the package-level batchSize.
// TODO(yuzefovich): we are treating this method almost as if it were a
// constant while it performs an atomic operation. Think through whether it has
// a noticeable performance hit.
func BatchSize() int {
	size := atomic.LoadInt64(&batchSize)
	return int(size)
}
Expand Down
24 changes: 17 additions & 7 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Columnarizer struct {

buffered sqlbase.EncDatumRows
batch coldata.Batch
batchHelper *DynamicBatchSizeHelper
accumulatedMeta []execinfrapb.ProducerMetadata
ctx context.Context
typs []*types.T
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewColumnarizer(
return nil, err
}
c.typs = c.OutputTypes()
c.batchHelper = NewDynamicBatchSizeHelper(allocator, c.typs)
return c, nil
}

Expand All @@ -81,11 +83,6 @@ func (c *Columnarizer) Init() {
// internal objects several times if Init method is called more than once, so
// we have this check in place.
if c.initStatus == OperatorNotInitialized {
c.batch = c.allocator.NewMemBatchWithMaxCapacity(c.typs)
c.buffered = make(sqlbase.EncDatumRows, coldata.BatchSize())
for i := range c.buffered {
c.buffered[i] = make(sqlbase.EncDatumRow, len(c.typs))
}
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
c.input.Start(c.ctx)
c.initStatus = OperatorInitialized
Expand All @@ -94,11 +91,24 @@ func (c *Columnarizer) Init() {

// Next is part of the Operator interface.
func (c *Columnarizer) Next(context.Context) coldata.Batch {
c.batch.ResetInternalBatch()
var reallocated bool
c.batch, reallocated = c.batchHelper.ResetMaybeReallocate(c.batch, 1 /* minCapacity */)
if reallocated {
oldRows := c.buffered
c.buffered = make(sqlbase.EncDatumRows, c.batch.Capacity())
for i := range c.buffered {
if len(oldRows) > 0 {
c.buffered[i] = oldRows[0]
oldRows = oldRows[1:]
} else {
c.buffered[i] = make(sqlbase.EncDatumRow, len(c.typs))
}
}
}
// Buffer up n rows.
nRows := 0
columnTypes := c.OutputTypes()
for ; nRows < coldata.BatchSize(); nRows++ {
for ; nRows < c.batch.Capacity(); nRows++ {
row, meta := c.input.Next()
if meta != nil {
nRows--
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/colexec/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type deselectorOp struct {
allocator *colmem.Allocator
inputTypes []*types.T

output coldata.Batch
output coldata.Batch
batchHelper *DynamicBatchSizeHelper
}

var _ colexecbase.Operator = &deselectorOp{}
Expand All @@ -43,6 +44,7 @@ func NewDeselectorOp(
OneInputNode: NewOneInputNode(input),
allocator: allocator,
inputTypes: typs,
batchHelper: NewDynamicBatchSizeHelper(allocator, typs),
}
}

Expand All @@ -51,12 +53,18 @@ func (p *deselectorOp) Init() {
}

func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
p.resetOutput()
// TODO(yuzefovich): this allocation is only needed in order to appease the
// tests of the external sorter with forced disk spilling (if we don't do
// this, an OOM error occurs during ResetMaybeReallocate call below at
// which point we have already received a batch from the input and it'll
// get lost because deselectorOp doesn't support fall-over to the
// disk-backed infrastructure).
p.output, _ = p.batchHelper.ResetMaybeReallocate(p.output, 1 /* minCapacity */)
batch := p.input.Next(ctx)
if batch.Selection() == nil {
return batch
}

p.output, _ = p.batchHelper.ResetMaybeReallocate(p.output, batch.Length())
sel := batch.Selection()
p.allocator.PerformOperation(p.output.ColVecs(), func() {
for i := range p.inputTypes {
Expand All @@ -76,11 +84,3 @@ func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
p.output.SetLength(batch.Length())
return p.output
}

// resetOutput makes p.output ready to be filled again: an existing batch is
// reset in place, and a batch of maximum capacity is allocated if none exists
// yet.
func (p *deselectorOp) resetOutput() {
	if p.output != nil {
		p.output.ResetInternalBatch()
		return
	}
	p.output = p.allocator.NewMemBatchWithMaxCapacity(p.inputTypes)
}
60 changes: 60 additions & 0 deletions pkg/sql/colexec/dynamic_batch_size_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// NewDynamicBatchSizeHelper constructs a DynamicBatchSizeHelper that allocates
// batches of the given column types through the given allocator.
func NewDynamicBatchSizeHelper(
	allocator *colmem.Allocator, typs []*types.T,
) *DynamicBatchSizeHelper {
	helper := new(DynamicBatchSizeHelper)
	helper.allocator = allocator
	helper.typs = typs
	return helper
}

// DynamicBatchSizeHelper is a utility struct that helps operators work with
// batches of dynamic size.
type DynamicBatchSizeHelper struct {
// allocator is used to allocate every new batch handed out by the helper.
allocator *colmem.Allocator
// typs are the column types passed to the allocator for each new batch.
typs []*types.T
}

// ResetMaybeReallocate returns a batch that is guaranteed to be in a "reset"
// state and to have the capacity of at least minCapacity, where minCapacity
// is first normalized into the range [1, coldata.BatchSize()]. The method
// grows the allocated capacity of the batch exponentially (doubling it on
// every call) until the batch reaches coldata.BatchSize().
//
// batch is the batch previously returned by this method, or nil on the first
// call. The returned boolean reports whether a new batch was allocated; when
// it is true the caller must stop using the old batch and resize any state
// that depends on the batch's capacity. When it is false the passed-in batch
// has been reset in place and is returned as is.
func (d *DynamicBatchSizeHelper) ResetMaybeReallocate(
	batch coldata.Batch, minCapacity int,
) (_ coldata.Batch, reallocated bool) {
	// coldata.BatchSize() performs an atomic load, so read it once; this also
	// keeps all comparisons below consistent with each other.
	maxCapacity := coldata.BatchSize()
	// Normalize the requested capacity. A non-positive request would
	// otherwise produce a zero-capacity batch that can never grow (0*2 == 0),
	// and a request above the maximum would be honored only for a nil batch.
	if minCapacity < 1 {
		minCapacity = 1
	} else if minCapacity > maxCapacity {
		minCapacity = maxCapacity
	}

	if batch == nil {
		// First call: start with the smallest batch that satisfies the request.
		return d.allocator.NewMemBatchWithFixedCapacity(d.typs, minCapacity), true
	}
	if batch.Capacity() >= maxCapacity {
		// The batch is already at full size, so reuse it.
		batch.ResetInternalBatch()
		return batch, false
	}

	// The batch hasn't reached the maximum yet: double its capacity while
	// respecting both the requested minimum and the overall maximum.
	newCapacity := batch.Capacity() * 2
	if newCapacity < minCapacity {
		newCapacity = minCapacity
	}
	if newCapacity > maxCapacity {
		newCapacity = maxCapacity
	}
	return d.allocator.NewMemBatchWithFixedCapacity(d.typs, newCapacity), true
}
162 changes: 90 additions & 72 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3a93872

Please sign in to comment.