From 7906075976a84d60624a6c721576b3aad7e2d42a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 11 Jul 2022 19:24:36 -0700 Subject: [PATCH] colexechash: improve memory accounting in the hash table This commit improves the memory accounting in the hash table to be more precise in the case when the `distsql_workmem` limit is exhausted. Previously, we would allocate large slices first only to perform the memory accounting after the fact, possibly running out of budget which would result in an error being thrown. We'd end up in a situation where the hash table is still referencing larger newly-allocated slices while only the previous memory usage is accounted for. This commit makes it so that we account for the needed capacity upfront, then perform the allocation, and then reconcile the accounting if necessary. This way we're much more likely to encounter the budget error before making the large allocations. Additionally, this commit accounts for some internal slices in the hash table used only in the hash joiner case. This required a minor change to the way the unordered distinct spills to disk. Previously, the memory error could only occur in two spots (and one of those would leave the hash table in an inconsistent state and we were "smart" in how we repaired that). However, now the memory error could occur in more spots (and we could have several different inconsistent states), so this commit chooses a slight performance regression of simply rebuilding the hash table from scratch, once, when the unordered distinct spills to disk. Release note: None --- pkg/sql/colexec/colexechash/hashtable.go | 94 +++++++++++++++++------ pkg/sql/colexec/colexecjoin/BUILD.bazel | 1 + pkg/sql/colexec/colexecjoin/hashjoiner.go | 28 +++++++ pkg/sql/colexec/unordered_distinct.go | 2 +- 4 files changed, 99 insertions(+), 26 deletions(-) diff --git a/pkg/sql/colexec/colexechash/hashtable.go b/pkg/sql/colexec/colexechash/hashtable.go index 4cce95437949..3d6e1a557b54 100644 --- a/pkg/sql/colexec/colexechash/hashtable.go +++ b/pkg/sql/colexec/colexechash/hashtable.go @@ -304,6 +304,22 @@ func (ht *HashTable) shouldResize(numTuples int) bool { return float64(numTuples)/float64(ht.numBuckets) > ht.loadFactor } +// probeBufferInternalMaxMemUsed returns the maximum memory used by the slices +// of hashTableProbeBuffer that are limited by coldata.BatchSize() in size. +func probeBufferInternalMaxMemUsed() int64 { + // probeBufferInternalMaxMemUsed accounts for: + // - five uint64 slices: + // - hashTableProbeBuffer.Next + // - hashTableProbeBuffer.HeadID + // - hashTableProbeBuffer.GroupID + // - hashTableProbeBuffer.ToCheck + // - hashTableProbeBuffer.HashBuffer + // - two bool slices: + // - hashTableProbeBuffer.differs + // - hashTableProbeBuffer.distinct. + return memsize.Uint64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize()) +} + // accountForLimitedSlices checks whether we have already accounted for the // memory used by the slices that are limited by coldata.BatchSize() in size // and adjusts the allocator accordingly if we haven't. @@ -311,11 +327,20 @@ func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocat if p.limitedSlicesAreAccountedFor { return } - internalMemMaxUsed := memsize.Int64*int64(5*coldata.BatchSize()) + memsize.Bool*int64(2*coldata.BatchSize()) - allocator.AdjustMemoryUsage(internalMemMaxUsed) + allocator.AdjustMemoryUsage(probeBufferInternalMaxMemUsed()) p.limitedSlicesAreAccountedFor = true } +func (ht *HashTable) accountForUnlimitedSlices(newUint64Count int64) { + ht.allocator.AdjustMemoryUsage(memsize.Uint64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor)) + ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count +} + +func (ht *HashTable) unlimitedSlicesCapacity() int64 { + // Note that if ht.ProbeScratch.First is nil, it'll have zero capacity. + return int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next)) +} + // buildFromBufferedTuples builds the hash table from already buffered tuples // in ht.Vals. It'll determine the appropriate number of buckets that satisfy // the target load factor. @@ -323,24 +348,45 @@ func (ht *HashTable) buildFromBufferedTuples() { for ht.shouldResize(ht.Vals.Length()) { ht.numBuckets *= 2 } + // Account for memory used by the internal auxiliary slices that are limited + // in size. + ht.ProbeScratch.accountForLimitedSlices(ht.allocator) + // Figure out the minimum capacities of the unlimited slices before actually + // allocating then. + needCapacity := int64(ht.numBuckets) + int64(ht.Vals.Length()+1) // ht.BuildScratch.First + ht.BuildScratch.Next + if ht.ProbeScratch.First != nil { + needCapacity += int64(ht.numBuckets) // ht.ProbeScratch.First + } + if needCapacity > ht.unlimitedSlicesCapacity() { + // We'll need to allocate larger slices then we currently have, so + // perform the memory accounting for the anticipated memory usage. Note + // that it might not be precise, so we'll reconcile after the + // allocations below. + ht.accountForUnlimitedSlices(needCapacity) + } + // Perform the actual build. + ht.buildFromBufferedTuplesNoAccounting() + // Now ensure that the accounting is precise (cap's of the slices might + // exceed len's that we've accounted for). + ht.accountForUnlimitedSlices(ht.unlimitedSlicesCapacity()) +} + +// buildFromBufferedTuples builds the hash table from already buffered tuples in +// ht.Vals according to the current number of buckets. No memory accounting is +// performed, so this function is guaranteed to not throw a memory error. +func (ht *HashTable) buildFromBufferedTuplesNoAccounting() { ht.BuildScratch.First = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.First, int(ht.numBuckets)) if ht.ProbeScratch.First != nil { ht.ProbeScratch.First = colexecutils.MaybeAllocateUint64Array(ht.ProbeScratch.First, int(ht.numBuckets)) } + // ht.BuildScratch.Next is used to store the computed hash value of each key. + ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1) + for i, keyCol := range ht.keyCols { ht.Keys[i] = ht.Vals.ColVec(int(keyCol)) } - // ht.BuildScratch.Next is used to store the computed hash value of each key. - ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1) ht.ComputeBuckets(ht.BuildScratch.Next[1:], ht.Keys, ht.Vals.Length(), nil /* sel */) ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, 1 /* offset */, uint64(ht.Vals.Length())) - // Account for memory used by the internal auxiliary slices that are - // limited in size. - ht.ProbeScratch.accountForLimitedSlices(ht.allocator) - // Note that if ht.ProbeScratch.first is nil, it'll have zero capacity. - newUint64Count := int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next)) - ht.allocator.AdjustMemoryUsage(memsize.Int64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor)) - ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count } // FullBuild executes the entirety of the hash table build phase using the input @@ -486,20 +532,18 @@ func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) { } } -// MaybeRepairAfterDistinctBuild checks whether the hash table built via -// DistinctBuild is in an inconsistent state and repairs it if so. -func (ht *HashTable) MaybeRepairAfterDistinctBuild() { - // BuildScratch.Next has an extra 0th element not used by the tuples - // reserved for the end of the chain. - if len(ht.BuildScratch.Next) < ht.Vals.Length()+1 { - // The hash table in such a state that some distinct tuples were - // appended to ht.Vals, but 'next' and 'first' slices were not updated - // accordingly. - numConsistentTuples := len(ht.BuildScratch.Next) - 1 - lastBatchNumDistinctTuples := ht.Vals.Length() - numConsistentTuples - ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:lastBatchNumDistinctTuples]...) - ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, uint64(numConsistentTuples)+1, uint64(lastBatchNumDistinctTuples)) - } +// RepairAfterDistinctBuild rebuilds the hash table populated via DistinctBuild +// in the case a memory error was thrown. +func (ht *HashTable) RepairAfterDistinctBuild() { + // Note that we don't try to be smart and "repair" the already built hash + // table in which only the most recently added tuples might need to be + // "built". We do it this way because the memory error could have occurred + // in several spots making it harder to reason about what are the + // "consistent" tuples and what are those that need to be repaired. This is + // a minor performance hit, but it is done only once throughout the lifetime + // of the unordered distinct when it just spilled to disk, so the regression + // is ok. + ht.buildFromBufferedTuplesNoAccounting() } // checkCols performs a column by column checkCol on the key columns. diff --git a/pkg/sql/colexec/colexecjoin/BUILD.bazel b/pkg/sql/colexec/colexecjoin/BUILD.bazel index 59f4c062885c..a2ac5a03b37c 100644 --- a/pkg/sql/colexec/colexecjoin/BUILD.bazel +++ b/pkg/sql/colexec/colexecjoin/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/sql/colmem", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", + "//pkg/sql/memsize", "//pkg/sql/sem/tree", # keep "//pkg/sql/types", "//pkg/util", diff --git a/pkg/sql/colexec/colexecjoin/hashjoiner.go b/pkg/sql/colexec/colexecjoin/hashjoiner.go index 9ae52277e38c..ae3aa48b1226 100644 --- a/pkg/sql/colexec/colexecjoin/hashjoiner.go +++ b/pkg/sql/colexec/colexecjoin/hashjoiner.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/memsize" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -227,6 +228,15 @@ type hashJoiner struct { rowIdx int } + // accountedFor tracks how much memory the hash joiner has accounted for so + // far related to some of the internal slices in the hash table. + accountedFor struct { + // hashtableSame tracks the current memory usage of hj.ht.Same. + hashtableSame int64 + // hashtableVisited tracks the current memory usage of hj.ht.Visited. + hashtableVisited int64 + } + exportBufferedState struct { rightExported int rightWindowedBatch coldata.Batch @@ -323,11 +333,25 @@ func (hj *hashJoiner) build() { // We don't need same with LEFT ANTI and EXCEPT ALL joins because // they have separate collectLeftAnti method. hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1) + // At this point, we have fully built the hash table on the right side + // (meaning we have fully consumed the right input), so it'd be a shame + // to fallback to disk, thus, we use the unlimited allocator. + newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same)) + // hj.ht.Same will never shrink, so the delta is non-negative. + hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableSame) + hj.accountedFor.hashtableSame = newAccountedFor } if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() { // visited slice is also used for set-operation joins, regardless of // the fact whether the right side is distinct. hj.ht.Visited = colexecutils.MaybeAllocateBoolArray(hj.ht.Visited, hj.ht.Vals.Length()+1) + // At this point, we have fully built the hash table on the right side + // (meaning we have fully consumed the right input), so it'd be a shame + // to fallback to disk, thus, we use the unlimited allocator. + newAccountedFor := memsize.Bool * int64(cap(hj.ht.Visited)) + // hj.ht.Visited will never shrink, so the delta is non-negative. + hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableVisited) + hj.accountedFor.hashtableVisited = newAccountedFor // Since keyID = 0 is reserved for end of list, it can be marked as visited // at the beginning. hj.ht.Visited[0] = true @@ -722,6 +746,10 @@ func (hj *hashJoiner) Reset(ctx context.Context) { } } hj.state = hjBuilding + // Note that hj.ht.Reset() doesn't reset hj.ht.Same and hj.ht.Visited + // slices, but we'll reset them manually in hj.build(). We also keep + // references to those slices, so we don't release any of the memory we've + // accounted for. hj.ht.Reset(ctx) // Note that we don't zero out hj.probeState.buildIdx, // hj.probeState.probeIdx, and hj.probeState.probeRowUnmatched because the diff --git a/pkg/sql/colexec/unordered_distinct.go b/pkg/sql/colexec/unordered_distinct.go index 0f433d657ed7..1875e548e927 100644 --- a/pkg/sql/colexec/unordered_distinct.go +++ b/pkg/sql/colexec/unordered_distinct.go @@ -173,7 +173,7 @@ func (f *unorderedDistinctFilterer) Next() coldata.Batch { // // See https://github.com/cockroachdb/cockroach/pull/58006#pullrequestreview-565859919 // for all the gory details. - f.ud.ht.MaybeRepairAfterDistinctBuild() + f.ud.ht.RepairAfterDistinctBuild() f.ud.MaybeEmitErrorOnDup(f.ud.lastInputBatchOrigLen, batch.Length()) f.seenBatch = true return batch