-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
pebble.go
3213 lines (2907 loc) · 111 KB
/
pebble.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/storage/pebbleiter"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/replay"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
humanize "github.com/dustin/go-humanize"
)
// UseEFOS controls whether uses of pebble Snapshots should use
// EventuallyFileOnlySnapshots instead. This reduces write-amp with the main
// tradeoff being higher space-amp. Note that UseExciseForSnapshot, if true,
// effectively causes EventuallyFileOnlySnapshots to be used as well.
//
// Note: Do NOT read this setting directly. Use ShouldUseEFOS() instead.
var UseEFOS = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.experimental.eventually_file_only_snapshots.enabled",
"set to false to disable eventually-file-only-snapshots (kv.snapshot_receiver.excise.enabled must also be false)",
metamorphic.ConstantWithTestBool(
"storage.experimental.eventually_file_only_snapshots.enabled", true), /* defaultValue */
settings.WithPublic)
// UseExciseForSnapshots controls whether virtual-sstable-based excises should
// be used instead of range deletions for clearing out replica contents as part
// of a rebalance/recovery snapshot application. Applied on the receiver side.
// Note that setting this setting to true also effectively causes UseEFOS above
// to become true. This interaction is why this setting is defined in the
// storage package even though it mostly affects KV.
var UseExciseForSnapshots = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.snapshot_receiver.excise.enabled",
"set to false to disable excises in place of range deletions for KV snapshots",
metamorphic.ConstantWithTestBool(
"kv.snapshot_receiver.excise.enabled", true), /* defaultValue */
settings.WithPublic,
)
// IngestSplitEnabled controls whether ingest-time splitting is enabled in
// Pebble. This feature allows for existing sstables to be split into multiple
// virtual sstables at ingest time if that allows for an ingestion sstable to go
// into a lower level than it would otherwise be in. No keys are masked with
// this split; it only happens if there are no keys in that existing sstable
// in the span of the incoming sstable.
var IngestSplitEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.ingest_split.enabled",
"set to false to disable ingest-time splitting that lowers write-amplification",
metamorphic.ConstantWithTestBool(
"storage.ingest_split.enabled", true), /* defaultValue */
settings.WithPublic,
)
// IngestAsFlushable controls whether ingested sstables that overlap the
// memtable may be lazily ingested: written to the WAL and enqueued in the list
// of flushables (eg, memtables, large batches and now lazily-ingested
// sstables). This only affects sstables that are ingested in the future. If a
// sstable was already lazily ingested but not flushed, a crash and subsequent
// recovery will still enqueue the sstables as flushable when the ingest's WAL
// entry is replayed.
//
// This cluster setting will be removed in a subsequent release.
var IngestAsFlushable = settings.RegisterBoolSetting(
settings.ApplicationLevel, // used to init temp storage in virtual cluster servers
"storage.ingest_as_flushable.enabled",
"set to true to enable lazy ingestion of sstables",
metamorphic.ConstantWithTestBool(
"storage.ingest_as_flushable.enabled", true))
// MinCapacityForBulkIngest is the fraction of remaining store capacity
// under which bulk ingestion requests are rejected.
var MinCapacityForBulkIngest = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.bulk_io_write.min_capacity_remaining_fraction",
"remaining store capacity fraction below which bulk ingestion requests are rejected",
0.05,
)
// BlockLoadConcurrencyLimit controls the maximum number of outstanding
// filesystem read operations for loading sstable blocks. This limit is a
// last-resort queueing mechanism to avoid memory issues or running against the
// Go OS system threads limit (see runtime.SetMaxThreads() with default value
// 10,000).
//
// The limit is distributed evenly between all stores (rounding up). This is to
// provide isolation between the stores - we don't want one bad disk blocking
// other stores.
var BlockLoadConcurrencyLimit = settings.RegisterIntSetting(
settings.ApplicationLevel, // used by temp storage as well
"storage.block_load.node_max_active",
"maximum number of outstanding sstable block reads per host",
7500,
settings.IntInRange(1, 9000),
)
// readaheadModeInformed controls the pebble.ReadaheadConfig.Informed setting.
//
// Note that the setting is taken into account when a table enters the Pebble
// table cache; it can take a while for an updated setting to take effect.
var readaheadModeInformed = settings.RegisterEnumSetting(
settings.ApplicationLevel, // used by temp storage as well
"storage.readahead_mode.informed",
"the readahead mode for operations which are known to read through large chunks of data; "+
"sys-readahead performs explicit prefetching via the readahead syscall; "+
"fadv-sequential lets the OS perform prefetching via fadvise(FADV_SEQUENTIAL)",
"fadv-sequential",
map[objstorageprovider.ReadaheadMode]string{
objstorageprovider.NoReadahead: "off",
objstorageprovider.SysReadahead: "sys-readahead",
objstorageprovider.FadviseSequential: "fadv-sequential",
},
)
// readaheadModeSpeculative controls the pebble.ReadaheadConfig.Speculative setting.
//
// Note that the setting is taken into account when a table enters the Pebble
// table cache; it can take a while for an updated setting to take effect.
var readaheadModeSpeculative = settings.RegisterEnumSetting(
settings.ApplicationLevel, // used by temp storage as well
"storage.readahead_mode.speculative",
"the readahead mode that is used automatically when sequential reads are detected; "+
"sys-readahead performs explicit prefetching via the readahead syscall; "+
"fadv-sequential starts with explicit prefetching via the readahead syscall then automatically "+
"switches to OS-driven prefetching via fadvise(FADV_SEQUENTIAL)",
"fadv-sequential",
map[objstorageprovider.ReadaheadMode]string{
objstorageprovider.NoReadahead: "off",
objstorageprovider.SysReadahead: "sys-readahead",
objstorageprovider.FadviseSequential: "fadv-sequential",
},
)
// CompressionAlgorithm is an enumeration of available compression algorithms
// available.
type compressionAlgorithm int64
const (
compressionAlgorithmSnappy compressionAlgorithm = 1
compressionAlgorithmZstd compressionAlgorithm = 2
compressionAlgorithmNone compressionAlgorithm = 3
)
// String implements fmt.Stringer for CompressionAlgorithm.
func (c compressionAlgorithm) String() string {
switch c {
case compressionAlgorithmSnappy:
return "snappy"
case compressionAlgorithmZstd:
return "zstd"
case compressionAlgorithmNone:
return "none"
default:
panic(errors.Errorf("unknown compression type: %d", c))
}
}
// RegisterCompressionAlgorithmClusterSetting is a helper to register an enum
// cluster setting with the given name, description and default value.
func RegisterCompressionAlgorithmClusterSetting(
name settings.InternalKey, desc string, defaultValue compressionAlgorithm,
) *settings.EnumSetting[compressionAlgorithm] {
return settings.RegisterEnumSetting(
// NB: We can't use settings.SystemOnly today because we may need to read the
// value from within a tenant building an sstable for AddSSTable.
settings.SystemVisible, name,
desc,
// TODO(jackson): Consider using a metamorphic constant here, but many tests
// will need to override it because they depend on a deterministic sstable
// size.
defaultValue.String(),
map[compressionAlgorithm]string{
compressionAlgorithmSnappy: compressionAlgorithmSnappy.String(),
compressionAlgorithmZstd: compressionAlgorithmZstd.String(),
compressionAlgorithmNone: compressionAlgorithmNone.String(),
},
settings.WithPublic,
)
}
// CompressionAlgorithmStorage determines the compression algorithm used to
// compress data blocks when writing sstables for use in a Pebble store (written
// directly, or constructed for ingestion on a remote store via AddSSTable).
// Users should call getCompressionAlgorithm with the cluster setting, rather
// than calling Get directly.
var CompressionAlgorithmStorage = RegisterCompressionAlgorithmClusterSetting(
"storage.sstable.compression_algorithm",
`determines the compression algorithm to use when compressing sstable data blocks for use in a Pebble store;`,
compressionAlgorithmSnappy, // Default.
)
// CompressionAlgorithmBackupStorage determines the compression algorithm used
// to compress data blocks when writing sstables that contain backup row data
// storage. Users should call getCompressionAlgorithm with the cluster setting,
// rather than calling Get directly.
var CompressionAlgorithmBackupStorage = RegisterCompressionAlgorithmClusterSetting(
"storage.sstable.compression_algorithm_backup_storage",
`determines the compression algorithm to use when compressing sstable data blocks for backup row data storage;`,
compressionAlgorithmSnappy, // Default.
)
// CompressionAlgorithmBackupTransport determines the compression algorithm used
// to compress data blocks when writing sstables that will be immediately
// iterated and will never need to touch disk. These sstables typically have
// much larger blocks and benefit from compression. However, this compression
// algorithm may be different to the one used when writing out the sstables for
// remote storage. Users should call getCompressionAlgorithm with the cluster
// setting, rather than calling Get directly.
var CompressionAlgorithmBackupTransport = RegisterCompressionAlgorithmClusterSetting(
"storage.sstable.compression_algorithm_backup_transport",
`determines the compression algorithm to use when compressing sstable data blocks for backup transport;`,
compressionAlgorithmSnappy, // Default.
)
func getCompressionAlgorithm(
ctx context.Context,
settings *cluster.Settings,
setting *settings.EnumSetting[compressionAlgorithm],
) pebble.Compression {
switch setting.Get(&settings.SV) {
case compressionAlgorithmSnappy:
return pebble.SnappyCompression
case compressionAlgorithmZstd:
// Pre-24.1 Pebble's implementation of zstd had bugs that could cause
// in-memory corruption. We require that the cluster version is 24.1 which
// implies that all nodes are running 24.1 code and will never run code
// < 24.1 again.
if settings.Version.ActiveVersionOrEmpty(ctx).IsActive(clusterversion.V24_1) {
return pebble.ZstdCompression
}
return pebble.DefaultCompression
case compressionAlgorithmNone:
return pebble.NoCompression
default:
return pebble.DefaultCompression
}
}
// DO NOT set storage.single_delete.crash_on_invariant_violation.enabled or
// storage.single_delete.crash_on_ineffectual.enabled to true.
//
// Pebble's delete-only compactions can cause a recent RANGEDEL to peek below
// an older SINGLEDEL and delete an arbitrary subset of data below that
// SINGLEDEL. When that SINGLEDEL gets compacted (without the RANGEDEL), any
// of these callbacks can happen, without it being a real correctness problem.
//
// Example 1:
// RANGEDEL [a, c)#10 in L0
// SINGLEDEL b#5 in L1
// SET b#3 in L6
//
// If the L6 file containing the SET is narrow and the L1 file containing the
// SINGLEDEL is wide, a delete-only compaction can remove the file in L2
// before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
// compacted down, it will not find any SET to delete, resulting in the
// ineffectual callback.
//
// Example 2:
// RANGEDEL [a, z)#60 in L0
// SINGLEDEL g#50 in L1
// SET g#40 in L2
// RANGEDEL [g,h)#30 in L3
// SET g#20 in L6
//
// In this example, the two SETs represent the same intent, and the RANGEDELs
// are caused by the CRDB range being dropped. That is, the transaction wrote
// the intent once, range was dropped, then added back, which caused the SET
// again, then the transaction committed, causing a SINGLEDEL, and then the
// range was dropped again. The older RANGEDEL can get fragmented due to
// compactions it has been part of. Say this L3 file containing the RANGEDEL
// is very narrow, while the L1, L2, L6 files are wider than the RANGEDEL in
// L0. Then the RANGEDEL in L3 can be dropped using a delete-only compaction,
// resulting in an LSM with state:
//
// RANGEDEL [a, z)#60 in L0
// SINGLEDEL g#50 in L1
// SET g#40 in L2
// SET g#20 in L6
//
// A multi-level compaction involving L1, L2, L6 will cause the invariant
// violation callback. This example doesn't need multi-level compactions: say
// there was a Pebble snapshot at g#21 preventing g#20 from being dropped when
// it meets g#40 in a compaction. That snapshot will not save RANGEDEL
// [g,h)#30, so we can have:
//
// SINGLEDEL g#50 in L1
// SET g#40, SET g#20 in L6
//
// And say the snapshot is removed and then the L1 and L6 compaction happens,
// resulting in the invariant violation callback.
//
// TODO(sumeer): remove these cluster settings or figure out a way to bring
// back some invariant checking.
var SingleDeleteCrashOnInvariantViolation = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.single_delete.crash_on_invariant_violation.enabled",
"set to true to crash if the single delete invariant is violated",
false,
settings.WithVisibility(settings.Reserved),
)
var SingleDeleteCrashOnIneffectual = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.single_delete.crash_on_ineffectual.enabled",
"set to true to crash if the single delete was ineffectual",
false,
settings.WithVisibility(settings.Reserved),
)
var walFailoverUnhealthyOpThreshold = settings.RegisterDurationSetting(
settings.SystemOnly,
"storage.wal_failover.unhealthy_op_threshold",
"the latency of a WAL write considered unhealthy and triggers a failover to a secondary WAL location",
100*time.Millisecond,
settings.WithPublic,
)
// TODO(ssd): This could be SystemOnly but we currently init pebble
// engines for temporary storage. Temporary engines shouldn't really
// care about download compactions, but they do currently simply
// because of code organization.
var concurrentDownloadCompactions = settings.RegisterIntSetting(
settings.ApplicationLevel,
"storage.max_download_compaction_concurrency",
"the maximum number of concurrent download compactions",
8,
settings.IntWithMinimum(1),
)
// ShouldUseEFOS returns true if either of the UseEFOS or UseExciseForSnapshots
// cluster settings are enabled, and EventuallyFileOnlySnapshots must be used
// to guarantee snapshot-like semantics.
func ShouldUseEFOS(settings *settings.Values) bool {
return UseEFOS.Get(settings) || UseExciseForSnapshots.Get(settings)
}
// EngineKeyCompare compares cockroach keys, including the version (which
// could be MVCC timestamps).
func EngineKeyCompare(a, b []byte) int {
// NB: For performance, this routine manually splits the key into the
// user-key and version components rather than using DecodeEngineKey. In
// most situations, use DecodeEngineKey or GetKeyPartFromEngineKey or
// SplitMVCCKey instead of doing this.
aEnd := len(a) - 1
bEnd := len(b) - 1
if aEnd < 0 || bEnd < 0 {
// This should never happen unless there is some sort of corruption of
// the keys.
return bytes.Compare(a, b)
}
// Compute the index of the separator between the key and the version. If the
// separator is found to be at -1 for both keys, then we are comparing bare
// suffixes without a user key part. Pebble requires bare suffixes to be
// comparable with the same ordering as if they had a common user key.
aSep := aEnd - int(a[aEnd])
bSep := bEnd - int(b[bEnd])
if aSep == -1 && bSep == -1 {
aSep, bSep = 0, 0 // comparing bare suffixes
}
if aSep < 0 || bSep < 0 {
// This should never happen unless there is some sort of corruption of
// the keys.
return bytes.Compare(a, b)
}
// Compare the "user key" part of the key.
if c := bytes.Compare(a[:aSep], b[:bSep]); c != 0 {
return c
}
// Compare the version part of the key. Note that when the version is a
// timestamp, the timestamp encoding causes byte comparison to be equivalent
// to timestamp comparison.
aVer := a[aSep:aEnd]
bVer := b[bSep:bEnd]
if len(aVer) == 0 {
if len(bVer) == 0 {
return 0
}
return -1
} else if len(bVer) == 0 {
return 1
}
aVer = normalizeEngineKeyVersionForCompare(aVer)
bVer = normalizeEngineKeyVersionForCompare(bVer)
return bytes.Compare(bVer, aVer)
}
// EngineKeyEqual checks for equality of cockroach keys, including the version
// (which could be MVCC timestamps).
func EngineKeyEqual(a, b []byte) bool {
// NB: For performance, this routine manually splits the key into the
// user-key and version components rather than using DecodeEngineKey. In
// most situations, use DecodeEngineKey or GetKeyPartFromEngineKey or
// SplitMVCCKey instead of doing this.
aEnd := len(a) - 1
bEnd := len(b) - 1
if aEnd < 0 || bEnd < 0 {
// This should never happen unless there is some sort of corruption of
// the keys.
return bytes.Equal(a, b)
}
// Last byte is the version length + 1 when there is a version,
// else it is 0.
aVerLen := int(a[aEnd])
bVerLen := int(b[bEnd])
// Fast-path. If the key version is empty or contains only a walltime
// component then normalizeEngineKeyVersionForCompare is a no-op, so we don't
// need to split the "user key" from the version suffix before comparing to
// compute equality. Instead, we can check for byte equality immediately.
const withWall = mvccEncodedTimeSentinelLen + mvccEncodedTimeWallLen
const withLockTableLen = mvccEncodedTimeSentinelLen + engineKeyVersionLockTableLen
if (aVerLen <= withWall && bVerLen <= withWall) || (aVerLen == withLockTableLen && bVerLen == withLockTableLen) {
return bytes.Equal(a, b)
}
// Compute the index of the separator between the key and the version. If the
// separator is found to be at -1 for both keys, then we are comparing bare
// suffixes without a user key part. Pebble requires bare suffixes to be
// comparable with the same ordering as if they had a common user key.
aSep := aEnd - aVerLen
bSep := bEnd - bVerLen
if aSep == -1 && bSep == -1 {
aSep, bSep = 0, 0 // comparing bare suffixes
}
if aSep < 0 || bSep < 0 {
// This should never happen unless there is some sort of corruption of
// the keys.
return bytes.Equal(a, b)
}
// Compare the "user key" part of the key.
if !bytes.Equal(a[:aSep], b[:bSep]) {
return false
}
// Compare the version part of the key.
aVer := a[aSep:aEnd]
bVer := b[bSep:bEnd]
aVer = normalizeEngineKeyVersionForCompare(aVer)
bVer = normalizeEngineKeyVersionForCompare(bVer)
return bytes.Equal(aVer, bVer)
}
var zeroLogical [mvccEncodedTimeLogicalLen]byte
//gcassert:inline
func normalizeEngineKeyVersionForCompare(a []byte) []byte {
// In general, the version could also be a non-timestamp version, but we know
// that engineKeyVersionLockTableLen+mvccEncodedTimeSentinelLen is a different
// constant than the above, so there is no danger here of stripping parts from
// a non-timestamp version.
const withWall = mvccEncodedTimeSentinelLen + mvccEncodedTimeWallLen
const withLogical = withWall + mvccEncodedTimeLogicalLen
const withSynthetic = withLogical + mvccEncodedTimeSyntheticLen
if len(a) == withSynthetic {
// Strip the synthetic bit component from the timestamp version. The
// presence of the synthetic bit does not affect key ordering or equality.
a = a[:withLogical]
}
if len(a) == withLogical {
// If the timestamp version contains a logical timestamp component that is
// zero, strip the component. encodeMVCCTimestampToBuf will typically omit
// the entire logical component in these cases as an optimization, but it
// does not guarantee to never include a zero logical component.
// Additionally, we can fall into this case after stripping off other
// components of the key version earlier on in this function.
if bytes.Equal(a[withWall:], zeroLogical[:]) {
a = a[:withWall]
}
}
return a
}
// EngineComparer is a pebble.Comparer object that implements MVCC-specific
// comparator settings for use with Pebble.
var EngineComparer = &pebble.Comparer{
Compare: EngineKeyCompare,
Equal: EngineKeyEqual,
AbbreviatedKey: func(k []byte) uint64 {
key, ok := GetKeyPartFromEngineKey(k)
if !ok {
return 0
}
return pebble.DefaultComparer.AbbreviatedKey(key)
},
FormatKey: func(k []byte) fmt.Formatter {
decoded, ok := DecodeEngineKey(k)
if !ok {
return mvccKeyFormatter{err: errors.Errorf("invalid encoded engine key: %x", k)}
}
if decoded.IsMVCCKey() {
mvccKey, err := decoded.ToMVCCKey()
if err != nil {
return mvccKeyFormatter{err: err}
}
return mvccKeyFormatter{key: mvccKey}
}
return EngineKeyFormatter{key: decoded}
},
Separator: func(dst, a, b []byte) []byte {
aKey, ok := GetKeyPartFromEngineKey(a)
if !ok {
return append(dst, a...)
}
bKey, ok := GetKeyPartFromEngineKey(b)
if !ok {
return append(dst, a...)
}
// If the keys are the same just return a.
if bytes.Equal(aKey, bKey) {
return append(dst, a...)
}
n := len(dst)
// Engine key comparison uses bytes.Compare on the roachpb.Key, which is the same semantics as
// pebble.DefaultComparer, so reuse the latter's Separator implementation.
dst = pebble.DefaultComparer.Separator(dst, aKey, bKey)
// Did it pick a separator different than aKey -- if it did not we can't do better than a.
buf := dst[n:]
if bytes.Equal(aKey, buf) {
return append(dst[:n], a...)
}
// The separator is > aKey, so we only need to add the sentinel.
return append(dst, 0)
},
Successor: func(dst, a []byte) []byte {
aKey, ok := GetKeyPartFromEngineKey(a)
if !ok {
return append(dst, a...)
}
n := len(dst)
// Engine key comparison uses bytes.Compare on the roachpb.Key, which is the same semantics as
// pebble.DefaultComparer, so reuse the latter's Successor implementation.
dst = pebble.DefaultComparer.Successor(dst, aKey)
// Did it pick a successor different than aKey -- if it did not we can't do better than a.
buf := dst[n:]
if bytes.Equal(aKey, buf) {
return append(dst[:n], a...)
}
// The successor is > aKey, so we only need to add the sentinel.
return append(dst, 0)
},
ImmediateSuccessor: func(dst, a []byte) []byte {
// The key `a` is guaranteed to be a bare prefix: It's a
// `engineKeyNoVersion` key without a version—just a trailing 0-byte to
// signify the length of the version. For example the user key "foo" is
// encoded as: "foo\0". We need to encode the immediate successor to
// "foo", which in the natural byte ordering is "foo\0". Append a
// single additional zero, to encode the user key "foo\0" with a
// zero-length version.
return append(append(dst, a...), 0)
},
Split: func(k []byte) int {
keyLen := len(k)
if keyLen == 0 {
return 0
}
// Last byte is the version length + 1 when there is a version,
// else it is 0.
versionLen := int(k[keyLen-1])
// keyPartEnd points to the sentinel byte.
keyPartEnd := keyLen - 1 - versionLen
if keyPartEnd < 0 {
return keyLen
}
// Pebble requires that keys generated via a split be comparable with
// normal encoded engine keys. Encoded engine keys have a suffix
// indicating the number of bytes of version data. Engine keys without a
// version have a suffix of 0. We're careful in EncodeKey to make sure
// that the user-key always has a trailing 0. If there is no version this
// falls out naturally. If there is a version we prepend a 0 to the
// encoded version data.
return keyPartEnd + 1
},
Name: "cockroach_comparator",
}
// MVCCMerger is a pebble.Merger object that implements the merge operator used
// by Cockroach.
var MVCCMerger = &pebble.Merger{
Name: "cockroach_merge_operator",
Merge: func(_, value []byte) (pebble.ValueMerger, error) {
res := &MVCCValueMerger{}
err := res.MergeNewer(value)
if err != nil {
return nil, err
}
return res, nil
},
}
var _ sstable.BlockIntervalSuffixReplacer = MVCCBlockIntervalSuffixReplacer{}
type MVCCBlockIntervalSuffixReplacer struct{}
func (MVCCBlockIntervalSuffixReplacer) ApplySuffixReplacement(
interval sstable.BlockInterval, newSuffix []byte,
) (sstable.BlockInterval, error) {
synthDecoded, err := DecodeMVCCTimestampSuffix(newSuffix)
if err != nil {
return sstable.BlockInterval{}, errors.AssertionFailedf("could not decode synthetic suffix")
}
synthDecodedWalltime := uint64(synthDecoded.WallTime)
// The returned bound includes the synthetic suffix, regardless of its logical
// component.
return sstable.BlockInterval{Lower: synthDecodedWalltime, Upper: synthDecodedWalltime + 1}, nil
}
type pebbleIntervalMapper struct{}
var _ sstable.IntervalMapper = pebbleIntervalMapper{}
// MapPointKey is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapPointKey(
key pebble.InternalKey, value []byte,
) (sstable.BlockInterval, error) {
return mapSuffixToInterval(key.UserKey)
}
// MapRangeKey is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapRangeKeys(span sstable.Span) (sstable.BlockInterval, error) {
var res sstable.BlockInterval
for _, k := range span.Keys {
i, err := mapSuffixToInterval(k.Suffix)
if err != nil {
return sstable.BlockInterval{}, err
}
res.UnionWith(i)
}
return res, nil
}
// mapSuffixToInterval maps the suffix of a key to a timestamp interval.
// The buffer can be an entire key or just the suffix.
func mapSuffixToInterval(b []byte) (sstable.BlockInterval, error) {
if len(b) == 0 {
return sstable.BlockInterval{}, nil
}
// Last byte is the version length + 1 when there is a version,
// else it is 0.
versionLen := int(b[len(b)-1])
if versionLen == 0 {
// This is not an MVCC key that we can collect.
return sstable.BlockInterval{}, nil
}
// prefixPartEnd points to the sentinel byte, unless this is a bare suffix, in
// which case the index is -1.
prefixPartEnd := len(b) - 1 - versionLen
// Sanity check: the index should be >= -1. Additionally, if the index is >=
// 0, it should point to the sentinel byte, as this is a full EngineKey.
if prefixPartEnd < -1 || (prefixPartEnd >= 0 && b[prefixPartEnd] != sentinel) {
return sstable.BlockInterval{}, errors.Errorf("invalid key %s", roachpb.Key(b).String())
}
// We don't need the last byte (the version length).
versionLen--
// Only collect if this looks like an MVCC timestamp.
if versionLen == engineKeyVersionWallTimeLen ||
versionLen == engineKeyVersionWallAndLogicalTimeLen ||
versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen {
// INVARIANT: -1 <= prefixPartEnd < len(b) - 1.
// Version consists of the bytes after the sentinel and before the length.
ts := binary.BigEndian.Uint64(b[prefixPartEnd+1:])
return sstable.BlockInterval{Lower: ts, Upper: ts + 1}, nil
}
return sstable.BlockInterval{}, nil
}
const mvccWallTimeIntervalCollector = "MVCCTimeInterval"
var _ pebble.BlockPropertyFilterMask = (*mvccWallTimeIntervalRangeKeyMask)(nil)
type mvccWallTimeIntervalRangeKeyMask struct {
sstable.BlockIntervalFilter
}
// SetSuffix implements the pebble.BlockPropertyFilterMask interface.
func (m *mvccWallTimeIntervalRangeKeyMask) SetSuffix(suffix []byte) error {
if len(suffix) == 0 {
// This is currently impossible, because the only range key Cockroach
// writes today is the MVCC Delete Range that's always suffixed.
return nil
}
ts, err := DecodeMVCCTimestampSuffix(suffix)
if err != nil {
return err
}
m.BlockIntervalFilter.SetInterval(uint64(ts.WallTime), math.MaxUint64)
return nil
}
// PebbleBlockPropertyCollectors is the list of functions to construct
// BlockPropertyCollectors.
var PebbleBlockPropertyCollectors = []func() pebble.BlockPropertyCollector{
func() pebble.BlockPropertyCollector {
return sstable.NewBlockIntervalCollector(
mvccWallTimeIntervalCollector,
pebbleIntervalMapper{},
MVCCBlockIntervalSuffixReplacer{},
)
},
}
// MinimumSupportedFormatVersion is the version that provides features that the
// Cockroach code relies on unconditionally (like range keys). New stores are by
// default created with this version. It should correspond to the minimum
// supported binary version.
const MinimumSupportedFormatVersion = pebble.FormatVirtualSSTables
// DefaultPebbleOptions returns the default pebble options.
func DefaultPebbleOptions() *pebble.Options {
opts := &pebble.Options{
Comparer: EngineComparer,
FS: vfs.Default,
// A value of 2 triggers a compaction when there is 1 sub-level.
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: make([]pebble.LevelOptions, 7),
// NB: Options.MaxConcurrentCompactions may be overidden in NewPebble to
// allow overriding the max at runtime through
// Engine.SetCompactionConcurrency.
MaxConcurrentCompactions: getMaxConcurrentCompactions,
MemTableSize: 64 << 20, // 64 MB
MemTableStopWritesThreshold: 4,
Merger: MVCCMerger,
BlockPropertyCollectors: PebbleBlockPropertyCollectors,
FormatMajorVersion: MinimumSupportedFormatVersion,
}
opts.Experimental.L0CompactionConcurrency = l0SubLevelCompactionConcurrency
// Automatically flush 10s after the first range tombstone is added to a
// memtable. This ensures that we can reclaim space even when there's no
// activity on the database generating flushes.
opts.FlushDelayDeleteRange = 10 * time.Second
// Automatically flush 10s after the first range key is added to a memtable.
// This ensures that range keys are quickly flushed, allowing use of lazy
// combined iteration within Pebble.
opts.FlushDelayRangeKey = 10 * time.Second
// Enable deletion pacing. This helps prevent disk slowness events on some
// SSDs, that kick off an expensive GC if a lot of files are deleted at
// once.
opts.TargetByteDeletionRate = 128 << 20 // 128 MB
// Validate min/max keys in each SSTable when performing a compaction. This
// serves as a simple protection against corruption or programmer-error in
// Pebble.
opts.Experimental.KeyValidationFunc = func(userKey []byte) error {
engineKey, ok := DecodeEngineKey(userKey)
if !ok {
return errors.Newf("key %s could not be decoded as an EngineKey", string(userKey))
}
if err := engineKey.Validate(); err != nil {
return err
}
return nil
}
opts.Experimental.ShortAttributeExtractor = shortAttributeExtractorForValues
opts.Experimental.RequiredInPlaceValueBound = pebble.UserKeyPrefixBound{
Lower: keys.LocalRangeLockTablePrefix,
Upper: keys.LocalRangeLockTablePrefix.PrefixEnd(),
}
for i := 0; i < len(opts.Levels); i++ {
l := &opts.Levels[i]
l.BlockSize = 32 << 10 // 32 KB
l.IndexBlockSize = 256 << 10 // 256 KB
l.FilterPolicy = bloom.FilterPolicy(10)
l.FilterType = pebble.TableFilter
if i > 0 {
l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
}
l.EnsureDefaults()
}
// These size classes are a subset of available size classes in jemalloc[1].
// The size classes are used by Pebble for determining target block sizes for
// flushes, with the goal of reducing internal fragmentation. There are many
// more size classes that could be included, however, sstable blocks have a
// target block size of 32KiB, a minimum size threshold of ~19.6KiB and are
// unlikely to exceed 128KiB.
//
// [1] https://jemalloc.net/jemalloc.3.html#size_classes
opts.AllocatorSizeClasses = []int{
16384,
20480, 24576, 28672, 32768,
40960, 49152, 57344, 65536,
81920, 98304, 114688, 131072,
}
return opts
}
func shortAttributeExtractorForValues(
key []byte, keyPrefixLen int, value []byte,
) (pebble.ShortAttribute, error) {
suffixLen := len(key) - keyPrefixLen
const lockTableSuffixLen = engineKeyVersionLockTableLen + sentinelLen
if suffixLen == engineKeyNoVersion || suffixLen == lockTableSuffixLen {
// Not a versioned MVCC value.
return 0, nil
}
isTombstone, err := EncodedMVCCValueIsTombstone(value)
if err != nil {
return 0, err
}
if isTombstone {
return 1, nil
}
return 0, nil
}
// engineConfig holds all configuration parameters and knobs used in setting up
// a new storage engine.
type engineConfig struct {
attrs roachpb.Attributes
// ballastSize is the amount reserved by a ballast file for manual
// out-of-disk recovery.
ballastSize int64
// cacheSize is stored separately so that we can avoid constructing the
// PebbleConfig.Opts.Cache until the call to Open. A Cache is created with
// a ref count of 1, so creating the Cache during execution of
// ConfigOption makes it too easy to leak a cache.
cacheSize *int64
// env holds the initialized virtual filesystem that the Engine should use.
env *fs.Env
// maxSize is used for calculating free space and making rebalancing
// decisions. The zero value indicates that there is no absolute maximum size.
maxSize storeSize
// If true, creating the instance fails if the target directory does not hold
// an initialized instance.
//
// Makes no sense for in-memory instances.
mustExist bool
// pebble specific options.
opts *pebble.Options
// remoteStorageFactory is used to pass the ExternalStorage factory.
remoteStorageFactory *cloud.EarlyBootExternalStorageAccessor
// settings instance for cluster-wide knobs. Must not be nil.
settings *cluster.Settings
// sharedStorage is a cloud.ExternalStorage that can be used by all Pebble
// stores on this node and on other nodes to store sstables.
sharedStorage cloud.ExternalStorage
// beforeClose is a slice of functions to be invoked before the engine is closed.
beforeClose []func(*Pebble)
// afterClose is a slice of functions to be invoked after the engine is closed.
afterClose []func()
// diskMonitor is used to output a disk trace when a stall is detected.
diskMonitor *disk.Monitor
// DiskWriteStatsCollector is used to categorically track disk write metrics
// across all Pebble stores on this node.
DiskWriteStatsCollector *vfs.DiskWriteStatsCollector
// blockConcurrencyLimitDivisor is used to calculate the block load
// concurrency limit: it is the current valuer of the
// BlockLoadConcurrencyLimit setting divided by this value. It should be set
// to the number of stores.
//
// This is necessary because we want separate limiters per stores (we don't
// want one bad disk to block other stores)
//
// A value of 0 disables the limit.
blockConcurrencyLimitDivisor int
}
// Pebble is a wrapper around a Pebble database instance.
type Pebble struct {
atomic struct {
// compactionConcurrency is the current compaction concurrency set on
// the Pebble store. The compactionConcurrency option in the Pebble
// Options struct is a closure which will return
// Pebble.atomic.compactionConcurrency.
//
// This mechanism allows us to change the Pebble compactionConcurrency
// on the fly without restarting Pebble.
compactionConcurrency uint64
}
cfg engineConfig
db *pebble.DB
closed bool
auxDir string
ballastPath string
properties roachpb.StoreProperties
// Stats updated by pebble.EventListener invocations, and returned in
// GetMetrics. Updated and retrieved atomically.
writeStallCount int64
writeStallDuration time.Duration
writeStallStartNanos int64
diskSlowCount int64
diskStallCount int64
singleDelInvariantViolationCount int64
singleDelIneffectualCount int64
sharedBytesRead int64
sharedBytesWritten int64
iterStats struct {
syncutil.Mutex
AggregatedIteratorStats
}
batchCommitStats struct {
syncutil.Mutex
AggregatedBatchCommitStats
}
diskWriteStatsCollector *vfs.DiskWriteStatsCollector
// Relevant options copied over from pebble.Options.
logCtx context.Context
logger pebble.LoggerAndTracer
eventListener *pebble.EventListener
mu struct {
// This mutex is the lowest in any lock ordering.
syncutil.Mutex
flushCompletedCallback func()
}
asyncDone sync.WaitGroup
// minVersion is the minimum CockroachDB version that can open this store.
minVersion roachpb.Version
storeIDPebbleLog *base.StoreIDContainer
replayer *replay.WorkloadCollector
diskSlowFunc atomic.Pointer[func(vfs.DiskSlowInfo)]
singleDelLogEvery log.EveryN
}
// WorkloadCollector implements an workloadCollectorGetter and returns the
// workload collector stored on Pebble. This method is invoked following a
// successful cast of an Engine to a `workloadCollectorGetter` type. This method
// allows for pebble exclusive functionality to be used without modifying the
// Engine interface.
func (p *Pebble) WorkloadCollector() *replay.WorkloadCollector {
return p.replayer
}