Skip to content

Commit

Permalink
Merge pull request #102100 from yuzefovich/backport23.1-101971
Browse files Browse the repository at this point in the history
release-23.1: colexec: fix hash table memory accounting for hash aggregator
  • Loading branch information
yuzefovich authored May 12, 2023
2 parents aecf1f1 + fea4f77 commit 44d220c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
27 changes: 17 additions & 10 deletions pkg/sql/colexec/colexechash/hashtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type hashTableProbeBuffer struct {

///////////////////////////////////////////////////////////////
// Slices below are allocated dynamically but are limited by //
// coldata.BatchSize() in size. //
// HashTable.maxProbingBatchLength in size. //
///////////////////////////////////////////////////////////////

// ToCheck stores the indices of tuples from the probing batch for which we
Expand Down Expand Up @@ -218,7 +218,8 @@ type hashTableProbeBuffer struct {
// find possible matches for each tuple. For more details see the comments on
// hashAggregator.onlineAgg and DistinctBuild.
type HashTable struct {
allocator *colmem.Allocator
allocator *colmem.Allocator
maxProbingBatchLength int

// unlimitedSlicesNumUint64AccountedFor stores the number of uint64 from
// the unlimited slices that we have already accounted for.
Expand Down Expand Up @@ -285,6 +286,8 @@ var _ colexecop.Resetter = &HashTable{}
// - allocator must be the allocator that is owned by the hash table and not
// shared with any other components.
//
// - maxProbingBatchLength indicates the maximum size of the probing batch.
//
// - loadFactor determines the average number of tuples per bucket which, if
// exceeded, will trigger resizing the hash table. This number can have a
// noticeable effect on the performance, so every user of the hash table should
Expand All @@ -309,6 +312,7 @@ var _ colexecop.Resetter = &HashTable{}
func NewHashTable(
ctx context.Context,
allocator *colmem.Allocator,
maxProbingBatchLength int,
loadFactor float64,
initialNumHashBuckets uint64,
sourceTypes []*types.T,
Expand Down Expand Up @@ -350,7 +354,8 @@ func NewHashTable(
colexecerror.InternalError(errors.AssertionFailedf("unknown HashTableBuildMode %d", buildMode))
}
ht := &HashTable{
allocator: allocator,
allocator: allocator,
maxProbingBatchLength: maxProbingBatchLength,
BuildScratch: hashChains{
First: make([]keyID, initialNumHashBuckets),
},
Expand Down Expand Up @@ -396,8 +401,8 @@ func (ht *HashTable) shouldResize(numTuples int) bool {
}

// probeBufferInternalMaxMemUsed returns the maximum memory used by the slices
// of hashTableProbeBuffer that are limited by coldata.BatchSize() in size.
func probeBufferInternalMaxMemUsed() int64 {
// of hashTableProbeBuffer that are limited by maxProbingBatchLength in size.
func probeBufferInternalMaxMemUsed(maxProbingBatchLength int) int64 {
// probeBufferInternalMaxMemUsed accounts for:
// - five uint64 slices:
// - hashTableProbeBuffer.hashChains.Next
Expand All @@ -408,17 +413,19 @@ func probeBufferInternalMaxMemUsed() int64 {
// - two bool slices:
// - hashTableProbeBuffer.differs
// - hashTableProbeBuffer.distinct.
return memsize.Uint64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize())
return memsize.Uint64*int64(5*maxProbingBatchLength) + memsize.Bool*int64(2*maxProbingBatchLength)
}

// accountForLimitedSlices checks whether we have already accounted for the
// memory used by the slices that are limited by coldata.BatchSize() in size
// memory used by the slices that are limited by maxProbingBatchLength in size
// and adjusts the allocator accordingly if we haven't.
func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocator) {
func (p *hashTableProbeBuffer) accountForLimitedSlices(
allocator *colmem.Allocator, maxProbingBatchLength int,
) {
if p.limitedSlicesAreAccountedFor {
return
}
allocator.AdjustMemoryUsage(probeBufferInternalMaxMemUsed())
allocator.AdjustMemoryUsage(probeBufferInternalMaxMemUsed(maxProbingBatchLength))
p.limitedSlicesAreAccountedFor = true
}

Expand All @@ -436,7 +443,7 @@ func (ht *HashTable) buildFromBufferedTuples() {
}
// Account for memory used by the internal auxiliary slices that are limited
// in size.
ht.ProbeScratch.accountForLimitedSlices(ht.allocator)
ht.ProbeScratch.accountForLimitedSlices(ht.allocator, ht.maxProbingBatchLength)
// Figure out the minimum capacities of the unlimited slices before actually
// allocating then.
needCapacity := int64(ht.numBuckets) + int64(ht.Vals.Length()+1) // ht.BuildScratch.First + ht.BuildScratch.Next
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (hj *hashJoiner) Init(ctx context.Context) {
hj.ht = colexechash.NewHashTable(
ctx,
hj.hashTableAllocator,
coldata.BatchSize(),
hashTableLoadFactor,
hj.hashTableInitialNumBuckets,
hj.spec.Right.SourceTypes,
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (op *hashAggregator) Init(ctx context.Context) {
op.ht = colexechash.NewHashTable(
op.Ctx,
op.hashTableAllocator,
// The hash aggregator will buffer tuples from the input until it has
// hashAggregatorMaxBuffered of them. This is coldata.MaxBatchSize in
// production builds.
hashAggregatorMaxBuffered,
hashTableLoadFactor,
hashTableNumBuckets,
op.inputTypes,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/unordered_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (op *UnorderedDistinct) Init(ctx context.Context) {
op.Ht = colexechash.NewHashTable(
op.Ctx,
op.hashTableAllocator,
coldata.BatchSize(),
hashTableLoadFactor,
hashTableNumBuckets,
op.typs,
Expand Down

0 comments on commit 44d220c

Please sign in to comment.