From 2802acdebe8cc6ca5b7adae24761225f67ace974 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Sat, 13 Jan 2024 22:49:15 -0500 Subject: [PATCH] wal,record: failover write path code failover_manager.go contains the failoverManager (which implements wal.Manager) for the write path, and helper classes. dirProber monitors the primary dir when failed over to use the secondary, to decide when to failback to the primary. failoverMonitor uses the latency and error seen by the current *LogWriter, and probing state, to decide when to switch to a different dir. failover_writer.go contains the failoverWriter, that can switch across a sequence of record.LogWriters. record.LogWriter is changed to accommodate both standalone and failover mode, without affecting the synchronization in standalone mode. In failover mode there is some additional synchronization, in the failoverWriter queue, which is not lock free, but is hopefully fast enough given the fastpath will use read locks. Informs #3230 --- record/log_writer.go | 267 +++++++++++-- record/log_writer_test.go | 8 +- wal/failover_manager.go | 448 +++++++++++++++++++++ wal/failover_writer.go | 728 +++++++++++++++++++++++++++++++++++ wal/failover_writer_test.go | 165 ++++++++ wal/testdata/failover_writer | 17 + wal/wal.go | 32 +- 7 files changed, 1625 insertions(+), 40 deletions(-) create mode 100644 wal/failover_manager.go create mode 100644 wal/failover_writer.go create mode 100644 wal/failover_writer_test.go create mode 100644 wal/testdata/failover_writer diff --git a/record/log_writer.go b/record/log_writer.go index 3b1fa775fee..9ed166f390e 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -150,12 +150,14 @@ func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{} *slot.err = err slot.wg = nil slot.err = nil - // We need to bump the tail count before signalling the wait group as - // signalling the wait group can trigger release a blocked goroutine which - // will try to enqueue before we've "freed" space in the queue. + // We need to bump the tail count before releasing the queueSemChan + // semaphore as releasing the semaphore can cause a blocked goroutine to + // acquire the semaphore and enqueue before we've "freed" space in the + // queue. q.headTail.Add(1) wg.Done() - // Is always non-nil in production. + // Is always non-nil in production, unless using wal package for WAL + // failover. if queueSemChan != nil { <-queueSemChan } @@ -164,17 +166,160 @@ func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{} return nil } +// pendingSyncs abstracts out the handling of pending sync requests. In +// standalone mode the implementation is a thin wrapper around syncQueue. In +// the mode where the LogWriter can be subject to failover, there is no queue +// kept in the LogWriter and the signaling to those waiting for sync is +// handled in the wal package. +type pendingSyncs interface { + push(PendingSync) + setBlocked() + clearBlocked() + empty() bool + snapshotForPop() pendingSyncsSnapshot + pop(snap pendingSyncsSnapshot, err error) error +} + +type pendingSyncsSnapshot interface { + empty() bool +} + +// PendingSync abstracts the sync specification for a record queued on the +// LogWriter. The only implementations are provided in this package since +// syncRequested is not exported. +type PendingSync interface { + syncRequested() bool +} + +// The implementation of pendingSyncs in standalone mode. +type pendingSyncsWithSyncQueue struct { + syncQueue + syncQueueLen *base.GaugeSampleMetric + // See the comment for LogWriterConfig.QueueSemChan. + queueSemChan chan struct{} +} + +var _ pendingSyncs = &pendingSyncsWithSyncQueue{} + +func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) { + ps2 := ps.(pendingSyncForSyncQueue) + q.syncQueue.push(ps2.wg, ps2.err) +} + +func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot { + head, tail, realLength := q.syncQueue.load() + snap := syncQueueSnapshot{ + head: head, + tail: tail, + } + q.syncQueueLen.AddSample(int64(realLength)) + return snap +} + +func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error { + s := snap.(syncQueueSnapshot) + return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan) +} + +// The implementation of pendingSyncsSnapshot in standalone mode. +type syncQueueSnapshot struct { + head, tail uint32 +} + +func (s syncQueueSnapshot) empty() bool { + return s.head == s.tail +} + +// The implementation of pendingSync in standalone mode. +type pendingSyncForSyncQueue struct { + wg *sync.WaitGroup + err *error +} + +func (ps pendingSyncForSyncQueue) syncRequested() bool { + return ps.wg != nil +} + +// The implementation of pendingSyncs in failover mode. +type pendingSyncsWithHighestSyncIndex struct { + // The highest "index" queued that is requesting a sync. Initialized + // to NoSyncIndex, and reset to NoSyncIndex after the sync. + index atomic.Int64 + // blocked is an atomic boolean which indicates whether syncing is currently + // blocked or can proceed. It is used by the implementation of + // min-sync-interval to block syncing until the min interval has passed. + blocked atomic.Bool + externalSyncQueueCallback ExternalSyncQueueCallback +} + +// NoSyncIndex is the value of PendingSyncIndex when a sync is not requested. +const NoSyncIndex = -1 + +func (si *pendingSyncsWithHighestSyncIndex) init( + externalSyncQueueCallback ExternalSyncQueueCallback, +) { + si.index.Store(NoSyncIndex) + si.externalSyncQueueCallback = externalSyncQueueCallback +} + +func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) { + ps2 := ps.(PendingSyncIndex) + si.index.Store(ps2.Index) +} + +func (si *pendingSyncsWithHighestSyncIndex) setBlocked() { + si.blocked.Store(true) +} + +func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() { + si.blocked.Store(false) +} + +func (si *pendingSyncsWithHighestSyncIndex) empty() bool { + return si.index.Load() == NoSyncIndex +} + +func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot { + return PendingSyncIndex{Index: si.index.Load()} +} + +func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error { + index := snap.(PendingSyncIndex) + if index.Index == NoSyncIndex { + return nil + } + // Set to NoSyncIndex if a higher index has not queued. + si.index.CompareAndSwap(index.Index, NoSyncIndex) + si.externalSyncQueueCallback(index, err) + return nil +} + +// PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync. +type PendingSyncIndex struct { + // Index is some state meaningful to the user of LogWriter. The LogWriter + // itself only examines whether Index is equal to NoSyncIndex. + Index int64 +} + +func (s PendingSyncIndex) empty() bool { + return s.Index == NoSyncIndex +} + +func (s PendingSyncIndex) syncRequested() bool { + return s.Index != NoSyncIndex +} + // flusherCond is a specialized condition variable that allows its condition to // change and readiness be signalled without holding its associated mutex. In // particular, when a waiter is added to syncQueue atomically, this condition // variable can be signalled without holding flusher.Mutex. type flusherCond struct { mu *sync.Mutex - q *syncQueue + q pendingSyncs cond sync.Cond } -func (c *flusherCond) init(mu *sync.Mutex, q *syncQueue) { +func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) { c.mu = mu c.q = q // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L @@ -286,8 +431,10 @@ type LogWriter struct { minSyncInterval durationFunc fsyncLatency prometheus.Histogram pending []*block - syncQ syncQueue - metrics *LogWriterMetrics + // Pushing and popping from pendingSyncs does not require flusher mutex to + // be held. + pendingSyncs pendingSyncs + metrics *LogWriterMetrics } // afterFunc is a hook to allow tests to mock out the timer functionality @@ -295,8 +442,12 @@ type LogWriter struct { // time.AfterFunc. afterFunc func(d time.Duration, f func()) syncTimer - // See the comment for LogWriterConfig.QueueSemChan. - queueSemChan chan struct{} + // Backing for both pendingSyncs implementations. + // + // TODO(sumeer): do we really care to optimize memory allocations given + // LogWriters are not constructed at a high rate? + pendingSyncsBackingQ pendingSyncsWithSyncQueue + pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex } // LogWriterConfig is a struct used for configuring new LogWriters @@ -308,8 +459,25 @@ type LogWriterConfig struct { // the syncQueue from overflowing (which will cause a panic). All production // code ensures this is non-nil. QueueSemChan chan struct{} + + // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used + // as part of a WAL implementation that can failover between LogWriters. + // + // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must + // be used with a PendingSync parameter that is implemented by + // PendingSyncIndex. When an index is synced (which implies all earlier + // indices are also synced), this callback is invoked. The caller must not + // hold any mutex when invoking this callback, since the lock ordering + // requirement in this case is that any higher layer locks (in the wal + // package) precede the lower layer locks (in the record package). These + // callbacks are serialized since they are invoked from the flushLoop. + ExternalSyncQueueCallback ExternalSyncQueueCallback } +// ExternalSyncQueueCallback is to be run when a PendingSync has been +// processed, either successfully or with an error. +type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error) + // initialAllocatedBlocksCap is the initial capacity of the various slices // intended to hold LogWriter blocks. The LogWriter may allocate more blocks // than this threshold allows. @@ -323,6 +491,9 @@ var blockPool = sync.Pool{ } // NewLogWriter returns a new LogWriter. +// +// The io.Writer may also be used as an io.Closer and syncer. No other methods +// will be called on the writer. func NewLogWriter( w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig, ) *LogWriter { @@ -340,14 +511,25 @@ func NewLogWriter( afterFunc: func(d time.Duration, f func()) syncTimer { return time.AfterFunc(d, f) }, - queueSemChan: logWriterConfig.QueueSemChan, } + m := &LogWriterMetrics{} + if logWriterConfig.ExternalSyncQueueCallback != nil { + r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback) + r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex + } else { + r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{ + syncQueueLen: &m.SyncQueueLen, + queueSemChan: logWriterConfig.QueueSemChan, + } + r.flusher.pendingSyncs = &r.pendingSyncsBackingQ + } + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = blockPool.Get().(*block) - r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) + r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs) r.flusher.closed = make(chan struct{}) r.flusher.pending = make([]*block, 0, cap(r.free.blocks)) - r.flusher.metrics = &LogWriterMetrics{} + r.flusher.metrics = m f := &r.flusher f.minSyncInterval = logWriterConfig.WALMinSyncInterval @@ -423,14 +605,17 @@ func (w *LogWriter) flushLoop(context.Context) { // the current block can be added to the pending blocks list after we release // the flusher lock, but it won't be part of pending. written := w.block.written.Load() - if len(f.pending) > 0 || written > w.block.flushed || !f.syncQ.empty() { + // TODO(sumeer): pendingSyncs.empty() ought to account for whether + // syncing is blocked. Will we spin currently? Also take a careful look + // at flusherCond. + if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() { break } if f.close { // If the writer is closed, pretend the sync timer fired immediately so // that we can process any queued sync requests. - f.syncQ.clearBlocked() - if !f.syncQ.empty() { + f.pendingSyncs.clearBlocked() + if !f.pendingSyncs.empty() { break } return @@ -439,6 +624,18 @@ func (w *LogWriter) flushLoop(context.Context) { continue } // Found work to do, so no longer idle. + // + // NB: it is safe to read pending before loading from the syncQ since + // mutations to pending require the w.flusher mutex, which is held here. + // There is no risk that someone will concurrently add to pending, so the + // following sequence, which would pick up a syncQ entry without the + // corresponding data, is impossible: + // + // Thread enqueueing This thread + // 1. read pending + // 2. add block to pending + // 3. add to syncQ + // 4. read syncQ workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) pending = append(pending[:0], f.pending...) @@ -448,8 +645,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Grab the list of sync waiters. Note that syncQueue.load() will return // 0,0 while we're waiting for the min-sync-interval to expire. This // allows flushing to proceed even if we're not ready to sync. - head, tail, realSyncQLen := f.syncQ.load() - f.metrics.SyncQueueLen.AddSample(int64(realSyncQLen)) + snap := f.pendingSyncs.snapshotForPop() // Grab the portion of the current block that requires flushing. Note that // the current block can be added to the pending blocks list after we @@ -461,25 +657,27 @@ func (w *LogWriter) flushLoop(context.Context) { data := w.block.buf[w.block.flushed:written] w.block.flushed = written + fErr := f.err + f.Unlock() // If flusher has an error, we propagate it to waiters. Note in spite of // error we consume the pending list above to free blocks for writers. - if f.err != nil { - f.syncQ.pop(head, tail, f.err, w.queueSemChan) + if fErr != nil { + f.pendingSyncs.pop(snap, fErr) // Update the idleStartTime if work could not be done, so that we don't // include the duration we tried to do work as idle. We don't bother // with the rest of the accounting, which means we will undercount. idleStartTime = time.Now() + f.Lock() continue } - f.Unlock() - synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail) + synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap) f.Lock() if synced && f.fsyncLatency != nil { f.fsyncLatency.Observe(float64(syncLatency)) } f.err = err if f.err != nil { - f.syncQ.clearBlocked() + f.pendingSyncs.clearBlocked() // Update the idleStartTime if work could not be done, so that we don't // include the duration we tried to do work as idle. We don't bother // with the rest of the accounting, which means we will undercount. @@ -491,10 +689,10 @@ func (w *LogWriter) flushLoop(context.Context) { // A sync was performed. Make sure we've waited for the min sync // interval before syncing again. if min := f.minSyncInterval(); min > 0 { - f.syncQ.setBlocked() + f.pendingSyncs.setBlocked() if syncTimer == nil { syncTimer = w.afterFunc(min, func() { - f.syncQ.clearBlocked() + f.pendingSyncs.clearBlocked() f.ready.Signal() }) } else { @@ -512,7 +710,7 @@ func (w *LogWriter) flushLoop(context.Context) { } func (w *LogWriter) flushPending( - data []byte, pending []*block, head, tail uint32, + data []byte, pending []*block, snap pendingSyncsSnapshot, ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) { defer func() { // Translate panics into errors. The errors will cause flushLoop to shut @@ -535,13 +733,13 @@ func (w *LogWriter) flushPending( _, err = w.w.Write(data) } - synced = head != tail + synced = !snap.empty() if synced { if err == nil && w.s != nil { syncLatency, err = w.syncWithLatency() } f := &w.flusher - if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil { + if popErr := f.pendingSyncs.pop(snap, err); popErr != nil { return synced, syncLatency, bytesWritten, popErr } } @@ -661,6 +859,15 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, ) (logSize int64, err2 error) { + return w.SyncRecordGeneralized(p, pendingSyncForSyncQueue{ + wg: wg, + err: err, + }) +} + +// SyncRecordGeneralized is a version of SyncRecord that accepts a +// PendingSync. +func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) { if w.err != nil { return -1, w.err } @@ -673,14 +880,14 @@ func (w *LogWriter) SyncRecord( p = w.emitFragment(i, p) } - if wg != nil { + if ps.syncRequested() { // If we've been asked to persist the record, add the WaitGroup to the sync // queue and signal the flushLoop. Note that flushLoop will write partial // blocks to the file if syncing has been requested. The contract is that // any record written to the LogWriter to this point will be flushed to the // OS and synced to disk. f := &w.flusher - f.syncQ.push(wg, err) + f.pendingSyncs.push(ps) f.ready.Signal() } diff --git a/record/log_writer_test.go b/record/log_writer_test.go index 973105af2ae..e8ae523f08a 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -76,11 +76,12 @@ func TestSyncQueue(t *testing.T) { func TestFlusherCond(t *testing.T) { var mu sync.Mutex - var q syncQueue var c flusherCond var closed bool - c.init(&mu, &q) + psq := &pendingSyncsWithSyncQueue{} + c.init(&mu, psq) + q := &psq.syncQueue var flusherWG sync.WaitGroup flusherWG.Add(1) @@ -299,7 +300,8 @@ func TestMinSyncInterval(t *testing.T) { } // NB: we can't use syncQueue.load() here as that will return 0,0 while the // syncQueue is blocked. - head, tail := w.flusher.syncQ.unpack(w.flusher.syncQ.headTail.Load()) + syncQ := w.flusher.pendingSyncs.(*pendingSyncsWithSyncQueue) + head, tail := syncQ.unpack(syncQ.headTail.Load()) waiters := head - tail if waiters != uint32(i+1) { t.Fatalf("expected %d waiters, but found %d", i+1, waiters) diff --git a/wal/failover_manager.go b/wal/failover_manager.go new file mode 100644 index 00000000000..0d2362ae899 --- /dev/null +++ b/wal/failover_manager.go @@ -0,0 +1,448 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "sync" + "time" + + "github.com/cockroachdb/pebble/vfs" + "golang.org/x/exp/rand" +) + +// dirProber probes the primary dir, until it is confirmed to be healthy. If +// it doesn't have enough samples, it is deemed to be unhealthy. It is only +// used for failback to the primary. +type dirProber struct { + fs vfs.FS + filename string + interval time.Duration + buf [100 << 10]byte + enabled chan bool + mu struct { + sync.Mutex + history [probeHistoryLength]time.Duration + firstProbeIndex int + nextProbeIndex int + } +} + +const probeHistoryLength = 128 + +// Large value. +const failedProbeDuration = 24 * 60 * 60 * time.Second + +// There is no close/stop method on the dirProber -- use stopper. +func (p *dirProber) init(fs vfs.FS, filename string, interval time.Duration, stopper *stopper) { + *p = dirProber{ + fs: fs, + filename: filename, + interval: interval, + enabled: make(chan bool), + } + // Random bytes for writing, to defeat any FS compression optimization. + _, err := rand.Read(p.buf[:]) + if err != nil { + panic(err) + } + stopper.runAsync(func() { + p.probeLoop(stopper.shouldQuiesce()) + }) +} + +func (p *dirProber) probeLoop(shouldQuiesce <-chan struct{}) { + ticker := time.NewTicker(p.interval) + ticker.Stop() + done := false + var enabled bool + for !done { + select { + case <-ticker.C: + if !enabled { + // Could have a tick waiting before we disabled it. Ignore. + continue + } + probeDur := func() time.Duration { + // Delete, create, write, sync. + start := time.Now() + _ = p.fs.Remove(p.filename) + f, err := p.fs.Create(p.filename) + if err != nil { + return failedProbeDuration + } + defer f.Close() + n, err := f.Write(p.buf[:]) + if err != nil { + return failedProbeDuration + } + if n != len(p.buf) { + panic("invariant violation") + } + err = f.Sync() + if err != nil { + return failedProbeDuration + } + return time.Since(start) + }() + p.mu.Lock() + nextIndex := p.mu.nextProbeIndex % probeHistoryLength + p.mu.history[nextIndex] = probeDur + firstIndex := p.mu.firstProbeIndex % probeHistoryLength + if firstIndex == nextIndex { + // Wrapped around + p.mu.firstProbeIndex++ + } + p.mu.nextProbeIndex++ + p.mu.Unlock() + + case <-shouldQuiesce: + done = true + + case enabled = <-p.enabled: + if enabled { + ticker.Reset(p.interval) + } else { + ticker.Stop() + p.mu.Lock() + p.mu.firstProbeIndex = 0 + p.mu.nextProbeIndex = 0 + p.mu.Unlock() + } + } + } +} + +func (p *dirProber) enableProbing() { + p.enabled <- true +} + +func (p *dirProber) disableProbing() { + p.enabled <- false +} + +func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex + samplesNeeded := int((interval + p.interval - 1) / p.interval) + if samplesNeeded == 0 { + panic("interval is too short") + } else if samplesNeeded > probeHistoryLength { + panic("interval is too long") + } + if samplesNeeded < numSamples { + return failedProbeDuration, failedProbeDuration + } + offset := numSamples - samplesNeeded + var sum, max time.Duration + for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ { + sampleDur := p.mu.history[i%probeHistoryLength] + sum += sampleDur + if max < sampleDur { + max = sampleDur + } + } + mean := sum / time.Duration(samplesNeeded) + return mean, max +} + +type dirIndex int + +const ( + primaryDirIndex dirIndex = iota + secondaryDirIndex + numDirIndices +) + +type failoverMonitorOptions struct { + dirs [numDirIndices]Dir + + primaryDirProbeInterval time.Duration + healthyProbeLatencyThreshold time.Duration + healthyInterval time.Duration + + unhealthySamplingInterval time.Duration + unhealthyOperationLatencyThreshold time.Duration + + elevatedWriteStallThresholdLag time.Duration + + stopper *stopper +} + +// switchableWriter is a subset of failoverWriter needed by failoverMonitor. +type switchableWriter interface { + switchToNewDir(dir Dir) error + ongoingLatencyOrErrorForCurDir() (time.Duration, error) +} + +// failoverMonitor monitors the latency and error observed by the +// switchableWriter, and does failover by switching the dir. It also monitors +// the primary dir for failback. +type failoverMonitor struct { + opts failoverMonitorOptions + prober dirProber + mu struct { + sync.Mutex + // dirIndex and lastFailbackTime are only modified by monitorLoop. + dirIndex + lastFailBackTime time.Time + writer switchableWriter + } +} + +func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor { + m := &failoverMonitor{ + opts: opts, + } + m.prober.init(opts.dirs[primaryDirIndex].FS, + opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"), + opts.primaryDirProbeInterval, opts.stopper) + opts.stopper.runAsync(func() { + m.monitorLoop(opts.stopper.shouldQuiesce()) + }) + return m +} + +// Called when previous writer is closed +func (m *failoverMonitor) noWriter() { + m.mu.Lock() + defer m.mu.Unlock() + m.mu.writer = nil +} + +// writerCreateFunc is allowed to return nil. +func (m *failoverMonitor) newWriter(writerCreateFunc func(dir Dir) switchableWriter) { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.writer != nil { + panic("previous writer not closed") + } + m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex]) +} + +func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.dirIndex == secondaryDirIndex { + return true + } + intervalSinceFailedback := time.Since(m.mu.lastFailBackTime) + return intervalSinceFailedback > m.opts.elevatedWriteStallThresholdLag +} + +type lastWriterInfo struct { + writer switchableWriter + numSwitches int + ongoingLatencyAtSwitch time.Duration + errorCounts [numDirIndices]int +} + +func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) { + ticker := time.NewTicker(m.opts.unhealthySamplingInterval) + dirIndex := primaryDirIndex + var lastWriter lastWriterInfo + for { + select { + case <-shouldQuiesce: + return + case <-ticker.C: + writerOngoingLatency, writerErr := func() (time.Duration, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.writer != lastWriter.writer { + lastWriter = lastWriterInfo{writer: m.mu.writer} + } + if lastWriter.writer == nil { + return 0, nil + } + return lastWriter.writer.ongoingLatencyOrErrorForCurDir() + }() + switchDir := false + // We don't consider a switch if currently using the primary dir and the + // secondary dir has high enough errors. It is more likely that someone + // has misconfigured a secondary e.g. wrong permissions or not enough + // disk space. We only remember the error history in the context of the + // lastWriter since an operator can fix the underlying misconfiguration. + if !(lastWriter.errorCounts[secondaryDirIndex] > 2 && dirIndex == primaryDirIndex) { + // Switching heuristics. Subject to change based on real world experience. + if writerErr != nil { + // An error causes an immediate switch, since a *LogWriter with an error is useless. + lastWriter.errorCounts[dirIndex]++ + switchDir = true + + } else if writerOngoingLatency > m.opts.unhealthyOperationLatencyThreshold { + // High latency. Switch immediately two times, since that gives us an + // observation of both dirs. After that, decay the switch rate by + // increasing the observed latency needed for a switch. + if lastWriter.numSwitches < 2 || writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch { + switchDir = true + lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency + } + // Else high latency, but not high enough yet to motivate switch. + } else if dirIndex == secondaryDirIndex { + // The writer looks healthy. We can still switch if the writer is using the + // secondary dir and the primary is healthy again. + primaryMean, primaryMax := m.prober.getMeanMax(m.opts.healthyInterval) + if primaryMean < m.opts.healthyProbeLatencyThreshold && primaryMax < m.opts.healthyProbeLatencyThreshold { + switchDir = true + } + } + } + if switchDir { + lastWriter.numSwitches++ + if dirIndex == secondaryDirIndex { + // Switching back to primary, so don't need to probe to see if + // primary is healthy. + m.prober.disableProbing() + dirIndex = primaryDirIndex + } else { + m.prober.enableProbing() + dirIndex = secondaryDirIndex + } + dir := m.opts.dirs[dirIndex] + m.mu.Lock() + m.mu.dirIndex = dirIndex + if dirIndex == primaryDirIndex { + m.mu.lastFailBackTime = time.Now() + } + if m.mu.writer != nil { + m.mu.writer.switchToNewDir(dir) + } + m.mu.Unlock() + } + } + } +} + +type failoverManager struct { + opts Options + // TODO(jackson/sumeer): read-path etc. + + stopper *stopper + monitor *failoverMonitor +} + +var _ Manager = &failoverManager{} + +// Init implements Manager. +func (wm *failoverManager) Init(o Options) error { + stopper := newStopper() + fmOpts := failoverMonitorOptions{ + dirs: [numDirIndices]Dir{o.Primary, o.Secondary}, + // TODO(sumeer): make configurable. + primaryDirProbeInterval: time.Second, + healthyProbeLatencyThreshold: 100 * time.Millisecond, + healthyInterval: 2 * time.Minute, + unhealthySamplingInterval: 100 * time.Millisecond, + unhealthyOperationLatencyThreshold: 200 * time.Millisecond, + elevatedWriteStallThresholdLag: o.ElevatedWriteStallThresholdLag, + stopper: stopper, + } + monitor := newFailoverMonitor(fmOpts) + // TODO(jackson): list dirs and assemble a list of all NumWALs and + // corresponding log files. + + *wm = failoverManager{ + opts: o, + stopper: stopper, + monitor: monitor, + } + return nil +} + +// List implements Manager. +func (wm *failoverManager) List() ([]NumWAL, error) { + // TODO(jackson): + return nil, nil +} + +// Delete implements Manager. +func (wm *failoverManager) Delete(highestObsoleteNum NumWAL) error { + // TODO(sumeer): + return nil +} + +// OpenForRead implements Manager. +func (wm *failoverManager) OpenForRead(wn NumWAL, strictWALTail bool) (Reader, error) { + // TODO(jackson): + return nil, nil +} + +// Create implements Manager. +func (wm *failoverManager) Create(wn NumWAL) (Writer, error) { + fwOpts := failoverWriterOpts{ + wn: wn, + logger: wm.opts.Logger, + noSyncOnClose: wm.opts.NoSyncOnClose, + bytesPerSync: wm.opts.BytesPerSync, + preallocateSize: wm.opts.PreallocateSize, + minSyncInterval: wm.opts.MinSyncInterval, + fsyncLatency: wm.opts.FsyncLatency, + queueSemChan: wm.opts.QueueSemChan, + stopper: wm.stopper, + } + var err error + var ww *failoverWriter + writerCreateFunc := func(dir Dir) switchableWriter { + ww, err = newFailoverWriter(fwOpts, dir, wm) + if err != nil { + return nil + } + return ww + } + wm.monitor.newWriter(writerCreateFunc) + return ww, err +} + +// ElevateWriteStallThresholdForFailover implements Manager. +func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool { + return wm.monitor.elevateWriteStallThresholdForFailover() +} + +func (wm *failoverManager) writerClosed() { + wm.monitor.noWriter() +} + +// Stats implements Manager. +func (wm *failoverManager) Stats() Stats { + // TODO(sumeer): + return Stats{} +} + +// Close implements Manager. +func (wm *failoverManager) Close() error { + wm.stopper.stop() + return nil +} + +type stopper struct { + quiescer chan struct{} // Closed when quiescing + wg sync.WaitGroup +} + +func newStopper() *stopper { + return &stopper{ + quiescer: make(chan struct{}), + } +} + +func (s *stopper) runAsync(f func()) { + s.wg.Add(1) + go func() { + f() + s.wg.Done() + }() +} + +// shouldQuiesce returns a channel which will be closed when stop() has been +// invoked and outstanding goroutines should begin to quiesce. +func (s *stopper) shouldQuiesce() <-chan struct{} { + return s.quiescer +} + +func (s *stopper) stop() { + close(s.quiescer) + s.wg.Wait() +} diff --git a/wal/failover_writer.go b/wal/failover_writer.go new file mode 100644 index 00000000000..41ac716a3dc --- /dev/null +++ b/wal/failover_writer.go @@ -0,0 +1,728 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "io" + "sync" + "sync/atomic" + "time" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/prometheus/client_golang/prometheus" +) + +// TODO(sumeer): remove these notes after ensuring they are done, and +// incorporated into code comments. +// +// Misc temporary notes: +// +// recordQueue size: should we inject fsyncs to keep the memory bounded? +// Doesn't seem necessary for CRDB since there will be frequent fsyncs due to +// raft log appends. Also, it can't grow beyond the memtable size (though +// failing over 64MB, the memtable size, will not be fast). +// +// If switch to secondary failed because of misconfiguration, we should switch +// back to primary and make a note of it. If primary is still unavailable we +// would switch again, so repeated switching which is bad. Misconfiguration +// should result in an error. Consider an error as worse than slowness, so +// don't switch back? +// +// recycle: recycle only those in primary dir. Keep track of those where +// record.logwriter closed. Those are the only ones we can delete or recycle. +// +// Gap and de-duping logic on read: +// Gap can be there in adjacent logs but not between log n and the union of logs 0..n-1. +// Entries 0..50 sent to log 0. +// Entries 0..60 sent to log 1 +// Log 0 commits up to 50 +// Entries 50..100 sent to log 2 +// Log 2 commits all if these. +// Log 1 only commits 0..10. +// +// Write stall threshold: Decay the write stall threshold slowly after +// switching back to primary dir, as flush may take some time. Say 10 +// memtables. Could take 20s to flush. Good thing is that it will only add 1 +// sublevel. + +// recordQueueEntry is an entry in recordQueue. +type recordQueueEntry struct { + p []byte + opts SyncOptions +} + +const initialBufferLen = 8192 + +// recordQueue is a variable size single producer multiple consumer queue. It +// is not lock-free, but most operations only need mu.RLock. It needs a mutex +// to grow the size, since there is no upper bound on the number of queued +// records (which are all the records that are not synced, and will need to be +// written again in case of failover). Additionally, it needs a mutex to +// atomically grab a snapshot of the queued records and provide them to a new +// LogWriter that is being switched to. +type recordQueue struct { + // Only held for reading for all pop operations and most push operations. + // Held for writing when buffer needs to be grown or when switching to a new + // writer. + mu sync.RWMutex + + // queue is [tail, head). tail is the oldest entry and head is the index for + // the next entry. + // + // Consumers: atomically read and write tail in pop (using + // compare-and-swap). This is not the usual kind of queue consumer since + // they already know the index that they are popping exists, hence don't + // need to look at head. + // + // Producer: atomically reads tail in push. Writes to head. + // + // Based on the above we only need tail to be atomic. However, the producer + // also populates entries in buffer, whose values need to be seen by the + // consumers when doing a pop, which means they need to synchronize using a + // release and acquire memory barrier pair, where the push does the release + // and the pop does the acquire. For this reason we make head also atomic + // and merge head and tail into a single atomic, so that the store of head + // in push and the load of tail in pop accomplishes this release-acquire + // pair. + // + // All updates to headTail hold mu at least for reading. So when mu is held + // for writing, there is a guarantee that headTail is not being updated. + // + // head is most-significant 32 bits and tail is least-significant 32 bits. + headTail atomic.Uint64 + + // Access to buffer requires at least RLock. + buffer []recordQueueEntry + + lastTailObservedByProducer uint32 + + // Protected by mu. + writer *record.LogWriter +} + +func (q *recordQueue) init() { + *q = recordQueue{ + buffer: make([]recordQueueEntry, initialBufferLen), + } +} + +// NB: externally synchronized, i.e., no concurrent push calls. +func (q *recordQueue) push(p []byte, opts SyncOptions) (index uint32, writer *record.LogWriter) { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + n := int(h - t) + if len(q.buffer) == n { + // Full + m := 2 * n + newBuffer := make([]recordQueueEntry, m) + for i := int(t); i < int(h); i++ { + newBuffer[i%m] = q.buffer[i%n] + } + q.mu.Lock() + q.buffer = newBuffer + q.mu.Unlock() + } + q.mu.RLock() + q.buffer[h] = recordQueueEntry{ + p: p, + opts: opts, + } + // Reclaim memory for consumed entries. We couldn't do that in pop since + // multiple consumers are popping using CAS and that immediately transfers + // ownership to the producer. + for i := q.lastTailObservedByProducer; i < t; i++ { + q.buffer[i] = recordQueueEntry{} + } + q.lastTailObservedByProducer = t + q.headTail.Add(1 << headTailBits) + writer = q.writer + q.mu.RUnlock() + return h, writer +} + +// Pops all entries. Must be called only after the last push returns. +func (q *recordQueue) popAll() (numRecords int, numSyncsPopped int) { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + n := int(h - t) + if n == 0 { + return 0, 0 + } + return n, q.pop(h-1, nil) +} + +// Pops all entries up to and including index. The remaining queue is +// [index+1, head). +// +// NB: we could slightly simplify to only have the latest writer be able to +// pop. This would avoid the CAS below, but it seems better to reduce the +// amount of queued work regardless of who has successfully written it. +func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) { + var buf [512]SyncOptions + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + tail := int(t) + maxEntriesToPop := int(index) - tail + 1 + if maxEntriesToPop <= 0 { + return 0 + } + var b []SyncOptions + if maxEntriesToPop <= len(buf) { + b = buf[:maxEntriesToPop] + } else { + b = make([]SyncOptions, maxEntriesToPop) + } + // Allocations were done before acquiring the mutex. + q.mu.RLock() + for i := 0; i < maxEntriesToPop; i++ { + // Grab all the possible entries before doing CAS, since successful CAS + // will also release those buffer slots to the producer. + n := len(q.buffer) + b[i] = q.buffer[(i+tail)%n].opts + } + for { + newHT := makeHeadTail(h, index+1) + if q.headTail.CompareAndSwap(ht, newHT) { + break + } + ht := q.headTail.Load() + h, t = unpackHeadTail(ht) + tail = int(t) + maxEntriesToPop = int(index) - tail + 1 + if maxEntriesToPop <= 0 { + break + } + } + q.mu.RUnlock() + + numEntriesPopped := maxEntriesToPop + if numEntriesPopped <= 0 { + return 0 + } + n := len(b) + for i := n - numEntriesPopped; i < n; i++ { + if b[i].Done != nil { + numSyncsPopped++ + if err != nil { + *b[i].Err = err + } + b[i].Done.Done() + } + } + return numSyncsPopped +} + +func (q *recordQueue) snapshotAndSwitchWriter( + writer *record.LogWriter, snapshotFunc func(firstIndex uint32, entries []recordQueueEntry), +) { + q.mu.Lock() + defer q.mu.Unlock() + q.writer = writer + h, t := unpackHeadTail(q.headTail.Load()) + n := h - t + if n > 0 { + m := uint32(len(q.buffer)) + b := make([]recordQueueEntry, n) + for i := t; i < h; i++ { + b[i-t] = q.buffer[i%m] + } + snapshotFunc(t, b) + } +} + +const headTailBits = 32 + +func unpackHeadTail(ht uint64) (head, tail uint32) { + const mask = 1<> headTailBits) & mask) + tail = uint32(ht & mask) + return head, tail +} + +func makeHeadTail(head, tail uint32) uint64 { + return (uint64(head) << headTailBits) | uint64(tail) +} + +// Maximum number of physical log files when writing a virtual WAL. Arbitrarily +// chosen value. Setting this to 2 will not simplify the code. We make this a +// constant since we want a fixed size array for writer.writers. +const maxPhysicalLogs = 20 + +// failoverWriter is the implementation of Writer in failover mode. No Writer +// method blocks for IO, except for Close. Close will block until all records +// are successfully written and synced to some log writer. Monitoring of log +// writer latency and errors continues after Close is called, which means +// failoverWriter can be switched to a new log writer after Close is called, +// so as to unblock Close. +type failoverWriter struct { + wm *failoverManager + opts failoverWriterOpts + q recordQueue + writers [maxPhysicalLogs]logWriterAndRecorder + mu struct { + sync.Mutex + // cond is signaled when the latest *LogWriter is set in writers, or when + // what was probably the latest *LogWriter is successfully closed. It is + // waited on in Close. + cond *sync.Cond + // nextWriterIndex is advanced before creating the *LogWriter. That is, a + // slot is reserved by taking the current value of nextWriterIndex and + // incrementing it, and then the *LogWriter for that slot is created. When + // newFailoverWriter returns, nextWriterIndex = 1. + // + // The latest *LogWriter is (will be) at nextWriterIndex-1. + // + // INVARIANT: nextWriterIndex <= len(writers) + nextWriterIndex logNameIndex + closed bool + } +} + +type logWriterAndRecorder struct { + // This may never become non-nil, if when the LogWriter was finally created, + // it was no longer the latest writer. Additionally, if there was an error + // in creating the writer, w will remain nil and createError will be set. + w *record.LogWriter + // createError is set if there is an error creating the writer. This is + // useful in Close since we need to know when the work for creating the + // latest writer is done, whether it resulted in success or not. + createError error + r latencyAndErrorRecorder +} + +var _ Writer = &failoverWriter{} + +type failoverWriterOpts struct { + wn NumWAL + logger base.Logger + + // Options that feed into SyncingFileOptions. + noSyncOnClose bool + bytesPerSync int + preallocateSize func() int + + // Options for record.LogWriter. + minSyncInterval func() time.Duration + fsyncLatency prometheus.Histogram + queueSemChan chan struct{} + stopper *stopper + + writerCreatedForTest chan<-struct{} +} + +func newFailoverWriter( + opts failoverWriterOpts, initialDir Dir, wm *failoverManager, +) (*failoverWriter, error) { + ww := &failoverWriter{ + wm: wm, + opts: opts, + } + ww.q.init() + ww.mu.cond = sync.NewCond(&ww.mu) + // The initial record.LogWriter creation also happens via a + // switchToNewWriter since we don't want it to block newFailoverWriter. + err := ww.switchToNewDir(initialDir) + if err != nil { + return nil, err + } + return ww, nil +} + +// Size implements Writer. +func (ww *failoverWriter) Size() uint64 { + // TODO(sumeer): + return 0 +} + +// FileSize implements Writer. +func (ww *failoverWriter) FileSize() uint64 { + // TODO(sumeer): + return 0 +} + +// WriteRecord implements Writer. +func (ww *failoverWriter) WriteRecord(p []byte, opts SyncOptions) error { + recordIndex, writer := ww.q.push(p, opts) + if writer == nil { + // Don't have a record.LogWriter yet. + return nil + } + ps := record.PendingSyncIndex{Index: record.NoSyncIndex} + if opts.Done != nil { + ps.Index = int64(recordIndex) + } + _, err := writer.SyncRecordGeneralized(p, ps) + return err +} + +// switchToNewDir starts switching to dir. It implements switchableWriter. +func (ww *failoverWriter) switchToNewDir(dir Dir) error { + ww.mu.Lock() + // Can have a late switchToNewDir call is the failoverMonitor has not yet + // been told that the writer is closed. Ignore. + if ww.mu.closed { + return nil + } + writerIndex := ww.mu.nextWriterIndex + if int(writerIndex) == len(ww.writers) { + ww.mu.Unlock() + return errors.Errorf("exceeded switching limit") + } + ww.mu.nextWriterIndex++ + ww.mu.Unlock() + + ww.opts.stopper.runAsync(func() { + // TODO(sumeer): recycling of logs. + filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex)) + recorderAndWriter := &ww.writers[writerIndex].r + recorderAndWriter.writeStart() + file, err := dir.FS.Create(filename) + recorderAndWriter.writeEnd(err) + // TODO(sumeer): should we fatal if primary dir? At some point it is better + // to fatal instead of continuing to failover. + // base.MustExist(dir.FS, filename, ww.opts.logger, err) + if err != nil { + ww.mu.Lock() + defer ww.mu.Unlock() + ww.writers[writerIndex].createError = err + ww.mu.cond.Signal() + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return + } + syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{ + NoSyncOnClose: ww.opts.noSyncOnClose, + BytesPerSync: ww.opts.bytesPerSync, + PreallocateSize: ww.opts.preallocateSize(), + }) + recorderAndWriter.setWriter(syncingFile) + + // Using NumWAL as the DiskFileNum is fine since it is used only as + // EOF trailer for safe log recycling. Even though many log files can + // map to a single NumWAL, a file used for NumWAL n at index m will + // never get recycled for NumWAL n at a later index (since recycling + // happens when n as a whole is obsolete). + w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn), + record.LogWriterConfig{ + WALMinSyncInterval: ww.opts.minSyncInterval, + WALFsyncLatency: ww.opts.fsyncLatency, + QueueSemChan: ww.opts.queueSemChan, + ExternalSyncQueueCallback: ww.doneSyncCallback, + }) + closeWriter := func() bool { + ww.mu.Lock() + defer ww.mu.Unlock() + if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed { + // Not the latest writer or the writer was closed while this async + // creation was ongoing. + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return true + } + // Latest writer. + ww.writers[writerIndex].w = w + ww.mu.cond.Signal() + // NB: snapshotAndSwitchWriter does not block on IO, since + // SyncRecordGeneralized does no IO. + ww.q.snapshotAndSwitchWriter(w, func(firstIndex uint32, entries []recordQueueEntry) { + for i := range entries { + ps := record.PendingSyncIndex{Index: record.NoSyncIndex} + if entries[i].opts.Done != nil { + ps.Index = int64(firstIndex) + int64(i) + } + _, err := w.SyncRecordGeneralized(entries[i].p, ps) + if err != nil { + // TODO(sumeer): log periodically. The err will also surface via + // the latencyAndErrorRecorder, so if a switch is possible, it + // will be done. + ww.opts.logger.Errorf("%s", err) + } + } + }) + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return false + }() + if closeWriter { + // Never wrote anything to this writer so don't care about the + // returned error. + ww.opts.stopper.runAsync(func() { + _ = w.Close() + }) + } + }) + return nil +} + +// doneSyncCallback is the record.ExternalSyncQueueCallback called by +// record.LogWriter. +func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) { + // NB: harmless after Close returns since numSyncsPopped will be 0. + numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err) + if ww.opts.queueSemChan != nil { + for i := 0; i < numSyncsPopped; i++ { + <-ww.opts.queueSemChan + } + } +} + +// ongoingLatencyOrErrorForCurDir implements switchableWriter. +func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) { + ww.mu.Lock() + defer ww.mu.Unlock() + if ww.mu.closed { + return 0, nil + } + return ww.writers[ww.mu.nextWriterIndex-1].r.ongoingLatencyOrError() +} + +// Close implements Writer. +// +// NB: getOngoingLatencyOrErrorForLatestWriter and switchToNewDir can be +// called after Close is called, and there is also a possibility that they get +// called after Close returns and before failoverMonitor knows that the +// failoverWriter is closed. +// +// doneSyncCallback can be called anytime after Close returns since there +// could be stuck writes that finish arbitrarily later. +func (ww *failoverWriter) Close() error { + // [0, closeCalledCount) have had LogWriter.Close called (though may not + // have finished) or the LogWriter will never be non-nil. Either way, they + // have been "processed". + closeCalledCount := logNameIndex(0) + done := false + var err error + ww.mu.Lock() + // Every iteration starts and ends with the mutex held. + // + // Invariant: ww.mu.nextWriterIndex >= 1. + // + // TODO(sumeer): write a high level comment for the logic below. + for !done { + numWriters := ww.mu.nextWriterIndex + // Want to process [closeCalledCount, numWriters). + // Invariant: numWriters - closeCalledCount >= 1. + if numWriters-closeCalledCount <= 0 { + panic("invariant violation") + } + // Wait until writers[numWriters-1] is either created or has a creation + // error or numWriters advances. We are waiting on IO here, since failover + // is continuing to happen if IO is taking too long. If we run out of + // maxPhysicalLogs, then we will truly block on IO here, but there is + // nothing that can be done in that case. Note that there is a very rare + // case that the recordQueue is already empty, and waiting is unnecessary, + // but so be it. + for { + if ww.writers[numWriters-1].w == nil && ww.writers[numWriters-1].createError == nil { + ww.mu.cond.Wait() + } else { + break + } + numWriters = ww.mu.nextWriterIndex + } + // Invariant: [closeCalledCount, numWriters) have their *LogWriters in the + // final state (the createError may not be in its final state): nil and + // will stay nil, or non-nil. Additionally, numWriters-1 creation has + // completed, so both the *LogWriter and createError is in its final + // state. + + // Unlock, so monitoring and switching can continue happening. + ww.mu.Unlock() + // Process [closeCalledCount, numWriters). + + // Iterate over everything except the latest writer, i.e., + // [closeCalledCount, numWriters-1). These are either non-nil, or will + // forever stay nil. From the previous invariant, we know that + // numWriters-1-closeCalledCount >= 0. + for i := closeCalledCount; i < numWriters-1; i++ { + w := ww.writers[i].w + if w != nil { + // Don't care about the returned error since all the records we relied + // on this writer for were already successfully written. + ww.opts.stopper.runAsync(func() { + _ = w.Close() + }) + } + } + closeCalledCount = numWriters - 1 + ww.mu.Lock() + numWriters = ww.mu.nextWriterIndex + if closeCalledCount < numWriters-1 { + // Haven't processed some non-latest writers. Process them first. + continue + } + // Latest writer, for now. + latestWriterIndex := closeCalledCount + w := ww.writers[latestWriterIndex].w + createErr := ww.writers[latestWriterIndex].createError + closedLatest := false + var closeErr error + if w != nil || createErr != nil { + closeCalledCount++ + if createErr == nil { + if w == nil { + panic("invariant violation") + } + ww.opts.stopper.runAsync(func() { + // Write to iteration local variable closeErr, since we may write to + // a different closeErr in a different iteration. We only read this + // value in the same iteration (below). + closeErr = w.Close() + closedLatest = true + ww.mu.Lock() + ww.mu.cond.Signal() + ww.mu.Unlock() + }) + } else { + if len(ww.writers) == int(latestWriterIndex+1) { + // There are no more slots left. If there are any records queued, we + // have no guarantee that they were written, so propagate the error. + numRecords, _ := ww.q.popAll() + err = createErr + if numRecords == 0 { + err = nil + } + // Exit all loops. + done = true + } + } + } + // Inner loop that exits when: + // - w != nil and w is closed (by the preceding code). + // + // - latestWriterIndex is no longer the latest writer index (this is also + // the case when w is nil due to a createErr). + for !done { + ww.mu.cond.Wait() + if latestWriterIndex < ww.mu.nextWriterIndex-1 { + // Doesn't matter if closed or not, since no longer the latest. + break + // Continue outer for loop. + } + // Still the latest writer. + if closedLatest { + done = true + ww.mu.closed = true + if closeErr == nil { + _, numSyncsPopped := ww.q.popAll() + if numSyncsPopped != 0 { + // The client requested syncs are required to be popped by the + // record.LogWriter. The only records we expect to pop now are the + // tail that did not request a sync, and have been successfully + // synced implicitly by the record.LogWriter. + // + // NB: popAll is not really necessary for correctness. We do this + // to free memory. + panic(errors.AssertionFailedf( + "%d syncs not popped by the record.LogWriter", numSyncsPopped)) + + } + } else { + // If there are any records queued, we have no guarantee that they + // were written, so propagate the error. + numRecords, _ := ww.q.popAll() + err = closeErr + if numRecords == 0 { + err = nil + } + } + // Exit all loops. + break + } else { + // Nothing happened. Either w was nil, and we are still waiting for + // the next writer, or w was not nil, and we are still waiting for + // the close to finish. + continue + } + } + } + ww.mu.Unlock() + // Only nil in some unit tests. + if ww.wm != nil { + ww.wm.writerClosed() + } + return err +} + +func (ww *failoverWriter) Metrics() *record.LogWriterMetrics { + // TODO(sumeer): + return nil +} + +// latencyAndErrorRecorder records ongoing write and sync operations and errors +// in those operations. record.LogWriter cannot continue functioning after any +// error, so all errors are considered permanent. +// +// writeStart/writeEnd are used directly when creating a file. After the file +// is successfully created, setWriter turns latencyAndErrorRecorder into an +// implementation of writerSyncerCloser that will record for the Write and +// Sync methods. +type latencyAndErrorRecorder struct { + ongoingOperationStart atomic.Int64 + error atomic.Pointer[error] + writerSyncerCloser +} + +type writerSyncerCloser interface { + io.Writer + io.Closer + Sync() error +} + +func (r *latencyAndErrorRecorder) writeStart() { + r.ongoingOperationStart.Store(time.Now().UnixNano()) +} + +func (r *latencyAndErrorRecorder) writeEnd(err error) { + if err != nil { + ptr := &err + r.error.Store(ptr) + } + r.ongoingOperationStart.Store(0) +} + +func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) { + r.writerSyncerCloser = w +} + +func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) { + startTime := r.ongoingOperationStart.Load() + var latency time.Duration + if startTime != 0 { + l := time.Now().UnixNano() - startTime + if l < 0 { + l = 0 + } + latency = time.Duration(l) + } + errPtr := r.error.Load() + var err error + if errPtr != nil { + err = *errPtr + } + return latency, err +} + +// Sync implements writerSyncerCloser. +func (r *latencyAndErrorRecorder) Sync() error { + r.writeStart() + err := r.writerSyncerCloser.Sync() + r.writeEnd(err) + return err +} + +// Write implements io.Writer. +func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) { + r.writeStart() + n, err = r.writerSyncerCloser.Write(p) + r.writeEnd(err) + return n, err +} diff --git a/wal/failover_writer_test.go b/wal/failover_writer_test.go new file mode 100644 index 00000000000..0d361a6e450 --- /dev/null +++ b/wal/failover_writer_test.go @@ -0,0 +1,165 @@ +package wal + +import ( + "fmt" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" + "io" + "slices" + "strings" + "sync" + "testing" +) + +func TestFailoverWriter(t *testing.T) { + memFS := vfs.NewStrictMem() + dirs := []Dir{{FS: memFS, Dirname: "pri"}, {FS: memFS, Dirname: "sec"}} + for _, dir := range dirs { + require.NoError(t, dir.FS.MkdirAll(dir.Dirname, 0755)) + } + dirIndex := 0 + + type filenameAndFS struct { + name string + fs vfs.FS + } + printLogFiles := func(b *strings.Builder, num NumWAL) { + var filenames []filenameAndFS + prefix := base.DiskFileNum(num).String() + for i := range dirs { + fns, err := dirs[i].FS.List(dirs[i].Dirname) + require.NoError(t, err) + for _, fn := range fns { + if strings.HasPrefix(fn, prefix) { + filenames = append(filenames, filenameAndFS{ + name: dirs[i].FS.PathJoin(dirs[i].Dirname, fn), + fs: dirs[i].FS, + }) + } + } + } + slices.SortFunc(filenames, func(a, b filenameAndFS) int { + return strings.Compare(a.name, b.name) + }) + if len(filenames) > 0 { + fmt.Fprintf(b, "log files:\n") + } + for _, fn := range filenames { + fmt.Fprintf(b, " %s\n", fn.name) + func() { + f, err := fn.fs.Open(fn.name) + require.NoError(t, err) + defer f.Close() + rr := record.NewReader(f, base.DiskFileNum(num)) + for { + offset := rr.Offset() + r, err := rr.Next() + if err == nil { + var bb strings.Builder + _, err = io.Copy(&bb, r) + if err == nil { + fmt.Fprintf(b, " %d: %s\n", offset, bb.String()) + } + } + if err != nil { + fmt.Fprintf(b, " %s\n", err.Error()) + break + } + } + }() + } + } + var w *failoverWriter + var nextWALNum NumWAL + queueSemChanCap := 100 + queueSemChan := make(chan struct{}, queueSemChanCap) + countSem := func() int { + i := 0 + for { + select { + case queueSemChan <- struct{}{}: i++ + default: return i + } + } + } + var stopper *stopper + var logWriterCreated chan struct{} + var syncs []SyncOptions + datadriven.RunTest(t, "testdata/failover_writer", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + // TODO: error injection; choice of initial dir; latency injection. + case "init": + wn := nextWALNum + nextWALNum++ + var err error + stopper = newStopper() + logWriterCreated = make(chan struct{}) + w, err = newFailoverWriter(failoverWriterOpts{ + wn: wn, + preallocateSize: func() int { return 10 }, + queueSemChan: queueSemChan, + stopper: stopper, + writerCreatedForTest: logWriterCreated, + }, dirs[0], nil) + require.NoError(t, err) + <-logWriterCreated + return "" + + case "write": + var synco SyncOptions + if td.HasArg("sync") { + wg := &sync.WaitGroup{} + wg.Add(1) + synco = SyncOptions{ + Done: wg, + Err: new(error), + } + queueSemChan <- struct{}{} + } + syncs = append(syncs, synco) + var value string + td.ScanArgs(t, "value", &value) + require.NoError(t, w.WriteRecord([]byte(value), synco)) + return "" + + case "switch": + dirIndex := (dirIndex+1)%2 + require.NoError(t, w.switchToNewDir(dirs[dirIndex])) + <-logWriterCreated + return "" + + case "close": + var b strings.Builder + require.NoError(t, w.Close()) + for i := range syncs { + if syncs[i].Done != nil { + // Should already be done. + syncs[i].Done.Wait() + } + errStr := "no error" + if *syncs[i].Err != nil { + errStr = (*syncs[i].Err).Error() + } + fmt.Fprintf(&b, "sync %d: %s\n", i, errStr) + } + // We expect the Close to complete without stopping all the + // goroutines. But for deterministic log file output we stop all + // goroutines. + stopper.stop() + printLogFiles(&b, nextWALNum-1) + require.Equal(t, queueSemChanCap, countSem()) + syncs = nil + w = nil + dirIndex = 0 + + return b.String() + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/wal/testdata/failover_writer b/wal/testdata/failover_writer new file mode 100644 index 00000000000..0bfe9aa6032 --- /dev/null +++ b/wal/testdata/failover_writer @@ -0,0 +1,17 @@ +init +---- + +write sync=true value=woolly +---- + +write sync=false value=sheep +---- + +switch +---- + +write sync=false value=yak +---- + +close +---- diff --git a/wal/wal.go b/wal/wal.go index d9d08456e3d..6f3e8c2baf2 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -27,7 +27,7 @@ type Dir struct { // NumWAL is the number of the virtual WAL. It can map to one or more physical // log files. In standalone mode, it will map to exactly one log file. In // failover mode, it can map to many log files, which are totally ordered -// (using a dense logIndex). +// (using a dense logNameIndex). // // In general, WAL refers to the virtual WAL, and file refers to a log file. // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to @@ -36,14 +36,14 @@ type Dir struct { // the contents of the directories. type NumWAL base.DiskFileNum -// logIndex numbers log files within a WAL. -type logIndex uint32 +// logNameIndex numbers log files within a WAL. +type logNameIndex uint32 // TODO(sumeer): parsing func. And remove attempts to parse log files outside // the wal package (including tools). // makeLogFilename makes a log filename. -func makeLogFilename(wn NumWAL, index logIndex) string { +func makeLogFilename(wn NumWAL, index logNameIndex) string { if index == 0 { // Use a backward compatible name, for simplicity. return base.MakeFilename(base.FileTypeLog, base.DiskFileNum(wn)) @@ -69,8 +69,14 @@ type Options struct { // recycling. MaxNumRecyclableLogs int - // SyncingFileOptions is the configuration when calling vfs.NewSyncingFile. - SyncingFileOpts vfs.SyncingFileOptions + // Configuration for calling vfs.NewSyncingFile. + + // NoSyncOnClose is documented in SyncingFileOptions. + NoSyncOnClose bool + // BytesPerSync is documented in SyncingFileOptions. + BytesPerSync int + // PreallocateSize is documented in SyncingFileOptions. + PreallocateSize func() int // MinSyncInterval is documented in Options.WALMinSyncInterval. MinSyncInterval func() time.Duration @@ -87,6 +93,13 @@ type Options struct { // there is no syncQueue, so the pushback into the commit pipeline is // unnecessary, but possibly harmless. QueueSemChan chan struct{} + + // ElevatedWriteStallThresholdLag is the duration for which an elevated + // threshold should continue after a switch back to the primary dir. + ElevatedWriteStallThresholdLag time.Duration + + // Logger for logging. + Logger base.Logger } // Stats exposes stats used in Pebble metrics. @@ -133,9 +146,14 @@ type Manager interface { // increasing, and be greater than any NumWAL seen earlier. The caller must // close the previous Writer before calling Create. Create(wn NumWAL) (Writer, error) + // ElevateWriteStallThresholdForFailover returns true if the caller should + // use a high write stall threshold because the WALs are being written to + // the secondary dir. + ElevateWriteStallThresholdForFailover() bool // Stats returns the latest Stats. Stats() Stats // Close the manager. + // REQUIRES: Writers and Readers have already been closed. Close() error } @@ -181,5 +199,5 @@ type Reader interface { } // Make lint happy. -var _ logIndex = 0 +var _ logNameIndex = 0 var _ = makeLogFilename