Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crl-release-24.2: wal: fix reader buffer reuse #3867

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,6 @@ var _ Reader = (*virtualWALReader)(nil)
// are no more records. The reader returned becomes stale after the next
// NextRecord call, and should no longer be used.
func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
r.recordBuf.Reset()

// On the first call, we need to open the first file.
if r.currIndex < 0 {
err := r.nextFile()
Expand Down Expand Up @@ -271,6 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// record to be exhausted to read all of the record's chunks before
// attempting to read the next record. Buffering also also allows us to
// easily read the header of the batch down below for deduplication.
r.recordBuf.Reset()
if err == nil {
_, err = io.Copy(&r.recordBuf, rec)
}
Expand Down Expand Up @@ -322,15 +321,13 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// sequence number. We can differentiate LogData-only batches through
// their batch headers: they'll encode a count of zero.
if h.Count == 0 {
r.recordBuf.Reset()
continue
}

// If we've already observed a sequence number >= this batch's sequence
// number, we must've already returned this record to the client. Skip
// it.
if h.SeqNum <= r.lastSeqNum {
r.recordBuf.Reset()
continue
}
r.lastSeqNum = h.SeqNum
Expand Down
11 changes: 9 additions & 2 deletions wal/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,15 @@ func TestReader(t *testing.T) {
rng.Read(garbage)
_, err := f.Write(garbage)
require.NoError(t, err)
require.NoError(t, f.Sync())
fields.HasValue("sync")
if fields.HasValue("sync") {
require.NoError(t, f.Sync())
}
case "corrupt-tail":
length := int64(fields.MustKeyValue("len").Int())
garbage := make([]byte, length)
rng.Read(garbage)
_, err := f.WriteAt(garbage, offset-length)
require.NoError(t, err)
default:
panic(fmt.Sprintf("unrecognized command %q", fields[0]))
}
Expand Down
43 changes: 43 additions & 0 deletions wal/testdata/reader
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,46 @@ r.NextRecord() = (rr, (000006-001.log: 93890), <nil>)
io.ReadAll(rr) = ("1d0200000000000005000000b68c7a260135dce1ce5c5498550793d15edfae62... <2055-byte record>", <nil>)
BatchHeader: [seqNum=541,count=5]
r.NextRecord() = (rr, (000006-001.log: 95956), EOF)

# Test corrupting the tail of a batch that's large enough to be split into
# multiple reads. Regression test for #3865.

define logNum=000007 unclean-close
batch count=2 seq=95499 size=44 sync
batch count=5 seq=95501 size=416 sync
batch count=29 seq=95506 size=199 sync
batch count=19 seq=95535 size=45991 sync
corrupt-tail len=1020
----
created "000007.log"
0..55: batch #95499
55..482: batch #95501
482..692: batch #95506
692..46705: batch #95535

define logNum=000007 logNameIndex=001
batch count=19 seq=95535 size=45991 sync
batch count=19 seq=95554 size=292 sync
----
created "000007-001.log"
0..46013: batch #95535
46013..46316: batch #95554

read logNum=000007
----
r.NextRecord() = (rr, (000007.log: 0), <nil>)
io.ReadAll(rr) = ("0b7501000000000002000000e48d154602d9c44d74851cfa9ff3403655489ab5... <44-byte record>", <nil>)
BatchHeader: [seqNum=95499,count=2]
r.NextRecord() = (rr, (000007.log: 55), <nil>)
io.ReadAll(rr) = ("0d75010000000000050000008ef212bddc565748772200669dee9f906f7fe83d... <416-byte record>", <nil>)
BatchHeader: [seqNum=95501,count=5]
r.NextRecord() = (rr, (000007.log: 482), <nil>)
io.ReadAll(rr) = ("12750100000000001d000000362bba27f0ed6f5433a12bc502873a27c67f256c... <199-byte record>", <nil>)
BatchHeader: [seqNum=95506,count=29]
r.NextRecord() = (rr, (000007-001.log: 0), <nil>)
io.ReadAll(rr) = ("2f75010000000000130000001ddd809cbb45782c44544a15a15dd52fb7b81a74... <45991-byte record>", <nil>)
BatchHeader: [seqNum=95535,count=19]
r.NextRecord() = (rr, (000007-001.log: 46013), <nil>)
io.ReadAll(rr) = ("427501000000000013000000b30c11cf619ea65167511346cc55bb784a9af26f... <292-byte record>", <nil>)
BatchHeader: [seqNum=95554,count=19]
r.NextRecord() = (rr, (000007-001.log: 46316), EOF)
Loading