diff --git a/wal/reader.go b/wal/reader.go
index ad92e42465..d1ac5a830a 100644
--- a/wal/reader.go
+++ b/wal/reader.go
@@ -5,13 +5,18 @@
 package wal
 
 import (
+	"bytes"
 	"cmp"
 	"fmt"
+	"io"
 	"slices"
 	"strings"
 
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/pebble/batchrepr"
 	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/record"
+	"github.com/cockroachdb/pebble/vfs"
 )
 
 // A segment represents an individual physical file that makes up a contiguous
@@ -49,9 +54,23 @@ func (w logicalWAL) String() string {
 	return sb.String()
 }
 
+type logicalWALs []logicalWAL
+
+// get retrieves the WAL with the given number if present. The second return
+// value indicates whether or not the WAL was found.
+func (wals logicalWALs) get(num NumWAL) (logicalWAL, bool) {
+	i, found := slices.BinarySearchFunc(wals, num, func(lw logicalWAL, n NumWAL) int {
+		return cmp.Compare(lw.NumWAL, n)
+	})
+	if !found {
+		return logicalWAL{}, false
+	}
+	return wals[i], true
+}
+
 // listLogs finds all log files in the provided directories. It returns an
 // ordered list of WALs in increasing NumWAL order.
-func listLogs(dirs ...Dir) ([]logicalWAL, error) {
+func listLogs(dirs ...Dir) (logicalWALs, error) {
 	var wals []logicalWAL
 	for _, d := range dirs {
 		ls, err := d.FS.List(d.Dirname)
@@ -85,3 +104,184 @@ func listLogs(dirs ...Dir) ([]logicalWAL, error) {
 	}
 	return wals, nil
 }
+
+func newVirtualWALReader(logNum NumWAL, segments []segment) *virtualWALReader {
+	return &virtualWALReader{
+		logNum:    logNum,
+		segments:  segments,
+		currIndex: -1,
+	}
+}
+
+// A virtualWALReader takes an ordered sequence of physical WAL files
+// ("segments") and implements the wal.Reader interface, providing a merged
+// view of the WAL's logical contents. It's responsible for filtering duplicate
+// records which may be shared by the tail of a segment file and the head of
+// its successor.
+type virtualWALReader struct {
+	// VirtualWAL metadata.
+	logNum   NumWAL
+	segments []segment
+
+	// State pertaining to the current position of the reader within the
+	// virtual WAL and its constituent physical files.
+	currIndex  int
+	currFile   vfs.File
+	currReader *record.Reader
+	// off describes the current Offset within the WAL.
+	off Offset
+	// lastSeqNum is the sequence number of the batch contained within the
+	// last record returned to the user. A virtual WAL may be split across a
+	// sequence of several physical WAL files. The tail of one physical WAL
+	// may be duplicated within the head of the next physical WAL file. We
+	// use contained batches' sequence numbers to deduplicate. This lastSeqNum
+	// field should monotonically increase as we iterate over the WAL files.
+	// If we ever observe a batch encoding a sequence number <= lastSeqNum,
+	// we must have already returned the batch and should skip it.
+	lastSeqNum uint64
+	// recordBuf is a buffer used to hold the latest record read from a
+	// physical file, and then returned to the user. A pointer to this buffer
+	// is returned directly to the caller of NextRecord.
+	recordBuf bytes.Buffer
+}
+
+// *virtualWALReader implements wal.Reader.
+var _ Reader = (*virtualWALReader)(nil)
+
+// NextRecord returns a reader for the next record. It returns io.EOF if there
+// are no more records. The reader returned becomes stale after the next
+// NextRecord call, and should no longer be used.
+func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
+	r.recordBuf.Reset()
+
+	// On the first call, we need to open the first file.
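+	// Subsequent calls resume from wherever the previous call left off:
+	// currReader continues reading from the segment file that was current
+	// when the last record was returned.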
+	if r.currIndex < 0 {
+		err := r.nextFile()
+		if err != nil {
+			return nil, Offset{}, err
+		}
+	}
+
+	for {
+		// Update our current physical offset to match the current file
+		// offset.
+		r.off.Physical = r.currReader.Offset()
+		// Obtain a Reader for the next record within this log file.
+		rec, err := r.currReader.Next()
+		if errors.Is(err, io.EOF) {
+			// This file is exhausted; continue to the next.
+			err := r.nextFile()
+			if err != nil {
+				return nil, r.off, err
+			}
+			continue
+		}
+
+		// Copy the record into a buffer. This ensures we read its entirety
+		// so that NextRecord returns the next record, even if the caller
+		// never exhausts the previous record's Reader. The record.Reader
+		// requires the record to be exhausted to read all of the record's
+		// chunks before attempting to read the next record. Buffering also
+		// allows us to easily read the header of the batch down below for
+		// deduplication.
+		if err == nil {
+			_, err = io.Copy(&r.recordBuf, rec)
+		}
+		// The record may be malformed. This is expected during a WAL
+		// failover, because the tail of a WAL may be only partially written
+		// or otherwise unclean because of WAL recycling and the inability to
+		// write the EOF trailer record. If this isn't the last file, we
+		// silently ignore the invalid record at the tail and proceed to the
+		// next file. If it is the last file, bubble the error up and let the
+		// client decide what to do with it. If the virtual WAL is the most
+		// recent WAL, Open may also decide to ignore it because it's
+		// consistent with an incomplete in-flight write at the time of
+		// process exit/crash. See #453.
+		if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
+			if err := r.nextFile(); err != nil {
+				return nil, r.off, err
+			}
+			continue
+		} else if err != nil {
+			return nil, r.off, err
+		}
+
+		// We may observe repeat records between the physical files that make
+		// up a virtual WAL because inflight writes to a file on a stalled
+		// disk may or may not end up completing. WAL records always contain
+		// encoded batches, and batches that contain data can be uniquely
+		// identified by sequence number.
+		//
+		// Parse the batch header.
+		h, ok := batchrepr.ReadHeader(r.recordBuf.Bytes())
+		if !ok {
+			// Failed to read the batch header because the record was smaller
+			// than the length of a batch header. This is unexpected. The
+			// record envelope successfully decoded and the checksums of the
+			// individual record fragment(s) validated, so the writer truly
+			// wrote an invalid batch. During Open, WAL recovery treats this
+			// as corruption. We could return the record to the caller,
+			// allowing the caller to interpret it as corruption, but it
+			// seems safer to be explicit and surface the corruption error
+			// here.
+			return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
+				r.logNum, errors.Safe(r.segments[r.currIndex].logNameIndex))
+		}
+
+		// There's a subtlety necessitated by LogData operations. A LogData
+		// applied to a batch results in data appended to the WAL in a batch
+		// format, but the data is never applied to the memtable or LSM. A
+		// batch only containing LogData will repeat a sequence number. We
+		// skip these batches because they're not relevant for recovery and
+		// we do not want to mistakenly deduplicate the batch containing KVs
+		// at the same sequence number. We can differentiate LogData-only
+		// batches through their batch headers: they'll encode a count of
+		// zero.
+		if h.Count == 0 {
+			r.recordBuf.Reset()
+			continue
+		}
+
+		// If we've already observed a sequence number >= this batch's
+		// sequence number, we must've already returned this record to the
+		// client. Skip it.
+		if h.SeqNum <= r.lastSeqNum {
+			r.recordBuf.Reset()
+			continue
+		}
+		r.lastSeqNum = h.SeqNum
+		return &r.recordBuf, r.off, nil
+	}
+}
+
+// Close closes the reader, releasing open resources.
+func (r *virtualWALReader) Close() error {
+	if r.currFile != nil {
+		if err := r.currFile.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// nextFile advances the internal state to the next physical segment file.
+func (r *virtualWALReader) nextFile() error {
+	if r.currFile != nil {
+		err := r.currFile.Close()
+		r.currFile = nil
+		if err != nil {
+			return err
+		}
+	}
+	r.currIndex++
+	if r.currIndex >= len(r.segments) {
+		return io.EOF
+	}
+
+	segment := r.segments[r.currIndex]
+	fs := segment.dir.FS
+	path := fs.PathJoin(segment.dir.Dirname, makeLogFilename(r.logNum, segment.logNameIndex))
+	r.off.PhysicalFile = path
+	r.off.Physical = 0
+	var err error
+	if r.currFile, err = fs.Open(path); err != nil {
+		return errors.Wrapf(err, "opening WAL file segment %q", path)
+	}
+	r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.logNum))
+	return nil
+}
diff --git a/wal/reader_test.go b/wal/reader_test.go
index 02c847284d..38ae89a10b 100644
--- a/wal/reader_test.go
+++ b/wal/reader_test.go
@@ -6,13 +6,21 @@ package wal
 
 import (
 	"bytes"
+	"cmp"
 	"fmt"
+	"io"
+	"math/rand"
 	"os"
 	"slices"
 	"strings"
+	"sync"
 	"testing"
 
 	"github.com/cockroachdb/datadriven"
+	"github.com/cockroachdb/pebble/batchrepr"
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/internal/datadrivenutil"
+	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 )
@@ -85,3 +93,157 @@ func TestList(t *testing.T) {
 		}
 	})
 }
+
+// TestReader tests the virtual WAL reader that merges across multiple
+// physical log files.
+func TestReader(t *testing.T) {
+	fs := vfs.NewStrictMem()
+	rng := rand.New(rand.NewSource(1))
+	var buf bytes.Buffer
+	datadriven.RunTest(t, "testdata/reader", func(t *testing.T, td *datadriven.TestData) string {
+		buf.Reset()
+		switch td.Cmd {
+		case "define":
+			var logNum uint64
+			var index int64
+			var recycleFilename string
+			td.ScanArgs(t, "logNum", &logNum)
+			td.MaybeScanArgs(t, "logNameIndex", &index)
+			td.MaybeScanArgs(t, "recycleFilename", &recycleFilename)
+
+			filename := makeLogFilename(NumWAL(logNum), logNameIndex(index))
+			var f vfs.File
+			var err error
+			if recycleFilename != "" {
+				f, err = fs.ReuseForWrite(recycleFilename, filename)
+				require.NoError(t, err)
+				fmt.Fprintf(&buf, "recycled %q as %q\n", recycleFilename, filename)
+			} else {
+				f, err = fs.Create(filename)
+				require.NoError(t, err)
+				fmt.Fprintf(&buf, "created %q\n", filename)
+			}
+			dir, err := fs.OpenDir("")
+			require.NoError(t, err)
+			require.NoError(t, dir.Sync())
+			require.NoError(t, dir.Close())
+			w := record.NewLogWriter(f, base.DiskFileNum(logNum), record.LogWriterConfig{})
+
+			lines := datadrivenutil.Lines(td.Input)
+			var offset int64
+			for len(lines) > 0 {
+				fields := lines.Next().Fields()
+				switch fields[0] {
+				case "batch":
+					// Fake a batch of the provided size.
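+					// A batch repr begins with a 12-byte header (an 8-byte
+					// little-endian sequence number followed by a 4-byte
+					// little-endian count); the remainder is filled with
+					// random bytes standing in for encoded key-value data.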
+					size := fields.MustKeyValue("size").Int()
+					repr := make([]byte, size)
+					var seq uint64
+					if len(repr) >= batchrepr.HeaderLen {
+						count := uint32(fields.MustKeyValue("count").Uint64())
+						seq = fields.MustKeyValue("seq").Uint64()
+						rng.Read(repr[batchrepr.HeaderLen:])
+						batchrepr.SetSeqNum(repr, seq)
+						batchrepr.SetCount(repr, count)
+					}
+
+					var tailOffset int64
+					if fields.HasValue("sync") {
+						var wg sync.WaitGroup
+						var writeErr, syncErr error
+						wg.Add(1)
+						tailOffset, writeErr = w.SyncRecord(repr, &wg, &syncErr)
+						if writeErr != nil {
+							return writeErr.Error()
+						}
+						wg.Wait()
+						if syncErr != nil {
+							return syncErr.Error()
+						}
+					} else {
+						var writeErr error
+						tailOffset, writeErr = w.WriteRecord(repr)
+						if writeErr != nil {
+							return writeErr.Error()
+						}
+					}
+
+					fmt.Fprintf(&buf, "%d..%d: batch #%d\n", offset, tailOffset, seq)
+					offset = tailOffset
+				case "write-garbage":
+					size := fields.MustKeyValue("size").Int()
+					garbage := make([]byte, size)
+					rng.Read(garbage)
+					_, err := f.Write(garbage)
+					require.NoError(t, err)
+					if fields.HasValue("sync") {
+						require.NoError(t, f.Sync())
+					}
+				default:
+					panic(fmt.Sprintf("unrecognized command %q", fields[0]))
+				}
+			}
+			if td.HasArg("close-unclean") {
+				fs.SetIgnoreSyncs(true)
+				require.NoError(t, w.Close())
+				fs.ResetToSyncedState()
+				fs.SetIgnoreSyncs(false)
+			} else {
+				require.NoError(t, w.Close())
+			}
+			return buf.String()
+		case "read":
+			var logNum uint64
+			var forceLogNameIndexes []uint64
+			td.ScanArgs(t, "logNum", &logNum)
+			td.MaybeScanArgs(t, "forceLogNameIndexes", &forceLogNameIndexes)
+			logs, err := listLogs(Dir{FS: fs})
+			require.NoError(t, err)
+			log, ok := logs.get(NumWAL(logNum))
+			if !ok {
+				return "not found"
+			}
+
+			segments := log.segments
+			// If forceLogNameIndexes is provided, pretend we found some
+			// additional segments. This can be used to exercise the case
+			// where opening the next physical segment file fails.
+			if len(forceLogNameIndexes) > 0 {
+				for _, li := range forceLogNameIndexes {
+					j, found := slices.BinarySearchFunc(segments, logNameIndex(li), func(s segment, li logNameIndex) int {
+						return cmp.Compare(s.logNameIndex, li)
+					})
+					require.False(t, found)
+					segments = slices.Insert(segments, j, segment{logNameIndex: logNameIndex(li), dir: Dir{FS: fs}})
+				}
+			}
+
+			r := newVirtualWALReader(log.NumWAL, segments)
+			for {
+				rr, off, err := r.NextRecord()
+				fmt.Fprintf(&buf, "r.NextRecord() = (rr, %s, %v)\n", off, err)
+				if err != nil {
+					break
+				}
+				b, err := io.ReadAll(rr)
+				fmt.Fprintf(&buf, " io.ReadAll(rr) = (\"")
+				if len(b) < 32 {
+					fmt.Fprintf(&buf, "%x", b)
+				} else {
+					fmt.Fprintf(&buf, "%x... <%d-byte record>", b[:32], len(b))
+				}
+				fmt.Fprintf(&buf, "\", %v)\n", err)
+				if h, ok := batchrepr.ReadHeader(b); !ok {
+					fmt.Fprintln(&buf, " failed to parse batch header")
+				} else {
+					fmt.Fprintf(&buf, " BatchHeader: %s\n", h)
+				}
+			}
+			if err := r.Close(); err != nil {
+				fmt.Fprintf(&buf, "r.Close() = %q", err)
+			}
+			return buf.String()
+		default:
+			return fmt.Sprintf("unrecognized command %q", td.Cmd)
+		}
+	})
+}
diff --git a/wal/testdata/reader b/wal/testdata/reader
new file mode 100644
index 0000000000..d310bde2e1
--- /dev/null
+++ b/wal/testdata/reader
@@ -0,0 +1,331 @@
+# Test a simple WAL file that consists of a single segment with a clean WAL
+# tail. This is what we'd expect from a graceful process exit in a non-failover
+# scenario.
+
+define logNum=000001
+batch count=3 size=1024 seq=1
+batch count=2 size=30 seq=20
+batch count=50 size=512000 seq=21 sync
+----
+created "000001.log"
+0..1035: batch #1
+1035..1076: batch #20
+1076..513252: batch #21
+
+read logNum=000001
+----
+r.NextRecord() = (rr, (000001.log: 0), <nil>)
+ io.ReadAll(rr) = ("01000000000000000300000052fdfc072182654f163f5f0f9a621d729566c74d... <1024-byte record>", <nil>)
+ BatchHeader: [seqNum=1,count=3]
+r.NextRecord() = (rr, (000001.log: 1035), <nil>)
+ io.ReadAll(rr) = ("140000000000000002000000408e3969c2e2cdcf233438bf1774ace7709a", <nil>)
+ BatchHeader: [seqNum=20,count=2]
+r.NextRecord() = (rr, (000001.log: 1076), <nil>)
+ io.ReadAll(rr) = ("1500000000000000320000004f091e9a83fdeae0ec55eb233a9b5394cb3c7856... <512000-byte record>", <nil>)
+ BatchHeader: [seqNum=21,count=50]
+r.NextRecord() = (rr, (000001.log: 513252), EOF)
+
+# Add a new physical file for the same logical log, this one with a batch that
+# only contains a LogData. This exercises a subtlety in which a sequence
+# number is repeated. The LogData batch with zero count should be skipped. The
+# record containing a batch with a nonzero count and the same sequence number
+# should be surfaced.
+
+define logNum=000001 logNameIndex=001
+batch count=2 seq=22 size=412
+batch count=0 seq=24 size=64
+batch count=1 seq=24 size=100
+----
+created "000001-001.log"
+0..423: batch #22
+423..498: batch #24
+498..609: batch #24
+
+read logNum=000001
+----
+r.NextRecord() = (rr, (000001.log: 0), <nil>)
+ io.ReadAll(rr) = ("01000000000000000300000052fdfc072182654f163f5f0f9a621d729566c74d... <1024-byte record>", <nil>)
+ BatchHeader: [seqNum=1,count=3]
+r.NextRecord() = (rr, (000001.log: 1035), <nil>)
+ io.ReadAll(rr) = ("140000000000000002000000408e3969c2e2cdcf233438bf1774ace7709a", <nil>)
+ BatchHeader: [seqNum=20,count=2]
+r.NextRecord() = (rr, (000001.log: 1076), <nil>)
+ io.ReadAll(rr) = ("1500000000000000320000004f091e9a83fdeae0ec55eb233a9b5394cb3c7856... <512000-byte record>", <nil>)
+ BatchHeader: [seqNum=21,count=50]
+r.NextRecord() = (rr, (000001-001.log: 0), <nil>)
+ io.ReadAll(rr) = ("16000000000000000200000038d0ccacfb33b57fb3d386cbe2b67a2fbdc82214... <412-byte record>", <nil>)
+ BatchHeader: [seqNum=22,count=2]
+r.NextRecord() = (rr, (000001-001.log: 498), <nil>)
+ io.ReadAll(rr) = ("180000000000000001000000ede8f156c48faf84dd55235d19a2df01d13021fc... <100-byte record>", <nil>)
+ BatchHeader: [seqNum=24,count=1]
+r.NextRecord() = (rr, (000001-001.log: 609), EOF)
+
+# Test a recycled log file. Recycle 000001.log as 000002.log. This time, do
+# not exit cleanly. This simulates a hard process exit (eg, during a fatal
+# shutdown, power failure, etc).
+
+define logNum=000002 recycleFilename=000001.log close-unclean
+batch count=10 size=100 seq=10 sync
+batch count=22 size=150 seq=20 sync
+batch count=1 size=64000 seq=42 sync
+----
+recycled "000001.log" as "000002.log"
+0..111: batch #10
+111..272: batch #20
+272..64294: batch #42
+
+# Reading a recycled log file with an unclean close can result in an error at
+# the tail of the file; eg, "invalid chunk." This is okay and is ignored by
+# Open as long as the WAL is the most recent one. Older WALs are required to
+# have "clean closes".
+
+read logNum=000002
+----
+r.NextRecord() = (rr, (000002.log: 0), <nil>)
+ io.ReadAll(rr) = ("0a000000000000000a0000007debf464698cb1084cb57d385b0d7eafbf01e41c... <100-byte record>", <nil>)
+ BatchHeader: [seqNum=10,count=10]
+r.NextRecord() = (rr, (000002.log: 111), <nil>)
+ io.ReadAll(rr) = ("1400000000000000160000006db5b2c70b9dab36e3d61260d8578c9edc4fbc70... <150-byte record>", <nil>)
+ BatchHeader: [seqNum=20,count=22]
+r.NextRecord() = (rr, (000002.log: 272), <nil>)
+ io.ReadAll(rr) = ("2a000000000000000100000064713cda5c5a3723971819a640589926f23d6342... <64000-byte record>", <nil>)
+ BatchHeader: [seqNum=42,count=1]
+r.NextRecord() = (rr, (000002.log: 64294), pebble/record: invalid chunk)
+
+# Test a typical failure scenario. Start off with a recycled log file (000003)
+# that would be on the primary device. It closes "unclean" because we're
+# unable to sync the EOF trailer due to the disk stall.
+
+define logNum=000003 recycleFilename=000002.log close-unclean
+batch count=10 size=100 seq=10 sync
+batch count=22 size=150 seq=20
+batch count=1 size=20 seq=42 sync
+----
+recycled "000002.log" as "000003.log"
+0..111: batch #10
+111..272: batch #20
+272..303: batch #42
+
+# Then the WAL fails over to a new physical WAL file on a new device. The
+# last two batches of the previous WAL are duplicated.
+
+define logNum=000003 logNameIndex=001
+batch count=22 size=150 seq=20
+batch count=1 size=20 seq=42 sync
+batch count=3 size=80 seq=43 sync
+batch count=9 size=2055 seq=46 sync
+----
+created "000003-001.log"
+0..161: batch #20
+161..192: batch #42
+192..283: batch #43
+283..2349: batch #46
+
+# Reading the log file should transparently deduplicate the repeated batches.
+
+read logNum=000003
+----
+r.NextRecord() = (rr, (000003.log: 0), <nil>)
+ io.ReadAll(rr) = ("0a000000000000000a0000009cb1f02ec47b54a974cdf40ffa6721505c795e7e... <100-byte record>", <nil>)
+ BatchHeader: [seqNum=10,count=10]
+r.NextRecord() = (rr, (000003.log: 111), <nil>)
+ io.ReadAll(rr) = ("1400000000000000160000004fc0ed1998a53d1695b93be7ef4e8e09a7d06e9f... <150-byte record>", <nil>)
+ BatchHeader: [seqNum=20,count=22]
+r.NextRecord() = (rr, (000003.log: 272), <nil>)
+ io.ReadAll(rr) = ("2a000000000000000100000019458dc5400169e5", <nil>)
+ BatchHeader: [seqNum=42,count=1]
+r.NextRecord() = (rr, (000003-001.log: 192), <nil>)
+ io.ReadAll(rr) = ("2b00000000000000030000009cbf29476e797bac2db8bfea65bda29ea50ddbe4... <80-byte record>", <nil>)
+ BatchHeader: [seqNum=43,count=3]
+r.NextRecord() = (rr, (000003-001.log: 283), <nil>)
+ io.ReadAll(rr) = ("2e000000000000000900000027337fa5bd626044dc5d9d08085bf4ce13bc8d00... <2055-byte record>", <nil>)
+ BatchHeader: [seqNum=46,count=9]
+r.NextRecord() = (rr, (000003-001.log: 2349), EOF)
+
+# Extend logical log file 000003 with another log file, the result of failing
+# back to the original device. This time do an "unclean" close.
+
+define logNum=000003 logNameIndex=002 close-unclean
+batch count=3 size=80 seq=43 sync
+batch count=9 size=2055 seq=46 sync
+batch count=2 size=205 seq=55 sync
+----
+created "000003-002.log"
+0..91: batch #43
+91..2157: batch #46
+2157..2373: batch #55
+
+read logNum=000003
+----
+r.NextRecord() = (rr, (000003.log: 0), <nil>)
+ io.ReadAll(rr) = ("0a000000000000000a0000009cb1f02ec47b54a974cdf40ffa6721505c795e7e... <100-byte record>", <nil>)
+ BatchHeader: [seqNum=10,count=10]
+r.NextRecord() = (rr, (000003.log: 111), <nil>)
+ io.ReadAll(rr) = ("1400000000000000160000004fc0ed1998a53d1695b93be7ef4e8e09a7d06e9f... <150-byte record>", <nil>)
+ BatchHeader: [seqNum=20,count=22]
+r.NextRecord() = (rr, (000003.log: 272), <nil>)
+ io.ReadAll(rr) = ("2a000000000000000100000019458dc5400169e5", <nil>)
+ BatchHeader: [seqNum=42,count=1]
+r.NextRecord() = (rr, (000003-001.log: 192), <nil>)
+ io.ReadAll(rr) = ("2b00000000000000030000009cbf29476e797bac2db8bfea65bda29ea50ddbe4... <80-byte record>", <nil>)
+ BatchHeader: [seqNum=43,count=3]
+r.NextRecord() = (rr, (000003-001.log: 283), <nil>)
+ io.ReadAll(rr) = ("2e000000000000000900000027337fa5bd626044dc5d9d08085bf4ce13bc8d00... <2055-byte record>", <nil>)
+ BatchHeader: [seqNum=46,count=9]
+r.NextRecord() = (rr, (000003-002.log: 2157), <nil>)
+ io.ReadAll(rr) = ("370000000000000002000000ff0710201f4008e679428b4994708a1af8507303... <205-byte record>", <nil>)
+ BatchHeader: [seqNum=55,count=2]
+r.NextRecord() = (rr, (000003-002.log: 2373), EOF)
+
+# Test reading a log file that does not exist.
+
+read logNum=000004
+----
+not found
+
+# Test a corrupted log file that encodes a record too small to be a valid
+# batch. The second "batch" only has a length of 5.
+
+define logNum=000004
+batch count=1 seq=1 size=20 sync
+batch size=5 sync
+----
+created "000004.log"
+0..31: batch #1
+31..47: batch #0
+
+# Reading the corrupt batch should surface a corruption error.
+
+read logNum=000004
+----
+r.NextRecord() = (rr, (000004.log: 0), <nil>)
+ io.ReadAll(rr) = ("0100000000000000010000009a1b1b9e43558bba", <nil>)
+ BatchHeader: [seqNum=1,count=1]
+r.NextRecord() = (rr, (000004.log: 31), pebble: corrupt log file logNum=4, logNameIndex=000: invalid batch)
+
+# Test a two-segment log file where the second log file ends in garbage. The
+# invalid chunk error of the final log file should be propagated up.
+
+define logNum=000005 close-unclean
+batch count=1 seq=95225 size=592 sync
+batch count=9 seq=95226 size=295 sync
+batch count=8 seq=95235 size=2525 sync
+batch count=256 seq=95243 size=2566 sync
+----
+created "000005.log"
+0..603: batch #95225
+603..909: batch #95226
+909..3445: batch #95235
+3445..6022: batch #95243
+
+define logNum=000005 logNameIndex=001 close-unclean
+batch count=2 seq=95499 size=44 sync
+batch count=5 seq=95501 size=416 sync
+batch count=29 seq=95506 size=199 sync
+write-garbage size=353 sync
+----
+created "000005-001.log"
+0..55: batch #95499
+55..482: batch #95501
+482..692: batch #95506
+
+read logNum=000005
+----
+r.NextRecord() = (rr, (000005.log: 0), <nil>)
+ io.ReadAll(rr) = ("f97301000000000001000000ba609cd6f3a753ff06b2814e96b56fbc0e02a5c0... <592-byte record>", <nil>)
+ BatchHeader: [seqNum=95225,count=1]
+r.NextRecord() = (rr, (000005.log: 603), <nil>)
+ io.ReadAll(rr) = ("fa7301000000000009000000a5f232151b9e00243e5737fdcf83e7ffcb495e41... <295-byte record>", <nil>)
+ BatchHeader: [seqNum=95226,count=9]
+r.NextRecord() = (rr, (000005.log: 909), <nil>)
+ io.ReadAll(rr) = ("037401000000000008000000bc380c1c503de185910401b05527b1788900bbfd... <2525-byte record>", <nil>)
+ BatchHeader: [seqNum=95235,count=8]
+r.NextRecord() = (rr, (000005.log: 3445), <nil>)
+ io.ReadAll(rr) = ("0b7401000000000000010000907cd29c9a6deaf239e76e3374f6e9eef047f57f... <2566-byte record>", <nil>)
+ BatchHeader: [seqNum=95243,count=256]
+r.NextRecord() = (rr, (000005-001.log: 0), <nil>)
+ io.ReadAll(rr) = ("0b75010000000000020000006cad8a0a1461d1ec53bb834b47c6853e040ae9ce... <44-byte record>", <nil>)
+ BatchHeader: [seqNum=95499,count=2]
+r.NextRecord() = (rr, (000005-001.log: 55), <nil>)
+ io.ReadAll(rr) = ("0d7501000000000005000000c78be2f74d28753a03854ed63e6fd0f17113688d... <416-byte record>", <nil>)
+ BatchHeader: [seqNum=95501,count=5]
+r.NextRecord() = (rr, (000005-001.log: 482), <nil>)
+ io.ReadAll(rr) = ("12750100000000001d00000096cedf6103af61c008d9f850e63a1dfc7518b9a7... <199-byte record>", <nil>)
+ BatchHeader: [seqNum=95506,count=29]
+r.NextRecord() = (rr, (000005-001.log: 692), pebble/record: invalid chunk)
+
+# Read again, this time pretending we found a third segment with
+# logNameIndex=002. This helps exercise error conditions when switching to a
+# new file.
+
+read logNum=000005 forceLogNameIndexes=(002)
+----
+r.NextRecord() = (rr, (000005.log: 0), <nil>)
+ io.ReadAll(rr) = ("f97301000000000001000000ba609cd6f3a753ff06b2814e96b56fbc0e02a5c0... <592-byte record>", <nil>)
+ BatchHeader: [seqNum=95225,count=1]
+r.NextRecord() = (rr, (000005.log: 603), <nil>)
+ io.ReadAll(rr) = ("fa7301000000000009000000a5f232151b9e00243e5737fdcf83e7ffcb495e41... <295-byte record>", <nil>)
+ BatchHeader: [seqNum=95226,count=9]
+r.NextRecord() = (rr, (000005.log: 909), <nil>)
+ io.ReadAll(rr) = ("037401000000000008000000bc380c1c503de185910401b05527b1788900bbfd... <2525-byte record>", <nil>)
+ BatchHeader: [seqNum=95235,count=8]
+r.NextRecord() = (rr, (000005.log: 3445), <nil>)
+ io.ReadAll(rr) = ("0b7401000000000000010000907cd29c9a6deaf239e76e3374f6e9eef047f57f... <2566-byte record>", <nil>)
+ BatchHeader: [seqNum=95243,count=256]
+r.NextRecord() = (rr, (000005-001.log: 0), <nil>)
+ io.ReadAll(rr) = ("0b75010000000000020000006cad8a0a1461d1ec53bb834b47c6853e040ae9ce... <44-byte record>", <nil>)
+ BatchHeader: [seqNum=95499,count=2]
+r.NextRecord() = (rr, (000005-001.log: 55), <nil>)
+ io.ReadAll(rr) = ("0d7501000000000005000000c78be2f74d28753a03854ed63e6fd0f17113688d... <416-byte record>", <nil>)
+ BatchHeader: [seqNum=95501,count=5]
+r.NextRecord() = (rr, (000005-001.log: 482), <nil>)
+ io.ReadAll(rr) = ("12750100000000001d00000096cedf6103af61c008d9f850e63a1dfc7518b9a7... <199-byte record>", <nil>)
+ BatchHeader: [seqNum=95506,count=29]
+r.NextRecord() = (rr, (000005-002.log: 0), opening WAL file segment "000005-002.log": open 000005-002.log: file does not exist)
+
+# Test a scenario where 4 unique batches are split across three physical log
+# files. The first log contains (b0, b1, b2), the second log (b1) and the
+# third log (b1, b2, b3).
+
+define logNum=000006 close-unclean
+batch count=3 seq=535 size=395 sync
+batch count=2 seq=538 size=93666 sync
+batch count=1 seq=540 size=180 sync
+----
+created "000006.log"
+0..406: batch #535
+406..94105: batch #538
+94105..94296: batch #540
+
+define logNum=000006 logNameIndex=001 close-unclean
+batch count=2 seq=538 size=93666 sync
+----
+created "000006-001.log"
+0..93699: batch #538
+
+define logNum=000006 logNameIndex=002 close-unclean
+batch count=2 seq=538 size=93666 sync
+batch count=1 seq=540 size=180 sync
+batch count=5 seq=541 size=2055 sync
+----
+created "000006-002.log"
+0..93699: batch #538
+93699..93890: batch #540
+93890..95956: batch #541
+
+read logNum=000006
+----
+r.NextRecord() = (rr, (000006.log: 0), <nil>)
+ io.ReadAll(rr) = ("170200000000000003000000cee7fc8a6db4d4cd39d0790e02d05d1c062f56fe... <395-byte record>", <nil>)
+ BatchHeader: [seqNum=535,count=3]
+r.NextRecord() = (rr, (000006.log: 406), <nil>)
+ io.ReadAll(rr) = ("1a02000000000000020000000dc70d396bcbc5fc05541c74c95cdaec1232c110... <93666-byte record>", <nil>)
+ BatchHeader: [seqNum=538,count=2]
+r.NextRecord() = (rr, (000006.log: 94105), <nil>)
+ io.ReadAll(rr) = ("1c0200000000000001000000404841433f5369713ee90d8f86c50c5903fa38e9... <180-byte record>", <nil>)
+ BatchHeader: [seqNum=540,count=1]
+r.NextRecord() = (rr, (000006-002.log: 93890), <nil>)
+ io.ReadAll(rr) = ("1d0200000000000005000000b68c7a260135dce1ce5c5498550793d15edfae62... <2055-byte record>", <nil>)
+ BatchHeader: [seqNum=541,count=5]
+r.NextRecord() = (rr, (000006-002.log: 95956), EOF)
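
A minimal sketch of how these pieces compose for a caller replaying a logical
WAL, mirroring the read loop in TestReader above. Here fs (a vfs.FS holding
the WAL directory), num (the NumWAL to replay), and the enclosing function are
assumed context rather than anything established by this patch:

	// Discover segments, then iterate the merged, deduplicated records.
	logs, err := listLogs(Dir{FS: fs})
	if err != nil {
		return err
	}
	log, ok := logs.get(num)
	if !ok {
		return errors.Errorf("wal %d not found", num)
	}
	r := newVirtualWALReader(log.NumWAL, log.segments)
	defer r.Close()
	for {
		rr, off, err := r.NextRecord()
		if errors.Is(err, io.EOF) {
			return nil // logical WAL fully replayed
		} else if err != nil {
			return err
		}
		repr, err := io.ReadAll(rr)
		if err != nil {
			return err
		}
		// Each record is a batch repr, already deduplicated across the
		// WAL's segment files; off identifies the segment file and physical
		// offset it was read from.
		h, _ := batchrepr.ReadHeader(repr)
		fmt.Printf("apply batch seqNum=%d count=%d (read at %s)\n", h.SeqNum, h.Count, off)
	}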