Skip to content

Commit

Permalink
*: delegate all log filename handling to the wal package
Browse files Browse the repository at this point in the history
This commit removes the remaining instances where non-wal packages attempt to
parse wal filenames.
  • Loading branch information
jbowens committed Feb 23, 2024
1 parent 4560105 commit 0b94619
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 60 deletions.
2 changes: 0 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3672,8 +3672,6 @@ func (d *DB) scanObsoleteFiles(list []string) {
continue
}
switch fileType {
case fileTypeLog:
// Ignore. Handled by wal.Manager.
case fileTypeManifest:
if diskFileNum >= manifestFileNum {
continue
Expand Down
29 changes: 14 additions & 15 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/wal"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)
Expand Down Expand Up @@ -371,26 +372,22 @@ func TestLargeBatch(t *testing.T) {
}
}

logNum := func() base.DiskFileNum {
getLatestLog := func() wal.LogicalLog {
d.mu.Lock()
defer d.mu.Unlock()
logs, err := d.mu.log.manager.List()
require.NoError(t, err)
return base.DiskFileNum(logs[len(logs)-1].Num)
}
fileSize := func(fileNum base.DiskFileNum) int64 {
info, err := d.opts.FS.Stat(base.MakeFilepath(d.opts.FS, "", fileTypeLog, fileNum))
require.NoError(t, err)
return info.Size()
return logs[len(logs)-1]
}
memTableCreationSeqNum := func() uint64 {
d.mu.Lock()
defer d.mu.Unlock()
return d.mu.mem.mutable.logSeqNum
}

startLogNum := logNum()
startLogStartSize := fileSize(startLogNum)
startLog := getLatestLog()
startLogStartSize, err := startLog.PhysicalSize()
require.NoError(t, err)
startSeqNum := d.mu.versions.logSeqNum.Load()

// Write a key with a value larger than the memtable size.
Expand All @@ -399,18 +396,20 @@ func TestLargeBatch(t *testing.T) {
// Verify that the large batch was written to the WAL that existed before it
// was committed. We verify that WAL rotation occurred, where the large batch
// was written to, and that the new WAL is empty.
endLogNum := logNum()
if startLogNum == endLogNum {
endLog := getLatestLog()
if startLog.Num == endLog.Num {
t.Fatal("expected WAL rotation")
}
startLogEndSize := fileSize(startLogNum)
startLogEndSize, err := startLog.PhysicalSize()
require.NoError(t, err)
if startLogEndSize == startLogStartSize {
t.Fatalf("expected large batch to be written to %s.log, but file size unchanged at %d",
startLogNum, startLogEndSize)
startLog.Num, startLogEndSize)
}
endLogSize := fileSize(endLogNum)
endLogSize, err := endLog.PhysicalSize()
require.NoError(t, err)
if endLogSize != 0 {
t.Fatalf("expected %s.log to be empty, but found %d", endLogNum, endLogSize)
t.Fatalf("expected %s to be empty, but found %d", endLog, endLogSize)
}
if creationSeqNum := memTableCreationSeqNum(); creationSeqNum <= startSeqNum {
t.Fatalf("expected memTable.logSeqNum=%d > largeBatch.seqNum=%d", creationSeqNum, startSeqNum)
Expand Down
17 changes: 11 additions & 6 deletions internal/base/filenames.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package base

import (
"fmt"
"path/filepath"
"strconv"
"strings"

Expand Down Expand Up @@ -65,9 +66,8 @@ const (
// MakeFilename builds a filename from components.
func MakeFilename(fileType FileType, dfn DiskFileNum) string {
switch fileType {
// TODO(sumeer): stop handling FileTypeLog in this function.
case FileTypeLog:
return fmt.Sprintf("%s.log", dfn)
panic("the pebble/wal pkg is responsible for constructing WAL filenames")
case FileTypeLock:
return "LOCK"
case FileTypeTable:
Expand Down Expand Up @@ -134,8 +134,6 @@ func ParseFilename(fs vfs.FS, filename string) (fileType FileType, dfn DiskFileN
switch filename[i+1:] {
case "sst":
return FileTypeTable, dfn, true
case "log":
return FileTypeLog, dfn, true
}
}
return 0, dfn, false
Expand Down Expand Up @@ -175,6 +173,15 @@ func MustExist(fs vfs.FS, filename string, fataler Fataler, err error) {
var total, unknown, tables, logs, manifests int
total = len(ls)
for _, f := range ls {
// The file format of log files is an implementation detail of the wal/
// package that the internal/base package is not privy to. We can't call
// into the wal package because that would introduce a cyclical
// dependency. For our purposes, an exact count isn't important and we
// just count files with .log extensions.
if filepath.Ext(f) == ".log" {
logs++
continue
}
typ, _, ok := ParseFilename(fs, f)
if !ok {
unknown++
Expand All @@ -183,8 +190,6 @@ func MustExist(fs vfs.FS, filename string, fataler Fataler, err error) {
switch typ {
case FileTypeTable:
tables++
case FileTypeLog:
logs++
case FileTypeManifest:
manifests++
}
Expand Down
9 changes: 7 additions & 2 deletions internal/base/filenames_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ import (
)

func TestParseFilename(t *testing.T) {
// NB: log files are not handled by ParseFilename, so valid log filenames
// appear here with a false value. See the wal/ package.
testCases := map[string]bool{
"000000.log": true,
"000000.log": false,
"000000.log.zip": false,
"000000..log": false,
"000001-002.log": false,
"a000000.log": false,
"abcdef.log": false,
"000001ldb": false,
Expand Down Expand Up @@ -57,12 +60,14 @@ func TestFilenameRoundTrip(t *testing.T) {
// LOCK files aren't numbered.
FileTypeLock: false,
// The remaining file types are numbered.
FileTypeLog: true,
FileTypeManifest: true,
FileTypeTable: true,
FileTypeOptions: true,
FileTypeOldTemp: true,
FileTypeTemp: true,
// NB: Log filenames are created and parsed elsewhere in the wal/
// package.
// FileTypeLog: true,
}
fs := vfs.NewMem()
for fileType, numbered := range testCases {
Expand Down
4 changes: 3 additions & 1 deletion replay/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ func collectCorpus(t *testing.T, fs *vfs.MemFS, name string) {
case "table":
filePath = base.MakeFilepath(fs, dir, base.FileTypeTable, fileNum)
case "log":
filePath = base.MakeFilepath(fs, dir, base.FileTypeLog, fileNum)
// TODO(jackson): expose a func from the wal package for
// constructing log filenames for tests?
filePath = fs.PathJoin(dir, fmt.Sprintf("%s.log", fileNum))
case "manifest":
filePath = base.MakeFilepath(fs, dir, base.FileTypeManifest, fileNum)
}
Expand Down
5 changes: 3 additions & 2 deletions tool/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/wal"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -77,7 +78,7 @@ func (w *walT) runDump(cmd *cobra.Command, args []string) {
// necessary in case WAL recycling was used (which it is usually is). If
// we can't parse the filename or it isn't a log file, we'll plow ahead
// anyways (which will likely fail when we try to read the file).
_, fileNum, ok := base.ParseFilename(w.opts.FS, arg)
fileNum, _, ok := wal.ParseLogFilename(arg)
if !ok {
fileNum = 0
}
Expand All @@ -93,7 +94,7 @@ func (w *walT) runDump(cmd *cobra.Command, args []string) {

var b pebble.Batch
var buf bytes.Buffer
rr := record.NewReader(f, fileNum)
rr := record.NewReader(f, base.DiskFileNum(fileNum))
for {
offset := rr.Offset()
r, err := rr.Next()
Expand Down
2 changes: 1 addition & 1 deletion wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func (wm *failoverManager) RecyclerForTesting() *LogRecycler {

// logCreator implements the logCreator func type.
func (wm *failoverManager) logCreator(
dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int,
dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (logFile vfs.File, initialFileSize uint64, err error) {
logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
isPrimary := dir == wm.opts.Primary
Expand Down
12 changes: 6 additions & 6 deletions wal/failover_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ type failoverWriter struct {
// The latest *LogWriter is (will be) at nextWriterIndex-1.
//
// INVARIANT: nextWriterIndex <= len(writers)
nextWriterIndex logNameIndex
nextWriterIndex LogNameIndex
closed bool
// metrics is initialized in Close. Currently we just use the metrics from
// the latest writer after it is closed, since in the common case with
Expand Down Expand Up @@ -389,7 +389,7 @@ type failoverWriterOpts struct {
}

func simpleLogCreator(
dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int,
dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (f vfs.File, initialFileSize uint64, err error) {
filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
// Create file.
Expand All @@ -400,7 +400,7 @@ func simpleLogCreator(
}

type logCreator func(
dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int,
dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (f vfs.File, initialFileSize uint64, err error)

func newFailoverWriter(
Expand Down Expand Up @@ -667,13 +667,13 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
// [0, closeCalledCount) have had LogWriter.Close called (though may not
// have finished) or the LogWriter will never be non-nil. Either way, they
// have been "processed".
closeCalledCount := logNameIndex(0)
closeCalledCount := LogNameIndex(0)
// lastWriterState is the state for the last writer, for which we are
// waiting for LogWriter.Close to finish or for creation to be unsuccessful.
// What is considered the last writer can change. All state is protected by
// ww.mu.
type lastWriterState struct {
index logNameIndex
index LogNameIndex
closed bool
err error
metrics record.LogWriterMetrics
Expand Down Expand Up @@ -798,7 +798,7 @@ func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
if ww.mu.writers[i].w != nil {
ll.segments = append(ll.segments, segmentWithSizeEtc{
segment: segment{
logNameIndex: logNameIndex(i),
logNameIndex: LogNameIndex(i),
dir: ww.mu.writers[i].dir,
},
approxFileSize: ww.mu.writers[i].approxFileSize,
Expand Down
10 changes: 5 additions & 5 deletions wal/failover_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestFailoverWriter(t *testing.T) {
return
}
fmt.Fprintf(b, "log writers:\n")
for i := logNameIndex(0); i < w.mu.nextWriterIndex; i++ {
for i := LogNameIndex(0); i < w.mu.nextWriterIndex; i++ {
rLatency, rErr := w.mu.writers[i].r.ongoingLatencyOrError()
require.Equal(t, time.Duration(0), rLatency)
if w.mu.writers[i].createError != nil {
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestFailoverWriter(t *testing.T) {
testLogCreator := simpleLogCreator
if firstCallInitialFileSize > 0 {
testLogCreator = func(
dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int,
dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (f vfs.File, initialFileSize uint64, err error) {
f, _, err = simpleLogCreator(dir, wn, li, r, jobID)
if numCreateCalls == 0 {
Expand Down Expand Up @@ -630,7 +630,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
dirs[i].File = f
}
for i := 0; i < numLogWriters; i++ {
bFS.setConf(makeLogFilename(0, logNameIndex(i)), blockingWrite)
bFS.setConf(makeLogFilename(0, LogNameIndex(i)), blockingWrite)
}
stopper := newStopper()
logWriterCreated := make(chan struct{}, 100)
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
}
time.Sleep(5 * time.Millisecond)
for i := 0; i < numLogWriters; i++ {
bFS.setConf(makeLogFilename(0, logNameIndex(i)), 0)
bFS.setConf(makeLogFilename(0, LogNameIndex(i)), 0)
}
_, err = ww.Close()
require.NoError(t, err)
Expand All @@ -681,7 +681,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
}
for i := 0; i < numLogWriters; i++ {
func() {
f, err := memFS.Open(memFS.PathJoin(dirs[i%2].Dirname, makeLogFilename(0, logNameIndex(i))))
f, err := memFS.Open(memFS.PathJoin(dirs[i%2].Dirname, makeLogFilename(0, LogNameIndex(i))))
if err != nil {
t.Logf("file %d: %s", i, err.Error())
return
Expand Down
6 changes: 3 additions & 3 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type LogicalLog struct {
// segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
// WAL may be composed of multiple segments.
type segment struct {
logNameIndex logNameIndex
logNameIndex LogNameIndex
dir Dir
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func (a *FileAccumulator) Finish() Logs {
func (a *FileAccumulator) maybeAccumulate(
fs vfs.FS, dirname, name string,
) (isLogFile bool, err error) {
dfn, li, ok := parseLogFilename(name)
dfn, li, ok := ParseLogFilename(name)
if !ok {
return false, nil
}
Expand All @@ -144,7 +144,7 @@ func (a *FileAccumulator) maybeAccumulate(
}
// Ensure we haven't seen this log index yet, and find where it
// slots within this log's segments.
j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li logNameIndex) int {
j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li LogNameIndex) int {
return cmp.Compare(s.logNameIndex, li)
})
if found {
Expand Down
6 changes: 3 additions & 3 deletions wal/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestReader(t *testing.T) {
td.MaybeScanArgs(t, "logNameIndex", &index)
td.MaybeScanArgs(t, "recycleFilename", &recycleFilename)

filename := makeLogFilename(NumWAL(logNum), logNameIndex(index))
filename := makeLogFilename(NumWAL(logNum), LogNameIndex(index))
var f vfs.File
var err error
if recycleFilename != "" {
Expand Down Expand Up @@ -209,11 +209,11 @@ func TestReader(t *testing.T) {
// opening the next physical segment file fails.
if len(forceLogNameIndexes) > 0 {
for _, li := range forceLogNameIndexes {
j, found := slices.BinarySearchFunc(segments, logNameIndex(li), func(s segment, li logNameIndex) int {
j, found := slices.BinarySearchFunc(segments, LogNameIndex(li), func(s segment, li LogNameIndex) int {
return cmp.Compare(s.logNameIndex, li)
})
require.False(t, found)
segments = slices.Insert(segments, j, segment{logNameIndex: logNameIndex(li), dir: Dir{FS: fs}})
segments = slices.Insert(segments, j, segment{logNameIndex: LogNameIndex(li), dir: Dir{FS: fs}})
}
}
ll := LogicalLog{Num: log.Num, segments: segments}
Expand Down
2 changes: 1 addition & 1 deletion wal/standalone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (m *StandaloneManager) Obsolete(
if noRecycle || !m.recycler.Add(fi) {
toDelete = append(toDelete, DeletableLog{
FS: m.o.Primary.FS,
Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, base.MakeFilename(base.FileTypeLog, fi.FileNum)),
Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
NumWAL: NumWAL(fi.FileNum),
ApproxFileSize: fi.FileSize,
})
Expand Down
Loading

0 comments on commit 0b94619

Please sign in to comment.