-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
hashtable.go
974 lines (900 loc) · 41.9 KB
/
hashtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colexechash
import (
"context"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
// HashTableBuildMode represents different modes in which the HashTable can be
// built. The mode is fixed when the hash table is constructed (see
// NewHashTable) and determines which columns of the input are stored in the
// hash table.
type HashTableBuildMode int
const (
// HashTableFullBuildMode is the mode where HashTable buffers all input
// tuples and populates First and Next arrays for each hash bucket. It is the
// zero value of HashTableBuildMode.
HashTableFullBuildMode HashTableBuildMode = iota
// HashTableDistinctBuildMode is the mode where HashTable only buffers
// distinct tuples and discards the duplicates. In this mode the hash table
// actually stores only the equality columns, so the columns with positions
// not present in keyCols will remain zero-capacity vectors in Vals.
HashTableDistinctBuildMode
)
// HashTableProbeMode represents different modes of probing the HashTable. The
// mode is fixed when the hash table is constructed (see NewHashTable).
type HashTableProbeMode int
const (
// HashTableDefaultProbeMode is the default probing mode of the HashTable. It
// is the zero value of HashTableProbeMode.
HashTableDefaultProbeMode HashTableProbeMode = iota
// HashTableDeletingProbeMode is the mode of probing the HashTable in which
// it "deletes" the tuples from itself once they are matched against
// probing tuples.
//
// For example, if we have a HashTable consisting of tuples {1, 1}, {1, 2},
// {2, 3}, and the probing tuples are {1, 4}, {1, 5}, {1, 6}, then we get
// the following when probing on the first column only:
// {1, 4} -> {1, 1} | HashTable = {1, 2}, {2, 3}
// {1, 5} -> {1, 2} | HashTable = {2, 3}
// {1, 6} -> no match | HashTable = {2, 3}
// Note that the output of such probing is not fully deterministic when
// tuples contain non-equality columns.
//
// NewHashTable asserts that this mode is requested only when NULL equality
// is allowed.
HashTableDeletingProbeMode
)
// keyID encodes the ordinal of the corresponding tuple.
//
// For each tuple with the ordinal 'i' (the ordinal among all tuples in the
// hash table or within a single probing batch), keyID is calculated as:
// keyID = i + 1.
//
// keyID of 0 is reserved to indicate the end of the hash chain.
//
// NOTE(review): slices of keyID are converted to and from []uint64 via the
// keyIDToUint64 and uint64ToKeyID helpers elsewhere in this package, so the
// underlying type presumably has to remain uint64 - confirm before changing
// it.
type keyID uint64
// hashChains describes the partitioning of a set of tuples into singly-linked
// lists ("buckets") where all tuples in a list have the same hash.
//
// In order to iterate over the list corresponding to a particular hash value
// 'bucket', the keyID of the head of the list can be looked up using
// First[bucket], and the following keyIDs of the tuples in the list can be
// found using Next[keyID] traversal. Whenever keyID of 0 is encountered, the
// end of the list has been reached.
type hashChains struct {
// First stores the first keyID of the tuple that resides in each bucket.
// The tuple is the head of the corresponding hash chain. A value of 0 means
// that the bucket is empty.
//
// The length of this slice is equal to the number of buckets used in the
// hash table at the moment.
First []keyID
// Next is a densely-packed list that stores the keyID of the next tuple in
// the hash chain, where an ID of 0 is reserved to represent the end of the
// chain.
//
// The length of this slice is equal to the number of tuples stored in the
// hash table at the moment plus one (Next[0] is unused).
Next []keyID
}
// hashTableProbeBuffer stores the information related to the probing batch.
//
// From the high level, this struct facilitates checking whether tuples from the
// probing batch are equal to "candidate" tuples (those that have the same
// hash). In other words, hashTableProbeBuffer helps the hash table to determine
// whether there is a hash collision or an actual equality match.
//
// Given that this struct is tied to the probing batch, all slices are limited
// by the length of the batch (in some cases plus one) in size.
type hashTableProbeBuffer struct {
// When the hash table is used by the hash aggregator or the unordered
// distinct, an implicit hash table is built on the probing batch itself,
// and it is stored in this hashChains object. The goal is to reuse the
// facilities of the hash table to partition all tuples in the batch into
// equality buckets (i.e. into lists where all tuples are equal on the hash
// columns).
//
// Not used by the hash joiner.
hashChains
// limitedSlicesAreAccountedFor indicates whether we have already
// accounted for the memory used by the slices below. It is set by
// accountForLimitedSlices so that the accounting is performed only once.
limitedSlicesAreAccountedFor bool
///////////////////////////////////////////////////////////////
// Slices below are allocated dynamically but are limited by //
// coldata.BatchSize() in size. //
///////////////////////////////////////////////////////////////
// ToCheck stores the indices of tuples from the probing batch for which we
// still want to keep traversing the hash chain trying to find equality
// matches.
//
// For example, if the probing batch has 4 tuples, then initially ToCheck
// will be [0, 1, 2, 3]. Say, tuples with ordinals 0 and 3 are determined to
// definitely not have any matches (e.g. because the corresponding hash
// chains are empty) while tuples with ordinals 1 and 2 have hash
// collisions against the current "candidates", then ToCheck will be updated
// to [1, 2].
ToCheck []uint64
// ToCheckID stores the keyIDs of the current "candidate" matches for the
// tuples from the probing batch. Concretely, ToCheckID[i] is the keyID of
// the tuple in the hash table which we are currently comparing with the ith
// tuple of the probing batch. i is included in ToCheck. The result of the
// comparison is stored in 'differs' and/or 'distinct'.
//
// On the first iteration:
// ToCheckID[i] = First[hash[i]]
// (i.e. we're comparing the ith probing tuple against the head of the hash
// chain). For the next iteration of probing, new values of ToCheckID are
// calculated as
// ToCheckID[i] = Next[ToCheckID[i]].
// Whenever ToCheckID[i] becomes 0, there are no more matches for the ith
// probing tuple.
ToCheckID []keyID
// differs stores whether the probing tuple included in ToCheck differs
// from the corresponding "candidate" tuple specified in ToCheckID.
differs []bool
// distinct stores whether the probing tuple is distinct (i.e. it will
// differ from all possible "candidates"). Used only for the hash aggregator
// and the unordered distinct.
distinct []bool
// HeadID stores the keyID of the tuple that has an equality match with the
// tuple at any given index from the probing batch. Unlike First where we
// might have a hash collision, HeadID stores the actual equality matches.
//
// All three users of the hash table use HeadID differently.
//
// In the hash joiner, HeadID is used only when the hash table might contain
// duplicate values (this is not the case when the equality columns form a
// key). The hash table describes the partitioning of the build (right) side
// table, and the probing batch comes from the left side. During the probing
// phase, if a match is found for the probing tuple i, then HeadID[i] is set
// to the keyID of that match, then the HashTable.Same slice is used to
// traverse all equality matches for the given probing tuple.
//
// In the hash aggregator, the hash table can describe the partitioning of
// the probing batch (via ProbeScratch) or of already found grouping
// buckets (via BuildScratch), depending on the phase of the algorithm. See
// an extensive comment on hashAggregator.onlineAgg for more details.
//
// The unordered distinct uses HeadID in the following manner:
// - in the first step of the probing phase (when duplicates are being
// removed from the probing batch), HeadID[i] is set to the first tuple that
// matches ith tuple. Since ith tuple always matches itself (and possibly
// others), HeadID values will never be zero in this step;
// - in the second step of the probing phase (once the batch only contains
// unique tuples), the non-zero HeadID value is set for a particular tuple
// once the tuple is determined to not have duplicates with any tuples
// already in the hash table.
// See a comment on DistinctBuild for an example.
HeadID []keyID
// HashBuffer stores the hash values of each tuple in the probing batch. It
// will be dynamically updated when the HashTable is built in distinct mode.
HashBuffer []uint64
}
// HashTable is a structure used by the hash joiner, the hash aggregator, and
// the unordered distinct in order to perform the equality checks and
// de-duplication.
//
// When used by the hash joiner, the whole build (right) side table is inserted
// into the hash table. Later, the left side table is read one batch at a time,
// and that batch is used to probe against the hash table to find matches. On a
// single probing iteration at most one match is found for each tuple in the
// probing batch. In order to find all matches, for each probing tuple the
// corresponding hash chain will be fully traversed. For more details see the
// comment on colexecjoin.hashJoiner.
//
// When used by the hash aggregator and the unordered distinct, the hash table
// is built in an incremental fashion. One batch is read from the input, then
// using the hash table's facilities the batch is divided into "equality"
// buckets. Next, the de-duplicated batch is probed against the hash table to
// find possible matches for each tuple. For more details see the comments on
// hashAggregator.onlineAgg and DistinctBuild.
type HashTable struct {
// allocator is used to account for the memory used by the hash table.
allocator *colmem.Allocator
// unlimitedSlicesNumUint64AccountedFor stores the number of uint64 from
// the unlimited slices that we have already accounted for.
unlimitedSlicesNumUint64AccountedFor int64
// BuildScratch contains the hash chains among tuples of the hash table
// (those stored in Vals).
BuildScratch hashChains
// ProbeScratch contains the scratch buffers that provide the facilities of
// the hash table (like equality checks) for the probing batch.
ProbeScratch hashTableProbeBuffer
// Keys is a scratch space that stores the equality columns of the tuples
// currently being probed.
Keys []coldata.Vec
// Same and Visited are only used when the HashTable contains non-distinct
// keys (in HashTableFullBuildMode mode).
//
// Same is a densely-packed list that stores the keyID of the next key in
// the hash table that has the same value as the current key. The HeadID of
// the key is the first key of that value found in the next linked list.
// This field will be lazily populated by the prober.
Same []keyID
// Visited represents whether each of the corresponding keys have been
// touched by the prober.
Visited []bool
// Vals stores columns of the build source that are specified in colsToStore
// in the constructor. The ID of a tuple at any index of Vals is index + 1.
Vals *colexecutils.AppendOnlyBufferedBatch
// keyCols stores the indices of Vals which are used for equality
// comparison.
keyCols []uint32
// numBuckets is the number of buckets the HashTable employs at the
// moment. This number increases as more tuples are added into the hash
// table.
numBuckets uint64
// loadFactor determines the average number of tuples per bucket exceeding
// of which will trigger resizing the hash table.
loadFactor float64
// allowNullEquality determines if NULL keys should be treated as equal to
// each other.
allowNullEquality bool
// datumAlloc is a datum allocation helper.
// NOTE(review): its users are not visible in this part of the file;
// presumably it is used when comparing datum-backed columns - confirm.
datumAlloc tree.DatumAlloc
// cancelChecker is initialized in NewHashTable with the context passed to
// the constructor.
cancelChecker colexecutils.CancelChecker
// BuildMode is the build mode that the hash table was constructed with.
BuildMode HashTableBuildMode
// probeMode is the probe mode that the hash table was constructed with.
probeMode HashTableProbeMode
}
// Compile-time assertion that *HashTable implements colexecop.Resetter.
var _ colexecop.Resetter = &HashTable{}
// NewHashTable constructs a HashTable with the given configuration.
//
//   - loadFactor is the average number of tuples per bucket which, once
//     exceeded, triggers resizing of the hash table. It can noticeably affect
//     performance, so each user of the hash table should pick the value that
//     suits its use case. 1.0 is a reasonable initial default, and the best
//     value will most likely be in the [0.1, 10.0] range.
//
//   - initialNumHashBuckets is the number of buckets allocated up front. When
//     the current load factor of the hash table exceeds loadFactor, the hash
//     table is resized by doubling the number of buckets. The user should pick
//     this number based on the amount of its other allocations, but it likely
//     should be in the [8, coldata.BatchSize()] range. The reasoning for
//     choosing coldata.BatchSize() is roughly as follows:
//
//   - on one hand, if the user makes several other allocations that have to be
//     at least coldata.BatchSize() in size, then not much is won on inputs
//     with a small number of tuples;
//
//   - on the other hand, starting out with a larger number means that the vast
//     majority of the buckets stay unused on inputs with a small number of
//     tuples (a downside) while not much is gained on inputs with a large
//     number of tuples.
//
// An assertion error is raised (via colexecerror.InternalError) if
// HashTableDeletingProbeMode is requested without allowNullEquality or if
// buildMode is not a known build mode.
func NewHashTable(
	ctx context.Context,
	allocator *colmem.Allocator,
	loadFactor float64,
	initialNumHashBuckets uint64,
	sourceTypes []*types.T,
	keyCols []uint32,
	allowNullEquality bool,
	buildMode HashTableBuildMode,
	probeMode HashTableProbeMode,
) *HashTable {
	// There is currently no use case for the deleting probe mode without null
	// equality, so assert that such a combination is never requested.
	if probeMode == HashTableDeletingProbeMode && !allowNullEquality {
		colexecerror.InternalError(errors.AssertionFailedf("HashTableDeletingProbeMode is supported only when null equality is allowed"))
	}

	// Memory accounting of the internal memory is intentionally not done
	// here; it is delayed until buildFromBufferedTuples in order to appease
	// *-disk logic test configs (the disk-spilling infrastructure doesn't know
	// how to fall back to disk when a memory limit is hit in the constructor
	// methods of the operators or in Init() implementations).

	// Figure out which column positions are actually stored in the hash
	// table:
	// - the full build mode stores every column;
	// - the distinct build mode stores only the columns listed in keyCols
	//   (all other columns remain zero-capacity vectors in Vals).
	var colsToStore []int
	switch buildMode {
	case HashTableFullBuildMode:
		colsToStore = make([]int, len(sourceTypes))
		for colIdx := range colsToStore {
			colsToStore[colIdx] = colIdx
		}
	case HashTableDistinctBuildMode:
		colsToStore = make([]int, len(keyCols))
		for i, keyCol := range keyCols {
			colsToStore[i] = int(keyCol)
		}
	default:
		colexecerror.InternalError(errors.AssertionFailedf("unknown HashTableBuildMode %d", buildMode))
	}

	ht := &HashTable{
		allocator:         allocator,
		BuildScratch:      hashChains{First: make([]keyID, initialNumHashBuckets)},
		Keys:              make([]coldata.Vec, len(keyCols)),
		Vals:              colexecutils.NewAppendOnlyBufferedBatch(allocator, sourceTypes, colsToStore),
		keyCols:           keyCols,
		numBuckets:        initialNumHashBuckets,
		loadFactor:        loadFactor,
		allowNullEquality: allowNullEquality,
		BuildMode:         buildMode,
		probeMode:         probeMode,
	}

	if buildMode == HashTableDistinctBuildMode {
		ht.ProbeScratch.First = make([]keyID, initialNumHashBuckets)
		// BuildScratch.Next grows dynamically via appends, but the special
		// keyID=0 entry (the end-of-chain marker) must always be present.
		ht.BuildScratch.Next = []keyID{0}
	}

	ht.cancelChecker.Init(ctx)
	return ht
}
// HashTableInitialToCheck is a slice that contains all consecutive integers in
// the [0, coldata.MaxBatchSize) range. It can be used to initialize the
// ToCheck buffer for most of the join types.
var HashTableInitialToCheck []uint64

// init populates HashTableInitialToCheck so that the element at index i is i.
func init() {
	toCheck := make([]uint64, coldata.MaxBatchSize)
	for ordinal := 0; ordinal < len(toCheck); ordinal++ {
		toCheck[ordinal] = uint64(ordinal)
	}
	HashTableInitialToCheck = toCheck
}
// shouldResize reports whether a hash table holding numTuples tuples would
// exceed the configured load factor with the current number of buckets and,
// therefore, needs to be resized.
func (ht *HashTable) shouldResize(numTuples int) bool {
	tuplesPerBucket := float64(numTuples) / float64(ht.numBuckets)
	return tuplesPerBucket > ht.loadFactor
}
// accountForLimitedSlices registers with the allocator the maximum memory
// footprint of the slices that are limited by coldata.BatchSize() in size. The
// accounting is performed at most once per probe buffer; subsequent calls are
// no-ops.
func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocator) {
	if !p.limitedSlicesAreAccountedFor {
		// Five slices with 8-byte elements and two bool slices are accounted
		// for, each assumed to be at most coldata.BatchSize() long.
		// NOTE(review): presumably these are Next, ToCheck, ToCheckID, HeadID,
		// and HashBuffer plus differs and distinct - confirm.
		uint64SlicesBytes := memsize.Int64 * int64(5*coldata.BatchSize())
		boolSlicesBytes := memsize.Bool * int64(2*coldata.BatchSize())
		allocator.AdjustMemoryUsage(uint64SlicesBytes + boolSlicesBytes)
		p.limitedSlicesAreAccountedFor = true
	}
}
// buildFromBufferedTuples (re)builds the hash chains over the tuples that are
// already buffered in ht.Vals. The number of buckets is doubled until the
// target load factor is satisfied, and the memory used by the internal slices
// is registered with the allocator.
func (ht *HashTable) buildFromBufferedTuples() {
	numTuples := ht.Vals.Length()
	for ht.shouldResize(numTuples) {
		ht.numBuckets *= 2
	}

	build, probe := &ht.BuildScratch, &ht.ProbeScratch
	numBuckets := int(ht.numBuckets)
	build.First = maybeAllocateKeyIDArray(build.First, numBuckets)
	if probe.First != nil {
		probe.First = maybeAllocateKeyIDArray(probe.First, numBuckets)
	}

	// Point the key scratch space at the equality columns of the buffered
	// tuples.
	for i := range ht.keyCols {
		ht.Keys[i] = ht.Vals.ColVec(int(ht.keyCols[i]))
	}

	// The computed hash value of each key is first written into
	// BuildScratch.Next (skipping the reserved 0th element), and then the
	// chains are built from those values.
	build.Next = maybeAllocateKeyIDArray(build.Next, numTuples+1)
	ht.ComputeBuckets(keyIDToUint64(build.Next[1:]), ht.Keys, numTuples, nil /* sel */)
	ht.buildNextChains(build.First, build.Next, 1 /* offset */, uint64(numTuples))

	// Account for the size-limited auxiliary slices.
	probe.accountForLimitedSlices(ht.allocator)

	// Account for the growth of the unlimited slices since the last call. If
	// probe.First is nil, its capacity is zero.
	totalUint64Count := int64(cap(build.First) + cap(probe.First) + cap(build.Next))
	growth := totalUint64Count - ht.unlimitedSlicesNumUint64AccountedFor
	ht.allocator.AdjustMemoryUsage(memsize.Int64 * growth)
	ht.unlimitedSlicesNumUint64AccountedFor = totalUint64Count
}
// FullBuild runs the whole build phase of the hash table with input as the
// build source, fully consuming input in the process. The hash table must
// operate in HashTableFullBuildMode; otherwise, an assertion error is raised.
func (ht *HashTable) FullBuild(input colexecop.Operator) {
	if mode := ht.BuildMode; mode != HashTableFullBuildMode {
		colexecerror.InternalError(errors.AssertionFailedf(
			"HashTable.FullBuild is called in unexpected build mode %d", mode,
		))
	}
	// In the full build mode all tuples from the input are buffered first and
	// only then the hash table is built. This makes it possible to compute
	// the desired number of hash buckets for the target load factor (which is
	// done in buildFromBufferedTuples()).
	for batch := input.Next(); batch.Length() != 0; batch = input.Next() {
		ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
	}
	ht.buildFromBufferedTuples()
}
// DistinctBuild appends all distinct tuples from batch to the hash table. Note
// that the hash table is assumed to operate in HashTableDistinctBuildMode.
// batch is updated to include only the distinct tuples.
//
// This is achieved by first removing duplicates within the batch itself and
// then probing the de-duplicated batch against the hash table (which contains
// already emitted distinct tuples). Duplicates of the emitted tuples are also
// removed from the batch, and if there are any tuples left, all of them are
// distinct and, thus, are appended into the hash table and outputted.
//
// Let's go through an example of how this function works: our input stream
// contains the following tuples:
// {-6}, {-6}, {-7}, {-5}, {-8}, {-5}, {-5}, {-8}.
// (Note that negative values are chosen in order to visually distinguish them
// from the IDs that we'll be working with below.)
// We will use coldata.BatchSize() == 4 and let's assume that we will use a
// hash function
// h(-5) = 1, h(-6) = 1, h(-7) = 0, h(-8) = 0
// with two buckets in the hash table.
//
// I. we get a batch [-6, -6, -7, -5].
// 1. ComputeHashAndBuildChains:
// a) compute hash buckets:
// HashBuffer = [1, 1, 0, 1]
// ProbeScratch.Next = [reserved, 1, 1, 0, 1]
// b) build 'Next' chains between hash buckets:
// ProbeScratch.First = [3, 1] (length of First == # of hash buckets)
// ProbeScratch.Next = [reserved, 2, 4, 0, 0]
// (Note that we have a hash collision in the bucket with hash 1.)
// 2. RemoveDuplicates within the batch:
// 1) first iteration in FindBuckets:
// a) all 4 tuples included to be checked against heads of their hash
// chains:
// ToCheck = [0, 1, 2, 3]
// ToCheckID = [1, 1, 3, 1]
// b) after performing the equality check using CheckProbeForDistinct,
// tuples 0, 1, 2 are found to be equal to the heads of their hash
// chains while tuple 3 (-5) has a hash collision with tuple 0 (-6),
// so it is kept for another iteration:
// ToCheck = [3]
// ToCheckID = [x, x, x, 2]
// HeadID = [1, 1, 3, x]
// 2) second iteration in FindBuckets finds that tuple 3 (-5) again has a
// hash collision with tuple 1 (-6), so it is kept for another
// iteration:
// ToCheck = [3]
// ToCheckID = [x, x, x, 4]
// HeadID = [1, 1, 3, x]
// 3) third iteration finds a match for tuple 3 (the tuple itself), no more
// tuples to check, so the iterations stop:
// ToCheck = []
// HeadID = [1, 1, 3, 4]
// 4) the duplicates are represented by having the same HeadID values, and
// all duplicates are removed in updateSel:
// batch = [-6, -6, -7, -5]
// length = 3, sel = [0, 2, 3]
// Notably, HashBuffer is compacted accordingly:
// HashBuffer = [1, 0, 1]
// 3. The hash table is empty, so RemoveDuplicates against the hash table is
// skipped.
// 4. All 3 tuples still present in the batch are distinct, they are appended
// to the hash table and will be emitted to the output:
// Vals = [-6, -7, -5]
// BuildScratch.First = [2, 1]
// BuildScratch.Next = [reserved, 3, 0, 0]
// We have fully processed the first batch.
//
// II. we get a batch [-8, -5, -5, -8].
// 1. ComputeHashAndBuildChains:
// a) compute hash buckets:
// HashBuffer = [0, 1, 1, 0]
// ProbeScratch.Next = [reserved, 0, 1, 1, 0]
// b) build 'Next' chains between hash buckets:
// ProbeScratch.First = [1, 2]
// ProbeScratch.Next = [reserved, 4, 3, 0, 0]
// 2. RemoveDuplicates within the batch:
// 1) first iteration in FindBuckets:
// a) all 4 tuples included to be checked against heads of their hash
// chains:
// ToCheck = [0, 1, 2, 3]
// ToCheckID = [1, 2, 2, 1]
// b) after performing the equality check using CheckProbeForDistinct,
// all tuples are found to be equal to the heads of their hash
// chains, no more tuples to check, so the iterations stop:
// ToCheck = []
// HeadID = [1, 2, 2, 1]
// 2) the duplicates are represented by having the same HeadID values, and
// all duplicates are removed in updateSel:
// batch = [-8, -5, -5, -8]
// length = 2, sel = [0, 1]
// Notably, HashBuffer is compacted accordingly:
// HashBuffer = [0, 1]
// 3. RemoveDuplicates against the hash table:
// 1) first iteration in FindBuckets:
// a) both tuples included to be checked against heads of their hash
// chains of the hash table (meaning BuildScratch.First and
// BuildScratch.Next are used to populate ToCheckID values):
// ToCheck = [0, 1]
// ToCheckID = [2, 1]
// b) after performing the equality check using CheckBuildForDistinct,
// both tuples are found to have hash collisions (-8 with -7 and -5
// with -6), so both are kept for another iteration:
// ToCheck = [0, 1]
// ToCheckID = [0, 3]
// 2) second iteration in FindBuckets finds that tuple 1 (-5) has a match
// whereas tuple 0 (-8) is distinct (because its ToCheckID is 0), no
// more tuples to check:
// ToCheck = []
// HeadID = [1, 0]
// 3) duplicates are represented by having HeadID value of 0, so the batch
// is updated to only include tuple -8:
// batch = [-8, -5, -5, -8]
// length = 1, sel = [0]
// HashBuffer = [0]
// 4. The single tuple still present in the batch is distinct, it is appended
// to the hash table and will be emitted to the output:
// Vals = [-6, -7, -5, -8]
// BuildScratch.First = [2, 1]
// BuildScratch.Next = [reserved, 3, 4, 0, 0]
// We have fully processed the second batch and the input as a whole.
//
// NOTE: batch *must* be a non-zero length batch.
func (ht *HashTable) DistinctBuild(batch coldata.Batch) {
	if mode := ht.BuildMode; mode != HashTableDistinctBuildMode {
		colexecerror.InternalError(errors.AssertionFailedf(
			"HashTable.DistinctBuild is called in unexpected build mode %d", mode,
		))
	}

	// Step 1: hash the batch and de-duplicate it against itself.
	ht.ComputeHashAndBuildChains(batch)
	ht.RemoveDuplicates(batch, ht.Keys, ht.ProbeScratch.First, ht.ProbeScratch.Next, ht.CheckProbeForDistinct)

	// Step 2: drop the tuples that are already present in the hash table.
	// This is only needed when at least one tuple has been buffered.
	if ht.Vals.Length() > 0 {
		ht.RemoveDuplicates(batch, ht.Keys, ht.BuildScratch.First, ht.BuildScratch.Next, ht.CheckBuildForDistinct)
	}

	// Step 3: whatever is left in the batch is distinct, so it is appended to
	// the hash table.
	if batch.Length() == 0 {
		return
	}
	ht.AppendAllDistinct(batch)
}
// ComputeHashAndBuildChains computes the hash codes of the tuples in batch and
// then builds 'Next' chains between those tuples. As a result, all tuples in
// batch are separated into singly linked lists that contain only tuples with
// the same hash code. The heads of the lists are stored in
// ht.ProbeScratch.First, the 'Next' chains are stored in ht.ProbeScratch.Next,
// and the hash codes themselves are kept in ht.ProbeScratch.HashBuffer.
func (ht *HashTable) ComputeHashAndBuildChains(batch coldata.Batch) {
	// Point the key scratch space at the equality columns of the batch.
	batchVecs := batch.ColVecs()
	for keyIdx := range ht.keyCols {
		ht.Keys[keyIdx] = batchVecs[ht.keyCols[keyIdx]]
	}

	numTuples := batch.Length()
	probe := &ht.ProbeScratch
	// Next needs room for all tuples plus the reserved 0th element.
	if cap(probe.Next) <= numTuples {
		probe.Next = make([]keyID, numTuples+1)
	}
	// The hash codes are computed directly into Next (past the reserved
	// element) and then copied into HashBuffer.
	ht.ComputeBuckets(keyIDToUint64(probe.Next[1:numTuples+1]), ht.Keys, numTuples, batch.Selection())
	probe.HashBuffer = append(probe.HashBuffer[:0], keyIDToUint64(probe.Next[1:numTuples+1])...)

	// The 'First' buffer has to be zeroed out for all hash codes present in
	// HashBuffer. Two approaches are chosen from based on a heuristic: when
	// 'First' is larger than the batch, only the relevant elements are zeroed
	// out one by one; otherwise, the whole 'First' buffer is zeroed out in
	// chunks.
	if len(probe.First) <= numTuples {
		for zeroed := 0; zeroed < len(probe.First); {
			zeroed += copy(probe.First[zeroed:], uint64ToKeyID(colexecutils.ZeroUint64Column))
		}
	} else {
		for _, bucket := range probe.HashBuffer[:numTuples] {
			probe.First[bucket] = 0
		}
	}

	ht.buildNextChains(probe.First, probe.Next, 1 /* offset */, uint64(numTuples))
}
// FindBuckets finds the buckets for all tuples in batch when probing against a
// hash table that is specified by 'first' and 'next' vectors as well as
// 'duplicatesChecker'. `duplicatesChecker` takes a slice of key columns of the
// batch, number of tuples to check, and the selection vector of the batch, and
// it returns number of tuples that needs to be checked for next iteration.
// The "buckets" are specified by equal values in ht.ProbeScratch.HeadID.
//
// The hash codes of the tuples are read from ht.ProbeScratch.HashBuffer, so
// it must already be populated for the batch.
// NOTE: *first* and *next* vectors should be properly populated.
// NOTE: batch is assumed to be non-zero length.
func (ht *HashTable) FindBuckets(
batch coldata.Batch,
keyCols []coldata.Vec,
first, next []keyID,
duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
batchLength := batch.Length()
sel := batch.Selection()
ht.ProbeScratch.SetupLimitedSlices(batchLength, ht.BuildMode)
// Early bounds checks. Note that the access below panics on a zero-length
// batch, hence the requirement that the batch is non-empty.
toCheckIDs := ht.ProbeScratch.ToCheckID
_ = toCheckIDs[batchLength-1]
// Start the probing of each tuple from the head of the hash chain that
// corresponds to the tuple's hash code.
for i, hash := range ht.ProbeScratch.HashBuffer[:batchLength] {
f := first[hash]
//gcassert:bce
toCheckIDs[i] = f
}
// Initially, all tuples of the batch need to be checked.
copy(ht.ProbeScratch.ToCheck, HashTableInitialToCheck[:batchLength])
for nToCheck := uint64(batchLength); nToCheck > 0; {
// Continue searching for the build table matching keys while the ToCheck
// array is non-empty.
nToCheck = duplicatesChecker(keyCols, nToCheck, sel)
ht.FindNext(next, nToCheck)
}
}
// RemoveDuplicates updates the selection vector of the batch to only include
// distinct tuples when probing against a hash table specified by 'first' and
// 'next' vectors as well as 'duplicatesChecker'. All arguments are forwarded
// to FindBuckets unchanged, so its requirements (including the non-zero batch
// length) apply here too.
// NOTE: *first* and *next* vectors should be properly populated.
func (ht *HashTable) RemoveDuplicates(
batch coldata.Batch,
keyCols []coldata.Vec,
first, next []keyID,
duplicatesChecker func([]coldata.Vec, uint64, []int) uint64,
) {
// First, find the bucket of every tuple in the batch.
ht.FindBuckets(batch, keyCols, first, next, duplicatesChecker)
// Then update the selection vector of the batch.
// NOTE(review): updateSel presumably derives the new selection vector from
// ht.ProbeScratch.HeadID populated above - confirm against its definition.
ht.updateSel(batch)
}
// AppendAllDistinct adds every tuple of batch to the hash table. The caller
// guarantees that all of these tuples are distinct and that
// ht.ProbeScratch.HashBuffer already holds their hash codes.
// NOTE: batch must be of non-zero length.
func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
	numTuples := batch.Length()
	// keyIDs are 1-based (keyID 0 terminates every hash chain), so the first
	// appended tuple gets keyID prevLen+1.
	prevLen := uint64(ht.Vals.Length())
	ht.Vals.AppendTuples(batch, 0 /* startIdx */, numTuples)
	// Seed the new 'Next' entries with the hash codes of the appended tuples;
	// buildNextChains then overwrites them with the actual chain links.
	newHashes := ht.ProbeScratch.HashBuffer[:numTuples]
	ht.BuildScratch.Next = append(ht.BuildScratch.Next, uint64ToKeyID(newHashes)...)
	ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, prevLen+1, uint64(numTuples))
	// Rebuild the whole table from the buffered tuples once it has grown
	// enough to warrant a resize.
	if ht.shouldResize(ht.Vals.Length()) {
		ht.buildFromBufferedTuples()
	}
}
// MaybeRepairAfterDistinctBuild detects a hash table built via DistinctBuild
// that was left in an inconsistent state and, if it finds one, brings the
// 'Next' and 'First' slices back in sync with ht.Vals.
func (ht *HashTable) MaybeRepairAfterDistinctBuild() {
	// BuildScratch.Next carries an extra 0th element that no tuple uses (it is
	// reserved for the end of the chain), so a consistent table has one more
	// 'Next' entry than it has buffered tuples.
	numBuffered := ht.Vals.Length()
	if len(ht.BuildScratch.Next) >= numBuffered+1 {
		return
	}
	// The hash table is in such a state that some distinct tuples were
	// appended to ht.Vals, but the 'Next' and 'First' slices were not updated
	// accordingly. The hash codes of those tuples are taken from the front of
	// ht.ProbeScratch.HashBuffer.
	numLinked := len(ht.BuildScratch.Next) - 1
	numUnlinked := numBuffered - numLinked
	pendingHashes := ht.ProbeScratch.HashBuffer[:numUnlinked]
	ht.BuildScratch.Next = append(ht.BuildScratch.Next, uint64ToKeyID(pendingHashes)...)
	ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, uint64(numLinked)+1, uint64(numUnlinked))
}
// checkCols runs the per-column check over every key column, comparing the
// i-th probe vector against the corresponding key column of ht.Vals. Which
// per-column routine is used depends on the probe mode of the hash table; an
// unknown probe mode is reported as an internal error.
func (ht *HashTable) checkCols(probeVecs []coldata.Vec, nToCheck uint64, probeSel []int) {
	switch mode := ht.probeMode; mode {
	case HashTableDefaultProbeMode:
		for keyColIdx := range ht.keyCols {
			probeVec := probeVecs[keyColIdx]
			buildVec := ht.Vals.ColVec(int(ht.keyCols[keyColIdx]))
			ht.checkCol(probeVec, buildVec, keyColIdx, nToCheck, probeSel)
		}
	case HashTableDeletingProbeMode:
		for keyColIdx := range ht.keyCols {
			probeVec := probeVecs[keyColIdx]
			buildVec := ht.Vals.ColVec(int(ht.keyCols[keyColIdx]))
			ht.checkColDeleting(probeVec, buildVec, keyColIdx, nToCheck, probeSel)
		}
	default:
		colexecerror.InternalError(errors.AssertionFailedf("unsupported hash table probe mode: %d", mode))
	}
}
// checkColsForDistinctTuples goes through the key columns one at a time,
// checking each probe vector against the matching key column of ht.Vals, in
// order to find distinct tuples in the probe table that are not present in
// the build table.
// NOTE: It assumes that probeSel has already been populated and it is not nil.
func (ht *HashTable) checkColsForDistinctTuples(
	probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) {
	buildVecs := ht.Vals.ColVecs()
	for i, keyCol := range ht.keyCols {
		// The i-th probe vector corresponds to the i-th key column, which is
		// stored at position keyCol among the buffered vectors.
		ht.checkColForDistinctTuples(probeVecs[i], buildVecs[keyCol], nToCheck, probeSel)
	}
}
// ComputeBuckets hashes the first nKeys tuples of the given key vectors and
// writes the resulting bucket of each tuple into buckets. sel is passed
// through to the per-column rehash step. When nKeys is zero, the method
// returns without touching buckets.
func (ht *HashTable) ComputeBuckets(buckets []uint64, keys []coldata.Vec, nKeys int, sel []int) {
	if nKeys == 0 {
		// Nothing to hash, so skip all of the loops below.
		return
	}
	initHash(buckets, nKeys, DefaultInitHashValue)

	// If more tuples arrived than the current allocation size covers, grow
	// the allocation size to nKeys, but never beyond coldata.BatchSize().
	if curSize := ht.datumAlloc.AllocSize; nKeys > curSize && curSize < coldata.BatchSize() {
		newSize := nKeys
		if limit := coldata.BatchSize(); newSize > limit {
			newSize = limit
		}
		ht.datumAlloc.AllocSize = newSize
	}

	// Fold every key column into the running hash, one column at a time.
	for keyIdx := range ht.keyCols {
		rehash(buckets, keys[keyIdx], nKeys, sel, ht.cancelChecker, &ht.datumAlloc)
	}

	finalizeHash(buckets, nKeys, ht.numBuckets)
}
// buildNextChains links the keyIDs in [offset, offset+batchSize) into the hash
// chains described by 'first' and 'next'. On entry, next[id] must hold the
// hash code of tuple id for every id in that range. On exit, next[id] holds
// the keyID that follows id in its chain (0 marks the end of a chain) and
// first[hash] holds the head of the chain for that hash code.
func (ht *HashTable) buildNextChains(first, next []keyID, offset, batchSize uint64) {
	// The keyIDs are visited from the largest to the smallest. When the next
	// chain is built for the probe table, this guarantees that the keyIDs in
	// each equality chain inside `next` are in strictly ascending order, which
	// is crucial for the hash table built in distinct mode to emit the first
	// distinct tuple it encounters. When the next chain is built for the build
	// side, that ordering no longer holds. This is ok because all tuples
	// buffered on the build side are already distinct, so when a tuple is
	// emitted there cannot be another tuple with the same key.
	for id := keyID(offset + batchSize - 1); uint64(id) >= offset; id-- {
		bucket := next[id]
		head := first[bucket]
		if head != 0 && id >= head {
			// The chain already starts with a smaller keyID, which must stay
			// at the front, so id is spliced in right behind the head.
			next[id] = next[head]
			next[head] = id
			continue
		}
		// Either this is the first tuple seen with this hash code (head is 0)
		// or id is smaller than the current head. In both cases id becomes
		// the new head so that `first` always points to the smallest keyID of
		// the chain.
		next[id] = head
		first[bucket] = id
	}
	ht.cancelChecker.CheckEveryCall()
}
// SetupLimitedSlices makes sure that HeadID, differs, distinct, ToCheckID, and
// ToCheck all have the requested length and are ready to be used for probing.
// The distinct slice is only set up in the distinct build mode.
// Note that ToCheckID and ToCheck are *not* zeroed out when their existing
// capacity is sufficient.
func (p *hashTableProbeBuffer) SetupLimitedSlices(length int, buildMode HashTableBuildMode) {
	p.HeadID = maybeAllocateLimitedKeyIDArray(p.HeadID, length)
	p.differs = colexecutils.MaybeAllocateLimitedBoolArray(p.differs, length)
	if buildMode == HashTableDistinctBuildMode {
		p.distinct = colexecutils.MaybeAllocateLimitedBoolArray(p.distinct, length)
	}

	// The maybeAllocate* helpers are deliberately not used for ToCheckID and
	// ToCheck: these two slices don't need to be zeroed out on reuse, so
	// reslicing the existing memory is enough whenever it is large enough.
	if cap(p.ToCheckID) >= length {
		p.ToCheckID = p.ToCheckID[:length]
	} else {
		p.ToCheckID = make([]keyID, length)
	}
	if cap(p.ToCheck) >= length {
		p.ToCheck = p.ToCheck[:length]
	} else {
		p.ToCheck = make([]uint64, length)
	}
}
// FindNext advances every tuple listed in the first nToCheck entries of
// ToCheck by one step along its hash chain: the keyID currently stored in
// ToCheckID for that tuple is replaced with its successor according to next.
func (ht *HashTable) FindNext(next []keyID, nToCheck uint64) {
	ids := ht.ProbeScratch.ToCheckID
	pending := ht.ProbeScratch.ToCheck[:nToCheck]
	for i := range pending {
		pos := pending[i]
		ids[pos] = next[ids[pos]]
	}
}
// CheckBuildForDistinct finds all tuples in probeVecs that are *not* present in
// buffered tuples stored in ht.Vals. It stores the probeVecs's distinct tuples'
// keyIDs in HeadID buffer.
//
// It returns the number of tuples that still need to be checked against the
// next entry of their hash chains; the positions of those tuples are compacted
// into the front of ht.ProbeScratch.ToCheck.
//
// NOTE: It assumes that probeVecs does not contain any duplicates itself.
// NOTE: It assumes that probeSel has already been populated and it is not nil.
// NOTE: It assumes that nToCheck is positive.
func (ht *HashTable) CheckBuildForDistinct(
probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) uint64 {
if probeSel == nil {
colexecerror.InternalError(errors.AssertionFailedf("invalid selection vector"))
}
// NOTE(review): this call presumably populates the 'distinct' and 'differs'
// flags that are consumed below - confirm against checkColForDistinctTuples.
ht.checkColsForDistinctTuples(probeVecs, nToCheck, probeSel)
nDiffers := uint64(0)
toCheckSlice := ht.ProbeScratch.ToCheck
// Early bounds check (this is why nToCheck must be positive).
_ = toCheckSlice[nToCheck-1]
// nDiffers never exceeds toCheckPos, so the second loop condition is implied
// by the first one; it is presumably spelled out to help the compiler
// eliminate the bounds check on the store into toCheckSlice below.
for toCheckPos := uint64(0); toCheckPos < nToCheck && nDiffers < nToCheck; toCheckPos++ {
//gcassert:bce
toCheck := toCheckSlice[toCheckPos]
if ht.ProbeScratch.distinct[toCheck] {
// The tuple is distinct, so reset the flag and record the tuple's own
// keyID as its head.
ht.ProbeScratch.distinct[toCheck] = false
// Calculated using the convention: keyID = keys.indexOf(key) + 1.
ht.ProbeScratch.HeadID[toCheck] = keyID(toCheck + 1)
} else if ht.ProbeScratch.differs[toCheck] {
// Continue probing in this next chain for the probe key.
ht.ProbeScratch.differs[toCheck] = false
//gcassert:bce
toCheckSlice[nDiffers] = toCheck
nDiffers++
}
// Tuples with neither flag set are dropped from ToCheck and their HeadID
// values are left unchanged.
}
return nDiffers
}
// CheckBuildForAggregation finds all tuples in probeVecs that *are* present in
// buffered tuples stored in ht.Vals. For each present tuple at position i it
// stores keyID of its duplicate in HeadID[i], for all non-present tuples the
// corresponding HeadID value is left unchanged.
//
// It returns the number of tuples that still need to be checked against the
// next entry of their hash chains; the positions of those tuples are compacted
// into the front of ht.ProbeScratch.ToCheck.
//
// NOTE: It assumes that probeVecs does not contain any duplicates itself.
// NOTE: It assumes that probeSel has already been populated and it is not nil.
// NOTE: It assumes that nToCheck is positive.
func (ht *HashTable) CheckBuildForAggregation(
probeVecs []coldata.Vec, nToCheck uint64, probeSel []int,
) uint64 {
if probeSel == nil {
colexecerror.InternalError(errors.AssertionFailedf("invalid selection vector"))
}
// NOTE(review): this call presumably populates the 'distinct' and 'differs'
// flags that are consumed below - confirm against checkColForDistinctTuples.
ht.checkColsForDistinctTuples(probeVecs, nToCheck, probeSel)
nDiffers := uint64(0)
toCheckSlice := ht.ProbeScratch.ToCheck
// Early bounds check (this is why nToCheck must be positive).
_ = toCheckSlice[nToCheck-1]
// nDiffers never exceeds toCheckPos, so the second loop condition is implied
// by the first one; it is presumably spelled out to help the compiler
// eliminate the bounds check on the store into toCheckSlice below.
for toCheckPos := uint64(0); toCheckPos < nToCheck && nDiffers < nToCheck; toCheckPos++ {
//gcassert:bce
toCheck := toCheckSlice[toCheckPos]
// If the tuple is distinct, it doesn't have a duplicate in the hash table
// already, so it is skipped entirely (it is dropped from ToCheck, and
// neither its HeadID value nor its 'distinct' flag is modified here).
if !ht.ProbeScratch.distinct[toCheck] {
if ht.ProbeScratch.differs[toCheck] {
// We have a hash collision, so we need to continue probing
// against the next tuples in the hash chain.
ht.ProbeScratch.differs[toCheck] = false
//gcassert:bce
toCheckSlice[nDiffers] = toCheck
nDiffers++
} else {
// This tuple has a duplicate in the hash table, so we remember
// keyID of that duplicate.
ht.ProbeScratch.HeadID[toCheck] = ht.ProbeScratch.ToCheckID[toCheck]
}
}
}
return nDiffers
}
// DistinctCheck determines if the current key in the ToCheckID bucket matches the
// equality column key. If there is a match, then the key is removed from
// ToCheck. If the bucket has reached the end, the key is rejected. The ToCheck
// list is reconstructed to only hold the indices of the keyCols keys that have
// not been found. The new length of ToCheck is returned by this function.
//
// The keys being checked are taken from ht.Keys, and probeSel is passed
// through to the column checks.
// NOTE: nToCheck must be positive (see the early bounds check below).
func (ht *HashTable) DistinctCheck(nToCheck uint64, probeSel []int) uint64 {
// NOTE(review): checkCols presumably sets the 'differs' flag for every tuple
// that needs further probing - confirm against checkCol / checkColDeleting.
ht.checkCols(ht.Keys, nToCheck, probeSel)
// Select the indices that differ and put them into ToCheck.
nDiffers := uint64(0)
toCheckSlice := ht.ProbeScratch.ToCheck
// Early bounds check.
_ = toCheckSlice[nToCheck-1]
// nDiffers never exceeds toCheckPos, so the second loop condition is implied
// by the first one; it is presumably spelled out to help the compiler
// eliminate the bounds check on the store into toCheckSlice below.
for toCheckPos := uint64(0); toCheckPos < nToCheck && nDiffers < nToCheck; toCheckPos++ {
//gcassert:bce
toCheck := toCheckSlice[toCheckPos]
if ht.ProbeScratch.differs[toCheck] {
// Reset the flag for the next round and keep the tuple in the compacted
// prefix of ToCheck.
ht.ProbeScratch.differs[toCheck] = false
//gcassert:bce
toCheckSlice[nDiffers] = toCheck
nDiffers++
}
}
return nDiffers
}
// Reset prepares the HashTable for reuse.
// NOTE: memory that has already been allocated for ht.Vals is *not* released.
// However, resetting the length of ht.Vals to zero doesn't confuse the
// allocator - it is smart enough to look at the capacities of the allocated
// vectors, and the capacities stay the same until an actual new allocation is
// needed, at which point the allocator updates the memory account accordingly.
func (ht *HashTable) Reset(_ context.Context) {
	// Zero out 'First' by copying the shared zero column over it, one chunk
	// at a time.
	zeros := uint64ToKeyID(colexecutils.ZeroUint64Column)
	first := ht.BuildScratch.First
	for done := 0; done < len(first); {
		done += copy(first[done:], zeros)
	}
	ht.Vals.ResetInternalBatch()
	// Nothing else is reset eagerly here:
	// - ht.ProbeScratch.Next, ht.Same and ht.Visited are reset separately
	//   before they are used (these slices are not used in all of the code
	//   paths);
	// - ht.ProbeScratch.HeadID, ht.ProbeScratch.differs, and
	//   ht.ProbeScratch.distinct are reset before they are used (these slices
	//   are limited in size and dynamically allocated);
	// - ht.ProbeScratch.ToCheckID and ht.ProbeScratch.ToCheck don't need to be
	//   reset because they are populated manually every time before checking
	//   the columns.
	if ht.BuildMode != HashTableDistinctBuildMode {
		return
	}
	// In "distinct" build mode, ht.BuildScratch.Next is populated iteratively,
	// whenever we find tuples that we haven't seen before. In order to reuse
	// the same underlying memory the slice is cut down to length 1 (keyID=0 is
	// reserved for the end of all hash chains, hence 1 rather than 0).
	ht.BuildScratch.Next = ht.BuildScratch.Next[:1]
}
// MaybeAllocateSame makes sure that ht.Same can hold one entry for every tuple
// already present in the hash table plus one extra entry.
func (ht *HashTable) MaybeAllocateSame() {
	required := ht.Vals.Length() + 1
	ht.Same = maybeAllocateKeyIDArray(ht.Same, required)
}
// maybeAllocateKeyIDArray is the []keyID counterpart of
// colexecutils.MaybeAllocateUint64Array: old is viewed as a []uint64, handed
// to that helper together with length, and the result is viewed as a []keyID
// again.
func maybeAllocateKeyIDArray(old []keyID, length int) []keyID {
	asUint64s := keyIDToUint64(old)
	allocated := colexecutils.MaybeAllocateUint64Array(asUint64s, length)
	return uint64ToKeyID(allocated)
}
// maybeAllocateLimitedKeyIDArray is the []keyID counterpart of
// colexecutils.MaybeAllocateLimitedUint64Array: old is viewed as a []uint64,
// handed to that helper together with length, and the result is viewed as a
// []keyID again.
func maybeAllocateLimitedKeyIDArray(old []keyID, length int) []keyID {
	asUint64s := keyIDToUint64(old)
	allocated := colexecutils.MaybeAllocateLimitedUint64Array(asUint64s, length)
	return uint64ToKeyID(allocated)
}
// keyIDToUint64 reinterprets a []keyID as a []uint64 without copying: the
// returned slice has the same backing array, length, and capacity as k, so
// writes through either slice are visible through the other. This is only
// valid as long as keyID has the same memory layout as uint64.
func keyIDToUint64(k []keyID) []uint64 {
	asUint64s := (*[]uint64)(unsafe.Pointer(&k))
	return *asUint64s
}
// uint64ToKeyID reinterprets a []uint64 as a []keyID without copying: the
// returned slice has the same backing array, length, and capacity as u, so
// writes through either slice are visible through the other. This is only
// valid as long as keyID has the same memory layout as uint64.
func uint64ToKeyID(u []uint64) []keyID {
	asKeyIDs := (*[]keyID)(unsafe.Pointer(&u))
	return *asKeyIDs
}