diff --git a/cleaner.go b/cleaner.go index c89b09ba1e..debd3398fc 100644 --- a/cleaner.go +++ b/cleaner.go @@ -166,7 +166,7 @@ func (cm *cleanupManager) mainLoop() { cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum) case fileTypeLog: cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path, - base.DiskFileNum(of.logFile.NumWAL), of.logFile.FileSize) + base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize) default: path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum) cm.deleteObsoleteFile( diff --git a/wal/failover_manager.go b/wal/failover_manager.go index c15252b0a5..b75c72821d 100644 --- a/wal/failover_manager.go +++ b/wal/failover_manager.go @@ -5,9 +5,11 @@ package wal import ( + "os" "sync" "time" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/vfs" "golang.org/x/exp/rand" ) @@ -387,6 +389,17 @@ func (o *FailoverOptions) ensureDefaults() { } } +type logicalLogWithSizesEtc struct { + num NumWAL + segments []segmentWithSizeEtc +} + +type segmentWithSizeEtc struct { + segment + approxFileSize uint64 + synchronouslyClosed bool +} + type failoverManager struct { opts Options // TODO(jackson/sumeer): read-path etc. @@ -394,19 +407,23 @@ type failoverManager struct { dirHandles [numDirIndices]vfs.File stopper *stopper monitor *failoverMonitor + mu struct { + sync.Mutex + closedWALs []logicalLogWithSizesEtc + ww *failoverWriter + } + recycler LogRecycler + // Due to async creation of files in failoverWriter, multiple goroutines can + // concurrently try to get a file from the recycler. This mutex protects the + // logRecycler.{Peek,Pop} pair. + recyclerPeekPopMu sync.Mutex } var _ Manager = &failoverManager{} // TODO(sumeer): -// -// - log recycling: only those in primary dir. Keep track of those where -// record.LogWriter closed. Those are the only ones we can recycle. -// // - log deletion: if record.LogWriter did not close yet, the cleaner may // get an error when deleting or renaming (only under windows?). -// -// - calls to EventListener // Init implements Manager. func (wm *failoverManager) Init(o Options, initial Logs) error { @@ -430,37 +447,121 @@ func (wm *failoverManager) Init(o Options, initial Logs) error { stopper: stopper, } monitor := newFailoverMonitor(fmOpts) - // TODO(jackson): list dirs and assemble a list of all NumWALs and - // corresponding log files. - *wm = failoverManager{ opts: o, dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File}, stopper: stopper, monitor: monitor, } + wm.recycler.Init(o.MaxNumRecyclableLogs) + for _, ll := range initial { + llse := logicalLogWithSizesEtc{ + num: ll.Num, + } + if wm.recycler.MinRecycleLogNum() <= ll.Num { + wm.recycler.SetMinRecycleLogNum(ll.Num + 1) + } + for i, s := range ll.segments { + fs, path := ll.SegmentLocation(i) + stat, err := fs.Stat(path) + if err != nil { + return err + } + llse.segments = append(llse.segments, segmentWithSizeEtc{ + segment: s, + approxFileSize: uint64(stat.Size()), + synchronouslyClosed: true, + }) + } + wm.mu.closedWALs = append(wm.mu.closedWALs, llse) + } return nil } // List implements Manager. 
func (wm *failoverManager) List() (Logs, error) { - return Scan(wm.opts.Primary, wm.opts.Secondary) + wm.mu.Lock() + defer wm.mu.Unlock() + n := len(wm.mu.closedWALs) + if wm.mu.ww != nil { + n++ + } + wals := make(Logs, n) + setLogicalLog := func(index int, llse logicalLogWithSizesEtc) { + segments := make([]segment, len(llse.segments)) + for j := range llse.segments { + segments[j] = llse.segments[j].segment + } + wals[index] = LogicalLog{ + Num: llse.num, + segments: segments, + } + } + for i, llse := range wm.mu.closedWALs { + setLogicalLog(i, llse) + } + if wm.mu.ww != nil { + setLogicalLog(n-1, wm.mu.ww.getLog()) + } + return wals, nil } // Obsolete implements Manager. func (wm *failoverManager) Obsolete( minUnflushedNum NumWAL, noRecycle bool, ) (toDelete []DeletableLog, err error) { - // TODO(sumeer): - return nil, nil + wm.mu.Lock() + defer wm.mu.Unlock() + i := 0 + for ; i < len(wm.mu.closedWALs); i++ { + ll := wm.mu.closedWALs[i] + if ll.num >= minUnflushedNum { + break + } + // Recycle only the primary at logNameIndex=0, if there was no failover, + // and synchronously closed. It may not be safe to recycle a file that is + // still being written to. And recycling when there was a failover may + // fill up the recycler with smaller log files. The restriction regarding + // logNameIndex=0 is because logRecycler.Peek only exposes the + // DiskFileNum, and we need to use that to construct the path -- we could + // remove this restriction by changing the logRecycler interface, but we + // don't bother. + canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed && + ll.segments[0].logNameIndex == 0 && + ll.segments[0].dir == wm.opts.Primary + if !canRecycle || !wm.recycler.Add(base.FileInfo{ + FileNum: base.DiskFileNum(ll.num), + FileSize: ll.segments[0].approxFileSize, + }) { + for _, s := range ll.segments { + toDelete = append(toDelete, DeletableLog{ + FS: s.dir.FS, + Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)), + NumWAL: ll.num, + ApproxFileSize: s.approxFileSize, + }) + } + } + } + wm.mu.closedWALs = wm.mu.closedWALs[i:] + return toDelete, nil } // Create implements Manager. func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) { + func() { + wm.mu.Lock() + defer wm.mu.Unlock() + if wm.mu.ww != nil { + panic("previous wal.Writer not closed") + } + }() fwOpts := failoverWriterOpts{ wn: wn, logger: wm.opts.Logger, timeSource: wm.opts.timeSource, + jobID: jobID, + logCreator: wm.logCreator, noSyncOnClose: wm.opts.NoSyncOnClose, bytesPerSync: wm.opts.BytesPerSync, preallocateSize: wm.opts.PreallocateSize, @@ -481,6 +582,11 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) { return ww } wm.monitor.newWriter(writerCreateFunc) + if ww != nil { + wm.mu.Lock() + defer wm.mu.Unlock() + wm.mu.ww = ww + } return ww, err } @@ -491,12 +597,37 @@ func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool { func (wm *failoverManager) writerClosed() { wm.monitor.noWriter() + wm.mu.Lock() + defer wm.mu.Unlock() + wm.mu.closedWALs = append(wm.mu.closedWALs, wm.mu.ww.getLog()) + wm.mu.ww = nil } // Stats implements Manager. 
func (wm *failoverManager) Stats() Stats { - // TODO(sumeer): - return Stats{} + recycledLogsCount, recycledLogSize := wm.recycler.Stats() + wm.mu.Lock() + defer wm.mu.Unlock() + var liveFileCount int + var liveFileSize uint64 + updateStats := func(segments []segmentWithSizeEtc) { + for _, s := range segments { + liveFileCount++ + liveFileSize += s.approxFileSize + } + } + for _, llse := range wm.mu.closedWALs { + updateStats(llse.segments) + } + if wm.mu.ww != nil { + updateStats(wm.mu.ww.getLog().segments) + } + return Stats{ + ObsoleteFileCount: recycledLogsCount, + ObsoleteFileSize: recycledLogSize, + LiveFileCount: liveFileCount, + LiveFileSize: liveFileSize, + } } // Close implements Manager. @@ -515,6 +646,89 @@ func (wm *failoverManager) RecyclerForTesting() *LogRecycler { return nil } +// logCreator implements the logCreator func type. +func (wm *failoverManager) logCreator( + dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int, +) (logFile vfs.File, initialFileSize uint64, err error) { + logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li)) + isPrimary := dir == wm.opts.Primary + // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice. + considerRecycle := li == 0 && isPrimary + createInfo := CreateInfo{ + JobID: jobID, + Path: logFilename, + IsSecondary: !isPrimary, + Num: wn, + Err: nil, + } + defer func() { + createInfo.Err = err + if wm.opts.EventListener != nil { + wm.opts.EventListener.LogCreated(createInfo) + } + }() + if considerRecycle { + // Try to use a recycled log file. Recycling log files is an important + // performance optimization as it is faster to sync a file that has + // already been written, than one which is being written for the first + // time. This is due to the need to sync file metadata when a file is + // being written for the first time. Note this is true even if file + // preallocation is performed (e.g. fallocate). + var recycleLog base.FileInfo + var recycleOK bool + func() { + wm.recyclerPeekPopMu.Lock() + defer wm.recyclerPeekPopMu.Unlock() + recycleLog, recycleOK = wm.recycler.Peek() + if recycleOK { + if err = wm.recycler.Pop(recycleLog.FileNum); err != nil { + panic(err) + } + } + }() + if recycleOK { + createInfo.RecycledFileNum = recycleLog.FileNum + recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0)) + r.writeStart() + logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename) + r.writeEnd(err) + // TODO(sumeer): should we fatal since primary dir? At some point it is + // better to fatal instead of continuing to failover. + // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err) + if err != nil { + // TODO(sumeer): we have popped from the logRecycler, which is + // arguably correct, since we don't want to keep trying to reuse a log + // that causes some error. But the original or new file may exist, and + // no one will clean it up unless the process restarts. + return nil, 0, err + } + // Figure out the recycled WAL size. This Stat is necessary because + // ReuseForWrite's contract allows for removing the old file and + // creating a new one. We don't know whether the WAL was actually + // recycled. + // + // TODO(jackson): Adding a boolean to the ReuseForWrite return value + // indicating whether or not the file was actually reused would allow us + // to skip the stat and use recycleLog.FileSize. 
+ var finfo os.FileInfo + finfo, err = logFile.Stat() + if err != nil { + logFile.Close() + return nil, 0, err + } + initialFileSize = uint64(finfo.Size()) + return logFile, initialFileSize, nil + } + } + // Did not recycle. + // + // Create file. + r.writeStart() + logFile, err = dir.FS.Create(logFilename) + r.writeEnd(err) + return logFile, 0, err +} + type stopper struct { quiescer chan struct{} // Closed when quiescing wg sync.WaitGroup diff --git a/wal/failover_manager_test.go b/wal/failover_manager_test.go index b2b59884fe..dcbd82e292 100644 --- a/wal/failover_manager_test.go +++ b/wal/failover_manager_test.go @@ -299,7 +299,9 @@ func TestManagerFailover(t *testing.T) { switch td.Cmd { case "init-manager": ts = newManualTime(time.UnixMilli(0)) - memFS = vfs.NewMem() + if !td.HasArg("reuse-fs") { + memFS = vfs.NewMem() + } proberIterationForTesting = make(chan struct{}, 50000) monitorIterationForTesting = make(chan struct{}, 50000) monitorStateBuf.Reset() @@ -319,6 +321,8 @@ func TestManagerFailover(t *testing.T) { } injs[i] = inj } + case "reuse-fs": + // Ignore, already handled above. default: return fmt.Sprintf("unknown arg %s", cmdArg.Key) } @@ -333,7 +337,7 @@ func TestManagerFailover(t *testing.T) { Primary: Dir{FS: fs, Dirname: dirs[primaryDirIndex]}, Secondary: Dir{FS: fs, Dirname: dirs[secondaryDirIndex]}, MinUnflushedWALNum: 0, - MaxNumRecyclableLogs: 0, + MaxNumRecyclableLogs: 1, NoSyncOnClose: false, BytesPerSync: 0, PreallocateSize: func() int { return 0 }, @@ -365,7 +369,12 @@ func TestManagerFailover(t *testing.T) { logs, err := Scan(o.Dirs()...) require.NoError(t, err) err = fm.Init(o, logs) - return errorToStr(err) + var b strings.Builder + fmt.Fprintf(&b, "%s\n", errorToStr(err)) + if err == nil { + fmt.Fprintf(&b, "recycler min-log-num: %d\n", fm.recycler.MinRecycleLogNum()) + } + return b.String() case "create-writer": var walNum int @@ -381,6 +390,58 @@ func TestManagerFailover(t *testing.T) { _, err := fw.Close() return errorToStr(err) + case "obsolete": + var minUnflushed int + td.ScanArgs(t, "min-unflushed", &minUnflushed) + var noRecycle bool + if td.HasArg("no-recycle") { + noRecycle = true + } + toDelete, err := fm.Obsolete(NumWAL(minUnflushed), noRecycle) + var b strings.Builder + fmt.Fprintf(&b, "%s\n", errorToStr(err)) + if err == nil { + fileInfo, ok := fm.recycler.Peek() + fmt.Fprintf(&b, "recycler ") + if ok { + fmt.Fprintf(&b, "non-empty, front filenum: %d size: %d\n", fileInfo.FileNum, fileInfo.FileSize) + } else { + fmt.Fprintf(&b, "empty\n") + } + if len(toDelete) > 0 { + fmt.Fprintf(&b, "to delete:\n") + for _, f := range toDelete { + fmt.Fprintf(&b, " wal %d: path: %s size: %d\n", f.NumWAL, f.Path, f.ApproxFileSize) + } + } + } + return b.String() + + case "list-and-stats": + logs, err := fm.List() + if err != nil { + return err.Error() + } + stats := fm.Stats() + var b strings.Builder + if len(logs) > 0 { + fmt.Fprintf(&b, "logs:\n") + for _, f := range logs { + fmt.Fprintf(&b, " %s\n", f.String()) + } + } + fmt.Fprintf(&b, "stats:\n") + fmt.Fprintf(&b, " obsolete: count %d size %d\n", stats.ObsoleteFileCount, stats.ObsoleteFileSize) + fmt.Fprintf(&b, " live: count %d size %d\n", stats.LiveFileCount, stats.LiveFileSize) + return b.String() + + case "write-record": + var value string + td.ScanArgs(t, "value", &value) + offset, err := fw.WriteRecord([]byte(value), SyncOptions{}) + require.NoError(t, err) + return fmt.Sprintf("offset: %d", offset) + case "close-manager": err := fm.Close() return errorToStr(err) diff --git a/wal/failover_writer.go 
b/wal/failover_writer.go index cb6a689754..7d689a06ff 100644 --- a/wal/failover_writer.go +++ b/wal/failover_writer.go @@ -276,11 +276,16 @@ const maxPhysicalLogs = 10 // an error on Writer.Close as fatal, this does mean that failoverWriter has // limited ability to mask errors (its primary task is to mask high latency). type failoverWriter struct { - opts failoverWriterOpts - q recordQueue - writers [maxPhysicalLogs]logWriterAndRecorder - mu struct { + opts failoverWriterOpts + q recordQueue + mu struct { sync.Mutex + // writers is protected by mu, except for updates to the + // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the + // protection by mu is for handling concurrent calls to switchToNewDir, + // Close, and getLog. + writers [maxPhysicalLogs]logWriterAndRecorder + // cond is signaled when the latest LogWriter is set in writers (or there // is a creation error), or when the latest LogWriter is successfully // closed. It is waited on in Close. We don't use channels and select @@ -346,6 +351,14 @@ type logWriterAndRecorder struct { // latest writer is done, whether it resulted in success or not. createError error r latencyAndErrorRecorder + + // dir, approxFileSize, synchronouslyClosed are kept for initializing + // segmentWithSizeEtc. The approxFileSize is initially set to whatever is + // returned by logCreator. When failoverWriter.Close is called, + // approxFileSize and synchronouslyClosed may be updated. + dir Dir + approxFileSize uint64 + synchronouslyClosed bool } var _ Writer = &failoverWriter{} @@ -356,6 +369,8 @@ type failoverWriterOpts struct { wn NumWAL logger base.Logger timeSource + jobID int + logCreator // Options that feed into SyncingFileOptions. noSyncOnClose bool @@ -373,6 +388,21 @@ type failoverWriterOpts struct { writerCreatedForTest chan<- struct{} } +func simpleLogCreator( + dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int, +) (f vfs.File, initialFileSize uint64, err error) { + filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li)) + // Create file. + r.writeStart() + f, err = dir.FS.Create(filename) + r.writeEnd(err) + return f, 0, err +} + +type logCreator func( + dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int, +) (f vfs.File, initialFileSize uint64, err error) + func newFailoverWriter( opts failoverWriterOpts, initialDir dirAndFileHandle, ) (*failoverWriter, error) { @@ -450,20 +480,21 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error { } // writerIndex is the slot for this writer. writerIndex := ww.mu.nextWriterIndex - if int(writerIndex) == len(ww.writers) { + if int(writerIndex) == len(ww.mu.writers) { ww.mu.Unlock() return errors.Errorf("exceeded switching limit") } + ww.mu.writers[writerIndex].dir = dir.Dir ww.mu.nextWriterIndex++ ww.mu.Unlock() // Creation is async. ww.opts.stopper.runAsync(func() { - // TODO(sumeer): recycling of logs. - filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex)) - recorderAndWriter := &ww.writers[writerIndex].r + recorderAndWriter := &ww.mu.writers[writerIndex].r recorderAndWriter.ts = ww.opts.timeSource - var file vfs.File + file, initialFileSize, err := ww.opts.logCreator( + dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID) + ww.mu.writers[writerIndex].approxFileSize = initialFileSize // handleErrFunc is called when err != nil. It handles the multiple IO error // cases below. 
 		handleErrFunc := func(err error) {
@@ -472,20 +503,12 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 			}
 			ww.mu.Lock()
 			defer ww.mu.Unlock()
-			ww.writers[writerIndex].createError = err
+			ww.mu.writers[writerIndex].createError = err
 			ww.mu.cond.Signal()
 			if ww.opts.writerCreatedForTest != nil {
 				ww.opts.writerCreatedForTest <- struct{}{}
 			}
 		}
-		var err error
-		// Create file.
-		recorderAndWriter.writeStart()
-		file, err = dir.FS.Create(filename)
-		recorderAndWriter.writeEnd(err)
-		// TODO(sumeer): should we fatal if primary dir? At some point it is better
-		// to fatal instead of continuing to failover.
-		// base.MustExist(dir.FS, filename, ww.opts.logger, err)
 		if err != nil {
 			handleErrFunc(err)
 			return
@@ -531,7 +554,7 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 				return true
 			}
 			// Latest writer.
-			ww.writers[writerIndex].w = w
+			ww.mu.writers[writerIndex].w = w
 			ww.mu.cond.Signal()
 			// NB: snapshotAndSwitchWriter does not block on IO, since
 			// SyncRecordGeneralized does no IO.
@@ -563,6 +586,14 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 				// returned error.
 				ww.opts.stopper.runAsync(func() {
 					_ = w.Close()
+					// TODO(sumeer): consider deleting this file too, since
+					// failoverWriter.Close may not wait for it. This is going to be
+					// extremely rare, so the risk of garbage empty files piling up is
+					// extremely low. Say failover happens daily and of those cases we
+					// have to be very unlucky and the close happens while a failover was
+					// ongoing and the previous LogWriter successfully wrote everything
+					// (say 1% probability if we want to be pessimistic). A garbage file
+					// every 100 days. Restarts will delete that garbage.
 				})
 			}
 	})
@@ -611,7 +642,7 @@ func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
 	if ww.mu.closed {
 		return nil
 	}
-	return &ww.writers[ww.mu.nextWriterIndex-1].r
+	return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
 }
 
 // Close implements Writer.
@@ -656,7 +687,7 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 	// Invariant: ww.mu.nextWriterIndex >= 1.
 	//
 	// We will loop until we have closed the lastWriter (and use
-	// lastPossibleWriter.err). We also need to call close on all LogWriters
+	// lastWriter.err). We also need to call close on all LogWriters
 	// that will not close themselves, i.e., those that have already been
 	// created and installed in failoverWriter.writers (this set may change
 	// while failoverWriter.Close runs).
@@ -673,14 +704,18 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 		// can only process if it is done initializing, else we will iterate
 		// again.
 		for i := closeCalledCount; i < numWriters; i++ {
-			w := ww.writers[i].w
-			cErr := ww.writers[i].createError
+			w := ww.mu.writers[i].w
+			cErr := ww.mu.writers[i].createError
 			// Is the current index the last writer. If yes, this is also the last
 			// loop iteration.
 			isLastWriter := i == lastWriter.index
 			if w != nil {
 				// Can close it, so extend closeCalledCount.
 				closeCalledCount = i + 1
+				size := uint64(w.Size())
+				if ww.mu.writers[i].approxFileSize < size {
+					ww.mu.writers[i].approxFileSize = size
+				}
 				if isLastWriter {
 					// We may care about its error and when it finishes closing.
 					index := i
@@ -734,6 +769,10 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 			ww.mu.cond.Wait()
 		}
 	}
+	if ww.mu.writers[lastWriter.index].w != nil {
+		// This permits log recycling.
+ ww.mu.writers[lastWriter.index].synchronouslyClosed = true + } err = lastWriter.err ww.mu.metrics = lastWriter.metrics ww.mu.closed = true @@ -748,6 +787,28 @@ func (ww *failoverWriter) Metrics() record.LogWriterMetrics { return ww.mu.metrics } +// getLog can be called at any time, including after Close returns. +func (ww *failoverWriter) getLog() logicalLogWithSizesEtc { + ww.mu.Lock() + defer ww.mu.Unlock() + ll := logicalLogWithSizesEtc{ + num: ww.opts.wn, + } + for i := range ww.mu.writers { + if ww.mu.writers[i].w != nil { + ll.segments = append(ll.segments, segmentWithSizeEtc{ + segment: segment{ + logNameIndex: logNameIndex(i), + dir: ww.mu.writers[i].dir, + }, + approxFileSize: ww.mu.writers[i].approxFileSize, + synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed, + }) + } + } + return ll +} + // latencyAndErrorRecorder records ongoing write and sync operations and errors // in those operations. record.LogWriter cannot continue functioning after any // error, so all errors are considered permanent. diff --git a/wal/failover_writer_test.go b/wal/failover_writer_test.go index 551485e623..e417147a22 100644 --- a/wal/failover_writer_test.go +++ b/wal/failover_writer_test.go @@ -136,10 +136,10 @@ func TestFailoverWriter(t *testing.T) { } fmt.Fprintf(b, "log writers:\n") for i := logNameIndex(0); i < w.mu.nextWriterIndex; i++ { - rLatency, rErr := w.writers[i].r.ongoingLatencyOrError() + rLatency, rErr := w.mu.writers[i].r.ongoingLatencyOrError() require.Equal(t, time.Duration(0), rLatency) - if w.writers[i].createError != nil { - require.Equal(t, rErr, w.writers[i].createError) + if w.mu.writers[i].createError != nil { + require.Equal(t, rErr, w.mu.writers[i].createError) } errStr := "no error" if rErr != nil { @@ -179,6 +179,14 @@ func TestFailoverWriter(t *testing.T) { ) datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + getLogFunc := func(b *strings.Builder) { + llse := w.getLog() + fmt.Fprintf(b, "getLog: num: %d\n", llse.num) + for _, s := range llse.segments { + fmt.Fprintf(b, " segment %d: size %d closed %t dir: %s\n", + s.logNameIndex, s.approxFileSize, s.synchronouslyClosed, s.dir.Dirname) + } + } closeFunc := func(closeKind closeKind, stopGoroutines bool) string { if closeKind != waitForCloseToFinish { closeSemCount = queueSemChanCap @@ -231,6 +239,7 @@ func TestFailoverWriter(t *testing.T) { if metrics.WriteThroughput.Bytes > 0 { testutils.DurationIsAtLeast(t, metrics.WriteThroughput.WorkDuration, time.Nanosecond) } + getLogFunc(&b) } if stopGoroutines { // We expect the Close to complete without stopping all the @@ -244,15 +253,30 @@ func TestFailoverWriter(t *testing.T) { } return b.String() } - createWriter := func(noWaitForLogWriterCreation bool) { + createWriter := func(noWaitForLogWriterCreation bool, firstCallInitialFileSize int) { wn := nextWALNum nextWALNum++ var err error stopper = newStopper() + numCreateCalls := 0 + testLogCreator := simpleLogCreator + if firstCallInitialFileSize > 0 { + testLogCreator = func( + dir Dir, wn NumWAL, li logNameIndex, r *latencyAndErrorRecorder, jobID int, + ) (f vfs.File, initialFileSize uint64, err error) { + f, _, err = simpleLogCreator(dir, wn, li, r, jobID) + if numCreateCalls == 0 { + initialFileSize = uint64(firstCallInitialFileSize) + } + numCreateCalls++ + return f, initialFileSize, err + } + } logWriterCreated = make(chan struct{}, 100) w, err = newFailoverWriter(failoverWriterOpts{ wn: wn, timeSource: defaultTime{}, + logCreator: testLogCreator, preallocateSize: func() int { return 0 
}, queueSemChan: queueSemChan, stopper: stopper, @@ -268,6 +292,7 @@ func TestFailoverWriter(t *testing.T) { case "init": var injs []errorfs.Injector var noWriter bool + var initialFileSize int for _, cmdArg := range td.CmdArgs { switch cmdArg.Key { case "inject-errors": @@ -284,6 +309,8 @@ func TestFailoverWriter(t *testing.T) { } case "no-writer": noWriter = true + case "initial-file-size": + td.ScanArgs(t, "initial-file-size", &initialFileSize) default: return fmt.Sprintf("unknown arg %s", cmdArg.Key) } @@ -295,7 +322,7 @@ func TestFailoverWriter(t *testing.T) { fs = newBlockingFS(fs) setDirsFunc(t, fs, &testDirs) if !noWriter { - createWriter(false) + createWriter(false, initialFileSize) } return "" @@ -304,7 +331,11 @@ func TestFailoverWriter(t *testing.T) { if td.HasArg("no-wait") { noWaitForLogWriterCreation = true } - createWriter(noWaitForLogWriterCreation) + var initialFileSize int + if td.HasArg("initial-file-size") { + td.ScanArgs(t, "initial-file-size", &initialFileSize) + } + createWriter(noWaitForLogWriterCreation, initialFileSize) return "" case "write": @@ -359,6 +390,11 @@ func TestFailoverWriter(t *testing.T) { case "close-async": return closeFunc(closeAsync, false) + case "get-log": + var b strings.Builder + getLogFunc(&b) + return b.String() + case "ongoing-latency": var index int td.ScanArgs(t, "writer-index", &index) @@ -376,7 +412,7 @@ func TestFailoverWriter(t *testing.T) { } // Timeout eventually, if the state is unexpected. for i := 0; i < 4000; i++ { - d, _ = w.writers[index].r.ongoingLatencyOrError() + d, _ = w.mu.writers[index].r.ongoingLatencyOrError() if (d > 0) == expectedOngoing { return returnStr() } @@ -603,6 +639,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) { ww, err := newFailoverWriter(failoverWriterOpts{ wn: 0, timeSource: defaultTime{}, + logCreator: simpleLogCreator, preallocateSize: func() int { return 0 }, queueSemChan: queueSemChan, stopper: stopper, diff --git a/wal/reader.go b/wal/reader.go index 7641f999b6..7f75587fa2 100644 --- a/wal/reader.go +++ b/wal/reader.go @@ -123,7 +123,7 @@ func Scan(dirs ...Dir) (Logs, error) { return wals, nil } -// Logs holds a collection of WAL files. +// Logs holds a collection of WAL files, in increasing order of NumWAL. type Logs []LogicalLog // Get retrieves the WAL with the given number if present. 
The second return diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go index 1047bf0e01..2271c3fae3 100644 --- a/wal/standalone_manager.go +++ b/wal/standalone_manager.go @@ -105,10 +105,10 @@ func (m *StandaloneManager) Obsolete( } if noRecycle || !m.recycler.Add(fi) { toDelete = append(toDelete, DeletableLog{ - FS: m.o.Primary.FS, - Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, base.MakeFilename(base.FileTypeLog, fi.FileNum)), - NumWAL: NumWAL(fi.FileNum), - FileSize: fi.FileSize, + FS: m.o.Primary.FS, + Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, base.MakeFilename(base.FileTypeLog, fi.FileNum)), + NumWAL: NumWAL(fi.FileNum), + ApproxFileSize: fi.FileSize, }) } } diff --git a/wal/testdata/failover_writer/blocking b/wal/testdata/failover_writer/blocking index 7bde28621f..ee809aeaf6 100644 --- a/wal/testdata/failover_writer/blocking +++ b/wal/testdata/failover_writer/blocking @@ -28,6 +28,8 @@ close: ok, offset: 17 records: record 0: synced write bytes metric: 28 +getLog: num: 0 + segment 0: size 17 closed true dir: pri log files: pri/000000.log 0: woolly @@ -135,12 +137,21 @@ records: record 2: synced record 3: no sync write bytes metric: 59 +getLog: num: 1 + segment 0: size 35 closed false dir: pri + segment 1: size 48 closed true dir: sec # Do a noop switch. switch ---- ok +get-log +---- +getLog: num: 1 + segment 0: size 35 closed false dir: pri + segment 1: size 48 closed true dir: sec + # First log writer is still trying to close, but blocked on the write. ongoing-latency writer-index=0 ---- @@ -180,7 +191,7 @@ blocking-conf filename=000002.log create ---- 000002.log: 0b1 -create-writer-after-init no-wait +create-writer-after-init no-wait initial-file-size=20 ---- write sync=true value=woolly print-offset @@ -206,6 +217,15 @@ write sync=false value=yak print-offset ---- offset: 47 +# The initial-file-size of 20 is observed as the approx file size, since +# failoverWriter has not been closed. +get-log +---- +getLog: num: 2 + segment 0: size 20 closed false dir: pri + +# The approx file size is updated to be the larger of the initial-file-size and +# the current approximate, so it becomes 47. close ---- close: ok, offset: 47 @@ -214,6 +234,8 @@ records: record 1: synced record 2: no sync write bytes metric: 58 +getLog: num: 2 + segment 0: size 47 closed true dir: pri log files: pri/000002.log 0: woolly @@ -278,6 +300,8 @@ close: ok, offset: 6 records: record 0: synced write bytes metric: 28 +getLog: num: 3 + segment 1: size 17 closed true dir: sec log files: pri/000003.log EOF @@ -352,6 +376,8 @@ close: ok, offset: 18 records: record 0: synced write bytes metric: 29 +getLog: num: 4 + segment 1: size 18 closed true dir: sec # First writer is still blocked. ongoing-latency writer-index=0 diff --git a/wal/testdata/failover_writer/errors b/wal/testdata/failover_writer/errors index b15d7883c2..66167523de 100644 --- a/wal/testdata/failover_writer/errors +++ b/wal/testdata/failover_writer/errors @@ -1,7 +1,9 @@ # Switch once with tail of first log equal to the head of the second log. This # is because the record at the tail did not request sync, so stayed in the # queue when the switch happened, and was replayed. -init +# +# Large initial-file-size due to log recycling. +init initial-file-size=500 ---- write sync=true value=woolly @@ -21,6 +23,13 @@ write sync=false value=yak print-offset ---- offset: 47 +# The approx file size reflects the initial-file-size. 
+get-log +---- +getLog: num: 0 + segment 0: size 500 closed false dir: pri + segment 1: size 0 closed false dir: sec + close ---- close: ok, offset: 47 @@ -29,6 +38,9 @@ records: record 1: no sync record 2: no sync write bytes metric: 41 +getLog: num: 0 + segment 0: size 500 closed false dir: pri + segment 1: size 30 closed true dir: sec log files: pri/000000.log 0: woolly @@ -56,6 +68,10 @@ write sync=true value=sheep print-offset ---- offset: 11 +get-log +---- +getLog: num: 1 + switch ---- ok @@ -67,6 +83,8 @@ records: record 0: synced record 1: synced write bytes metric: 44 +getLog: num: 1 + segment 1: size 33 closed true dir: sec log files: sec/000001-001.log 0: woolly @@ -91,6 +109,12 @@ switch ---- ok +get-log +---- +getLog: num: 2 + segment 0: size 0 closed false dir: pri + segment 1: size 0 closed false dir: sec + close ---- close: ok, offset: 35 @@ -98,6 +122,9 @@ records: record 0: synced record 1: synced write bytes metric: 46 +getLog: num: 2 + segment 0: size 35 closed false dir: pri + segment 1: size 35 closed true dir: sec log files: pri/000002.log EOF @@ -136,6 +163,9 @@ records: record 0: synced record 1: synced write bytes metric: 44 +getLog: num: 3 + segment 0: size 33 closed false dir: pri + segment 1: size 33 closed true dir: sec log files: pri/000003.log EOF @@ -155,6 +185,7 @@ close ---- close: injected error, offset: 0 write bytes metric: 0 +getLog: num: 4 log writers: writer 0: injected error @@ -176,6 +207,8 @@ close: injected error, offset: 17 records: record 0: synced write bytes metric: 17 +getLog: num: 5 + segment 0: size 17 closed true dir: pri log files: pri/000005.log 0: woolly @@ -237,6 +270,7 @@ close: injected error, offset: 6 records: record 0: sync error injected error write bytes metric: 0 +getLog: num: 6 log writers: writer 0: injected error writer 1: injected error @@ -299,6 +333,7 @@ close: injected error, offset: 6 records: record 0: no sync write bytes metric: 0 +getLog: num: 7 log writers: writer 0: injected error writer 1: injected error @@ -328,6 +363,8 @@ close: ok, offset: 6 records: record 0: no sync write bytes metric: 28 +getLog: num: 8 + segment 1: size 17 closed true dir: sec log files: sec/000008-001.log 0: woolly @@ -358,6 +395,8 @@ close: ok, offset: 6 records: record 0: synced write bytes metric: 28 +getLog: num: 9 + segment 2: size 17 closed true dir: pri log files: pri/000009-002.log 0: woolly @@ -388,6 +427,7 @@ close: injected error, offset: 6 records: record 0: sync error injected error write bytes metric: 0 +getLog: num: 10 log writers: writer 0: injected error writer 1: injected error diff --git a/wal/testdata/manager_failover b/wal/testdata/manager_failover index c8e3a0af57..29ab602700 100644 --- a/wal/testdata/manager_failover +++ b/wal/testdata/manager_failover @@ -2,6 +2,7 @@ init-manager ---- ok +recycler min-log-num: 0 # Wait for monitor ticker to start. advance-time dur=1ms wait-monitor @@ -54,12 +55,138 @@ close-manager ---- ok +# The min recyable log num bumps up to 3. +init-manager reuse-fs +---- +ok +recycler min-log-num: 3 + +list-and-stats +---- +logs: + 000001: {(pri,000)} + 000002: {(pri,000)} +stats: + obsolete: count 0 size 0 + live: count 2 size 22 + +# Wait for monitor ticker to start. +advance-time dur=1ms wait-monitor +---- +monitor state: dir index: 0 +now: 1ms + +create-writer wal-num=5 +---- +ok + +# Ensure LogWriter is created, so that it does not race with advance-time +# below. 
+advance-time dur=0ms wait-for-log-writer +---- +now: 1ms + +write-record value=mammoth +---- +offset: 18 + +close-writer +---- +ok + +create-writer wal-num=7 +---- +ok + +# Ensure LogWriter is created, so that it does not race with advance-time +# below. +advance-time dur=0ms wait-for-log-writer +---- +now: 1ms + +write-record value=sheep +---- +offset: 16 + +close-writer +---- +ok + +list-and-stats +---- +logs: + 000001: {(pri,000)} + 000002: {(pri,000)} + 000005: {(pri,000)} + 000007: {(pri,000)} +stats: + obsolete: count 0 size 0 + live: count 4 size 56 + +obsolete min-unflushed=7 +---- +ok +recycler non-empty, front filenum: 5 size: 18 +to delete: + wal 1: path: pri/000001.log size: 11 + wal 2: path: pri/000002.log size: 11 + +obsolete min-unflushed=8 no-recycle +---- +ok +recycler non-empty, front filenum: 5 size: 18 +to delete: + wal 7: path: pri/000007.log size: 16 + +# Reuses a file from the recycler. +create-writer wal-num=9 +---- +ok + +advance-time dur=0ms wait-for-log-writer +---- +now: 1ms + +# Because of the reuse, the file size for this live log is non-zero. +list-and-stats +---- +logs: + 000009: {(pri,000)} +stats: + obsolete: count 0 size 0 + live: count 1 size 29 + +write-record value=woolly +---- +offset: 17 + +write-record value=rhinoceros +---- +offset: 38 + +close-writer +---- +ok + +list-and-stats +---- +logs: + 000009: {(pri,000)} +stats: + obsolete: count 0 size 0 + live: count 1 size 38 + +close-manager +---- +ok + # Test where error on first log file creation causes switch to secondary, and # the secondary creation blocks for too long, causing switch back to the # primary. init-manager inject-errors=((ErrInjected (And Writes (PathMatch "*/000001.log") (OnIndex 0)))) ---- ok +recycler min-log-num: 0 block-io-config filename=000001-001.log create ---- @@ -92,6 +219,10 @@ advance-time dur=0ms wait-ongoing-io ---- now: 77ms +write-record value=mammoth +---- +offset: 7 + elevate-write-stall-threshold ---- true @@ -107,18 +238,26 @@ now: 157ms wait-for-and-unblock-io filename=000001-001.log ---- -advance-time dur=1s ----- -now: 1.157s - -elevate-write-stall-threshold +advance-time dur=0ms wait-for-log-writer ---- -true +now: 157ms close-writer ---- ok +list-and-stats +---- +logs: + 000001: {(pri,002)} +stats: + obsolete: count 0 size 0 + live: count 1 size 18 + +advance-time dur=1s +---- +now: 1.157s + elevate-write-stall-threshold ---- true @@ -131,14 +270,43 @@ elevate-write-stall-threshold ---- false +# Log on primary is not recycled since logNameIndex > 0. +obsolete min-unflushed=2 +---- +ok +recycler empty +to delete: + wal 1: path: pri/000001-002.log size: 18 + create-writer wal-num=2 ---- ok +advance-time dur=0ms wait-for-log-writer +---- +now: 11.157s + +write-record value=sheep +---- +offset: 16 + close-writer ---- ok +list-and-stats +---- +logs: + 000002: {(pri,000)} +stats: + obsolete: count 0 size 0 + live: count 1 size 16 + +obsolete min-unflushed=3 +---- +ok +recycler non-empty, front filenum: 2 size: 16 + close-manager ---- ok @@ -155,6 +323,7 @@ sec/000001-001.log init-manager inject-errors=((ErrInjected (And Writes (PathMatch "sec/*.log")))) ---- ok +recycler min-log-num: 0 # Block creation of the first 3 files in the primary. block-io-config filename=000001.log create @@ -296,6 +465,7 @@ pri/000001.log init-manager inject-errors=((ErrInjected (And Writes (PathMatch "*/000001.log")))) ---- ok +recycler min-log-num: 0 # Wait for monitor ticker to start. 
advance-time dur=0ms wait-monitor diff --git a/wal/wal.go b/wal/wal.go index d3d6660ed5..50593e0995 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -286,7 +286,7 @@ type DeletableLog struct { // Path to the file. Path string NumWAL - FileSize uint64 + ApproxFileSize uint64 } // SyncOptions has non-nil Done and Err when fsync is requested, else both are
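
The recycling decision that failoverManager.Obsolete now makes can be hard to see inside the hunks above, so here is a minimal standalone Go sketch of just that rule: recycle only a single-segment WAL that lives in the primary directory at logNameIndex 0 and was synchronously closed, and delete everything else or anything the recycler rejects. The types and names (closedWAL, segmentInfo, tryRecycle) are pared-down stand-ins for the diff's unexported types, not the wal package API, and deletion is reduced to returning WAL numbers.

package main

import "fmt"

// segmentInfo and closedWAL are simplified stand-ins for segmentWithSizeEtc
// and logicalLogWithSizesEtc in the diff.
type segmentInfo struct {
	logNameIndex        int
	inPrimaryDir        bool
	approxFileSize      uint64
	synchronouslyClosed bool
}

type closedWAL struct {
	num      uint64
	segments []segmentInfo
}

// obsolete walks the closed WALs (assumed sorted by num) below minUnflushed.
// A WAL is offered to the recycler only if it has a single, synchronously
// closed segment in the primary dir at logNameIndex 0; anything else, or
// anything the recycler rejects, is returned for deletion.
func obsolete(
	closed []closedWAL, minUnflushed uint64, noRecycle bool,
	tryRecycle func(num, size uint64) bool,
) (toDelete []uint64) {
	for _, ll := range closed {
		if ll.num >= minUnflushed {
			break
		}
		s := ll.segments
		canRecycle := !noRecycle && len(s) == 1 && s[0].synchronouslyClosed &&
			s[0].logNameIndex == 0 && s[0].inPrimaryDir
		if canRecycle && tryRecycle(ll.num, s[0].approxFileSize) {
			continue
		}
		toDelete = append(toDelete, ll.num)
	}
	return toDelete
}

func main() {
	closed := []closedWAL{
		{num: 1, segments: []segmentInfo{{logNameIndex: 0, inPrimaryDir: true, approxFileSize: 11, synchronouslyClosed: true}}},
		// WAL 2 failed over to the secondary, so it is never recycled.
		{num: 2, segments: []segmentInfo{{logNameIndex: 0, inPrimaryDir: true}, {logNameIndex: 1}}},
	}
	accept := func(num, size uint64) bool { return true }
	fmt.Println("to delete:", obsolete(closed, 3, false, accept)) // [2]
}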
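
The logCreator comments above explain why recycling matters (syncing an already-written file avoids a file-metadata sync) and why a Stat follows ReuseForWrite (its contract allows removing the old file, so the initial size is not known up front). Below is a rough, standalone approximation of that flow using only the standard library: os.Rename stands in for vfs ReuseForWrite, the paths are made up, and error handling is reduced to the essentials.

package main

import (
	"fmt"
	"os"
)

// createOrRecycleLog is a simplified stand-in for the diff's logCreator: if a
// recycled WAL path is available, rename it into place and reopen it (cheaper
// to sync later, since the blocks and metadata already exist); otherwise
// create a fresh file. The returned initialSize plays the role of the
// approxFileSize recorded for the new segment.
func createOrRecycleLog(recyclePath, newPath string) (f *os.File, initialSize uint64, err error) {
	if recyclePath != "" {
		if err := os.Rename(recyclePath, newPath); err != nil {
			return nil, 0, err
		}
		f, err = os.OpenFile(newPath, os.O_RDWR, 0o644)
		if err != nil {
			return nil, 0, err
		}
		// Stat to learn how large the recycled file already is; the real code
		// does the same because the reuse contract may or may not have kept
		// the old file.
		info, statErr := f.Stat()
		if statErr != nil {
			f.Close()
			return nil, 0, statErr
		}
		return f, uint64(info.Size()), nil
	}
	f, err = os.Create(newPath)
	return f, 0, err
}

func main() {
	dir, err := os.MkdirTemp("", "wal-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	old := dir + "/000001.log"
	if err := os.WriteFile(old, make([]byte, 512), 0o644); err != nil {
		panic(err)
	}
	f, size, err := createOrRecycleLog(old, dir+"/000002.log")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	fmt.Println("initial size:", size) // 512, the recycled file's existing size
}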
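
Finally, a small sketch of how the new size accounting composes, matching the testdata above (an initial-file-size of 500 with only 47 bytes written still reports 500, and Stats counts every segment of every closed WAL plus the open writer as live). Types and function names here are illustrative only, not the wal package API.

package main

import "fmt"

type segStat struct{ approxFileSize uint64 }

// bumpAtClose mirrors the closeInternal change: the recorded approximate size
// only grows, so a recycled file's pre-existing size is never shrunk by a
// short write.
func bumpAtClose(initial, written uint64) uint64 {
	if written > initial {
		return written
	}
	return initial
}

// liveStats mirrors failoverManager.Stats: sum over the segments of all closed
// WALs and of the currently open writer, if any.
func liveStats(closedWALs [][]segStat, openWriter []segStat) (count int, size uint64) {
	add := func(segs []segStat) {
		for _, s := range segs {
			count++
			size += s.approxFileSize
		}
	}
	for _, segs := range closedWALs {
		add(segs)
	}
	add(openWriter)
	return count, size
}

func main() {
	fmt.Println(bumpAtClose(500, 47)) // 500: the recycled file keeps its old size
	closed := [][]segStat{{{17}}, {{35}, {48}}} // second WAL failed over
	count, size := liveStats(closed, []segStat{{20}})
	fmt.Println(count, size) // 4 120
}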