Skip to content

Commit

Permalink
colexec: optimize buffering operators with Bytes vectors
Browse files Browse the repository at this point in the history
Previously, buffering operators would call `SetLength` on their buffered
batch when consuming the input. If there are any `Bytes` columns, we would
be updating the offsets. Additionally, our "input consumption pattern" is
appending to the end of the buffered vectors, and all this would result
in quadratic behavior of updating the offsets. This is actually not
necessary at all (since `Vec.Append` maintains the correct offsets), so
this commit introduces a utility wrapper around `coldata.Batch` that
should be used by buffering operators.

This commit also removes some "column schema compression" business from
the constructor of `hashAggregator` since it makes to sense.

Release note: None
  • Loading branch information
yuzefovich committed Apr 4, 2020
1 parent 072da6b commit 21d1e9b
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 181 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestAggregatorAllFunctions(t *testing.T) {
execinfrapb.AggregatorSpec_BOOL_OR,
},
aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}, {3}, {3}},
colTypes: []coltypes.T{coltypes.Int64, coltypes.Decimal, coltypes.Int64, coltypes.Bool, coltypes.Bool},
colTypes: []coltypes.T{coltypes.Int64, coltypes.Decimal, coltypes.Int64, coltypes.Bool},
input: tuples{
{nil, 1.1, 4, true},
{0, nil, nil, nil},
Expand Down
111 changes: 22 additions & 89 deletions pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -62,6 +61,7 @@ type hashAggregator struct {
aggTypes [][]coltypes.T
aggFuncs []execinfrapb.AggregatorSpec_Func

inputTypes []coltypes.T
outputTypes []coltypes.T

// aggFuncMap stores the mapping from hash code to a vector of aggregation
Expand All @@ -70,14 +70,6 @@ type hashAggregator struct {
// handle hash collisions.
aggFuncMap hashAggFuncMap

// valTypes stores the column types of grouping columns and aggregating
// columns.
valTypes []coltypes.T

// valCols stores the column indices of grouping columns and aggregating
// columns.
valCols []uint32

// batchTupleLimit limits the number of tuples the aggregator will buffer
// before it starts to perform aggregation.
batchTupleLimit int
Expand All @@ -86,9 +78,7 @@ type hashAggregator struct {
state hashAggregatorState

scratch struct {
coldata.Batch
// vecs stores "unwrapped" batch.
vecs []coldata.Vec
*appendOnlyBufferedBatch

// sels stores the intermediate selection vector for each hash code. It
// is maintained in such a way that when for a particular hashCode
Expand Down Expand Up @@ -130,9 +120,7 @@ type hashAggregator struct {
// keyMapping stores the key values for each aggregation group. It is a
// bufferedBatch because in the worst case where all keys in the grouping
// columns are distinct, we need to store every single key in the input.
keyMapping coldata.Batch
// keyMappingVecs stores "unwrapped" keyMapping batch.
keyMappingVecs []coldata.Vec
keyMapping *appendOnlyBufferedBatch

output struct {
coldata.Batch
Expand Down Expand Up @@ -184,46 +172,6 @@ func NewHashAggregator(
aggCols [][]uint32,
) (Operator, error) {
aggTyps := extractAggTypes(aggCols, colTypes)

// Only keep relevant output columns, those that are used as input to an
// aggregation.
nCols := uint32(len(colTypes))
var keepCol util.FastIntSet

// compressed represents a mapping between each original column and its index
// in the new compressed columns set. This is required since we are
// effectively compressing the original list of columns by only keeping the
// columns used as input to an aggregation function and for grouping.
compressed := make([]uint32, nCols)
for _, cols := range aggCols {
for _, col := range cols {
keepCol.Add(int(col))
}
}

for _, col := range groupCols {
keepCol.Add(int(col))
}

// Map the corresponding aggCols to the new output column indices.
nOutCols := uint32(0)
compressedInputCols := make([]uint32, 0)
compressedValTypes := make([]coltypes.T, 0)
keepCol.ForEach(func(i int) {
compressedInputCols = append(compressedInputCols, uint32(i))
compressedValTypes = append(compressedValTypes, colTypes[i])
compressed[i] = nOutCols
nOutCols++
})

mappedAggCols := make([][]uint32, len(aggCols))
for aggIdx := range aggCols {
mappedAggCols[aggIdx] = make([]uint32, len(aggCols[aggIdx]))
for i := range mappedAggCols[aggIdx] {
mappedAggCols[aggIdx][i] = compressed[aggCols[aggIdx][i]]
}
}

outputTypes, err := makeAggregateFuncsOutputTypes(aggTyps, aggFns)
if err != nil {
return nil, errors.AssertionFailedf(
Expand All @@ -243,19 +191,17 @@ func NewHashAggregator(
OneInputNode: NewOneInputNode(input),
allocator: allocator,

aggCols: mappedAggCols,
aggCols: aggCols,
aggFuncs: aggFns,
aggTypes: aggTyps,
aggFuncMap: make(hashAggFuncMap),

batchTupleLimit: tupleLimit,

state: hashAggregatorBuffering,
inputTypes: colTypes,
outputTypes: outputTypes,

valTypes: compressedValTypes,
valCols: compressedInputCols,

groupCols: groupCols,
groupTypes: groupTypes,
}, nil
Expand All @@ -270,16 +216,18 @@ func (op *hashAggregator) Init() {
// op.batchTupleLimit. This is because we perform checks after appending the
// input tuples to the scratch buffer.
maxBufferedTuples := op.batchTupleLimit + coldata.BatchSize()
op.scratch.Batch = op.allocator.NewMemBatchWithSize(op.valTypes, maxBufferedTuples)
op.scratch.vecs = op.scratch.ColVecs()
op.scratch.appendOnlyBufferedBatch = newAppendOnlyBufferedBatch(
op.allocator, op.inputTypes, maxBufferedTuples,
)
op.scratch.sels = make([][]int, maxBufferedTuples)
op.scratch.hashCodeToSelsSlot = make(map[uint64]int)
op.scratch.group = make([]bool, maxBufferedTuples)
// Eventually, op.keyMapping will contain as many tuples as there are
// groups in the input, but we don't know that number upfront, so we
// allocate it with some reasonably sized constant capacity.
op.keyMapping = op.allocator.NewMemBatchWithSize(op.groupTypes, op.batchTupleLimit)
op.keyMappingVecs = op.keyMapping.ColVecs()
op.keyMapping = newAppendOnlyBufferedBatch(
op.allocator, op.groupTypes, op.batchTupleLimit,
)

op.hashBuffer = make([]uint64, maxBufferedTuples)
}
Expand Down Expand Up @@ -371,32 +319,17 @@ func (op *hashAggregator) Next(ctx context.Context) coldata.Batch {
// reaches batchTupleLimit. It returns true when the hash aggregator has
// consumed all batches from input.
func (op *hashAggregator) bufferBatch(ctx context.Context) bool {
bufferedTupleCount := 0

for bufferedTupleCount < op.batchTupleLimit {
for op.scratch.Length() < op.batchTupleLimit {
b := op.input.Next(ctx)
batchSize := b.Length()
if b.Length() == 0 {
if batchSize == 0 {
break
}
bufferedTupleCount += batchSize
op.allocator.PerformOperation(op.scratch.vecs, func() {
for i, colIdx := range op.valCols {
op.scratch.vecs[i].Append(
coldata.SliceArgs{
ColType: op.valTypes[i],
Src: b.ColVec(int(colIdx)),
Sel: b.Selection(),
DestIdx: op.scratch.Length(),
SrcEndIdx: batchSize,
},
)
}
op.allocator.PerformOperation(op.scratch.ColVecs(), func() {
op.scratch.append(b, 0 /* startIdx */, batchSize)
})
op.scratch.SetLength(bufferedTupleCount)
}

return bufferedTupleCount == 0
return op.scratch.Length() == 0
}

func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) {
Expand All @@ -408,8 +341,8 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) {
for _, colIdx := range op.groupCols {
rehash(ctx,
hashBuffer,
op.valTypes[colIdx],
op.scratch.vecs[colIdx],
op.inputTypes[colIdx],
op.scratch.ColVec(int(colIdx)),
nKeys,
nil, /* sel */
op.cancelChecker,
Expand Down Expand Up @@ -489,14 +422,14 @@ func (op *hashAggregator) onlineAgg() {
aggFunc.keyIdx = keyIdx

// Store the key of the current aggregating group into keyMapping.
op.allocator.PerformOperation(op.keyMappingVecs, func() {
op.allocator.PerformOperation(op.keyMapping.ColVecs(), func() {
for keyIdx, colIdx := range op.groupCols {
// TODO(azhng): Try to preallocate enough memory so instead of
// .Append() we can use execgen.SET to improve the
// performance.
op.keyMappingVecs[keyIdx].Append(coldata.SliceArgs{
Src: op.scratch.vecs[colIdx],
ColType: op.valTypes[colIdx],
op.keyMapping.ColVec(keyIdx).Append(coldata.SliceArgs{
Src: op.scratch.ColVec(int(colIdx)),
ColType: op.inputTypes[colIdx],
DestIdx: aggFunc.keyIdx,
SrcStartIdx: remaining[0],
SrcEndIdx: remaining[0] + 1,
Expand Down
53 changes: 12 additions & 41 deletions pkg/sql/colexec/hashtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type hashTable struct {
// table. A key tuple is defined as the elements in each row of vals that
// makes up the equality columns. The ID of a key at any index of vals is
// index + 1.
vals coldata.Batch
vals *appendOnlyBufferedBatch
// valTypes stores the corresponding types of the val columns.
valTypes []coltypes.T
// keyCols stores the corresponding types of key columns.
Expand Down Expand Up @@ -182,7 +182,7 @@ func newHashTable(
differs: make([]bool, coldata.BatchSize()),
},

vals: allocator.NewMemBatchWithSize(sourceTypes, 0 /* initialSize */),
vals: newAppendOnlyBufferedBatch(allocator, sourceTypes, 0 /* initialSize */),
valTypes: sourceTypes,
keyCols: eqCols,
keyTypes: keyTypes,
Expand Down Expand Up @@ -217,7 +217,9 @@ func (ht *hashTable) build(ctx context.Context, input Operator) {
break
}

ht.loadBatch(batch)
ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
ht.vals.append(batch, 0 /* startIdx */, batch.Length())
})
}

keyCols := make([]coldata.Vec, nKeyCols)
Expand All @@ -237,7 +239,6 @@ func (ht *hashTable) build(ctx context.Context, input Operator) {
}

srcVecs := batch.ColVecs()
targetVecs := ht.vals.ColVecs()

for i := 0; i < nKeyCols; i++ {
ht.probeScratch.keys[i] = srcVecs[ht.keyCols[i]]
Expand All @@ -261,28 +262,19 @@ func (ht *hashTable) build(ctx context.Context, input Operator) {

ht.removeDuplicates(batch, ht.probeScratch.keys, ht.probeScratch.first, ht.probeScratch.next, ht.checkProbeForDistinct)

// We only check duplicates when there is tuple buffered.
if ht.vals.Length() > 0 {
numBuffered := ht.vals.Length()
// We only check duplicates when there is at least one buffered
// tuple.
if numBuffered > 0 {
ht.removeDuplicates(batch, ht.probeScratch.keys, ht.buildScratch.first, ht.buildScratch.next, ht.checkBuildForDistinct)
}

ht.allocator.PerformOperation(targetVecs, func() {
for i, typ := range ht.valTypes {
targetVecs[i].Append(
coldata.SliceArgs{
ColType: typ,
Src: srcVecs[i],
Sel: batch.Selection(),
DestIdx: ht.vals.Length(),
SrcEndIdx: batch.Length(),
},
)
}
ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
ht.vals.append(batch, 0 /* startIdx */, batch.Length())
})

ht.buildScratch.next = append(ht.buildScratch.next, ht.probeScratch.hashBuffer[:batch.Length()]...)
ht.buildNextChains(ctx, ht.buildScratch.first, ht.buildScratch.next, ht.vals.Length()+1, batch.Length())
ht.vals.SetLength(ht.vals.Length() + batch.Length())
ht.buildNextChains(ctx, ht.buildScratch.first, ht.buildScratch.next, numBuffered+1, batch.Length())
}
default:
execerror.VectorizedInternalPanic(fmt.Sprintf("hashTable in unhandled state"))
Expand Down Expand Up @@ -355,27 +347,6 @@ func (ht *hashTable) checkColsForDistinctTuples(
}
}

// loadBatch appends a new batch of keys and outputs to the existing keys and
// output columns.
func (ht *hashTable) loadBatch(batch coldata.Batch) {
batchSize := batch.Length()
ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
htSize := ht.vals.Length()
for i, typ := range ht.valTypes {
ht.vals.ColVec(i).Append(
coldata.SliceArgs{
ColType: typ,
Src: batch.ColVec(i),
Sel: batch.Selection(),
DestIdx: htSize,
SrcEndIdx: batchSize,
},
)
}
ht.vals.SetLength(htSize + batchSize)
})
}

// computeBuckets computes the hash value of each key and stores the result in
// buckets.
func (ht *hashTable) computeBuckets(
Expand Down
22 changes: 6 additions & 16 deletions pkg/sql/colexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ type allSpooler struct {
// inputTypes contains the types of all of the columns from the input.
inputTypes []coltypes.T
// bufferedTuples stores all the values from the input after spooling. Each
// Vec in this slice is the entire column from the input.
bufferedTuples coldata.Batch
// Vec in this batch is the entire column from the input.
bufferedTuples *appendOnlyBufferedBatch
// spooled indicates whether spool() has already been called.
spooled bool
windowedBatch coldata.Batch
Expand All @@ -123,7 +123,9 @@ func newAllSpooler(allocator *Allocator, input Operator, inputTypes []coltypes.T

func (p *allSpooler) init() {
p.input.Init()
p.bufferedTuples = p.allocator.NewMemBatchWithSize(p.inputTypes, 0 /* size */)
p.bufferedTuples = newAppendOnlyBufferedBatch(
p.allocator, p.inputTypes, 0, /* initialSize */
)
p.windowedBatch = p.allocator.NewMemBatchWithSize(p.inputTypes, 0 /* size */)
}

Expand All @@ -134,19 +136,7 @@ func (p *allSpooler) spool(ctx context.Context) {
p.spooled = true
for batch := p.input.Next(ctx); batch.Length() != 0; batch = p.input.Next(ctx) {
p.allocator.PerformOperation(p.bufferedTuples.ColVecs(), func() {
numBufferedTuples := p.bufferedTuples.Length()
for i, colVec := range p.bufferedTuples.ColVecs() {
colVec.Append(
coldata.SliceArgs{
ColType: p.inputTypes[i],
Src: batch.ColVec(i),
Sel: batch.Selection(),
DestIdx: numBufferedTuples,
SrcEndIdx: batch.Length(),
},
)
}
p.bufferedTuples.SetLength(numBufferedTuples + batch.Length())
p.bufferedTuples.append(batch, 0 /* startIdx */, batch.Length())
})
}
}
Expand Down
Loading

0 comments on commit 21d1e9b

Please sign in to comment.