From 3a93872871b2842552606e086b0ce349e1444d80 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 6 Aug 2020 17:46:50 -0700 Subject: [PATCH] colexec: introduce batches with dynamic capacity This commit introduces `DynamicBatchSizeHelper` which is a utility struct that helps operators achieve dynamic batch size behavior. The contract is that those operators that want such behavior are required to call `ResetMaybeReallocate` method which might allocate a new batch (it uses an exponential capacity growth until `coldata.BatchSize()` and also supports a minimum capacity argument). Most notably, the helper is now used by `cFetcher` as well as several other operators that could be the "sources of the batch' origination" (inboxes, ordered synchronizers, columnarizers) and a few others. The operators that are expected to take long time in order to produce a single batch (like joiners and aggregators) don't exhibit the dynamic batch size behavior. The main operator that I'm not sure about whether we want to exhibit the dynamic batch size behavior is `routerOutputOp`. Release note: None --- pkg/col/coldata/batch.go | 3 + pkg/sql/colexec/columnarizer.go | 24 ++- pkg/sql/colexec/deselector.go | 22 +-- pkg/sql/colexec/dynamic_batch_size_helper.go | 60 +++++++ pkg/sql/colexec/ordered_synchronizer.eg.go | 162 ++++++++++--------- pkg/sql/colexec/ordered_synchronizer_tmpl.go | 53 +++--- pkg/sql/colexec/routers.go | 5 +- pkg/sql/colexec/sort.go | 99 ++++++------ pkg/sql/colexec/sorttopk.go | 56 ++++--- pkg/sql/colexec/utils.go | 6 + pkg/sql/colfetcher/cfetcher.go | 34 ++-- pkg/sql/colflow/colrpc/inbox.go | 13 +- pkg/sql/colmem/allocator.go | 8 +- 13 files changed, 343 insertions(+), 202 deletions(-) create mode 100644 pkg/sql/colexec/dynamic_batch_size_helper.go diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go index acbefd427fbc..5887f0edbf4e 100644 --- a/pkg/col/coldata/batch.go +++ b/pkg/col/coldata/batch.go @@ -78,6 +78,9 @@ const defaultBatchSize = 1024 var batchSize int64 = defaultBatchSize // BatchSize is the maximum number of tuples that fit in a column batch. +// TODO(yuzefovich): we are treating this method almost as if it were a +// constant while it performs an atomic operation. Think through whether it has +// a noticeable performance hit. func BatchSize() int { return int(atomic.LoadInt64(&batchSize)) } diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 17cae80a27b0..f63c7a2bae17 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -38,6 +38,7 @@ type Columnarizer struct { buffered sqlbase.EncDatumRows batch coldata.Batch + batchHelper *DynamicBatchSizeHelper accumulatedMeta []execinfrapb.ProducerMetadata ctx context.Context typs []*types.T @@ -72,6 +73,7 @@ func NewColumnarizer( return nil, err } c.typs = c.OutputTypes() + c.batchHelper = NewDynamicBatchSizeHelper(allocator, c.typs) return c, nil } @@ -81,11 +83,6 @@ func (c *Columnarizer) Init() { // internal objects several times if Init method is called more than once, so // we have this check in place. if c.initStatus == OperatorNotInitialized { - c.batch = c.allocator.NewMemBatchWithMaxCapacity(c.typs) - c.buffered = make(sqlbase.EncDatumRows, coldata.BatchSize()) - for i := range c.buffered { - c.buffered[i] = make(sqlbase.EncDatumRow, len(c.typs)) - } c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) c.input.Start(c.ctx) c.initStatus = OperatorInitialized @@ -94,11 +91,24 @@ func (c *Columnarizer) Init() { // Next is part of the Operator interface. func (c *Columnarizer) Next(context.Context) coldata.Batch { - c.batch.ResetInternalBatch() + var reallocated bool + c.batch, reallocated = c.batchHelper.ResetMaybeReallocate(c.batch, 1 /* minCapacity */) + if reallocated { + oldRows := c.buffered + c.buffered = make(sqlbase.EncDatumRows, c.batch.Capacity()) + for i := range c.buffered { + if len(oldRows) > 0 { + c.buffered[i] = oldRows[0] + oldRows = oldRows[1:] + } else { + c.buffered[i] = make(sqlbase.EncDatumRow, len(c.typs)) + } + } + } // Buffer up n rows. nRows := 0 columnTypes := c.OutputTypes() - for ; nRows < coldata.BatchSize(); nRows++ { + for ; nRows < c.batch.Capacity(); nRows++ { row, meta := c.input.Next() if meta != nil { nRows-- diff --git a/pkg/sql/colexec/deselector.go b/pkg/sql/colexec/deselector.go index bceecbb6d74d..ce6a607767dc 100644 --- a/pkg/sql/colexec/deselector.go +++ b/pkg/sql/colexec/deselector.go @@ -29,7 +29,8 @@ type deselectorOp struct { allocator *colmem.Allocator inputTypes []*types.T - output coldata.Batch + output coldata.Batch + batchHelper *DynamicBatchSizeHelper } var _ colexecbase.Operator = &deselectorOp{} @@ -43,6 +44,7 @@ func NewDeselectorOp( OneInputNode: NewOneInputNode(input), allocator: allocator, inputTypes: typs, + batchHelper: NewDynamicBatchSizeHelper(allocator, typs), } } @@ -51,12 +53,18 @@ func (p *deselectorOp) Init() { } func (p *deselectorOp) Next(ctx context.Context) coldata.Batch { - p.resetOutput() + // TODO(yuzefovich): this allocation is only needed in order to appease the + // tests of the external sorter with forced disk spilling (if we don't do + // this, an OOM error occurs during ResetMaybeReallocate call below at + // which point we have already received a batch from the input and it'll + // get lost because deselectorOp doesn't support fall-over to the + // disk-backed infrastructure). + p.output, _ = p.batchHelper.ResetMaybeReallocate(p.output, 1 /* minCapacity */) batch := p.input.Next(ctx) if batch.Selection() == nil { return batch } - + p.output, _ = p.batchHelper.ResetMaybeReallocate(p.output, batch.Length()) sel := batch.Selection() p.allocator.PerformOperation(p.output.ColVecs(), func() { for i := range p.inputTypes { @@ -76,11 +84,3 @@ func (p *deselectorOp) Next(ctx context.Context) coldata.Batch { p.output.SetLength(batch.Length()) return p.output } - -func (p *deselectorOp) resetOutput() { - if p.output == nil { - p.output = p.allocator.NewMemBatchWithMaxCapacity(p.inputTypes) - } else { - p.output.ResetInternalBatch() - } -} diff --git a/pkg/sql/colexec/dynamic_batch_size_helper.go b/pkg/sql/colexec/dynamic_batch_size_helper.go new file mode 100644 index 000000000000..7d57675f720e --- /dev/null +++ b/pkg/sql/colexec/dynamic_batch_size_helper.go @@ -0,0 +1,60 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/types" +) + +// NewDynamicBatchSizeHelper returns a new DynamicBatchSizeHelper. +func NewDynamicBatchSizeHelper( + allocator *colmem.Allocator, typs []*types.T, +) *DynamicBatchSizeHelper { + return &DynamicBatchSizeHelper{ + allocator: allocator, + typs: typs, + } +} + +// DynamicBatchSizeHelper is a utility struct that helps operators work with +// batches of dynamic size. +type DynamicBatchSizeHelper struct { + allocator *colmem.Allocator + typs []*types.T +} + +// ResetMaybeReallocate returns a batch that is guaranteed to be in a "reset" +// state and to have the capacity of at least minCapacity. The method will +// grow the allocated capacity of the batch exponentially, until the batch +// reaches coldata.BatchSize(). +func (d *DynamicBatchSizeHelper) ResetMaybeReallocate( + batch coldata.Batch, minCapacity int, +) (_ coldata.Batch, reallocated bool) { + reallocated = true + if batch == nil { + batch = d.allocator.NewMemBatchWithFixedCapacity(d.typs, minCapacity) + } else if batch.Capacity() < coldata.BatchSize() { + newCapacity := batch.Capacity() * 2 + if newCapacity < minCapacity { + newCapacity = minCapacity + } + if newCapacity > coldata.BatchSize() { + newCapacity = coldata.BatchSize() + } + batch = d.allocator.NewMemBatchWithFixedCapacity(d.typs, newCapacity) + } else { + reallocated = false + batch.ResetInternalBatch() + } + return batch, reallocated +} diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 9f4ff1b63cdd..0cdd8a7f0403 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -48,6 +48,7 @@ type OrderedSynchronizer struct { // comparators stores one comparator per ordering column. comparators []vecComparator output coldata.Batch + batchHelper *DynamicBatchSizeHelper outNulls []*coldata.Nulls // In order to reduce the number of interface conversions, we will get access // to the underlying slice for the output vectors and will use them directly. @@ -104,6 +105,7 @@ func NewOrderedSynchronizer( ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), + batchHelper: NewDynamicBatchSizeHelper(allocator, typs), }, nil } @@ -121,10 +123,10 @@ func (o *OrderedSynchronizer) Next(ctx context.Context) coldata.Batch { } heap.Init(o) } - o.output.ResetInternalBatch() + o.resetOutput() outputIdx := 0 o.allocator.PerformOperation(o.output.ColVecs(), func() { - for outputIdx < coldata.BatchSize() { + for outputIdx < o.output.Capacity() { if o.Len() == 0 { // All inputs exhausted. break @@ -253,81 +255,97 @@ func (o *OrderedSynchronizer) Next(ctx context.Context) coldata.Batch { return o.output } +func (o *OrderedSynchronizer) resetOutput() { + var reallocated bool + o.output, reallocated = o.batchHelper.ResetMaybeReallocate(o.output, 1 /* minCapacity */) + if reallocated { + o.outBoolCols = o.outBoolCols[:0] + o.outBytesCols = o.outBytesCols[:0] + o.outDecimalCols = o.outDecimalCols[:0] + o.outInt16Cols = o.outInt16Cols[:0] + o.outInt32Cols = o.outInt32Cols[:0] + o.outInt64Cols = o.outInt64Cols[:0] + o.outFloat64Cols = o.outFloat64Cols[:0] + o.outTimestampCols = o.outTimestampCols[:0] + o.outIntervalCols = o.outIntervalCols[:0] + o.outDatumCols = o.outDatumCols[:0] + for i, outVec := range o.output.ColVecs() { + o.outNulls[i] = outVec.Nulls() + switch typeconv.TypeFamilyToCanonicalTypeFamily(o.typs[i].Family()) { + case types.BoolFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outBoolCols) + o.outBoolCols = append(o.outBoolCols, outVec.Bool()) + } + case types.BytesFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outBytesCols) + o.outBytesCols = append(o.outBytesCols, outVec.Bytes()) + } + case types.DecimalFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outDecimalCols) + o.outDecimalCols = append(o.outDecimalCols, outVec.Decimal()) + } + case types.IntFamily: + switch o.typs[i].Width() { + case 16: + o.outColsMap[i] = len(o.outInt16Cols) + o.outInt16Cols = append(o.outInt16Cols, outVec.Int16()) + case 32: + o.outColsMap[i] = len(o.outInt32Cols) + o.outInt32Cols = append(o.outInt32Cols, outVec.Int32()) + case -1: + default: + o.outColsMap[i] = len(o.outInt64Cols) + o.outInt64Cols = append(o.outInt64Cols, outVec.Int64()) + } + case types.FloatFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outFloat64Cols) + o.outFloat64Cols = append(o.outFloat64Cols, outVec.Float64()) + } + case types.TimestampTZFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outTimestampCols) + o.outTimestampCols = append(o.outTimestampCols, outVec.Timestamp()) + } + case types.IntervalFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outIntervalCols) + o.outIntervalCols = append(o.outIntervalCols, outVec.Interval()) + } + case typeconv.DatumVecCanonicalTypeFamily: + switch o.typs[i].Width() { + case -1: + default: + o.outColsMap[i] = len(o.outDatumCols) + o.outDatumCols = append(o.outDatumCols, outVec.Datum()) + } + default: + colexecerror.InternalError(fmt.Sprintf("unhandled type %s", o.typs[i])) + } + } + } +} + // Init is part of the Operator interface. func (o *OrderedSynchronizer) Init() { o.inputIndices = make([]int, len(o.inputs)) - o.output = o.allocator.NewMemBatchWithMaxCapacity(o.typs) o.outNulls = make([]*coldata.Nulls, len(o.typs)) o.outColsMap = make([]int, len(o.typs)) - for i, outVec := range o.output.ColVecs() { - o.outNulls[i] = outVec.Nulls() - switch typeconv.TypeFamilyToCanonicalTypeFamily(o.typs[i].Family()) { - case types.BoolFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outBoolCols) - o.outBoolCols = append(o.outBoolCols, outVec.Bool()) - } - case types.BytesFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outBytesCols) - o.outBytesCols = append(o.outBytesCols, outVec.Bytes()) - } - case types.DecimalFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outDecimalCols) - o.outDecimalCols = append(o.outDecimalCols, outVec.Decimal()) - } - case types.IntFamily: - switch o.typs[i].Width() { - case 16: - o.outColsMap[i] = len(o.outInt16Cols) - o.outInt16Cols = append(o.outInt16Cols, outVec.Int16()) - case 32: - o.outColsMap[i] = len(o.outInt32Cols) - o.outInt32Cols = append(o.outInt32Cols, outVec.Int32()) - case -1: - default: - o.outColsMap[i] = len(o.outInt64Cols) - o.outInt64Cols = append(o.outInt64Cols, outVec.Int64()) - } - case types.FloatFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outFloat64Cols) - o.outFloat64Cols = append(o.outFloat64Cols, outVec.Float64()) - } - case types.TimestampTZFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outTimestampCols) - o.outTimestampCols = append(o.outTimestampCols, outVec.Timestamp()) - } - case types.IntervalFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outIntervalCols) - o.outIntervalCols = append(o.outIntervalCols, outVec.Interval()) - } - case typeconv.DatumVecCanonicalTypeFamily: - switch o.typs[i].Width() { - case -1: - default: - o.outColsMap[i] = len(o.outDatumCols) - o.outDatumCols = append(o.outDatumCols, outVec.Datum()) - } - default: - colexecerror.InternalError(fmt.Sprintf("unhandled type %s", o.typs[i])) - } - } for i := range o.inputs { o.inputs[i].Op.Init() } diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 6907e71697ae..23f6b847871f 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -73,6 +73,7 @@ type OrderedSynchronizer struct { // comparators stores one comparator per ordering column. comparators []vecComparator output coldata.Batch + batchHelper *DynamicBatchSizeHelper outNulls []*coldata.Nulls // In order to reduce the number of interface conversions, we will get access // to the underlying slice for the output vectors and will use them directly. @@ -124,6 +125,7 @@ func NewOrderedSynchronizer( ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), + batchHelper: NewDynamicBatchSizeHelper(allocator, typs), }, nil } @@ -141,10 +143,10 @@ func (o *OrderedSynchronizer) Next(ctx context.Context) coldata.Batch { } heap.Init(o) } - o.output.ResetInternalBatch() + o.resetOutput() outputIdx := 0 o.allocator.PerformOperation(o.output.ColVecs(), func() { - for outputIdx < coldata.BatchSize() { + for outputIdx < o.output.Capacity() { if o.Len() == 0 { // All inputs exhausted. break @@ -203,29 +205,40 @@ func (o *OrderedSynchronizer) Next(ctx context.Context) coldata.Batch { return o.output } +func (o *OrderedSynchronizer) resetOutput() { + var reallocated bool + o.output, reallocated = o.batchHelper.ResetMaybeReallocate(o.output, 1 /* minCapacity */) + if reallocated { + // {{range .}} + // {{range .WidthOverloads}} + o.out_TYPECols = o.out_TYPECols[:0] + // {{end}} + // {{end}} + for i, outVec := range o.output.ColVecs() { + o.outNulls[i] = outVec.Nulls() + switch typeconv.TypeFamilyToCanonicalTypeFamily(o.typs[i].Family()) { + // {{range .}} + case _CANONICAL_TYPE_FAMILY: + switch o.typs[i].Width() { + // {{range .WidthOverloads}} + case _TYPE_WIDTH: + o.outColsMap[i] = len(o.out_TYPECols) + o.out_TYPECols = append(o.out_TYPECols, outVec._TYPE()) + // {{end}} + } + // {{end}} + default: + colexecerror.InternalError(fmt.Sprintf("unhandled type %s", o.typs[i])) + } + } + } +} + // Init is part of the Operator interface. func (o *OrderedSynchronizer) Init() { o.inputIndices = make([]int, len(o.inputs)) - o.output = o.allocator.NewMemBatchWithMaxCapacity(o.typs) o.outNulls = make([]*coldata.Nulls, len(o.typs)) o.outColsMap = make([]int, len(o.typs)) - for i, outVec := range o.output.ColVecs() { - o.outNulls[i] = outVec.Nulls() - switch typeconv.TypeFamilyToCanonicalTypeFamily(o.typs[i].Family()) { - // {{range .}} - case _CANONICAL_TYPE_FAMILY: - switch o.typs[i].Width() { - // {{range .WidthOverloads}} - case _TYPE_WIDTH: - o.outColsMap[i] = len(o.out_TYPECols) - o.out_TYPECols = append(o.out_TYPECols, outVec._TYPE()) - // {{end}} - } - // {{end}} - default: - colexecerror.InternalError(fmt.Sprintf("unhandled type %s", o.typs[i])) - } - } for i := range o.inputs { o.inputs[i].Op.Init() } diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 561e18c307b9..3f6bd724979d 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -412,9 +412,10 @@ func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch, sele for toAppend := len(selection); toAppend > 0; { if o.mu.pendingBatch == nil { + // TODO(yuzefovich): consider whether this should be a dynamic batch. o.mu.pendingBatch = o.mu.unlimitedAllocator.NewMemBatchWithMaxCapacity(o.types) } - available := coldata.BatchSize() - o.mu.pendingBatch.Length() + available := o.mu.pendingBatch.Capacity() - o.mu.pendingBatch.Length() numAppended := toAppend if toAppend > available { numAppended = available @@ -435,7 +436,7 @@ func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch, sele }) newLength := o.mu.pendingBatch.Length() + numAppended o.mu.pendingBatch.SetLength(newLength) - if o.testingKnobs.alwaysFlush || newLength >= coldata.BatchSize() { + if o.testingKnobs.alwaysFlush || newLength >= o.mu.pendingBatch.Capacity() { // The capacity in o.mu.pendingBatch has been filled. err := o.mu.data.enqueue(ctx, o.mu.pendingBatch) if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index 0040946bd143..76aefd43fe14 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -65,6 +65,7 @@ func newSorter( partitioners: partitioners, orderingCols: orderingCols, state: sortSpooling, + batchHelper: NewDynamicBatchSizeHelper(allocator, inputTypes), }, nil } @@ -211,7 +212,8 @@ type sortOp struct { // state is the current state of the sort. state sortState - output coldata.Batch + output coldata.Batch + batchHelper *DynamicBatchSizeHelper exported int } @@ -249,52 +251,59 @@ const ( // sortEmitting is the third state of the operator, indicating that each call // to Next will return another batch of the sorted data. sortEmitting + // sortDone is the final state of the operator, where it always returns a + // zero batch. + sortDone ) func (p *sortOp) Next(ctx context.Context) coldata.Batch { - switch p.state { - case sortSpooling: - p.input.spool(ctx) - p.state = sortSorting - fallthrough - case sortSorting: - p.sort(ctx) - p.state = sortEmitting - fallthrough - case sortEmitting: - newEmitted := p.emitted + coldata.BatchSize() - if newEmitted > p.input.getNumTuples() { - newEmitted = p.input.getNumTuples() - } - if newEmitted == p.emitted { - return coldata.ZeroBatch - } - - p.resetOutput() - for j := 0; j < len(p.inputTypes); j++ { - // At this point, we have already fully sorted the input. It is ok to do - // this Copy outside of the allocator - the work has been done, but - // theoretically it is possible to hit the limit here (mainly with - // variable-sized types like Bytes). Nonetheless, for performance reasons - // it would be sad to fallback to disk at this point. - p.output.ColVec(j).Copy( - coldata.CopySliceArgs{ - SliceArgs: coldata.SliceArgs{ - Sel: p.order, - Src: p.input.getValues(j), - SrcStartIdx: p.emitted, - SrcEndIdx: newEmitted, + for { + switch p.state { + case sortSpooling: + p.input.spool(ctx) + p.state = sortSorting + case sortSorting: + p.sort(ctx) + p.state = sortEmitting + case sortEmitting: + toEmit := p.input.getNumTuples() - p.emitted + if toEmit == 0 { + p.state = sortDone + continue + } + if toEmit > coldata.BatchSize() { + toEmit = coldata.BatchSize() + } + p.output, _ = p.batchHelper.ResetMaybeReallocate(p.output, toEmit) + newEmitted := p.emitted + toEmit + for j := 0; j < len(p.inputTypes); j++ { + // At this point, we have already fully sorted the input. It is ok to do + // this Copy outside of the allocator - the work has been done, but + // theoretically it is possible to hit the limit here (mainly with + // variable-sized types like Bytes). Nonetheless, for performance reasons + // it would be sad to fallback to disk at this point. + p.output.ColVec(j).Copy( + coldata.CopySliceArgs{ + SliceArgs: coldata.SliceArgs{ + Sel: p.order, + Src: p.input.getValues(j), + SrcStartIdx: p.emitted, + SrcEndIdx: newEmitted, + }, }, - }, - ) + ) + } + p.output.SetLength(toEmit) + p.emitted = newEmitted + return p.output + case sortDone: + return coldata.ZeroBatch + default: + colexecerror.InternalError(fmt.Sprintf("invalid sort state %v", p.state)) + // This code is unreachable, but the compiler cannot infer that. + return nil } - p.output.SetLength(newEmitted - p.emitted) - p.emitted = newEmitted - return p.output } - colexecerror.InternalError(fmt.Sprintf("invalid sort state %v", p.state)) - // This code is unreachable, but the compiler cannot infer that. - return nil } // sort sorts the spooled tuples, so it must be called after spool() has been @@ -394,14 +403,6 @@ func (p *sortOp) sort(ctx context.Context) { } } -func (p *sortOp) resetOutput() { - if p.output == nil { - p.output = p.allocator.NewMemBatchWithMaxCapacity(p.inputTypes) - } else { - p.output.ResetInternalBatch() - } -} - func (p *sortOp) reset(ctx context.Context) { if r, ok := p.input.(resetter); ok { r.reset(ctx) diff --git a/pkg/sql/colexec/sorttopk.go b/pkg/sql/colexec/sorttopk.go index 2962dcb84d18..331978a108ca 100644 --- a/pkg/sql/colexec/sorttopk.go +++ b/pkg/sql/colexec/sorttopk.go @@ -44,6 +44,7 @@ func NewTopKSorter( inputTypes: inputTypes, orderingCols: orderingCols, k: k, + batchHelper: NewDynamicBatchSizeHelper(allocator, inputTypes), } } @@ -53,12 +54,15 @@ var _ colexecbase.BufferingInMemoryOperator = &topKSorter{} type topKSortState int const ( - // sortSpooling is the initial state of the operator, where it spools its - // input. + // topKSortSpooling is the initial state of the operator, where it spools + // its input. topKSortSpooling topKSortState = iota - // sortEmitting is the second state of the operator, indicating that each call - // to Next will return another batch of the sorted data. + // topKSortSpooling is the second state of the operator, indicating that + // each call to Next will return another batch of the sorted data. topKSortEmitting + // topKSortDone is the final state of the operator, where it always returns + // a zero batch. + topKSortDone ) type topKSorter struct { @@ -85,8 +89,9 @@ type topKSorter struct { // sel is a selection vector which specifies an ordering on topK. sel []int // emitted is the count of rows which have been emitted so far. - emitted int - output coldata.Batch + emitted int + output coldata.Batch + batchHelper *DynamicBatchSizeHelper exportedFromTopK int exportedFromBatch int @@ -107,17 +112,26 @@ func (t *topKSorter) Init() { } func (t *topKSorter) Next(ctx context.Context) coldata.Batch { - switch t.state { - case topKSortSpooling: - t.spool(ctx) - t.state = topKSortEmitting - fallthrough - case topKSortEmitting: - return t.emit() + for { + switch t.state { + case topKSortSpooling: + t.spool(ctx) + t.state = topKSortEmitting + case topKSortEmitting: + output := t.emit() + if output.Length() == 0 { + t.state = topKSortDone + continue + } + return output + case topKSortDone: + return coldata.ZeroBatch + default: + colexecerror.InternalError(fmt.Sprintf("invalid sort state %v", t.state)) + // This code is unreachable, but the compiler cannot infer that. + return nil + } } - colexecerror.InternalError(fmt.Sprintf("invalid sort state %v", t.state)) - // This code is unreachable, but the compiler cannot infer that. - return nil } // spool reads in the entire input, always storing the top K rows it has seen so @@ -195,16 +209,7 @@ func (t *topKSorter) spool(ctx context.Context) { } } -func (t *topKSorter) resetOutput() { - if t.output == nil { - t.output = t.allocator.NewMemBatchWithMaxCapacity(t.inputTypes) - } else { - t.output.ResetInternalBatch() - } -} - func (t *topKSorter) emit() coldata.Batch { - t.resetOutput() toEmit := t.topK.Length() - t.emitted if toEmit == 0 { // We're done. @@ -213,6 +218,7 @@ func (t *topKSorter) emit() coldata.Batch { if toEmit > coldata.BatchSize() { toEmit = coldata.BatchSize() } + t.output, _ = t.batchHelper.ResetMaybeReallocate(t.output, toEmit) for i := range t.inputTypes { vec := t.output.ColVec(i) // At this point, we have already fully sorted the input. It is ok to do diff --git a/pkg/sql/colexec/utils.go b/pkg/sql/colexec/utils.go index 0b136efe5a5d..21405e1db27d 100644 --- a/pkg/sql/colexec/utils.go +++ b/pkg/sql/colexec/utils.go @@ -118,6 +118,12 @@ func (p *partitionerToOperator) Next(ctx context.Context) coldata.Batch { return p.batch } +// newAppendOnlyBufferedBatch returns a new appendOnlyBufferedBatch that has +// initial zero capacity and could grow arbitrarily large with append() method. +// It is intended to be used by the operators that need to buffer unknown +// number of tuples. +// TODO(yuzefovich): consider whether it is beneficial to start out with +// non-zero capacity. func newAppendOnlyBufferedBatch( allocator *colmem.Allocator, typs []*types.T, ) *appendOnlyBufferedBatch { diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index faa3e2e6bcdd..1eac383c0b4f 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -237,7 +237,8 @@ type cFetcher struct { prettyValueBuf *bytes.Buffer // batch is the output batch the fetcher writes to. - batch coldata.Batch + batch coldata.Batch + batchHelper *colexec.DynamicBatchSizeHelper // colvecs is a slice of the ColVecs within batch, pulled out to avoid // having to call batch.Vec too often in the tight loop. @@ -258,6 +259,23 @@ type cFetcher struct { } } +const cFetcherBatchMinCapacity = 1 + +func (rf *cFetcher) resetBatch(timestampOutputIdx int) { + var reallocated bool + rf.machine.batch, reallocated = rf.machine.batchHelper.ResetMaybeReallocate( + rf.machine.batch, cFetcherBatchMinCapacity, + ) + if reallocated { + rf.machine.colvecs = rf.machine.batch.ColVecs() + // If the fetcher is requested to produce a timestamp column, pull out the + // column as a decimal and save it. + if timestampOutputIdx != noTimestampColumn { + rf.machine.timestampCol = rf.machine.colvecs[timestampOutputIdx].Decimal() + } + } +} + // Init sets up a Fetcher for a given table and index. If we are using a // non-primary index, tables.ValNeededForCol can only refer to columns in the // index. @@ -328,14 +346,8 @@ func (rf *cFetcher) Init( } sort.Ints(table.neededColsList) - rf.machine.batch = allocator.NewMemBatchWithMaxCapacity(typs) - rf.machine.colvecs = rf.machine.batch.ColVecs() - // If the fetcher is requested to produce a timestamp column, pull out the - // column as a decimal and save it. - if table.timestampOutputIdx != noTimestampColumn { - rf.machine.timestampCol = rf.machine.colvecs[table.timestampOutputIdx].Decimal() - } - + rf.machine.batchHelper = colexec.NewDynamicBatchSizeHelper(allocator, typs) + rf.resetBatch(table.timestampOutputIdx) table.knownPrefixLength = len(sqlbase.MakeIndexKeyPrefix(codec, table.desc, table.index.ID)) var indexColumnIDs []descpb.ColumnID @@ -672,7 +684,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) { rf.machine.state[0] = stateDecodeFirstKVOfRow case stateResetBatch: - rf.machine.batch.ResetInternalBatch() + rf.resetBatch(rf.table.timestampOutputIdx) rf.shiftState() case stateDecodeFirstKVOfRow: // Reset MVCC metadata for the table, since this is the first KV of a row. @@ -904,7 +916,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) { } rf.machine.rowIdx++ rf.shiftState() - if rf.machine.rowIdx >= coldata.BatchSize() { + if rf.machine.rowIdx >= rf.machine.batch.Capacity() { rf.pushState(stateResetBatch) rf.machine.batch.SetLength(rf.machine.rowIdx) rf.machine.rowIdx = 0 diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 7e8686644eb2..859bdc3017d1 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -18,6 +18,7 @@ import ( "github.com/apache/arrow/go/arrow/array" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/colserde" + "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -99,8 +100,9 @@ type Inbox struct { flowCtx context.Context scratch struct { - data []*array.Data - b coldata.Batch + data []*array.Data + b coldata.Batch + batchHelper *colexec.DynamicBatchSizeHelper } } @@ -130,7 +132,7 @@ func NewInbox( flowCtx: ctx, } i.scratch.data = make([]*array.Data, len(typs)) - i.scratch.b = allocator.NewMemBatchWithMaxCapacity(typs) + i.scratch.batchHelper = colexec.NewDynamicBatchSizeHelper(allocator, typs) return i, nil } @@ -300,6 +302,11 @@ func (i *Inbox) Next(ctx context.Context) coldata.Batch { if err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes); err != nil { colexecerror.InternalError(err) } + i.scratch.b, _ = i.scratch.batchHelper.ResetMaybeReallocate( + // We don't support type-less schema, so len(i.scratch.data) is + // always at least 1. + i.scratch.b, i.scratch.data[0].Len(), + ) if err := i.converter.ArrowToBatch(i.scratch.data, i.scratch.b); err != nil { colexecerror.InternalError(err) } diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 93c813e1ecdc..2f2905d3f767 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -91,12 +91,16 @@ func NewAllocator( // NewMemBatchWithMaxCapacity allocates a new in-memory coldata.Batch of // coldata.BatchSize() capacity. +// Note: consider whether you want dynamic batch size behavior (in which case +// you should be using colexec.DynamicBatchSizeHelper). func (a *Allocator) NewMemBatchWithMaxCapacity(typs []*types.T) coldata.Batch { return a.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize()) } -// NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with -// the given capacity. +// NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with the +// given vector capacity. +// Note: consider whether you want dynamic batch size behavior (in which case +// you should be using colexec.DynamicBatchSizeHelper). func (a *Allocator) NewMemBatchWithFixedCapacity(typs []*types.T, capacity int) coldata.Batch { estimatedMemoryUsage := selVectorSize(capacity) + int64(EstimateBatchSizeBytes(typs, capacity)) if err := a.acc.Grow(a.ctx, estimatedMemoryUsage); err != nil {