diff --git a/wal/failover_writer.go b/wal/failover_writer.go
index 14067d9626d..8aebf493e6d 100644
--- a/wal/failover_writer.go
+++ b/wal/failover_writer.go
@@ -773,19 +773,23 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 	lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
 	ww.mu.Lock()
 	defer ww.mu.Unlock()
+	numWriters := ww.mu.nextWriterIndex
 	// Every iteration starts and ends with the mutex held.
 	//
-	// Invariant: ww.mu.nextWriterIndex >= 1.
+	// Invariant: numWriters >= 1.
 	//
 	// We will loop until we have closed the lastWriter (and use
 	// lastWriter.err). We also need to call close on all LogWriters
 	// that will not close themselves, i.e., those that have already been
 	// created and installed in failoverWriter.writers (this set may change
 	// while failoverWriter.Close runs).
-	for !lastWriter.closed {
-		numWriters := ww.mu.nextWriterIndex
+	for !lastWriter.closed || numWriters > lastWriter.index+1 {
 		if numWriters > closeCalledCount {
-			// INVARIANT: numWriters > closeCalledCount.
+			// lastWriter.index may or may not have advanced. If it has advanced, we
+			// need to reinitialize lastWriterState. If it hasn't advanced, and
+			// numWriters > closeCalledCount, we know that we haven't called close
+			// on it, so nothing in lastWriterState needs to be retained. For
+			// simplicity, we overwrite in both cases.
 			lastWriter = lastWriterState{
 				index: numWriters - 1,
 			}
@@ -857,7 +861,34 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 		if !lastWriter.closed {
 			// Either waiting for creation of last writer or waiting for the close
 			// to finish, or something else to become the last writer.
+			//
+			// It is possible that what we think of as the last writer (lastWriter)
+			// closes itself while ww.mu is no longer held here, and a new LogWriter
+			// is created too. All the records are synced, but the real last writer
+			// may still be writing some records. Specifically, consider the
+			// following sequence while this wait does not hold the mutex:
+			//
+			// - recordQueue has an entry, with index 50, that does not require a
+			//   sync.
+			// - Last writer created at index 10 and entry 50 is handed to it.
+			// - lastWriter.index is still 9 and it closes itself and signals this
+			//   cond. It has written entry 50 and synced (since close syncs).
+			// - The wait completes.
+			//
+			// Now the writer at index 10 will never be closed and will never sync.
+			// A crash can cause some part of what it writes to be lost. Note that
+			// there is no data loss, but there are some unfortunate consequences:
+			//
+			// - We never closed a file descriptor.
+			// - virtualWALReader.NextRecord can return an error on finding a
+			//   malformed chunk in the last writer (at index 10) instead of
+			//   swallowing the error. This can cause DB.Open to fail.
+			//
+			// To avoid this, we grab the latest value of numWriters on reacquiring
+			// the mutex, and will continue looping until the writer at index 10 is
+			// closed (or writer at index 11 is created).
 			ww.mu.cond.Wait()
+			numWriters = ww.mu.nextWriterIndex
 		}
 	}
 	if ww.mu.writers[lastWriter.index].w != nil {
@@ -867,7 +898,10 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
 	err = lastWriter.err
 	ww.mu.metrics = lastWriter.metrics
 	ww.mu.closed = true
-	_, _ = ww.q.popAll(err)
+	n, m := ww.q.popAll(err)
+	if err == nil && (n > 0 || m > 0) {
+		panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
+	}
 	return logicalOffset, err
 }
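
Aside from the diff itself: the heart of the fix is the standard condition-variable rule that any state snapshotted before cond.Wait() must be re-read after waking, because the mutex is released for the duration of the wait. The sketch below is not part of the patch and uses hypothetical names (waiterState, nextIndex); it only illustrates the same shape as the numWriters re-read that the patch adds after ww.mu.cond.Wait().

// Minimal sketch (assumptions: hypothetical names, not pebble code) of
// re-reading shared state after every cond.Wait() wakeup.
package main

import (
	"fmt"
	"sync"
	"time"
)

type waiterState struct {
	mu        sync.Mutex
	cond      *sync.Cond
	nextIndex int  // grows as new writers are installed
	closed    bool // set once the last writer has been closed
}

func main() {
	s := &waiterState{}
	s.cond = sync.NewCond(&s.mu)

	// Producer: installs writers over time, then marks the last one closed.
	go func() {
		for i := 0; i < 3; i++ {
			time.Sleep(10 * time.Millisecond)
			s.mu.Lock()
			s.nextIndex++
			s.cond.Broadcast()
			s.mu.Unlock()
		}
		s.mu.Lock()
		s.closed = true
		s.cond.Broadcast()
		s.mu.Unlock()
	}()

	// Waiter: mirrors the shape of closeInternal's loop. The snapshot taken
	// before the loop goes stale every time Wait releases the mutex, so it
	// must be refreshed after every wakeup.
	s.mu.Lock()
	numWriters := s.nextIndex
	for !s.closed {
		s.cond.Wait()
		numWriters = s.nextIndex // re-read after reacquiring the mutex
	}
	s.mu.Unlock()
	fmt.Println("writers observed at close:", numWriters)
}

Dropping the re-read inside the loop reproduces the stale-snapshot problem the patch guards against: the waiter keeps acting on the writer count it observed before going to sleep, even though new writers were installed while the mutex was released.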