From e3cbe6501484beccccdbb3dceffac0738993dbf1 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 9 Oct 2023 13:06:49 -0700 Subject: [PATCH] db: use DiskFileNum for manifest files and memtables These subsystems deal with on-disk files, so using DiskFileNum is more appropriate. This reduces the number of conversions in the code a lot. There are only a few `DiskFileNum.FileNum()` conversions left. We change `DiskFileNum` to be a `uint64` like `FileNum`, so that `DiskFileNum`s can also be compared and printed as integers. --- checkpoint.go | 8 +- compaction.go | 6 +- db.go | 18 +++- db_test.go | 4 +- event.go | 5 +- flushable.go | 3 +- flushable_test.go | 2 +- format_major_version.go | 2 +- ingest.go | 4 +- internal/base/filenames.go | 18 ++--- internal/base/filenames_test.go | 2 +- internal/manifest/testdata/version_edit_apply | 4 +- internal/manifest/version_edit.go | 10 +-- open.go | 31 ++++---- record/log_writer.go | 4 +- record/record.go | 2 +- record/record_test.go | 26 +++---- replay/workload_capture.go | 2 +- replay/workload_capture_test.go | 4 +- table_cache_test.go | 2 +- tool/find.go | 24 +++--- tool/wal.go | 2 +- version_set.go | 74 ++++++++++--------- version_set_test.go | 4 +- 24 files changed, 139 insertions(+), 122 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index ad70a35473..f321c01ec3 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -298,7 +298,7 @@ func (d *DB) Checkpoint( } ckErr = d.writeCheckpointManifest( - fs, formatVers, destDir, dir, manifestFileNum.DiskFileNum(), manifestSize, + fs, formatVers, destDir, dir, manifestFileNum, manifestSize, excludedFiles, removeBackingTables, ) if ckErr != nil { @@ -313,7 +313,7 @@ func (d *DB) Checkpoint( if logNum == 0 { continue } - srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum.DiskFileNum()) + srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum) destPath := fs.PathJoin(destDir, fs.PathBase(srcPath)) ckErr = vfs.Copy(fs, srcPath, destPath) if ckErr != nil { @@ -368,7 +368,7 @@ func (d *DB) writeCheckpointManifest( // need to append another record with the excluded files (we cannot simply // append a record after a raw data copy; see // https://github.com/cockroachdb/cockroach/issues/100935). 
- r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum()) + r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum) w := record.NewWriter(dst) for { rr, err := r.Next() @@ -421,7 +421,7 @@ func (d *DB) writeCheckpointManifest( if err != nil { return err } - if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum.FileNum()); err != nil { + if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum); err != nil { return err } return manifestMarker.Close() diff --git a/compaction.go b/compaction.go index fb29dda195..79fc115cc1 100644 --- a/compaction.go +++ b/compaction.go @@ -3670,7 +3670,7 @@ func (d *DB) scanObsoleteFiles(list []string) { } switch fileType { case fileTypeLog: - if diskFileNum.FileNum() >= minUnflushedLogNum { + if diskFileNum >= minUnflushedLogNum { continue } fi := fileInfo{fileNum: diskFileNum} @@ -3679,7 +3679,7 @@ func (d *DB) scanObsoleteFiles(list []string) { } obsoleteLogs = append(obsoleteLogs, fi) case fileTypeManifest: - if diskFileNum.FileNum() >= manifestFileNum { + if diskFileNum >= manifestFileNum { continue } fi := fileInfo{fileNum: diskFileNum} @@ -3785,7 +3785,7 @@ func (d *DB) deleteObsoleteFiles(jobID int) { // log that has not had its contents flushed to an sstable. We can recycle // the prefix of d.mu.log.queue with log numbers less than // minUnflushedLogNum. - if d.mu.log.queue[i].fileNum.FileNum() >= d.mu.versions.minUnflushedLogNum { + if d.mu.log.queue[i].fileNum >= d.mu.versions.minUnflushedLogNum { obsoleteLogs = d.mu.log.queue[:i] d.mu.log.queue = d.mu.log.queue[i:] d.mu.versions.metrics.WAL.Files -= int64(len(obsoleteLogs)) diff --git a/db.go b/db.go index 49b025dd02..4d25b9f241 100644 --- a/db.go +++ b/db.go @@ -2302,7 +2302,7 @@ func (d *DB) walPreallocateSize() int { return int(size) } -func (d *DB) newMemTable(logNum FileNum, logSeqNum uint64) (*memTable, *flushableEntry) { +func (d *DB) newMemTable(logNum base.DiskFileNum, logSeqNum uint64) (*memTable, *flushableEntry) { size := d.mu.mem.nextSize if d.mu.mem.nextSize < d.opts.MemTableSize { d.mu.mem.nextSize *= 2 @@ -2373,7 +2373,9 @@ func (d *DB) freeMemTable(m *memTable) { m.free() } -func (d *DB) newFlushableEntry(f flushable, logNum FileNum, logSeqNum uint64) *flushableEntry { +func (d *DB) newFlushableEntry( + f flushable, logNum base.DiskFileNum, logSeqNum uint64, +) *flushableEntry { fe := &flushableEntry{ flushable: f, flushed: make(chan struct{}), @@ -2457,7 +2459,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error { continue } - var newLogNum base.FileNum + var newLogNum base.DiskFileNum var prevLogSize uint64 if !d.opts.DisableWAL { now := time.Now() @@ -2514,7 +2516,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error { } // Both DB.mu and commitPipeline.mu must be held by the caller. -func (d *DB) rotateMemtable(newLogNum FileNum, logSeqNum uint64, prev *memTable) { +func (d *DB) rotateMemtable(newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable) { // Create a new memtable, scheduling the previous one for flushing. We do // this even if the previous memtable was empty because the DB.Flush // mechanism is dependent on being able to wait for the empty memtable to @@ -2541,14 +2543,14 @@ func (d *DB) rotateMemtable(newLogNum FileNum, logSeqNum uint64, prev *memTable) // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu // may be released and reacquired. 
-func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) { +func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) { if d.opts.DisableWAL { panic("pebble: invalid function call") } jobID := d.mu.nextJobID d.mu.nextJobID++ - newLogNum = d.mu.versions.getNextFileNum() + newLogNum = d.mu.versions.getNextDiskFileNum() prevLogSize = uint64(d.mu.log.Size()) @@ -2574,7 +2576,7 @@ func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) { } d.mu.Unlock() - newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum.DiskFileNum()) + newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum) // Try to use a recycled log file. Recycling log files is an important // performance optimization as it is faster to sync a file that has @@ -2655,7 +2657,7 @@ func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) { panic(err) } - d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum.DiskFileNum(), fileSize: newLogSize}) + d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize}) d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{ WALFsyncLatency: d.mu.log.metrics.fsyncLatency, WALMinSyncInterval: d.opts.WALMinSyncInterval, diff --git a/db_test.go b/db_test.go index 0c208bea73..a3d781de6c 100644 --- a/db_test.go +++ b/db_test.go @@ -943,7 +943,7 @@ func TestRollManifest(t *testing.T) { d, err := Open("", opts) require.NoError(t, err) - manifestFileNumber := func() FileNum { + manifestFileNumber := func() base.DiskFileNum { d.mu.Lock() defer d.mu.Unlock() return d.mu.versions.manifestFileNum @@ -961,7 +961,7 @@ func TestRollManifest(t *testing.T) { } lastManifestNum := manifestFileNumber() - manifestNums := []base.FileNum{lastManifestNum} + manifestNums := []base.DiskFileNum{lastManifestNum} for i := 0; i < 5; i++ { // MaxManifestFileSize is 1, but the rollover logic also counts edits // since the last snapshot to decide on rollover, so do as many flushes as diff --git a/event.go b/event.go index d431d07092..39452c72b2 100644 --- a/event.go +++ b/event.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/manifest" @@ -247,7 +248,7 @@ type ManifestCreateInfo struct { JobID int Path string // The file number of the new Manifest. - FileNum FileNum + FileNum base.DiskFileNum Err error } @@ -414,7 +415,7 @@ type WALCreateInfo struct { JobID int Path string // The file number of the new WAL. - FileNum FileNum + FileNum base.DiskFileNum // The file number of a previous WAL which was recycled to create this // one. Zero if recycling did not take place. RecycledFileNum FileNum diff --git a/flushable.go b/flushable.go index 09abee385b..ef801ee97d 100644 --- a/flushable.go +++ b/flushable.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" ) @@ -46,7 +47,7 @@ type flushableEntry struct { delayedFlushForcedAt time.Time // logNum corresponds to the WAL that contains the records present in the // receiver. - logNum FileNum + logNum base.DiskFileNum // logSize is the size in bytes of the associated WAL. Protected by DB.mu. logSize uint64 // The current logSeqNum at the time the memtable was created. 
This is diff --git a/flushable_test.go b/flushable_test.go index 18c448ce6d..c5d1d9ca34 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -50,7 +50,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { d.mu.Lock() pendingOutputs := make([]base.DiskFileNum, len(paths)) for i := range paths { - pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum() + pendingOutputs[i] = d.mu.versions.getNextDiskFileNum() } jobID := d.mu.nextJobID d.mu.nextJobID++ diff --git a/format_major_version.go b/format_major_version.go index c418578f37..d5876f6866 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -241,7 +241,7 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ // guaranteed to exist, because we unconditionally locate it // during Open. manifestFileNum := d.mu.versions.manifestFileNum - filename := base.MakeFilename(fileTypeManifest, manifestFileNum.DiskFileNum()) + filename := base.MakeFilename(fileTypeManifest, manifestFileNum) if err := d.mu.versions.manifestMarker.Move(filename); err != nil { return errors.Wrap(err, "moving manifest marker") } diff --git a/ingest.go b/ingest.go index f619a7a1ce..53e02d0c7c 100644 --- a/ingest.go +++ b/ingest.go @@ -1160,7 +1160,7 @@ func (d *DB) IngestAndExcise( // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( - meta []*fileMetadata, seqNum uint64, logNum FileNum, + meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, ) (*flushableEntry, error) { // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the @@ -1287,7 +1287,7 @@ func (d *DB) ingest( d.mu.Lock() pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)+len(external)) for i := 0; i < len(paths)+len(shared)+len(external); i++ { - pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum() + pendingOutputs[i] = d.mu.versions.getNextDiskFileNum() } jobID := d.mu.nextJobID diff --git a/internal/base/filenames.go b/internal/base/filenames.go index d2ba2f0e58..06098ab639 100644 --- a/internal/base/filenames.go +++ b/internal/base/filenames.go @@ -30,7 +30,7 @@ func (fn FileNum) SafeFormat(w redact.SafePrinter, _ rune) { // on disk. These could be manifests, log files, physical sstables on disk, the // options file, but not virtual sstables. func (fn FileNum) DiskFileNum() DiskFileNum { - return DiskFileNum{fn} + return DiskFileNum(fn) } // A DiskFileNum is just a FileNum belonging to a file which exists on disk. @@ -39,20 +39,18 @@ func (fn FileNum) DiskFileNum() DiskFileNum { // Converting a DiskFileNum to a FileNum is always valid, whereas converting a // FileNum to DiskFileNum may not be valid and care should be taken to prove // that the FileNum actually exists on disk. -type DiskFileNum struct { - fn FileNum -} +type DiskFileNum uint64 -func (dfn DiskFileNum) String() string { return dfn.fn.String() } +func (dfn DiskFileNum) String() string { return fmt.Sprintf("%06d", dfn) } // SafeFormat implements redact.SafeFormatter. func (dfn DiskFileNum) SafeFormat(w redact.SafePrinter, verb rune) { - dfn.fn.SafeFormat(w, verb) + w.Printf("%06d", redact.SafeUint(dfn)) } // FileNum converts a DiskFileNum to a FileNum. This conversion is always valid. func (dfn DiskFileNum) FileNum() FileNum { - return dfn.fn + return FileNum(dfn) } // FileType enumerates the types of files found in a DB. 
@@ -103,9 +101,9 @@ func ParseFilename(fs vfs.FS, filename string) (fileType FileType, dfn DiskFileN filename = fs.PathBase(filename) switch { case filename == "CURRENT": - return FileTypeCurrent, DiskFileNum{0}, true + return FileTypeCurrent, 0, true case filename == "LOCK": - return FileTypeLock, DiskFileNum{0}, true + return FileTypeLock, 0, true case strings.HasPrefix(filename, "MANIFEST-"): dfn, ok = parseDiskFileNum(filename[len("MANIFEST-"):]) if !ok { @@ -156,7 +154,7 @@ func parseDiskFileNum(s string) (dfn DiskFileNum, ok bool) { if err != nil { return dfn, false } - return DiskFileNum{FileNum(u)}, true + return DiskFileNum(u), true } // A Fataler fatals a process with a message when called. diff --git a/internal/base/filenames_test.go b/internal/base/filenames_test.go index 7c86fb6cb6..9a5c331580 100644 --- a/internal/base/filenames_test.go +++ b/internal/base/filenames_test.go @@ -110,5 +110,5 @@ directory contains 10 files, 3 unknown, 1 tables, 1 logs, 1 manifests`, buf.buf. func TestRedactFileNum(t *testing.T) { // Ensure that redaction never redacts file numbers. require.Equal(t, redact.RedactableString("000005"), redact.Sprint(FileNum(5))) - require.Equal(t, redact.RedactableString("000005"), redact.Sprint(DiskFileNum{fn: 5})) + require.Equal(t, redact.RedactableString("000005"), redact.Sprint(DiskFileNum(5))) } diff --git a/internal/manifest/testdata/version_edit_apply b/internal/manifest/testdata/version_edit_apply index 7a7e6d4da9..66d9efa700 100644 --- a/internal/manifest/testdata/version_edit_apply +++ b/internal/manifest/testdata/version_edit_apply @@ -125,7 +125,7 @@ edit 000005:[h#3,SET-j#4,SET] 000010:[j#3,SET-m#2,SET] 000002:[n#5,SET-q#3,SET] -zombies [{1} {4}] +zombies [1 4] apply edit @@ -153,7 +153,7 @@ edit L1 2 ---- -zombies [{1} {2}] +zombies [1 2] # Deletion of a non-existent table results in an error. diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 550477c050..5f2d4a15b3 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -94,7 +94,7 @@ type VersionEdit struct { // mutations that have not been flushed to an sstable. // // This is an optional field, and 0 represents it is not set. - MinUnflushedLogNum base.FileNum + MinUnflushedLogNum base.DiskFileNum // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in @@ -104,7 +104,7 @@ type VersionEdit struct { // The next file number. A single counter is used to assign file numbers // for the WAL, MANIFEST, sstable, and OPTIONS files. - NextFileNum base.FileNum + NextFileNum uint64 // LastSeqNum is an upper bound on the sequence numbers that have been // assigned in flushed WALs. Unflushed WALs (that will be replayed during @@ -175,14 +175,14 @@ func (v *VersionEdit) Decode(r io.Reader) error { v.ComparerName = string(s) case tagLogNumber: - n, err := d.readFileNum() + n, err := d.readUvarint() if err != nil { return err } - v.MinUnflushedLogNum = n + v.MinUnflushedLogNum = base.DiskFileNum(n) case tagNextFileNumber: - n, err := d.readFileNum() + n, err := d.readUvarint() if err != nil { return err } diff --git a/open.go b/open.go index 7997087161..e021c9a1aa 100644 --- a/open.go +++ b/open.go @@ -268,7 +268,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname) } // Load the version set. 
- if err := d.mu.versions.load(dirname, opts, manifestFileNum.FileNum(), manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil { + if err := d.mu.versions.load(dirname, opts, manifestFileNum, manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil { return nil, err } if opts.ErrorIfNotPristine { @@ -336,7 +336,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { // Replay any newer log files than the ones named in the manifest. type fileNumAndName struct { - num FileNum + num base.DiskFileNum name string } var logFiles []fileNumAndName @@ -350,14 +350,14 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { // Don't reuse any obsolete file numbers to avoid modifying an // ingested sstable's original external file. - if d.mu.versions.nextFileNum <= fn.FileNum() { - d.mu.versions.nextFileNum = fn.FileNum() + 1 + if d.mu.versions.nextFileNum <= uint64(fn.FileNum()) { + d.mu.versions.nextFileNum = uint64(fn.FileNum()) + 1 } switch ft { case fileTypeLog: - if fn.FileNum() >= d.mu.versions.minUnflushedLogNum { - logFiles = append(logFiles, fileNumAndName{fn.FileNum(), filename}) + if fn >= d.mu.versions.minUnflushedLogNum { + logFiles = append(logFiles, fileNumAndName{fn, filename}) } if d.logRecycler.minRecycleLogNum <= fn.FileNum() { d.logRecycler.minRecycleLogNum = fn.FileNum() + 1 @@ -385,8 +385,8 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { // objProvider. This avoids FileNum collisions with obsolete sstables. objects := d.objProvider.List() for _, obj := range objects { - if d.mu.versions.nextFileNum <= obj.DiskFileNum.FileNum() { - d.mu.versions.nextFileNum = obj.DiskFileNum.FileNum() + 1 + if d.mu.versions.nextFileNum <= uint64(obj.DiskFileNum) { + d.mu.versions.nextFileNum = uint64(obj.DiskFileNum) + 1 } } @@ -423,7 +423,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { if !d.opts.ReadOnly { // Create an empty .log file. - newLogNum := d.mu.versions.getNextFileNum() + newLogNum := d.mu.versions.getNextDiskFileNum() // This logic is slightly different than RocksDB's. Specifically, RocksDB // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the @@ -445,8 +445,8 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { entry.readerUnrefLocked(true) } - newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum.DiskFileNum()) - d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum.DiskFileNum(), fileSize: 0}) + newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum) + d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0}) logFile, err := opts.FS.Create(newLogName) if err != nil { return nil, err @@ -497,7 +497,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { if !d.opts.ReadOnly { // Write the current options to disk. - d.optionsFileNum = d.mu.versions.getNextFileNum().DiskFileNum() + d.optionsFileNum = d.mu.versions.getNextDiskFileNum() tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum) optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum) @@ -690,7 +690,12 @@ func GetVersion(dir string, fs vfs.FS) (string, error) { // d.mu must be held when calling this, but the mutex may be dropped and // re-acquired during the course of this method. 
func (d *DB) replayWAL( - jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, strictWALTail bool, + jobID int, + ve *versionEdit, + fs vfs.FS, + filename string, + logNum base.DiskFileNum, + strictWALTail bool, ) (toFlush flushableList, maxSeqNum uint64, err error) { file, err := fs.Open(filename) if err != nil { diff --git a/record/log_writer.go b/record/log_writer.go index 4aa5d2d249..891879dae9 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -323,7 +323,9 @@ var blockPool = sync.Pool{ } // NewLogWriter returns a new LogWriter. -func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter { +func NewLogWriter( + w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig, +) *LogWriter { c, _ := w.(io.Closer) s, _ := w.(syncer) r := &LogWriter{ diff --git a/record/record.go b/record/record.go index 9b42a4c510..8924bfb894 100644 --- a/record/record.go +++ b/record/record.go @@ -187,7 +187,7 @@ type Reader struct { // NewReader returns a new reader. If the file contains records encoded using // the recyclable record format, then the log number in those records must // match the specified logNum. -func NewReader(r io.Reader, logNum base.FileNum) *Reader { +func NewReader(r io.Reader, logNum base.DiskFileNum) *Reader { return &Reader{ r: r, logNum: uint32(logNum), diff --git a/record/record_test.go b/record/record_test.go index 0559ab9d9f..d05207930c 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -893,7 +893,7 @@ func TestRecycleLog(t *testing.T) { limit: blocks, } - w := NewLogWriter(limitedBuf, base.FileNum(i), LogWriterConfig{ + w := NewLogWriter(limitedBuf, base.DiskFileNum(i), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) sizes := make([]int, 10+rnd.Intn(100)) for j := range sizes { @@ -907,7 +907,7 @@ func TestRecycleLog(t *testing.T) { t.Fatalf("%d: %v", i, err) } - r := NewReader(bytes.NewReader(backing), base.FileNum(i)) + r := NewReader(bytes.NewReader(backing), base.DiskFileNum(i)) for j := range sizes { rr, err := r.Next() if err != nil { @@ -937,14 +937,14 @@ func TestRecycleLog(t *testing.T) { func TestTruncatedLog(t *testing.T) { backing := make([]byte, 2*blockSize) - w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1), LogWriterConfig{ + w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(1), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) // Write a record that spans 2 blocks. _, err := w.WriteRecord(bytes.Repeat([]byte("s"), blockSize+100)) require.NoError(t, err) require.NoError(t, w.Close()) // Create a reader only for the first block. - r := NewReader(bytes.NewReader(backing[:blockSize]), base.FileNum(1)) + r := NewReader(bytes.NewReader(backing[:blockSize]), base.DiskFileNum(1)) rr, err := r.Next() require.NoError(t, err) _, err = io.ReadAll(rr) @@ -953,7 +953,7 @@ func TestTruncatedLog(t *testing.T) { func TestRecycleLogWithPartialBlock(t *testing.T) { backing := make([]byte, 27) - w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1), LogWriterConfig{ + w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(1), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) // Will write a chunk with 11 byte header + 5 byte payload. _, err := w.WriteRecord([]byte("aaaaa")) @@ -961,7 +961,7 @@ func TestRecycleLogWithPartialBlock(t *testing.T) { // Close will write a 11-byte EOF chunk. 
require.NoError(t, w.Close()) - w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2), LogWriterConfig{ + w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(2), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) // Will write a chunk with 11 byte header + 1 byte payload. _, err = w.WriteRecord([]byte("a")) @@ -969,7 +969,7 @@ func TestRecycleLogWithPartialBlock(t *testing.T) { // Close will write a 11-byte EOF chunk. require.NoError(t, w.Close()) - r := NewReader(bytes.NewReader(backing), base.FileNum(2)) + r := NewReader(bytes.NewReader(backing), base.DiskFileNum(2)) _, err = r.Next() require.NoError(t, err) // 4 bytes left, which are not enough for even the legacy header. @@ -984,7 +984,7 @@ func TestRecycleLogNumberOverflow(t *testing.T) { // interpreted correctly. backing := make([]byte, 27) - w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32), LogWriterConfig{ + w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(math.MaxUint32), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) // Will write a chunk with 11 byte header + 5 byte payload. _, err := w.WriteRecord([]byte("aaaaa")) @@ -992,7 +992,7 @@ func TestRecycleLogNumberOverflow(t *testing.T) { // Close will write a 11-byte EOF chunk. require.NoError(t, w.Close()) - w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1), LogWriterConfig{ + w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(math.MaxUint32+1), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) // Will write a chunk with 11 byte header + 1 byte payload. _, err = w.WriteRecord([]byte("a")) @@ -1000,7 +1000,7 @@ func TestRecycleLogNumberOverflow(t *testing.T) { // Close will write a 11-byte EOF chunk. require.NoError(t, w.Close()) - r := NewReader(bytes.NewReader(backing), base.FileNum(math.MaxUint32+1)) + r := NewReader(bytes.NewReader(backing), base.DiskFileNum(math.MaxUint32+1)) _, err = r.Next() require.NoError(t, err) // 4 bytes left, which are not enough for even the legacy header. @@ -1014,7 +1014,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) { // Write a record that is larger than the log block size. backing1 := make([]byte, 2*blockSize) - w := NewLogWriter(bytes.NewBuffer(backing1[:0]), base.FileNum(1), LogWriterConfig{ + w := NewLogWriter(bytes.NewBuffer(backing1[:0]), base.DiskFileNum(1), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) _, err := w.WriteRecord(bytes.Repeat([]byte("a"), recordSize)) require.NoError(t, err) @@ -1023,7 +1023,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) { // Write another record to a new incarnation of the WAL that is larger than // the block size. backing2 := make([]byte, 2*blockSize) - w = NewLogWriter(bytes.NewBuffer(backing2[:0]), base.FileNum(2), LogWriterConfig{ + w = NewLogWriter(bytes.NewBuffer(backing2[:0]), base.DiskFileNum(2), LogWriterConfig{ WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) _, err = w.WriteRecord(bytes.Repeat([]byte("b"), recordSize)) require.NoError(t, err) @@ -1035,7 +1035,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) { copy(backing2[blockSize:], backing1[blockSize:]) // Verify that we can't read a partial record from the second WAL. 
- r := NewReader(bytes.NewReader(backing2), base.FileNum(2)) + r := NewReader(bytes.NewReader(backing2), base.DiskFileNum(2)) rr, err := r.Next() require.NoError(t, err) diff --git a/replay/workload_capture.go b/replay/workload_capture.go index e01f6f8de7..743cf70d4c 100644 --- a/replay/workload_capture.go +++ b/replay/workload_capture.go @@ -198,7 +198,7 @@ func (w *WorkloadCollector) onManifestCreated(info pebble.ManifestCreateInfo) { // mark the manifest file as ready for processing to prevent it from being // cleaned before we process it. - fileName := base.MakeFilename(base.FileTypeManifest, info.FileNum.DiskFileNum()) + fileName := base.MakeFilename(base.FileTypeManifest, info.FileNum) w.mu.fileState[fileName] |= readyForProcessing w.mu.manifests = append(w.mu.manifests, &manifestDetails{ sourceFilepath: info.Path, diff --git a/replay/workload_capture_test.go b/replay/workload_capture_test.go index 0ce3f94e3a..4fafe71c7d 100644 --- a/replay/workload_capture_test.go +++ b/replay/workload_capture_test.go @@ -66,7 +66,7 @@ func TestWorkloadCollector(t *testing.T) { var fileNum uint64 var err error td.ScanArgs(t, "filenum", &fileNum) - path := base.MakeFilepath(fs, srcDir, base.FileTypeManifest, base.FileNum(fileNum).DiskFileNum()) + path := base.MakeFilepath(fs, srcDir, base.FileTypeManifest, base.DiskFileNum(fileNum)) currentManifest, err = fs.Create(path) require.NoError(t, err) _, err = currentManifest.Write(randData(100)) @@ -74,7 +74,7 @@ func TestWorkloadCollector(t *testing.T) { c.onManifestCreated(pebble.ManifestCreateInfo{ Path: path, - FileNum: base.FileNum(fileNum), + FileNum: base.DiskFileNum(fileNum), }) return "" case "flush": diff --git a/table_cache_test.go b/table_cache_test.go index 285aaca7e1..4aa1edc023 100644 --- a/table_cache_test.go +++ b/table_cache_test.go @@ -285,7 +285,7 @@ func TestVirtualReadsWiring(t *testing.T) { l6 := currVersion.Levels[6] l6FileIter := l6.Iter() parentFile := l6FileIter.First() - f1 := d.mu.versions.nextFileNum + f1 := FileNum(d.mu.versions.nextFileNum) f2 := f1 + 1 d.mu.versions.nextFileNum += 2 diff --git a/tool/find.go b/tool/find.go index 40209f2431..d1e90b5cb6 100644 --- a/tool/find.go +++ b/tool/find.go @@ -51,15 +51,15 @@ type findT struct { verbose bool // Map from file num to path on disk. - files map[base.FileNum]string + files map[base.DiskFileNum]string // Map from file num to version edit index which references the file num. editRefs map[base.FileNum][]int // List of version edits. edits []manifest.VersionEdit // Sorted list of WAL file nums. - logs []base.FileNum + logs []base.DiskFileNum // Sorted list of manifest file nums. - manifests []base.FileNum + manifests []base.DiskFileNum // Sorted list of table file nums. tables []base.FileNum // Set of tables that contains references to the search key. @@ -135,7 +135,7 @@ func (f *findT) run(cmd *cobra.Command, args []string) { r := &refs[i] if lastFileNum != r.fileNum { lastFileNum = r.fileNum - fmt.Fprintf(stdout, "%s", f.opts.FS.PathBase(f.files[r.fileNum])) + fmt.Fprintf(stdout, "%s", f.opts.FS.PathBase(f.files[r.fileNum.DiskFileNum()])) if m := f.tableMeta[r.fileNum]; m != nil { fmt.Fprintf(stdout, " ") formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest) @@ -156,7 +156,7 @@ func (f *findT) run(cmd *cobra.Command, args []string) { // Find all of the manifests, logs, and tables in the specified directory. 
func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error { - f.files = make(map[base.FileNum]string) + f.files = make(map[base.DiskFileNum]string) f.editRefs = make(map[base.FileNum][]int) f.logs = nil f.manifests = nil @@ -174,15 +174,15 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error { } switch ft { case base.FileTypeLog: - f.logs = append(f.logs, fileNum.FileNum()) + f.logs = append(f.logs, fileNum) case base.FileTypeManifest: - f.manifests = append(f.manifests, fileNum.FileNum()) + f.manifests = append(f.manifests, fileNum) case base.FileTypeTable: f.tables = append(f.tables, fileNum.FileNum()) default: return } - f.files[fileNum.FileNum()] = path + f.files[fileNum] = path }) sort.Slice(f.logs, func(i, j int) bool { @@ -242,7 +242,7 @@ func (f *findT) readManifests(stdout io.Writer) { if ve.ComparerName != "" { f.comparerName = ve.ComparerName } - if num := ve.MinUnflushedLogNum; num != 0 { + if num := ve.MinUnflushedLogNum.FileNum(); num != 0 { f.editRefs[num] = append(f.editRefs[num], i) } for df := range ve.DeletedFiles { @@ -380,7 +380,7 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ refs = append(refs, findRef{ key: ikey.Clone(), value: append([]byte(nil), value...), - fileNum: fileNum, + fileNum: fileNum.FileNum(), }) } } @@ -397,7 +397,7 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) f.tableRefs = make(map[base.FileNum]bool) for _, fileNum := range f.tables { _ = func() (err error) { - path := f.files[fileNum] + path := f.files[fileNum.DiskFileNum()] tf, err := f.opts.FS.Open(path) if err != nil { fmt.Fprintf(stdout, "%s\n", err) @@ -435,7 +435,7 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers, private.SSTableRawTombstonesOpt.(sstable.ReaderOption)) if err != nil { - f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", f.files[fileNum], err.Error())) + f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", f.files[fileNum.DiskFileNum()], err.Error())) // Ensure the error only gets printed once. err = nil return diff --git a/tool/wal.go b/tool/wal.go index 66fcedb8d1..8df508c563 100644 --- a/tool/wal.go +++ b/tool/wal.go @@ -93,7 +93,7 @@ func (w *walT) runDump(cmd *cobra.Command, args []string) { var b pebble.Batch var buf bytes.Buffer - rr := record.NewReader(f, fileNum.FileNum()) + rr := record.NewReader(f, fileNum) for { offset := rr.Offset() r, err := rr.Next() diff --git a/version_set.go b/version_set.go index 46f2372306..25a6ee9213 100644 --- a/version_set.go +++ b/version_set.go @@ -104,19 +104,19 @@ type versionSet struct { // minUnflushedLogNum is the smallest WAL log file number corresponding to // mutations that have not been flushed to an sstable. - minUnflushedLogNum FileNum + minUnflushedLogNum base.DiskFileNum - // The next file number. A single counter is used to assign file numbers - // for the WAL, MANIFEST, sstable, and OPTIONS files. - nextFileNum FileNum + // The next file number. A single counter is used to assign file + // numbers for the WAL, MANIFEST, sstable, and OPTIONS files. + nextFileNum uint64 // The current manifest file number. 
- manifestFileNum FileNum + manifestFileNum base.DiskFileNum manifestMarker *atomicfs.Marker manifestFile vfs.File manifest *record.Writer - setCurrent func(FileNum) error + setCurrent func(base.DiskFileNum) error getFormatMajorVersion func() FormatMajorVersion writing bool @@ -129,7 +129,7 @@ func (vs *versionSet) init( dirname string, opts *Options, marker *atomicfs.Marker, - setCurrent func(FileNum) error, + setCurrent func(base.DiskFileNum) error, getFMV func() FormatMajorVersion, mu *sync.Mutex, ) { @@ -158,7 +158,7 @@ func (vs *versionSet) create( dirname string, opts *Options, marker *atomicfs.Marker, - setCurrent func(FileNum) error, + setCurrent func(base.DiskFileNum) error, getFormatMajorVersion func() FormatMajorVersion, mu *sync.Mutex, ) error { @@ -170,7 +170,7 @@ func (vs *versionSet) create( vs.picker = newCompactionPicker(newVersion, vs.opts, nil) // Note that a "snapshot" version edit is written to the manifest when it is // created. - vs.manifestFileNum = vs.getNextFileNum() + vs.manifestFileNum = vs.getNextDiskFileNum() err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum) if err == nil { if err = vs.manifest.Flush(); err != nil { @@ -191,7 +191,7 @@ func (vs *versionSet) create( vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{ JobID: jobID, - Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum.DiskFileNum()), + Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum), FileNum: vs.manifestFileNum, Err: err, }) @@ -205,16 +205,16 @@ func (vs *versionSet) create( func (vs *versionSet) load( dirname string, opts *Options, - manifestFileNum FileNum, + manifestFileNum base.DiskFileNum, marker *atomicfs.Marker, - setCurrent func(FileNum) error, + setCurrent func(base.DiskFileNum) error, getFormatMajorVersion func() FormatMajorVersion, mu *sync.Mutex, ) error { vs.init(dirname, opts, marker, setCurrent, getFormatMajorVersion, mu) vs.manifestFileNum = manifestFileNum - manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum.DiskFileNum()) + manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum) manifestFilename := opts.FS.PathBase(manifestPath) // Read the versionEdits in the manifest file. 
@@ -411,7 +411,7 @@ func (vs *versionSet) logAndApply( if ve.MinUnflushedLogNum != 0 { if ve.MinUnflushedLogNum < vs.minUnflushedLogNum || - vs.nextFileNum <= ve.MinUnflushedLogNum { + vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) { panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d", ve.MinUnflushedLogNum)) } @@ -496,10 +496,10 @@ func (vs *versionSet) logAndApply( if sizeExceeded && !requireRotation { requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount) } - var newManifestFileNum FileNum + var newManifestFileNum base.DiskFileNum var prevManifestFileSize uint64 if requireRotation { - newManifestFileNum = vs.getNextFileNum() + newManifestFileNum = vs.getNextDiskFileNum() prevManifestFileSize = uint64(vs.manifest.Size()) } @@ -530,7 +530,7 @@ func (vs *versionSet) logAndApply( if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum); err != nil { vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{ JobID: jobID, - Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum.DiskFileNum()), + Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum), FileNum: newManifestFileNum, Err: err, }) @@ -564,7 +564,7 @@ func (vs *versionSet) logAndApply( } vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{ JobID: jobID, - Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum.DiskFileNum()), + Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum), FileNum: newManifestFileNum, }) } @@ -605,7 +605,7 @@ func (vs *versionSet) logAndApply( if newManifestFileNum != 0 { if vs.manifestFileNum != 0 { vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{ - fileNum: vs.manifestFileNum.DiskFileNum(), + fileNum: vs.manifestFileNum, fileSize: prevManifestFileSize, }) } @@ -696,10 +696,10 @@ func (vs *versionSet) incrementCompactionBytes(numBytes int64) { // createManifest creates a manifest file that contains a snapshot of vs. 
func (vs *versionSet) createManifest( - dirname string, fileNum, minUnflushedLogNum, nextFileNum FileNum, + dirname string, fileNum, minUnflushedLogNum base.DiskFileNum, nextFileNum uint64, ) (err error) { var ( - filename = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum.DiskFileNum()) + filename = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum) manifestFile vfs.File manifest *record.Writer ) @@ -774,16 +774,22 @@ func (vs *versionSet) createManifest( return nil } -func (vs *versionSet) markFileNumUsed(fileNum FileNum) { - if vs.nextFileNum <= fileNum { - vs.nextFileNum = fileNum + 1 +func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) { + if vs.nextFileNum <= uint64(fileNum) { + vs.nextFileNum = uint64(fileNum + 1) } } -func (vs *versionSet) getNextFileNum() FileNum { +func (vs *versionSet) getNextFileNum() base.FileNum { x := vs.nextFileNum vs.nextFileNum++ - return x + return base.FileNum(x) +} + +func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum { + x := vs.nextFileNum + vs.nextFileNum++ + return base.DiskFileNum(x) } func (vs *versionSet) append(v *version) { @@ -875,17 +881,17 @@ func (vs *versionSet) updateObsoleteTableMetricsLocked() { func setCurrentFunc( vers FormatMajorVersion, marker *atomicfs.Marker, fs vfs.FS, dirname string, dir vfs.File, -) func(FileNum) error { +) func(base.DiskFileNum) error { if vers < formatVersionedManifestMarker { // Pebble versions before `formatVersionedManifestMarker` used // the CURRENT file to signal which MANIFEST is current. Ignore // the filename read during LocateMarker. - return func(manifestFileNum FileNum) error { - if err := setCurrentFile(dirname, fs, manifestFileNum.DiskFileNum()); err != nil { + return func(manifestFileNum base.DiskFileNum) error { + if err := setCurrentFile(dirname, fs, manifestFileNum); err != nil { return err } if err := dir.Sync(); err != nil { - // This is a panic here, rather than higher in the call + // This is a panic here, rather than higher in the call // stack, for parity with the atomicfs.Marker behavior. // A panic is always necessary because failed Syncs are // unrecoverable. @@ -897,9 +903,11 @@ func setCurrentFunc( return setCurrentFuncMarker(marker, fs, dirname) } -func setCurrentFuncMarker(marker *atomicfs.Marker, fs vfs.FS, dirname string) func(FileNum) error { - return func(manifestFileNum FileNum) error { - return marker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum.DiskFileNum())) +func setCurrentFuncMarker( + marker *atomicfs.Marker, fs vfs.FS, dirname string, +) func(base.DiskFileNum) error { + return func(manifestFileNum base.DiskFileNum) error { + return marker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum)) } } diff --git a/version_set_test.go b/version_set_test.go index f9d5aba133..7a8fd0b8b0 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -73,7 +73,7 @@ func TestLatestRefCounting(t *testing.T) { // Grab some new file nums. d.mu.Lock() - f1 := d.mu.versions.nextFileNum + f1 := FileNum(d.mu.versions.nextFileNum) f2 := f1 + 1 d.mu.versions.nextFileNum += 2 d.mu.Unlock() @@ -260,7 +260,7 @@ func TestVirtualSSTableManifestReplay(t *testing.T) { // Grab some new file nums. d.mu.Lock() - f1 := d.mu.versions.nextFileNum + f1 := FileNum(d.mu.versions.nextFileNum) f2 := f1 + 1 d.mu.versions.nextFileNum += 2 d.mu.Unlock()
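For reference (not part of the patch): below is a minimal, standalone Go sketch of the representation change the commit message describes, namely `DiskFileNum` becoming a plain `uint64` like `FileNum`, together with the single shared counter that now hands out either flavor of file number. The types and methods are simplified mirrors of those in internal/base/filenames.go and version_set.go under that assumption; the real types also implement redact.SafeFormatter and carry more state, all omitted here.

package main

import "fmt"

// FileNum mirrors base.FileNum: an identifier for any file tracked by the DB,
// including files that may never exist on disk (e.g. virtual sstables).
type FileNum uint64

// DiskFileNum mirrors the new base.DiskFileNum: previously a struct wrapping a
// FileNum, now a plain uint64 so values compare and print as integers.
type DiskFileNum uint64

// DiskFileNum converts a FileNum to a DiskFileNum; only meaningful when the
// FileNum refers to a file that actually exists on disk.
func (fn FileNum) DiskFileNum() DiskFileNum { return DiskFileNum(fn) }

// FileNum converts a DiskFileNum back to a FileNum; always valid.
func (dfn DiskFileNum) FileNum() FileNum { return FileNum(dfn) }

// String uses the same zero-padded formatting as FileNum.
func (dfn DiskFileNum) String() string { return fmt.Sprintf("%06d", uint64(dfn)) }

// versionSet sketches the single counter used to assign file numbers for the
// WAL, MANIFEST, sstable, and OPTIONS files, now stored as a bare uint64.
type versionSet struct {
	nextFileNum uint64
}

// getNextFileNum allocates a number for a file that may not exist on disk.
func (vs *versionSet) getNextFileNum() FileNum {
	x := vs.nextFileNum
	vs.nextFileNum++
	return FileNum(x)
}

// getNextDiskFileNum allocates a number for an on-disk file (WAL, MANIFEST,
// OPTIONS, physical sstable).
func (vs *versionSet) getNextDiskFileNum() DiskFileNum {
	x := vs.nextFileNum
	vs.nextFileNum++
	return DiskFileNum(x)
}

func main() {
	vs := &versionSet{nextFileNum: 12}
	minUnflushedLogNum := vs.getNextDiskFileNum() // 000012
	logNum := vs.getNextDiskFileNum()             // 000013

	// With DiskFileNum as a uint64, comparisons like the obsolete-WAL checks
	// in the patch work directly, with no .FileNum() unwrapping.
	if logNum >= minUnflushedLogNum {
		fmt.Printf("log %s is newer than the min unflushed log %s\n", logNum, minUnflushedLogNum)
	}
}

This is why call sites in the patch such as the scanObsoleteFiles and deleteObsoleteFiles checks can compare a queue entry's fileNum against minUnflushedLogNum directly.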