forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
table_stats.go
995 lines (931 loc) · 36.7 KB
/
table_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"fmt"
"math"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/sstable"
)
// In-memory statistics about tables help inform compaction picking, but may
// be expensive to calculate or load from disk. Every time a database is
// opened, these statistics must be reloaded or recalculated. To minimize
// impact on user activity and compactions, we load these statistics
// asynchronously in the background and store loaded statistics in each
// table's *FileMetadata.
//
// This file implements the asynchronous loading of statistics by maintaining
// a list of files that require statistics, alongside their LSM levels.
// Whenever new files are added to the LSM, the files are appended to
// d.mu.tableStats.pending. If a stats collection job is not currently
// running, one is started in a separate goroutine.
//
// The stats collection job grabs and clears the pending list, computes table
// statistics relative to the current readState and updates the tables' file
// metadata. New pending files may accumulate during a stats collection job,
// so a completing job triggers a new job if necessary. Only one job runs at a
// time.
//
// When an existing database is opened, all files lack in-memory statistics.
// These files' stats are loaded incrementally whenever the pending list is
// empty by scanning a current readState for files missing statistics. Once a
// job completes a scan without finding any remaining files without
// statistics, it flips a `loadedInitial` flag. From then on, the stats
// collection job only needs to load statistics for new files appended to the
// pending list.
func (d *DB) maybeCollectTableStatsLocked() {
if d.shouldCollectTableStatsLocked() {
go d.collectTableStats()
}
}
// updateTableStatsLocked is called when new files are introduced, after the
// read state has been updated. It may trigger a new stat collection.
// DB.mu must be locked when calling.
func (d *DB) updateTableStatsLocked(newFiles []manifest.NewFileEntry) {
var needStats bool
for _, nf := range newFiles {
if !nf.Meta.StatsValid() {
needStats = true
break
}
}
if !needStats {
return
}
d.mu.tableStats.pending = append(d.mu.tableStats.pending, newFiles...)
d.maybeCollectTableStatsLocked()
}
func (d *DB) shouldCollectTableStatsLocked() bool {
return !d.mu.tableStats.loading &&
d.closed.Load() == nil &&
!d.opts.private.disableTableStats &&
(len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
}
// collectTableStats runs a table stats collection job, returning true if the
// invocation did the collection work, false otherwise (e.g. if another job was
// already running).
func (d *DB) collectTableStats() bool {
const maxTableStatsPerScan = 50
d.mu.Lock()
if !d.shouldCollectTableStatsLocked() {
d.mu.Unlock()
return false
}
pending := d.mu.tableStats.pending
d.mu.tableStats.pending = nil
d.mu.tableStats.loading = true
jobID := d.mu.nextJobID
d.mu.nextJobID++
loadedInitial := d.mu.tableStats.loadedInitial
// Drop DB.mu before performing IO.
d.mu.Unlock()
// Every run of collectTableStats either collects stats from the pending
// list (if non-empty) or from scanning the version (loadedInitial is
// false). This job only runs if at least one of those conditions holds.
// Grab a read state to scan for tables.
rs := d.loadReadState()
var collected []collectedStats
var hints []deleteCompactionHint
if len(pending) > 0 {
collected, hints = d.loadNewFileStats(rs, pending)
} else {
var moreRemain bool
var buf [maxTableStatsPerScan]collectedStats
collected, hints, moreRemain = d.scanReadStateTableStats(rs, buf[:0])
loadedInitial = !moreRemain
}
rs.unref()
// Update the FileMetadata with the loaded stats while holding d.mu.
d.mu.Lock()
defer d.mu.Unlock()
d.mu.tableStats.loading = false
if loadedInitial && !d.mu.tableStats.loadedInitial {
d.mu.tableStats.loadedInitial = loadedInitial
d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
JobID: jobID,
})
}
maybeCompact := false
for _, c := range collected {
c.fileMetadata.Stats = c.TableStats
maybeCompact = maybeCompact || c.TableStats.RangeDeletionsBytesEstimate > 0
c.fileMetadata.StatsMarkValid()
}
d.mu.tableStats.cond.Broadcast()
d.maybeCollectTableStatsLocked()
if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
// Verify that all of the hint tombstones' files still exist in the
// current version. Otherwise, the tombstone itself may have been
// compacted into L6 and more recent keys may have had their sequence
// numbers zeroed.
//
// Note that it's possible that the tombstone file is being compacted
// presently. In that case, the file will be present in v. When the
// compaction finishes compacting the tombstone file, it will detect
// and clear the hint.
//
// See DB.maybeUpdateDeleteCompactionHints.
v := d.mu.versions.currentVersion()
keepHints := hints[:0]
for _, h := range hints {
if v.Contains(h.tombstoneLevel, d.cmp, h.tombstoneFile) {
keepHints = append(keepHints, h)
}
}
d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
}
if maybeCompact {
d.maybeScheduleCompaction()
}
return true
}
type collectedStats struct {
*fileMetadata
manifest.TableStats
}
func (d *DB) loadNewFileStats(
rs *readState, pending []manifest.NewFileEntry,
) ([]collectedStats, []deleteCompactionHint) {
var hints []deleteCompactionHint
collected := make([]collectedStats, 0, len(pending))
for _, nf := range pending {
// A file's stats might have been populated by an earlier call to
// loadNewFileStats if the file was moved.
// NB: We're not holding d.mu which protects f.Stats, but only
// collectTableStats updates f.Stats for active files, and we
// ensure only one goroutine runs it at a time through
// d.mu.tableStats.loading.
if nf.Meta.StatsValid() {
continue
}
// The file isn't guaranteed to still be live in the readState's
// version. It may have been deleted or moved. Skip it if it's not in
// the expected level.
if !rs.current.Contains(nf.Level, d.cmp, nf.Meta) {
continue
}
stats, newHints, err := d.loadTableStats(
rs.current, nf.Level,
nf.Meta.PhysicalMeta(),
)
if err != nil {
d.opts.EventListener.BackgroundError(err)
continue
}
// NB: We don't update the FileMetadata yet, because we aren't
// holding DB.mu. We'll copy it to the FileMetadata after we're
// finished with IO.
collected = append(collected, collectedStats{
fileMetadata: nf.Meta,
TableStats: stats,
})
hints = append(hints, newHints...)
}
return collected, hints
}
// scanReadStateTableStats is run by an active stat collection job when there
// are no pending new files, but there might be files that existed at Open for
// which we haven't loaded table stats.
func (d *DB) scanReadStateTableStats(
rs *readState, fill []collectedStats,
) ([]collectedStats, []deleteCompactionHint, bool) {
moreRemain := false
var hints []deleteCompactionHint
for l, levelMetadata := range rs.current.Levels {
iter := levelMetadata.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
// NB: We're not holding d.mu which protects f.Stats, but only the
// active stats collection job updates f.Stats for active files,
// and we ensure only one goroutine runs it at a time through
// d.mu.tableStats.loading. This makes it safe to read validity
// through f.Stats.ValidLocked despite not holding d.mu.
if f.StatsValid() {
continue
}
// Limit how much work we do per read state. The older the read
// state is, the higher the likelihood files are no longer being
// used in the current version. If we've exhausted our allowance,
// return true for the last return value to signal there's more
// work to do.
if len(fill) == cap(fill) {
moreRemain = true
return fill, hints, moreRemain
}
stats, newHints, err := d.loadTableStats(
rs.current, l, f.PhysicalMeta(),
)
if err != nil {
// Set `moreRemain` so we'll try again.
moreRemain = true
d.opts.EventListener.BackgroundError(err)
continue
}
fill = append(fill, collectedStats{
fileMetadata: f,
TableStats: stats,
})
hints = append(hints, newHints...)
}
}
return fill, hints, moreRemain
}
// loadTableStats currently only supports stats collection for physical
// sstables.
//
// TODO(bananabrick): Support stats collection for virtual sstables.
func (d *DB) loadTableStats(
v *version, level int, meta physicalMeta,
) (manifest.TableStats, []deleteCompactionHint, error) {
var stats manifest.TableStats
var compactionHints []deleteCompactionHint
err := d.tableCache.withReader(
meta, func(r *sstable.Reader) (err error) {
stats.NumEntries = r.Properties.NumEntries
stats.NumDeletions = r.Properties.NumDeletions
if r.Properties.NumPointDeletions() > 0 {
if err = d.loadTablePointKeyStats(r, v, level, meta, &stats); err != nil {
return
}
}
if r.Properties.NumRangeDeletions > 0 || r.Properties.NumRangeKeyDels > 0 {
if compactionHints, err = d.loadTableRangeDelStats(
r, v, level, meta, &stats,
); err != nil {
return
}
}
// TODO(travers): Once we have real-world data, consider collecting
// additional stats that may provide improved heuristics for compaction
// picking.
stats.NumRangeKeySets = r.Properties.NumRangeKeySets
stats.ValueBlocksSize = r.Properties.ValueBlocksSize
return
})
if err != nil {
return stats, nil, err
}
return stats, compactionHints, nil
}
// loadTablePointKeyStats calculates the point key statistics for the given
// table. The provided manifest.TableStats are updated.
func (d *DB) loadTablePointKeyStats(
r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats,
) error {
// TODO(jackson): If the file has a wide keyspace, the average
// value size beneath the entire file might not be representative
// of the size of the keys beneath the point tombstones.
// We could write the ranges of 'clusters' of point tombstones to
// a sstable property and call averageValueSizeBeneath for each of
// these narrower ranges to improve the estimate.
avgValSize, err := d.averagePhysicalValueSizeBeneath(v, level, meta)
if err != nil {
return err
}
stats.PointDeletionsBytesEstimate =
pointDeletionsBytesEstimate(meta.Size, &r.Properties, avgValSize)
return nil
}
// loadTableRangeDelStats calculates the range deletion and range key deletion
// statistics for the given table.
func (d *DB) loadTableRangeDelStats(
r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats,
) ([]deleteCompactionHint, error) {
iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta.FileMetadata)
if err != nil {
return nil, err
}
defer iter.Close()
var compactionHints []deleteCompactionHint
// We iterate over the defragmented range tombstones and range key deletions,
// which ensures we don't double count ranges deleted at different sequence
// numbers. Also, merging abutting tombstones reduces the number of calls to
// estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
// our overall estimate.
for s := iter.First(); s != nil; s = iter.Next() {
start, end := s.Start, s.End
// We only need to consider deletion size estimates for tables that contain
// RANGEDELs.
var maxRangeDeleteSeqNum uint64
for _, k := range s.Keys {
if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
maxRangeDeleteSeqNum = k.SeqNum()
break
}
}
// If the file is in the last level of the LSM, there is no data beneath
// it. The fact that there is still a range tombstone in a bottommost file
// indicates two possibilites:
// 1. an open snapshot kept the tombstone around, and the data the
// tombstone deletes is contained within the file itself.
// 2. the file was ingested.
// In the first case, we'd like to estimate disk usage within the file
// itself since compacting the file will drop that covered data. In the
// second case, we expect that compacting the file will NOT drop any
// data and rewriting the file is a waste of write bandwidth. We can
// distinguish these cases by looking at the file metadata's sequence
// numbers. A file's range deletions can only delete data within the
// file at lower sequence numbers. All keys in an ingested sstable adopt
// the same sequence number, preventing tombstones from deleting keys
// within the same file. We check here if the largest RANGEDEL sequence
// number is greater than the file's smallest sequence number. If it is,
// the RANGEDEL could conceivably (although inconclusively) delete data
// within the same file.
//
// Note that this heuristic is imperfect. If a table containing a range
// deletion is ingested into L5 and subsequently compacted into L6 but
// an open snapshot prevents elision of covered keys in L6, the
// resulting RangeDeletionsBytesEstimate will incorrectly include all
// covered keys.
//
// TODO(jackson): We could prevent the above error in the heuristic by
// computing the file's RangeDeletionsBytesEstimate during the
// compaction itself. It's unclear how common this is.
//
// NOTE: If the span `s` wholly contains a table containing range keys,
// the returned size estimate will be slightly inflated by the range key
// block. However, in practice, range keys are expected to be rare, and
// the size of the range key block relative to the overall size of the
// table is expected to be small.
if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
size, err := r.EstimateDiskUsage(start, end)
if err != nil {
return nil, err
}
stats.RangeDeletionsBytesEstimate += size
// As the file is in the bottommost level, there is no need to collect a
// deletion hint.
continue
}
// While the size estimates for point keys should only be updated if this
// span contains a range del, the sequence numbers are required for the
// hint. Unconditionally descend, but conditionally update the estimates.
hintType := compactionHintFromKeys(s.Keys)
estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(v, level, start, end, hintType)
if err != nil {
return nil, err
}
stats.RangeDeletionsBytesEstimate += estimate
// If any files were completely contained with the range,
// hintSeqNum is the smallest sequence number contained in any
// such file.
if hintSeqNum == math.MaxUint64 {
continue
}
hint := deleteCompactionHint{
hintType: hintType,
start: make([]byte, len(start)),
end: make([]byte, len(end)),
tombstoneFile: meta.FileMetadata,
tombstoneLevel: level,
tombstoneLargestSeqNum: s.LargestSeqNum(),
tombstoneSmallestSeqNum: s.SmallestSeqNum(),
fileSmallestSeqNum: hintSeqNum,
}
copy(hint.start, start)
copy(hint.end, end)
compactionHints = append(compactionHints, hint)
}
return compactionHints, err
}
func (d *DB) averagePhysicalValueSizeBeneath(
v *version, level int, meta physicalMeta,
) (avgValueSize uint64, err error) {
// Find all files in lower levels that overlap with meta,
// summing their value sizes and entry counts.
var fileSum, keySum, valSum, entryCount uint64
for l := level + 1; l < numLevels; l++ {
overlaps := v.Overlaps(l, d.cmp, meta.Smallest.UserKey,
meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
iter := overlaps.Iter()
for file := iter.First(); file != nil; file = iter.Next() {
var err error
if file.Virtual {
err = d.tableCache.withVirtualReader(
file.VirtualMeta(),
func(v sstable.VirtualReader) (err error) {
fileSum += file.Size
entryCount += file.Stats.NumEntries
keySum += v.Properties.RawKeySize
valSum += v.Properties.RawValueSize
return nil
})
} else {
err = d.tableCache.withReader(
file.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
fileSum += file.Size
entryCount += r.Properties.NumEntries
keySum += r.Properties.RawKeySize
valSum += r.Properties.RawValueSize
return nil
})
}
if err != nil {
return 0, err
}
}
}
if entryCount == 0 {
return 0, nil
}
// RawKeySize and RawValueSize are uncompressed totals. Scale the value sum
// according to the data size to account for compression, index blocks and
// metadata overhead. Eg:
//
// Compression rate × Average uncompressed value size
//
// ↓
//
// FileSize RawValueSize
// ----------------------- × ------------
// RawKeySize+RawValueSize NumEntries
uncompressedSum := float64(keySum + valSum)
compressionRatio := float64(fileSum) / uncompressedSum
avgCompressedValueSize := (float64(valSum) / float64(entryCount)) * compressionRatio
return uint64(avgCompressedValueSize), nil
}
func (d *DB) estimateReclaimedSizeBeneath(
v *version, level int, start, end []byte, hintType deleteCompactionHintType,
) (estimate uint64, hintSeqNum uint64, err error) {
// Find all files in lower levels that overlap with the deleted range
// [start, end).
//
// An overlapping file might be completely contained by the range
// tombstone, in which case we can count the entire file size in
// our estimate without doing any additional I/O.
//
// Otherwise, estimating the range for the file requires
// additional I/O to read the file's index blocks.
hintSeqNum = math.MaxUint64
for l := level + 1; l < numLevels; l++ {
overlaps := v.Overlaps(l, d.cmp, start, end, true /* exclusiveEnd */)
iter := overlaps.Iter()
for file := iter.First(); file != nil; file = iter.Next() {
startCmp := d.cmp(start, file.Smallest.UserKey)
endCmp := d.cmp(file.Largest.UserKey, end)
if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest.IsExclusiveSentinel()) {
// The range fully contains the file, so skip looking it up in table
// cache/looking at its indexes and add the full file size. Whether the
// disk estimate and hint seqnums are updated depends on a) the type of
// hint that requested the estimate and b) the keys contained in this
// current file.
var updateEstimates, updateHints bool
switch hintType {
case deleteCompactionHintTypePointKeyOnly:
// The range deletion byte estimates should only be updated if this
// table contains point keys. This ends up being an overestimate in
// the case that table also has range keys, but such keys are expected
// to contribute a negligible amount of the table's overall size,
// relative to point keys.
if file.HasPointKeys {
updateEstimates = true
}
// As the initiating span contained only range dels, hints can only be
// updated if this table does _not_ contain range keys.
if !file.HasRangeKeys {
updateHints = true
}
case deleteCompactionHintTypeRangeKeyOnly:
// The initiating span contained only range key dels. The estimates
// apply only to point keys, and are therefore not updated.
updateEstimates = false
// As the initiating span contained only range key dels, hints can
// only be updated if this table does _not_ contain point keys.
if !file.HasPointKeys {
updateHints = true
}
case deleteCompactionHintTypePointAndRangeKey:
// Always update the estimates and hints, as this hint type can drop a
// file, irrespective of the mixture of keys. Similar to above, the
// range del bytes estimates is an overestimate.
updateEstimates, updateHints = true, true
default:
panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
}
if updateEstimates {
estimate += file.Size
}
if updateHints && hintSeqNum > file.SmallestSeqNum {
hintSeqNum = file.SmallestSeqNum
}
} else if d.cmp(file.Smallest.UserKey, end) <= 0 && d.cmp(start, file.Largest.UserKey) <= 0 {
// Partial overlap.
if hintType == deleteCompactionHintTypeRangeKeyOnly {
// If the hint that generated this overlap contains only range keys,
// there is no need to calculate disk usage, as the reclaimable space
// is expected to be minimal relative to point keys.
continue
}
var size uint64
var err error
if file.Virtual {
err = d.tableCache.withVirtualReader(
file.VirtualMeta(), func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
})
} else {
err = d.tableCache.withReader(
file.PhysicalMeta(), func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
})
}
if err != nil {
return 0, hintSeqNum, err
}
estimate += size
}
}
}
return estimate, hintSeqNum, nil
}
func maybeSetStatsFromProperties(meta physicalMeta, props *sstable.Properties) bool {
// If a table contains range deletions or range key deletions, we defer the
// stats collection. There are two main reasons for this:
//
// 1. Estimating the potential for reclaimed space due to a range deletion
// tombstone requires scanning the LSM - a potentially expensive operation
// that should be deferred.
// 2. Range deletions and / or range key deletions present an opportunity to
// compute "deletion hints", which also requires a scan of the LSM to
// compute tables that would be eligible for deletion.
//
// These two tasks are deferred to the table stats collector goroutine.
if props.NumRangeDeletions != 0 || props.NumRangeKeyDels != 0 {
return false
}
// If a table is more than 10% point deletions, don't calculate the
// PointDeletionsBytesEstimate statistic using our limited knowledge. The
// table stats collector can populate the stats and calculate an average
// of value size of all the tables beneath the table in the LSM, which
// will be more accurate.
if props.NumDeletions > props.NumEntries/10 {
return false
}
var pointEstimate uint64
if props.NumEntries > 0 {
// Use the file's own average key and value sizes as an estimate. This
// doesn't require any additional IO and since the number of point
// deletions in the file is low, the error introduced by this crude
// estimate is expected to be small.
avgValSize := estimateValuePhysicalSize(meta.Size, props)
pointEstimate = pointDeletionsBytesEstimate(meta.Size, props, avgValSize)
}
meta.Stats.NumEntries = props.NumEntries
meta.Stats.NumDeletions = props.NumDeletions
meta.Stats.NumRangeKeySets = props.NumRangeKeySets
meta.Stats.PointDeletionsBytesEstimate = pointEstimate
meta.Stats.RangeDeletionsBytesEstimate = 0
meta.Stats.ValueBlocksSize = props.ValueBlocksSize
meta.StatsMarkValid()
return true
}
func pointDeletionsBytesEstimate(
fileSize uint64, props *sstable.Properties, avgValPhysicalSize uint64,
) (estimate uint64) {
if props.NumEntries == 0 {
return 0
}
numPointDels := props.NumPointDeletions()
if numPointDels == 0 {
return 0
}
// Estimate the potential space to reclaim using the table's own properties.
// There may or may not be keys covered by any individual point tombstone.
// If not, compacting the point tombstone into L6 will at least allow us to
// drop the point deletion key and will reclaim the tombstone's key bytes.
// If there are covered key(s), we also get to drop key and value bytes for
// each covered key.
//
// We estimate assuming that each point tombstone on average covers 1 key.
// This is almost certainly an overestimate, but that's probably okay
// because point tombstones can slow range iterations even when they don't
// cover a key. It may be beneficial in the future to more accurately
// estimate which tombstones cover keys and which do not.
// Calculate the contribution of the key sizes: 1x for the point tombstone
// itself and 1x for the key that's expected to exist lower in the LSM.
var keysLogicalSize uint64
if props.RawPointTombstoneKeySize > 0 {
// This table has a RawPointTombstoneKeySize property, so we know the
// exact size of the logical, uncompressed bytes of the point tombstone
// keys.
keysLogicalSize = 2 * props.RawPointTombstoneKeySize
} else {
// This sstable predates the existence of the RawPointTombstoneKeySize
// property. We can use the average key size within the file itself and
// the count of point deletions to estimate the size.
keysLogicalSize = 2 * numPointDels * props.RawKeySize / props.NumEntries
}
// Scale the logical key size by the logical:physical ratio of the file to
// account for compression, metadata overhead, etc.
//
// Physical FileSize
// ----------- = -----------------------
// Logical RawKeySize+RawValueSize
//
estimate += (keysLogicalSize * fileSize) / (props.RawKeySize + props.RawValueSize)
// Calculate the contribution of the deleted values. The caller has already
// computed an average physical size (possibly comptued across many
// sstables), so there's no need to scale it.
estimate += numPointDels * avgValPhysicalSize
return estimate
}
func estimateValuePhysicalSize(fileSize uint64, props *sstable.Properties) (avgValSize uint64) {
// RawKeySize and RawValueSize are uncompressed totals. Scale according to
// the data size to account for compression, index blocks and metadata
// overhead. Eg:
//
// Compression rate × Average uncompressed value size
//
// ↓
//
// FileSize RawValSize
// ----------------------- × ----------
// RawKeySize+RawValueSize NumEntries
//
uncompressedSum := props.RawKeySize + props.RawValueSize
compressionRatio := float64(fileSize) / float64(uncompressedSum)
avgCompressedValueSize := (float64(props.RawValueSize) / float64(props.NumEntries)) * compressionRatio
return uint64(avgCompressedValueSize)
}
// newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
// returns "ranged deletion" spans for a single table, providing a combined view
// of both range deletion and range key deletion spans. The
// tableRangedDeletionIter is intended for use in the specific case of computing
// the statistics and deleteCompactionHints for a single table.
//
// As an example, consider the following set of spans from the range deletion
// and range key blocks of a table:
//
// |---------| |---------| |-------| RANGEKEYDELs
// |-----------|-------------| |-----| RANGEDELs
// __________________________________________________________
// a b c d e f g h i j k l m n o p q r s t u v w x y z
//
// The tableRangedDeletionIter produces the following set of output spans, where
// '1' indicates a span containing only range deletions, '2' is a span
// containing only range key deletions, and '3' is a span containing a mixture
// of both range deletions and range key deletions.
//
// 1 3 1 3 2 1 3 2
// |-----|---------|-----|---|-----| |---|-|-----|
// __________________________________________________________
// a b c d e f g h i j k l m n o p q r s t u v w x y z
//
// Algorithm.
//
// The iterator first defragments the range deletion and range key blocks
// separately. During this defragmentation, the range key block is also filtered
// so that keys other than range key deletes are ignored. The range delete and
// range key delete keyspaces are then merged.
//
// Note that the only fragmentation introduced by merging is from where a range
// del span overlaps with a range key del span. Within the bounds of any overlap
// there is guaranteed to be no further fragmentation, as the constituent spans
// have already been defragmented. To the left and right of any overlap, the
// same reasoning applies. For example,
//
// |--------| |-------| RANGEKEYDEL
// |---------------------------| RANGEDEL
// |----1---|----3---|----1----|---2---| Merged, fragmented spans.
// __________________________________________________________
// a b c d e f g h i j k l m n o p q r s t u v w x y z
//
// Any fragmented abutting spans produced by the merging iter will be of
// differing types (i.e. a transition from a span with homogenous key kinds to a
// heterogeneous span, or a transition from a span with exclusively range dels
// to a span with exclusively range key dels). Therefore, further
// defragmentation is not required.
//
// Each span returned by the tableRangeDeletionIter will have at most four keys,
// corresponding to the largest and smallest sequence numbers encountered across
// the range deletes and range keys deletes that comprised the merged spans.
func newCombinedDeletionKeyspanIter(
comparer *base.Comparer, r *sstable.Reader, m *fileMetadata,
) (keyspan.FragmentIterator, error) {
// The range del iter and range key iter are each wrapped in their own
// defragmenting iter. For each iter, abutting spans can always be merged.
var equal = keyspan.DefragmentMethodFunc(func(_ base.Equal, a, b *keyspan.Span) bool { return true })
// Reduce keys by maintaining a slice of at most length two, corresponding to
// the largest and smallest keys in the defragmented span. This maintains the
// contract that the emitted slice is sorted by (SeqNum, Kind) descending.
reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
if len(current) == 0 && len(incoming) == 0 {
// While this should never occur in practice, a defensive return is used
// here to preserve correctness.
return current
}
var largest, smallest keyspan.Key
var set bool
for _, keys := range [2][]keyspan.Key{current, incoming} {
if len(keys) == 0 {
continue
}
first, last := keys[0], keys[len(keys)-1]
if !set {
largest, smallest = first, last
set = true
continue
}
if first.Trailer > largest.Trailer {
largest = first
}
if last.Trailer < smallest.Trailer {
smallest = last
}
}
if largest.Equal(comparer.Equal, smallest) {
current = append(current[:0], largest)
} else {
current = append(current[:0], largest, smallest)
}
return current
}
// The separate iters for the range dels and range keys are wrapped in a
// merging iter to join the keyspaces into a single keyspace. The separate
// iters are only added if the particular key kind is present.
mIter := &keyspan.MergingIter{}
var transform = keyspan.TransformerFunc(func(cmp base.Compare, in keyspan.Span, out *keyspan.Span) error {
if in.KeysOrder != keyspan.ByTrailerDesc {
panic("pebble: combined deletion iter encountered keys in non-trailer descending order")
}
out.Start, out.End = in.Start, in.End
out.Keys = append(out.Keys[:0], in.Keys...)
out.KeysOrder = keyspan.ByTrailerDesc
// NB: The order of by-trailer descending may have been violated,
// because we've layered rangekey and rangedel iterators from the same
// sstable into the same keyspan.MergingIter. The MergingIter will
// return the keys in the order that the child iterators were provided.
// Sort the keys to ensure they're sorted by trailer descending.
keyspan.SortKeysByTrailer(&out.Keys)
return nil
})
mIter.Init(comparer.Compare, transform, new(keyspan.MergingBuffers))
iter, err := r.NewRawRangeDelIter()
if err != nil {
return nil, err
}
if iter != nil {
dIter := &keyspan.DefragmentingIter{}
dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
iter = dIter
// Truncate tombstones to the containing file's bounds if necessary.
// See docs/range_deletions.md for why this is necessary.
iter = keyspan.Truncate(
comparer.Compare, iter, m.Smallest.UserKey, m.Largest.UserKey,
nil, nil, false, /* panicOnPartialOverlap */
)
mIter.AddLevel(iter)
}
iter, err = r.NewRawRangeKeyIter()
if err != nil {
return nil, err
}
if iter != nil {
// Wrap the range key iterator in a filter that elides keys other than range
// key deletions.
iter = keyspan.Filter(iter, func(in *keyspan.Span, out *keyspan.Span) (keep bool) {
out.Start, out.End = in.Start, in.End
out.Keys = out.Keys[:0]
for _, k := range in.Keys {
if k.Kind() != base.InternalKeyKindRangeKeyDelete {
continue
}
out.Keys = append(out.Keys, k)
}
return len(out.Keys) > 0
})
dIter := &keyspan.DefragmentingIter{}
dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
iter = dIter
mIter.AddLevel(iter)
}
return mIter, nil
}
// rangeKeySetsAnnotator implements manifest.Annotator, annotating B-Tree nodes
// with the sum of the files' counts of range key fragments. Its annotation type
// is a *uint64. The count of range key sets may change once a table's stats are
// loaded asynchronously, so its values are marked as cacheable only if a file's
// stats have been loaded.
type rangeKeySetsAnnotator struct{}
var _ manifest.Annotator = rangeKeySetsAnnotator{}
func (a rangeKeySetsAnnotator) Zero(dst interface{}) interface{} {
if dst == nil {
return new(uint64)
}
v := dst.(*uint64)
*v = 0
return v
}
func (a rangeKeySetsAnnotator) Accumulate(
f *fileMetadata, dst interface{},
) (v interface{}, cacheOK bool) {
vptr := dst.(*uint64)
*vptr = *vptr + f.Stats.NumRangeKeySets
return vptr, f.StatsValid()
}
func (a rangeKeySetsAnnotator) Merge(src interface{}, dst interface{}) interface{} {
srcV := src.(*uint64)
dstV := dst.(*uint64)
*dstV = *dstV + *srcV
return dstV
}
// countRangeKeySetFragments counts the number of RANGEKEYSET keys across all
// files of the LSM. It only counts keys in files for which table stats have
// been loaded. It uses a b-tree annotator to cache intermediate values between
// calculations when possible.
func countRangeKeySetFragments(v *version) (count uint64) {
for l := 0; l < numLevels; l++ {
if v.RangeKeyLevels[l].Empty() {
continue
}
count += *v.RangeKeyLevels[l].Annotation(rangeKeySetsAnnotator{}).(*uint64)
}
return count
}
// tombstonesAnnotator implements manifest.Annotator, annotating B-Tree nodes
// with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDELk
// eys). Its annotation type is a *uint64. The count of tombstones may change
// once a table's stats are loaded asynchronously, so its values are marked as
// cacheable only if a file's stats have been loaded.
type tombstonesAnnotator struct{}
var _ manifest.Annotator = tombstonesAnnotator{}
func (a tombstonesAnnotator) Zero(dst interface{}) interface{} {
if dst == nil {
return new(uint64)
}
v := dst.(*uint64)
*v = 0
return v
}
func (a tombstonesAnnotator) Accumulate(
f *fileMetadata, dst interface{},
) (v interface{}, cacheOK bool) {
vptr := dst.(*uint64)
*vptr = *vptr + f.Stats.NumDeletions
return vptr, f.StatsValid()
}
func (a tombstonesAnnotator) Merge(src interface{}, dst interface{}) interface{} {
srcV := src.(*uint64)
dstV := dst.(*uint64)
*dstV = *dstV + *srcV
return dstV
}
// countTombstones counts the number of tombstone (DEL, SINGLEDEL and RANGEDEL)
// internal keys across all files of the LSM. It only counts keys in files for
// which table stats have been loaded. It uses a b-tree annotator to cache
// intermediate values between calculations when possible.
func countTombstones(v *version) (count uint64) {
for l := 0; l < numLevels; l++ {
if v.Levels[l].Empty() {
continue
}
count += *v.Levels[l].Annotation(tombstonesAnnotator{}).(*uint64)
}
return count
}
// valueBlocksSizeAnnotator implements manifest.Annotator, annotating B-Tree
// nodes with the sum of the files' Properties.ValueBlocksSize. Its annotation
// type is a *uint64. The value block size may change once a table's stats are
// loaded asynchronously, so its values are marked as cacheable only if a
// file's stats have been loaded.
type valueBlocksSizeAnnotator struct{}
var _ manifest.Annotator = valueBlocksSizeAnnotator{}
func (a valueBlocksSizeAnnotator) Zero(dst interface{}) interface{} {
if dst == nil {
return new(uint64)
}
v := dst.(*uint64)
*v = 0
return v
}
func (a valueBlocksSizeAnnotator) Accumulate(
f *fileMetadata, dst interface{},
) (v interface{}, cacheOK bool) {
vptr := dst.(*uint64)
*vptr = *vptr + f.Stats.ValueBlocksSize
return vptr, f.StatsValid()
}
func (a valueBlocksSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
srcV := src.(*uint64)
dstV := dst.(*uint64)
*dstV = *dstV + *srcV
return dstV
}
// valueBlocksSizeForLevel returns the Properties.ValueBlocksSize across all
// files for a level of the LSM. It only includes the size for files for which
// table stats have been loaded. It uses a b-tree annotator to cache
// intermediate values between calculations when possible. It must not be
// called concurrently.
//
// REQUIRES: 0 <= level <= numLevels.
func valueBlocksSizeForLevel(v *version, level int) (count uint64) {
if v.Levels[level].Empty() {
return 0
}
return *v.Levels[level].Annotation(valueBlocksSizeAnnotator{}).(*uint64)
}