diff --git a/record/log_writer.go b/record/log_writer.go index 3b1fa775fee..9ed166f390e 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -150,12 +150,14 @@ func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{} *slot.err = err slot.wg = nil slot.err = nil - // We need to bump the tail count before signalling the wait group as - // signalling the wait group can trigger release a blocked goroutine which - // will try to enqueue before we've "freed" space in the queue. + // We need to bump the tail count before releasing the queueSemChan + // semaphore as releasing the semaphore can cause a blocked goroutine to + // acquire the semaphore and enqueue before we've "freed" space in the + // queue. q.headTail.Add(1) wg.Done() - // Is always non-nil in production. + // Is always non-nil in production, unless using wal package for WAL + // failover. if queueSemChan != nil { <-queueSemChan } @@ -164,17 +166,160 @@ func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{} return nil } +// pendingSyncs abstracts out the handling of pending sync requests. In +// standalone mode the implementation is a thin wrapper around syncQueue. In +// the mode where the LogWriter can be subject to failover, there is no queue +// kept in the LogWriter and the signaling to those waiting for sync is +// handled in the wal package. +type pendingSyncs interface { + push(PendingSync) + setBlocked() + clearBlocked() + empty() bool + snapshotForPop() pendingSyncsSnapshot + pop(snap pendingSyncsSnapshot, err error) error +} + +type pendingSyncsSnapshot interface { + empty() bool +} + +// PendingSync abstracts the sync specification for a record queued on the +// LogWriter. The only implementations are provided in this package since +// syncRequested is not exported. +type PendingSync interface { + syncRequested() bool +} + +// The implementation of pendingSyncs in standalone mode. +type pendingSyncsWithSyncQueue struct { + syncQueue + syncQueueLen *base.GaugeSampleMetric + // See the comment for LogWriterConfig.QueueSemChan. + queueSemChan chan struct{} +} + +var _ pendingSyncs = &pendingSyncsWithSyncQueue{} + +func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) { + ps2 := ps.(pendingSyncForSyncQueue) + q.syncQueue.push(ps2.wg, ps2.err) +} + +func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot { + head, tail, realLength := q.syncQueue.load() + snap := syncQueueSnapshot{ + head: head, + tail: tail, + } + q.syncQueueLen.AddSample(int64(realLength)) + return snap +} + +func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error { + s := snap.(syncQueueSnapshot) + return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan) +} + +// The implementation of pendingSyncsSnapshot in standalone mode. +type syncQueueSnapshot struct { + head, tail uint32 +} + +func (s syncQueueSnapshot) empty() bool { + return s.head == s.tail +} + +// The implementation of pendingSync in standalone mode. +type pendingSyncForSyncQueue struct { + wg *sync.WaitGroup + err *error +} + +func (ps pendingSyncForSyncQueue) syncRequested() bool { + return ps.wg != nil +} + +// The implementation of pendingSyncs in failover mode. +type pendingSyncsWithHighestSyncIndex struct { + // The highest "index" queued that is requesting a sync. Initialized + // to NoSyncIndex, and reset to NoSyncIndex after the sync. + index atomic.Int64 + // blocked is an atomic boolean which indicates whether syncing is currently + // blocked or can proceed. 
It is used by the implementation of + // min-sync-interval to block syncing until the min interval has passed. + blocked atomic.Bool + externalSyncQueueCallback ExternalSyncQueueCallback +} + +// NoSyncIndex is the value of PendingSyncIndex when a sync is not requested. +const NoSyncIndex = -1 + +func (si *pendingSyncsWithHighestSyncIndex) init( + externalSyncQueueCallback ExternalSyncQueueCallback, +) { + si.index.Store(NoSyncIndex) + si.externalSyncQueueCallback = externalSyncQueueCallback +} + +func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) { + ps2 := ps.(PendingSyncIndex) + si.index.Store(ps2.Index) +} + +func (si *pendingSyncsWithHighestSyncIndex) setBlocked() { + si.blocked.Store(true) +} + +func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() { + si.blocked.Store(false) +} + +func (si *pendingSyncsWithHighestSyncIndex) empty() bool { + return si.index.Load() == NoSyncIndex +} + +func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot { + return PendingSyncIndex{Index: si.index.Load()} +} + +func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error { + index := snap.(PendingSyncIndex) + if index.Index == NoSyncIndex { + return nil + } + // Set to NoSyncIndex if a higher index has not queued. + si.index.CompareAndSwap(index.Index, NoSyncIndex) + si.externalSyncQueueCallback(index, err) + return nil +} + +// PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync. +type PendingSyncIndex struct { + // Index is some state meaningful to the user of LogWriter. The LogWriter + // itself only examines whether Index is equal to NoSyncIndex. + Index int64 +} + +func (s PendingSyncIndex) empty() bool { + return s.Index == NoSyncIndex +} + +func (s PendingSyncIndex) syncRequested() bool { + return s.Index != NoSyncIndex +} + // flusherCond is a specialized condition variable that allows its condition to // change and readiness be signalled without holding its associated mutex. In // particular, when a waiter is added to syncQueue atomically, this condition // variable can be signalled without holding flusher.Mutex. type flusherCond struct { mu *sync.Mutex - q *syncQueue + q pendingSyncs cond sync.Cond } -func (c *flusherCond) init(mu *sync.Mutex, q *syncQueue) { +func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) { c.mu = mu c.q = q // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L @@ -286,8 +431,10 @@ type LogWriter struct { minSyncInterval durationFunc fsyncLatency prometheus.Histogram pending []*block - syncQ syncQueue - metrics *LogWriterMetrics + // Pushing and popping from pendingSyncs does not require flusher mutex to + // be held. + pendingSyncs pendingSyncs + metrics *LogWriterMetrics } // afterFunc is a hook to allow tests to mock out the timer functionality @@ -295,8 +442,12 @@ type LogWriter struct { // time.AfterFunc. afterFunc func(d time.Duration, f func()) syncTimer - // See the comment for LogWriterConfig.QueueSemChan. - queueSemChan chan struct{} + // Backing for both pendingSyncs implementations. + // + // TODO(sumeer): do we really care to optimize memory allocations given + // LogWriters are not constructed at a high rate? + pendingSyncsBackingQ pendingSyncsWithSyncQueue + pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex } // LogWriterConfig is a struct used for configuring new LogWriters @@ -308,8 +459,25 @@ type LogWriterConfig struct { // the syncQueue from overflowing (which will cause a panic). 
All production // code ensures this is non-nil. QueueSemChan chan struct{} + + // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used + // as part of a WAL implementation that can failover between LogWriters. + // + // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must + // be used with a PendingSync parameter that is implemented by + // PendingSyncIndex. When an index is synced (which implies all earlier + // indices are also synced), this callback is invoked. The caller must not + // hold any mutex when invoking this callback, since the lock ordering + // requirement in this case is that any higher layer locks (in the wal + // package) precede the lower layer locks (in the record package). These + // callbacks are serialized since they are invoked from the flushLoop. + ExternalSyncQueueCallback ExternalSyncQueueCallback } +// ExternalSyncQueueCallback is to be run when a PendingSync has been +// processed, either successfully or with an error. +type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error) + // initialAllocatedBlocksCap is the initial capacity of the various slices // intended to hold LogWriter blocks. The LogWriter may allocate more blocks // than this threshold allows. @@ -323,6 +491,9 @@ var blockPool = sync.Pool{ } // NewLogWriter returns a new LogWriter. +// +// The io.Writer may also be used as an io.Closer and syncer. No other methods +// will be called on the writer. func NewLogWriter( w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig, ) *LogWriter { @@ -340,14 +511,25 @@ func NewLogWriter( afterFunc: func(d time.Duration, f func()) syncTimer { return time.AfterFunc(d, f) }, - queueSemChan: logWriterConfig.QueueSemChan, } + m := &LogWriterMetrics{} + if logWriterConfig.ExternalSyncQueueCallback != nil { + r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback) + r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex + } else { + r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{ + syncQueueLen: &m.SyncQueueLen, + queueSemChan: logWriterConfig.QueueSemChan, + } + r.flusher.pendingSyncs = &r.pendingSyncsBackingQ + } + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = blockPool.Get().(*block) - r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) + r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs) r.flusher.closed = make(chan struct{}) r.flusher.pending = make([]*block, 0, cap(r.free.blocks)) - r.flusher.metrics = &LogWriterMetrics{} + r.flusher.metrics = m f := &r.flusher f.minSyncInterval = logWriterConfig.WALMinSyncInterval @@ -423,14 +605,17 @@ func (w *LogWriter) flushLoop(context.Context) { // the current block can be added to the pending blocks list after we release // the flusher lock, but it won't be part of pending. written := w.block.written.Load() - if len(f.pending) > 0 || written > w.block.flushed || !f.syncQ.empty() { + // TODO(sumeer): pendingSyncs.empty() ought to account for whether + // syncing is blocked. Will we spin currently? Also take a careful look + // at flusherCond. + if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() { break } if f.close { // If the writer is closed, pretend the sync timer fired immediately so // that we can process any queued sync requests. 
- f.syncQ.clearBlocked() - if !f.syncQ.empty() { + f.pendingSyncs.clearBlocked() + if !f.pendingSyncs.empty() { break } return @@ -439,6 +624,18 @@ func (w *LogWriter) flushLoop(context.Context) { continue } // Found work to do, so no longer idle. + // + // NB: it is safe to read pending before loading from the syncQ since + // mutations to pending require the w.flusher mutex, which is held here. + // There is no risk that someone will concurrently add to pending, so the + // following sequence, which would pick up a syncQ entry without the + // corresponding data, is impossible: + // + // Thread enqueueing This thread + // 1. read pending + // 2. add block to pending + // 3. add to syncQ + // 4. read syncQ workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) pending = append(pending[:0], f.pending...) @@ -448,8 +645,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Grab the list of sync waiters. Note that syncQueue.load() will return // 0,0 while we're waiting for the min-sync-interval to expire. This // allows flushing to proceed even if we're not ready to sync. - head, tail, realSyncQLen := f.syncQ.load() - f.metrics.SyncQueueLen.AddSample(int64(realSyncQLen)) + snap := f.pendingSyncs.snapshotForPop() // Grab the portion of the current block that requires flushing. Note that // the current block can be added to the pending blocks list after we @@ -461,25 +657,27 @@ func (w *LogWriter) flushLoop(context.Context) { data := w.block.buf[w.block.flushed:written] w.block.flushed = written + fErr := f.err + f.Unlock() // If flusher has an error, we propagate it to waiters. Note in spite of // error we consume the pending list above to free blocks for writers. - if f.err != nil { - f.syncQ.pop(head, tail, f.err, w.queueSemChan) + if fErr != nil { + f.pendingSyncs.pop(snap, fErr) // Update the idleStartTime if work could not be done, so that we don't // include the duration we tried to do work as idle. We don't bother // with the rest of the accounting, which means we will undercount. idleStartTime = time.Now() + f.Lock() continue } - f.Unlock() - synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail) + synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap) f.Lock() if synced && f.fsyncLatency != nil { f.fsyncLatency.Observe(float64(syncLatency)) } f.err = err if f.err != nil { - f.syncQ.clearBlocked() + f.pendingSyncs.clearBlocked() // Update the idleStartTime if work could not be done, so that we don't // include the duration we tried to do work as idle. We don't bother // with the rest of the accounting, which means we will undercount. @@ -491,10 +689,10 @@ func (w *LogWriter) flushLoop(context.Context) { // A sync was performed. Make sure we've waited for the min sync // interval before syncing again. if min := f.minSyncInterval(); min > 0 { - f.syncQ.setBlocked() + f.pendingSyncs.setBlocked() if syncTimer == nil { syncTimer = w.afterFunc(min, func() { - f.syncQ.clearBlocked() + f.pendingSyncs.clearBlocked() f.ready.Signal() }) } else { @@ -512,7 +710,7 @@ func (w *LogWriter) flushLoop(context.Context) { } func (w *LogWriter) flushPending( - data []byte, pending []*block, head, tail uint32, + data []byte, pending []*block, snap pendingSyncsSnapshot, ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) { defer func() { // Translate panics into errors. 
The errors will cause flushLoop to shut @@ -535,13 +733,13 @@ func (w *LogWriter) flushPending( _, err = w.w.Write(data) } - synced = head != tail + synced = !snap.empty() if synced { if err == nil && w.s != nil { syncLatency, err = w.syncWithLatency() } f := &w.flusher - if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil { + if popErr := f.pendingSyncs.pop(snap, err); popErr != nil { return synced, syncLatency, bytesWritten, popErr } } @@ -661,6 +859,15 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, ) (logSize int64, err2 error) { + return w.SyncRecordGeneralized(p, pendingSyncForSyncQueue{ + wg: wg, + err: err, + }) +} + +// SyncRecordGeneralized is a version of SyncRecord that accepts a +// PendingSync. +func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) { if w.err != nil { return -1, w.err } @@ -673,14 +880,14 @@ func (w *LogWriter) SyncRecord( p = w.emitFragment(i, p) } - if wg != nil { + if ps.syncRequested() { // If we've been asked to persist the record, add the WaitGroup to the sync // queue and signal the flushLoop. Note that flushLoop will write partial // blocks to the file if syncing has been requested. The contract is that // any record written to the LogWriter to this point will be flushed to the // OS and synced to disk. f := &w.flusher - f.syncQ.push(wg, err) + f.pendingSyncs.push(ps) f.ready.Signal() } diff --git a/record/log_writer_test.go b/record/log_writer_test.go index 973105af2ae..e8ae523f08a 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -76,11 +76,12 @@ func TestSyncQueue(t *testing.T) { func TestFlusherCond(t *testing.T) { var mu sync.Mutex - var q syncQueue var c flusherCond var closed bool - c.init(&mu, &q) + psq := &pendingSyncsWithSyncQueue{} + c.init(&mu, psq) + q := &psq.syncQueue var flusherWG sync.WaitGroup flusherWG.Add(1) @@ -299,7 +300,8 @@ func TestMinSyncInterval(t *testing.T) { } // NB: we can't use syncQueue.load() here as that will return 0,0 while the // syncQueue is blocked. - head, tail := w.flusher.syncQ.unpack(w.flusher.syncQ.headTail.Load()) + syncQ := w.flusher.pendingSyncs.(*pendingSyncsWithSyncQueue) + head, tail := syncQ.unpack(syncQ.headTail.Load()) waiters := head - tail if waiters != uint32(i+1) { t.Fatalf("expected %d waiters, but found %d", i+1, waiters) diff --git a/wal/failover_manager.go b/wal/failover_manager.go new file mode 100644 index 00000000000..779d97ed127 --- /dev/null +++ b/wal/failover_manager.go @@ -0,0 +1,478 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "sync" + "time" + + "github.com/cockroachdb/pebble/vfs" + "golang.org/x/exp/rand" +) + +// dirProber probes the primary dir, until it is confirmed to be healthy. If +// it doesn't have enough samples, it is deemed to be unhealthy. It is only +// used for failback to the primary. +type dirProber struct { + fs vfs.FS + filename string + interval time.Duration + buf [100 << 10]byte + enabled chan bool + mu struct { + sync.Mutex + history [probeHistoryLength]time.Duration + firstProbeIndex int + nextProbeIndex int + } +} + +const probeHistoryLength = 128 + +// Large value. +const failedProbeDuration = 24 * 60 * 60 * time.Second + +// There is no close/stop method on the dirProber -- use stopper. 
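+//
+// Illustrative sketch only (not part of this change); the probe filename and
+// durations are hypothetical, mirroring what failoverManager.Init passes in:
+//
+//	var p dirProber
+//	p.init(fs, fs.PathJoin(dirname, "probe-file"), time.Second, stopper)
+//	p.enableProbing()                          // when failing over to the secondary
+//	mean, max := p.getMeanMax(2 * time.Minute) // consulted before failing back
+//	p.disableProbing()                         // when failing back to the primary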
+func (p *dirProber) init(fs vfs.FS, filename string, interval time.Duration, stopper *stopper) { + *p = dirProber{ + fs: fs, + filename: filename, + interval: interval, + enabled: make(chan bool), + } + // Random bytes for writing, to defeat any FS compression optimization. + _, err := rand.Read(p.buf[:]) + if err != nil { + panic(err) + } + stopper.runAsync(func() { + p.probeLoop(stopper.shouldQuiesce()) + }) +} + +func (p *dirProber) probeLoop(shouldQuiesce <-chan struct{}) { + ticker := time.NewTicker(p.interval) + ticker.Stop() + done := false + var enabled bool + for !done { + select { + case <-ticker.C: + if !enabled { + // Could have a tick waiting before we disabled it. Ignore. + continue + } + probeDur := func() time.Duration { + // Delete, create, write, sync. + start := time.Now() + _ = p.fs.Remove(p.filename) + f, err := p.fs.Create(p.filename) + if err != nil { + return failedProbeDuration + } + defer f.Close() + n, err := f.Write(p.buf[:]) + if err != nil { + return failedProbeDuration + } + if n != len(p.buf) { + panic("invariant violation") + } + err = f.Sync() + if err != nil { + return failedProbeDuration + } + return time.Since(start) + }() + p.mu.Lock() + nextIndex := p.mu.nextProbeIndex % probeHistoryLength + p.mu.history[nextIndex] = probeDur + firstIndex := p.mu.firstProbeIndex % probeHistoryLength + if firstIndex == nextIndex { + // Wrapped around + p.mu.firstProbeIndex++ + } + p.mu.nextProbeIndex++ + p.mu.Unlock() + + case <-shouldQuiesce: + done = true + + case enabled = <-p.enabled: + if enabled { + ticker.Reset(p.interval) + } else { + ticker.Stop() + p.mu.Lock() + p.mu.firstProbeIndex = 0 + p.mu.nextProbeIndex = 0 + p.mu.Unlock() + } + } + } +} + +func (p *dirProber) enableProbing() { + p.enabled <- true +} + +func (p *dirProber) disableProbing() { + p.enabled <- false +} + +func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex + samplesNeeded := int((interval + p.interval - 1) / p.interval) + if samplesNeeded == 0 { + panic("interval is too short") + } else if samplesNeeded > probeHistoryLength { + panic("interval is too long") + } + if samplesNeeded < numSamples { + return failedProbeDuration, failedProbeDuration + } + offset := numSamples - samplesNeeded + var sum, max time.Duration + for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ { + sampleDur := p.mu.history[i%probeHistoryLength] + sum += sampleDur + if max < sampleDur { + max = sampleDur + } + } + mean := sum / time.Duration(samplesNeeded) + return mean, max +} + +type dirIndex int + +const ( + primaryDirIndex dirIndex = iota + secondaryDirIndex + numDirIndices +) + +type dirAndFileHandle struct { + Dir + vfs.File +} + +type failoverMonitorOptions struct { + dirs [numDirIndices]dirAndFileHandle + + primaryDirProbeInterval time.Duration + healthyProbeLatencyThreshold time.Duration + healthyInterval time.Duration + + unhealthySamplingInterval time.Duration + unhealthyOperationLatencyThreshold time.Duration + + elevatedWriteStallThresholdLag time.Duration + + stopper *stopper +} + +// switchableWriter is a subset of failoverWriter needed by failoverMonitor. +type switchableWriter interface { + switchToNewDir(dirAndFileHandle) error + ongoingLatencyOrErrorForCurDir() (time.Duration, error) +} + +// failoverMonitor monitors the latency and error observed by the +// switchableWriter, and does failover by switching the dir. 
It also monitors +// the primary dir for failback. +type failoverMonitor struct { + opts failoverMonitorOptions + prober dirProber + mu struct { + sync.Mutex + // dirIndex and lastFailbackTime are only modified by monitorLoop. + dirIndex + lastFailBackTime time.Time + writer switchableWriter + } +} + +func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor { + m := &failoverMonitor{ + opts: opts, + } + m.prober.init(opts.dirs[primaryDirIndex].FS, + opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"), + opts.primaryDirProbeInterval, opts.stopper) + opts.stopper.runAsync(func() { + m.monitorLoop(opts.stopper.shouldQuiesce()) + }) + return m +} + +// Called when previous writer is closed +func (m *failoverMonitor) noWriter() { + m.mu.Lock() + defer m.mu.Unlock() + m.mu.writer = nil +} + +// writerCreateFunc is allowed to return nil. +func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.writer != nil { + panic("previous writer not closed") + } + m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex]) +} + +func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.dirIndex == secondaryDirIndex { + return true + } + intervalSinceFailedback := time.Since(m.mu.lastFailBackTime) + return intervalSinceFailedback > m.opts.elevatedWriteStallThresholdLag +} + +type lastWriterInfo struct { + writer switchableWriter + numSwitches int + ongoingLatencyAtSwitch time.Duration + errorCounts [numDirIndices]int +} + +func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) { + ticker := time.NewTicker(m.opts.unhealthySamplingInterval) + dirIndex := primaryDirIndex + var lastWriter lastWriterInfo + for { + select { + case <-shouldQuiesce: + return + case <-ticker.C: + writerOngoingLatency, writerErr := func() (time.Duration, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.writer != lastWriter.writer { + lastWriter = lastWriterInfo{writer: m.mu.writer} + } + if lastWriter.writer == nil { + return 0, nil + } + return lastWriter.writer.ongoingLatencyOrErrorForCurDir() + }() + switchDir := false + // We don't consider a switch if currently using the primary dir and the + // secondary dir has high enough errors. It is more likely that someone + // has misconfigured a secondary e.g. wrong permissions or not enough + // disk space. We only remember the error history in the context of the + // lastWriter since an operator can fix the underlying misconfiguration. + if !(lastWriter.errorCounts[secondaryDirIndex] > 2 && dirIndex == primaryDirIndex) { + // Switching heuristics. Subject to change based on real world experience. + if writerErr != nil { + // An error causes an immediate switch, since a *LogWriter with an error is useless. + lastWriter.errorCounts[dirIndex]++ + switchDir = true + + } else if writerOngoingLatency > m.opts.unhealthyOperationLatencyThreshold { + // High latency. Switch immediately two times, since that gives us an + // observation of both dirs. After that, decay the switch rate by + // increasing the observed latency needed for a switch. + if lastWriter.numSwitches < 2 || writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch { + switchDir = true + lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency + } + // Else high latency, but not high enough yet to motivate switch. + } else if dirIndex == secondaryDirIndex { + // The writer looks healthy. 
We can still switch if the writer is using the + // secondary dir and the primary is healthy again. + primaryMean, primaryMax := m.prober.getMeanMax(m.opts.healthyInterval) + if primaryMean < m.opts.healthyProbeLatencyThreshold && primaryMax < m.opts.healthyProbeLatencyThreshold { + switchDir = true + } + } + } + if switchDir { + lastWriter.numSwitches++ + if dirIndex == secondaryDirIndex { + // Switching back to primary, so don't need to probe to see if + // primary is healthy. + m.prober.disableProbing() + dirIndex = primaryDirIndex + } else { + m.prober.enableProbing() + dirIndex = secondaryDirIndex + } + dir := m.opts.dirs[dirIndex] + m.mu.Lock() + m.mu.dirIndex = dirIndex + if dirIndex == primaryDirIndex { + m.mu.lastFailBackTime = time.Now() + } + if m.mu.writer != nil { + m.mu.writer.switchToNewDir(dir) + } + m.mu.Unlock() + } + } + } +} + +type failoverManager struct { + opts Options + // TODO(jackson/sumeer): read-path etc. + + dirHandles [numDirIndices]vfs.File + stopper *stopper + monitor *failoverMonitor +} + +var _ Manager = &failoverManager{} + +// Init implements Manager. +func (wm *failoverManager) Init(o Options) error { + stopper := newStopper() + var dirs [numDirIndices]dirAndFileHandle + for i, dir := range []Dir{o.Primary, o.Secondary} { + dirs[i].Dir = dir + f, err := dir.FS.OpenDir(dir.Dirname) + if err != nil { + return err + } + dirs[i].File = f + } + fmOpts := failoverMonitorOptions{ + dirs: dirs, + // TODO(sumeer): make configurable. + primaryDirProbeInterval: time.Second, + healthyProbeLatencyThreshold: 100 * time.Millisecond, + healthyInterval: 2 * time.Minute, + unhealthySamplingInterval: 100 * time.Millisecond, + unhealthyOperationLatencyThreshold: 200 * time.Millisecond, + elevatedWriteStallThresholdLag: o.ElevatedWriteStallThresholdLag, + stopper: stopper, + } + monitor := newFailoverMonitor(fmOpts) + // TODO(jackson): list dirs and assemble a list of all NumWALs and + // corresponding log files. + + *wm = failoverManager{ + opts: o, + dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File}, + stopper: stopper, + monitor: monitor, + } + return nil +} + +// List implements Manager. +func (wm *failoverManager) List() ([]NumWAL, error) { + // TODO(jackson): + return nil, nil +} + +// Delete implements Manager. +func (wm *failoverManager) Delete(highestObsoleteNum NumWAL) error { + // TODO(sumeer): + return nil +} + +// OpenForRead implements Manager. +func (wm *failoverManager) OpenForRead(wn NumWAL, strictWALTail bool) (Reader, error) { + // TODO(jackson): + return nil, nil +} + +// Create implements Manager. +func (wm *failoverManager) Create(wn NumWAL) (Writer, error) { + fwOpts := failoverWriterOpts{ + wn: wn, + logger: wm.opts.Logger, + noSyncOnClose: wm.opts.NoSyncOnClose, + bytesPerSync: wm.opts.BytesPerSync, + preallocateSize: wm.opts.PreallocateSize, + minSyncInterval: wm.opts.MinSyncInterval, + fsyncLatency: wm.opts.FsyncLatency, + queueSemChan: wm.opts.QueueSemChan, + stopper: wm.stopper, + } + var err error + var ww *failoverWriter + writerCreateFunc := func(dir dirAndFileHandle) switchableWriter { + ww, err = newFailoverWriter(fwOpts, dir, wm) + if err != nil { + return nil + } + return ww + } + wm.monitor.newWriter(writerCreateFunc) + return ww, err +} + +// ElevateWriteStallThresholdForFailover implements Manager. 
+func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool { + return wm.monitor.elevateWriteStallThresholdForFailover() +} + +func (wm *failoverManager) writerClosed() { + wm.monitor.noWriter() +} + +// Stats implements Manager. +func (wm *failoverManager) Stats() Stats { + // TODO(sumeer): + return Stats{} +} + +// Close implements Manager. +func (wm *failoverManager) Close() error { + wm.stopper.stop() + // Since all goroutines are stopped, can close the dirs. + var err error + for _, f := range wm.dirHandles { + err = firstError(err, f.Close()) + } + return err +} + +type stopper struct { + quiescer chan struct{} // Closed when quiescing + wg sync.WaitGroup +} + +func newStopper() *stopper { + return &stopper{ + quiescer: make(chan struct{}), + } +} + +func (s *stopper) runAsync(f func()) { + s.wg.Add(1) + go func() { + f() + s.wg.Done() + }() +} + +// shouldQuiesce returns a channel which will be closed when stop() has been +// invoked and outstanding goroutines should begin to quiesce. +func (s *stopper) shouldQuiesce() <-chan struct{} { + return s.quiescer +} + +func (s *stopper) stop() { + close(s.quiescer) + s.wg.Wait() +} + +// firstError returns the first non-nil error of err0 and err1, or nil if both +// are nil. +func firstError(err0, err1 error) error { + if err0 != nil { + return err0 + } + return err1 +} diff --git a/wal/failover_writer.go b/wal/failover_writer.go new file mode 100644 index 00000000000..f3c905c7fb0 --- /dev/null +++ b/wal/failover_writer.go @@ -0,0 +1,744 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "io" + "sync" + "sync/atomic" + "time" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/prometheus/client_golang/prometheus" +) + +// TODO(sumeer): remove these notes after ensuring they are done, and +// incorporated into code comments. +// +// Misc temporary notes: +// +// recordQueue size: should we inject fsyncs to keep the memory bounded? +// Doesn't seem necessary for CRDB since there will be frequent fsyncs due to +// raft log appends. Also, it can't grow beyond the memtable size (though +// failing over 64MB, the memtable size, will not be fast). +// +// If switch to secondary failed because of misconfiguration, we should switch +// back to primary and make a note of it. If primary is still unavailable we +// would switch again, so repeated switching which is bad. Misconfiguration +// should result in an error. Consider an error as worse than slowness, so +// don't switch back? +// +// recycle: recycle only those in primary dir. Keep track of those where +// record.logwriter closed. Those are the only ones we can delete or recycle. +// +// Gap and de-duping logic on read: +// Gap can be there in adjacent logs but not between log n and the union of logs 0..n-1. +// Entries 0..50 sent to log 0. +// Entries 0..60 sent to log 1 +// Log 0 commits up to 50 +// Entries 50..100 sent to log 2 +// Log 2 commits all if these. +// Log 1 only commits 0..10. +// +// Write stall threshold: Decay the write stall threshold slowly after +// switching back to primary dir, as flush may take some time. Say 10 +// memtables. Could take 20s to flush. Good thing is that it will only add 1 +// sublevel. + +// recordQueueEntry is an entry in recordQueue. 
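+// Each WriteRecord call pushes one entry, pairing the record payload with its
+// SyncOptions. Entries are retained until some LogWriter reports them synced,
+// so that on failover they can be replayed to the new LogWriter via
+// snapshotAndSwitchWriter.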
+type recordQueueEntry struct { + p []byte + opts SyncOptions +} + +const initialBufferLen = 8192 + +// recordQueue is a variable size single producer multiple consumer queue. It +// is not lock-free, but most operations only need mu.RLock. It needs a mutex +// to grow the size, since there is no upper bound on the number of queued +// records (which are all the records that are not synced, and will need to be +// written again in case of failover). Additionally, it needs a mutex to +// atomically grab a snapshot of the queued records and provide them to a new +// LogWriter that is being switched to. +type recordQueue struct { + // Only held for reading for all pop operations and most push operations. + // Held for writing when buffer needs to be grown or when switching to a new + // writer. + mu sync.RWMutex + + // queue is [tail, head). tail is the oldest entry and head is the index for + // the next entry. + // + // Consumers: atomically read and write tail in pop (using + // compare-and-swap). This is not the usual kind of queue consumer since + // they already know the index that they are popping exists, hence don't + // need to look at head. + // + // Producer: atomically reads tail in push. Writes to head. + // + // Based on the above we only need tail to be atomic. However, the producer + // also populates entries in buffer, whose values need to be seen by the + // consumers when doing a pop, which means they need to synchronize using a + // release and acquire memory barrier pair, where the push does the release + // and the pop does the acquire. For this reason we make head also atomic + // and merge head and tail into a single atomic, so that the store of head + // in push and the load of tail in pop accomplishes this release-acquire + // pair. + // + // All updates to headTail hold mu at least for reading. So when mu is held + // for writing, there is a guarantee that headTail is not being updated. + // + // head is most-significant 32 bits and tail is least-significant 32 bits. + headTail atomic.Uint64 + + // Access to buffer requires at least RLock. + buffer []recordQueueEntry + + lastTailObservedByProducer uint32 + + // Protected by mu. + writer *record.LogWriter +} + +func (q *recordQueue) init() { + *q = recordQueue{ + buffer: make([]recordQueueEntry, initialBufferLen), + } +} + +// NB: externally synchronized, i.e., no concurrent push calls. +func (q *recordQueue) push(p []byte, opts SyncOptions) (index uint32, writer *record.LogWriter) { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + n := int(h - t) + if len(q.buffer) == n { + // Full + m := 2 * n + newBuffer := make([]recordQueueEntry, m) + for i := int(t); i < int(h); i++ { + newBuffer[i%m] = q.buffer[i%n] + } + q.mu.Lock() + q.buffer = newBuffer + q.mu.Unlock() + } + q.mu.RLock() + q.buffer[h] = recordQueueEntry{ + p: p, + opts: opts, + } + // Reclaim memory for consumed entries. We couldn't do that in pop since + // multiple consumers are popping using CAS and that immediately transfers + // ownership to the producer. + for i := q.lastTailObservedByProducer; i < t; i++ { + q.buffer[i] = recordQueueEntry{} + } + q.lastTailObservedByProducer = t + q.headTail.Add(1 << headTailBits) + writer = q.writer + q.mu.RUnlock() + return h, writer +} + +func (q *recordQueue) lenForTesting() int { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + return int(h - t) +} + +// Pops all entries. Must be called only after the last push returns. 
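+//
+// For illustration (hypothetical values): if headTail currently packs head=5
+// and tail=2, the queue holds the entries pushed at indices 2, 3 and 4, and
+// popAll is equivalent to pop(4, nil): any SyncOptions for those entries are
+// completed and tail advances to 5, leaving the queue empty.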
+func (q *recordQueue) popAll() (numRecords int, numSyncsPopped int) {
+	ht := q.headTail.Load()
+	h, t := unpackHeadTail(ht)
+	n := int(h - t)
+	if n == 0 {
+		return 0, 0
+	}
+	return n, q.pop(h-1, nil)
+}
+
+// Pops all entries up to and including index. The remaining queue is
+// [index+1, head).
+//
+// NB: we could slightly simplify to only have the latest writer be able to
+// pop. This would avoid the CAS below, but it seems better to reduce the
+// amount of queued work regardless of who has successfully written it.
+func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
+	var buf [512]SyncOptions
+	ht := q.headTail.Load()
+	h, t := unpackHeadTail(ht)
+	tail := int(t)
+	maxEntriesToPop := int(index) - tail + 1
+	if maxEntriesToPop <= 0 {
+		return 0
+	}
+	var b []SyncOptions
+	if maxEntriesToPop <= len(buf) {
+		b = buf[:maxEntriesToPop]
+	} else {
+		b = make([]SyncOptions, maxEntriesToPop)
+	}
+	// Allocations were done before acquiring the mutex.
+	q.mu.RLock()
+	for i := 0; i < maxEntriesToPop; i++ {
+		// Grab all the possible entries before doing CAS, since successful CAS
+		// will also release those buffer slots to the producer.
+		n := len(q.buffer)
+		b[i] = q.buffer[(i+tail)%n].opts
+	}
+	for {
+		newHT := makeHeadTail(h, index+1)
+		if q.headTail.CompareAndSwap(ht, newHT) {
+			break
+		}
+		ht = q.headTail.Load()
+		h, t = unpackHeadTail(ht)
+		tail = int(t)
+		maxEntriesToPop = int(index) - tail + 1
+		if maxEntriesToPop <= 0 {
+			break
+		}
+	}
+	q.mu.RUnlock()
+
+	numEntriesPopped := maxEntriesToPop
+	if numEntriesPopped <= 0 {
+		return 0
+	}
+	n := len(b)
+	for i := n - numEntriesPopped; i < n; i++ {
+		if b[i].Done != nil {
+			numSyncsPopped++
+			if err != nil {
+				*b[i].Err = err
+			}
+			b[i].Done.Done()
+		}
+	}
+	return numSyncsPopped
+}
+
+func (q *recordQueue) snapshotAndSwitchWriter(
+	writer *record.LogWriter, snapshotFunc func(firstIndex uint32, entries []recordQueueEntry),
+) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.writer = writer
+	h, t := unpackHeadTail(q.headTail.Load())
+	n := h - t
+	if n > 0 {
+		m := uint32(len(q.buffer))
+		b := make([]recordQueueEntry, n)
+		for i := t; i < h; i++ {
+			b[i-t] = q.buffer[i%m]
+		}
+		snapshotFunc(t, b)
+	}
+}
+
+const headTailBits = 32
+
+func unpackHeadTail(ht uint64) (head, tail uint32) {
+	const mask = 1<<headTailBits - 1
+	head = uint32((ht >> headTailBits) & mask)
+	tail = uint32(ht & mask)
+	return head, tail
+}
+
+func makeHeadTail(head, tail uint32) uint64 {
+	return (uint64(head) << headTailBits) | uint64(tail)
+}
+
+// Maximum number of physical log files when writing a virtual WAL. Arbitrarily
+// chosen value. Setting this to 2 will not simplify the code. We make this a
+// constant since we want a fixed size array for writer.writers.
+const maxPhysicalLogs = 20
+
+// failoverWriter is the implementation of Writer in failover mode. No Writer
+// method blocks for IO, except for Close. Close will block until all records
+// are successfully written and synced to some log writer. Monitoring of log
+// writer latency and errors continues after Close is called, which means
+// failoverWriter can be switched to a new log writer after Close is called,
+// so as to unblock Close.
+type failoverWriter struct {
+	wm      *failoverManager
+	opts    failoverWriterOpts
+	q       recordQueue
+	writers [maxPhysicalLogs]logWriterAndRecorder
+	mu      struct {
+		sync.Mutex
+		// cond is signaled when the latest *LogWriter is set in writers, or when
+		// what was probably the latest *LogWriter is successfully closed. It is
+		// waited on in Close.
+ cond *sync.Cond + // nextWriterIndex is advanced before creating the *LogWriter. That is, a + // slot is reserved by taking the current value of nextWriterIndex and + // incrementing it, and then the *LogWriter for that slot is created. When + // newFailoverWriter returns, nextWriterIndex = 1. + // + // The latest *LogWriter is (will be) at nextWriterIndex-1. + // + // INVARIANT: nextWriterIndex <= len(writers) + nextWriterIndex logNameIndex + closed bool + } +} + +type logWriterAndRecorder struct { + // This may never become non-nil, if when the LogWriter was finally created, + // it was no longer the latest writer. Additionally, if there was an error + // in creating the writer, w will remain nil and createError will be set. + w *record.LogWriter + // createError is set if there is an error creating the writer. This is + // useful in Close since we need to know when the work for creating the + // latest writer is done, whether it resulted in success or not. + createError error + r latencyAndErrorRecorder +} + +var _ Writer = &failoverWriter{} + +type failoverWriterOpts struct { + wn NumWAL + logger base.Logger + + // Options that feed into SyncingFileOptions. + noSyncOnClose bool + bytesPerSync int + preallocateSize func() int + + // Options for record.LogWriter. + minSyncInterval func() time.Duration + fsyncLatency prometheus.Histogram + queueSemChan chan struct{} + stopper *stopper + + writerCreatedForTest chan<- struct{} +} + +func newFailoverWriter( + opts failoverWriterOpts, initialDir dirAndFileHandle, wm *failoverManager, +) (*failoverWriter, error) { + ww := &failoverWriter{ + wm: wm, + opts: opts, + } + ww.q.init() + ww.mu.cond = sync.NewCond(&ww.mu) + // The initial record.LogWriter creation also happens via a + // switchToNewWriter since we don't want it to block newFailoverWriter. + err := ww.switchToNewDir(initialDir) + if err != nil { + return nil, err + } + return ww, nil +} + +// Size implements Writer. +func (ww *failoverWriter) Size() uint64 { + // TODO(sumeer): + return 0 +} + +// FileSize implements Writer. +func (ww *failoverWriter) FileSize() uint64 { + // TODO(sumeer): + return 0 +} + +// WriteRecord implements Writer. +func (ww *failoverWriter) WriteRecord(p []byte, opts SyncOptions) error { + recordIndex, writer := ww.q.push(p, opts) + if writer == nil { + // Don't have a record.LogWriter yet. + return nil + } + ps := record.PendingSyncIndex{Index: record.NoSyncIndex} + if opts.Done != nil { + ps.Index = int64(recordIndex) + } + _, err := writer.SyncRecordGeneralized(p, ps) + return err +} + +// switchToNewDir starts switching to dir. It implements switchableWriter. +func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error { + ww.mu.Lock() + // Can have a late switchToNewDir call is the failoverMonitor has not yet + // been told that the writer is closed. Ignore. + if ww.mu.closed { + return nil + } + writerIndex := ww.mu.nextWriterIndex + if int(writerIndex) == len(ww.writers) { + ww.mu.Unlock() + return errors.Errorf("exceeded switching limit") + } + ww.mu.nextWriterIndex++ + ww.mu.Unlock() + + ww.opts.stopper.runAsync(func() { + // TODO(sumeer): recycling of logs. + filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex)) + recorderAndWriter := &ww.writers[writerIndex].r + recorderAndWriter.writeStart() + file, err := dir.FS.Create(filename) + recorderAndWriter.writeEnd(err) + // TODO(sumeer): should we fatal if primary dir? At some point it is better + // to fatal instead of continuing to failover. 
+ // base.MustExist(dir.FS, filename, ww.opts.logger, err) + handleErrFunc := func() { + if file != nil { + file.Close() + } + ww.mu.Lock() + defer ww.mu.Unlock() + ww.writers[writerIndex].createError = err + ww.mu.cond.Signal() + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + } + if err != nil { + handleErrFunc() + return + } + err = dir.Sync() + if err != nil { + handleErrFunc() + } + syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{ + NoSyncOnClose: ww.opts.noSyncOnClose, + BytesPerSync: ww.opts.bytesPerSync, + PreallocateSize: ww.opts.preallocateSize(), + }) + recorderAndWriter.setWriter(syncingFile) + + // Using NumWAL as the DiskFileNum is fine since it is used only as + // EOF trailer for safe log recycling. Even though many log files can + // map to a single NumWAL, a file used for NumWAL n at index m will + // never get recycled for NumWAL n at a later index (since recycling + // happens when n as a whole is obsolete). + w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn), + record.LogWriterConfig{ + WALMinSyncInterval: ww.opts.minSyncInterval, + WALFsyncLatency: ww.opts.fsyncLatency, + QueueSemChan: ww.opts.queueSemChan, + ExternalSyncQueueCallback: ww.doneSyncCallback, + }) + closeWriter := func() bool { + ww.mu.Lock() + defer ww.mu.Unlock() + if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed { + // Not the latest writer or the writer was closed while this async + // creation was ongoing. + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return true + } + // Latest writer. + ww.writers[writerIndex].w = w + ww.mu.cond.Signal() + // NB: snapshotAndSwitchWriter does not block on IO, since + // SyncRecordGeneralized does no IO. + ww.q.snapshotAndSwitchWriter(w, func(firstIndex uint32, entries []recordQueueEntry) { + for i := range entries { + ps := record.PendingSyncIndex{Index: record.NoSyncIndex} + if entries[i].opts.Done != nil { + ps.Index = int64(firstIndex) + int64(i) + } + _, err := w.SyncRecordGeneralized(entries[i].p, ps) + if err != nil { + // TODO(sumeer): log periodically. The err will also surface via + // the latencyAndErrorRecorder, so if a switch is possible, it + // will be done. + ww.opts.logger.Errorf("%s", err) + } + } + }) + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return false + }() + if closeWriter { + // Never wrote anything to this writer so don't care about the + // returned error. + ww.opts.stopper.runAsync(func() { + _ = w.Close() + }) + } + }) + return nil +} + +// doneSyncCallback is the record.ExternalSyncQueueCallback called by +// record.LogWriter. +func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) { + // NB: harmless after Close returns since numSyncsPopped will be 0. + numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err) + if ww.opts.queueSemChan != nil { + for i := 0; i < numSyncsPopped; i++ { + <-ww.opts.queueSemChan + } + } +} + +// ongoingLatencyOrErrorForCurDir implements switchableWriter. +func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) { + ww.mu.Lock() + defer ww.mu.Unlock() + if ww.mu.closed { + return 0, nil + } + return ww.writers[ww.mu.nextWriterIndex-1].r.ongoingLatencyOrError() +} + +// Close implements Writer. 
+// +// NB: getOngoingLatencyOrErrorForLatestWriter and switchToNewDir can be +// called after Close is called, and there is also a possibility that they get +// called after Close returns and before failoverMonitor knows that the +// failoverWriter is closed. +// +// doneSyncCallback can be called anytime after Close returns since there +// could be stuck writes that finish arbitrarily later. +func (ww *failoverWriter) Close() error { + // [0, closeCalledCount) have had LogWriter.Close called (though may not + // have finished) or the LogWriter will never be non-nil. Either way, they + // have been "processed". + closeCalledCount := logNameIndex(0) + done := false + var err error + ww.mu.Lock() + // Every iteration starts and ends with the mutex held. + // + // Invariant: ww.mu.nextWriterIndex >= 1. + // + // TODO(sumeer): write a high level comment for the logic below. + for !done { + numWriters := ww.mu.nextWriterIndex + // Want to process [closeCalledCount, numWriters). + // Invariant: numWriters - closeCalledCount >= 1. + if numWriters-closeCalledCount <= 0 { + panic("invariant violation") + } + // Wait until writers[numWriters-1] is either created or has a creation + // error or numWriters advances. We are waiting on IO here, since failover + // is continuing to happen if IO is taking too long. If we run out of + // maxPhysicalLogs, then we will truly block on IO here, but there is + // nothing that can be done in that case. Note that there is a very rare + // case that the recordQueue is already empty, and waiting is unnecessary, + // but so be it. + for { + if ww.writers[numWriters-1].w == nil && ww.writers[numWriters-1].createError == nil { + ww.mu.cond.Wait() + } else { + break + } + numWriters = ww.mu.nextWriterIndex + } + // Invariant: [closeCalledCount, numWriters) have their *LogWriters in the + // final state (the createError may not be in its final state): nil and + // will stay nil, or non-nil. Additionally, numWriters-1 creation has + // completed, so both the *LogWriter and createError is in its final + // state. + + // Unlock, so monitoring and switching can continue happening. + ww.mu.Unlock() + // Process [closeCalledCount, numWriters). + + // Iterate over everything except the latest writer, i.e., + // [closeCalledCount, numWriters-1). These are either non-nil, or will + // forever stay nil. From the previous invariant, we know that + // numWriters-1-closeCalledCount >= 0. + for i := closeCalledCount; i < numWriters-1; i++ { + w := ww.writers[i].w + if w != nil { + // Don't care about the returned error since all the records we relied + // on this writer for were already successfully written. + ww.opts.stopper.runAsync(func() { + _ = w.Close() + }) + } + } + closeCalledCount = numWriters - 1 + ww.mu.Lock() + numWriters = ww.mu.nextWriterIndex + if closeCalledCount < numWriters-1 { + // Haven't processed some non-latest writers. Process them first. + continue + } + // Latest writer, for now. + latestWriterIndex := closeCalledCount + w := ww.writers[latestWriterIndex].w + createErr := ww.writers[latestWriterIndex].createError + closedLatest := false + var closeErr error + if w != nil || createErr != nil { + closeCalledCount++ + if createErr == nil { + if w == nil { + panic("invariant violation") + } + ww.opts.stopper.runAsync(func() { + // Write to iteration local variable closeErr, since we may write to + // a different closeErr in a different iteration. We only read this + // value in the same iteration (below). 
+ closeErr = w.Close() + closedLatest = true + ww.mu.Lock() + ww.mu.cond.Signal() + ww.mu.Unlock() + }) + } else { + if len(ww.writers) == int(latestWriterIndex+1) { + // There are no more slots left. If there are any records queued, we + // have no guarantee that they were written, so propagate the error. + numRecords, _ := ww.q.popAll() + err = createErr + if numRecords == 0 { + err = nil + } + // Exit all loops. + done = true + } + } + } + // Inner loop that exits when: + // - w != nil and w is closed (by the preceding code). + // + // - latestWriterIndex is no longer the latest writer index (this is also + // the case when w is nil due to a createErr). + for !done { + ww.mu.cond.Wait() + if latestWriterIndex < ww.mu.nextWriterIndex-1 { + // Doesn't matter if closed or not, since no longer the latest. + break + // Continue outer for loop. + } + // Still the latest writer. + if closedLatest { + done = true + ww.mu.closed = true + if closeErr == nil { + _, numSyncsPopped := ww.q.popAll() + if numSyncsPopped != 0 { + // The client requested syncs are required to be popped by the + // record.LogWriter. The only records we expect to pop now are the + // tail that did not request a sync, and have been successfully + // synced implicitly by the record.LogWriter. + // + // NB: popAll is not really necessary for correctness. We do this + // to free memory. + panic(errors.AssertionFailedf( + "%d syncs not popped by the record.LogWriter", numSyncsPopped)) + + } + } else { + // If there are any records queued, we have no guarantee that they + // were written, so propagate the error. + numRecords, _ := ww.q.popAll() + err = closeErr + if numRecords == 0 { + err = nil + } + } + // Exit all loops. + break + } else { + // Nothing happened. Either w was nil, and we are still waiting for + // the next writer, or w was not nil, and we are still waiting for + // the close to finish. + continue + } + } + } + ww.mu.Unlock() + // Only nil in some unit tests. + if ww.wm != nil { + ww.wm.writerClosed() + } + return err +} + +func (ww *failoverWriter) Metrics() *record.LogWriterMetrics { + // TODO(sumeer): + return nil +} + +// latencyAndErrorRecorder records ongoing write and sync operations and errors +// in those operations. record.LogWriter cannot continue functioning after any +// error, so all errors are considered permanent. +// +// writeStart/writeEnd are used directly when creating a file. After the file +// is successfully created, setWriter turns latencyAndErrorRecorder into an +// implementation of writerSyncerCloser that will record for the Write and +// Sync methods. 
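+//
+// Sketch of the intended use (simplified from switchToNewDir and
+// ongoingLatencyOrErrorForCurDir; names abbreviated):
+//
+//	r := &ww.writers[i].r
+//	r.writeStart()
+//	f, err := fs.Create(filename)            // file creation is recorded explicitly
+//	r.writeEnd(err)
+//	r.setWriter(vfs.NewSyncingFile(f, opts)) // later Write/Sync calls self-record
+//	...
+//	dur, err := r.ongoingLatencyOrError()    // polled by the failoverMonitor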
+type latencyAndErrorRecorder struct { + ongoingOperationStart atomic.Int64 + error atomic.Pointer[error] + writerSyncerCloser +} + +type writerSyncerCloser interface { + io.Writer + io.Closer + Sync() error +} + +func (r *latencyAndErrorRecorder) writeStart() { + r.ongoingOperationStart.Store(time.Now().UnixNano()) +} + +func (r *latencyAndErrorRecorder) writeEnd(err error) { + if err != nil { + ptr := &err + r.error.Store(ptr) + } + r.ongoingOperationStart.Store(0) +} + +func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) { + r.writerSyncerCloser = w +} + +func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) { + startTime := r.ongoingOperationStart.Load() + var latency time.Duration + if startTime != 0 { + l := time.Now().UnixNano() - startTime + if l < 0 { + l = 0 + } + latency = time.Duration(l) + } + errPtr := r.error.Load() + var err error + if errPtr != nil { + err = *errPtr + } + return latency, err +} + +// Sync implements writerSyncerCloser. +func (r *latencyAndErrorRecorder) Sync() error { + r.writeStart() + err := r.writerSyncerCloser.Sync() + r.writeEnd(err) + return err +} + +// Write implements io.Writer. +func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) { + r.writeStart() + n, err = r.writerSyncerCloser.Write(p) + r.writeEnd(err) + return n, err +} diff --git a/wal/failover_writer_test.go b/wal/failover_writer_test.go new file mode 100644 index 00000000000..43b506df4e1 --- /dev/null +++ b/wal/failover_writer_test.go @@ -0,0 +1,204 @@ +package wal + +import ( + "fmt" + "io" + "slices" + "strings" + "sync" + "testing" + "time" + + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" +) + +func TestFailoverWriter(t *testing.T) { + memFS := vfs.NewStrictMem() + dirs := []dirAndFileHandle{ + {Dir: Dir{FS: memFS, Dirname: "pri"}}, + {Dir: Dir{FS: memFS, Dirname: "sec"}}, + } + for i, dir := range dirs { + require.NoError(t, dir.FS.MkdirAll(dir.Dirname, 0755)) + f, err := dir.FS.OpenDir("") + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + f, err = dir.FS.OpenDir(dir.Dirname) + require.NoError(t, err) + dirs[i].File = f + } + dirIndex := 0 + + type filenameAndFS struct { + name string + fs vfs.FS + } + printLogFiles := func(b *strings.Builder, num NumWAL) { + memFS.ResetToSyncedState() + var filenames []filenameAndFS + prefix := base.DiskFileNum(num).String() + for i := range dirs { + fns, err := dirs[i].FS.List(dirs[i].Dirname) + require.NoError(t, err) + for _, fn := range fns { + if strings.HasPrefix(fn, prefix) { + filenames = append(filenames, filenameAndFS{ + name: dirs[i].FS.PathJoin(dirs[i].Dirname, fn), + fs: dirs[i].FS, + }) + } + } + } + slices.SortFunc(filenames, func(a, b filenameAndFS) int { + return strings.Compare(a.name, b.name) + }) + if len(filenames) > 0 { + fmt.Fprintf(b, "log files:\n") + } + for _, fn := range filenames { + fmt.Fprintf(b, " %s\n", fn.name) + func() { + f, err := fn.fs.Open(fn.name) + require.NoError(t, err) + defer f.Close() + rr := record.NewReader(f, base.DiskFileNum(num)) + for { + offset := rr.Offset() + r, err := rr.Next() + if err == nil { + var bb strings.Builder + _, err = io.Copy(&bb, r) + if err == nil { + fmt.Fprintf(b, " %d: %s\n", offset, bb.String()) + } + } + if err != nil { + fmt.Fprintf(b, " %s\n", err.Error()) + break + } + } + }() + } + } + var w *failoverWriter + 
waitForQueueLen := func(t *testing.T, qlen int) { + for { + n := w.q.lenForTesting() + require.LessOrEqual(t, qlen, n) + if qlen != n { + time.Sleep(10 * time.Millisecond) + } else { + return + } + } + } + var nextWALNum NumWAL + queueSemChanCap := 100 + queueSemChan := make(chan struct{}, queueSemChanCap) + countSem := func() int { + i := 0 + for { + select { + case queueSemChan <- struct{}{}: + i++ + default: + return i + } + } + } + var stopper *stopper + var logWriterCreated chan struct{} + var syncs []SyncOptions + // TODO(sumeer): latency injection; test latency measurement. + datadriven.RunTest(t, "testdata/failover_writer", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + // TODO: error injection; choice of initial dir; latency injection. + case "init": + wn := nextWALNum + nextWALNum++ + var err error + stopper = newStopper() + logWriterCreated = make(chan struct{}) + w, err = newFailoverWriter(failoverWriterOpts{ + wn: wn, + preallocateSize: func() int { return 10 }, + queueSemChan: queueSemChan, + stopper: stopper, + writerCreatedForTest: logWriterCreated, + }, dirs[0], nil) + require.NoError(t, err) + <-logWriterCreated + return "" + + case "write": + var synco SyncOptions + var doSync bool + td.ScanArgs(t, "sync", &doSync) + if doSync { + wg := &sync.WaitGroup{} + wg.Add(1) + synco = SyncOptions{ + Done: wg, + Err: new(error), + } + queueSemChan <- struct{}{} + } + syncs = append(syncs, synco) + var value string + td.ScanArgs(t, "value", &value) + require.NoError(t, w.WriteRecord([]byte(value), synco)) + return "" + + case "wait-for-queue": + var qlen int + td.ScanArgs(t, "length", &qlen) + waitForQueueLen(t, qlen) + return "" + + case "switch": + dirIndex := (dirIndex + 1) % 2 + require.NoError(t, w.switchToNewDir(dirs[dirIndex])) + <-logWriterCreated + return "" + + case "close": + var b strings.Builder + // TODO(sumeer): ensure that Close is timely by injecting a FS + // delay that does not end until we signal it to end here + // after the Close returns. + require.NoError(t, w.Close()) + for i := range syncs { + errStr := "no error" + if syncs[i].Done != nil { + // Should already be done. + syncs[i].Done.Wait() + err := *syncs[i].Err + if err != nil { + errStr = err.Error() + } + } + fmt.Fprintf(&b, "sync %d: %s\n", i, errStr) + } + // We expect the Close to complete without stopping all the + // goroutines. But for deterministic log file output we stop all + // goroutines. + stopper.stop() + printLogFiles(&b, nextWALNum-1) + require.Equal(t, queueSemChanCap, countSem()) + syncs = nil + w = nil + dirIndex = 0 + + return b.String() + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/wal/testdata/failover_writer b/wal/testdata/failover_writer new file mode 100644 index 00000000000..785c675de73 --- /dev/null +++ b/wal/testdata/failover_writer @@ -0,0 +1,32 @@ +init +---- + +write sync=true value=woolly +---- + +write sync=false value=sheep +---- + +wait-for-queue length=1 +---- + +switch +---- + +write sync=false value=yak +---- + +close +---- +sync 0: no error +sync 1: no error +sync 2: no error +log files: + pri/000000.log + 0: woolly + 17: sheep + EOF + sec/000000-001.log + 0: sheep + 16: yak + EOF diff --git a/wal/wal.go b/wal/wal.go index d9d08456e3d..1166fbf296d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -27,7 +27,7 @@ type Dir struct { // NumWAL is the number of the virtual WAL. It can map to one or more physical // log files. In standalone mode, it will map to exactly one log file. 
In // failover mode, it can map to many log files, which are totally ordered -// (using a dense logIndex). +// (using a dense logNameIndex). // // In general, WAL refers to the virtual WAL, and file refers to a log file. // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to @@ -36,14 +36,14 @@ type Dir struct { // the contents of the directories. type NumWAL base.DiskFileNum -// logIndex numbers log files within a WAL. -type logIndex uint32 +// logNameIndex numbers log files within a WAL. +type logNameIndex uint32 // TODO(sumeer): parsing func. And remove attempts to parse log files outside // the wal package (including tools). // makeLogFilename makes a log filename. -func makeLogFilename(wn NumWAL, index logIndex) string { +func makeLogFilename(wn NumWAL, index logNameIndex) string { if index == 0 { // Use a backward compatible name, for simplicity. return base.MakeFilename(base.FileTypeLog, base.DiskFileNum(wn)) @@ -53,9 +53,11 @@ func makeLogFilename(wn NumWAL, index logIndex) string { // Options provides configuration for the Manager. type Options struct { - // Primary dir for storing WAL files. + // Primary dir for storing WAL files. It must already be created and synced + // up to the root. Primary Dir - // Secondary is used for failover. Optional. + // Secondary is used for failover. Optional. It must already be created and + // synced up to the root. Secondary Dir // Recyling configuration. Only files in the primary dir are recycled. @@ -69,8 +71,14 @@ type Options struct { // recycling. MaxNumRecyclableLogs int - // SyncingFileOptions is the configuration when calling vfs.NewSyncingFile. - SyncingFileOpts vfs.SyncingFileOptions + // Configuration for calling vfs.NewSyncingFile. + + // NoSyncOnClose is documented in SyncingFileOptions. + NoSyncOnClose bool + // BytesPerSync is documented in SyncingFileOptions. + BytesPerSync int + // PreallocateSize is documented in SyncingFileOptions. + PreallocateSize func() int // MinSyncInterval is documented in Options.WALMinSyncInterval. MinSyncInterval func() time.Duration @@ -87,6 +95,13 @@ type Options struct { // there is no syncQueue, so the pushback into the commit pipeline is // unnecessary, but possibly harmless. QueueSemChan chan struct{} + + // ElevatedWriteStallThresholdLag is the duration for which an elevated + // threshold should continue after a switch back to the primary dir. + ElevatedWriteStallThresholdLag time.Duration + + // Logger for logging. + Logger base.Logger } // Stats exposes stats used in Pebble metrics. @@ -133,9 +148,14 @@ type Manager interface { // increasing, and be greater than any NumWAL seen earlier. The caller must // close the previous Writer before calling Create. Create(wn NumWAL) (Writer, error) + // ElevateWriteStallThresholdForFailover returns true if the caller should + // use a high write stall threshold because the WALs are being written to + // the secondary dir. + ElevateWriteStallThresholdForFailover() bool // Stats returns the latest Stats. Stats() Stats // Close the manager. + // REQUIRES: Writers and Readers have already been closed. Close() error } @@ -181,5 +201,5 @@ type Reader interface { } // Make lint happy. -var _ logIndex = 0 +var _ logNameIndex = 0 var _ = makeLogFilename
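
For reference, a minimal sketch of how a failover-mode caller is expected to drive a
record.LogWriter using the APIs added above (SyncRecordGeneralized, PendingSyncIndex,
ExternalSyncQueueCallback). The helper name, its parameters, and the index value 7 are
hypothetical; only the record, base and wal identifiers come from this change. In this
mode QueueSemChan stays nil, and sync completion is reported solely through the
callback, which runs on the flushLoop goroutine and must not acquire the caller's locks.

// failoverModeSketch is illustrative only. Assumed imports: io, sync/atomic,
// github.com/prometheus/client_golang/prometheus, plus the pebble record,
// internal/base and wal packages touched in this diff.
func failoverModeSketch(
	file io.Writer, walNum wal.NumWAL, payload []byte, fsyncLatency prometheus.Histogram,
) error {
	var highestSynced atomic.Int64
	w := record.NewLogWriter(file, base.DiskFileNum(walNum), record.LogWriterConfig{
		WALFsyncLatency: fsyncLatency,
		// Invoked from the flushLoop once this index (and every earlier index)
		// has been synced, or with err != nil if the sync failed.
		ExternalSyncQueueCallback: func(doneSync record.PendingSyncIndex, err error) {
			if err == nil {
				highestSynced.Store(doneSync.Index)
			}
		},
	})
	// The index is opaque to the LogWriter; it is only compared against
	// record.NoSyncIndex to decide whether this record requests a sync.
	if _, err := w.SyncRecordGeneralized(payload, record.PendingSyncIndex{Index: 7}); err != nil {
		return err
	}
	return w.Close()
}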