Skip to content

Commit

Permalink
wal: fix reader buffer reuse
Browse files Browse the repository at this point in the history
While stitching together physical WAL segments, the WAL reader reads each
record's contents into a temporary buffer. Previously, if an error was
encountered mid-copy of a record (eg, because the record was large enough to be
split across multiple chunks), the buffer was not reset before attempting to
read the next file's record. This corruption could prevent replay of the WAL.

This commit fixes the bug moving the buffer reset to immediately precede its
use.

Informs cockroachdb#3865.
  • Loading branch information
jbowens committed Aug 20, 2024
1 parent cd80963 commit 8725a9f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 6 deletions.
5 changes: 1 addition & 4 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,6 @@ var _ Reader = (*virtualWALReader)(nil)
// are no more records. The reader returned becomes stale after the next
// NextRecord call, and should no longer be used.
func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
r.recordBuf.Reset()

// On the first call, we need to open the first file.
if r.currIndex < 0 {
err := r.nextFile()
Expand Down Expand Up @@ -271,6 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// record to be exhausted to read all of the record's chunks before
// attempting to read the next record. Buffering also also allows us to
// easily read the header of the batch down below for deduplication.
r.recordBuf.Reset()
if err == nil {
_, err = io.Copy(&r.recordBuf, rec)
}
Expand Down Expand Up @@ -322,15 +321,13 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// sequence number. We can differentiate LogData-only batches through
// their batch headers: they'll encode a count of zero.
if h.Count == 0 {
r.recordBuf.Reset()
continue
}

// If we've already observed a sequence number >= this batch's sequence
// number, we must've already returned this record to the client. Skip
// it.
if h.SeqNum <= r.lastSeqNum {
r.recordBuf.Reset()
continue
}
r.lastSeqNum = h.SeqNum
Expand Down
11 changes: 9 additions & 2 deletions wal/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,15 @@ func TestReader(t *testing.T) {
rng.Read(garbage)
_, err := f.Write(garbage)
require.NoError(t, err)
require.NoError(t, f.Sync())
fields.HasValue("sync")
if fields.HasValue("sync") {
require.NoError(t, f.Sync())
}
case "corrupt-tail":
length := int64(fields.MustKeyValue("len").Int())
garbage := make([]byte, length)
rng.Read(garbage)
_, err := f.WriteAt(garbage, offset-length)
require.NoError(t, err)
default:
panic(fmt.Sprintf("unrecognized command %q", fields[0]))
}
Expand Down
43 changes: 43 additions & 0 deletions wal/testdata/reader
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,46 @@ r.NextRecord() = (rr, (000006-001.log: 93890), <nil>)
io.ReadAll(rr) = ("1d0200000000000005000000b68c7a260135dce1ce5c5498550793d15edfae62... <2055-byte record>", <nil>)
BatchHeader: [seqNum=541,count=5]
r.NextRecord() = (rr, (000006-001.log: 95956), EOF)

# Test corrupting the tail of a batch that's large enough to be split into
# multiple reads. Regression test for #3865.

define logNum=000007 unclean-close
batch count=2 seq=95499 size=44 sync
batch count=5 seq=95501 size=416 sync
batch count=29 seq=95506 size=199 sync
batch count=19 seq=95535 size=45991 sync
corrupt-tail len=1020
----
created "000007.log"
0..55: batch #95499
55..482: batch #95501
482..692: batch #95506
692..46705: batch #95535

define logNum=000007 logNameIndex=001
batch count=19 seq=95535 size=45991 sync
batch count=19 seq=95554 size=292 sync
----
created "000007-001.log"
0..46013: batch #95535
46013..46316: batch #95554

read logNum=000007
----
r.NextRecord() = (rr, (000007.log: 0), <nil>)
io.ReadAll(rr) = ("0b7501000000000002000000e48d154602d9c44d74851cfa9ff3403655489ab5... <44-byte record>", <nil>)
BatchHeader: [seqNum=95499,count=2]
r.NextRecord() = (rr, (000007.log: 55), <nil>)
io.ReadAll(rr) = ("0d75010000000000050000008ef212bddc565748772200669dee9f906f7fe83d... <416-byte record>", <nil>)
BatchHeader: [seqNum=95501,count=5]
r.NextRecord() = (rr, (000007.log: 482), <nil>)
io.ReadAll(rr) = ("12750100000000001d000000362bba27f0ed6f5433a12bc502873a27c67f256c... <199-byte record>", <nil>)
BatchHeader: [seqNum=95506,count=29]
r.NextRecord() = (rr, (000007-001.log: 0), <nil>)
io.ReadAll(rr) = ("2f75010000000000130000001ddd809cbb45782c44544a15a15dd52fb7b81a74... <45991-byte record>", <nil>)
BatchHeader: [seqNum=95535,count=19]
r.NextRecord() = (rr, (000007-001.log: 46013), <nil>)
io.ReadAll(rr) = ("427501000000000013000000b30c11cf619ea65167511346cc55bb784a9af26f... <292-byte record>", <nil>)
BatchHeader: [seqNum=95554,count=19]
r.NextRecord() = (rr, (000007-001.log: 46316), EOF)

0 comments on commit 8725a9f

Please sign in to comment.