Skip to content

Commit

Permalink
wal: separate read interface from Manager
Browse files Browse the repository at this point in the history
This commit separates the interface for reading WAL files from the Manager
interface.  This helps unify some code on the read path, but it also makes it
easier to support changing configuration (eg, changing secondary location or
disabling failover).
  • Loading branch information
jbowens committed Feb 16, 2024
1 parent f6c618a commit 739a233
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 224 deletions.
36 changes: 22 additions & 14 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/record"
Expand Down Expand Up @@ -193,23 +194,23 @@ func (d *DB) Checkpoint(
for diskFileNum := range d.mu.versions.backingState.fileBackingMap {
virtualBackingFiles[diskFileNum] = struct{}{}
}
var logFiles []wal.CopyableLog

queuedLogNums := make([]wal.NumWAL, 0, len(memQueue))
for i := range memQueue {
logNum := memQueue[i].logNum
if logNum == 0 {
continue
}
files, err := d.mu.log.manager.ListFiles(wal.NumWAL(logNum))
if err != nil {
return err
if logNum := memQueue[i].logNum; logNum != 0 {
queuedLogNums = append(queuedLogNums, wal.NumWAL(logNum))
}
logFiles = append(logFiles, files...)
}
// Release the manifest and DB.mu so we don't block other operations on
// the database.
d.mu.versions.logUnlock()
d.mu.Unlock()

allLogicalLogs, err := d.mu.log.manager.List()
if err != nil {
return err
}

// Wrap the normal filesystem with one which wraps newly created files with
// vfs.NewSyncingFile.
fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
Expand Down Expand Up @@ -321,11 +322,18 @@ func (d *DB) Checkpoint(
// Copy the WAL files. We copy rather than link because WAL file recycling
// will cause the WAL files to be reused which would invalidate the
// checkpoint.
for _, src := range logFiles {
destPath := fs.PathJoin(destDir, src.FS.PathBase(src.Path))
ckErr = vfs.CopyAcrossFS(src.FS, src.Path, fs, destPath)
if ckErr != nil {
return ckErr
for _, logNum := range queuedLogNums {
log, ok := allLogicalLogs.Get(logNum)
if !ok {
return errors.Newf("log %s not found", logNum)
}
for i := 0; i < log.NumSegments(); i++ {
srcFS, srcPath := log.SegmentLocation(i)
destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
if ckErr != nil {
return ckErr
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ func TestLargeBatch(t *testing.T) {
logNum := func() base.DiskFileNum {
d.mu.Lock()
defer d.mu.Unlock()
walNums, err := d.mu.log.manager.List()
logs, err := d.mu.log.manager.List()
require.NoError(t, err)
return base.DiskFileNum(walNums[len(walNums)-1])
return base.DiskFileNum(logs[len(logs)-1].Num)
}
fileSize := func(fileNum base.DiskFileNum) int64 {
info, err := d.opts.FS.Stat(base.MakeFilepath(d.opts.FS, "", fileTypeLog, fileNum))
Expand Down Expand Up @@ -1979,7 +1979,7 @@ func TestRecycleLogs(t *testing.T) {
defer d.mu.Unlock()
walNums, err := d.mu.log.manager.List()
require.NoError(t, err)
return base.DiskFileNum(walNums[len(walNums)-1])
return base.DiskFileNum(walNums[len(walNums)-1].Num)
}
logCount := func() int {
d.mu.Lock()
Expand Down
41 changes: 20 additions & 21 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
Buckets: FsyncLatencyBuckets,
})
walManager := &wal.StandaloneManager{}
err = walManager.Init(wal.Options{
walOpts := wal.Options{
Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
Secondary: wal.Dir{},
MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
Expand All @@ -334,7 +334,12 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
QueueSemChan: d.commit.logSyncQSem,
Logger: opts.Logger,
EventListener: walEventListenerAdaptor{l: opts.EventListener},
})
}
wals, err := wal.Scan(walOpts.Dirs()...)
if err != nil {
return nil, err
}
err = walManager.Init(walOpts, wals)
if err != nil {
return nil, err
}
Expand All @@ -343,10 +348,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
walManager.Close()
}
}()
wals, err := walManager.List()
if err != nil {
return nil, err
}

d.mu.log.manager = walManager

// List the objects. This also happens to include WAL log files, if they are
Expand Down Expand Up @@ -427,7 +429,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
if n := len(wals); n > 0 {
// Don't reuse any obsolete file numbers to avoid modifying an
// ingested sstable's original external file.
d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1]))
d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
}

// Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
Expand All @@ -448,9 +450,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}

// Replay any newer log files than the ones named in the manifest.
var replayWALs []wal.NumWAL
for i, walNum := range wals {
if base.DiskFileNum(walNum) >= d.mu.versions.minUnflushedLogNum {
var replayWALs wal.Logs
for i, w := range wals {
if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
replayWALs = wals[i:]
break
}
Expand Down Expand Up @@ -729,12 +731,9 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayWAL(
jobID int, ve *versionEdit, wn wal.NumWAL, strictWALTail bool,
jobID int, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
) (toFlush flushableList, maxSeqNum uint64, err error) {
rr, err := d.mu.log.manager.OpenForRead(wn)
if err != nil {
return nil, 0, err
}
rr := ll.OpenForRead()
defer rr.Close()
var (
b Batch
Expand Down Expand Up @@ -784,7 +783,7 @@ func (d *DB) replayWAL(
if mem != nil {
return
}
mem, entry = d.newMemTable(base.DiskFileNum(wn), seqNum)
mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum)
if d.opts.ReadOnly {
d.mu.mem.mutable = mem
d.mu.mem.queue = append(d.mu.mem.queue, entry)
Expand All @@ -811,7 +810,7 @@ func (d *DB) replayWAL(
}
defer func() {
if err != nil {
err = errors.WithDetailf(err, "replaying wal %d, offset %d", wn, offset)
err = errors.WithDetailf(err, "replaying wal %d, offset %d", ll.Num, offset)
}
}()

Expand All @@ -836,7 +835,7 @@ func (d *DB) replayWAL(

if buf.Len() < batchrepr.HeaderLen {
return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
errors.Safe(base.DiskFileNum(wn)), offset)
errors.Safe(base.DiskFileNum(ll.Num)), offset)
}

if d.opts.ErrorIfNotPristine {
Expand Down Expand Up @@ -923,7 +922,7 @@ func (d *DB) replayWAL(
}

entry, err = d.newIngestedFlushableEntry(
meta, seqNum, base.DiskFileNum(wn),
meta, seqNum, base.DiskFileNum(ll.Num),
)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -983,7 +982,7 @@ func (d *DB) replayWAL(
if err != nil {
return nil, 0, err
}
entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(wn), b.SeqNum())
entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
// Disable memory accounting by adding a reader ref that will never be
// removed.
entry.readerRefs.Add(1)
Expand Down Expand Up @@ -1026,7 +1025,7 @@ func (d *DB) replayWAL(
}

d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %d; replayed %d keys in %d batches",
jobID, base.DiskFileNum(wn).String(), offset, keysReplayed, batchesReplayed)
jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
flushMem()

// mem is nil here.
Expand Down
29 changes: 3 additions & 26 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ var _ Manager = &failoverManager{}
// - calls to EventListener

// Init implements Manager.
func (wm *failoverManager) Init(o Options) error {
func (wm *failoverManager) Init(o Options, initial Logs) error {
if o.timeSource == nil {
o.timeSource = defaultTime{}
}
Expand Down Expand Up @@ -443,9 +443,8 @@ func (wm *failoverManager) Init(o Options) error {
}

// List implements Manager.
func (wm *failoverManager) List() ([]NumWAL, error) {
// TODO(jackson):
return nil, nil
func (wm *failoverManager) List() (Logs, error) {
return Scan(wm.opts.Primary, wm.opts.Secondary)
}

// Obsolete implements Manager.
Expand All @@ -456,22 +455,6 @@ func (wm *failoverManager) Obsolete(
return nil, nil
}

// OpenForRead implements Manager.
func (wm *failoverManager) OpenForRead(wn NumWAL) (Reader, error) {
// TODO(jackson):
//
// Temporary implementation note: Gap and de-duping logic on read.
//
// Gap can be there in adjacent logs but not between log n and the union of logs 0..n-1.
// Entries 0..50 sent to log 0.
// Entries 0..60 sent to log 1
// Log 0 commits up to 50
// Entries 50..100 sent to log 2
// Log 2 commits all of these.
// Log 1 only commits 0..10 and node fails and restarts.
return nil, nil
}

// Create implements Manager.
func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
fwOpts := failoverWriterOpts{
Expand Down Expand Up @@ -501,12 +484,6 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
return ww, err
}

// ListFiles implements Manager.
func (wm *failoverManager) ListFiles(wn NumWAL) (files []CopyableLog, err error) {
// TODO(sumeer):
return nil, nil
}

// ElevateWriteStallThresholdForFailover implements Manager.
func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
return wm.monitor.elevateWriteStallThresholdForFailover()
Expand Down
4 changes: 3 additions & 1 deletion wal/failover_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ func TestManagerFailover(t *testing.T) {
monitorStateForTesting: monitorStateForTesting,
logWriterCreatedForTesting: logWriterCreatedForTesting,
}
err := fm.Init(o)
logs, err := Scan(o.Dirs()...)
require.NoError(t, err)
err = fm.Init(o, logs)
return errorToStr(err)

case "create-writer":
Expand Down
8 changes: 4 additions & 4 deletions wal/log_recycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func (r *LogRecycler) Init(maxNumLogFiles int) {

// MinRecycleLogNum returns the current minimum log number that is allowed to
// be recycled.
func (r *LogRecycler) MinRecycleLogNum() base.DiskFileNum {
return r.minRecycleLogNum
func (r *LogRecycler) MinRecycleLogNum() NumWAL {
return NumWAL(r.minRecycleLogNum)
}

// SetMinRecycleLogNum sets the minimum log number that is allowed to be
// recycled.
func (r *LogRecycler) SetMinRecycleLogNum(n base.DiskFileNum) {
r.minRecycleLogNum = n
func (r *LogRecycler) SetMinRecycleLogNum(n NumWAL) {
r.minRecycleLogNum = base.DiskFileNum(n)
}

// Add attempts to recycle the log file specified by logInfo. Returns true if
Expand Down
Loading

0 comments on commit 739a233

Please sign in to comment.