colexechash: introduce a type alias for keyID
TODO: run benchmarks.

Release note: None
yuzefovich committed Jan 27, 2022
1 parent 87bd158 commit 955c995
Showing 4 changed files with 62 additions and 34 deletions.
90 changes: 59 additions & 31 deletions pkg/sql/colexec/colexechash/hashtable.go
@@ -12,6 +12,7 @@ package colexechash

import (
"context"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
@@ -62,6 +63,15 @@ const (
HashTableDeletingProbeMode
)

// keyID encodes the ordinal of the corresponding tuple.
//
// For each tuple with the ordinal 'i' (the ordinal among all tuples in the
// hash table or within a single probing batch), keyID is calculated as:
// keyID = i + 1.
//
// keyID of 0 is reserved to indicate the end of the hash chain.
type keyID uint64
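
As a minimal sketch of this convention (toKeyID and toOrdinal are hypothetical helpers for illustration, not part of this change):

    package keyidsketch

    type keyID uint64

    // toKeyID maps a tuple ordinal to its keyID. Ordinal 0 becomes keyID 1,
    // which leaves keyID 0 free to mean "end of hash chain".
    func toKeyID(ordinal int) keyID { return keyID(ordinal) + 1 }

    // toOrdinal inverts toKeyID; it must not be called on the sentinel 0.
    func toOrdinal(id keyID) int { return int(id) - 1 }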

// hashChains describes the partitioning of a set of tuples into singly-linked
// lists ("buckets") where all tuples in a list have the same hash.
//
@@ -70,25 +80,21 @@ const (
// First[bucket], and the following keyIDs of the tuples in the list can be
// found using Next[keyID] traversal. Whenever keyID of 0 is encountered, the
// end of the list has been reached.
//
// For each tuple with the ordinal 'i' (the ordinal among all tuples in the
// hash table or within a single probing batch), keyID is calculated as:
// keyID = i + 1.
type hashChains struct {
// First stores the first keyID of the tuple that resides in each bucket.
// The tuple is the head of the corresponding hash chain.
//
// The length of this slice is equal to the number of buckets used in the
// hash table at the moment.
First []uint64
First []keyID

// Next is a densely-packed list that stores the keyID of the next tuple in
// the hash chain, where an ID of 0 is reserved to represent the end of the
// chain.
//
// The length of this slice is equal to the number of tuples stored in the
// hash table at the moment plus one (Next[0] is unused).
Next []uint64
Next []keyID
}
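
Putting the two slices together, visiting every tuple in a bucket is a short loop over the chain; a hedged sketch of the traversal described above (walkChain and visit are hypothetical names):

    package keyidsketch

    type keyID uint64

    // walkChain calls visit for each keyID in the chain rooted at first[hash],
    // following next links until the 0 sentinel marks the end of the list.
    func walkChain(first, next []keyID, hash uint64, visit func(keyID)) {
        for id := first[hash]; id != 0; id = next[id] {
            visit(id)
        }
    }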

// hashTableProbeBuffer stores the information related to the probing batch.
@@ -146,7 +152,7 @@ type hashTableProbeBuffer struct {
// ToCheckID[i] = Next[ToCheckID[i]].
// Whenever ToCheckID[i] becomes 0, there are no more matches for the ith
// probing tuple.
ToCheckID []uint64
ToCheckID []keyID

// differs stores whether the probing tuple included in ToCheck differs
// from the corresponding "candidate" tuple specified in ToCheckID.
Expand Down Expand Up @@ -186,7 +192,7 @@ type hashTableProbeBuffer struct {
// once the tuple is determined to not have duplicates with any tuples
// already in the hash table.
// See a comment on DistinctBuild for an example.
HeadID []uint64
HeadID []keyID

// HashBuffer stores the hash values of each tuple in the probing batch. It
// will be dynamically updated when the HashTable is built in distinct mode.
Expand Down Expand Up @@ -237,7 +243,7 @@ type HashTable struct {
// the hash table that has the same value as the current key. The HeadID of
// the key is the first key of that value found in the next linked list.
// This field will be lazily populated by the prober.
Same []uint64
Same []keyID
// Visited represents whether each of the corresponding keys has been
// touched by the prober.
Visited []bool
Expand Down Expand Up @@ -339,7 +345,7 @@ func NewHashTable(
ht := &HashTable{
allocator: allocator,
BuildScratch: hashChains{
First: make([]uint64, initialNumHashBuckets),
First: make([]keyID, initialNumHashBuckets),
},
Keys: make([]coldata.Vec, len(keyCols)),
Vals: colexecutils.NewAppendOnlyBufferedBatch(allocator, sourceTypes, colsToStore),
@@ -352,11 +358,11 @@
}

if buildMode == HashTableDistinctBuildMode {
ht.ProbeScratch.First = make([]uint64, initialNumHashBuckets)
ht.ProbeScratch.First = make([]keyID, initialNumHashBuckets)
// ht.BuildScratch.Next will be populated dynamically by appending to
// it, but we need to make sure that the special keyID=0 (which
// indicates the end of the hash chain) is always present.
ht.BuildScratch.Next = []uint64{0}
ht.BuildScratch.Next = []keyID{0}
}

ht.cancelChecker.Init(ctx)
Expand Down Expand Up @@ -401,16 +407,16 @@ func (ht *HashTable) buildFromBufferedTuples() {
for ht.shouldResize(ht.Vals.Length()) {
ht.numBuckets *= 2
}
ht.BuildScratch.First = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.First, int(ht.numBuckets))
ht.BuildScratch.First = maybeAllocateKeyIDArray(ht.BuildScratch.First, int(ht.numBuckets))
if ht.ProbeScratch.First != nil {
ht.ProbeScratch.First = colexecutils.MaybeAllocateUint64Array(ht.ProbeScratch.First, int(ht.numBuckets))
ht.ProbeScratch.First = maybeAllocateKeyIDArray(ht.ProbeScratch.First, int(ht.numBuckets))
}
for i, keyCol := range ht.keyCols {
ht.Keys[i] = ht.Vals.ColVec(int(keyCol))
}
// ht.BuildScratch.Next is used to store the computed hash value of each key.
ht.BuildScratch.Next = colexecutils.MaybeAllocateUint64Array(ht.BuildScratch.Next, ht.Vals.Length()+1)
ht.ComputeBuckets(ht.BuildScratch.Next[1:], ht.Keys, ht.Vals.Length(), nil /* sel */)
ht.BuildScratch.Next = maybeAllocateKeyIDArray(ht.BuildScratch.Next, ht.Vals.Length()+1)
ht.ComputeBuckets(keyIDToUint64(ht.BuildScratch.Next[1:]), ht.Keys, ht.Vals.Length(), nil /* sel */)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, 1 /* offset */, uint64(ht.Vals.Length()))
// Account for memory used by the internal auxiliary slices that are
// limited in size.
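
As an aside on the resize loop above: shouldResize's exact criterion is not visible in this view, so the sketch below assumes a simple maximum load factor (the constant and names are hypothetical):

    package keyidsketch

    // maxLoadFactor is a hypothetical stand-in for whatever criterion
    // shouldResize actually applies.
    const maxLoadFactor = 1.0

    // grownNumBuckets doubles the bucket count until the average chain
    // length (tuples per bucket) falls to maxLoadFactor or below.
    func grownNumBuckets(numBuckets uint64, numTuples int) uint64 {
        for float64(numTuples)/float64(numBuckets) > maxLoadFactor {
            numBuckets *= 2
        }
        return numBuckets
    }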
Expand Down Expand Up @@ -596,10 +602,10 @@ func (ht *HashTable) ComputeHashAndBuildChains(batch coldata.Batch) {

batchLength := batch.Length()
if cap(ht.ProbeScratch.Next) < batchLength+1 {
ht.ProbeScratch.Next = make([]uint64, batchLength+1)
ht.ProbeScratch.Next = make([]keyID, batchLength+1)
}
ht.ComputeBuckets(ht.ProbeScratch.Next[1:batchLength+1], ht.Keys, batchLength, batch.Selection())
ht.ProbeScratch.HashBuffer = append(ht.ProbeScratch.HashBuffer[:0], ht.ProbeScratch.Next[1:batchLength+1]...)
ht.ComputeBuckets(keyIDToUint64(ht.ProbeScratch.Next[1:batchLength+1]), ht.Keys, batchLength, batch.Selection())
ht.ProbeScratch.HashBuffer = append(ht.ProbeScratch.HashBuffer[:0], keyIDToUint64(ht.ProbeScratch.Next[1:batchLength+1])...)

// We need to zero out the 'First' buffer for all hash codes present in
// HashBuffer, and there are two possible approaches that we choose from
@@ -612,7 +618,7 @@ func (ht *HashTable) ComputeHashAndBuildChains(batch coldata.Batch) {
ht.ProbeScratch.First[hash] = 0
}
} else {
for n := 0; n < len(ht.ProbeScratch.First); n += copy(ht.ProbeScratch.First[n:], colexecutils.ZeroUint64Column) {
for n := 0; n < len(ht.ProbeScratch.First); n += copy(ht.ProbeScratch.First[n:], uint64ToKeyID(colexecutils.ZeroUint64Column)) {
}
}
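
The fallback branch above relies on a Go idiom for bulk-zeroing: repeatedly copy from a fixed all-zeros slice until the destination is covered. A standalone sketch, with zeroBuf as a hypothetical stand-in for colexecutils.ZeroUint64Column:

    package keyidsketch

    // zeroBuf is a fixed, never-written slice of zeros used as a copy source.
    var zeroBuf = make([]uint64, 1<<10)

    // bulkZero zeroes dst chunk by chunk; copy returns how many elements it
    // wrote, so n advances until the whole slice has been covered.
    func bulkZero(dst []uint64) {
        for n := 0; n < len(dst); n += copy(dst[n:], zeroBuf) {
        }
    }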

@@ -630,7 +636,7 @@ func (ht *HashTable) ComputeHashAndBuildChains(batch coldata.Batch) {
func (ht *HashTable) FindBuckets(
batch coldata.Batch,
keyCols []coldata.Vec,
first, next []uint64,
first, next []keyID,
duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
batchLength := batch.Length()
@@ -662,7 +668,7 @@ func (ht *HashTable) RemoveDuplicates(
func (ht *HashTable) RemoveDuplicates(
batch coldata.Batch,
keyCols []coldata.Vec,
first, next []uint64,
first, next []keyID,
duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
ht.FindBuckets(batch, keyCols, first, next, duplicatesChecker)
@@ -676,7 +682,7 @@ func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
numBuffered := uint64(ht.Vals.Length())
ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:batch.Length()]...)
ht.BuildScratch.Next = append(ht.BuildScratch.Next, uint64ToKeyID(ht.ProbeScratch.HashBuffer[:batch.Length()])...)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, numBuffered+1, uint64(batch.Length()))
if ht.shouldResize(ht.Vals.Length()) {
ht.buildFromBufferedTuples()
@@ -694,7 +700,7 @@ func (ht *HashTable) MaybeRepairAfterDistinctBuild() {
// accordingly.
numConsistentTuples := len(ht.BuildScratch.Next) - 1
lastBatchNumDistinctTuples := ht.Vals.Length() - numConsistentTuples
ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:lastBatchNumDistinctTuples]...)
ht.BuildScratch.Next = append(ht.BuildScratch.Next, uint64ToKeyID(ht.ProbeScratch.HashBuffer[:lastBatchNumDistinctTuples])...)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, uint64(numConsistentTuples)+1, uint64(lastBatchNumDistinctTuples))
}
}
Expand Down Expand Up @@ -757,7 +763,7 @@ func (ht *HashTable) ComputeBuckets(buckets []uint64, keys []coldata.Vec, nKeys
}

// buildNextChains builds the hash map from the computed hash values.
func (ht *HashTable) buildNextChains(first, next []uint64, offset, batchSize uint64) {
func (ht *HashTable) buildNextChains(first, next []keyID, offset, batchSize uint64) {
// The loop direction here is reversed to ensure that when we are building the
// next chain for the probe table, the keyIDs in each equality chain inside
// `next` are in strictly ascending order. This is crucial to ensure that when
@@ -767,7 +773,7 @@ func (ht *HashTable) buildNextChains(first, next []uint64, offset, batchSize uin
// the build side since all tuples buffered on the build side are already
// distinct, therefore we can be sure that when we emit a tuple, there cannot
// be other tuples with the same key.
for id := offset + batchSize - 1; id >= offset; id-- {
for id := keyID(offset + batchSize - 1); uint64(id) >= offset; id-- {
// The keyID is stored into the corresponding hash bucket at the front of
// the next chain.
hash := next[id]
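
Since the Next slice initially holds each tuple's computed hash code (see the comment in buildFromBufferedTuples above), building the chains reduces to a per-tuple list prepend, done in reverse so that each chain ends up in ascending keyID order. A sketch under those assumptions (buildChains is a hypothetical stand-in for the collapsed remainder of this function):

    package keyidsketch

    type keyID uint64

    // buildChains prepends tuples offset..offset+batchSize-1 onto the chains
    // for their hashes. On entry next[id] holds tuple id's hash code; on exit
    // it holds the following keyID in the chain (0 terminates). offset must
    // be at least 1 because keyID 0 is reserved.
    func buildChains(first, next []keyID, offset, batchSize uint64) {
        for id := keyID(offset + batchSize - 1); uint64(id) >= offset; id-- {
            hash := next[id]
            next[id] = first[hash]
            first[hash] = id
        }
    }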
@@ -791,15 +797,15 @@ func (ht *HashTable) buildNextChains(first, next []uint64, offset, batchSize uin
// Note that if the old ToCheckID or ToCheck slices have enough capacity, they
// are *not* zeroed out.
func (p *hashTableProbeBuffer) SetupLimitedSlices(length int, buildMode HashTableBuildMode) {
p.HeadID = colexecutils.MaybeAllocateLimitedUint64Array(p.HeadID, length)
p.HeadID = maybeAllocateLimitedKeyIDArray(p.HeadID, length)
p.differs = colexecutils.MaybeAllocateLimitedBoolArray(p.differs, length)
if buildMode == HashTableDistinctBuildMode {
p.distinct = colexecutils.MaybeAllocateLimitedBoolArray(p.distinct, length)
}
// Note that we don't use maybeAllocate* methods below because ToCheckID and
// ToCheck don't need to be zeroed out when reused.
if cap(p.ToCheckID) < length {
p.ToCheckID = make([]uint64, length)
p.ToCheckID = make([]keyID, length)
} else {
p.ToCheckID = p.ToCheckID[:length]
}
@@ -812,7 +818,7 @@ func (p *hashTableProbeBuffer) SetupLimitedSlices(length int, buildMode HashTabl

// FindNext determines the id of the next key inside the ToCheckID buckets for
// each equality column key in ToCheck.
func (ht *HashTable) FindNext(next []uint64, nToCheck uint64) {
func (ht *HashTable) FindNext(next []keyID, nToCheck uint64) {
for _, toCheck := range ht.ProbeScratch.ToCheck[:nToCheck] {
ht.ProbeScratch.ToCheckID[toCheck] = next[ht.ProbeScratch.ToCheckID[toCheck]]
}
@@ -840,7 +846,7 @@ func (ht *HashTable) CheckBuildForDistinct(
if ht.ProbeScratch.distinct[toCheck] {
ht.ProbeScratch.distinct[toCheck] = false
// Calculated using the convention: keyID = keys.indexOf(key) + 1.
ht.ProbeScratch.HeadID[toCheck] = toCheck + 1
ht.ProbeScratch.HeadID[toCheck] = keyID(toCheck + 1)
} else if ht.ProbeScratch.differs[toCheck] {
// Continue probing in this next chain for the probe key.
ht.ProbeScratch.differs[toCheck] = false
@@ -924,7 +930,7 @@ func (ht *HashTable) DistinctCheck(nToCheck uint64, probeSel []int) uint64 {
// allocation is needed, and at that time the allocator will update the memory
// account accordingly.
func (ht *HashTable) Reset(_ context.Context) {
for n := 0; n < len(ht.BuildScratch.First); n += copy(ht.BuildScratch.First[n:], colexecutils.ZeroUint64Column) {
for n := 0; n < len(ht.BuildScratch.First); n += copy(ht.BuildScratch.First[n:], uint64ToKeyID(colexecutils.ZeroUint64Column)) {
}
ht.Vals.ResetInternalBatch()
// ht.ProbeScratch.Next, ht.Same and ht.Visited are reset separately before
@@ -944,3 +950,25 @@ func (ht *HashTable) Reset(_ context.Context) {
ht.BuildScratch.Next = ht.BuildScratch.Next[:1]
}
}

// MaybeAllocateSame ensures that ht.Same has enough capacity for all tuples
// already present in the hash table plus one.
func (ht *HashTable) MaybeAllocateSame() {
ht.Same = maybeAllocateKeyIDArray(ht.Same, ht.Vals.Length()+1)
}

func maybeAllocateKeyIDArray(old []keyID, length int) []keyID {
return uint64ToKeyID(colexecutils.MaybeAllocateUint64Array(keyIDToUint64(old), length))
}

func maybeAllocateLimitedKeyIDArray(old []keyID, length int) []keyID {
return uint64ToKeyID(colexecutils.MaybeAllocateLimitedUint64Array(keyIDToUint64(old), length))
}

// keyIDToUint64 reinterprets a []keyID as a []uint64 without copying. This
// is safe because keyID is defined as uint64, so both slice types share the
// same memory layout.
func keyIDToUint64(k []keyID) []uint64 {
return *(*[]uint64)(unsafe.Pointer(&k))
}

// uint64ToKeyID reinterprets a []uint64 as a []keyID without copying.
func uint64ToKeyID(u []uint64) []keyID {
return *(*[]keyID)(unsafe.Pointer(&u))
}
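
These helpers reinterpret the slice header in place rather than copying. That is sound only because keyID is defined as uint64, so both slice types have the same element size and alignment; the two slices alias the same backing array, as this runnable sketch shows:

    package main

    import (
        "fmt"
        "unsafe"
    )

    type keyID uint64

    // keyIDToUint64 mirrors the helper above: a zero-copy reinterpretation
    // of the slice header.
    func keyIDToUint64(k []keyID) []uint64 {
        return *(*[]uint64)(unsafe.Pointer(&k))
    }

    func main() {
        ids := []keyID{3, 1, 4}
        raw := keyIDToUint64(ids)
        raw[0] = 42          // writes through to ids: both slices share one array
        fmt.Println(ids) // prints [42 1 4]
    }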
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexechash/hashtable_distinct.eg.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexechash/hashtable_tmpl.go
@@ -476,7 +476,7 @@ func _CHECK_BODY(_SELECT_SAME_TUPLES bool, _DELETING_PROBE_MODE bool, _SELECT_DI
// probing tuple is distinct (i.e. it is unique in the batch),
// so we want to mark it as equal to itself only.
// */}}
ht.ProbeScratch.HeadID[toCheck] = toCheck + 1
ht.ProbeScratch.HeadID[toCheck] = keyID(toCheck + 1)
continue
}
// {{end}}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -322,7 +322,7 @@ func (hj *hashJoiner) build() {
if !hj.spec.rightDistinct && !hj.spec.JoinType.IsLeftAntiOrExceptAll() {
// We don't need Same with LEFT ANTI and EXCEPT ALL joins because
// they have a separate collectLeftAnti method.
hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1)
hj.ht.MaybeAllocateSame()
}
if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() {
// visited slice is also used for set-operation joins, regardless of
