Skip to content

Commit

Permalink
wal,record: failover write path code
Browse files Browse the repository at this point in the history
failover_manager.go contains the failoverManager (which implements
wal.Manager) for the write path, and helper classes. dirProber monitors
the primary dir when failed over to use the secondary, to decide when to
failback to the primary. failoverMonitor uses the latency and error seen
by the current *LogWriter, and probing state, to decide when to switch to
a different dir.

failover_writer.go contains the failoverWriter, that can switch across
a sequence of record.LogWriters.

record.LogWriter is changed to accommodate both standalone and failover
mode, without affecting the synchronization in standalone mode.
In failover mode there is some additional synchronization, in the
failoverWriter queue, which is not lock free, but is hopefully fast
enough given the fastpath will use read locks.

Informs cockroachdb#3230
  • Loading branch information
sumeerbhola committed Jan 25, 2024
1 parent 5b09251 commit 0048fc0
Show file tree
Hide file tree
Showing 8 changed files with 2,243 additions and 49 deletions.
286 changes: 251 additions & 35 deletions record/log_writer.go

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions record/log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ func TestSyncQueue(t *testing.T) {

func TestFlusherCond(t *testing.T) {
var mu sync.Mutex
var q syncQueue
var c flusherCond
var closed bool

c.init(&mu, &q)
psq := &pendingSyncsWithSyncQueue{}
c.init(&mu, psq)
q := &psq.syncQueue

var flusherWG sync.WaitGroup
flusherWG.Add(1)
Expand Down Expand Up @@ -299,7 +300,8 @@ func TestMinSyncInterval(t *testing.T) {
}
// NB: we can't use syncQueue.load() here as that will return 0,0 while the
// syncQueue is blocked.
head, tail := w.flusher.syncQ.unpack(w.flusher.syncQ.headTail.Load())
syncQ := w.flusher.pendingSyncs.(*pendingSyncsWithSyncQueue)
head, tail := syncQ.unpack(syncQ.headTail.Load())
waiters := head - tail
if waiters != uint32(i+1) {
t.Fatalf("expected %d waiters, but found %d", i+1, waiters)
Expand Down
8 changes: 6 additions & 2 deletions vfs/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
OpFileStat
// OpFileSync describes a file sync operation.
OpFileSync
// OpFileSyncData describes a file sync operation.
OpFileSyncData
// OpFileFlush describes a file flush operation.
OpFileFlush
)
Expand All @@ -90,7 +92,7 @@ func (o OpKind) ReadOrWrite() OpReadWrite {
switch o {
case OpOpen, OpOpenDir, OpList, OpStat, OpGetDiskUsage, OpFileRead, OpFileReadAt, OpFileStat:
return OpIsRead
case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileFlush, OpFilePreallocate:
case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileSyncData, OpFileFlush, OpFilePreallocate:
return OpIsWrite
default:
panic(fmt.Sprintf("unrecognized op %v\n", o))
Expand Down Expand Up @@ -528,7 +530,9 @@ func (f *errorFile) Sync() error {
}

func (f *errorFile) SyncData() error {
// TODO(jackson): Consider error injection.
if err := f.inj.MaybeError(Op{Kind: OpFileSyncData, Path: f.path}); err != nil {
return err
}
return f.file.SyncData()
}

Expand Down
Loading

0 comments on commit 0048fc0

Please sign in to comment.