diff --git a/batch.go b/batch.go index 29686a6f705..e70b56d792f 100644 --- a/batch.go +++ b/batch.go @@ -17,6 +17,7 @@ import ( "unsafe" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/humanize" @@ -29,8 +30,6 @@ import ( ) const ( - batchCountOffset = 8 - batchHeaderLen = 12 batchInitialSize = 1 << 10 // 1 KB batchMaxRetainedSize = 1 << 20 // 1 MB invalidBatchCount = 1<<32 - 1 @@ -42,7 +41,7 @@ const ( var ErrNotIndexed = errors.New("pebble: batch not indexed") // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted. -var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch")) +var ErrInvalidBatch = batchrepr.ErrInvalidBatch // ErrBatchTooLarge indicates that a batch is invalid or otherwise corrupted. var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize))) @@ -452,7 +451,7 @@ func (b *Batch) release() { func (b *Batch) refreshMemTableSize() error { b.memTableSize = 0 - if len(b.data) < batchHeaderLen { + if len(b.data) < batchrepr.HeaderLen { return nil } @@ -498,23 +497,23 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { if len(batch.data) == 0 { return nil } - if len(batch.data) < batchHeaderLen { + if len(batch.data) < batchrepr.HeaderLen { return ErrInvalidBatch } offset := len(b.data) if offset == 0 { b.init(offset) - offset = batchHeaderLen + offset = batchrepr.HeaderLen } - b.data = append(b.data, batch.data[batchHeaderLen:]...) + b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...) b.setCount(b.Count() + batch.Count()) if b.db != nil || b.index != nil { // Only iterate over the new entries if we need to track memTableSize or in // order to update the index. - for iter := BatchReader(b.data[offset:]); len(iter) > 0; { + for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; { offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0])) kind, key, value, ok, err := iter.Next() if !ok { @@ -580,7 +579,7 @@ func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind Interna panic("pebble: batch already committing") } if len(b.data) == 0 { - b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen) + b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen) } b.count++ b.memTableSize += memTableEntrySize(keyLen, valueLen) @@ -632,7 +631,7 @@ func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) { panic("pebble: batch already committing") } if len(b.data) == 0 { - b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen) + b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen) } b.count++ b.memTableSize += memTableEntrySize(keyLen, 0) @@ -1101,15 +1100,12 @@ func (b *Batch) ingestSST(fileNum base.FileNum) { // Empty returns true if the batch is empty, and false otherwise. func (b *Batch) Empty() bool { - return len(b.data) <= batchHeaderLen + return batchrepr.IsEmpty(b.data) } // Len returns the current size of the batch in bytes. func (b *Batch) Len() int { - if len(b.data) <= batchHeaderLen { - return batchHeaderLen - } - return len(b.data) + return max(batchrepr.HeaderLen, len(b.data)) } // Repr returns the underlying batch representation. It is not safe to modify @@ -1117,9 +1113,9 @@ func (b *Batch) Len() int { // though any other mutation operation may do so. func (b *Batch) Repr() []byte { if len(b.data) == 0 { - b.init(batchHeaderLen) + b.init(batchrepr.HeaderLen) } - binary.LittleEndian.PutUint32(b.countData(), b.Count()) + batchrepr.SetCount(b.data, b.Count()) return b.data } @@ -1127,11 +1123,12 @@ func (b *Batch) Repr() []byte { // of the supplied slice. It is not safe to modify it afterwards until the // Batch is no longer in use. func (b *Batch) SetRepr(data []byte) error { - if len(data) < batchHeaderLen { + h, ok := batchrepr.ReadHeader(data) + if !ok { return base.CorruptionErrorf("invalid batch") } b.data = data - b.count = uint64(binary.LittleEndian.Uint32(b.countData())) + b.count = uint64(h.Count) var err error if b.db != nil { // Only track memTableSize for batches that will be committed to the DB. @@ -1391,9 +1388,9 @@ func (b *Batch) init(size int) { n *= 2 } if cap(b.data) < n { - b.data = rawalloc.New(batchHeaderLen, n) + b.data = rawalloc.New(batchrepr.HeaderLen, n) } - b.data = b.data[:batchHeaderLen] + b.data = b.data[:batchrepr.HeaderLen] clear(b.data) // Zero the sequence number in the header } @@ -1423,7 +1420,7 @@ func (b *Batch) Reset() { b.data = nil } else { // Otherwise, reset the buffer for re-use. - b.data = b.data[:batchHeaderLen] + b.data = b.data[:batchrepr.HeaderLen] clear(b.data) } } @@ -1432,18 +1429,6 @@ func (b *Batch) Reset() { } } -// seqNumData returns the 8 byte little-endian sequence number. Zero means that -// the batch has not yet been applied. -func (b *Batch) seqNumData() []byte { - return b.data[:8] -} - -// countData returns the 4 byte little-endian count data. "\xff\xff\xff\xff" -// means that the batch is invalid. -func (b *Batch) countData() []byte { - return b.data[8:12] -} - func (b *Batch) grow(n int) { newSize := len(b.data) + n if uint64(newSize) >= maxBatchSize { @@ -1462,7 +1447,7 @@ func (b *Batch) grow(n int) { } func (b *Batch) setSeqNum(seqNum uint64) { - binary.LittleEndian.PutUint64(b.seqNumData(), seqNum) + batchrepr.SetSeqNum(b.data, seqNum) } // SeqNum returns the batch sequence number which is applied to the first @@ -1470,9 +1455,9 @@ func (b *Batch) setSeqNum(seqNum uint64) { // record. It returns zero if the batch is empty. func (b *Batch) SeqNum() uint64 { if len(b.data) == 0 { - b.init(batchHeaderLen) + b.init(batchrepr.HeaderLen) } - return binary.LittleEndian.Uint64(b.seqNumData()) + return batchrepr.ReadSeqNum(b.data) } func (b *Batch) setCount(v uint32) { @@ -1485,52 +1470,18 @@ func (b *Batch) setCount(v uint32) { // batch isn't applied to the memtable. func (b *Batch) Count() uint32 { if b.count > math.MaxUint32 { - panic(ErrInvalidBatch) + panic(batchrepr.ErrInvalidBatch) } return uint32(b.count) } -// Reader returns a BatchReader for the current batch contents. If the batch is -// mutated, the new entries will not be visible to the reader. -func (b *Batch) Reader() BatchReader { +// Reader returns a batchrepr.Reader for the current batch contents. If the +// batch is mutated, the new entries will not be visible to the reader. +func (b *Batch) Reader() batchrepr.Reader { if len(b.data) == 0 { - b.init(batchHeaderLen) - } - return b.data[batchHeaderLen:] -} - -func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) { - // TODO(jackson): This will index out of bounds if there's no varint or an - // invalid varint (eg, a single 0xff byte). Correcting will add a bit of - // overhead. We could avoid that overhead whenever len(data) >= - // binary.MaxVarint32? - - var v uint32 - var n int - ptr := unsafe.Pointer(&data[0]) - if a := *((*uint8)(ptr)); a < 128 { - v = uint32(a) - n = 1 - } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 { - v = uint32(b)<<7 | uint32(a) - n = 2 - } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 { - v = uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 3 - } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 { - v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 4 - } else { - d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4))) - v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) - n = 5 + b.init(batchrepr.HeaderLen) } - - data = data[n:] - if v > uint32(len(data)) { - return nil, nil, false - } - return data[v:], data[:v], true + return batchrepr.Read(b.data) } // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait. @@ -1553,48 +1504,6 @@ func (b *Batch) CommitStats() BatchCommitStats { return b.commitStats } -// BatchReader iterates over the entries contained in a batch. -type BatchReader []byte - -// ReadBatch constructs a BatchReader from a batch representation. The -// header is not validated. ReadBatch returns a new batch reader and the -// count of entries contained within the batch. -func ReadBatch(repr []byte) (r BatchReader, count uint32) { - if len(repr) <= batchHeaderLen { - return nil, count - } - count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen]) - return repr[batchHeaderLen:], count -} - -// Next returns the next entry in this batch, if there is one. If the reader has -// reached the end of the batch, Next returns ok=false and a nil error. If the -// batch is corrupt and the next entry is illegible, Next returns ok=false and a -// non-nil error. -func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) { - if len(*r) == 0 { - return 0, nil, nil, false, nil - } - kind = InternalKeyKind((*r)[0]) - if kind > InternalKeyKindMax { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0]) - } - *r, ukey, ok = batchDecodeStr((*r)[1:]) - if !ok { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key") - } - switch kind { - case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete, - InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, - InternalKeyKindDeleteSized: - *r, value, ok = batchDecodeStr(*r) - if !ok { - return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) - } - } - return kind, ukey, value, true, nil -} - // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the // two in sync. type batchIter struct { @@ -1723,7 +1632,7 @@ func (i *batchIter) value() []byte { case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete, InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, InternalKeyKindDeleteSized: - _, value, ok := batchDecodeStr(data[keyEnd:]) + _, value, ok := batchrepr.DecodeStr(data[keyEnd:]) if !ok { return nil } @@ -1812,10 +1721,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error } var rangeDelOffsets []flushableBatchEntry var rangeKeyOffsets []flushableBatchEntry - if len(b.data) > batchHeaderLen { + if len(b.data) > batchrepr.HeaderLen { // Non-empty batch. var index uint32 - for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ { + for iter := batchrepr.Read(b.data); len(iter) > 0; index++ { offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0])) kind, key, _, ok, err := iter.Next() if !ok { @@ -1990,7 +1899,7 @@ func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 // inuseBytes is part of the flushable interface. func (b *flushableBatch) inuseBytes() uint64 { - return uint64(len(b.data) - batchHeaderLen) + return uint64(len(b.data) - batchrepr.HeaderLen) } // totalBytes is part of the flushable interface. @@ -2191,7 +2100,7 @@ func (i *flushableBatchIter) value() base.LazyValue { InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete, InternalKeyKindDeleteSized: keyEnd := i.offsets[i.index].keyEnd - _, value, ok = batchDecodeStr(i.data[keyEnd:]) + _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:]) if !ok { i.err = base.CorruptionErrorf("corrupted batch") return base.LazyValue{} diff --git a/batch_test.go b/batch_test.go index c9778742fec..0c93bbe7366 100644 --- a/batch_test.go +++ b/batch_test.go @@ -8,7 +8,6 @@ import ( "bytes" "context" "encoding/binary" - "encoding/hex" "fmt" "io" "math" @@ -18,10 +17,10 @@ import ( "sync" "testing" "time" - "unicode" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/itertest" @@ -239,7 +238,7 @@ func TestBatchLen(t *testing.T) { require.Equal(t, size, len(b.Repr())) } - requireLenAndReprEq(batchHeaderLen) + requireLenAndReprEq(batchrepr.HeaderLen) key := "test-key" value := "test-value" @@ -378,18 +377,14 @@ func TestBatchReset(t *testing.T) { // At this point b.data has not been modified since the db.NewBatch() and is // either nil or contains a byte slice of length batchHeaderLen, with a 0 // seqnum encoded in data[0:8] and an arbitrary count encoded in data[8:12]. - // The following commented code will often fail. - // count := binary.LittleEndian.Uint32(b.countData()) - // if count != 0 && count != 3 { - // t.Fatalf("count: %d", count) - // } // If we simply called b.Reset now and later used b.data to initialize // expected, the count in expected will also be arbitrary. So we fix the // count in b.data now by calling b.Repr(). This call isn't essential, since // we will call b.Repr() again, and just shows that it fixes the count in // b.data. - _ = b.Repr() - require.Equal(t, uint32(3), binary.LittleEndian.Uint32(b.countData())) + h, ok := batchrepr.ReadHeader(b.Repr()) + require.True(t, ok) + require.Equal(t, uint32(3), h.Count) b.Reset() require.Equal(t, db, b.db) @@ -398,7 +393,7 @@ func TestBatchReset(t *testing.T) { require.Equal(t, uint32(0), b.Count()) require.Equal(t, uint64(0), b.countRangeDels) require.Equal(t, uint64(0), b.countRangeKeys) - require.Equal(t, batchHeaderLen, len(b.data)) + require.Equal(t, batchrepr.HeaderLen, len(b.data)) require.Equal(t, uint64(0), b.SeqNum()) require.Equal(t, uint64(0), b.memTableSize) require.Equal(t, FormatMajorVersion(0x00), b.minimumFormatMajorVersion) @@ -716,19 +711,16 @@ func TestBatchIncrement(t *testing.T) { 0xfffffffe, } for _, tc := range testCases { - var buf [batchHeaderLen]byte + var buf [batchrepr.HeaderLen]byte binary.LittleEndian.PutUint32(buf[8:12], tc) var b Batch b.SetRepr(buf[:]) b.count++ - got := binary.LittleEndian.Uint32(b.Repr()[8:12]) want := tc + 1 - if got != want { - t.Errorf("input=%d: got %d, want %d", tc, got, want) - } - _, count := ReadBatch(b.Repr()) - if got != want { - t.Errorf("input=%d: got %d, want %d", tc, count, want) + h, ok := batchrepr.ReadHeader(b.Repr()) + require.True(t, ok) + if h.Count != want { + t.Errorf("input=%d: got %d, want %d", tc, h.Count, want) } } @@ -740,7 +732,7 @@ func TestBatchIncrement(t *testing.T) { } } }() - var buf [batchHeaderLen]byte + var buf [batchrepr.HeaderLen]byte binary.LittleEndian.PutUint32(buf[8:12], 0xffffffff) var b Batch b.SetRepr(buf[:]) @@ -1367,52 +1359,6 @@ func TestBatchCommitStats(t *testing.T) { require.NoError(t, err) } -func TestBatchReader(t *testing.T) { - datadriven.RunTest(t, "testdata/batch_reader", func(t *testing.T, td *datadriven.TestData) string { - switch td.Cmd { - case "scan": - var repr bytes.Buffer - for i, l := range strings.Split(td.Input, "\n") { - // Remove any trailing comments behind #. - if i := strings.IndexRune(l, '#'); i >= 0 { - l = l[:i] - } - // Strip all whitespace from the line. - l = strings.Map(func(r rune) rune { - if unicode.IsSpace(r) { - return -1 - } - return r - }, l) - b, err := hex.DecodeString(l) - if err != nil { - return fmt.Sprintf("failed to decode hex; line %d", i) - } - repr.Write(b) - } - r, count := ReadBatch(repr.Bytes()) - var out strings.Builder - fmt.Fprintf(&out, "Count: %d\n", count) - for { - kind, ukey, value, ok, err := r.Next() - if !ok { - if err != nil { - fmt.Fprintf(&out, "err: %s\n", err) - } else { - fmt.Fprint(&out, "eof") - } - break - } - fmt.Fprintf(&out, "%s: %q: %q\n", kind, ukey, value) - } - return out.String() - - default: - return fmt.Sprintf("unrecognized command %q", td.Cmd) - } - }) -} - func BenchmarkBatchSet(b *testing.B) { value := make([]byte, 10) for i := range value { diff --git a/batchrepr/reader.go b/batchrepr/reader.go new file mode 100644 index 00000000000..faa0ac45035 --- /dev/null +++ b/batchrepr/reader.go @@ -0,0 +1,146 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// Package batchrepr provides interfaces for reading the binary batch +// representation. This batch representation is used in-memory while +// constructing a batch and on-disk within the write-ahead log. +package batchrepr + +import ( + "encoding/binary" + "fmt" + "unsafe" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/pkg/errors" +) + +// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted. +var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch")) + +const ( + // HeaderLen is the length of the batch header in bytes. + HeaderLen = 12 + // countOffset is the index into the batch represenation at which the + // uin32 count is, encoded as a little-endian uint32. + countOffset = 8 +) + +// IsEmpty returns true iff the batch contains zero keys. +func IsEmpty(repr []byte) bool { + return len(repr) <= HeaderLen +} + +// ReadHeader reads the contents of the batch header. If the repr is too small +// to contain a valid batch header, ReadHeader returns ok=false. +func ReadHeader(repr []byte) (h Header, ok bool) { + if len(repr) < HeaderLen { + return h, false + } + return Header{ + SeqNum: ReadSeqNum(repr), + Count: binary.LittleEndian.Uint32(repr[countOffset:HeaderLen]), + }, true +} + +// Header describes the contents of a batch header. +type Header struct { + // SeqNum is the sequence number at which the batch is committed. A batch + // that has not yet committed will have a zero sequence number. + SeqNum uint64 + // Count is the count of keys written to the batch. + Count uint32 +} + +// String returns a string representation of the header's contents. +func (h Header) String() string { + return fmt.Sprintf("[seqNum=%d,count=%d]", h.SeqNum, h.Count) +} + +// ReadSeqNum reads the sequence number encoded within the batch. ReadSeqNum +// does not validate that the repr is valid. It's exported only for very +// performance sensitive code paths that should not necessarily read the rest of +// the header as well. +func ReadSeqNum(repr []byte) uint64 { + return binary.LittleEndian.Uint64(repr[:countOffset]) +} + +// Read constructs a Reader from an encoded batch representation, ignoring the +// contents of the Header. +func Read(repr []byte) (r Reader) { + if len(repr) <= HeaderLen { + return nil + } + return repr[HeaderLen:] +} + +// Reader iterates over the entries contained in a batch. +type Reader []byte + +// Next returns the next entry in this batch, if there is one. If the reader has +// reached the end of the batch, Next returns ok=false and a nil error. If the +// batch is corrupt and the next entry is illegible, Next returns ok=false and a +// non-nil error. +func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, ok bool, err error) { + if len(*r) == 0 { + return 0, nil, nil, false, nil + } + kind = base.InternalKeyKind((*r)[0]) + if kind > base.InternalKeyKindMax { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0]) + } + *r, ukey, ok = DecodeStr((*r)[1:]) + if !ok { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key") + } + switch kind { + case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete, + base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete, + base.InternalKeyKindDeleteSized: + *r, value, ok = DecodeStr(*r) + if !ok { + return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) + } + } + return kind, ukey, value, true, nil +} + +// DecodeStr decodes a varint encoded string from data, returning the remainder +// of data and the decoded string. It returns ok=false if the varint is invalid. +// +// TODO(jackson): This should be unexported once pebble package callers have +// been updated to use appropriate abstractions. +func DecodeStr(data []byte) (odata []byte, s []byte, ok bool) { + // TODO(jackson): This will index out of bounds if there's no varint or an + // invalid varint (eg, a single 0xff byte). Correcting will add a bit of + // overhead. We could avoid that overhead whenever len(data) >= + // binary.MaxVarint32? + + var v uint32 + var n int + ptr := unsafe.Pointer(&data[0]) + if a := *((*uint8)(ptr)); a < 128 { + v = uint32(a) + n = 1 + } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 { + v = uint32(b)<<7 | uint32(a) + n = 2 + } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 { + v = uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 3 + } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 { + v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 4 + } else { + d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4))) + v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) + n = 5 + } + + data = data[n:] + if v > uint32(len(data)) { + return nil, nil, false + } + return data[v:], data[:v], true +} diff --git a/batchrepr/reader_test.go b/batchrepr/reader_test.go new file mode 100644 index 00000000000..89db80da1d8 --- /dev/null +++ b/batchrepr/reader_test.go @@ -0,0 +1,63 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package batchrepr + +import ( + "bytes" + "encoding/hex" + "fmt" + "strings" + "testing" + "unicode" + + "github.com/cockroachdb/datadriven" +) + +func TestReader(t *testing.T) { + datadriven.RunTest(t, "testdata/reader", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "scan": + var repr bytes.Buffer + for i, l := range strings.Split(td.Input, "\n") { + // Remove any trailing comments behind #. + if i := strings.IndexRune(l, '#'); i >= 0 { + l = l[:i] + } + // Strip all whitespace from the line. + l = strings.Map(func(r rune) rune { + if unicode.IsSpace(r) { + return -1 + } + return r + }, l) + b, err := hex.DecodeString(l) + if err != nil { + return fmt.Sprintf("failed to decode hex; line %d", i) + } + repr.Write(b) + } + h, _ := ReadHeader(repr.Bytes()) + r := Read(repr.Bytes()) + var out strings.Builder + fmt.Fprintf(&out, "Header: %s\n", h) + for { + kind, ukey, value, ok, err := r.Next() + if !ok { + if err != nil { + fmt.Fprintf(&out, "err: %s\n", err) + } else { + fmt.Fprint(&out, "eof") + } + break + } + fmt.Fprintf(&out, "%s: %q: %q\n", kind, ukey, value) + } + return out.String() + + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} diff --git a/testdata/batch_reader b/batchrepr/testdata/reader similarity index 85% rename from testdata/batch_reader rename to batchrepr/testdata/reader index f3ac59697ca..e1650adf154 100644 --- a/testdata/batch_reader +++ b/batchrepr/testdata/reader @@ -1,19 +1,19 @@ scan ---- -Count: 0 +Header: [seqNum=0,count=0] eof scan ffffffffffffffffffffffffffffffffffffffffffffffff ---- -Count: 4294967295 +Header: [seqNum=18446744073709551615,count=4294967295] err: invalid key kind 0xff: pebble: invalid batch scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 00 01 61 # DEL "a" ---- -Count: 1 +Header: [seqNum=0,count=1] DEL: "a": "" eof @@ -21,7 +21,7 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 62 01 62 # SET "b" = "b" ---- -Count: 1 +Header: [seqNum=0,count=1] SET: "b": "b" eof @@ -29,7 +29,7 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 62 01 62 # SET "b" = "b" ---- -Count: 1 +Header: [seqNum=0,count=1] SET: "b": "b" eof @@ -38,7 +38,7 @@ scan 00 01 61 # DEL "a" 01 01 62 01 62 # SET "b" = "b" ---- -Count: 2 +Header: [seqNum=0,count=2] DEL: "a": "" SET: "b": "b" eof @@ -49,7 +49,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 0F 01 62 01 63 # RANGEDEL "b" = "c" ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" RANGEDEL: "b": "c" @@ -61,7 +61,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 0F 01 62 01 # RANGEDEL "b"... missing end key string data ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: decoding RANGEDEL value: pebble: invalid batch @@ -72,7 +72,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 0F 01 62 01 # RANGEDEL "b"... missing end key string data ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: decoding RANGEDEL value: pebble: invalid batch @@ -84,7 +84,7 @@ scan 01 01 62 01 62 # SET "b" = "b" 1F 01 62 01 # "1F" kind is garbage ---- -Count: 3 +Header: [seqNum=0,count=3] DEL: "a": "" SET: "b": "b" err: invalid key kind 0x1f: pebble: invalid batch @@ -93,5 +93,5 @@ scan 0000000000000000 01000000 # Seqnum = 0, Count = 1 01 01 # SET missing user key string data ---- -Count: 1 +Header: [seqNum=0,count=1] err: decoding user key: pebble: invalid batch diff --git a/batchrepr/writer.go b/batchrepr/writer.go new file mode 100644 index 00000000000..3069856ea36 --- /dev/null +++ b/batchrepr/writer.go @@ -0,0 +1,21 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package batchrepr + +import "encoding/binary" + +// SetSeqNum mutates the provided batch representation, storing the provided +// sequence number in its header. The provided byte slice must already be at +// least HeaderLen bytes long or else SetSeqNum will panic. +func SetSeqNum(repr []byte, seqNum uint64) { + binary.LittleEndian.PutUint64(repr[:countOffset], seqNum) +} + +// SetCount mutates the provided batch representation, storing the provided +// count in its header. The provided byte slice must already be at least +// HeaderLen bytes long or else SetCount will panic. +func SetCount(repr []byte, count uint32) { + binary.LittleEndian.PutUint32(repr[countOffset:HeaderLen], count) +} diff --git a/commit.go b/commit.go index 38cdbb8f193..86362ef5d4f 100644 --- a/commit.go +++ b/commit.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/record" ) @@ -370,7 +371,7 @@ func (p *commitPipeline) AllocateSeqNum( // Give the batch a count of 1 so that the log and visible sequence number // are incremented correctly. - b.data = make([]byte, batchHeaderLen) + b.data = make([]byte, batchrepr.HeaderLen) b.setCount(uint32(count)) b.commit.Add(1) diff --git a/open.go b/open.go index 9b7154fde76..d4cbbff4eea 100644 --- a/open.go +++ b/open.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/arenaskl" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" @@ -847,7 +848,7 @@ func (d *DB) replayWAL( return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL") } - if buf.Len() < batchHeaderLen { + if buf.Len() < batchrepr.HeaderLen { return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)", filename, errors.Safe(logNum)) } diff --git a/replay/replay_test.go b/replay/replay_test.go index 68186887db1..eb81648cfb6 100644 --- a/replay/replay_test.go +++ b/replay/replay_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/batchrepr" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datatest" "github.com/cockroachdb/pebble/internal/humanize" @@ -204,7 +205,7 @@ func TestLoadFlushedSSTableKeys(t *testing.T) { return err.Error() } - br, _ := pebble.ReadBatch(b.Repr()) + br := batchrepr.Read(b.Repr()) kind, ukey, v, ok, err := br.Next() for ; ok; kind, ukey, v, ok, err = br.Next() { fmt.Fprintf(&buf, "%s.%s", ukey, kind)