Skip to content

Commit

Permalink
record: propagate WAL sync errors to queued writers
Browse files Browse the repository at this point in the history
Once we stop panicking on commitPipeline failures and start placing DB into
read-only mode, all queued writers waiting for their sync need to be notified
of the sync error so that they can run to completion.

See also: cockroachdb#270
  • Loading branch information
rohansuri committed Jul 17, 2020
1 parent 073e15e commit 4cf4f9f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
25 changes: 20 additions & 5 deletions internal/record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ type LogWriter struct {
block *block
free struct {
sync.Mutex
// Condition variable used to signal a block is freed.
cond sync.Cond
blocks []*block
allocated int
Expand Down Expand Up @@ -439,9 +440,11 @@ func (w *LogWriter) flushLoop(context.Context) {

f.err = err
if f.err != nil {
// TODO(peter): There might be new waiters that we should propagate f.err
// to. Because f.err is now set, we only have to perform a single extra
// clearing of those waiters as no new ones can arrive via SyncRecord().
f.syncQ.clearBlocked()
for !f.syncQ.empty() {
head, tail := f.syncQ.load()
f.syncQ.pop(head, tail, err)
}
return
}
}
Expand Down Expand Up @@ -528,6 +531,7 @@ func (w *LogWriter) queueBlock() {
}

// Close flushes and syncs any unwritten data and closes the writer.
// Where required, external synchronisation is provided by commitPipeline.mu.
func (w *LogWriter) Close() error {
f := &w.flusher

Expand Down Expand Up @@ -562,6 +566,7 @@ func (w *LogWriter) Close() error {

// WriteRecord writes a complete record. Returns the offset just past the end
// of the record.
// External synchronisation provided by commitPipeline.mu.
func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
return w.SyncRecord(p, nil, nil)
}
Expand All @@ -570,6 +575,7 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
// asynchronously persisted to the underlying writer and done will be called on
// the wait group upon completion. Returns the offset just past the end of the
// record.
// External synchronisation provided by commitPipeline.mu.
func (w *LogWriter) SyncRecord(p []byte, wg *sync.WaitGroup, err *error) (int64, error) {
if w.err != nil {
return -1, w.err
Expand All @@ -590,8 +596,16 @@ func (w *LogWriter) SyncRecord(p []byte, wg *sync.WaitGroup, err *error) (int64,
// any record written to the LogWriter to this point will be flushed to the
// OS and synced to disk.
f := &w.flusher
f.syncQ.push(wg, err)
f.ready.Signal()
// If flushLoop had an error, stop accepting sync requests.
f.Lock()
if f.err != nil {
*err = f.err
wg.Done()
} else {
f.syncQ.push(wg, err)
f.ready.Signal()
}
f.Unlock()
}

offset := w.blockNum*blockSize + int64(w.block.written)
Expand All @@ -603,6 +617,7 @@ func (w *LogWriter) SyncRecord(p []byte, wg *sync.WaitGroup, err *error) (int64,
}

// Size returns the current size of the file.
// External synchronisation provided by commitPipeline.mu.
func (w *LogWriter) Size() int64 {
return w.blockNum*blockSize + int64(w.block.written)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/record/log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ func TestSyncQueue(t *testing.T) {
flusherWG.Wait()
}

func TestSyncErrorPropagation(t *testing.T) {
mem := vfs.NewMem()
f, err := mem.Create("log")
require.NoError(t, err)

injectedErr := errors.New("injected error")
w := NewLogWriter(syncErrorFile{f, injectedErr}, 0)

var syncErr error
for i := 0; i < 100; i++ {
var syncWG sync.WaitGroup
syncWG.Add(1)
// This doesn't return error because err from flusher to logWriter is only
// propagated upon reaching the block size.
_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
require.NoError(t, err)
syncWG.Wait()
require.Equal(t, injectedErr, syncErr)
}
}

func TestFlusherCond(t *testing.T) {
var mu sync.Mutex
var q syncQueue
Expand Down

0 comments on commit 4cf4f9f

Please sign in to comment.