Skip to content

Commit

Permalink
colmem: allow Allocator max batch size to be customized
Browse files Browse the repository at this point in the history
Previously this was hardcoded to coldata.BatchSize or 1024, now it
can be increased or decreased.

Epic: CRDB-18892
Informs: #91831
Release note: None
  • Loading branch information
cucaroach committed Mar 15, 2023
1 parent 19e5845 commit d987fd4
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Allocator struct {
// allocation is denied by acc.
unlimitedAcc *mon.BoundAccount
factory coldata.ColumnFactory
maxBatchSize int
}

// SelVectorSize returns the memory usage of the selection vector of the given
Expand Down Expand Up @@ -157,6 +158,18 @@ func NewLimitedAllocator(
}
}

// SetMaxBatchSize use this to get more or less than the coldata.BatchSize() default.
func (a *Allocator) SetMaxBatchSize(siz int) {
a.maxBatchSize = siz
}

func (a *Allocator) getMaxBatchSize() int {
if a.maxBatchSize == 0 {
return coldata.BatchSize()
}
return a.maxBatchSize
}

// NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with the
// given vector capacity.
// Note: consider whether you want the dynamic batch size behavior (in which
Expand Down Expand Up @@ -215,14 +228,14 @@ func truncateToMemoryLimit(minDesiredCapacity int, maxBatchMemSize int64, typs [
}

// growCapacity grows the capacity exponentially or up to minDesiredCapacity
// (whichever is larger) without exceeding coldata.BatchSize().
func growCapacity(oldCapacity int, minDesiredCapacity int) int {
// (whichever is larger) without exceeding maxBatchSize.
func growCapacity(oldCapacity int, minDesiredCapacity int, maxBatchSize int) int {
newCapacity := oldCapacity * 2
if newCapacity < minDesiredCapacity {
newCapacity = minDesiredCapacity
}
if newCapacity > coldata.BatchSize() {
newCapacity = coldata.BatchSize()
if newCapacity > maxBatchSize {
newCapacity = maxBatchSize
}
return newCapacity
}
Expand Down Expand Up @@ -265,8 +278,8 @@ func (a *Allocator) resetMaybeReallocate(
colexecerror.InternalError(errors.AssertionFailedf("invalid minDesiredCapacity %d", minDesiredCapacity))
} else if minDesiredCapacity == 0 {
minDesiredCapacity = 1
} else if minDesiredCapacity > coldata.BatchSize() {
minDesiredCapacity = coldata.BatchSize()
} else if minDesiredCapacity > a.getMaxBatchSize() {
minDesiredCapacity = a.getMaxBatchSize()
}
reallocated = true
if oldBatch == nil {
Expand All @@ -277,15 +290,15 @@ func (a *Allocator) resetMaybeReallocate(
var useOldBatch bool
// Avoid calculating the memory footprint if possible.
var oldBatchMemSize int64
if oldCapacity == coldata.BatchSize() {
if oldCapacity == a.getMaxBatchSize() {
// If old batch is already of the largest capacity, we will reuse
// it.
useOldBatch = true
} else {
// Check that if we were to grow the capacity and allocate a new
// batch, the new batch would still not exceed the limit.
if estimatedMaxCapacity := truncateToMemoryLimit(
growCapacity(oldCapacity, minDesiredCapacity), maxBatchMemSize, typs,
growCapacity(oldCapacity, minDesiredCapacity, a.getMaxBatchSize()), maxBatchMemSize, typs,
); estimatedMaxCapacity < minDesiredCapacity {
// Reduce the ask according to the estimated maximum. Note that
// we do not set desiredCapacitySufficient to false since this
Expand Down Expand Up @@ -331,7 +344,7 @@ func (a *Allocator) resetMaybeReallocate(
newBatch = oldBatch
} else {
a.ReleaseMemory(oldBatchMemSize)
newCapacity := growCapacity(oldCapacity, minDesiredCapacity)
newCapacity := growCapacity(oldCapacity, minDesiredCapacity, a.getMaxBatchSize())
newCapacity = truncateToMemoryLimit(newCapacity, maxBatchMemSize, typs)
newBatch = a.NewMemBatchWithFixedCapacity(typs, newCapacity)
}
Expand Down

0 comments on commit d987fd4

Please sign in to comment.