colexechash: improve memory accounting in the hash table
This commit improves the memory accounting in the hash table to be more
precise when the `distsql_workmem` limit is exhausted. Previously, we
would allocate large slices first and only perform the memory accounting
after the fact, possibly running out of budget, which would result in an
error being thrown. We'd end up in a situation where the hash table was
still referencing the larger newly-allocated slices while only the
previous memory usage was accounted for. This commit makes it so that we
account for the needed capacity upfront, then perform the allocation, and
then reconcile the accounting if necessary. This way we're much more
likely to encounter the budget error before making the large allocations.
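
To make the ordering concrete, here is a minimal sketch of the account-first
discipline; the helper and its reserve callback are invented for illustration
and are not the actual hash table code:

// accountThenAllocate reserves the anticipated bytes first, so that a budget
// error fires before the large allocation, and then reconciles the
// reservation against the capacity actually obtained.
func accountThenAllocate(reserve func(bytes int64) error, old []uint64, needed int) ([]uint64, error) {
	const uint64Size = 8 // bytes per element
	if needed > cap(old) {
		// Account upfront for the anticipated growth; this can fail with a
		// budget error before any allocation has been made.
		if err := reserve(int64(needed-cap(old)) * uint64Size); err != nil {
			return old, err
		}
	}
	newSlice := make([]uint64, needed)
	copy(newSlice, old)
	// Reconcile: the runtime may have handed us more capacity than requested.
	return newSlice, reserve(int64(cap(newSlice)-needed) * uint64Size)
}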

Additionally, this commit accounts for some internal slices in the hash
table used only in the hash joiner case.

This required a minor change to the way the unordered distinct spills to
disk. Previously, the memory error could only occur in two spots (and
one of those would leave the hash table in an inconsistent state, which
we were "smart" about repairing). Now, however, the memory error can
occur in more spots (leaving the hash table in several different
inconsistent states), so this commit accepts a slight performance
regression: the hash table is simply rebuilt from scratch, once, when
the unordered distinct spills to disk.
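
The invariant that makes the wholesale rebuild safe is that the buffered
tuples in ht.Vals always stay consistent; only the derived "first"/"next"
chains can be left mid-update when the error is thrown. A toy illustration of
that principle, with invented types rather than the actual operator:

type toyTable struct {
	vals  []uint64       // buffered tuples: the source of truth
	index map[uint64]int // derived state: may be mid-update when an error hits
}

// rebuild derives the index entirely from vals, so it is correct no matter
// which inconsistent state an interrupted build left the index in.
func (t *toyTable) rebuild() {
	t.index = make(map[uint64]int, len(t.vals))
	for i, v := range t.vals {
		t.index[v] = i
	}
}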

Release note: None
yuzefovich committed Aug 17, 2022
1 parent 13c13b5 commit 7906075
Showing 4 changed files with 99 additions and 26 deletions.
94 changes: 69 additions & 25 deletions pkg/sql/colexec/colexechash/hashtable.go
@@ -304,43 +304,89 @@ func (ht *HashTable) shouldResize(numTuples int) bool {
	return float64(numTuples)/float64(ht.numBuckets) > ht.loadFactor
}

// probeBufferInternalMaxMemUsed returns the maximum memory used by the slices
// of hashTableProbeBuffer that are limited by coldata.BatchSize() in size.
func probeBufferInternalMaxMemUsed() int64 {
	// probeBufferInternalMaxMemUsed accounts for:
	// - five uint64 slices:
	//   - hashTableProbeBuffer.Next
	//   - hashTableProbeBuffer.HeadID
	//   - hashTableProbeBuffer.GroupID
	//   - hashTableProbeBuffer.ToCheck
	//   - hashTableProbeBuffer.HashBuffer
	// - two bool slices:
	//   - hashTableProbeBuffer.differs
	//   - hashTableProbeBuffer.distinct.
	return memsize.Uint64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize())
}
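
For example, assuming the default coldata.BatchSize() of 1024 (with
memsize.Uint64 = 8 and memsize.Bool = 1), the maximum is
5*1024*8 + 2*1024*1 = 43,008 bytes. Since this bound is a constant, it only
needs to be registered with the allocator once, which is what
accountForLimitedSlices below relies on.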

// accountForLimitedSlices checks whether we have already accounted for the
// memory used by the slices that are limited by coldata.BatchSize() in size
// and adjusts the allocator accordingly if we haven't.
func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocator) {
	if p.limitedSlicesAreAccountedFor {
		return
	}
	allocator.AdjustMemoryUsage(probeBufferInternalMaxMemUsed())
	p.limitedSlicesAreAccountedFor = true
}

func (ht *HashTable) accountForUnlimitedSlices(newUint64Count int64) {
	ht.allocator.AdjustMemoryUsage(memsize.Uint64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor))
	ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count
}

func (ht *HashTable) unlimitedSlicesCapacity() int64 {
	// Note that if ht.ProbeScratch.First is nil, it'll have zero capacity.
	return int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next))
}

// buildFromBufferedTuples builds the hash table from already buffered tuples
// in ht.Vals. It'll determine the appropriate number of buckets that satisfy
// the target load factor.
func (ht *HashTable) buildFromBufferedTuples() {
	for ht.shouldResize(ht.Vals.Length()) {
		ht.numBuckets *= 2
	}
	// Account for memory used by the internal auxiliary slices that are
	// limited in size.
	ht.ProbeScratch.accountForLimitedSlices(ht.allocator)
	// Figure out the minimum capacities of the unlimited slices before
	// actually allocating them.
	needCapacity := int64(ht.numBuckets) + int64(ht.Vals.Length()+1) // ht.BuildScratch.First + ht.BuildScratch.Next
	if ht.ProbeScratch.First != nil {
		needCapacity += int64(ht.numBuckets) // ht.ProbeScratch.First
	}
	if needCapacity > ht.unlimitedSlicesCapacity() {
		// We'll need to allocate larger slices than we currently have, so
		// perform the memory accounting for the anticipated memory usage.
		// Note that it might not be precise, so we'll reconcile after the
		// allocations below.
		ht.accountForUnlimitedSlices(needCapacity)
	}
	// Perform the actual build.
	ht.buildFromBufferedTuplesNoAccounting()
	// Now ensure that the accounting is precise (cap's of the slices might
	// exceed len's that we've accounted for).
	ht.accountForUnlimitedSlices(ht.unlimitedSlicesCapacity())
}
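
To make the capacity math concrete (the numbers are illustrative only): with
1,500 buffered tuples and a resize loop that settles on numBuckets = 2048,
needCapacity is 2048 uint64s for BuildScratch.First plus 1,501 for
BuildScratch.Next, plus another 2048 if ProbeScratch.First is in use; the
upfront accounting is performed only if that total exceeds the combined
capacity the slices already hold.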

// buildFromBufferedTuplesNoAccounting builds the hash table from already
// buffered tuples in ht.Vals according to the current number of buckets. No
// memory accounting is performed, so this function is guaranteed to not throw
// a memory error.
func (ht *HashTable) buildFromBufferedTuplesNoAccounting() {
	ht.BuildScratch.First = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.First, int(ht.numBuckets))
	if ht.ProbeScratch.First != nil {
		ht.ProbeScratch.First = colexecutils.MaybeAllocateUint64Array(ht.ProbeScratch.First, int(ht.numBuckets))
	}
	// ht.BuildScratch.Next is used to store the computed hash value of each key.
	ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1)
	for i, keyCol := range ht.keyCols {
		ht.Keys[i] = ht.Vals.ColVec(int(keyCol))
	}
	ht.ComputeBuckets(ht.BuildScratch.Next[1:], ht.Keys, ht.Vals.Length(), nil /* sel */)
	ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, 1 /* offset */, uint64(ht.Vals.Length()))
}

// FullBuild executes the entirety of the hash table build phase using the input
@@ -486,20 +532,18 @@ func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
}
}

// RepairAfterDistinctBuild rebuilds the hash table populated via DistinctBuild
// in the case a memory error was thrown.
func (ht *HashTable) RepairAfterDistinctBuild() {
	// Note that we don't try to be smart and "repair" the already built hash
	// table in which only the most recently added tuples might need to be
	// "built". We do it this way because the memory error could have occurred
	// in several spots, making it harder to reason about which tuples are
	// "consistent" and which need to be repaired. This is a minor performance
	// hit, but it is incurred only once throughout the lifetime of the
	// unordered distinct, when it spills to disk, so the regression is ok.
	ht.buildFromBufferedTuplesNoAccounting()
}

// checkCols performs a column by column checkCol on the key columns.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/memsize",
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
28 changes: 28 additions & 0 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
@@ -227,6 +228,15 @@ type hashJoiner struct {
		rowIdx int
	}

	// accountedFor tracks how much memory the hash joiner has accounted for
	// so far related to some of the internal slices in the hash table.
	accountedFor struct {
		// hashtableSame tracks the current memory usage of hj.ht.Same.
		hashtableSame int64
		// hashtableVisited tracks the current memory usage of hj.ht.Visited.
		hashtableVisited int64
	}

	exportBufferedState struct {
		rightExported      int
		rightWindowedBatch coldata.Batch
@@ -323,11 +333,25 @@ func (hj *hashJoiner) build() {
		// We don't need same with LEFT ANTI and EXCEPT ALL joins because
		// they have separate collectLeftAnti method.
		hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1)
		// At this point, we have fully built the hash table on the right
		// side (meaning we have fully consumed the right input), so it'd be
		// a shame to fall back to disk; thus, we use the unlimited allocator.
		newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same))
		// hj.ht.Same will never shrink, so the delta is non-negative.
		hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableSame)
		hj.accountedFor.hashtableSame = newAccountedFor
	}
	if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() {
		// The visited slice is also used for set-operation joins, regardless
		// of whether the right side is distinct.
		hj.ht.Visited = colexecutils.MaybeAllocateBoolArray(hj.ht.Visited, hj.ht.Vals.Length()+1)
		// At this point, we have fully built the hash table on the right
		// side (meaning we have fully consumed the right input), so it'd be
		// a shame to fall back to disk; thus, we use the unlimited allocator.
		newAccountedFor := memsize.Bool * int64(cap(hj.ht.Visited))
		// hj.ht.Visited will never shrink, so the delta is non-negative.
		hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableVisited)
		hj.accountedFor.hashtableVisited = newAccountedFor
		// Since keyID = 0 is reserved for end of list, it can be marked as
		// visited at the beginning.
		hj.ht.Visited[0] = true
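
For illustration (numbers invented): with 2,048 right-side tuples,
cap(hj.ht.Same) grows to at least 2,049 uint64s, so the first build registers
roughly 8 * 2049 = 16,392 bytes with the unlimited allocator; after a Reset,
the slices are reused, the new accounted-for value matches the old one, and
the adjustment delta is zero.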
@@ -722,6 +746,10 @@ func (hj *hashJoiner) Reset(ctx context.Context) {
		}
	}
	hj.state = hjBuilding
	// Note that hj.ht.Reset() doesn't reset hj.ht.Same and hj.ht.Visited
	// slices, but we'll reset them manually in hj.build(). We also keep
	// references to those slices, so we don't release any of the memory we've
	// accounted for.
	hj.ht.Reset(ctx)
	// Note that we don't zero out hj.probeState.buildIdx,
	// hj.probeState.probeIdx, and hj.probeState.probeRowUnmatched because the
2 changes: 1 addition & 1 deletion pkg/sql/colexec/unordered_distinct.go
@@ -173,7 +173,7 @@ func (f *unorderedDistinctFilterer) Next() coldata.Batch {
		//
		// See https://github.com/cockroachdb/cockroach/pull/58006#pullrequestreview-565859919
		// for all the gory details.
		f.ud.ht.RepairAfterDistinctBuild()
		f.ud.MaybeEmitErrorOnDup(f.ud.lastInputBatchOrigLen, batch.Length())
		f.seenBatch = true
		return batch
Expand Down
