-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathhashtable.go
674 lines (606 loc) · 27 KB
/
hashtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colexec
import (
"context"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
// hashTableBuildMode enumerates the ways in which a hashTable can be built.
type hashTableBuildMode int

const (
	// hashTableFullBuildMode is the mode in which the hashTable buffers all
	// input tuples and populates the first and next arrays for each hash
	// bucket.
	hashTableFullBuildMode hashTableBuildMode = iota
	// hashTableDistinctBuildMode is the mode in which the hashTable buffers
	// only distinct tuples and discards the duplicates.
	hashTableDistinctBuildMode
)
// hashTableProbeMode enumerates the ways in which a hashTable can be probed.
type hashTableProbeMode int

const (
	// hashTableDefaultProbeMode is the default probing mode of the hashTable.
	hashTableDefaultProbeMode hashTableProbeMode = iota
	// hashTableDeletingProbeMode is the probing mode in which the hashTable
	// "deletes" tuples from itself once they have been matched against
	// probing tuples.
	//
	// For example, if the hashTable consists of tuples {1, 1}, {1, 2},
	// {2, 3}, and the probing tuples are {1, 4}, {1, 5}, {1, 6}, then probing
	// on the first column only produces the following:
	//
	//	{1, 4} -> {1, 1}   | hashTable = {1, 2}, {2, 3}
	//	{1, 5} -> {1, 2}   | hashTable = {2, 3}
	//	{1, 6} -> no match | hashTable = {2, 3}
	//
	// Note that the output of such probing is not fully deterministic when
	// tuples contain non-equality columns.
	hashTableDeletingProbeMode
)
// hashTableBuildBuffer holds the bucket bookkeeping of the build table.
type hashTableBuildBuffer struct {
	// first holds, for every bucket, the keyID of the first key that resides
	// in that bucket. The keyID identifies both the equality column key and
	// the output column values of the tuple.
	first []uint64

	// next is a densely-packed list holding, for every keyID, the keyID of
	// the next key in the same hash bucket chain. The value 0 is reserved to
	// mark the end of a chain.
	next []uint64
}
// hashTableProbeBuffer holds the bucket bookkeeping and the scratch slices of
// the probe table.
type hashTableProbeBuffer struct {
	// first holds, for every bucket, the keyID of the first key that resides
	// in that bucket. The keyID identifies both the equality column key and
	// the output column values of the tuple.
	first []uint64

	// next is a densely-packed list holding, for every keyID, the keyID of
	// the next key in the same hash bucket chain. The value 0 is reserved to
	// mark the end of a chain.
	next []uint64

	// limitedSlicesAreAccountedFor tracks whether the memory used by the
	// size-limited slices below has already been registered with the
	// allocator.
	limitedSlicesAreAccountedFor bool

	///////////////////////////////////////////////////////////////
	// Slices below are allocated dynamically but are limited by //
	// coldata.BatchSize() in size.                              //
	///////////////////////////////////////////////////////////////

	// headID holds, at every index, the first build table keyID that matched
	// the probe batch key at that index.
	headID []uint64

	// differs records, at every index, whether the probe key at that index
	// differs from the build table key it was compared against.
	differs []bool

	// distinct records whether the key in the probe batch is distinct in the
	// build table.
	distinct []bool

	// groupID holds the keyIDs that map to the joining rows of the build
	// table: the ith element is the build table keyID that corresponds to the
	// ith key of the probe table.
	groupID []uint64

	// toCheck holds the indices of the eqCol rows that have been neither
	// found nor rejected yet.
	toCheck []uint64

	// hashBuffer holds the hash value of every tuple in the probe table. It
	// is dynamically updated when the hashTable is built in distinct mode.
	hashBuffer []uint64
}
// hashTable is the structure in which the hash joiner stores the batches of
// the build table. Keys are stored according to the encoding of the equality
// columns and point to the corresponding output keyID, which is computed as
//
//	keyID = keys.indexOf(key) + 1
//
// and, inversely,
//
//	keys[keyID - 1] = key
//
// The table can then be probed in column batches to find at most one matching
// row per column batch row.
type hashTable struct {
	allocator *colmem.Allocator

	// unlimitedSlicesNumUint64AccountedFor is the number of uint64 elements
	// of the unlimited slices that have already been registered with the
	// allocator.
	unlimitedSlicesNumUint64AccountedFor int64

	// buildScratch holds the scratch buffers needed for the build table.
	buildScratch hashTableBuildBuffer

	// probeScratch holds the scratch buffers needed for the probe table.
	probeScratch hashTableProbeBuffer

	// keys is a scratch space holding the equality columns of either the
	// probe or the build table, depending on the phase of the algorithm.
	keys []coldata.Vec

	// same and visited are used only when the hashTable contains non-distinct
	// keys (i.e. in hashTableFullBuildMode).
	//
	// same is a densely-packed list holding the keyID of the next key in the
	// hash table that has the same value as the current key. The headID of
	// the key is the first key of that value found in the next linked list.
	// This field is populated lazily by the prober.
	same []uint64

	// visited records whether each of the corresponding keys has been touched
	// by the prober.
	visited []bool

	// vals holds the columns of the build source that are specified in
	// colsToStore in the constructor. The ID of the tuple at any index of
	// vals is index + 1.
	vals *appendOnlyBufferedBatch

	// keyCols holds the indices of vals which are key columns.
	keyCols []uint32

	// numBuckets is the number of buckets the hashTable employs. It is
	// equivalent to the size of first.
	numBuckets uint64

	// loadFactor is the average number of tuples per bucket which, when
	// exceeded, triggers resizing of the hash table.
	loadFactor float64

	// allowNullEquality determines whether NULL keys are treated as equal to
	// each other.
	allowNullEquality bool

	overloadHelper overloadHelper
	datumAlloc     rowenc.DatumAlloc
	cancelChecker  CancelChecker

	buildMode hashTableBuildMode
	probeMode hashTableProbeMode
}

// Compile-time check that *hashTable satisfies the resetter interface.
var _ resetter = (*hashTable)(nil)
// newHashTable constructs a hashTable.
//   - loadFactor is the average number of tuples per bucket which, when
//     exceeded, triggers resizing of the hash table. The value can noticeably
//     affect performance, so every user of the hash table should pick the one
//     that suits its use case. 1.0 is a reasonable initial default, and the
//     best value most likely lies in the [0.1, 10.0] range.
//   - colsToStore lists the positions of the columns that are actually stored
//     in the table. A nil colsToStore stores all columns; when it is non-nil,
//     columns whose positions are absent from it remain zero-capacity vectors
//     in vals.
//
// An internal assertion error is raised when hashTableDeletingProbeMode is
// requested without allowNullEquality.
func newHashTable(
	allocator *colmem.Allocator,
	loadFactor float64,
	sourceTypes []*types.T,
	eqCols []uint32,
	colsToStore []int,
	allowNullEquality bool,
	buildMode hashTableBuildMode,
	probeMode hashTableProbeMode,
) *hashTable {
	if probeMode == hashTableDeletingProbeMode && !allowNullEquality {
		// There is currently no use case for this combination, so assert
		// that it is never requested.
		colexecerror.InternalError(errors.AssertionFailedf("hashTableDeletingProbeMode is supported only when null equality is allowed"))
	}
	// The initial number of buckets was picked after benchmarking all users
	// of the hash table (hash joiner, hash aggregator, unordered distinct).
	// The rationale for starting with coldata.BatchSize() buckets:
	// - several other allocations have to be at least coldata.BatchSize() in
	// size anyway, so a smaller number wins little on inputs with few tuples;
	// - a larger number would leave the vast majority of the buckets unused
	// on inputs with few tuples (a downside) while gaining little on inputs
	// with many tuples.
	// TODO(yuzefovich): the comment above is no longer true because all
	// limited slices in hashTableProbeBuffer are now allocated dynamically, so
	// we might need to re-tune this number.
	numBuckets := uint64(coldata.BatchSize())
	// Memory accounting of the internal slices is deliberately not performed
	// here; it is delayed until buildFromBufferedTuples in order to appease
	// the *-disk logic test configs (the disk-spilling infrastructure cannot
	// fall back to disk when a memory limit is hit in operator constructors
	// or in Init() implementations).
	buildFirst := make([]uint64, numBuckets)
	keys := make([]coldata.Vec, len(eqCols))
	vals := newAppendOnlyBufferedBatch(allocator, sourceTypes, colsToStore)
	ht := &hashTable{
		allocator:         allocator,
		buildScratch:      hashTableBuildBuffer{first: buildFirst},
		keys:              keys,
		vals:              vals,
		keyCols:           eqCols,
		numBuckets:        numBuckets,
		loadFactor:        loadFactor,
		allowNullEquality: allowNullEquality,
		buildMode:         buildMode,
		probeMode:         probeMode,
	}
	if buildMode != hashTableDistinctBuildMode {
		return ht
	}
	ht.probeScratch.first = make([]uint64, numBuckets)
	// ht.buildScratch.next grows dynamically via append, but the special
	// keyID=0 entry (which marks the end of a hash chain) must always be
	// present.
	ht.buildScratch.next = []uint64{0}
	return ht
}
// hashTableInitialToCheck holds all consecutive integers in the
// [0, coldata.MaxBatchSize) range. It can be used to initialize the toCheck
// buffer for most of the join types.
var hashTableInitialToCheck []uint64

func init() {
	indices := make([]uint64, coldata.MaxBatchSize)
	for i := 0; i < len(indices); i++ {
		indices[i] = uint64(i)
	}
	hashTableInitialToCheck = indices
}
// shouldResize reports whether a hash table holding numTuples tuples has to
// be resized, i.e. whether the average number of tuples per bucket (given the
// current number of buckets) exceeds the load factor.
func (ht *hashTable) shouldResize(numTuples int) bool {
	tuplesPerBucket := float64(numTuples) / float64(ht.numBuckets)
	return tuplesPerBucket > ht.loadFactor
}
// sizeOfUint64 is the size, in bytes, of a single uint64 value as reported by
// unsafe.Sizeof. It is used when accounting for the memory of uint64 slices.
const sizeOfUint64 = int64(unsafe.Sizeof(uint64(0)))
// accountForLimitedSlices registers with the allocator the memory used by the
// slices that are limited by coldata.BatchSize() in size. The registration
// happens at most once per buffer: once it has been done, further calls are
// no-ops.
// NOTE(review): the estimate covers 5 uint64 and 2 bool elements per batch
// row, whereas hashTableProbeBuffer declares only four uint64 slices (headID,
// groupID, toCheck, hashBuffer) under its "limited" banner; presumably the
// fifth one is probeScratch.next - confirm.
func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocator) {
	if p.limitedSlicesAreAccountedFor {
		return
	}
	const sizeOfBool = int64(unsafe.Sizeof(true))
	uint64SlicesSize := sizeOfUint64 * int64(5*coldata.BatchSize())
	boolSlicesSize := sizeOfBool * int64(2*coldata.BatchSize())
	allocator.AdjustMemoryUsage(uint64SlicesSize + boolSlicesSize)
	p.limitedSlicesAreAccountedFor = true
}
// buildFromBufferedTuples (re)builds the hash table from the tuples that are
// already buffered in ht.vals. It doubles the number of buckets until the
// target load factor is satisfied, hashes all buffered tuples, links them
// into the bucket chains, and finally updates the memory accounting of the
// internal slices.
func (ht *hashTable) buildFromBufferedTuples(ctx context.Context) {
	numTuples := ht.vals.Length()
	for ht.shouldResize(numTuples) {
		ht.numBuckets *= 2
	}
	numBuckets := int(ht.numBuckets)
	ht.buildScratch.first = maybeAllocateUint64Array(ht.buildScratch.first, numBuckets)
	if ht.probeScratch.first != nil {
		ht.probeScratch.first = maybeAllocateUint64Array(ht.probeScratch.first, numBuckets)
	}
	for i := range ht.keyCols {
		ht.keys[i] = ht.vals.ColVec(int(ht.keyCols[i]))
	}
	// ht.buildScratch.next initially receives the computed hash value of each
	// key (starting at position 1, since keyID=0 is reserved); those values
	// are then consumed by buildNextChains, which turns them into the chains.
	ht.buildScratch.next = maybeAllocateUint64Array(ht.buildScratch.next, numTuples+1)
	ht.computeBuckets(ctx, ht.buildScratch.next[1:], ht.keys, numTuples, nil /* sel */)
	ht.buildNextChains(ctx, ht.buildScratch.first, ht.buildScratch.next, 1 /* offset */, uint64(numTuples))
	// Register the memory used by the internal auxiliary slices that are
	// limited in size.
	ht.probeScratch.accountForLimitedSlices(ht.allocator)
	// A nil ht.probeScratch.first has zero capacity, so it contributes
	// nothing to the total below.
	totalUint64s := int64(cap(ht.buildScratch.first) + cap(ht.probeScratch.first) + cap(ht.buildScratch.next))
	// Only the difference from what has already been registered is reported.
	newlyUsedUint64s := totalUint64s - ht.unlimitedSlicesNumUint64AccountedFor
	ht.allocator.AdjustMemoryUsage(sizeOfUint64 * newlyUsedUint64s)
	ht.unlimitedSlicesNumUint64AccountedFor = totalUint64s
}
// build runs the whole build phase of the hash table using input as the build
// source. The input is consumed entirely in the process. An internal
// assertion error is raised if the table is in an unknown build mode.
func (ht *hashTable) build(ctx context.Context, input colexecbase.Operator) {
	if ht.buildMode == hashTableFullBuildMode {
		// In the full build mode all tuples from the input are buffered
		// first, and only then is the hash table built. This allows the
		// desired number of hash buckets for the target load factor to be
		// computed (which is done in buildFromBufferedTuples()).
		for {
			batch := input.Next(ctx)
			if batch.Length() == 0 {
				break
			}
			ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
				ht.vals.append(batch, 0 /* startIdx */, batch.Length())
			})
		}
		ht.buildFromBufferedTuples(ctx)
		return
	}
	if ht.buildMode != hashTableDistinctBuildMode {
		colexecerror.InternalError(errors.AssertionFailedf("hashTable in unhandled state"))
		return
	}
	for {
		batch := input.Next(ctx)
		if batch.Length() == 0 {
			return
		}
		ht.computeHashAndBuildChains(ctx, batch)
		// First drop the duplicates within the batch itself.
		ht.removeDuplicates(batch, ht.keys, ht.probeScratch.first, ht.probeScratch.next, ht.checkProbeForDistinct)
		// Duplicates of the already buffered tuples need to be checked for
		// only when there is at least one buffered tuple.
		if ht.vals.Length() > 0 {
			ht.removeDuplicates(batch, ht.keys, ht.buildScratch.first, ht.buildScratch.next, ht.checkBuildForDistinct)
		}
		if batch.Length() > 0 {
			ht.appendAllDistinct(ctx, batch)
		}
	}
}
// computeHashAndBuildChains computes the hash codes of the tuples in batch
// and then links those tuples into 'next' chains. The goal is to split all
// tuples of the batch into singly linked lists, each containing only tuples
// with the same hash code. The chains are stored in ht.probeScratch.next, and
// a copy of the hash codes is kept in ht.probeScratch.hashBuffer.
func (ht *hashTable) computeHashAndBuildChains(ctx context.Context, batch coldata.Batch) {
	batchVecs := batch.ColVecs()
	for i := range ht.keyCols {
		ht.keys[i] = batchVecs[ht.keyCols[i]]
	}
	n := batch.Length()
	if cap(ht.probeScratch.next) < n+1 {
		ht.probeScratch.next = make([]uint64, n+1)
	}
	// Position 0 of next is skipped because keyID=0 is reserved.
	hashes := ht.probeScratch.next[1 : n+1]
	ht.computeBuckets(ctx, hashes, ht.keys, n, batch.Selection())
	ht.probeScratch.hashBuffer = append(ht.probeScratch.hashBuffer[:0], hashes...)
	// The 'first' buffer has to be zeroed out for all hash codes present in
	// hashBuffer. One of two approaches is picked based on a heuristic:
	// either the whole 'first' buffer is zeroed out (when it is not larger
	// than the batch), or only the elements for the hash codes of the batch
	// are zeroed out (when 'first' is larger than the batch).
	first := ht.probeScratch.first
	if n >= len(first) {
		for zeroed := 0; zeroed < len(first); zeroed += copy(first[zeroed:], zeroUint64Column) {
		}
	} else {
		for _, hash := range ht.probeScratch.hashBuffer[:n] {
			first[hash] = 0
		}
	}
	ht.buildNextChains(ctx, first, ht.probeScratch.next, 1 /* offset */, uint64(n))
}
// findBuckets finds the buckets for all tuples in batch when probing against
// the hash table described by the 'first' and 'next' vectors together with
// 'duplicatesChecker'. `duplicatesChecker` receives the key columns of the
// batch, the number of tuples to check, and the selection vector of the
// batch, and returns the number of tuples that still have to be checked on
// the next iteration.
// The "buckets" are specified by equal values in ht.probeScratch.headID.
// NOTE: *first* and *next* vectors should be properly populated, and
// ht.probeScratch.hashBuffer is expected to hold the hash codes of the batch.
func (ht *hashTable) findBuckets(
	batch coldata.Batch,
	keyCols []coldata.Vec,
	first, next []uint64,
	duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
	n := batch.Length()
	sel := batch.Selection()
	ht.probeScratch.setupLimitedSlices(n, ht.buildMode)
	// Every tuple starts at the head of the chain of its hash bucket.
	groupID := ht.probeScratch.groupID
	for i, hash := range ht.probeScratch.hashBuffer[:n] {
		groupID[i] = first[hash]
	}
	// Initially all tuples of the batch have to be checked.
	copy(ht.probeScratch.toCheck, hashTableInitialToCheck[:n])
	remaining := uint64(n)
	for remaining > 0 {
		// Keep searching for the matching build table keys for as long as
		// the toCheck array is non-empty.
		remaining = duplicatesChecker(keyCols, remaining, sel)
		ht.findNext(next, remaining)
	}
}
// removeDuplicates updates the selection vector of the batch so that it
// includes only the distinct tuples when probing against the hash table
// described by the 'first' and 'next' vectors together with
// 'duplicatesChecker'. It does so by locating the buckets of all tuples (see
// findBuckets) and then calling ht.updateSel on the batch.
// NOTE: *first* and *next* vectors should be properly populated.
func (ht *hashTable) removeDuplicates(
	batch coldata.Batch,
	keyCols []coldata.Vec,
	first, next []uint64,
	duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
	ht.findBuckets(batch, keyCols, first, next, duplicatesChecker)
	ht.updateSel(batch)
}
// appendAllDistinct adds all tuples from batch to the hash table. It assumes
// that all of those tuples are distinct and that ht.probeScratch.hashBuffer
// holds the hash codes for all of them. If the load factor is exceeded after
// the append, the whole table is rebuilt via buildFromBufferedTuples.
func (ht *hashTable) appendAllDistinct(ctx context.Context, batch coldata.Batch) {
	numBuffered := uint64(ht.vals.Length())
	batchLength := batch.Length()
	ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
		ht.vals.append(batch, 0 /* startIdx */, batchLength)
	})
	// The hash codes of the new tuples are appended to next so that
	// buildNextChains can link the new keyIDs (which start right after the
	// previously buffered tuples) into the bucket chains.
	newHashes := ht.probeScratch.hashBuffer[:batchLength]
	ht.buildScratch.next = append(ht.buildScratch.next, newHashes...)
	ht.buildNextChains(ctx, ht.buildScratch.first, ht.buildScratch.next, numBuffered+1, uint64(batchLength))
	if !ht.shouldResize(ht.vals.Length()) {
		return
	}
	ht.buildFromBufferedTuples(ctx)
}
// checkCols runs the per-column check on every key column, comparing the
// probe vectors against the corresponding stored key columns. The check that
// is used depends on the probe mode: checkCol for the default mode and
// checkColDeleting for the deleting mode. An internal assertion error is
// raised for any other probe mode.
func (ht *hashTable) checkCols(probeVecs []coldata.Vec, nToCheck uint64, probeSel []int) {
	switch ht.probeMode {
	case hashTableDefaultProbeMode:
		for i := range ht.keyCols {
			probeVec := probeVecs[i]
			buildVec := ht.vals.ColVec(int(ht.keyCols[i]))
			ht.checkCol(probeVec, buildVec, i, nToCheck, probeSel)
		}
	case hashTableDeletingProbeMode:
		for i := range ht.keyCols {
			probeVec := probeVecs[i]
			buildVec := ht.vals.ColVec(int(ht.keyCols[i]))
			ht.checkColDeleting(probeVec, buildVec, i, nToCheck, probeSel)
		}
	default:
		colexecerror.InternalError(errors.AssertionFailedf("unsupported hash table probe mode: %d", ht.probeMode))
	}
}
// checkColsForDistinctTuples runs a column by column check in order to find
// the distinct tuples of the probe table that are not present in the build
// table. The ith probe vector is compared against the stored column at
// position ht.keyCols[i].
func (ht *hashTable) checkColsForDistinctTuples(
	probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) {
	buildVecs := ht.vals.ColVecs()
	for i, keyCol := range ht.keyCols {
		ht.checkColForDistinctTuples(probeVecs[i], buildVecs[keyCol], nToCheck, probeSel)
	}
}
// computeBuckets computes the hash value of each of the first nKeys keys and
// stores the results in buckets. The hash is seeded by initHash, updated by
// rehash once per key column, and completed by finalizeHash, which is given
// the current number of buckets. It is a no-op when nKeys is zero.
func (ht *hashTable) computeBuckets(
	ctx context.Context, buckets []uint64, keys []coldata.Vec, nKeys int, sel []int,
) {
	if nKeys == 0 {
		// Nothing to hash, so skip all of the loops below.
		return
	}
	initHash(buckets, nKeys, defaultInitHashValue)
	// If more tuples were received than the current allocation size of the
	// datum allocator, grow that size (capping it at coldata.BatchSize()).
	if alloc := &ht.datumAlloc; nKeys > alloc.AllocSize && alloc.AllocSize < coldata.BatchSize() {
		newAllocSize := nKeys
		if maxAllocSize := coldata.BatchSize(); newAllocSize > maxAllocSize {
			newAllocSize = maxAllocSize
		}
		alloc.AllocSize = newAllocSize
	}
	for colIdx := range ht.keyCols {
		rehash(ctx, buckets, keys[colIdx], nKeys, sel, ht.cancelChecker, ht.overloadHelper, &ht.datumAlloc)
	}
	finalizeHash(buckets, nKeys, ht.numBuckets)
}
// buildNextChains builds the hash map from the computed hash values. On
// entry, next[id] is expected to hold the hash value of the tuple with keyID
// id for every id in [offset, offset+batchSize); on exit, those entries hold
// the keyID of the next tuple in the same bucket chain, and first points to
// the head of every chain.
// NOTE: offset must be at least 1 (keyID=0 is reserved for the end of a
// chain); the unsigned countdown below relies on that in order to terminate.
func (ht *hashTable) buildNextChains(
	ctx context.Context, first, next []uint64, offset, batchSize uint64,
) {
	// The keyIDs are visited in descending order so that, when the next
	// chains are built for the probe table, the keyIDs in each equality chain
	// inside `next` end up in strictly ascending order. This is crucial for
	// the distinct build mode: it guarantees that the hash table emits the
	// first distinct tuple it encounters. When the chains are built for the
	// build side, the invariant no longer holds for the equality chains
	// inside `next`. That is fine, however, because all tuples buffered on
	// the build side are already distinct, so when a tuple is emitted there
	// cannot be other tuples with the same key.
	for keyID := offset + batchSize - 1; keyID >= offset; keyID-- {
		bucket := next[keyID]
		head := first[bucket]
		if head != 0 && keyID >= head {
			// The bucket already starts with a smaller keyID, so that head
			// is kept and the current keyID is linked right after it.
			next[keyID] = next[head]
			next[head] = keyID
			continue
		}
		// Either this is the first tuple seen for this hash value (head is
		// 0) or the current keyID is smaller than the head: the keyID goes
		// to the front of the chain, so that `first` always points to the
		// tuple with the smallest keyID of the chain.
		next[keyID] = head
		first[bucket] = keyID
	}
	ht.cancelChecker.checkEveryCall(ctx)
}
// setupLimitedSlices makes sure that headID, differs, distinct, groupID, and
// toCheck have the desired length and are ready for probing. The distinct
// slice is set up only in the distinct build mode.
// Note that when the old groupID or toCheck slices have enough capacity, they
// are resliced and are *not* zeroed out.
func (p *hashTableProbeBuffer) setupLimitedSlices(length int, buildMode hashTableBuildMode) {
	p.headID = maybeAllocateLimitedUint64Array(p.headID, length)
	p.differs = maybeAllocateLimitedBoolArray(p.differs, length)
	if buildMode == hashTableDistinctBuildMode {
		p.distinct = maybeAllocateLimitedBoolArray(p.distinct, length)
	}
	// The maybeAllocate* helpers are deliberately not used for groupID and
	// toCheck since those two slices don't need to be zeroed out on reuse.
	if cap(p.groupID) >= length {
		p.groupID = p.groupID[:length]
	} else {
		p.groupID = make([]uint64, length)
	}
	if cap(p.toCheck) >= length {
		p.toCheck = p.toCheck[:length]
	} else {
		p.toCheck = make([]uint64, length)
	}
}
// findNext advances, for each of the first nToCheck entries of toCheck, the
// corresponding groupID to the id of the next key in its bucket chain (as
// given by next).
func (ht *hashTable) findNext(next []uint64, nToCheck uint64) {
	groupID := ht.probeScratch.groupID
	for _, idx := range ht.probeScratch.toCheck[:nToCheck] {
		current := groupID[idx]
		groupID[idx] = next[current]
	}
}
// checkBuildForDistinct finds all tuples in probeVecs that are *not* present
// among the buffered tuples stored in ht.vals. The keyIDs of such distinct
// tuples of probeVecs are stored in the headID buffer. The tuples that need
// further probing are compacted at the front of toCheck, and their count is
// returned.
// NOTE: It assumes that probeVecs does not contain any duplicates itself.
// NOTE: It assumes that probeSel has already been populated and it is not
// nil; an internal assertion error is raised otherwise.
func (ht *hashTable) checkBuildForDistinct(
	probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) uint64 {
	if probeSel == nil {
		colexecerror.InternalError(errors.AssertionFailedf("invalid selection vector"))
	}
	ht.checkColsForDistinctTuples(probeVecs, nToCheck, probeSel)
	probe := &ht.probeScratch
	var nDiffers uint64
	for _, idx := range probe.toCheck[:nToCheck] {
		switch {
		case probe.distinct[idx]:
			probe.distinct[idx] = false
			// Follows the convention: keyID = keys.indexOf(key) + 1.
			probe.headID[idx] = idx + 1
		case probe.differs[idx]:
			// The probe key has to be checked against the following tuples
			// of this next chain.
			probe.differs[idx] = false
			probe.toCheck[nDiffers] = idx
			nDiffers++
		}
	}
	return nDiffers
}
// checkBuildForAggregation finds all tuples in probeVecs that *are* present
// among the buffered tuples stored in ht.vals. For every present tuple at
// position i, the keyID of its duplicate is stored in headID[i]; for all
// non-present tuples the corresponding headID value is left unchanged. The
// tuples that need further probing are compacted at the front of toCheck, and
// their count is returned.
// NOTE: It assumes that probeVecs does not contain any duplicates itself.
// NOTE: It assumes that probeSel has already been populated and it is not
// nil; an internal assertion error is raised otherwise.
func (ht *hashTable) checkBuildForAggregation(
	probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) uint64 {
	if probeSel == nil {
		colexecerror.InternalError(errors.AssertionFailedf("invalid selection vector"))
	}
	ht.checkColsForDistinctTuples(probeVecs, nToCheck, probeSel)
	probe := &ht.probeScratch
	var nDiffers uint64
	for _, idx := range probe.toCheck[:nToCheck] {
		if probe.distinct[idx] {
			// A distinct tuple has no duplicate in the hash table yet, so
			// it is skipped.
			continue
		}
		if !probe.differs[idx] {
			// The tuple has a duplicate in the hash table, so the keyID of
			// that duplicate is remembered.
			probe.headID[idx] = probe.groupID[idx]
			continue
		}
		// A hash collision occurred, so the tuple has to be probed against
		// the following tuples of the hash chain.
		probe.differs[idx] = false
		probe.toCheck[nDiffers] = idx
		nDiffers++
	}
	return nDiffers
}
// reset prepares the hashTable for reuse: it zeroes out the 'first' buffer of
// the build table and sets the length of ht.vals to zero.
// NOTE: the memory that has already been allocated for ht.vals is *not*
// released. However, setting the length of ht.vals to zero doesn't confuse
// the allocator - it is smart enough to look at the capacities of the
// allocated vectors, and those capacities stay the same until an actual new
// allocation is needed, at which point the allocator updates the memory
// account accordingly.
func (ht *hashTable) reset(_ context.Context) {
	first := ht.buildScratch.first
	for zeroed := 0; zeroed < len(first); zeroed += copy(first[zeroed:], zeroUint64Column) {
	}
	ht.vals.ResetInternalBatch()
	ht.vals.SetLength(0)
	// ht.probeScratch.next, ht.same and ht.visited are reset separately
	// before they are used (these slices are not used in all code paths).
	// ht.probeScratch.headID, ht.probeScratch.differs, and
	// ht.probeScratch.distinct are reset before they are used (these slices
	// are limited in size and are allocated dynamically).
	// ht.probeScratch.groupID and ht.probeScratch.toCheck don't need to be
	// reset because they are populated manually every time before the columns
	// are checked.
	if ht.buildMode == hashTableDistinctBuildMode {
		// In the "distinct" build mode ht.buildScratch.next is populated
		// iteratively, whenever previously unseen tuples are found. In order
		// to reuse the same underlying memory the slice is truncated; since
		// keyID=0 is reserved for the end of all hash chains, the length is
		// set to 1 rather than 0.
		ht.buildScratch.next = ht.buildScratch.next[:1]
	}
}