wal: ensure last record.LogWriter is closed to avoid spurious errors #3896

Merged (1 commit) on Aug 28, 2024

44 changes: 39 additions & 5 deletions wal/failover_writer.go
@@ -773,19 +773,23 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 	lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
 	ww.mu.Lock()
 	defer ww.mu.Unlock()
+	numWriters := ww.mu.nextWriterIndex
 	// Every iteration starts and ends with the mutex held.
 	//
-	// Invariant: ww.mu.nextWriterIndex >= 1.
+	// Invariant: numWriters >= 1.
 	//
 	// We will loop until we have closed the lastWriter (and use
 	// lastWriter.err). We also need to call close on all LogWriters
 	// that will not close themselves, i.e., those that have already been
 	// created and installed in failoverWriter.writers (this set may change
 	// while failoverWriter.Close runs).
-	for !lastWriter.closed {
-		numWriters := ww.mu.nextWriterIndex
+	for !lastWriter.closed || numWriters > lastWriter.index+1 {
 		if numWriters > closeCalledCount {
-			// INVARIANT: numWriters > closeCalledCount.
+			// lastWriter.index may or may not have advanced. If it has advanced, we
+			// need to reinitialize lastWriterState. If it hasn't advanced, and
+			// numWriters > closeCalledCount, we know that we haven't called close
+			// on it, so nothing in lastWriterState needs to be retained. For
+			// simplicity, we overwrite in both cases.
			lastWriter = lastWriterState{
				index: numWriters - 1,
			}
@@ -857,7 +861,34 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 		if !lastWriter.closed {
 			// Either waiting for creation of last writer or waiting for the close
 			// to finish, or something else to become the last writer.
+			//
+			// It is possible that what we think of as the last writer (lastWriter)
+			// closes itself while ww.mu is no longer held here, and a new LogWriter
+			// is created too. All the records are synced, but the real last writer
+			// may still be writing some records. Specifically, consider the
+			// following sequence while this wait does not hold the mutex:
+			//
+			// - recordQueue has an entry, with index 50, that does not require a
+			//   sync.
+			// - Last writer created at index 10 and entry 50 is handed to it.
+			// - lastWriter.index is still 9 and it closes itself and signals this
+			//   cond. It has written entry 50 and synced (since close syncs).
+			// - The wait completes.
+			//
+			// Now the writer at index 10 will never be closed and will never sync.
+			// A crash can cause some part of what it writes to be lost. Note that
+			// there is no data loss, but there are some unfortunate consequences:
+			//
+			// - We never closed a file descriptor.
+			// - virtualWALReader.NextRecord can return an error on finding a
+			//   malformed chunk in the last writer (at index 10) instead of
+			//   swallowing the error. This can cause DB.Open to fail.
+			//
+			// To avoid this, we grab the latest value of numWriters on reacquiring
+			// the mutex, and will continue looping until the writer at index 10 is
+			// closed (or writer at index 11 is created).
 			ww.mu.cond.Wait()
+			numWriters = ww.mu.nextWriterIndex
 		}
 	}
 	if ww.mu.writers[lastWriter.index].w != nil {
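
To make the synchronization pattern in this hunk concrete, here is a minimal, runnable Go sketch. The types are hypothetical stand-ins (state, nextWriterIndex, lastClosed), not Pebble's failoverWriter API; it models only the part the fix changes: re-reading the shared writer count after every cond.Wait so that a writer installed while the mutex was released is also waited on.

package main

import (
	"fmt"
	"sync"
	"time"
)

// state is a hypothetical stand-in for failoverWriter.mu: writers are
// installed under mu, and close must not return until the newest one has
// finished closing.
type state struct {
	mu              sync.Mutex
	cond            *sync.Cond
	nextWriterIndex int // writers installed so far
	lastClosed      int // highest writer index whose close completed; -1 if none
}

func main() {
	s := &state{nextWriterIndex: 1, lastClosed: -1} // writer 0 already installed
	s.cond = sync.NewCond(&s.mu)

	go func() {
		// Mimic the race described in the comment: while close is blocked in
		// Wait, a failover installs writer 1, and only then does writer 0
		// finish closing and signal the cond.
		time.Sleep(10 * time.Millisecond)
		s.mu.Lock()
		s.nextWriterIndex = 2 // new writer created during the wait
		s.lastClosed = 0      // writer 0 closed itself
		s.cond.Broadcast()
		s.mu.Unlock()

		time.Sleep(10 * time.Millisecond)
		s.mu.Lock()
		s.lastClosed = 1 // writer 1 closed
		s.cond.Broadcast()
		s.mu.Unlock()
	}()

	s.mu.Lock()
	numWriters := s.nextWriterIndex
	// Loop until the newest writer we know about has been closed. The buggy
	// version never refreshed numWriters, so it exited as soon as writer 0
	// closed and leaked writer 1.
	for s.lastClosed < numWriters-1 {
		s.cond.Wait()
		// The fix: pick up writers installed while Wait had released the mutex.
		numWriters = s.nextWriterIndex
	}
	s.mu.Unlock()
	fmt.Printf("close returned after %d writers were closed\n", numWriters) // 2
}

Deleting the numWriters = s.nextWriterIndex line inside the loop reproduces the leak under this timing: the loop exits after the first broadcast with writer 1 still open.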
@@ -867,7 +898,10 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 	err = lastWriter.err
 	ww.mu.metrics = lastWriter.metrics
 	ww.mu.closed = true
-	_, _ = ww.q.popAll(err)
+	n, m := ww.q.popAll(err)
+	if err == nil && (n > 0 || m > 0) {
+		panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
+	}
 	return logicalOffset, err
 }
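
The last hunk replaces a discarded popAll result with an invariant check: on a clean close the record queue must already be empty. A small sketch of this drain-and-assert pattern, with a hypothetical queue type standing in for recordQueue (errors.AssertionFailedf is from github.com/cockroachdb/errors, the errors package Pebble uses):

package main

import "github.com/cockroachdb/errors"

// queue is a hypothetical stand-in for recordQueue.
type queue struct {
	records []string
	syncs   int
}

// popAll drains the queue and reports how many queued records and pending
// syncs were still outstanding. In the real code err is propagated to the
// waiters of those entries; here it is kept only for signature parity.
func (q *queue) popAll(err error) (numRecords, numSyncs int) {
	numRecords, numSyncs = len(q.records), q.syncs
	q.records, q.syncs = nil, 0
	return numRecords, numSyncs
}

func closeQueue(q *queue, err error) error {
	n, m := q.popAll(err)
	// On an error path, leftover entries are expected. On a clean close,
	// anything left behind means a record was accepted but never written,
	// so fail loudly rather than lose it silently.
	if err == nil && (n > 0 || m > 0) {
		panic(errors.AssertionFailedf(
			"no error but recordQueue had %d records and %d syncs", n, m))
	}
	return err
}

func main() {
	_ = closeQueue(&queue{}, nil) // clean close of an empty queue: ok
}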
