-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathscheduled_processor.go
791 lines (720 loc) · 26.1 KB
/
scheduled_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rangefeed
import (
"context"
"fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)
// request is any action on processor which is not a data path. e.g. request
// active filter, length, add registration etc.
// This request type is only serving as execution mechanism (think RunAsyncTask).
// Processor state is exclusively updated by the running request, the only
// concurrent activity that could happen is enqueueing events that is handled by
// data and request queues independently.
// To execute a request that returns a value, use runRequest that accepts
// function that returns value.
type request func(context.Context)
// ScheduledProcessor is an implementation of processor that uses external
// scheduler to use processing.
type ScheduledProcessor struct {
Config
scheduler ClientScheduler
reg registry
rts resolvedTimestamp
// processCtx is the annotated background context used for process(). It is
// stored here to avoid reconstructing it on every call.
processCtx context.Context
requestQueue chan request
eventC chan *event
// If true, processor is not processing data anymore and waiting for registrations
// to be complete.
stopping bool
stoppedC chan struct{}
// Processor startup runs background tasks to scan intents. If processor is
// stopped early, this task needs to be terminated to avoid resource waste.
startupCancel func()
// stopper passed by start that is used for firing up async work from scheduler.
stopper *stop.Stopper
txnPushActive bool
}
// NewScheduledProcessor creates a new scheduler based rangefeed Processor.
// Processor needs to be explicitly started after creation.
func NewScheduledProcessor(cfg Config) *ScheduledProcessor {
cfg.SetDefaults()
cfg.AmbientContext.AddLogTag("rangefeed", nil)
p := &ScheduledProcessor{
Config: cfg,
scheduler: cfg.Scheduler.NewClientScheduler(),
reg: makeRegistry(cfg.Metrics),
rts: makeResolvedTimestamp(),
processCtx: cfg.AmbientContext.AnnotateCtx(context.Background()),
requestQueue: make(chan request, 20),
eventC: make(chan *event, cfg.EventChanCap),
// Closed when scheduler removed callback.
stoppedC: make(chan struct{}),
}
return p
}
// Start performs processor one-time initialization e.g registers with
// scheduler and fires up background tasks to populate processor state.
// The provided iterator is used to initialize the rangefeed's resolved
// timestamp. It must obey the contract of an iterator used for an
// initResolvedTSScan. The Processor promises to clean up the iterator by
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *ScheduledProcessor) Start(
stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor,
) error {
ctx := p.Config.AmbientContext.AnnotateCtx(context.Background())
ctx, p.startupCancel = context.WithCancel(ctx)
p.stopper = stopper
// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
if err := p.scheduler.Register(p.process, p.Priority); err != nil {
p.cleanup()
return err
}
// Launch an async task to scan over the resolved timestamp iterator and
// initialize the unresolvedIntentQueue.
if rtsIterFunc != nil {
rtsIter := rtsIterFunc()
initScan := newInitResolvedTSScan(p.Span, p, rtsIter)
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks during start.
if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil {
initScan.Cancel()
p.scheduler.StopProcessor()
return err
}
} else {
p.initResolvedTS(ctx)
}
p.Metrics.RangeFeedProcessorsScheduler.Inc(1)
return nil
}
// process is a scheduler callback that is processing scheduled events and
// requests.
func (p *ScheduledProcessor) process(e processorEventType) processorEventType {
ctx := p.processCtx
if e&RequestQueued != 0 {
p.processRequests(ctx)
}
if e&EventQueued != 0 {
p.processEvents(ctx)
}
if e&PushTxnQueued != 0 {
p.processPushTxn(ctx)
}
if e&Stopped != 0 {
p.processStop()
}
return 0
}
// process pending requests.
func (p *ScheduledProcessor) processRequests(ctx context.Context) {
// No need to limit number of processed requests as we don't expect more than
// a handful requests within the whole lifecycle.
for {
select {
case e := <-p.requestQueue:
e(ctx)
default:
return
}
}
}
// Transform and route pending events.
func (p *ScheduledProcessor) processEvents(ctx context.Context) {
// Only process as much data as was present at the start of the processing
// run to avoid starving other processors.
for max := len(p.eventC); max > 0; max-- {
select {
case e := <-p.eventC:
if !p.stopping {
// If we are stopping, there's no need to forward any remaining
// data since registrations already have errors set.
p.consumeEvent(ctx, e)
}
e.alloc.Release(ctx)
putPooledEvent(e)
default:
return
}
}
}
func (p *ScheduledProcessor) processPushTxn(ctx context.Context) {
// NB: Len() check avoids hlc.Clock.Now() mutex acquisition in the common
// case, which can be a significant source of contention.
if !p.txnPushActive && p.rts.IsInit() && p.rts.intentQ.Len() > 0 {
now := p.Clock.Now()
before := now.Add(-p.PushTxnsAge.Nanoseconds(), 0)
oldTxns := p.rts.intentQ.Before(before)
if len(oldTxns) > 0 {
toPush := make([]enginepb.TxnMeta, len(oldTxns))
for i, txn := range oldTxns {
toPush[i] = txn.asTxnMeta()
}
// Launch an async transaction push attempt that pushes the
// timestamp of all transactions beneath the push offset.
// Ignore error if quiescing.
pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, func() {
p.enqueueRequest(func(ctx context.Context) {
p.txnPushActive = false
})
})
p.txnPushActive = true
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks for push.
err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
}
}
}
}
func (p *ScheduledProcessor) processStop() {
p.cleanup()
p.Metrics.RangeFeedProcessorsScheduler.Dec(1)
}
func (p *ScheduledProcessor) cleanup() {
// Cleanup is normally called when all registrations are disconnected and
// unregistered or were not created yet (processor start failure).
// However, there's a case where processor is stopped by replica action while
// registrations are still active. In that case registrations won't have a
// chance to unregister themselves after their work loop terminates because
// processor is already disconnected from scheduler.
// To avoid leaking any registry resources and metrics, processor performs
// explicit registry termination in that case.
pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
p.reg.DisconnectAllOnShutdown(pErr)
// Unregister callback from scheduler
p.scheduler.Unregister()
p.startupCancel()
close(p.stoppedC)
p.MemBudget.Close(context.Background())
}
// Stop shuts down the processor and closes all registrations. Safe to call on
// nil Processor. It is not valid to restart a processor after it has been
// stopped.
func (p *ScheduledProcessor) Stop() {
p.StopWithErr(nil)
}
// StopWithErr shuts down the processor and closes all registrations with the
// specified error. Safe to call on nil Processor. It is not valid to restart a
// processor after it has been stopped.
func (p *ScheduledProcessor) StopWithErr(pErr *kvpb.Error) {
// Flush any remaining events before stopping.
p.syncEventC()
// Send the processor a stop signal.
p.sendStop(pErr)
}
// DisconnectSpanWithErr disconnects all rangefeed registrations that overlap
// the given span with the given error.
func (p *ScheduledProcessor) DisconnectSpanWithErr(span roachpb.Span, pErr *kvpb.Error) {
if p == nil {
return
}
p.enqueueRequest(func(ctx context.Context) {
p.reg.DisconnectWithErr(span, pErr)
})
}
func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) {
p.enqueueRequest(func(ctx context.Context) {
p.reg.DisconnectWithErr(all, pErr)
// First set stopping flag to ensure that once all registrations are removed
// processor should stop.
p.stopping = true
p.scheduler.StopProcessor()
})
}
// Register registers the stream over the specified span of keys.
//
// The registration will not observe any events that were consumed before this
// method was called. It is undefined whether the registration will observe
// events that are consumed concurrently with this call. The channel will be
// provided an error when the registration closes.
//
// The optionally provided "catch-up" iterator is used to read changes from the
// engine which occurred after the provided start timestamp (exclusive).
//
// If the method returns false, the processor will have been stopped, so calling
// Stop is not necessary. If the method returns true, it will also return an
// updated operation filter that includes the operations required by the new
// registration.
//
// NB: startTS is exclusive; the first possible event will be at startTS.Next().
func (p *ScheduledProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
withDiff bool,
stream Stream,
disconnectFn func(),
done *future.ErrorFuture,
) (bool, *Filter) {
// Synchronize the event channel so that this registration doesn't see any
// events that were consumed before this registration was called. Instead,
// it should see these events during its catch up scan.
p.syncEventC()
blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)
filter := runRequest(p, func(ctx context.Context, p *ScheduledProcessor) *Filter {
if p.stopping {
return nil
}
if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) {
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}
// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return nil
}
// Add the new registration to the registry.
p.reg.Register(&r)
// Prep response with filter that includes the new registration.
f := p.reg.NewFilter()
// Immediately publish a checkpoint event to the registry. This will be the first event
// published to this registration after its initial catch-up scan completes. The resolved
// timestamp might be empty but the checkpoint event is still useful to indicate that the
// catch-up scan has completed. This allows clients to rely on stronger ordering semantics
// once they observe the first checkpoint event.
r.publish(ctx, p.newCheckpointEvent(), nil)
// Run an output loop for the registry.
runOutputLoop := func(ctx context.Context) {
r.runOutputLoop(ctx, p.RangeID)
if p.unregisterClient(&r) {
// unreg callback is set by replica to tear down processors that have
// zero registrations left and to update event filters.
if r.unreg != nil {
r.unreg()
}
}
}
if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
// If we can't schedule internally, processor is already stopped which
// could only happen on shutdown. Disconnect stream and just remove
// registration.
r.disconnect(kvpb.NewError(err))
p.reg.Unregister(ctx, &r)
}
return f
})
if filter != nil {
return true, filter
}
return false, nil
}
func (p *ScheduledProcessor) unregisterClient(r *registration) bool {
return runRequest(p, func(ctx context.Context, p *ScheduledProcessor) bool {
p.reg.Unregister(ctx, r)
return true
})
}
// ConsumeLogicalOps informs the rangefeed processor of the set of logical
// operations. It returns false if consuming the operations hit a timeout, as
// specified by the EventChanTimeout configuration. If the method returns false,
// the processor will have been stopped, so calling Stop is not necessary. Safe
// to call on nil Processor.
func (p *ScheduledProcessor) ConsumeLogicalOps(
ctx context.Context, ops ...enginepb.MVCCLogicalOp,
) bool {
if p == nil {
return true
}
if len(ops) == 0 {
return true
}
return p.sendEvent(ctx, event{ops: ops}, p.EventChanTimeout)
}
// ConsumeSSTable informs the rangefeed processor of an SSTable that was added
// via AddSSTable. It returns false if consuming the SSTable hit a timeout, as
// specified by the EventChanTimeout configuration. If the method returns false,
// the processor will have been stopped, so calling Stop is not necessary. Safe
// to call on nil Processor.
func (p *ScheduledProcessor) ConsumeSSTable(
ctx context.Context, sst []byte, sstSpan roachpb.Span, writeTS hlc.Timestamp,
) bool {
if p == nil {
return true
}
return p.sendEvent(ctx, event{sst: &sstEvent{sst, sstSpan, writeTS}}, p.EventChanTimeout)
}
// ForwardClosedTS indicates that the closed timestamp that serves as the basis
// for the rangefeed processor's resolved timestamp has advanced. It returns
// false if forwarding the closed timestamp hit a timeout, as specified by the
// EventChanTimeout configuration. If the method returns false, the processor
// will have been stopped, so calling Stop is not necessary. Safe to call on
// nil Processor.
func (p *ScheduledProcessor) ForwardClosedTS(ctx context.Context, closedTS hlc.Timestamp) bool {
if p == nil {
return true
}
if closedTS.IsEmpty() {
return true
}
return p.sendEvent(ctx, event{ct: ctEvent{closedTS}}, p.EventChanTimeout)
}
// sendEvent informs the Processor of a new event. If a timeout is specified,
// the method will wait for no longer than that duration before giving up,
// shutting down the Processor, and returning false. 0 for no timeout.
func (p *ScheduledProcessor) sendEvent(ctx context.Context, e event, timeout time.Duration) bool {
if p.enqueueEventInternal(ctx, e, timeout) {
// We can ignore the event because we don't guarantee that we will drain
// all the events after processor was stopped. Memory budget will also be
// closed, releasing info about pending events that would be discarded with
// processor.
p.scheduler.Enqueue(EventQueued)
return true
}
return false
}
func (p *ScheduledProcessor) enqueueEventInternal(
ctx context.Context, e event, timeout time.Duration,
) bool {
// The code is a bit unwieldy because we try to avoid any allocations on fast
// path where we have enough budget and outgoing channel is free. If not, we
// try to set up timeout for acquiring budget and then reuse this timeout when
// inserting value into channel.
var alloc *SharedBudgetAllocation
if p.MemBudget != nil {
size := calculateDateEventSize(e)
if size > 0 {
var err error
// First we will try non-blocking fast path to allocate memory budget.
alloc, err = p.MemBudget.TryGet(ctx, size)
// If budget is already closed, then just let it through because processor
// is terminating.
if err != nil && !errors.Is(err, budgetClosedError) {
// Since we don't have enough budget, we should try to wait for
// allocation returns before failing.
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
// We reset timeout here so that subsequent channel write op doesn't
// try to wait beyond what is already set up.
timeout = 0
}
p.Metrics.RangeFeedBudgetBlocked.Inc(1)
alloc, err = p.MemBudget.WaitAndGet(ctx, size)
}
if err != nil && !errors.Is(err, budgetClosedError) {
p.Metrics.RangeFeedBudgetExhausted.Inc(1)
p.sendStop(newErrBufferCapacityExceeded())
return false
}
// Always release allocation pointer after sending as it is nil safe.
// In normal case its value is moved into event, in case of allocation
// errors it is nil, in case of send errors it is non-nil and this call
// ensures that unused allocation is released.
defer func() {
alloc.Release(ctx)
}()
}
}
ev := getPooledEvent(e)
ev.alloc = alloc
if timeout == 0 {
// Timeout is zero if no timeout was requested or timeout is already set on
// the context by budget allocation. Just try to write using context as a
// timeout.
select {
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it (see comment on defer for explanation).
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
case <-ctx.Done():
p.sendStop(newErrBufferCapacityExceeded())
return false
}
} else {
// First try fast path operation without blocking and without creating any
// contexts in case channel has capacity.
select {
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it (see comment on defer for explanation).
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
default:
// Fast path failed since we don't have capacity in channel. Wait for
// slots to clear up using context timeout.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
select {
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it (see comment on defer for explanation).
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
case <-ctx.Done():
// Sending on the eventC channel would have blocked.
// Instead, tear down the processor and return immediately.
p.sendStop(newErrBufferCapacityExceeded())
return false
}
}
}
return true
}
// setResolvedTSInitialized informs the Processor that its resolved timestamp has
// all the information it needs to be considered initialized.
func (p *ScheduledProcessor) setResolvedTSInitialized(ctx context.Context) {
p.sendEvent(ctx, event{initRTS: true}, 0)
}
// syncEventC synchronizes access to the Processor goroutine, allowing the
// caller to establish causality with actions taken by the Processor goroutine.
// It does so by flushing the event pipeline.
func (p *ScheduledProcessor) syncEventC() {
p.syncSendAndWait(&syncEvent{c: make(chan struct{})})
}
// syncSendAndWait allows sync event to be sent and waited on its channel.
// Exposed to allow special test syneEvents that contain span to be sent.
func (p *ScheduledProcessor) syncSendAndWait(se *syncEvent) {
ev := getPooledEvent(event{sync: se})
select {
case p.eventC <- ev:
// This shouldn't happen as there should be no sync events after disconnect,
// but if there's a bug don't wait it can hang waiting for sync chan.
p.scheduler.Enqueue(EventQueued)
select {
case <-se.c:
// Synchronized.
case <-p.stoppedC:
// Already stopped. Do nothing.
}
case <-p.stoppedC:
// Already stopped. Do nothing.
putPooledEvent(ev)
}
}
// Len returns the number of registrations attached to the processor.
func (p *ScheduledProcessor) Len() int {
return runRequest(p, func(_ context.Context, p *ScheduledProcessor) int {
return p.reg.Len()
})
}
// Filter returns a new operation filter based on the registrations attached to
// the processor. Returns nil if the processor has been stopped already.
func (p *ScheduledProcessor) Filter() *Filter {
return runRequest(p, func(_ context.Context, p *ScheduledProcessor) *Filter {
return newFilterFromRegistry(&p.reg)
})
}
// runRequest will enqueue request to processor and wait for it to be complete.
// Function f will be executed on processor callback by scheduler worker. It
// is guaranteed that only single request is modifying processor at any given
// time. It is advisable to use provided processor reference for operations
// rather than using one within closure itself.
// If request can't be queued or processor stoppedC is closed then default
// value is returned.
func runRequest[T interface{}](
p *ScheduledProcessor, f func(ctx context.Context, p *ScheduledProcessor) T,
) (r T) {
result := make(chan T, 1)
p.enqueueRequest(func(ctx context.Context) {
result <- f(ctx, p)
})
select {
case r = <-result:
return r
case <-p.stoppedC:
return r
}
}
func (p *ScheduledProcessor) enqueueRequest(req request) {
select {
case p.requestQueue <- req:
p.scheduler.Enqueue(RequestQueued)
case <-p.stoppedC:
}
}
func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) {
switch {
case e.ops != nil:
p.consumeLogicalOps(ctx, e.ops, e.alloc)
case !e.ct.IsEmpty():
p.forwardClosedTS(ctx, e.ct.Timestamp)
case bool(e.initRTS):
p.initResolvedTS(ctx)
case e.sst != nil:
p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc)
case e.sync != nil:
if e.sync.testRegCatchupSpan != nil {
if err := p.reg.waitForCaughtUp(*e.sync.testRegCatchupSpan); err != nil {
log.Errorf(
ctx,
"error waiting for registries to catch up during test, results might be impacted: %s",
err,
)
}
}
close(e.sync.c)
default:
panic(fmt.Sprintf("missing event variant: %+v", e))
}
}
func (p *ScheduledProcessor) consumeLogicalOps(
ctx context.Context, ops []enginepb.MVCCLogicalOp, alloc *SharedBudgetAllocation,
) {
for _, op := range ops {
// Publish RangeFeedValue updates, if necessary.
switch t := op.GetValue().(type) {
case *enginepb.MVCCWriteValueOp:
// Publish the new value directly.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc)
case *enginepb.MVCCDeleteRangeOp:
// Publish the range deletion directly.
p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, alloc)
case *enginepb.MVCCWriteIntentOp:
// No updates to publish.
case *enginepb.MVCCUpdateIntentOp:
// No updates to publish.
case *enginepb.MVCCCommitIntentOp:
// Publish the newly committed value.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc)
case *enginepb.MVCCAbortIntentOp:
// No updates to publish.
case *enginepb.MVCCAbortTxnOp:
// No updates to publish.
default:
panic(errors.AssertionFailedf("unknown logical op %T", t))
}
// Determine whether the operation caused the resolved timestamp to
// move forward. If so, publish a RangeFeedCheckpoint notification.
if p.rts.ConsumeLogicalOp(op) {
p.publishCheckpoint(ctx)
}
}
}
func (p *ScheduledProcessor) consumeSSTable(
ctx context.Context,
sst []byte,
sstSpan roachpb.Span,
sstWTS hlc.Timestamp,
alloc *SharedBudgetAllocation,
) {
p.publishSSTable(ctx, sst, sstSpan, sstWTS, alloc)
}
func (p *ScheduledProcessor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) {
if p.rts.ForwardClosedTS(newClosedTS) {
p.publishCheckpoint(ctx)
}
}
func (p *ScheduledProcessor) initResolvedTS(ctx context.Context) {
if p.rts.Init() {
p.publishCheckpoint(ctx)
}
}
func (p *ScheduledProcessor) publishValue(
ctx context.Context,
key roachpb.Key,
timestamp hlc.Timestamp,
value, prevValue []byte,
alloc *SharedBudgetAllocation,
) {
if !p.Span.ContainsKey(roachpb.RKey(key)) {
log.Fatalf(ctx, "key %v not in Processor's key range %v", key, p.Span)
}
var prevVal roachpb.Value
if prevValue != nil {
prevVal.RawBytes = prevValue
}
var event kvpb.RangeFeedEvent
event.MustSetValue(&kvpb.RangeFeedValue{
Key: key,
Value: roachpb.Value{
RawBytes: value,
Timestamp: timestamp,
},
PrevValue: prevVal,
})
p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, alloc)
}
func (p *ScheduledProcessor) publishDeleteRange(
ctx context.Context,
startKey, endKey roachpb.Key,
timestamp hlc.Timestamp,
alloc *SharedBudgetAllocation,
) {
span := roachpb.Span{Key: startKey, EndKey: endKey}
if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) {
log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span)
}
var event kvpb.RangeFeedEvent
event.MustSetValue(&kvpb.RangeFeedDeleteRange{
Span: span,
Timestamp: timestamp,
})
p.reg.PublishToOverlapping(ctx, span, &event, alloc)
}
func (p *ScheduledProcessor) publishSSTable(
ctx context.Context,
sst []byte,
sstSpan roachpb.Span,
sstWTS hlc.Timestamp,
alloc *SharedBudgetAllocation,
) {
if sstSpan.Equal(roachpb.Span{}) {
panic(errors.AssertionFailedf("received SSTable without span"))
}
if sstWTS.IsEmpty() {
panic(errors.AssertionFailedf("received SSTable without write timestamp"))
}
p.reg.PublishToOverlapping(ctx, sstSpan, &kvpb.RangeFeedEvent{
SST: &kvpb.RangeFeedSSTable{
Data: sst,
Span: sstSpan,
WriteTS: sstWTS,
},
}, alloc)
}
func (p *ScheduledProcessor) publishCheckpoint(ctx context.Context) {
// TODO(nvanbenschoten): persist resolvedTimestamp. Give Processor a client.DB.
// TODO(nvanbenschoten): rate limit these? send them periodically?
event := p.newCheckpointEvent()
p.reg.PublishToOverlapping(ctx, all, event, nil)
}
func (p *ScheduledProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent {
// Create a RangeFeedCheckpoint over the Processor's entire span. Each
// individual registration will trim this down to just the key span that
// it is listening on in registration.maybeStripEvent before publishing.
var event kvpb.RangeFeedEvent
event.MustSetValue(&kvpb.RangeFeedCheckpoint{
Span: p.Span.AsRawSpanWithNoLocals(),
ResolvedTS: p.rts.Get(),
})
return &event
}
// ID implements Processor interface.
func (p *ScheduledProcessor) ID() int64 {
return p.scheduler.ID()
}