From 211dce02c64ac5b2e54282c38ed24e7404d7d471 Mon Sep 17 00:00:00 2001
From: Jackson Owens
Date: Mon, 19 Aug 2024 17:38:17 -0400
Subject: [PATCH] wal: fix reader buffer reuse

While stitching together physical WAL segments, the WAL reader reads
each record's contents into a temporary buffer. Previously, if an error
was encountered mid-copy of a record (eg, because the record was large
enough to be split across multiple chunks), the buffer was not reset
before attempting to read the next file's record, so that record was
appended to the stale, partially copied contents. This corruption could
prevent replay of the WAL.

This commit fixes the bug by moving the buffer reset to immediately
precede its use.

Informs #3865.
---
 wal/reader.go       |  5 +----
 wal/reader_test.go  | 11 +++++++++--
 wal/testdata/reader | 43 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/wal/reader.go b/wal/reader.go
index 745fe1698a..6cec8f8085 100644
--- a/wal/reader.go
+++ b/wal/reader.go
@@ -241,8 +241,6 @@ var _ Reader = (*virtualWALReader)(nil)
 // are no more records. The reader returned becomes stale after the next
 // NextRecord call, and should no longer be used.
 func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
-	r.recordBuf.Reset()
-
 	// On the first call, we need to open the first file.
 	if r.currIndex < 0 {
 		err := r.nextFile()
@@ -271,6 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
 		// record to be exhausted to read all of the record's chunks before
 		// attempting to read the next record. Buffering also allows us to
 		// easily read the header of the batch down below for deduplication.
+		r.recordBuf.Reset()
 		if err == nil {
 			_, err = io.Copy(&r.recordBuf, rec)
 		}
@@ -322,7 +321,6 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
 		// sequence number. We can differentiate LogData-only batches through
 		// their batch headers: they'll encode a count of zero.
 		if h.Count == 0 {
-			r.recordBuf.Reset()
 			continue
 		}
 
@@ -330,7 +328,6 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
 		// number, we must've already returned this record to the client. Skip
 		// it.
 		if h.SeqNum <= r.lastSeqNum {
-			r.recordBuf.Reset()
 			continue
 		}
 		r.lastSeqNum = h.SeqNum
diff --git a/wal/reader_test.go b/wal/reader_test.go
index 9b40380117..520807fc27 100644
--- a/wal/reader_test.go
+++ b/wal/reader_test.go
@@ -176,8 +176,15 @@ func TestReader(t *testing.T) {
 				rng.Read(garbage)
 				_, err := f.Write(garbage)
 				require.NoError(t, err)
-				require.NoError(t, f.Sync())
-				fields.HasValue("sync")
+				if fields.HasValue("sync") {
+					require.NoError(t, f.Sync())
+				}
+			case "corrupt-tail":
+				length := int64(fields.MustKeyValue("len").Int())
+				garbage := make([]byte, length)
+				rng.Read(garbage)
+				_, err := f.WriteAt(garbage, offset-length)
+				require.NoError(t, err)
 			default:
 				panic(fmt.Sprintf("unrecognized command %q", fields[0]))
 			}
diff --git a/wal/testdata/reader b/wal/testdata/reader
index d310bde2e1..bf3010fbb3 100644
--- a/wal/testdata/reader
+++ b/wal/testdata/reader
@@ -329,3 +329,46 @@ r.NextRecord() = (rr, (000006-001.log: 93890), )
   io.ReadAll(rr) = ("1d0200000000000005000000b68c7a260135dce1ce5c5498550793d15edfae62... <2055-byte record>", )
   BatchHeader: [seqNum=541,count=5]
 r.NextRecord() = (rr, (000006-001.log: 95956), EOF)
+
+# Test corrupting the tail of a batch that's large enough to be split into
+# multiple reads. Regression test for #3865.
+
+define logNum=000007 unclean-close
+batch count=2 seq=95499 size=44 sync
+batch count=5 seq=95501 size=416 sync
+batch count=29 seq=95506 size=199 sync
+batch count=19 seq=95535 size=45991 sync
+corrupt-tail len=1020
+----
+created "000007.log"
+0..55: batch #95499
+55..482: batch #95501
+482..692: batch #95506
+692..46705: batch #95535
+
+define logNum=000007 logNameIndex=001
+batch count=19 seq=95535 size=45991 sync
+batch count=19 seq=95554 size=292 sync
+----
+created "000007-001.log"
+0..46013: batch #95535
+46013..46316: batch #95554
+
+read logNum=000007
+----
+r.NextRecord() = (rr, (000007.log: 0), )
+  io.ReadAll(rr) = ("0b7501000000000002000000e48d154602d9c44d74851cfa9ff3403655489ab5... <44-byte record>", )
+  BatchHeader: [seqNum=95499,count=2]
+r.NextRecord() = (rr, (000007.log: 55), )
+  io.ReadAll(rr) = ("0d75010000000000050000008ef212bddc565748772200669dee9f906f7fe83d... <416-byte record>", )
+  BatchHeader: [seqNum=95501,count=5]
+r.NextRecord() = (rr, (000007.log: 482), )
+  io.ReadAll(rr) = ("12750100000000001d000000362bba27f0ed6f5433a12bc502873a27c67f256c... <199-byte record>", )
+  BatchHeader: [seqNum=95506,count=29]
+r.NextRecord() = (rr, (000007-001.log: 0), )
+  io.ReadAll(rr) = ("2f75010000000000130000001ddd809cbb45782c44544a15a15dd52fb7b81a74... <45991-byte record>", )
+  BatchHeader: [seqNum=95535,count=19]
+r.NextRecord() = (rr, (000007-001.log: 46013), )
+  io.ReadAll(rr) = ("427501000000000013000000b30c11cf619ea65167511346cc55bb784a9af26f... <292-byte record>", )
+  BatchHeader: [seqNum=95554,count=19]
+r.NextRecord() = (rr, (000007-001.log: 46316), EOF)
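
For context beyond the patch itself, the following standalone Go sketch reproduces the failure mode the commit message describes using a plain bytes.Buffer: when the buffer is not reset between a copy that fails mid-record and the retry from the next file's copy of that record, the stale prefix corrupts the result. This is an illustrative sketch only, not Pebble code; the helpers failAfter and errReader are made up for the example.

// buffer_reuse_sketch.go -- standalone illustration of the buffer-reuse bug;
// not part of the patch above.
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errReader always fails, standing in for a corrupted chunk at a record's tail.
type errReader struct{}

func (errReader) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }

// failAfter yields the first n bytes of r and then fails, simulating a copy
// that errors out partway through a multi-chunk record.
func failAfter(r io.Reader, n int64) io.Reader {
	return io.MultiReader(io.LimitReader(r, n), errReader{})
}

func main() {
	var recordBuf bytes.Buffer

	// Copying the record out of the first segment fails partway through,
	// leaving the partially copied prefix in recordBuf.
	_, err := io.Copy(&recordBuf, failAfter(strings.NewReader("abcdefgh"), 5))
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF), recordBuf.String()) // true abcde

	// Bug: without resetting first, reading the next segment's intact copy of
	// the record appends it to the stale prefix, corrupting the record.
	_, _ = io.Copy(&recordBuf, strings.NewReader("abcdefgh"))
	fmt.Println(recordBuf.String()) // abcdeabcdefgh

	// Fix: resetting immediately before each copy yields the record alone.
	recordBuf.Reset()
	_, _ = io.Copy(&recordBuf, strings.NewReader("abcdefgh"))
	fmt.Println(recordBuf.String()) // abcdefgh
}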