Skip to content

Commit

Permalink
wal: implement failoverManager.{Obsolete,Stats} and log recycling
Browse files Browse the repository at this point in the history
Informs #3230

Informs CRDB-35401
  • Loading branch information
sumeerbhola committed Feb 21, 2024
1 parent 21f5b6f commit 37c4058
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cm *cleanupManager) mainLoop() {
cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
case fileTypeLog:
cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
base.DiskFileNum(of.logFile.NumWAL), of.logFile.FileSize)
base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
default:
path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
cm.deleteObsoleteFile(
Expand Down
242 changes: 228 additions & 14 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package wal

import (
"os"
"sync"
"time"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/exp/rand"
)
Expand Down Expand Up @@ -387,26 +389,41 @@ func (o *FailoverOptions) ensureDefaults() {
}
}

// logicalLogWithSizesEtc is the failoverManager's bookkeeping record for a
// single logical WAL: the WAL number together with its segments, each
// annotated with an approximate size and close state.
type logicalLogWithSizesEtc struct {
// num is the number of the logical WAL.
num NumWAL
// segments holds one entry per segment file that makes up this WAL.
segments []segmentWithSizeEtc
}

// segmentWithSizeEtc augments a segment with the size and close state that
// the failoverManager needs for stats, deletion and recycling decisions.
type segmentWithSizeEtc struct {
segment
// approxFileSize is the approximate size of the segment file in bytes. For
// logs that already exist at Init time it is taken from a Stat of the file.
approxFileSize uint64
// synchronouslyClosed is set to true by Init for pre-existing logs, and
// Obsolete only offers a log to the recycler when it is true.
// NOTE(review): presumably false means the underlying writer may still be
// closing asynchronously -- confirm against failoverWriter.
synchronouslyClosed bool
}

// failoverManager is a Manager implementation that tracks the set of closed
// WALs and at most one open failoverWriter, and that recycles obsolete log
// files through a LogRecycler.
type failoverManager struct {
// opts is the Options value passed to Init.
opts Options
// TODO(jackson/sumeer): read-path etc.

// dirHandles holds the open directory handles, indexed by directory index
// (primary, secondary).
dirHandles [numDirIndices]vfs.File
stopper *stopper
monitor *failoverMonitor
// mu protects the fields nested inside it.
mu struct {
sync.Mutex
// closedWALs lists the WALs that are no longer being written. Init seeds
// it with the initial logs, writerClosed appends to it, and Obsolete
// trims obsolete entries from the front.
closedWALs []logicalLogWithSizesEtc
// ww is the currently open writer, or nil if there is none. Create
// panics if it is non-nil when a new writer is requested.
ww *failoverWriter
}
// recycler holds obsolete log files that can be reused for new WALs.
recycler LogRecycler
// Due to async creation of files in failoverWriter, multiple goroutines can
// concurrently try to get a file from the recycler. This mutex protects the
// logRecycler.{Peek,Pop} pair.
recyclerPeekPopMu sync.Mutex
}

var _ Manager = &failoverManager{}

// TODO(sumeer):
//
// - log recycling: only those in primary dir. Keep track of those where
// record.LogWriter closed. Those are the only ones we can recycle.
//
// - log deletion: if record.LogWriter did not close yet, the cleaner may
// get an error when deleting or renaming (only under windows?).
//
// - calls to EventListener

// Init implements Manager.
func (wm *failoverManager) Init(o Options, initial Logs) error {
Expand All @@ -430,37 +447,121 @@ func (wm *failoverManager) Init(o Options, initial Logs) error {
stopper: stopper,
}
monitor := newFailoverMonitor(fmOpts)
// TODO(jackson): list dirs and assemble a list of all NumWALs and
// corresponding log files.

*wm = failoverManager{
opts: o,
dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
stopper: stopper,
monitor: monitor,
}
wm.recycler.Init(o.MaxNumRecyclableLogs)
for _, ll := range initial {
llse := logicalLogWithSizesEtc{
num: ll.Num,
}
if wm.recycler.MinRecycleLogNum() <= ll.Num {
wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
}
for i, s := range ll.segments {
fs, path := ll.SegmentLocation(i)
stat, err := fs.Stat(path)
if err != nil {
return err
}
llse.segments = append(llse.segments, segmentWithSizeEtc{
segment: s,
approxFileSize: uint64(stat.Size()),
synchronouslyClosed: true,
})
}
wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
}
return nil
}

// List implements Manager.
//
// It returns the logical WALs currently tracked by the manager: every entry
// of wm.mu.closedWALs, in slice order, followed by the WAL of the open writer
// if there is one. The result is assembled from in-memory state while holding
// wm.mu; the WAL directories are not rescanned. The returned error is always
// nil.
func (wm *failoverManager) List() (Logs, error) {
	wm.mu.Lock()
	defer wm.mu.Unlock()
	n := len(wm.mu.closedWALs)
	if wm.mu.ww != nil {
		n++
	}
	wals := make(Logs, 0, n)
	// appendLogicalLog drops the size/close bookkeeping from llse and appends
	// the plain LogicalLog to the result. The segments are copied so that the
	// caller does not alias the manager's internal slices.
	appendLogicalLog := func(llse logicalLogWithSizesEtc) {
		segments := make([]segment, len(llse.segments))
		for j := range llse.segments {
			segments[j] = llse.segments[j].segment
		}
		wals = append(wals, LogicalLog{
			Num:      llse.num,
			segments: segments,
		})
	}
	for _, llse := range wm.mu.closedWALs {
		appendLogicalLog(llse)
	}
	if wm.mu.ww != nil {
		// The open writer's log always comes last.
		appendLogicalLog(wm.mu.ww.getLog())
	}
	return wals, nil
}

// Obsolete implements Manager.
//
// It walks wm.mu.closedWALs from the front and treats every WAL whose number
// is below minUnflushedNum as obsolete, stopping at the first WAL whose number
// is >= minUnflushedNum. Each obsolete WAL is either handed to the log
// recycler or has all of its segments appended to toDelete for the caller to
// remove. The processed prefix is dropped from wm.mu.closedWALs. If noRecycle
// is true nothing is offered to the recycler. The returned error is always
// nil.
//
// NOTE(review): stopping at the first non-obsolete entry assumes closedWALs
// is ordered by WAL number -- confirm against the callers of writerClosed.
func (wm *failoverManager) Obsolete(
	minUnflushedNum NumWAL, noRecycle bool,
) (toDelete []DeletableLog, err error) {
	wm.mu.Lock()
	defer wm.mu.Unlock()
	i := 0
	for ; i < len(wm.mu.closedWALs); i++ {
		ll := wm.mu.closedWALs[i]
		if ll.num >= minUnflushedNum {
			break
		}
		// Recycle only the primary at logNameIndex=0, if there was no failover,
		// and synchronously closed. It may not be safe to recycle a file that is
		// still being written to. And recycling when there was a failover may
		// fill up the recycler with smaller log files. The restriction regarding
		// logNameIndex=0 is because logRecycler.Peek only exposes the
		// DiskFileNum, and we need to use that to construct the path -- we could
		// remove this restriction by changing the logRecycler interface, but we
		// don't bother.
		//
		// The len(ll.segments) == 1 check comes first so that the segments[0]
		// accesses below are only evaluated when that element exists.
		canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
			ll.segments[0].logNameIndex == 0 &&
			ll.segments[0].dir == wm.opts.Primary
		// recycler.Add is only attempted when canRecycle holds; if the log is
		// not eligible, or the recycler declines it, every segment is deleted.
		if !canRecycle || !wm.recycler.Add(base.FileInfo{
			FileNum:  base.DiskFileNum(ll.num),
			FileSize: ll.segments[0].approxFileSize,
		}) {
			for _, s := range ll.segments {
				toDelete = append(toDelete, DeletableLog{
					FS:             s.dir.FS,
					Path:           s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
					NumWAL:         ll.num,
					ApproxFileSize: s.approxFileSize,
				})
			}
		}
	}
	// Everything before index i has been recycled or scheduled for deletion.
	wm.mu.closedWALs = wm.mu.closedWALs[i:]
	return toDelete, nil
}

// Create implements Manager.
func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
func() {
wm.mu.Lock()
defer wm.mu.Unlock()
if wm.mu.ww != nil {
panic("previous wal.Writer not closed")
}
}()
fwOpts := failoverWriterOpts{
wn: wn,
logger: wm.opts.Logger,
timeSource: wm.opts.timeSource,
jobID: jobID,
logCreator: wm.logCreator,
noSyncOnClose: wm.opts.NoSyncOnClose,
bytesPerSync: wm.opts.BytesPerSync,
preallocateSize: wm.opts.PreallocateSize,
Expand All @@ -481,6 +582,11 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
return ww
}
wm.monitor.newWriter(writerCreateFunc)
if ww != nil {
wm.mu.Lock()
defer wm.mu.Unlock()
wm.mu.ww = ww
}
return ww, err
}

Expand All @@ -491,12 +597,37 @@ func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {

// writerClosed tells the monitor that there is no longer an open writer and
// then, under wm.mu, moves the current writer's log onto the closedWALs list
// and clears wm.mu.ww.
func (wm *failoverManager) writerClosed() {
	wm.monitor.noWriter()

	wm.mu.Lock()
	defer wm.mu.Unlock()
	closedLog := wm.mu.ww.getLog()
	wm.mu.closedWALs = append(wm.mu.closedWALs, closedLog)
	wm.mu.ww = nil
}

// Stats implements Manager.
//
// The obsolete counters are taken from the log recycler, i.e. they describe
// the log files currently held for recycling. The live counters sum the number
// and approximate size of all segments belonging to the closed WALs and, if
// there is one, to the open writer.
func (wm *failoverManager) Stats() Stats {
	// The recycler is queried before wm.mu is acquired.
	recycledLogsCount, recycledLogSize := wm.recycler.Stats()
	wm.mu.Lock()
	defer wm.mu.Unlock()
	var liveFileCount int
	var liveFileSize uint64
	// updateStats accumulates one live file per segment.
	updateStats := func(segments []segmentWithSizeEtc) {
		for _, s := range segments {
			liveFileCount++
			liveFileSize += s.approxFileSize
		}
	}
	for _, llse := range wm.mu.closedWALs {
		updateStats(llse.segments)
	}
	if wm.mu.ww != nil {
		updateStats(wm.mu.ww.getLog().segments)
	}
	return Stats{
		ObsoleteFileCount: recycledLogsCount,
		ObsoleteFileSize:  recycledLogSize,
		LiveFileCount:     liveFileCount,
		LiveFileSize:      liveFileSize,
	}
}

// Close implements Manager.
Expand All @@ -515,6 +646,89 @@ func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
return nil
}

// logCreator implements the logCreator func type.
//
// It produces the file for WAL wn with name index li inside dir, either by
// reusing a file popped from the log recycler or by creating a fresh file.
// On return it reports the outcome, including any error, to
// wm.opts.EventListener.LogCreated when a listener is configured.
// initialFileSize is the Stat-reported size of a reused file and 0 for a
// freshly created one.
func (wm *failoverManager) logCreator(
	dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int,
) (logFile vfs.File, initialFileSize uint64, err error) {
	targetPath := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
	inPrimary := dir == wm.opts.Primary
	info := CreateInfo{
		JobID:       jobID,
		Path:        targetPath,
		IsSecondary: !inPrimary,
		Num:         wn,
		Err:         nil,
	}
	// The named result err is read here so that the listener sees the final
	// outcome regardless of which return path is taken.
	defer func() {
		info.Err = err
		if wm.opts.EventListener != nil {
			wm.opts.EventListener.LogCreated(info)
		}
	}()

	// Try to use a recycled log file. Recycling log files is an important
	// performance optimization as it is faster to sync a file that has
	// already been written, than one which is being written for the first
	// time. This is due to the need to sync file metadata when a file is
	// being written for the first time. Note this is true even if file
	// preallocation is performed (e.g. fallocate).
	//
	// Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
	var candidate base.FileInfo
	haveCandidate := false
	if li == 0 && inPrimary {
		func() {
			// Peek and Pop must happen as one unit with respect to other
			// goroutines creating files concurrently.
			wm.recyclerPeekPopMu.Lock()
			defer wm.recyclerPeekPopMu.Unlock()
			candidate, haveCandidate = wm.recycler.Peek()
			if !haveCandidate {
				return
			}
			if err = wm.recycler.Pop(candidate.FileNum); err != nil {
				panic(err)
			}
		}()
	}

	if !haveCandidate {
		// Did not recycle.
		//
		// Create file.
		r.writeStart()
		logFile, err = dir.FS.Create(targetPath)
		r.writeEnd(err)
		return logFile, 0, err
	}

	info.RecycledFileNum = candidate.FileNum
	recycledPath := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(candidate.FileNum), 0))
	r.writeStart()
	logFile, err = dir.FS.ReuseForWrite(recycledPath, targetPath)
	r.writeEnd(err)
	// TODO(sumeer): should we fatal since primary dir? At some point it is
	// better to fatal instead of continuing to failover.
	// base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
	if err != nil {
		// TODO(sumeer): we have popped from the logRecycler, which is
		// arguably correct, since we don't want to keep trying to reuse a log
		// that causes some error. But the original or new file may exist, and
		// no one will clean it up unless the process restarts.
		return nil, 0, err
	}
	// Figure out the recycled WAL size. This Stat is necessary because
	// ReuseForWrite's contract allows for removing the old file and
	// creating a new one. We don't know whether the WAL was actually
	// recycled.
	//
	// TODO(jackson): Adding a boolean to the ReuseForWrite return value
	// indicating whether or not the file was actually reused would allow us
	// to skip the stat and use recycleLog.FileSize.
	var finfo os.FileInfo
	finfo, err = logFile.Stat()
	if err != nil {
		logFile.Close()
		return nil, 0, err
	}
	initialFileSize = uint64(finfo.Size())
	return logFile, initialFileSize, nil
}

type stopper struct {
quiescer chan struct{} // Closed when quiescing
wg sync.WaitGroup
Expand Down
2 changes: 2 additions & 0 deletions wal/failover_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,3 +491,5 @@ func recvWithDeadline(t *testing.T, td *datadriven.TestData, waitStr string, ch
// are complete. Currently this is done by waiting on various channels etc.
// which exposes implementation detail. See concurrency_test.monitor, in
// CockroachDB, for an alternative.

// TODO(sumeer): test failoverManager.{List,Obsolete,Stats}
Loading

0 comments on commit 37c4058

Please sign in to comment.