Merge pull request cockroachdb#86323 from yuzefovich/backport21.2-84229
release-21.2: colexechash: improve memory accounting in the hash table
yuzefovich authored Aug 17, 2022
2 parents 13c13b5 + 7906075 commit c32478f
Showing 4 changed files with 99 additions and 26 deletions.
94 changes: 69 additions & 25 deletions pkg/sql/colexec/colexechash/hashtable.go
@@ -304,43 +304,89 @@ func (ht *HashTable) shouldResize(numTuples int) bool {
return float64(numTuples)/float64(ht.numBuckets) > ht.loadFactor
}

// probeBufferInternalMaxMemUsed returns the maximum memory used by the slices
// of hashTableProbeBuffer that are limited by coldata.BatchSize() in size.
func probeBufferInternalMaxMemUsed() int64 {
// probeBufferInternalMaxMemUsed accounts for:
// - five uint64 slices:
// - hashTableProbeBuffer.Next
// - hashTableProbeBuffer.HeadID
// - hashTableProbeBuffer.GroupID
// - hashTableProbeBuffer.ToCheck
// - hashTableProbeBuffer.HashBuffer
// - two bool slices:
// - hashTableProbeBuffer.differs
// - hashTableProbeBuffer.distinct.
return memsize.Uint64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize())
}
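As a rough sanity check on the figure returned here (an editor's illustration, not part of the change): assuming the default coldata.BatchSize() of 1024, memsize.Uint64 = 8 bytes, and memsize.Bool = 1 byte, the probe buffer's size-limited slices account for about 42 KiB:

package main

import "fmt"

func main() {
	const (
		batchSize   = 1024 // assumed default coldata.BatchSize()
		uint64Bytes = 8    // memsize.Uint64
		boolBytes   = 1    // memsize.Bool
	)
	// Five uint64 slices plus two bool slices, each coldata.BatchSize() long.
	fmt.Println(uint64Bytes*5*batchSize + boolBytes*2*batchSize) // 43008 bytes (42 KiB)
}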

// accountForLimitedSlices checks whether we have already accounted for the
// memory used by the slices that are limited by coldata.BatchSize() in size
// and adjusts the allocator accordingly if we haven't.
func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocator) {
if p.limitedSlicesAreAccountedFor {
return
}
internalMemMaxUsed := memsize.Int64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize())
allocator.AdjustMemoryUsage(internalMemMaxUsed)
allocator.AdjustMemoryUsage(probeBufferInternalMaxMemUsed())
p.limitedSlicesAreAccountedFor = true
}

func (ht *HashTable) accountForUnlimitedSlices(newUint64Count int64) {
ht.allocator.AdjustMemoryUsage(memsize.Uint64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor))
ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count
}

func (ht *HashTable) unlimitedSlicesCapacity() int64 {
// Note that if ht.ProbeScratch.First is nil, it'll have zero capacity.
return int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next))
}

// buildFromBufferedTuples builds the hash table from already buffered tuples
// in ht.Vals. It'll determine the appropriate number of buckets that satisfies
// the target load factor.
func (ht *HashTable) buildFromBufferedTuples() {
for ht.shouldResize(ht.Vals.Length()) {
ht.numBuckets *= 2
}
// Account for memory used by the internal auxiliary slices that are limited
// in size.
ht.ProbeScratch.accountForLimitedSlices(ht.allocator)
// Figure out the minimum capacities of the unlimited slices before actually
// allocating them.
needCapacity := int64(ht.numBuckets) + int64(ht.Vals.Length()+1) // ht.BuildScratch.First + ht.BuildScratch.Next
if ht.ProbeScratch.First != nil {
needCapacity += int64(ht.numBuckets) // ht.ProbeScratch.First
}
if needCapacity > ht.unlimitedSlicesCapacity() {
// We'll need to allocate larger slices than we currently have, so
// perform the memory accounting for the anticipated memory usage. Note
// that it might not be precise, so we'll reconcile after the
// allocations below.
ht.accountForUnlimitedSlices(needCapacity)
}
// Perform the actual build.
ht.buildFromBufferedTuplesNoAccounting()
// Now ensure that the accounting is precise (cap's of the slices might
// exceed len's that we've accounted for).
ht.accountForUnlimitedSlices(ht.unlimitedSlicesCapacity())
}
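The new buildFromBufferedTuples follows an "account up front, reconcile afterwards" pattern: register the anticipated footprint before allocating (so the budget check happens before the large allocations are made), then adjust to the actual capacities once the slices exist. A minimal standalone sketch of that pattern, using a stand-in for colmem.Allocator rather than the real API:

package main

import "fmt"

// allocator is a stand-in that only keeps a running total of accounted bytes.
type allocator struct{ used int64 }

func (a *allocator) AdjustMemoryUsage(delta int64) { a.used += delta }

func main() {
	var acc allocator
	var accountedElems int64
	account := func(newElems int64) {
		// Mirror accountForUnlimitedSlices: register only the delta.
		acc.AdjustMemoryUsage(8 /* bytes per uint64 */ * (newElems - accountedElems))
		accountedElems = newElems
	}

	needed := int64(1000) // anticipated number of uint64 elements
	account(needed)       // pre-account before allocating
	buf := make([]uint64, needed)
	account(int64(cap(buf))) // reconcile: the capacity may exceed the request
	fmt.Println(acc.used)    // 8000
}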

// buildFromBufferedTuplesNoAccounting builds the hash table from already
// buffered tuples in ht.Vals according to the current number of buckets. No
// performed, so this function is guaranteed to not throw a memory error.
func (ht *HashTable) buildFromBufferedTuplesNoAccounting() {
ht.BuildScratch.First = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.First, int(ht.numBuckets))
if ht.ProbeScratch.First != nil {
ht.ProbeScratch.First = colexecutils.MaybeAllocateUint64Array(ht.ProbeScratch.First, int(ht.numBuckets))
}
// ht.BuildScratch.Next is used to store the computed hash value of each key.
ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1)

for i, keyCol := range ht.keyCols {
ht.Keys[i] = ht.Vals.ColVec(int(keyCol))
}
// ht.BuildScratch.Next is used to store the computed hash value of each key.
ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1)
ht.ComputeBuckets(ht.BuildScratch.Next[1:], ht.Keys, ht.Vals.Length(), nil /* sel */)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, 1 /* offset */, uint64(ht.Vals.Length()))
// Account for memory used by the internal auxiliary slices that are
// limited in size.
ht.ProbeScratch.accountForLimitedSlices(ht.allocator)
// Note that if ht.ProbeScratch.first is nil, it'll have zero capacity.
newUint64Count := int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next))
ht.allocator.AdjustMemoryUsage(memsize.Int64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor))
ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count
}
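For context on the First/Next encoding manipulated above (an editor's illustration based on the surrounding comments, not code from the change): First[bucket] holds the keyID of the first tuple that hashed to the bucket, Next[keyID] holds the keyID of the following tuple in the same bucket, keyIDs are tuple indices shifted by one, and keyID 0 terminates a chain, which is why Next carries an unused 0th element. A toy traversal with made-up data:

package main

import "fmt"

func main() {
	// Two buckets, four tuples: tuples 0 and 2 hashed to bucket 0, tuples 1
	// and 3 hashed to bucket 1. keyID of tuple i is i+1; keyID 0 ends a chain.
	first := []uint64{3, 4}         // per bucket: keyID of the chain head
	next := []uint64{0, 0, 0, 1, 2} // index 0 unused; next[keyID] = next keyID

	for bucket := range first {
		fmt.Printf("bucket %d:", bucket)
		for keyID := first[bucket]; keyID != 0; keyID = next[keyID] {
			fmt.Printf(" tuple %d", keyID-1)
		}
		fmt.Println()
	}
}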

// FullBuild executes the entirety of the hash table build phase using the input
@@ -486,20 +532,18 @@ func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
}
}

// MaybeRepairAfterDistinctBuild checks whether the hash table built via
// DistinctBuild is in an inconsistent state and repairs it if so.
func (ht *HashTable) MaybeRepairAfterDistinctBuild() {
// BuildScratch.Next has an extra 0th element not used by the tuples
// reserved for the end of the chain.
if len(ht.BuildScratch.Next) < ht.Vals.Length()+1 {
// The hash table in such a state that some distinct tuples were
// appended to ht.Vals, but 'next' and 'first' slices were not updated
// accordingly.
numConsistentTuples := len(ht.BuildScratch.Next) - 1
lastBatchNumDistinctTuples := ht.Vals.Length() - numConsistentTuples
ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:lastBatchNumDistinctTuples]...)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, uint64(numConsistentTuples)+1, uint64(lastBatchNumDistinctTuples))
}
// RepairAfterDistinctBuild rebuilds the hash table populated via DistinctBuild
// in the case a memory error was thrown.
func (ht *HashTable) RepairAfterDistinctBuild() {
// Note that we don't try to be smart and "repair" the already built hash
// table in which only the most recently added tuples might need to be
// "built". We do it this way because the memory error could have occurred
// in several spots, making it harder to reason about which tuples are
// "consistent" and which still need to be repaired. This is a minor
// performance hit, but it happens only once throughout the lifetime of the
// unordered distinct, right after it has spilled to disk, so the regression
// is ok.
ht.buildFromBufferedTuplesNoAccounting()
}

// checkCols performs a column by column checkCol on the key columns.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/memsize",
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
28 changes: 28 additions & 0 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
@@ -227,6 +228,15 @@ type hashJoiner struct {
rowIdx int
}

// accountedFor tracks how much memory the hash joiner has accounted for so
// far related to some of the internal slices in the hash table.
accountedFor struct {
// hashtableSame tracks the current memory usage of hj.ht.Same.
hashtableSame int64
// hashtableVisited tracks the current memory usage of hj.ht.Visited.
hashtableVisited int64
}

exportBufferedState struct {
rightExported int
rightWindowedBatch coldata.Batch
@@ -323,11 +333,25 @@ func (hj *hashJoiner) build() {
// We don't need same with LEFT ANTI and EXCEPT ALL joins because
// they have a separate collectLeftAnti method.
hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1)
// At this point, we have fully built the hash table on the right side
// (meaning we have fully consumed the right input), so it'd be a shame
// to fall back to disk; thus, we use the unlimited allocator.
newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same))
// hj.ht.Same will never shrink, so the delta is non-negative.
hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableSame)
hj.accountedFor.hashtableSame = newAccountedFor
}
if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() {
// The visited slice is also used for set-operation joins, regardless of
// whether the right side is distinct.
hj.ht.Visited = colexecutils.MaybeAllocateBoolArray(hj.ht.Visited, hj.ht.Vals.Length()+1)
// At this point, we have fully built the hash table on the right side
// (meaning we have fully consumed the right input), so it'd be a shame
// to fall back to disk; thus, we use the unlimited allocator.
newAccountedFor := memsize.Bool * int64(cap(hj.ht.Visited))
// hj.ht.Visited will never shrink, so the delta is non-negative.
hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableVisited)
hj.accountedFor.hashtableVisited = newAccountedFor
// Since keyID = 0 is reserved for end of list, it can be marked as visited
// at the beginning.
hj.ht.Visited[0] = true
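Both additions above use the same bookkeeping idiom: remember how many bytes are currently accounted for per slice and register only the growth, so that subsequent builds (for example after a Reset) never double count. A stripped-down illustration with hypothetical names, not the hash joiner's real state:

package main

import "fmt"

// trackCap registers only the growth of a slice's footprint with the stand-in
// allocator and returns the new accounted-for figure.
func trackCap(adjust func(int64), elemBytes int64, capacity int, accountedFor int64) int64 {
	newAccountedFor := elemBytes * int64(capacity)
	adjust(newAccountedFor - accountedFor) // non-negative as long as the slice never shrinks
	return newAccountedFor
}

func main() {
	var used int64
	adjust := func(delta int64) { used += delta }

	var visited []bool
	var visitedAccountedFor int64
	for _, numTuples := range []int{10, 10, 25} { // successive builds
		if cap(visited) < numTuples+1 {
			visited = make([]bool, numTuples+1)
		}
		visitedAccountedFor = trackCap(adjust, 1 /* memsize.Bool */, cap(visited), visitedAccountedFor)
		fmt.Println(used) // 11, 11, 26
	}
}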
@@ -722,6 +746,10 @@ func (hj *hashJoiner) Reset(ctx context.Context) {
}
}
hj.state = hjBuilding
// Note that hj.ht.Reset() doesn't reset hj.ht.Same and hj.ht.Visited
// slices, but we'll reset them manually in hj.build(). We also keep
// references to those slices, so we don't release any of the memory we've
// accounted for.
hj.ht.Reset(ctx)
// Note that we don't zero out hj.probeState.buildIdx,
// hj.probeState.probeIdx, and hj.probeState.probeRowUnmatched because the
2 changes: 1 addition & 1 deletion pkg/sql/colexec/unordered_distinct.go
@@ -173,7 +173,7 @@ func (f *unorderedDistinctFilterer) Next() coldata.Batch {
//
// See https://github.com/cockroachdb/cockroach/pull/58006#pullrequestreview-565859919
// for all the gory details.
f.ud.ht.MaybeRepairAfterDistinctBuild()
f.ud.ht.RepairAfterDistinctBuild()
f.ud.MaybeEmitErrorOnDup(f.ud.lastInputBatchOrigLen, batch.Length())
f.seenBatch = true
return batch
