-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathfrontier.go
845 lines (732 loc) · 24.8 KB
/
frontier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package span
import (
"container/heap"
"fmt"
"strings"
"sync"
// Needed for roachpb.Span.String().
_ "github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)
// Frontier tracks the minimum timestamp of a set of spans.
// Frontier is not safe for concurrent modification, but MakeConcurrentFrontier
// can be used to make thread safe btreeFrontier.
type Frontier interface {
// AddSpansAt adds the provided spans to the btreeFrontier at the provided timestamp.
// If the span overlaps any spans already tracked by the frontier, the tree is adjusted
// to hold union of the span and the overlaps, with all entries assigned startAt starting
// timestamp.
AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error
// Frontier returns the minimum timestamp being tracked.
Frontier() hlc.Timestamp
// PeekFrontierSpan returns one of the spans at the Frontier.
PeekFrontierSpan() roachpb.Span
// Forward advances the timestamp for a span. Any part of the span that doesn't
// overlap the tracked span set will be ignored. True is returned if the
// frontier advanced as a result.
Forward(span roachpb.Span, ts hlc.Timestamp) (bool, error)
// Release removes all items from the frontier. In doing so, it allows memory
// held by the frontier to be recycled. Failure to call this method before
// letting a frontier be GCed is safe in that it won't cause a memory leak,
// but it will prevent frontier nodes from being efficiently re-used.
Release()
// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
Entries(fn Operation)
// SpanEntries invokes op for each sub-span of the specified span with the
// timestamp as observed by this frontier.
//
// Time
// 5| .b__c .
// 4| . h__k .
// 3| . e__f .
// 1 ---a----------------------m---q-- Frontier
//
// |___________span___________|
//
// In the above example, frontier tracks [b, m) and the current frontier
// timestamp is 1. SpanEntries for span [a-q) will invoke op with:
//
// ([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1).
//
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
SpanEntries(span roachpb.Span, op Operation)
// Len returns the number of spans tracked by the frontier.
Len() int
// String returns string representation of this fFrontier.
String() string
}
// OpResult is the result of the Operation callback.
type OpResult bool
const (
// ContinueMatch signals DoMatching should continue.
ContinueMatch OpResult = false
// StopMatch signals DoMatching should stop.
StopMatch OpResult = true
)
func (r OpResult) asBool() bool {
return bool(r)
}
// An Operation is a function that operates on a frontier spans. If done is returned true, the
// Operation is indicating that no further work needs to be done and so the DoMatching function
// should traverse no further.
type Operation func(roachpb.Span, hlc.Timestamp) (done OpResult)
var useBtreeFrontier = envutil.EnvOrDefaultBool("COCKROACH_BTREE_SPAN_FRONTIER_ENABLED",
util.ConstantWithMetamorphicTestBool("COCKROACH_BTREE_SPAN_FRONTIER_ENABLED", true))
func enableBtreeFrontier(enabled bool) func() {
old := useBtreeFrontier
useBtreeFrontier = enabled
return func() {
useBtreeFrontier = old
}
}
func newFrontier() Frontier {
if useBtreeFrontier {
return &btreeFrontier{}
}
return &llrbFrontier{tree: interval.NewTree(interval.ExclusiveOverlapper)}
}
// MakeFrontier returns a Frontier that tracks the given set of spans.
// Each span timestamp initialized at 0.
func MakeFrontier(spans ...roachpb.Span) (Frontier, error) {
return MakeFrontierAt(hlc.Timestamp{}, spans...)
}
// MakeFrontierAt returns a Frontier that tracks the given set of spans.
// Each span timestamp initialized at specified start time.
func MakeFrontierAt(startAt hlc.Timestamp, spans ...roachpb.Span) (Frontier, error) {
f := newFrontier()
if err := f.AddSpansAt(startAt, spans...); err != nil {
return nil, err
}
return f, nil
}
// MakeConcurrentFrontier wraps provided frontier to make it safe to use concurrently.
func MakeConcurrentFrontier(f Frontier) Frontier {
return &concurrentFrontier{f: f}
}
// btreeFrontier is a btree based implementation of Frontier.
type btreeFrontier struct {
// tree contains `*btreeFrontierEntry` items for the entire currently tracked
// span set. Any tracked spans that have never been `Forward`ed will have a
// zero timestamp. If any entries needed to be split along a tracking
// boundary, this has already been done by `forward` before it entered the
// tree.
tree btree
// minHeap contains the same `*btreeFrontierEntry` items as `tree`. Entries
// in the heap are sorted first by minimum timestamp and then by lesser
// start key.
minHeap frontierHeap
idAlloc uint64
mergeAlloc []*btreeFrontierEntry // Amortize allocations.
}
// btreeFrontierEntry represents a timestamped span. It is used as the nodes in both
// the tree and heap needed to keep the Frontier.
// btreeFrontierEntry implements interval/generic interface.
type btreeFrontierEntry struct {
Start, End roachpb.Key
id uint64
ts hlc.Timestamp
// The heapIdx of the item in the frontierHeap, maintained by the
// heap.Interface methods.
heapIdx int
// spanCopy contains a copy of the user provided span.
// This is used only under test to detect frontier mis-uses when
// the caller mutates span keys after adding those spans to this frontier.
spanCopy roachpb.Span
}
//go:generate ../interval/generic/gen.sh *frontierEntry span
// AddSpansAt adds the provided spans to the btreeFrontier at the provided timestamp.
// AddSpans assumes (and verifies under test) that the spans being added do not
// overlap spans already tracked by this btreeFrontier.
//
// NB: It is *extremely* important for the caller to guarantee that the passed
// in spans (the underlying Key/EndKey []byte slices) are not modified in any
// way after this call. If modifications are made to the underlying key slices
// after the spans are added, the results are undefined -- anything from panic
// to infinite loops are possible. While this warning is scary, as it should be,
// the reality is that all callers so far, use the spans that come in from
// external source (an iterator, or RPC), and none of these callers ever modify
// the underlying keys. If the caller has to modify underlying key slices, they
// must pass in the copy.
func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) (retErr error) {
if expensiveChecksEnabled() {
defer func() {
if err := f.checkUnsafeKeyModification(); err != nil {
retErr = errors.CombineErrors(retErr, err)
}
}()
}
collectOverlaps := func(s roachpb.Span, sg *roachpb.SpanGroup) (overlaps []*btreeFrontierEntry) {
key := newSearchKey(s.Key, s.EndKey)
defer putFrontierEntry(key)
it := f.tree.MakeIter()
for it.FirstOverlap(key); it.Valid(); it.NextOverlap(key) {
overlaps = append(overlaps, it.Cur())
}
return overlaps
}
for _, s := range spans {
// Validate caller provided span.
if err := checkSpan(s); err != nil {
return err
}
var sg roachpb.SpanGroup
sg.Add(s)
for _, o := range collectOverlaps(s, &sg) {
if err := f.deleteEntry(o); err != nil {
return err
}
}
if err := sg.ForEach(func(span roachpb.Span) error {
e := newFrontierEntry(&f.idAlloc, span.Key, span.EndKey, startAt)
return f.setEntry(e)
}); err != nil {
return err
}
}
return nil
}
// Release removes all items from the btreeFrontier. In doing so, it allows memory
// held by the btreeFrontier to be recycled. Failure to call this method before
// letting a btreeFrontier be GCed is safe in that it won't cause a memory leak,
// but it will prevent btreeFrontier nodes from being efficiently re-used.
func (f *btreeFrontier) Release() {
it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
putFrontierEntry(it.Cur())
}
f.tree.Reset()
}
// Frontier returns the minimum timestamp being tracked.
func (f *btreeFrontier) Frontier() hlc.Timestamp {
if f.minHeap.Len() == 0 {
return hlc.Timestamp{}
}
return f.minHeap[0].ts
}
// PeekFrontierSpan returns one of the spans at the Frontier.
func (f *btreeFrontier) PeekFrontierSpan() roachpb.Span {
if f.minHeap.Len() == 0 {
return roachpb.Span{}
}
return f.minHeap[0].span()
}
// Forward advances the timestamp for a span. Any part of the span that doesn't
// overlap the tracked span set will be ignored. True is returned if the
// frontier advanced as a result.
//
// Note that internally, it may be necessary to use multiple entries to
// represent this timestamped span (e.g. if it overlaps with the tracked span
// set boundary). Similarly, an entry created by a previous Forward may be
// partially overlapped and have to be split into two entries.
//
// NB: it is unsafe for the caller to modify the keys in the provided span after this
// call returns.
func (f *btreeFrontier) Forward(
span roachpb.Span, ts hlc.Timestamp,
) (forwarded bool, retErr error) {
// Validate caller provided span.
if err := checkSpan(span); err != nil {
return false, err
}
if expensiveChecksEnabled() {
defer func() {
if err := f.checkUnsafeKeyModification(); err != nil {
retErr = errors.CombineErrors(retErr, err)
}
}()
}
prevFrontier := f.Frontier()
if err := f.forward(span, ts); err != nil {
return false, err
}
return prevFrontier.Less(f.Frontier()), nil
}
// clone augments generated iterStack code to support cloning.
func (is *iterStack) clone() iterStack {
c := *is
c.s = append([]iterFrame(nil), is.s...) // copy stack.
return c
}
// clone augments generated iterator code to support cloning.
func (i *iterator) clone() iterator {
c := *i
c.s = i.s.clone() // copy stack.
return c
}
// mergeEntries searches for the entries to the left and to the right
// of the input entry that are contiguous to the entry range and have the same timestamp.
// Updates btree to include single merged entry.
// Any existing tree iterators are invalid after this call.
// Returns btreeFrontierEntry that replaced passed in entry.
func (f *btreeFrontier) mergeEntries(e *btreeFrontierEntry) (*btreeFrontierEntry, error) {
defer func() {
f.mergeAlloc = f.mergeAlloc[:0]
}()
// First, position iterator at e.
pos := f.tree.MakeIter()
pos.SeekGE(e)
if !pos.Valid() || pos.Cur() != e {
return nil, errors.AssertionFailedf("failed to find entry %s in btree", e)
}
// Now, search for contiguous spans to the left of e.
leftMost := e
leftIter := pos.clone()
for leftIter.Prev(); leftIter.Valid(); leftIter.Prev() {
if !(leftIter.Cur().End.Equal(leftMost.Start) && leftIter.Cur().ts.Equal(e.ts)) {
break
}
f.mergeAlloc = append(f.mergeAlloc, leftIter.Cur())
leftMost = leftIter.Cur()
}
end := leftMost.End
if leftMost != e {
// We found ranges to the left of e that have the same timestamp.
// That means that we'll merge entries into leftMost, and we will
// also subsume e itself.
f.mergeAlloc[len(f.mergeAlloc)-1] = e
end = e.End
}
// Now, continue to the right of e.
rightIter := pos.clone()
for rightIter.Next(); rightIter.Valid(); rightIter.Next() {
if !(rightIter.Cur().Start.Equal(end) && rightIter.Cur().ts.Equal(e.ts)) {
break
}
end = rightIter.Cur().End
f.mergeAlloc = append(f.mergeAlloc, rightIter.Cur())
}
// Delete entries first, before updating leftMost boundaries since doing so
// will mess up btree.
for _, toRemove := range f.mergeAlloc {
if err := f.deleteEntry(toRemove); err != nil {
return nil, err
}
}
leftMost.End = end
if expensiveChecksEnabled() {
leftMost.spanCopy.EndKey = append(roachpb.Key{}, end...)
}
return leftMost, nil
}
// setEntry adds entry to the tree and to the heap.
func (f *btreeFrontier) setEntry(e *btreeFrontierEntry) error {
if expensiveChecksEnabled() {
if err := checkSpan(e.span()); err != nil {
return err
}
}
f.tree.Set(e)
heap.Push(&f.minHeap, e)
return nil
}
// deleteEntry removes entry from the tree and the heap, and releases this entry
// into the pool.
func (f *btreeFrontier) deleteEntry(e *btreeFrontierEntry) error {
defer putFrontierEntry(e)
if expensiveChecksEnabled() {
if err := checkSpan(e.span()); err != nil {
return err
}
}
heap.Remove(&f.minHeap, e.heapIdx)
f.tree.Delete(e)
return nil
}
// splitEntryAt splits entry at specified split point.
// Returns left and right entries.
// Any existing tree iterators are invalid after this call.
func (f *btreeFrontier) splitEntryAt(
e *btreeFrontierEntry, split roachpb.Key,
) (left, right *btreeFrontierEntry, err error) {
if expensiveChecksEnabled() {
if !e.span().ContainsKey(split) {
return nil, nil, errors.AssertionFailedf(
"split key %s is not contained by %s", split, e.span())
}
}
right = newFrontierEntry(&f.idAlloc, split, e.End, e.ts)
// Adjust e boundary before we add right (so that there is no overlap in the tree).
e.End = split
if expensiveChecksEnabled() {
e.spanCopy.EndKey = append(roachpb.Key{}, split...)
}
if err := f.setEntry(right); err != nil {
putFrontierEntry(right)
return nil, nil, err
}
return e, right, nil
}
// forward is the work horse of the btreeFrontier. It forwards the timestamp
// for the specified span, splitting, and merging btreeFrontierEntries as needed.
func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error {
todoEntry := newSearchKey(span.Key, span.EndKey)
defer putFrontierEntry(todoEntry)
// forwardEntryTimestamp forwards timestamp to insertTS, and updates
// tree to merge contiguous spans with the same timestamp (if possible).
forwardEntryTimestamp := func(e *btreeFrontierEntry) (*btreeFrontierEntry, error) {
e.ts = insertTS
heap.Fix(&f.minHeap, e.heapIdx)
return f.mergeEntries(e)
}
it := f.tree.MakeIter()
for !todoEntry.isEmptyRange() { // Keep going as long as there is work to be done.
if expensiveChecksEnabled() {
if err := checkSpan(todoEntry.span()); err != nil {
return err
}
}
// Seek to the first entry overlapping todoEntry.
it.FirstOverlap(todoEntry)
if !it.Valid() {
break
}
overlap := it.Cur()
// Invariant (a): todoEntry.Start must be after overlap.Start.
// Trim todoEntry if it falls outside the span(s) tracked by this btreeFrontier.
// This establishes the invariant that overlap start must be at or before todoEntry start.
if todoEntry.Start.Compare(overlap.Start) < 0 {
todoEntry.Start = overlap.Start
if todoEntry.isEmptyRange() {
break
}
}
// Fast case: we already recorded higher timestamp for this overlap.
if insertTS.Less(overlap.ts) {
todoEntry.Start = overlap.End
continue
}
// Fast case: we expect that most of the time, we forward timestamp for
// stable ranges -- that is, we expect range split/merge are not that common.
// As such, if the overlap range exactly matches todoEntry, we can simply
// update overlap timestamp and be done.
if overlap.span().Equal(todoEntry.span()) {
if _, err := forwardEntryTimestamp(overlap); err != nil {
return err
}
break
}
// At this point, we know that overlap timestamp is not ahead of the
// insertTS (otherwise we'd hit fast case above).
// We need to split overlap range into multiple parts.
// 1. Possibly isEmptyRange part before todoEntry.Start
// 2. Middle part (with updated timestamp),
// 3. Possibly isEmptyRange part after todoEntry end.
if overlap.Start.Compare(todoEntry.Start) < 0 {
// Split overlap into 2 entries
// [overlap.Start, todoEntry.Start) and [todoEntry.Start, overlap.End)
// Invariant (b): after this step, overlap is split into 2 parts. The right
// part starts at todoEntry.Start.
_, _, err := f.splitEntryAt(overlap, todoEntry.Start)
if err != nil {
return err
}
continue
}
// NB: overlap.Start must be equal to todoEntry.Start (established by Invariant (a) and (b) above).
if expensiveChecksEnabled() && !overlap.Start.Equal(todoEntry.Start) {
return errors.AssertionFailedf("expected overlap %s to start at %s", overlap, todoEntry)
}
switch cmp := todoEntry.End.Compare(overlap.End); {
case cmp < 0:
// Our todoEntry ends before the overlap ends.
// Split overlap into 2 entries:
// [overlap.Start, todoEntry.End) and [todoEntry.End, overlap.End)
// Left entry can reuse overlap with insertTS.
left, right, err := f.splitEntryAt(overlap, todoEntry.End)
if err != nil {
return err
}
todoEntry.Start = right.End
// The left part advances its timestamp.
if _, err := forwardEntryTimestamp(left); err != nil {
return err
}
case cmp >= 0:
// todoEntry ends at or beyond overlap. Regardless, we can simply update overlap
// and if needed, continue matching remaining todoEntry (if any).
fwd, err := forwardEntryTimestamp(overlap)
if err != nil {
return err
}
todoEntry.Start = fwd.End
}
}
return nil
}
// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
func (f *btreeFrontier) Entries(fn Operation) {
it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
if fn(it.Cur().span(), it.Cur().ts) == StopMatch {
break
}
}
}
// SpanEntries invokes op for each sub-span of the specified span with the
// timestamp as observed by this frontier.
//
// Time
// 5| .b__c .
// 4| . h__k .
// 3| . e__f .
// 1 ---a----------------------m---q-- Frontier
//
// |___________span___________|
//
// In the above example, frontier tracks [b, m) and the current frontier
// timestamp is 1. SpanEntries for span [a-q) will invoke op with:
//
// ([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1).
//
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
func (f *btreeFrontier) SpanEntries(span roachpb.Span, op Operation) {
todoRange := newSearchKey(span.Key, span.EndKey)
defer putFrontierEntry(todoRange)
it := f.tree.MakeIter()
for it.FirstOverlap(todoRange); it.Valid(); it.NextOverlap(todoRange) {
e := it.Cur()
// Skip untracked portion.
if todoRange.Start.Compare(e.Start) < 0 {
todoRange.Start = e.Start
}
end := e.End
if e.End.Compare(todoRange.End) > 0 {
end = todoRange.End
}
if op(roachpb.Span{Key: todoRange.Start, EndKey: end}, e.ts) == StopMatch {
return
}
todoRange.Start = end
}
}
// String implements Stringer.
func (f *btreeFrontier) String() string {
var buf strings.Builder
it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
if buf.Len() != 0 {
buf.WriteString(` `)
}
buf.WriteString(it.Cur().String())
}
return buf.String()
}
// Len implements Frontier.
func (f *btreeFrontier) Len() int {
return f.tree.Len()
}
func (e *btreeFrontierEntry) ID() uint64 {
return e.id
}
func (e *btreeFrontierEntry) Key() []byte {
return e.Start
}
func (e *btreeFrontierEntry) EndKey() []byte {
return e.End
}
func (e *btreeFrontierEntry) New() *btreeFrontierEntry {
return &btreeFrontierEntry{}
}
func (e *btreeFrontierEntry) SetID(id uint64) {
e.id = id
}
func (e *btreeFrontierEntry) SetKey(k []byte) {
e.Start = k
}
func (e *btreeFrontierEntry) SetEndKey(k []byte) {
e.End = k
}
func (e *btreeFrontierEntry) String() string {
return fmt.Sprintf("[%s@%s]", e.span(), e.ts)
}
func (e *btreeFrontierEntry) span() roachpb.Span {
return roachpb.Span{Key: e.Start, EndKey: e.End}
}
// isEmptyRange returns true if btreeFrontier entry range is empty.
func (e *btreeFrontierEntry) isEmptyRange() bool {
return e.Start.Compare(e.End) >= 0
}
// frontierHeap implements heap.Interface and holds `btreeFrontierEntry`s. Entries
// are sorted based on their timestamp such that the oldest will rise to the top
// of the heap.
type frontierHeap []*btreeFrontierEntry
// Len implements heap.Interface.
func (h frontierHeap) Len() int { return len(h) }
// Less implements heap.Interface.
func (h frontierHeap) Less(i, j int) bool {
if h[i].ts.EqOrdering(h[j].ts) {
return h[i].Start.Compare(h[j].Start) < 0
}
return h[i].ts.Less(h[j].ts)
}
// Swap implements heap.Interface.
func (h frontierHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].heapIdx, h[j].heapIdx = i, j
}
// Push implements heap.Interface.
func (h *frontierHeap) Push(x interface{}) {
n := len(*h)
entry := x.(*btreeFrontierEntry)
entry.heapIdx = n
*h = append(*h, entry)
}
// Pop implements heap.Interface.
func (h *frontierHeap) Pop() interface{} {
old := *h
n := len(old)
entry := old[n-1]
entry.heapIdx = -1 // for safety
old[n-1] = nil // for gc
*h = old[0 : n-1]
return entry
}
// newFrontierEntry/putFrontierEntry provide access to pooled *btreeFrontierEntry.
var newFrontierEntry, putFrontierEntry = func() (
func(id *uint64, start, end roachpb.Key, ts hlc.Timestamp) *btreeFrontierEntry,
func(e *btreeFrontierEntry),
) {
entryPool := sync.Pool{New: func() any { return new(btreeFrontierEntry) }}
newEntry := func(idAlloc *uint64, start, end roachpb.Key, ts hlc.Timestamp) *btreeFrontierEntry {
e := entryPool.Get().(*btreeFrontierEntry)
var id uint64
if idAlloc != nil {
id = *idAlloc
*idAlloc++
}
*e = btreeFrontierEntry{
Start: start,
End: end,
id: id,
ts: ts,
heapIdx: -1,
}
if expensiveChecksEnabled() {
e.spanCopy.Key = append(e.spanCopy.Key, start...)
e.spanCopy.EndKey = append(e.spanCopy.EndKey, end...)
}
return e
}
putEntry := func(e *btreeFrontierEntry) {
e.Start = nil
e.End = nil
e.spanCopy.Key = nil
e.spanCopy.EndKey = nil
entryPool.Put(e)
}
return newEntry, putEntry
}()
// newSearchKey returns btreeFrontierEntry that can be used to search/seek
// in the btree.
var newSearchKey = func(start, end roachpb.Key) *btreeFrontierEntry {
return newFrontierEntry(nil, start, end, hlc.Timestamp{})
}
// checkSpan validates span.
func checkSpan(s roachpb.Span) error {
switch s.Key.Compare(s.EndKey) {
case 1:
return errors.Wrapf(interval.ErrInvertedRange, "inverted span %s", s)
case 0:
if len(s.Key) == 0 && len(s.EndKey) == 0 {
return errors.Wrapf(interval.ErrNilRange, "nil span %s", s)
}
return errors.Wrapf(interval.ErrEmptyRange, "empty range %s", s)
default:
return nil
}
}
// checkUnsafeKeyModification is an expensive check performed under tests
// to verify that the caller did not mutate span keys after adding/forwarding them.
func (f *btreeFrontier) checkUnsafeKeyModification() error {
it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
cur := it.Cur()
if !cur.Start.Equal(cur.spanCopy.Key) || !cur.End.Equal(cur.spanCopy.EndKey) {
return errors.Newf("unsafe span key modification: was %s, now %s", cur.spanCopy, cur.span())
}
}
return nil
}
var disableSanityChecksForBenchmark bool
func expensiveChecksEnabled() bool {
return buildutil.CrdbTestBuild && !disableSanityChecksForBenchmark
}
type concurrentFrontier struct {
syncutil.Mutex
f Frontier
}
var _ Frontier = (*concurrentFrontier)(nil)
// AddSpansAt implements Frontier.
func (f *concurrentFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error {
f.Lock()
defer f.Unlock()
return f.f.AddSpansAt(startAt, spans...)
}
// Frontier implements Frontier.
func (f *concurrentFrontier) Frontier() hlc.Timestamp {
f.Lock()
defer f.Unlock()
return f.f.Frontier()
}
// PeekFrontierSpan implements Frontier.
func (f *concurrentFrontier) PeekFrontierSpan() roachpb.Span {
f.Lock()
defer f.Unlock()
return f.f.PeekFrontierSpan()
}
// Forward implements Frontier.
func (f *concurrentFrontier) Forward(span roachpb.Span, ts hlc.Timestamp) (bool, error) {
f.Lock()
defer f.Unlock()
return f.f.Forward(span, ts)
}
// Release implements Frontier.
func (f *concurrentFrontier) Release() {
f.Lock()
defer f.Unlock()
f.f.Release()
}
// Entries implements Frontier.
func (f *concurrentFrontier) Entries(fn Operation) {
f.Lock()
defer f.Unlock()
f.f.Entries(fn)
}
// SpanEntries implements Frontier.
func (f *concurrentFrontier) SpanEntries(span roachpb.Span, op Operation) {
f.Lock()
defer f.Unlock()
f.f.SpanEntries(span, op)
}
// Len implements Frontier.
func (f *concurrentFrontier) Len() int {
f.Lock()
defer f.Unlock()
return f.f.Len()
}
// String implements Frontier.
func (f *concurrentFrontier) String() string {
f.Lock()
defer f.Unlock()
return f.f.String()
}