wal: ensure last record.LogWriter is closed to avoid spurious errors on open

This also ensures we don't leak a file descriptor.

Informs #3865
sumeerbhola authored and jbowens committed Aug 28, 2024
1 parent 02bb64a commit f5dc2a4
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions wal/failover_writer.go
@@ -773,19 +773,23 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
     lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
     ww.mu.Lock()
     defer ww.mu.Unlock()
+    numWriters := ww.mu.nextWriterIndex
     // Every iteration starts and ends with the mutex held.
     //
-    // Invariant: ww.mu.nextWriterIndex >= 1.
+    // Invariant: numWriters >= 1.
     //
     // We will loop until we have closed the lastWriter (and use
     // lastWriter.err). We also need to call close on all LogWriters
     // that will not close themselves, i.e., those that have already been
     // created and installed in failoverWriter.writers (this set may change
     // while failoverWriter.Close runs).
-    for !lastWriter.closed {
-        numWriters := ww.mu.nextWriterIndex
+    for !lastWriter.closed || numWriters > lastWriter.index+1 {
         if numWriters > closeCalledCount {
+            // INVARIANT: numWriters > closeCalledCount.
             // lastWriter.index may or may not have advanced. If it has advanced, we
             // need to reinitialize lastWriterState. If it hasn't advanced, and
+            // numWriters > closeCalledCount, we know that we haven't called close
+            // on it, so nothing in lastWriterState needs to be retained. For
+            // simplicity, we overwrite in both cases.
             lastWriter = lastWriterState{
                 index: numWriters - 1,
             }
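
The heart of this hunk is the strengthened loop condition: it is no longer enough that the writer we last targeted has closed, since a newer LogWriter may have been installed while the mutex was released. Below is a minimal runnable sketch of that condition using invented stand-in state (the type and field names are illustrative, not Pebble's actual failoverWriter fields):

package main

import "fmt"

// closeLoopState is an invented stand-in for failoverWriter's real
// fields; it exists only to illustrate the loop condition.
type closeLoopState struct {
    lastWriterIndex  int  // index of the writer we most recently targeted
    lastWriterClosed bool // whether that writer has finished closing
    nextWriterIndex  int  // one past the newest writer installed so far
}

// mustKeepLooping mirrors the shape of the fixed condition: keep
// looping while the targeted writer is still open, OR while a writer
// newer than it exists, since that newer writer must also be closed.
func mustKeepLooping(s closeLoopState) bool {
    return !s.lastWriterClosed || s.nextWriterIndex > s.lastWriterIndex+1
}

func main() {
    // The old condition (!closed alone) would stop here and leak the
    // writer at index 10, installed after writer 9 closed itself.
    s := closeLoopState{lastWriterIndex: 9, lastWriterClosed: true, nextWriterIndex: 11}
    fmt.Println(mustKeepLooping(s)) // true: writer 10 is still open
}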
@@ -857,7 +861,34 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
         if !lastWriter.closed {
             // Either waiting for creation of last writer or waiting for the close
             // to finish, or something else to become the last writer.
+            //
+            // It is possible that what we think of as the last writer (lastWriter)
+            // closes itself while ww.mu is no longer held here, and a new LogWriter
+            // is created too. All the records are synced, but the real last writer
+            // may still be writing some records. Specifically, consider the
+            // following sequence while this wait does not hold the mutex:
+            //
+            // - recordQueue has an entry, with index 50, that does not require a
+            //   sync.
+            // - Last writer created at index 10 and entry 50 is handed to it.
+            // - lastWriter.index is still 9 and it closes itself and signals this
+            //   cond. It has written entry 50 and synced (since close syncs).
+            // - The wait completes.
+            //
+            // Now the writer at index 10 will never be closed and will never sync.
+            // A crash can cause some part of what it writes to be lost. Note that
+            // there is no data loss, but there are some unfortunate consequences:
+            //
+            // - We never closed a file descriptor.
+            // - virtualWALReader.NextRecord can return an error on finding a
+            //   malformed chunk in the last writer (at index 10) instead of
+            //   swallowing the error. This can cause DB.Open to fail.
+            //
+            // To avoid this, we grab the latest value of numWriters on reacquiring
+            // the mutex, and will continue looping until the writer at index 10 is
+            // closed (or writer at index 11 is created).
             ww.mu.cond.Wait()
+            numWriters = ww.mu.nextWriterIndex
         }
     }
     if ww.mu.writers[lastWriter.index].w != nil {
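
The one-line addition after ww.mu.cond.Wait() is the classic condition-variable idiom: Wait releases the mutex, so any snapshot of shared state taken before waiting can be stale once the wait returns, and it must be re-read under the reacquired mutex before the loop condition is re-evaluated. A self-contained sketch of the idiom (a toy counter, not Pebble code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    nextWriterIndex := 0 // shared state, guarded by mu

    // Background goroutine "creates writers", advancing the index.
    go func() {
        for i := 1; i <= 3; i++ {
            mu.Lock()
            nextWriterIndex = i
            cond.Signal()
            mu.Unlock()
        }
    }()

    mu.Lock()
    numWriters := nextWriterIndex
    for numWriters < 3 {
        cond.Wait()
        // The mutex was released during Wait, so the snapshot taken
        // before the loop may be stale; re-read before re-checking.
        numWriters = nextWriterIndex
    }
    mu.Unlock()
    fmt.Println("observed", numWriters, "writers")
}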
@@ -867,7 +898,10 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
     err = lastWriter.err
     ww.mu.metrics = lastWriter.metrics
     ww.mu.closed = true
-    _, _ = ww.q.popAll(err)
+    n, m := ww.q.popAll(err)
+    if err == nil && (n > 0 || m > 0) {
+        panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
+    }
     return logicalOffset, err
 }
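
The last hunk turns a silently discarded popAll result into a checked invariant: on a clean close (err == nil) the record queue must already be empty, because every record was handed to some LogWriter and synced. A sketch of the same pattern against an invented toy queue (errors.AssertionFailedf is from github.com/cockroachdb/errors, which the real file uses; the queue type here is hypothetical):

package main

import (
    "fmt"

    "github.com/cockroachdb/errors"
)

// toyQueue is an invented stand-in for the real recordQueue.
type toyQueue struct {
    records int // records not yet written by any LogWriter
    syncs   int // sync requests not yet satisfied
}

// popAll drains the queue, returning how much work was still pending.
func (q *toyQueue) popAll() (records, syncs int) {
    records, syncs = q.records, q.syncs
    q.records, q.syncs = 0, 0
    return records, syncs
}

// closeQueue mirrors the assertion added in the commit: with no error,
// a non-empty queue means records were silently dropped, which should
// be impossible, so fail loudly rather than report a successful close.
func closeQueue(q *toyQueue, err error) error {
    n, m := q.popAll()
    if err == nil && (n > 0 || m > 0) {
        panic(errors.AssertionFailedf(
            "no error but recordQueue had %d records and %d syncs", n, m))
    }
    return err
}

func main() {
    q := &toyQueue{}
    fmt.Println(closeQueue(q, nil)) // <nil>: clean close, empty queue
}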

