// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"context"
"sync"
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)

// routerOutput is an interface implemented by router outputs. It exists for
// easier test mocking of outputs.
type routerOutput interface {
execinfra.OpNode
// initWithHashRouter passes a reference to the HashRouter that will be
// pushing batches to this output.
initWithHashRouter(*HashRouter)
// addBatch adds the elements specified by the selection vector from batch to
// the output. It returns whether or not the output changed its state to
// blocked (see implementations).
addBatch(context.Context, coldata.Batch, []int) bool
// cancel tells the output to stop producing batches. Optionally forwards an
// error if not nil.
cancel(context.Context, error)
// forwardErr forwards an error to the output. The output should call
// colexecerror.ExpectedError with this error on the next call to Next.
// Calling forwardErr multiple times will result in the most recent error
// overwriting the previous error.
forwardErr(error)
// resetForTests resets the routerOutput for a benchmark or test run.
resetForTests(context.Context)
}
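// Since routerOutput is an interface, tests can substitute a lightweight
// implementation for routerOutputOp. The type below is an illustrative sketch
// of such a mock (hypothetical name, not part of the original file); it simply
// counts how many tuples the router pushes to it.
type countingRouterOutputSketch struct {
	// Embedding the OpNode interface satisfies its methods for the sketch.
	execinfra.OpNode
	numAddedTuples int
}

var _ routerOutput = &countingRouterOutputSketch{}

func (m *countingRouterOutputSketch) initWithHashRouter(*HashRouter) {}

func (m *countingRouterOutputSketch) addBatch(_ context.Context, _ coldata.Batch, selection []int) bool {
	m.numAddedTuples += len(selection)
	// Never report a transition to the blocked state.
	return false
}

func (m *countingRouterOutputSketch) cancel(context.Context, error) {}

func (m *countingRouterOutputSketch) forwardErr(error) {}

func (m *countingRouterOutputSketch) resetForTests(context.Context) {}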
// getDefaultRouterOutputBlockedThreshold returns the number of unread values
// buffered by the routerOutputOp after which the output is considered blocked.
// It is a function rather than a variable so that in tests we could modify
// coldata.BatchSize() (if it were a variable, then its value would be
// evaluated before we set the desired batch size).
func getDefaultRouterOutputBlockedThreshold() int {
return coldata.BatchSize() * 2
}
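// The subtlety above is evaluation time: a package-level variable would
// capture coldata.BatchSize() once at package initialization, before a test
// has had a chance to override the batch size, whereas a function evaluates it
// on every call. An illustrative sketch (hypothetical names, not part of the
// original file):
//
//	var eagerThreshold = getDefaultRouterOutputBlockedThreshold() // fixed at package init
//
//	func lazyThreshold() int {
//		return getDefaultRouterOutputBlockedThreshold() // reflects the current batch size
//	}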
type routerOutputOpState int
const (
// routerOutputOpRunning is the state in which routerOutputOp operates
// normally. The router output transitions into the draining state when
// either it is finished (when a zero-length batch was added or when it was
// canceled) or it encounters an error.
routerOutputOpRunning routerOutputOpState = iota
// routerOutputOpDraining is the state in which routerOutputOp always
// returns zero-length batches on calls to Next.
routerOutputOpDraining
)
// drainCoordinator is an interface that the HashRouter implements to coordinate
// cancellation of all of its outputs in the case of an error and draining in
// the case of graceful termination.
// WARNING: No locks should be held when calling these methods, as the
// HashRouter might call routerOutput methods (e.g. cancel) that attempt to
// reacquire locks.
type drainCoordinator interface {
// encounteredError should be called when a routerOutput encounters an error.
// This terminates execution. No locks should be held when calling this
// method, since cancellation could occur.
encounteredError(context.Context)
// drainMeta should be called exactly once when the routerOutput moves to
// draining.
drainMeta() []execinfrapb.ProducerMetadata
}
type routerOutputOp struct {
// input is a reference to our router.
input execinfra.OpNode
// drainCoordinator is a reference to the HashRouter to be able to notify it
// if the output encounters an error or transitions to a draining state.
drainCoordinator drainCoordinator
types []*types.T
// unblockedEventsChan is signaled when a routerOutput changes state from
// blocked to unblocked.
unblockedEventsChan chan<- struct{}
mu struct {
syncutil.Mutex
state routerOutputOpState
// forwardedErr is an error that was forwarded by the HashRouter. If set,
// any subsequent calls to Next will return this error.
forwardedErr error
// unlimitedAllocator tracks the memory usage of this router output,
// providing a signal for when it should spill to disk.
// The memory lifecycle is as follows:
//
// o.mu.pendingBatch is allocated as a "staging" area. Tuples are copied
// into it in addBatch.
// A read may come in in this state, in which case pendingBatch is returned
// and references to it are removed. Since batches are unsafe for reuse,
// the batch is also manually released from the allocator.
// If a read does not come in and the batch becomes full of tuples, that
// batch is stored in o.mu.data, which is a queue with an in-memory circular
// buffer backed by disk. If the batch fits in memory, a reference to it
// is retained and a new pendingBatch is allocated.
//
// If a read comes in at this point, the batch is dequeued from o.mu.data
// and returned, but the memory is still accounted for. In fact, memory use
// increases up to when o.mu.data is full and must spill to disk.
// Once it spills to disk, the spillingQueue (o.mu.data), will release
// batches it spills to disk to stop accounting for them.
// The tricky part comes when o.mu.data is dequeued from. In this case, the
// reference for a previously-returned batch is overwritten with an on-disk
// batch, so the memory for the overwritten batch is released, while the
// new batch's memory is retained. Note that if batches are being dequeued
// from disk, it must be the case that the circular buffer is now empty,
// holding references to batches that have been previously returned.
//
// In short, batches whose references are retained are also retained in the
// allocator, but if any references are overwritten or lost, those batches
// are released.
unlimitedAllocator *colmem.Allocator
cond *sync.Cond
// pendingBatch is a partially-filled batch with data added through
// addBatch. Once this batch reaches capacity, it is flushed to data. The
// main use of pendingBatch is coalescing various fragmented batches into
// one.
pendingBatch coldata.Batch
// data is a spillingQueue, a circular buffer backed by a disk queue.
data *spillingQueue
numUnread int
blocked bool
}
// pendingBatchCapacity indicates the capacity with which the new
// mu.pendingBatch should be allocated. It increases dynamically until it
// reaches coldata.BatchSize(). We need to track it ourselves since ownership
// of the pending batch is given to the spillingQueue, so when using
// ResetMaybeReallocate we don't have the old batch to check the capacity of.
pendingBatchCapacity int
testingKnobs routerOutputOpTestingKnobs
}
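// The memory lifecycle described in the mu comment above is easier to follow
// in a simplified form. The sketch below is illustrative only (hypothetical
// names, not part of the original file) and elides memory accounting and disk
// spilling: values coalesce into a pending buffer, a full pending buffer is
// flushed to a queue, and a reader prefers the queue but falls back to the
// partially-filled pending buffer, mirroring how addBatch and Next use
// o.mu.pendingBatch and o.mu.data.
type coalescingBufferSketch struct {
	pending  []int
	queue    [][]int
	capacity int
}

func (b *coalescingBufferSketch) add(vals []int) {
	for _, v := range vals {
		b.pending = append(b.pending, v)
		if len(b.pending) == b.capacity {
			// The pending buffer is full; hand it off to the queue, analogous
			// to enqueueing o.mu.pendingBatch into o.mu.data.
			b.queue = append(b.queue, b.pending)
			b.pending = nil
		}
	}
}

func (b *coalescingBufferSketch) next() []int {
	if len(b.queue) == 0 {
		// Nothing has been flushed; return the partially-filled pending
		// buffer directly, as Next does with o.mu.pendingBatch.
		out := b.pending
		b.pending = nil
		return out
	}
	out := b.queue[0]
	b.queue = b.queue[1:]
	return out
}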
func (o *routerOutputOp) ChildCount(verbose bool) int {
return 1
}
func (o *routerOutputOp) Child(nth int, verbose bool) execinfra.OpNode {
if nth == 0 {
return o.input
}
colexecerror.InternalError(errors.AssertionFailedf("invalid index %d", nth))
// This code is unreachable, but the compiler cannot infer that.
return nil
}
var _ colexecbase.Operator = &routerOutputOp{}
type routerOutputOpTestingKnobs struct {
// blockedThreshold is the number of buffered values above which we consider
// a router output to be blocked. It defaults to the value returned by
// getDefaultRouterOutputBlockedThreshold but can be modified by tests to
// exercise edge cases.
blockedThreshold int
// alwaysFlush, if set to true, will always flush o.mu.pendingBatch to
// o.mu.data.
alwaysFlush bool
// addBatchTestInducedErrorCb is called in addBatch after any function call
// that could produce an error, provided that error is nil. If the callback
// returns an error, the router output overwrites the nil error with the
// returned error.
// It is guaranteed that this callback will be called at least once during
// normal execution.
addBatchTestInducedErrorCb func() error
// nextTestInducedErrorCb is called in Next after any function call that
// could produce an error, provided that error is nil. If the callback
// returns an error, the router output overwrites the nil error with the
// returned error.
// It is guaranteed that this callback will be called at least once during
// normal execution.
nextTestInducedErrorCb func() error
}
// routerOutputOpArgs are the arguments to newRouterOutputOp. All fields apart
// from the testing knobs are required.
type routerOutputOpArgs struct {
// All fields are required unless marked optional.
types []*types.T
// unlimitedAllocator should not have a memory limit. Pass in a soft
// memoryLimit that will be respected instead.
unlimitedAllocator *colmem.Allocator
// memoryLimit acts as a soft limit to allow the router output to use disk
// when it is exceeded.
memoryLimit int64
diskAcc *mon.BoundAccount
cfg colcontainer.DiskQueueCfg
fdSemaphore semaphore.Semaphore
// unblockedEventsChan must be a buffered channel.
unblockedEventsChan chan<- struct{}
testingKnobs routerOutputOpTestingKnobs
}
// newRouterOutputOp creates a new router output.
func newRouterOutputOp(args routerOutputOpArgs) *routerOutputOp {
if args.testingKnobs.blockedThreshold == 0 {
args.testingKnobs.blockedThreshold = getDefaultRouterOutputBlockedThreshold()
}
o := &routerOutputOp{
types: args.types,
unblockedEventsChan: args.unblockedEventsChan,
testingKnobs: args.testingKnobs,
}
o.mu.unlimitedAllocator = args.unlimitedAllocator
o.mu.cond = sync.NewCond(&o.mu)
o.mu.data = newSpillingQueue(
args.unlimitedAllocator,
args.types,
args.memoryLimit,
args.cfg,
args.fdSemaphore,
args.diskAcc,
)
return o
}
func (o *routerOutputOp) Init() {}
// nextErrorLocked is a helper method that handles an error encountered in Next.
func (o *routerOutputOp) nextErrorLocked(ctx context.Context, err error) {
o.mu.state = routerOutputOpDraining
o.maybeUnblockLocked()
// Unlock the mutex, since the HashRouter will cancel all outputs.
o.mu.Unlock()
o.drainCoordinator.encounteredError(ctx)
o.mu.Lock()
colexecerror.InternalError(err)
}
// Next returns the next coldata.Batch from the routerOutputOp. Note that Next
// is designed for only one concurrent caller and will block until data is
// ready.
func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch {
o.mu.Lock()
defer o.mu.Unlock()
for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.pendingBatch == nil && o.mu.data.empty() {
// Wait until there is data to read or the output is canceled.
o.mu.cond.Wait()
}
if o.mu.forwardedErr != nil {
colexecerror.ExpectedError(o.mu.forwardedErr)
}
if o.mu.state == routerOutputOpDraining {
return coldata.ZeroBatch
}
var b coldata.Batch
if o.mu.pendingBatch != nil && o.mu.data.empty() {
// o.mu.data is empty (i.e. nothing has been flushed to the spillingQueue),
// but there is a o.mu.pendingBatch that has not been flushed yet. Return
// this batch directly.
b = o.mu.pendingBatch
o.mu.unlimitedAllocator.ReleaseBatch(b)
o.mu.pendingBatch = nil
} else {
var err error
b, err = o.mu.data.dequeue(ctx)
if err == nil && o.testingKnobs.nextTestInducedErrorCb != nil {
err = o.testingKnobs.nextTestInducedErrorCb()
}
if err != nil {
o.nextErrorLocked(ctx, err)
}
}
o.mu.numUnread -= b.Length()
if o.mu.numUnread <= o.testingKnobs.blockedThreshold {
o.maybeUnblockLocked()
}
if b.Length() == 0 {
if o.testingKnobs.nextTestInducedErrorCb != nil {
if err := o.testingKnobs.nextTestInducedErrorCb(); err != nil {
o.nextErrorLocked(ctx, err)
}
}
// This is the last batch. closeLocked will transition the output to the
// draining state to protect against further calls to Next (which the
// interface allows), as well as clean up and release any disk
// infrastructure.
o.closeLocked(ctx)
}
return b
}
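// A router output is consumed like any other Operator: a single goroutine
// calls Next until it observes a zero-length batch. The sketch below is
// illustrative only (hypothetical function name, not part of the original
// file) and shows that consumption loop.
func consumeRouterOutputSketch(ctx context.Context, o *routerOutputOp) int {
	numTuples := 0
	for {
		b := o.Next(ctx)
		if b.Length() == 0 {
			// A zero-length batch means the output is done (or draining).
			return numTuples
		}
		numTuples += b.Length()
	}
}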
func (o *routerOutputOp) DrainMeta(_ context.Context) []execinfrapb.ProducerMetadata {
o.mu.Lock()
o.mu.state = routerOutputOpDraining
o.maybeUnblockLocked()
o.mu.Unlock()
return o.drainCoordinator.drainMeta()
}
func (o *routerOutputOp) initWithHashRouter(r *HashRouter) {
o.input = r
o.drainCoordinator = r
}
func (o *routerOutputOp) closeLocked(ctx context.Context) {
o.mu.state = routerOutputOpDraining
if err := o.mu.data.close(ctx); err != nil {
// This log message is Info instead of Warning because the flow will also
// attempt to clean up the parent directory, so this failure might not have
// any effect.
log.Infof(ctx, "error closing vectorized hash router output, files may be left over: %s", err)
}
}
// cancel wakes up a reader in Next if there is one and results in the output
// returning zero length batches for every Next call after cancel. Note that
// all accumulated data that hasn't been read will not be returned.
func (o *routerOutputOp) cancel(ctx context.Context, err error) {
o.mu.Lock()
defer o.mu.Unlock()
o.closeLocked(ctx)
o.forwardErrLocked(err)
// Some goroutine might be waiting on the condition variable, so wake it up.
// Note that read goroutines check o.mu.state, so they won't wait on the
// condition variable after we unlock the mutex.
o.mu.cond.Signal()
}
func (o *routerOutputOp) forwardErrLocked(err error) {
if err != nil {
o.mu.forwardedErr = err
}
}
func (o *routerOutputOp) forwardErr(err error) {
o.mu.Lock()
defer o.mu.Unlock()
o.forwardErrLocked(err)
o.mu.cond.Signal()
}
// addBatch copies the columns in batch according to selection into an internal
// buffer.
// The routerOutputOp only adds the elements specified by selection. Therefore,
// an empty selection slice will add no elements. Note that the selection vector
// on the batch is ignored. This is so that callers of addBatch can push the
// same batch with different selection vectors to many different outputs.
// True is returned if the output changes state to blocked (note: if the
// output is already blocked, false is returned).
// TODO(asubiotto): We should explore pipelining addBatch if disk-spilling
// performance becomes a concern. The main router goroutine will be writing to
// disk as the code is written, meaning that we impact the performance of
// writing rows to a fast output if we have to write to disk for a single
// slow output.
func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch, selection []int) bool {
if len(selection) > batch.Length() {
selection = selection[:batch.Length()]
}
o.mu.Lock()
defer o.mu.Unlock()
if o.mu.state == routerOutputOpDraining {
// This output is draining, discard any data.
return false
}
if batch.Length() == 0 {
if o.mu.pendingBatch != nil {
err := o.mu.data.enqueue(ctx, o.mu.pendingBatch)
if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil {
err = o.testingKnobs.addBatchTestInducedErrorCb()
}
if err != nil {
colexecerror.InternalError(err)
}
} else if o.testingKnobs.addBatchTestInducedErrorCb != nil {
// This is the last chance to run addBatchTestInducedErrorCb if it has
// been set.
if err := o.testingKnobs.addBatchTestInducedErrorCb(); err != nil {
colexecerror.InternalError(err)
}
}
o.mu.pendingBatch = coldata.ZeroBatch
o.mu.cond.Signal()
return false
}
if len(selection) == 0 {
// Non-zero batch with no selection vector. Nothing to do.
return false
}
// Increment o.mu.numUnread before going into the loop, as we will consume
// selection.
o.mu.numUnread += len(selection)
for toAppend := len(selection); toAppend > 0; {
if o.mu.pendingBatch == nil {
if o.pendingBatchCapacity < coldata.BatchSize() {
// We still haven't reached the maximum capacity, so let's
// calculate the next capacity to use.
if o.pendingBatchCapacity == 0 {
// This is the first set of tuples that are added to this
// router output, so we'll allocate the batch with just
// enough capacity to fill all of these tuples.
o.pendingBatchCapacity = len(selection)
} else {
o.pendingBatchCapacity *= 2
}
}
// Note: we could have used NewMemBatchWithFixedCapacity here, but
// we choose not to in order to indicate that the capacity of the
// pending batches has dynamic behavior.
o.mu.pendingBatch, _ = o.mu.unlimitedAllocator.ResetMaybeReallocate(o.types, nil /* oldBatch */, o.pendingBatchCapacity)
}
available := o.mu.pendingBatch.Capacity() - o.mu.pendingBatch.Length()
numAppended := toAppend
if toAppend > available {
numAppended = available
}
o.mu.unlimitedAllocator.PerformOperation(o.mu.pendingBatch.ColVecs(), func() {
for i := range o.types {
o.mu.pendingBatch.ColVec(i).Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Src: batch.ColVec(i),
Sel: selection[:numAppended],
DestIdx: o.mu.pendingBatch.Length(),
SrcEndIdx: numAppended,
},
},
)
}
})
newLength := o.mu.pendingBatch.Length() + numAppended
o.mu.pendingBatch.SetLength(newLength)
if o.testingKnobs.alwaysFlush || newLength >= o.mu.pendingBatch.Capacity() {
// The capacity in o.mu.pendingBatch has been filled.
err := o.mu.data.enqueue(ctx, o.mu.pendingBatch)
if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil {
err = o.testingKnobs.addBatchTestInducedErrorCb()
}
if err != nil {
colexecerror.InternalError(err)
}
o.mu.pendingBatch = nil
}
toAppend -= numAppended
selection = selection[numAppended:]
}
stateChanged := false
if o.mu.numUnread > o.testingKnobs.blockedThreshold && !o.mu.blocked {
// The output is now blocked.
o.mu.blocked = true
stateChanged = true
}
o.mu.cond.Signal()
return stateChanged
}
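// The pendingBatchCapacity growth above follows a simple scheme: the first
// allocation is sized to the first selection, and subsequent allocations
// double until the capacity reaches coldata.BatchSize(). The helper below is
// an illustrative sketch of that arithmetic only (hypothetical name, not part
// of the original file); maxCapacity stands in for coldata.BatchSize().
func nextPendingBatchCapacitySketch(current, firstSelectionLen, maxCapacity int) int {
	if current >= maxCapacity {
		// The maximum capacity has been reached; stop growing.
		return current
	}
	if current == 0 {
		// First allocation: just enough room for the first set of tuples.
		return firstSelectionLen
	}
	// Subsequent allocations double the previous capacity.
	return current * 2
}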
// maybeUnblockLocked unblocks the router output if it is in a blocked state. If the
// output was previously in a blocked state, an event will be sent on
// routerOutputOp.unblockedEventsChan.
func (o *routerOutputOp) maybeUnblockLocked() {
if o.mu.blocked {
o.mu.blocked = false
o.unblockedEventsChan <- struct{}{}
}
}
// resetForTests resets the routerOutputOp for a test or benchmark run.
func (o *routerOutputOp) resetForTests(ctx context.Context) {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.state = routerOutputOpRunning
o.mu.forwardedErr = nil
o.mu.data.reset(ctx)
o.mu.numUnread = 0
o.mu.blocked = false
o.pendingBatchCapacity = 0
}
// hashRouterDrainState is a state that specifically describes the hashRouter's
// state in the draining process. This differs from its "general" state. For
// example, a hash router can have drained and exited the Run method but still
// be in hashRouterDrainStateRunning until somebody calls drainMeta.
type hashRouterDrainState int
const (
// hashRouterDrainStateRunning is the state that a hashRouter is in when
// running normally (i.e. pulling and pushing batches).
hashRouterDrainStateRunning = iota
// hashRouterDrainStateRequested is the state that a hashRouter is in when
// either all outputs have called drainMeta or an error was encountered by one
// of the outputs.
hashRouterDrainStateRequested
// hashRouterDrainStateCompleted is the state that a hashRouter is in when
// draining has completed.
hashRouterDrainStateCompleted
)
// HashRouter hashes values according to provided hash columns and computes a
// destination for each row. These destinations are exposed as Operators
// returned by the constructor.
type HashRouter struct {
OneInputNode
// hashCols is a slice of indices of the columns used for hashing.
hashCols []uint32
// One output for each stream.
outputs []routerOutput
// metadataSources is a slice of execinfrapb.MetadataSources that need to be
// drained when the HashRouter terminates.
metadataSources execinfrapb.MetadataSources
// closers is a slice of Closers that need to be closed when the hash router
// terminates.
closers colexecbase.Closers
// unblockedEventsChan is a channel shared between the HashRouter and its
// outputs. outputs send events on this channel when they are unblocked by a
// read.
unblockedEventsChan <-chan struct{}
numBlockedOutputs int
bufferedMeta []execinfrapb.ProducerMetadata
// atomics is shared state between the Run goroutine and any routerOutput
// goroutines that call drainMeta.
atomics struct {
// drainState is the state the hashRouter is in. The Run goroutine should
// only ever read these states, never set them.
drainState int32
numDrainedOutputs int32
}
// waitForMetadata is a channel that the last output to drain will read from
// to pass on any metadata buffered through the Run goroutine.
waitForMetadata chan []execinfrapb.ProducerMetadata
// tupleDistributor is used to decide to which output a particular tuple
// should be routed.
tupleDistributor *tupleHashDistributor
}
// NewHashRouter creates a new hash router that consumes coldata.Batches from
// input and hashes each row according to hashCols to one of the outputs
// returned as Operators.
// The number of allocators provided will determine the number of outputs
// returned. Note that each allocator must be unlimited; memory will be limited
// by comparing memory use in the allocator with the memoryLimit argument. Each
// Operator must have an independent allocator (this means that each allocator
// should be linked to an independent mem account) as Operator.Next will usually
// be called concurrently between different outputs. Similarly, each output
// needs to have a separate disk account.
func NewHashRouter(
unlimitedAllocators []*colmem.Allocator,
input colexecbase.Operator,
types []*types.T,
hashCols []uint32,
memoryLimit int64,
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
diskAccounts []*mon.BoundAccount,
toDrain []execinfrapb.MetadataSource,
toClose []colexecbase.Closer,
) (*HashRouter, []colexecbase.DrainableOperator) {
if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault {
colexecerror.InternalError(errors.Errorf("hash router instantiated with incompatible disk queue cache mode: %d", diskQueueCfg.CacheMode))
}
outputs := make([]routerOutput, len(unlimitedAllocators))
outputsAsOps := make([]colexecbase.DrainableOperator, len(unlimitedAllocators))
// unblockEventsChan is buffered to 2*numOutputs as we don't want the outputs
// writing to it to block.
// Unblock events only happen after a corresponding block event. Since these
// are state changes and are done under lock (including the output sending
// on the channel, which is why we want the channel to be buffered in the
// first place), every time the HashRouter blocks an output, it *must* read
// all unblock events preceding it since these *must* be on the channel.
unblockEventsChan := make(chan struct{}, 2*len(unlimitedAllocators))
memoryLimitPerOutput := memoryLimit / int64(len(unlimitedAllocators))
for i := range unlimitedAllocators {
op := newRouterOutputOp(
routerOutputOpArgs{
types: types,
unlimitedAllocator: unlimitedAllocators[i],
memoryLimit: memoryLimitPerOutput,
diskAcc: diskAccounts[i],
cfg: diskQueueCfg,
fdSemaphore: fdSemaphore,
unblockedEventsChan: unblockEventsChan,
},
)
outputs[i] = op
outputsAsOps[i] = op
}
return newHashRouterWithOutputs(input, hashCols, unblockEventsChan, outputs, toDrain, toClose), outputsAsOps
}
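// The sizing of unblockEventsChan above ensures that an output sending an
// unblock event never blocks the reader goroutine calling Next on it. The
// sketch below is illustrative only (hypothetical name, not part of the
// original file): with a buffer at least as large as the maximum number of
// outstanding events, the send always completes immediately, and the router
// drains pending events without blocking.
func unblockEventsSketch(numOutputs int) {
	events := make(chan struct{}, 2*numOutputs)
	// An output reports that a read transitioned it from blocked to unblocked.
	events <- struct{}{}
	// The router drains every pending event without blocking before deciding
	// whether all of its outputs are still blocked.
	for moreToRead := true; moreToRead; {
		select {
		case <-events:
		default:
			moreToRead = false
		}
	}
}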
func newHashRouterWithOutputs(
input colexecbase.Operator,
hashCols []uint32,
unblockEventsChan <-chan struct{},
outputs []routerOutput,
toDrain []execinfrapb.MetadataSource,
toClose []colexecbase.Closer,
) *HashRouter {
r := &HashRouter{
OneInputNode: NewOneInputNode(input),
hashCols: hashCols,
outputs: outputs,
closers: toClose,
metadataSources: toDrain,
unblockedEventsChan: unblockEventsChan,
// waitForMetadata is a buffered channel to avoid blocking if nobody will
// read the metadata.
waitForMetadata: make(chan []execinfrapb.ProducerMetadata, 1),
tupleDistributor: newTupleHashDistributor(defaultInitHashValue, len(outputs)),
}
for i := range outputs {
outputs[i].initWithHashRouter(r)
}
return r
}
// cancelOutputs cancels all outputs and forwards the given error to all of
// them if non-nil. The only case where the error is not forwarded to an
// output is if that output could not be canceled due to an error. In that
// case the output forwards the error returned during cancellation instead.
func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) {
for _, o := range r.outputs {
if err := colexecerror.CatchVectorizedRuntimeError(func() {
o.cancel(ctx, errToForward)
}); err != nil {
// If there was an error canceling this output, this error can be
// forwarded to whoever is calling Next.
o.forwardErr(err)
}
}
}
func (r *HashRouter) setDrainState(drainState hashRouterDrainState) {
atomic.StoreInt32(&r.atomics.drainState, int32(drainState))
}
func (r *HashRouter) getDrainState() hashRouterDrainState {
return hashRouterDrainState(atomic.LoadInt32(&r.atomics.drainState))
}
// Run runs the HashRouter. Batches are read from the input and pushed to an
// output calculated by hashing columns. Cancel the given context to terminate
// early.
func (r *HashRouter) Run(ctx context.Context) {
// Since HashRouter runs in a separate goroutine, we want to be safe and
// make sure that we catch errors in all code paths, so we wrap the whole
// method with a catcher. Note that we also have "internal" catchers as
// well for more fine-grained control of error propagation.
if err := colexecerror.CatchVectorizedRuntimeError(func() {
r.input.Init()
var done bool
processNextBatch := func() {
done = r.processNextBatch(ctx)
}
for {
if r.getDrainState() != hashRouterDrainStateRunning {
break
}
// Check for cancellation.
select {
case <-ctx.Done():
r.cancelOutputs(ctx, ctx.Err())
return
default:
}
// Read all the routerOutput state changes that have happened since the
// last iteration.
for moreToRead := true; moreToRead; {
select {
case <-r.unblockedEventsChan:
r.numBlockedOutputs--
default:
// No more routerOutput state changes to read without blocking.
moreToRead = false
}
}
if r.numBlockedOutputs == len(r.outputs) {
// All outputs are blocked, wait until at least one output is unblocked.
select {
case <-r.unblockedEventsChan:
r.numBlockedOutputs--
case <-ctx.Done():
r.cancelOutputs(ctx, ctx.Err())
return
}
}
if err := colexecerror.CatchVectorizedRuntimeError(processNextBatch); err != nil {
r.cancelOutputs(ctx, err)
return
}
if done {
// The input was done and we have notified the routerOutputs that there
// is no more data.
return
}
}
}); err != nil {
r.cancelOutputs(ctx, err)
}
// Non-blocking send of metadata so that one of the outputs can return it
// in DrainMeta.
r.bufferedMeta = append(r.bufferedMeta, r.metadataSources.DrainMeta(ctx)...)
r.waitForMetadata <- r.bufferedMeta
close(r.waitForMetadata)
r.closers.CloseAndLogOnErr(ctx, "hash router")
}
// processNextBatch reads the next batch from its input, hashes it and adds
// each column to its corresponding output, returning whether the input is
// done.
func (r *HashRouter) processNextBatch(ctx context.Context) bool {
b := r.input.Next(ctx)
n := b.Length()
if n == 0 {
// Done. Push an empty batch to outputs to tell them the data is done as
// well.
for _, o := range r.outputs {
o.addBatch(ctx, b, nil)
}
return true
}
selections := r.tupleDistributor.distribute(ctx, b, r.hashCols)
for i, o := range r.outputs {
if o.addBatch(ctx, b, selections[i]) {
// This batch blocked the output.
r.numBlockedOutputs++
}
}
return false
}
// resetForTests resets the HashRouter for a test or benchmark run.
func (r *HashRouter) resetForTests(ctx context.Context) {
if i, ok := r.input.(resetter); ok {
i.reset(ctx)
}
r.setDrainState(hashRouterDrainStateRunning)
r.waitForMetadata = make(chan []execinfrapb.ProducerMetadata, 1)
r.atomics.numDrainedOutputs = 0
r.bufferedMeta = nil
r.numBlockedOutputs = 0
for moreToRead := true; moreToRead; {
select {
case <-r.unblockedEventsChan:
default:
moreToRead = false
}
}
for _, o := range r.outputs {
o.resetForTests(ctx)
}
}
func (r *HashRouter) encounteredError(ctx context.Context) {
// Once one output returns an error the hash router needs to stop running
// and drain its input.
r.setDrainState(hashRouterDrainStateRequested)
// Cancel all outputs. The Run goroutine will eventually realize that the
// HashRouter is done and exit without draining.
r.cancelOutputs(ctx, nil /* errToForward */)
}
func (r *HashRouter) drainMeta() []execinfrapb.ProducerMetadata {
if int(atomic.AddInt32(&r.atomics.numDrainedOutputs, 1)) != len(r.outputs) {
return nil
}
// All outputs have been drained, return any buffered metadata to the last
// output to call drainMeta.
r.setDrainState(hashRouterDrainStateRequested)
meta := <-r.waitForMetadata
r.setDrainState(hashRouterDrainStateCompleted)
return meta
}
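// drainMeta above relies on an atomic counter so that only the last of the
// outputs to drain performs the metadata hand-off. The sketch below shows
// that pattern in isolation (hypothetical names, not part of the original
// file): drain returns true for exactly one of total callers.
type lastDrainerSketch struct {
	numDrained int32
	total      int32
}

func (s *lastDrainerSketch) drain() bool {
	return atomic.AddInt32(&s.numDrained, 1) == s.total
}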