From 89c3e2959a6e61326913bc1c85e9d1cc0a11e2a7 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 18 Jan 2024 11:21:17 -0500 Subject: [PATCH] batchrepr: new package Move some of the logic related to the batch representation to a new batchrepr package. For now, this is mostly just the BatchReader type. Future work may move additional logic related to writing new batch mutations and sorted iteration over a batch's contents, assuming there's no impact on performance. This move is motivated by #3230. The planned wal package will need to inspect batch sequence numbers for deduplication when reconstructing the logical contents of a virtual WAL. Moving the logic outside the pebble package avoids duplicating the logic. --- batch.go | 135 ++++-------------- batch_test.go | 59 +------- batchrepr/reader.go | 122 ++++++++++++++++ batchrepr/reader_test.go | 62 ++++++++ .../batch_reader => batchrepr/testdata/reader | 22 +-- commit.go | 3 +- open.go | 3 +- replay/replay_test.go | 3 +- 8 files changed, 236 insertions(+), 173 deletions(-) create mode 100644 batchrepr/reader.go create mode 100644 batchrepr/reader_test.go rename testdata/batch_reader => batchrepr/testdata/reader (85%) diff --git a/batch.go b/batch.go index 29686a6f705..4640cf1d6bf 100644 --- a/batch.go +++ b/batch.go @@ -17,6 +17,7 @@ import ( "unsafe" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/humanize" @@ -29,8 +30,6 @@ import ( ) const ( - batchCountOffset = 8 - batchHeaderLen = 12 batchInitialSize = 1 << 10 // 1 KB batchMaxRetainedSize = 1 << 20 // 1 MB invalidBatchCount = 1<<32 - 1 @@ -42,7 +41,7 @@ const ( var ErrNotIndexed = errors.New("pebble: batch not indexed") // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted. 
-var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch")) +var ErrInvalidBatch = batchrepr.ErrInvalidBatch // ErrBatchTooLarge indicates that a batch is invalid or otherwise corrupted. var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize))) @@ -452,7 +451,7 @@ func (b *Batch) release() { func (b *Batch) refreshMemTableSize() error { b.memTableSize = 0 - if len(b.data) < batchHeaderLen { + if len(b.data) < batchrepr.HeaderLen { return nil } @@ -498,23 +497,23 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { if len(batch.data) == 0 { return nil } - if len(batch.data) < batchHeaderLen { + if len(batch.data) < batchrepr.HeaderLen { return ErrInvalidBatch } offset := len(b.data) if offset == 0 { b.init(offset) - offset = batchHeaderLen + offset = batchrepr.HeaderLen } - b.data = append(b.data, batch.data[batchHeaderLen:]...) + b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...) b.setCount(b.Count() + batch.Count()) if b.db != nil || b.index != nil { // Only iterate over the new entries if we need to track memTableSize or in // order to update the index. 
- for iter := BatchReader(b.data[offset:]); len(iter) > 0; { + for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; { offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0])) kind, key, value, ok, err := iter.Next() if !ok { @@ -580,7 +579,7 @@ func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind Interna panic("pebble: batch already committing") } if len(b.data) == 0 { - b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen) + b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen) } b.count++ b.memTableSize += memTableEntrySize(keyLen, valueLen) @@ -632,7 +631,7 @@ func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) { panic("pebble: batch already committing") } if len(b.data) == 0 { - b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen) + b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen) } b.count++ b.memTableSize += memTableEntrySize(keyLen, 0) @@ -1101,13 +1100,13 @@ func (b *Batch) ingestSST(fileNum base.FileNum) { // Empty returns true if the batch is empty, and false otherwise. func (b *Batch) Empty() bool { - return len(b.data) <= batchHeaderLen + return len(b.data) <= batchrepr.HeaderLen } // Len returns the current size of the batch in bytes. func (b *Batch) Len() int { - if len(b.data) <= batchHeaderLen { - return batchHeaderLen + if len(b.data) <= batchrepr.HeaderLen { + return batchrepr.HeaderLen } return len(b.data) } @@ -1117,7 +1116,7 @@ func (b *Batch) Len() int { // though any other mutation operation may do so. func (b *Batch) Repr() []byte { if len(b.data) == 0 { - b.init(batchHeaderLen) + b.init(batchrepr.HeaderLen) } binary.LittleEndian.PutUint32(b.countData(), b.Count()) return b.data @@ -1127,7 +1126,7 @@ func (b *Batch) Repr() []byte { // of the supplied slice. It is not safe to modify it afterwards until the // Batch is no longer in use. 
func (b *Batch) SetRepr(data []byte) error { - if len(data) < batchHeaderLen { + if len(data) < batchrepr.HeaderLen { return base.CorruptionErrorf("invalid batch") } b.data = data @@ -1391,9 +1390,9 @@ func (b *Batch) init(size int) { n *= 2 } if cap(b.data) < n { - b.data = rawalloc.New(batchHeaderLen, n) + b.data = rawalloc.New(batchrepr.HeaderLen, n) } - b.data = b.data[:batchHeaderLen] + b.data = b.data[:batchrepr.HeaderLen] clear(b.data) // Zero the sequence number in the header } @@ -1423,7 +1422,7 @@ func (b *Batch) Reset() { b.data = nil } else { // Otherwise, reset the buffer for re-use. - b.data = b.data[:batchHeaderLen] + b.data = b.data[:batchrepr.HeaderLen] clear(b.data) } } @@ -1470,7 +1469,7 @@ func (b *Batch) setSeqNum(seqNum uint64) { // record. It returns zero if the batch is empty. func (b *Batch) SeqNum() uint64 { if len(b.data) == 0 { - b.init(batchHeaderLen) + b.init(batchrepr.HeaderLen) } return binary.LittleEndian.Uint64(b.seqNumData()) } @@ -1485,52 +1484,18 @@ func (b *Batch) setCount(v uint32) { // batch isn't applied to the memtable. func (b *Batch) Count() uint32 { if b.count > math.MaxUint32 { - panic(ErrInvalidBatch) + panic(batchrepr.ErrInvalidBatch) } return uint32(b.count) } -// Reader returns a BatchReader for the current batch contents. If the batch is -// mutated, the new entries will not be visible to the reader. -func (b *Batch) Reader() BatchReader { +// Reader returns a batchrepr.Reader for the current batch contents. If the +// batch is mutated, the new entries will not be visible to the reader. +func (b *Batch) Reader() batchrepr.Reader { if len(b.data) == 0 { - b.init(batchHeaderLen) - } - return b.data[batchHeaderLen:] -} - -func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) { - // TODO(jackson): This will index out of bounds if there's no varint or an - // invalid varint (eg, a single 0xff byte). Correcting will add a bit of - // overhead. 
We could avoid that overhead whenever len(data) >= - // binary.MaxVarint32? - - var v uint32 - var n int - ptr := unsafe.Pointer(&data[0]) - if a := *((*uint8)(ptr)); a < 128 { - v = uint32(a) - n = 1 - } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 { - v = uint32(b)<<7 | uint32(a) - n = 2 - } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 { - v = uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 3 - } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 { - v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 4 - } else { - d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4))) - v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 5 - } - - data = data[n:] - if v > uint32(len(data)) { - return nil, nil, false + b.init(batchrepr.HeaderLen) } - return data[v:], data[:v], true + return b.data[batchrepr.HeaderLen:] } // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait. @@ -1553,48 +1518,6 @@ func (b *Batch) CommitStats() BatchCommitStats { return b.commitStats } -// BatchReader iterates over the entries contained in a batch. -type BatchReader []byte - -// ReadBatch constructs a BatchReader from a batch representation. The -// header is not validated. ReadBatch returns a new batch reader and the -// count of entries contained within the batch. -func ReadBatch(repr []byte) (r BatchReader, count uint32) { - if len(repr) <= batchHeaderLen { - return nil, count - } - count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen]) - return repr[batchHeaderLen:], count -} - -// Next returns the next entry in this batch, if there is one. If the reader has -// reached the end of the batch, Next returns ok=false and a nil error. If the -// batch is corrupt and the next entry is illegible, Next returns ok=false and a -// non-nil error. 
-func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) { - if len(*r) == 0 { - return 0, nil, nil, false, nil - } - kind = InternalKeyKind((*r)[0]) - if kind > InternalKeyKindMax { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0]) - } - *r, ukey, ok = batchDecodeStr((*r)[1:]) - if !ok { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key") - } - switch kind { - case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete, - InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, - InternalKeyKindDeleteSized: - *r, value, ok = batchDecodeStr(*r) - if !ok { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) - } - } - return kind, ukey, value, true, nil -} - // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the // two in sync. type batchIter struct { @@ -1723,7 +1646,7 @@ func (i *batchIter) value() []byte { case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete, InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, InternalKeyKindDeleteSized: - _, value, ok := batchDecodeStr(data[keyEnd:]) + _, value, ok := batchrepr.DecodeStr(data[keyEnd:]) if !ok { return nil } @@ -1812,10 +1735,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error } var rangeDelOffsets []flushableBatchEntry var rangeKeyOffsets []flushableBatchEntry - if len(b.data) > batchHeaderLen { + if len(b.data) > batchrepr.HeaderLen { // Non-empty batch. 
var index uint32 - for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ { + for iter := batchrepr.Reader(b.data[batchrepr.HeaderLen:]); len(iter) > 0; index++ { offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0])) kind, key, _, ok, err := iter.Next() if !ok { @@ -1990,7 +1913,7 @@ func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 // inuseBytes is part of the flushable interface. func (b *flushableBatch) inuseBytes() uint64 { - return uint64(len(b.data) - batchHeaderLen) + return uint64(len(b.data) - batchrepr.HeaderLen) } // totalBytes is part of the flushable interface. @@ -2191,7 +2114,7 @@ func (i *flushableBatchIter) value() base.LazyValue { InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, InternalKeyKindDeleteSized: keyEnd := i.offsets[i.index].keyEnd - _, value, ok = batchDecodeStr(i.data[keyEnd:]) + _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:]) if !ok { i.err = base.CorruptionErrorf("corrupted batch") return base.LazyValue{} diff --git a/batch_test.go b/batch_test.go index c9778742fec..7a7543b96aa 100644 --- a/batch_test.go +++ b/batch_test.go @@ -8,7 +8,6 @@ import ( "bytes" "context" "encoding/binary" - "encoding/hex" "fmt" "io" "math" @@ -18,10 +17,10 @@ import ( "sync" "testing" "time" - "unicode" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/itertest" @@ -239,7 +238,7 @@ func TestBatchLen(t *testing.T) { require.Equal(t, size, len(b.Repr())) } - requireLenAndReprEq(batchHeaderLen) + requireLenAndReprEq(batchrepr.HeaderLen) key := "test-key" value := "test-value" @@ -398,7 +397,7 @@ func TestBatchReset(t *testing.T) { require.Equal(t, uint32(0), b.Count()) require.Equal(t, uint64(0), b.countRangeDels) require.Equal(t, uint64(0), 
b.countRangeKeys) - require.Equal(t, batchHeaderLen, len(b.data)) + require.Equal(t, batchrepr.HeaderLen, len(b.data)) require.Equal(t, uint64(0), b.SeqNum()) require.Equal(t, uint64(0), b.memTableSize) require.Equal(t, FormatMajorVersion(0x00), b.minimumFormatMajorVersion) @@ -716,7 +715,7 @@ func TestBatchIncrement(t *testing.T) { 0xfffffffe, } for _, tc := range testCases { - var buf [batchHeaderLen]byte + var buf [batchrepr.HeaderLen]byte binary.LittleEndian.PutUint32(buf[8:12], tc) var b Batch b.SetRepr(buf[:]) @@ -726,7 +725,7 @@ func TestBatchIncrement(t *testing.T) { if got != want { t.Errorf("input=%d: got %d, want %d", tc, got, want) } - _, count := ReadBatch(b.Repr()) + _, count := batchrepr.Read(b.Repr()) if got != want { t.Errorf("input=%d: got %d, want %d", tc, count, want) } @@ -740,7 +739,7 @@ func TestBatchIncrement(t *testing.T) { } } }() - var buf [batchHeaderLen]byte + var buf [batchrepr.HeaderLen]byte binary.LittleEndian.PutUint32(buf[8:12], 0xffffffff) var b Batch b.SetRepr(buf[:]) @@ -1367,52 +1366,6 @@ func TestBatchCommitStats(t *testing.T) { require.NoError(t, err) } -func TestBatchReader(t *testing.T) { - datadriven.RunTest(t, "testdata/batch_reader", func(t *testing.T, td *datadriven.TestData) string { - switch td.Cmd { - case "scan": - var repr bytes.Buffer - for i, l := range strings.Split(td.Input, "\n") { - // Remove any trailing comments behind #. - if i := strings.IndexRune(l, '#'); i >= 0 { - l = l[:i] - } - // Strip all whitespace from the line. 
- l = strings.Map(func(r rune) rune { - if unicode.IsSpace(r) { - return -1 - } - return r - }, l) - b, err := hex.DecodeString(l) - if err != nil { - return fmt.Sprintf("failed to decode hex; line %d", i) - } - repr.Write(b) - } - r, count := ReadBatch(repr.Bytes()) - var out strings.Builder - fmt.Fprintf(&out, "Count: %d\n", count) - for { - kind, ukey, value, ok, err := r.Next() - if !ok { - if err != nil { - fmt.Fprintf(&out, "err: %s\n", err) - } else { - fmt.Fprint(&out, "eof") - } - break - } - fmt.Fprintf(&out, "%s: %q: %q\n", kind, ukey, value) - } - return out.String() - - default: - return fmt.Sprintf("unrecognized command %q", td.Cmd) - } - }) -} - func BenchmarkBatchSet(b *testing.B) { value := make([]byte, 10) for i := range value { diff --git a/batchrepr/reader.go b/batchrepr/reader.go new file mode 100644 index 00000000000..b2eeb5aa391 --- /dev/null +++ b/batchrepr/reader.go @@ -0,0 +1,122 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// Package batchrepr provides interfaces for reading the binary batch +// representation. This batch representation is used in-memory while +// constructing a batch and on-disk within the write-ahead log. +package batchrepr + +import ( + "encoding/binary" + "fmt" + "unsafe" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/pkg/errors" +) + +// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted. +var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch")) + +const ( + // HeaderLen is the length of the batch header in bytes. + HeaderLen = 12 + countOffset = 8 +) + +// Header describes the contents of a batch header. +type Header struct { + // SeqNum is the sequence number at which the batch is committed. A batch + // that has not yet committed will have a zero sequence number. 
+ SeqNum uint64 + // Count is the count of keys written to the batch. + Count uint32 +} + +// String returns a string representation of the header's contents. +func (h Header) String() string { + return fmt.Sprintf("[seqNum=%d,count=%d]", h.SeqNum, h.Count) +} + +// Read constructs a Reader from an encoded batch representation. The header is +// not validated. Read returns a new batch reader and the contents of the +// batch's header. +func Read(repr []byte) (r Reader, h Header) { + if len(repr) <= HeaderLen { + return nil, h + } + h.SeqNum = binary.LittleEndian.Uint64(repr[:countOffset]) + h.Count = binary.LittleEndian.Uint32(repr[countOffset:HeaderLen]) + return repr[HeaderLen:], h +} + +// Reader iterates over the entries contained in a batch. +type Reader []byte + +// Next returns the next entry in this batch, if there is one. If the reader has +// reached the end of the batch, Next returns ok=false and a nil error. If the +// batch is corrupt and the next entry is illegible, Next returns ok=false and a +// non-nil error. 
+func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, ok bool, err error) { + if len(*r) == 0 { + return 0, nil, nil, false, nil + } + kind = base.InternalKeyKind((*r)[0]) + if kind > base.InternalKeyKindMax { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0]) + } + *r, ukey, ok = DecodeStr((*r)[1:]) + if !ok { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key") + } + switch kind { + case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete, + base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete, + base.InternalKeyKindDeleteSized: + *r, value, ok = DecodeStr(*r) + if !ok { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) + } + } + return kind, ukey, value, true, nil +} + +// DecodeStr decodes a varint-encoded string from data, returning the remainder +// of data and the decoded string. It returns ok=false if the string is truncated. +// +// TODO(jackson): This should be unexported once pebble package callers have +// been updated to use appropriate abstractions. +func DecodeStr(data []byte) (odata []byte, s []byte, ok bool) { + // TODO(jackson): This will index out of bounds if there's no varint or an + // invalid varint (eg, a single 0xff byte). Correcting will add a bit of + // overhead. We could avoid that overhead whenever len(data) >= + // binary.MaxVarintLen32? 
+ + var v uint32 + var n int + ptr := unsafe.Pointer(&data[0]) + if a := *((*uint8)(ptr)); a < 128 { + v = uint32(a) + n = 1 + } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 { + v = uint32(b)<<7 | uint32(a) + n = 2 + } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 { + v = uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 3 + } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 { + v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 4 + } else { + d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4))) + v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 5 + } + + data = data[n:] + if v > uint32(len(data)) { + return nil, nil, false + } + return data[v:], data[:v], true +} diff --git a/batchrepr/reader_test.go b/batchrepr/reader_test.go new file mode 100644 index 00000000000..8fe4461638a --- /dev/null +++ b/batchrepr/reader_test.go @@ -0,0 +1,62 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package batchrepr + +import ( + "bytes" + "encoding/hex" + "fmt" + "strings" + "testing" + "unicode" + + "github.com/cockroachdb/datadriven" +) + +func TestReader(t *testing.T) { + datadriven.RunTest(t, "testdata/reader", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "scan": + var repr bytes.Buffer + for i, l := range strings.Split(td.Input, "\n") { + // Remove any trailing comments behind #. + if i := strings.IndexRune(l, '#'); i >= 0 { + l = l[:i] + } + // Strip all whitespace from the line. 
+ l = strings.Map(func(r rune) rune { + if unicode.IsSpace(r) { + return -1 + } + return r + }, l) + b, err := hex.DecodeString(l) + if err != nil { + return fmt.Sprintf("failed to decode hex; line %d", i) + } + repr.Write(b) + } + r, h := Read(repr.Bytes()) + var out strings.Builder + fmt.Fprintf(&out, "Header: %s\n", h) + for { + kind, ukey, value, ok, err := r.Next() + if !ok { + if err != nil { + fmt.Fprintf(&out, "err: %s\n", err) + } else { + fmt.Fprint(&out, "eof") + } + break + } + fmt.Fprintf(&out, "%s: %q: %q\n", kind, ukey, value) + } + return out.String() + + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} diff --git a/testdata/batch_reader b/batchrepr/testdata/reader similarity index 85% rename from testdata/batch_reader rename to batchrepr/testdata/reader index f3ac59697ca..e1650adf154 100644 --- a/testdata/batch_reader +++ b/batchrepr/testdata/reader @@ -1,19 +1,19 @@ scan ---- -Count: 0 +Header: [seqNum=0,count=0] eof scan ffffffffffffffffffffffffffffffffffffffffffffffff ---- -Count: 4294967295 +Header: [seqNum=18446744073709551615,count=4294967295] err: invalid key kind 0xff: pebble: invalid batch scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 00 01 61 # DEL "a" ---- -Count: 1 +Header: [seqNum=0,count=1] DEL: "a": "" eof @@ -21,7 +21,7 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 62 01 62 # SET "b" = "b" ---- -Count: 1 +Header: [seqNum=0,count=1] SET: "b": "b" eof @@ -29,7 +29,7 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 62 01 62 # SET "b" = "b" ---- -Count: 1 +Header: [seqNum=0,count=1] SET: "b": "b" eof @@ -38,7 +38,7 @@ scan 00 01 61 # DEL "a" 01 01 62 01 62 # SET "b" = "b" ---- -Count: 2 +Header: [seqNum=0,count=2] DEL: "a": "" SET: "b": "b" eof @@ -49,7 +49,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 0F 01 62 01 63 # RANGEDEL "b" = "c" ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" RANGEDEL: "b": "c" @@ -61,7 +61,7 @@ scan 01 01 62 01 62 # SET "b" 
= "b" 0F 01 62 01 # RANGEDEL "b"... missing end key string data ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: decoding RANGEDEL value: pebble: invalid batch @@ -72,7 +72,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 0F 01 62 01 # RANGEDEL "b"... missing end key string data ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: decoding RANGEDEL value: pebble: invalid batch @@ -84,7 +84,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 1F 01 62 01 # "1F" kind is garbage ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: invalid key kind 0x1f: pebble: invalid batch @@ -93,5 +93,5 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 # SET missing user key string data ---- -Count: 1 +Header: [seqNum=0,count=1] err: decoding user key: pebble: invalid batch diff --git a/commit.go b/commit.go index 38cdbb8f193..86362ef5d4f 100644 --- a/commit.go +++ b/commit.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/record" ) @@ -370,7 +371,7 @@ func (p *commitPipeline) AllocateSeqNum( // Give the batch a count of 1 so that the log and visible sequence number // are incremented correctly. 
- b.data = make([]byte, batchHeaderLen) + b.data = make([]byte, batchrepr.HeaderLen) b.setCount(uint32(count)) b.commit.Add(1) diff --git a/open.go b/open.go index 9b7154fde76..d4cbbff4eea 100644 --- a/open.go +++ b/open.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/arenaskl" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" @@ -847,7 +848,7 @@ func (d *DB) replayWAL( return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL") } - if buf.Len() < batchHeaderLen { + if buf.Len() < batchrepr.HeaderLen { return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)", filename, errors.Safe(logNum)) } diff --git a/replay/replay_test.go b/replay/replay_test.go index 68186887db1..cd5b443a8cb 100644 --- a/replay/replay_test.go +++ b/replay/replay_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datatest" "github.com/cockroachdb/pebble/internal/humanize" @@ -204,7 +205,7 @@ func TestLoadFlushedSSTableKeys(t *testing.T) { return err.Error() } - br, _ := pebble.ReadBatch(b.Repr()) + br, _ := batchrepr.Read(b.Repr()) kind, ukey, v, ok, err := br.Next() for ; ok; kind, ukey, v, ok, err = br.Next() { fmt.Fprintf(&buf, "%s.%s", ukey, kind)