Skip to content

Commit

Permalink
db: use DiskFileName for manifest files and memtables
Browse files Browse the repository at this point in the history
These subsystems deal with on-disk files, using DiskFileName is more
appropriate. This reduces the number of conversions in the code a lot.
There are only a few `DiskFileName.FileName()` conversions left.

We change `DiskFileNum` to be a `uint64` like `FileNum`, so that
`DiskFileNum`s can also be compared and printed as integers.
  • Loading branch information
RaduBerinde committed Oct 9, 2023
1 parent d57a0be commit e3cbe65
Show file tree
Hide file tree
Showing 24 changed files with 139 additions and 122 deletions.
8 changes: 4 additions & 4 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (d *DB) Checkpoint(
}

ckErr = d.writeCheckpointManifest(
fs, formatVers, destDir, dir, manifestFileNum.DiskFileNum(), manifestSize,
fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
excludedFiles, removeBackingTables,
)
if ckErr != nil {
Expand All @@ -313,7 +313,7 @@ func (d *DB) Checkpoint(
if logNum == 0 {
continue
}
srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum.DiskFileNum())
srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
ckErr = vfs.Copy(fs, srcPath, destPath)
if ckErr != nil {
Expand Down Expand Up @@ -368,7 +368,7 @@ func (d *DB) writeCheckpointManifest(
// need to append another record with the excluded files (we cannot simply
// append a record after a raw data copy; see
// https://github.com/cockroachdb/cockroach/issues/100935).
r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum())
r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
w := record.NewWriter(dst)
for {
rr, err := r.Next()
Expand Down Expand Up @@ -421,7 +421,7 @@ func (d *DB) writeCheckpointManifest(
if err != nil {
return err
}
if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum.FileNum()); err != nil {
if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum); err != nil {
return err
}
return manifestMarker.Close()
Expand Down
6 changes: 3 additions & 3 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3670,7 +3670,7 @@ func (d *DB) scanObsoleteFiles(list []string) {
}
switch fileType {
case fileTypeLog:
if diskFileNum.FileNum() >= minUnflushedLogNum {
if diskFileNum >= minUnflushedLogNum {
continue
}
fi := fileInfo{fileNum: diskFileNum}
Expand All @@ -3679,7 +3679,7 @@ func (d *DB) scanObsoleteFiles(list []string) {
}
obsoleteLogs = append(obsoleteLogs, fi)
case fileTypeManifest:
if diskFileNum.FileNum() >= manifestFileNum {
if diskFileNum >= manifestFileNum {
continue
}
fi := fileInfo{fileNum: diskFileNum}
Expand Down Expand Up @@ -3785,7 +3785,7 @@ func (d *DB) deleteObsoleteFiles(jobID int) {
// log that has not had its contents flushed to an sstable. We can recycle
// the prefix of d.mu.log.queue with log numbers less than
// minUnflushedLogNum.
if d.mu.log.queue[i].fileNum.FileNum() >= d.mu.versions.minUnflushedLogNum {
if d.mu.log.queue[i].fileNum >= d.mu.versions.minUnflushedLogNum {
obsoleteLogs = d.mu.log.queue[:i]
d.mu.log.queue = d.mu.log.queue[i:]
d.mu.versions.metrics.WAL.Files -= int64(len(obsoleteLogs))
Expand Down
18 changes: 10 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,7 +2302,7 @@ func (d *DB) walPreallocateSize() int {
return int(size)
}

func (d *DB) newMemTable(logNum FileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
func (d *DB) newMemTable(logNum base.DiskFileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
size := d.mu.mem.nextSize
if d.mu.mem.nextSize < d.opts.MemTableSize {
d.mu.mem.nextSize *= 2
Expand Down Expand Up @@ -2373,7 +2373,9 @@ func (d *DB) freeMemTable(m *memTable) {
m.free()
}

func (d *DB) newFlushableEntry(f flushable, logNum FileNum, logSeqNum uint64) *flushableEntry {
func (d *DB) newFlushableEntry(
f flushable, logNum base.DiskFileNum, logSeqNum uint64,
) *flushableEntry {
fe := &flushableEntry{
flushable: f,
flushed: make(chan struct{}),
Expand Down Expand Up @@ -2457,7 +2459,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
continue
}

var newLogNum base.FileNum
var newLogNum base.DiskFileNum
var prevLogSize uint64
if !d.opts.DisableWAL {
now := time.Now()
Expand Down Expand Up @@ -2514,7 +2516,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
}

// Both DB.mu and commitPipeline.mu must be held by the caller.
func (d *DB) rotateMemtable(newLogNum FileNum, logSeqNum uint64, prev *memTable) {
func (d *DB) rotateMemtable(newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable) {
// Create a new memtable, scheduling the previous one for flushing. We do
// this even if the previous memtable was empty because the DB.Flush
// mechanism is dependent on being able to wait for the empty memtable to
Expand All @@ -2541,14 +2543,14 @@ func (d *DB) rotateMemtable(newLogNum FileNum, logSeqNum uint64, prev *memTable)

// Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
// may be released and reacquired.
func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) {
func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
if d.opts.DisableWAL {
panic("pebble: invalid function call")
}

jobID := d.mu.nextJobID
d.mu.nextJobID++
newLogNum = d.mu.versions.getNextFileNum()
newLogNum = d.mu.versions.getNextDiskFileNum()

prevLogSize = uint64(d.mu.log.Size())

Expand All @@ -2574,7 +2576,7 @@ func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) {
}
d.mu.Unlock()

newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum.DiskFileNum())
newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum)

// Try to use a recycled log file. Recycling log files is an important
// performance optimization as it is faster to sync a file that has
Expand Down Expand Up @@ -2655,7 +2657,7 @@ func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) {
panic(err)
}

d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum.DiskFileNum(), fileSize: newLogSize})
d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
WALMinSyncInterval: d.opts.WALMinSyncInterval,
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ func TestRollManifest(t *testing.T) {
d, err := Open("", opts)
require.NoError(t, err)

manifestFileNumber := func() FileNum {
manifestFileNumber := func() base.DiskFileNum {
d.mu.Lock()
defer d.mu.Unlock()
return d.mu.versions.manifestFileNum
Expand All @@ -961,7 +961,7 @@ func TestRollManifest(t *testing.T) {
}

lastManifestNum := manifestFileNumber()
manifestNums := []base.FileNum{lastManifestNum}
manifestNums := []base.DiskFileNum{lastManifestNum}
for i := 0; i < 5; i++ {
// MaxManifestFileSize is 1, but the rollover logic also counts edits
// since the last snapshot to decide on rollover, so do as many flushes as
Expand Down
5 changes: 3 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
Expand Down Expand Up @@ -247,7 +248,7 @@ type ManifestCreateInfo struct {
JobID int
Path string
// The file number of the new Manifest.
FileNum FileNum
FileNum base.DiskFileNum
Err error
}

Expand Down Expand Up @@ -414,7 +415,7 @@ type WALCreateInfo struct {
JobID int
Path string
// The file number of the new WAL.
FileNum FileNum
FileNum base.DiskFileNum
// The file number of a previous WAL which was recycled to create this
// one. Zero if recycling did not take place.
RecycledFileNum FileNum
Expand Down
3 changes: 2 additions & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ type flushableEntry struct {
delayedFlushForcedAt time.Time
// logNum corresponds to the WAL that contains the records present in the
// receiver.
logNum FileNum
logNum base.DiskFileNum
// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
logSize uint64
// The current logSeqNum at the time the memtable was created. This is
Expand Down
2 changes: 1 addition & 1 deletion flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
d.mu.Lock()
pendingOutputs := make([]base.DiskFileNum, len(paths))
for i := range paths {
pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum()
pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
Expand Down
2 changes: 1 addition & 1 deletion format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
// guaranteed to exist, because we unconditionally locate it
// during Open.
manifestFileNum := d.mu.versions.manifestFileNum
filename := base.MakeFilename(fileTypeManifest, manifestFileNum.DiskFileNum())
filename := base.MakeFilename(fileTypeManifest, manifestFileNum)
if err := d.mu.versions.manifestMarker.Move(filename); err != nil {
return errors.Wrap(err, "moving manifest marker")
}
Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func (d *DB) IngestAndExcise(

// Both DB.mu and commitPipeline.mu must be held while this is called.
func (d *DB) newIngestedFlushableEntry(
meta []*fileMetadata, seqNum uint64, logNum FileNum,
meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
) (*flushableEntry, error) {
// Update the sequence number for all of the sstables in the
// metadata. Writing the metadata to the manifest when the
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func (d *DB) ingest(
d.mu.Lock()
pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)+len(external))
for i := 0; i < len(paths)+len(shared)+len(external); i++ {
pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum()
pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
}

jobID := d.mu.nextJobID
Expand Down
18 changes: 8 additions & 10 deletions internal/base/filenames.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (fn FileNum) SafeFormat(w redact.SafePrinter, _ rune) {
// on disk. These could be manifests, log files, physical sstables on disk, the
// options file, but not virtual sstables.
func (fn FileNum) DiskFileNum() DiskFileNum {
return DiskFileNum{fn}
return DiskFileNum(fn)
}

// A DiskFileNum is just a FileNum belonging to a file which exists on disk.
Expand All @@ -39,20 +39,18 @@ func (fn FileNum) DiskFileNum() DiskFileNum {
// Converting a DiskFileNum to a FileNum is always valid, whereas converting a
// FileNum to DiskFileNum may not be valid and care should be taken to prove
// that the FileNum actually exists on disk.
type DiskFileNum struct {
fn FileNum
}
type DiskFileNum uint64

func (dfn DiskFileNum) String() string { return dfn.fn.String() }
func (dfn DiskFileNum) String() string { return fmt.Sprintf("%06d", dfn) }

// SafeFormat implements redact.SafeFormatter.
func (dfn DiskFileNum) SafeFormat(w redact.SafePrinter, verb rune) {
dfn.fn.SafeFormat(w, verb)
w.Printf("%06d", redact.SafeUint(dfn))
}

// FileNum converts a DiskFileNum to a FileNum. This conversion is always valid.
func (dfn DiskFileNum) FileNum() FileNum {
return dfn.fn
return FileNum(dfn)
}

// FileType enumerates the types of files found in a DB.
Expand Down Expand Up @@ -103,9 +101,9 @@ func ParseFilename(fs vfs.FS, filename string) (fileType FileType, dfn DiskFileN
filename = fs.PathBase(filename)
switch {
case filename == "CURRENT":
return FileTypeCurrent, DiskFileNum{0}, true
return FileTypeCurrent, 0, true
case filename == "LOCK":
return FileTypeLock, DiskFileNum{0}, true
return FileTypeLock, 0, true
case strings.HasPrefix(filename, "MANIFEST-"):
dfn, ok = parseDiskFileNum(filename[len("MANIFEST-"):])
if !ok {
Expand Down Expand Up @@ -156,7 +154,7 @@ func parseDiskFileNum(s string) (dfn DiskFileNum, ok bool) {
if err != nil {
return dfn, false
}
return DiskFileNum{FileNum(u)}, true
return DiskFileNum(u), true
}

// A Fataler fatals a process with a message when called.
Expand Down
2 changes: 1 addition & 1 deletion internal/base/filenames_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ directory contains 10 files, 3 unknown, 1 tables, 1 logs, 1 manifests`, buf.buf.
func TestRedactFileNum(t *testing.T) {
// Ensure that redaction never redacts file numbers.
require.Equal(t, redact.RedactableString("000005"), redact.Sprint(FileNum(5)))
require.Equal(t, redact.RedactableString("000005"), redact.Sprint(DiskFileNum{fn: 5}))
require.Equal(t, redact.RedactableString("000005"), redact.Sprint(DiskFileNum(5)))
}
4 changes: 2 additions & 2 deletions internal/manifest/testdata/version_edit_apply
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ edit
000005:[h#3,SET-j#4,SET]
000010:[j#3,SET-m#2,SET]
000002:[n#5,SET-q#3,SET]
zombies [{1} {4}]
zombies [1 4]

apply
edit
Expand Down Expand Up @@ -153,7 +153,7 @@ edit
L1
2
----
zombies [{1} {2}]
zombies [1 2]

# Deletion of a non-existent table results in an error.

Expand Down
10 changes: 5 additions & 5 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type VersionEdit struct {
// mutations that have not been flushed to an sstable.
//
// This is an optional field, and 0 represents it is not set.
MinUnflushedLogNum base.FileNum
MinUnflushedLogNum base.DiskFileNum

// ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
// Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
Expand All @@ -104,7 +104,7 @@ type VersionEdit struct {

// The next file number. A single counter is used to assign file numbers
// for the WAL, MANIFEST, sstable, and OPTIONS files.
NextFileNum base.FileNum
NextFileNum uint64

// LastSeqNum is an upper bound on the sequence numbers that have been
// assigned in flushed WALs. Unflushed WALs (that will be replayed during
Expand Down Expand Up @@ -175,14 +175,14 @@ func (v *VersionEdit) Decode(r io.Reader) error {
v.ComparerName = string(s)

case tagLogNumber:
n, err := d.readFileNum()
n, err := d.readUvarint()
if err != nil {
return err
}
v.MinUnflushedLogNum = n
v.MinUnflushedLogNum = base.DiskFileNum(n)

case tagNextFileNumber:
n, err := d.readFileNum()
n, err := d.readUvarint()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit e3cbe65

Please sign in to comment.