From 739a233602a2f17baa3672f8991eb4814afdcee1 Mon Sep 17 00:00:00 2001
From: Jackson Owens
Date: Fri, 16 Feb 2024 17:55:11 -0500
Subject: [PATCH] wal: separate read interface from Manager

This commit separates the interface for reading WAL files from the
Manager interface. This helps unify some code on the read path, but it
also makes it easier to support changing configuration (e.g., changing
the secondary location or disabling failover).
---
 checkpoint.go                |  36 ++++++-----
 db_test.go                   |   6 +-
 open.go                      |  41 +++++++------
 wal/failover_manager.go      |  29 +--------
 wal/failover_manager_test.go |   4 +-
 wal/log_recycler.go          |   8 +--
 wal/reader.go                | 113 ++++++++++++++++++++++-------------
 wal/reader_test.go           |  10 ++--
 wal/standalone_manager.go    | 104 +++++--------------------------
 wal/wal.go                   |  29 ++++-----
 10 files changed, 156 insertions(+), 224 deletions(-)

diff --git a/checkpoint.go b/checkpoint.go
index 6bd6a89204..9163f984fe 100644
--- a/checkpoint.go
+++ b/checkpoint.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"os"
 
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/errors/oserror"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/record"
@@ -193,23 +194,23 @@ func (d *DB) Checkpoint(
 	for diskFileNum := range d.mu.versions.backingState.fileBackingMap {
 		virtualBackingFiles[diskFileNum] = struct{}{}
 	}
-	var logFiles []wal.CopyableLog
+
+	queuedLogNums := make([]wal.NumWAL, 0, len(memQueue))
 	for i := range memQueue {
-		logNum := memQueue[i].logNum
-		if logNum == 0 {
-			continue
-		}
-		files, err := d.mu.log.manager.ListFiles(wal.NumWAL(logNum))
-		if err != nil {
-			return err
+		if logNum := memQueue[i].logNum; logNum != 0 {
+			queuedLogNums = append(queuedLogNums, wal.NumWAL(logNum))
 		}
-		logFiles = append(logFiles, files...)
 	}
 	// Release the manifest and DB.mu so we don't block other operations on
 	// the database.
 	d.mu.versions.logUnlock()
 	d.mu.Unlock()
 
+	allLogicalLogs, err := d.mu.log.manager.List()
+	if err != nil {
+		return err
+	}
+
 	// Wrap the normal filesystem with one which wraps newly created files with
 	// vfs.NewSyncingFile.
 	fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
@@ -321,11 +322,18 @@
 	// Copy the WAL files. We copy rather than link because WAL file recycling
 	// will cause the WAL files to be reused which would invalidate the
 	// checkpoint.
-	for _, src := range logFiles {
-		destPath := fs.PathJoin(destDir, src.FS.PathBase(src.Path))
-		ckErr = vfs.CopyAcrossFS(src.FS, src.Path, fs, destPath)
-		if ckErr != nil {
-			return ckErr
+	for _, logNum := range queuedLogNums {
+		log, ok := allLogicalLogs.Get(logNum)
+		if !ok {
+			return errors.Newf("log %s not found", logNum)
+		}
+		for i := 0; i < log.NumSegments(); i++ {
+			srcFS, srcPath := log.SegmentLocation(i)
+			destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
+			ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
+			if ckErr != nil {
+				return ckErr
+			}
 		}
 	}
diff --git a/db_test.go b/db_test.go
index d5bf0699b5..8d35877ac6 100644
--- a/db_test.go
+++ b/db_test.go
@@ -374,9 +374,9 @@ func TestLargeBatch(t *testing.T) {
 	logNum := func() base.DiskFileNum {
 		d.mu.Lock()
 		defer d.mu.Unlock()
-		walNums, err := d.mu.log.manager.List()
+		logs, err := d.mu.log.manager.List()
 		require.NoError(t, err)
-		return base.DiskFileNum(walNums[len(walNums)-1])
+		return base.DiskFileNum(logs[len(logs)-1].Num)
 	}
 	fileSize := func(fileNum base.DiskFileNum) int64 {
 		info, err := d.opts.FS.Stat(base.MakeFilepath(d.opts.FS, "", fileTypeLog, fileNum))
@@ -1979,7 +1979,7 @@ func TestRecycleLogs(t *testing.T) {
 		defer d.mu.Unlock()
 		walNums, err := d.mu.log.manager.List()
 		require.NoError(t, err)
-		return base.DiskFileNum(walNums[len(walNums)-1])
+		return base.DiskFileNum(walNums[len(walNums)-1].Num)
 	}
 	logCount := func() int {
 		d.mu.Lock()
diff --git a/open.go b/open.go
index c27ea34182..b5f381310d 100644
--- a/open.go
+++ b/open.go
@@ -321,7 +321,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		Buckets: FsyncLatencyBuckets,
 	})
 	walManager := &wal.StandaloneManager{}
-	err = walManager.Init(wal.Options{
+	walOpts := wal.Options{
 		Primary:            wal.Dir{FS: opts.FS, Dirname: walDirname},
 		Secondary:          wal.Dir{},
 		MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
@@ -334,7 +334,12 @@
 		QueueSemChan:  d.commit.logSyncQSem,
 		Logger:        opts.Logger,
 		EventListener: walEventListenerAdaptor{l: opts.EventListener},
-	})
+	}
+	wals, err := wal.Scan(walOpts.Dirs()...)
+	if err != nil {
+		return nil, err
+	}
+	err = walManager.Init(walOpts, wals)
 	if err != nil {
 		return nil, err
 	}
@@ -343,10 +348,7 @@
 			walManager.Close()
 		}
 	}()
-	wals, err := walManager.List()
-	if err != nil {
-		return nil, err
-	}
+
 	d.mu.log.manager = walManager
 
 	// List the objects. This also happens to include WAL log files, if they are
@@ -427,7 +429,7 @@
 	if n := len(wals); n > 0 {
 		// Don't reuse any obsolete file numbers to avoid modifying an
 		// ingested sstable's original external file.
-		d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1]))
+		d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
 	}
 
 	// Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
@@ -448,9 +450,9 @@
 	}
 
 	// Replay any newer log files than the ones named in the manifest.
-	var replayWALs []wal.NumWAL
-	for i, walNum := range wals {
-		if base.DiskFileNum(walNum) >= d.mu.versions.minUnflushedLogNum {
+	var replayWALs wal.Logs
+	for i, w := range wals {
+		if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
 			replayWALs = wals[i:]
 			break
 		}
@@ -729,12 +731,9 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
 // d.mu must be held when calling this, but the mutex may be dropped and
 // re-acquired during the course of this method.
 func (d *DB) replayWAL(
-	jobID int, ve *versionEdit, wn wal.NumWAL, strictWALTail bool,
+	jobID int, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
 ) (toFlush flushableList, maxSeqNum uint64, err error) {
-	rr, err := d.mu.log.manager.OpenForRead(wn)
-	if err != nil {
-		return nil, 0, err
-	}
+	rr := ll.OpenForRead()
 	defer rr.Close()
 	var (
 		b Batch
@@ -784,7 +783,7 @@
 		if mem != nil {
 			return
 		}
-		mem, entry = d.newMemTable(base.DiskFileNum(wn), seqNum)
+		mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum)
 		if d.opts.ReadOnly {
 			d.mu.mem.mutable = mem
 			d.mu.mem.queue = append(d.mu.mem.queue, entry)
@@ -811,7 +810,7 @@
 	}
 	defer func() {
 		if err != nil {
-			err = errors.WithDetailf(err, "replaying wal %d, offset %d", wn, offset)
+			err = errors.WithDetailf(err, "replaying wal %d, offset %d", ll.Num, offset)
 		}
 	}()
 
@@ -836,7 +835,7 @@
 		if buf.Len() < batchrepr.HeaderLen {
 			return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
-				errors.Safe(base.DiskFileNum(wn)), offset)
+				errors.Safe(base.DiskFileNum(ll.Num)), offset)
 		}
 
 		if d.opts.ErrorIfNotPristine {
@@ -923,7 +922,7 @@
 			}
 			entry, err = d.newIngestedFlushableEntry(
-				meta, seqNum, base.DiskFileNum(wn),
+				meta, seqNum, base.DiskFileNum(ll.Num),
 			)
 			if err != nil {
 				return nil, 0, err
@@ -983,7 +982,7 @@
 		if err != nil {
 			return nil, 0, err
 		}
-		entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(wn), b.SeqNum())
+		entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
 		// Disable memory accounting by adding a reader ref that will never be
 		// removed.
 		entry.readerRefs.Add(1)
@@ -1026,7 +1025,7 @@
 	}
 	d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %d; replayed %d keys in %d batches",
-		jobID, base.DiskFileNum(wn).String(), offset, keysReplayed, batchesReplayed)
+		jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
 	flushMem()
 
 	// mem is nil here.
diff --git a/wal/failover_manager.go b/wal/failover_manager.go
index 694cf11f39..c15252b0a5 100644
--- a/wal/failover_manager.go
+++ b/wal/failover_manager.go
@@ -409,7 +409,7 @@ var _ Manager = &failoverManager{}
 // - calls to EventListener
 
 // Init implements Manager.
-func (wm *failoverManager) Init(o Options) error {
+func (wm *failoverManager) Init(o Options, initial Logs) error {
 	if o.timeSource == nil {
 		o.timeSource = defaultTime{}
 	}
@@ -443,9 +443,8 @@
 }
 
 // List implements Manager.
-func (wm *failoverManager) List() ([]NumWAL, error) {
-	// TODO(jackson):
-	return nil, nil
+func (wm *failoverManager) List() (Logs, error) {
+	return Scan(wm.opts.Primary, wm.opts.Secondary)
 }
 
 // Obsolete implements Manager.
@@ -456,22 +455,6 @@ func (wm *failoverManager) Obsolete(
 	return nil, nil
 }
 
-// OpenForRead implements Manager.
-func (wm *failoverManager) OpenForRead(wn NumWAL) (Reader, error) {
-	// TODO(jackson):
-	//
-	// Temporary implementation note: Gap and de-duping logic on read.
-	//
-	// Gap can be there in adjacent logs but not between log n and the union of logs 0..n-1.
-	// Entries 0..50 sent to log 0.
-	// Entries 0..60 sent to log 1
-	// Log 0 commits up to 50
-	// Entries 50..100 sent to log 2
-	// Log 2 commits all of these.
-	// Log 1 only commits 0..10 and node fails and restarts.
-	return nil, nil
-}
-
 // Create implements Manager.
 func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	fwOpts := failoverWriterOpts{
@@ -501,12 +484,6 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	return ww, err
 }
 
-// ListFiles implements Manager.
-func (wm *failoverManager) ListFiles(wn NumWAL) (files []CopyableLog, err error) {
-	// TODO(sumeer):
-	return nil, nil
-}
-
 // ElevateWriteStallThresholdForFailover implements Manager.
 func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
 	return wm.monitor.elevateWriteStallThresholdForFailover()
diff --git a/wal/failover_manager_test.go b/wal/failover_manager_test.go
index 16e82e6610..b2b59884fe 100644
--- a/wal/failover_manager_test.go
+++ b/wal/failover_manager_test.go
@@ -362,7 +362,9 @@ func TestManagerFailover(t *testing.T) {
 				monitorStateForTesting:     monitorStateForTesting,
 				logWriterCreatedForTesting: logWriterCreatedForTesting,
 			}
-			err := fm.Init(o)
+			logs, err := Scan(o.Dirs()...)
+			require.NoError(t, err)
+			err = fm.Init(o, logs)
 			return errorToStr(err)
 
 		case "create-writer":
diff --git a/wal/log_recycler.go b/wal/log_recycler.go
index 63387654f5..cc327b5b98 100644
--- a/wal/log_recycler.go
+++ b/wal/log_recycler.go
@@ -42,14 +42,14 @@ func (r *LogRecycler) Init(maxNumLogFiles int) {
 
 // MinRecycleLogNum returns the current minimum log number that is allowed to
 // be recycled.
-func (r *LogRecycler) MinRecycleLogNum() base.DiskFileNum {
-	return r.minRecycleLogNum
+func (r *LogRecycler) MinRecycleLogNum() NumWAL {
+	return NumWAL(r.minRecycleLogNum)
 }
 
 // SetMinRecycleLogNum sets the minimum log number that is allowed to be
 // recycled.
-func (r *LogRecycler) SetMinRecycleLogNum(n base.DiskFileNum) {
-	r.minRecycleLogNum = n
+func (r *LogRecycler) SetMinRecycleLogNum(n NumWAL) {
+	r.minRecycleLogNum = base.DiskFileNum(n)
 }
 
 // Add attempts to recycle the log file specified by logInfo. Returns true if
diff --git a/wal/reader.go b/wal/reader.go
index d1ac5a830a..83c083b827 100644
--- a/wal/reader.go
+++ b/wal/reader.go
@@ -19,6 +19,15 @@ import (
 	"github.com/cockroachdb/pebble/vfs"
 )
 
+// A LogicalLog identifies a logical WAL and its constituent segment files.
+type LogicalLog struct {
+	Num NumWAL
+	// segments contains the list of the constituent physical segment files that
+	// make up the single logical WAL file. segments is ordered by increasing
+	// logIndex.
+	segments []segment
+}
+
 // A segment represents an individual physical file that makes up a contiguous
 // segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
 // WAL may be composed of multiple segments.
@@ -27,51 +36,60 @@ type segment struct {
 	dir Dir
 }
 
+// String implements fmt.Stringer.
 func (s segment) String() string {
 	return fmt.Sprintf("(%s,%s)", s.dir.Dirname, s.logNameIndex)
 }
 
-// A logicalWAL identifies a logical WAL and its consituent segment files.
-type logicalWAL struct {
-	NumWAL
-	// segments contains the list of the consistuent physical segment files that
-	// make up the single logical WAL file. segments is ordered by increasing
-	// logIndex.
-	segments []segment
+// NumSegments returns the number of constituent physical log files that make up
+// this log.
+func (ll LogicalLog) NumSegments() int { return len(ll.segments) }
+
+// SegmentLocation returns the FS and path for the i-th physical segment file.
+func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string) {
+	s := ll.segments[i]
+	path := s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.Num, s.logNameIndex))
+	return s.dir.FS, path
+}
+
+// PhysicalSize stats each of the log's physical files, summing their sizes.
+func (ll LogicalLog) PhysicalSize() (uint64, error) {
+	var size uint64
+	for i := range ll.segments {
+		fs, path := ll.SegmentLocation(i)
+		stat, err := fs.Stat(path)
+		if err != nil {
+			return 0, err
+		}
+		size += uint64(stat.Size())
+	}
+	return size, nil
 }
 
-func (w logicalWAL) String() string {
+// OpenForRead opens a logical WAL for reading.
+func (ll LogicalLog) OpenForRead() Reader {
+	return newVirtualWALReader(ll)
+}
+
+// String implements fmt.Stringer.
+func (ll LogicalLog) String() string {
 	var sb strings.Builder
-	sb.WriteString(base.DiskFileNum(w.NumWAL).String())
+	sb.WriteString(base.DiskFileNum(ll.Num).String())
 	sb.WriteString(": {")
-	for i := range w.segments {
+	for i := range ll.segments {
 		if i > 0 {
 			sb.WriteString(", ")
 		}
-		sb.WriteString(w.segments[i].String())
+		sb.WriteString(ll.segments[i].String())
 	}
 	sb.WriteString("}")
 	return sb.String()
 }
 
-type logicalWALs []logicalWAL
-
-// get retrieves the WAL with the given number if present. The second return
-// value indicates whether or not the WAL was found.
-func (wals logicalWALs) get(num NumWAL) (logicalWAL, bool) {
-	i, found := slices.BinarySearchFunc(wals, num, func(lw logicalWAL, n NumWAL) int {
-		return cmp.Compare(lw.NumWAL, n)
-	})
-	if !found {
-		return logicalWAL{}, false
-	}
-	return wals[i], true
-}
-
-// listLogs finds all log files in the provided directories. It returns an
+// Scan finds all log files in the provided directories. It returns an
 // ordered list of WALs in increasing NumWAL order.
-func listLogs(dirs ...Dir) (logicalWALs, error) {
-	var wals []logicalWAL
+func Scan(dirs ...Dir) (Logs, error) {
+	var wals []LogicalLog
 	for _, d := range dirs {
 		ls, err := d.FS.List(d.Dirname)
 		if err != nil {
@@ -83,11 +101,11 @@
 			continue
 		}
 		// Have we seen this logical log number yet?
-		i, found := slices.BinarySearchFunc(wals, dfn, func(lw logicalWAL, n NumWAL) int {
-			return cmp.Compare(lw.NumWAL, n)
+		i, found := slices.BinarySearchFunc(wals, dfn, func(lw LogicalLog, n NumWAL) int {
+			return cmp.Compare(lw.Num, n)
 		})
 		if !found {
-			wals = slices.Insert(wals, i, logicalWAL{NumWAL: dfn, segments: make([]segment, 0, 1)})
+			wals = slices.Insert(wals, i, LogicalLog{Num: dfn, segments: make([]segment, 0, 1)})
 		}
 
 		// Ensure we haven't seen this log index yet, and find where it
@@ -105,11 +123,25 @@
 	return wals, nil
 }
 
-func newVirtualWALReader(logNum NumWAL, segments []segment) *virtualWALReader {
+// Logs holds a collection of WAL files.
+type Logs []LogicalLog
+
+// Get retrieves the WAL with the given number if present. The second return
+// value indicates whether or not the WAL was found.
+func (l Logs) Get(num NumWAL) (LogicalLog, bool) {
+	i, found := slices.BinarySearchFunc(l, num, func(lw LogicalLog, n NumWAL) int {
+		return cmp.Compare(lw.Num, n)
+	})
+	if !found {
+		return LogicalLog{}, false
+	}
+	return l[i], true
+}
+
+func newVirtualWALReader(wal LogicalLog) *virtualWALReader {
 	return &virtualWALReader{
-		logNum:    logNum,
-		segments:  segments,
-		currIndex: -1,
+		LogicalLog: wal,
+		currIndex:  -1,
 	}
 }
 
@@ -120,8 +152,7 @@
 // successor.
 type virtualWALReader struct {
 	// VirtualWAL metadata.
-	logNum   NumWAL
-	segments []segment
+	LogicalLog
 
 	// State pertaining to the current position of the reader within the virtual
 	// WAL and its constituent physical files.
@@ -221,7 +252,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
 		// the caller to interpret it as corruption, but it seems safer to
 		// be explicit and surface the corruption error here.
 		return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
-			r.logNum, errors.Safe(r.segments[r.currIndex].logNameIndex))
+			r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
 	}
 
 	// There's a subtlety necessitated by LogData operations. A LogData
@@ -273,15 +304,13 @@
 		return io.EOF
 	}
 
-	segment := r.segments[r.currIndex]
-	fs := segment.dir.FS
-	path := fs.PathJoin(segment.dir.Dirname, makeLogFilename(r.logNum, segment.logNameIndex))
+	fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
 	r.off.PhysicalFile = path
 	r.off.Physical = 0
 	var err error
 	if r.currFile, err = fs.Open(path); err != nil {
 		return errors.Wrapf(err, "opening WAL file segment %q", path)
 	}
-	r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.logNum))
+	r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
 	return nil
 }
diff --git a/wal/reader_test.go b/wal/reader_test.go
index 38ae89a10b..3453ceb612 100644
--- a/wal/reader_test.go
+++ b/wal/reader_test.go
@@ -50,7 +50,7 @@ func TestList(t *testing.T) {
 					Dirname: dirname,
 				})
 			}
-			logs, err := listLogs(dirs...)
+			logs, err := Scan(dirs...)
 			if err != nil {
 				return err.Error()
 			}
@@ -196,9 +196,9 @@ func TestReader(t *testing.T) {
 			var forceLogNameIndexes []uint64
 			td.ScanArgs(t, "logNum", &logNum)
 			td.MaybeScanArgs(t, "forceLogNameIndexes", &forceLogNameIndexes)
-			logs, err := listLogs(Dir{FS: fs})
+			logs, err := Scan(Dir{FS: fs})
 			require.NoError(t, err)
-			log, ok := logs.get(NumWAL(logNum))
+			log, ok := logs.Get(NumWAL(logNum))
 			if !ok {
 				return "not found"
 			}
@@ -216,8 +216,8 @@ func TestReader(t *testing.T) {
 					segments = slices.Insert(segments, j, segment{logNameIndex: logNameIndex(li), dir: Dir{FS: fs}})
 				}
 			}
-
-			r := newVirtualWALReader(log.NumWAL, segments)
+			ll := LogicalLog{Num: log.Num, segments: segments}
+			r := ll.OpenForRead()
 			for {
 				rr, off, err := r.NextRecord()
 				fmt.Fprintf(&buf, "r.NextRecord() = (rr, %s, %v)\n", off, err)
diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go
index a17e90f6be..1047bf0e01 100644
--- a/wal/standalone_manager.go
+++ b/wal/standalone_manager.go
@@ -6,7 +6,6 @@ package wal
 
 import (
 	"cmp"
-	"io"
 	"os"
 	"slices"
 	"sync"
@@ -43,7 +42,7 @@ type StandaloneManager struct {
 
 var _ Manager = &StandaloneManager{}
 
 // Init implements Manager.
-func (m *StandaloneManager) Init(o Options) error {
+func (m *StandaloneManager) Init(o Options, initial Logs) error {
 	if o.Secondary.FS != nil {
 		return errors.AssertionFailedf("cannot create StandaloneManager with a secondary")
 	}
@@ -58,27 +57,19 @@
 	}
 	m.recycler.Init(o.MaxNumRecyclableLogs)
 
-	ls, err := o.Primary.FS.List(o.Primary.Dirname)
-	if err != nil {
-		return err
-	}
 	closeAndReturnErr := func(err error) error {
 		err = firstError(err, walDir.Close())
 		return err
 	}
 	var files []base.FileInfo
-	for _, filename := range ls {
-		ft, fn, ok := base.ParseFilename(o.Primary.FS, filename)
-		if !ok || ft != base.FileTypeLog {
-			continue
-		}
-		stat, err := o.Primary.FS.Stat(o.Primary.FS.PathJoin(o.Primary.Dirname, filename))
+	for _, ll := range initial {
+		size, err := ll.PhysicalSize()
 		if err != nil {
 			return closeAndReturnErr(err)
 		}
-		files = append(files, base.FileInfo{FileNum: fn, FileSize: uint64(stat.Size())})
-		if m.recycler.MinRecycleLogNum() <= fn {
-			m.recycler.SetMinRecycleLogNum(fn + 1)
+		files = append(files, base.FileInfo{FileNum: base.DiskFileNum(ll.Num), FileSize: size})
+		if m.recycler.MinRecycleLogNum() <= ll.Num {
+			m.recycler.SetMinRecycleLogNum(ll.Num + 1)
 		}
 	}
 	slices.SortFunc(files, func(a, b base.FileInfo) int { return cmp.Compare(a.FileNum, b.FileNum) })
@@ -87,12 +78,15 @@
 }
 
 // List implements Manager.
-func (m *StandaloneManager) List() ([]NumWAL, error) {
+func (m *StandaloneManager) List() (Logs, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	wals := make([]NumWAL, len(m.mu.queue))
+	wals := make(Logs, len(m.mu.queue))
 	for i := range m.mu.queue {
-		wals[i] = NumWAL(m.mu.queue[i].FileNum)
+		wals[i] = LogicalLog{
+			Num:      NumWAL(m.mu.queue[i].FileNum),
+			segments: []segment{{dir: m.o.Primary}},
+		}
 	}
 	return wals, nil
 }
@@ -122,42 +116,11 @@
 	return toDelete, nil
 }
 
-// OpenForRead implements Manager.
-func (m *StandaloneManager) OpenForRead(wn NumWAL) (Reader, error) {
-	if wn < m.o.MinUnflushedWALNum {
-		return nil, errors.AssertionFailedf(
-			"attempting to open WAL %d which is earlier than min unflushed %d", wn, m.o.MinUnflushedWALNum)
-	}
-	var filename string
-	m.mu.Lock()
-	for i := range m.mu.queue {
-		if NumWAL(m.mu.queue[i].FileNum) == wn {
-			filename = m.o.Primary.FS.PathJoin(
-				m.o.Primary.Dirname, base.MakeFilename(base.FileTypeLog, m.mu.queue[i].FileNum))
-			break
-		}
-	}
-	m.mu.Unlock()
-	if len(filename) == 0 {
-		return nil, errors.AssertionFailedf("attempting to open WAL %d which is unknown", wn)
-	}
-	file, err := m.o.Primary.FS.Open(filename)
-	if err != nil {
-		return nil, err
-	}
-	return &standaloneReader{
-		filename: filename,
-		rr:       record.NewReader(file, base.DiskFileNum(wn)),
-		f:        file,
-	}, nil
-}
-
 // Create implements Manager.
 func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	// TODO(sumeer): check monotonicity of wn.
 	newLogNum := base.DiskFileNum(wn)
-	newLogName :=
-		base.MakeFilepath(m.o.Primary.FS, m.o.Primary.Dirname, base.FileTypeLog, base.DiskFileNum(wn))
+	newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
 	// Try to use a recycled log file. Recycling log files is an important
 	// performance optimization as it is faster to sync a file that has
@@ -171,8 +134,7 @@
 	var err error
 	recycleLog, recycleOK = m.recycler.Peek()
 	if recycleOK {
-		recycleLogName := base.MakeFilepath(
-			m.o.Primary.FS, m.o.Primary.Dirname, base.FileTypeLog, recycleLog.FileNum)
+		recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
 		newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName)
 		base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
 	} else {
@@ -241,21 +203,6 @@
 	return m.w, nil
 }
 
-// ListFiles implements Manager.
-func (m *StandaloneManager) ListFiles(wn NumWAL) (files []CopyableLog, err error) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	for _, fi := range m.mu.queue {
-		if NumWAL(fi.FileNum) == wn {
-			return []CopyableLog{{
-				FS:   m.o.Primary.FS,
-				Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, base.MakeFilename(base.FileTypeLog, fi.FileNum)),
-			}}, nil
-		}
-	}
-	return nil, errors.Errorf("WAL %d not found", wn)
-}
-
 // ElevateWriteStallThresholdForFailover implements Manager.
 func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
 	return false
@@ -301,29 +248,6 @@ func firstError(err0, err1 error) error {
 	return err1
 }
 
-type standaloneReader struct {
-	filename string
-	rr       *record.Reader
-	f        vfs.File
-}
-
-var _ Reader = &standaloneReader{}
-
-// NextRecord implements Reader.
-func (r *standaloneReader) NextRecord() (io.Reader, Offset, error) {
-	record, err := r.rr.Next()
-	off := Offset{
-		PhysicalFile: r.filename,
-		Physical:     r.rr.Offset(),
-	}
-	return record, off, err
-}
-
-// Close implements Reader.
-func (r *standaloneReader) Close() error {
-	return r.f.Close()
-}
-
 type standaloneWriter struct {
 	m *StandaloneManager
 	w *record.LogWriter
diff --git a/wal/wal.go b/wal/wal.go
index 2c3fd475b0..d3d6660ed5 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -146,6 +146,14 @@ type Options struct {
 	FailoverOptions
 }
 
+// Dirs returns the primary Dir and the secondary if provided.
+func (o *Options) Dirs() []Dir {
+	if o.Secondary == (Dir{}) {
+		return []Dir{o.Primary}
+	}
+	return []Dir{o.Primary, o.Secondary}
+}
+
 // FailoverOptions are options that are specific to failover mode.
 type FailoverOptions struct {
 	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
@@ -234,24 +242,22 @@ type Stats struct {
 
 // Manager handles all WAL work.
 //
-// - Init, List, OpenForRead will be called during DB initialization.
+// - Init will be called during DB initialization.
 // - Obsolete can be called concurrently with WAL writing.
 // - WAL writing: Is done via Create, and the various Writer methods. These
 //   are required to be serialized via external synchronization (specifically,
 //   the caller does it via commitPipeline.mu).
type Manager interface {
 	// Init initializes the Manager.
-	Init(o Options) error
+	Init(o Options, initial Logs) error
 	// List returns the virtual WALs in ascending order.
-	List() ([]NumWAL, error)
+	List() (Logs, error)
 	// Obsolete informs the manager that all virtual WALs less than
 	// minUnflushedNum are obsolete. The callee can choose to recycle some
 	// underlying log files, if !noRecycle. The log files that are not recycled,
 	// and therefore can be deleted, are returned. The deletable files are no
 	// longer tracked by the manager.
 	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
-	// OpenForRead opens a virtual WAL for read.
-	OpenForRead(wn NumWAL) (Reader, error)
 	// Create creates a new virtual WAL.
 	//
 	// NumWALs passed to successive Create calls must be monotonically
 	// increasing, and be greater than any NumWAL seen earlier. The caller must
 	// close the previous Writer before calling Create.
 	//
 	// jobID is used for the WALEventListener.
 	Create(wn NumWAL, jobID int) (Writer, error)
-	// ListFiles lists the log files backing the given unflushed WAL.
-	ListFiles(wn NumWAL) (files []CopyableLog, err error)
 	// ElevateWriteStallThresholdForFailover returns true if the caller should
 	// use a high write stall threshold because the WALs are being written to
 	// the secondary dir.
@@ -285,13 +289,6 @@ type DeletableLog struct {
 	FileSize uint64
 }
 
-// CopyableLog contains information about a log file that can be copied (e.g.
-// for checkpointing).
-type CopyableLog struct {
-	vfs.FS
-	Path string
-}
-
 // SyncOptions has non-nil Done and Err when fsync is requested, else both are
 // nil.
 type SyncOptions struct {
@@ -345,7 +342,3 @@ type Offset struct {
 func (o Offset) String() string {
 	return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
 }
-
-// Make lint happy.
-var _ logNameIndex = 0
-var _ = makeLogFilename
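
For illustration, here is a minimal sketch (not part of the patch) of how the read path fits together after this change: a caller scans the WAL directories, then reads each logical WAL's records through the unified Reader, which stitches together the WAL's physical segment files. The directory name, the use of vfs.Default, and the bare-bones error handling are assumptions made for the sketch only.

package main

import (
	"fmt"
	"io"

	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
)

func main() {
	// Scan finds every logical WAL in the given directories, stitching the
	// physical segment files left behind by failover into LogicalLogs.
	logs, err := wal.Scan(wal.Dir{FS: vfs.Default, Dirname: "demo-wal-dir"}) // hypothetical directory
	if err != nil {
		panic(err)
	}
	for _, ll := range logs {
		size, err := ll.PhysicalSize()
		if err != nil {
			panic(err)
		}
		fmt.Printf("wal %s: %d segment(s), %d bytes\n", ll, ll.NumSegments(), size)

		// OpenForRead stands in for the removed Manager.OpenForRead: the
		// returned Reader iterates over records across all segments in order.
		r := ll.OpenForRead()
		for {
			rec, off, err := r.NextRecord()
			if err == io.EOF {
				break
			}
			if err != nil {
				panic(err) // e.g. a corruption error surfaced by the reader
			}
			n, _ := io.Copy(io.Discard, rec) // each record is an io.Reader over one batch
			fmt.Printf("  record of %d bytes at %s\n", n, off)
		}
		if err := r.Close(); err != nil {
			panic(err)
		}
	}
}

The same Logs value also serves the checkpoint path: as the checkpoint.go hunk above shows, Checkpoint now calls Logs.Get and LogicalLog.SegmentLocation to copy each segment file, replacing the per-manager ListFiles method.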