Skip to content

Commit

Permalink
batchrepr: new package
Browse files Browse the repository at this point in the history
Move some of the logic related to the batch representation to a new batchrepr
package. For now, this is primarily the BatchReader type and a few very small
facilities around the Header for now. Future work may move additional logic
related to writing new batch mutations and sorted iteration over a batch's
contents, assuming there's no impact on performance.

This move is motivated by cockroachdb#3230. The planned wal package will need to inspect
batch sequence numbers for deduplication when reconstructing the logical
contents of a virtual WAL. Moving the logic outside the pebble package avoids
duplicating the logic.
  • Loading branch information
jbowens committed Jan 18, 2024
1 parent 4e6dda4 commit 826bf6a
Show file tree
Hide file tree
Showing 11 changed files with 437 additions and 204 deletions.
157 changes: 33 additions & 124 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/batchskl"
"github.com/cockroachdb/pebble/internal/humanize"
Expand All @@ -29,8 +30,6 @@ import (
)

const (
batchCountOffset = 8
batchHeaderLen = 12
batchInitialSize = 1 << 10 // 1 KB
batchMaxRetainedSize = 1 << 20 // 1 MB
invalidBatchCount = 1<<32 - 1
Expand All @@ -42,7 +41,7 @@ const (
var ErrNotIndexed = errors.New("pebble: batch not indexed")

// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
var ErrInvalidBatch = batchrepr.ErrInvalidBatch

// ErrBatchTooLarge indicates that a batch is invalid or otherwise corrupted.
var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
Expand Down Expand Up @@ -452,7 +451,7 @@ func (b *Batch) release() {

func (b *Batch) refreshMemTableSize() error {
b.memTableSize = 0
if len(b.data) < batchHeaderLen {
if len(b.data) < batchrepr.HeaderLen {
return nil
}

Expand Down Expand Up @@ -498,23 +497,23 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
if len(batch.data) == 0 {
return nil
}
if len(batch.data) < batchHeaderLen {
if len(batch.data) < batchrepr.HeaderLen {
return ErrInvalidBatch
}

offset := len(b.data)
if offset == 0 {
b.init(offset)
offset = batchHeaderLen
offset = batchrepr.HeaderLen
}
b.data = append(b.data, batch.data[batchHeaderLen:]...)
b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)

b.setCount(b.Count() + batch.Count())

if b.db != nil || b.index != nil {
// Only iterate over the new entries if we need to track memTableSize or in
// order to update the index.
for iter := BatchReader(b.data[offset:]); len(iter) > 0; {
for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
kind, key, value, ok, err := iter.Next()
if !ok {
Expand Down Expand Up @@ -580,7 +579,7 @@ func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind Interna
panic("pebble: batch already committing")
}
if len(b.data) == 0 {
b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen)
b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
}
b.count++
b.memTableSize += memTableEntrySize(keyLen, valueLen)
Expand Down Expand Up @@ -632,7 +631,7 @@ func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
panic("pebble: batch already committing")
}
if len(b.data) == 0 {
b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen)
b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
}
b.count++
b.memTableSize += memTableEntrySize(keyLen, 0)
Expand Down Expand Up @@ -1101,37 +1100,35 @@ func (b *Batch) ingestSST(fileNum base.FileNum) {

// Empty returns true if the batch is empty, and false otherwise.
func (b *Batch) Empty() bool {
return len(b.data) <= batchHeaderLen
return batchrepr.IsEmpty(b.data)
}

// Len returns the current size of the batch in bytes.
func (b *Batch) Len() int {
if len(b.data) <= batchHeaderLen {
return batchHeaderLen
}
return len(b.data)
return max(batchrepr.HeaderLen, len(b.data))
}

// Repr returns the underlying batch representation. It is not safe to modify
// the contents. Reset() will not change the contents of the returned value,
// though any other mutation operation may do so.
func (b *Batch) Repr() []byte {
if len(b.data) == 0 {
b.init(batchHeaderLen)
b.init(batchrepr.HeaderLen)
}
binary.LittleEndian.PutUint32(b.countData(), b.Count())
batchrepr.SetCount(b.data, b.Count())
return b.data
}

// SetRepr sets the underlying batch representation. The batch takes ownership
// of the supplied slice. It is not safe to modify it afterwards until the
// Batch is no longer in use.
func (b *Batch) SetRepr(data []byte) error {
if len(data) < batchHeaderLen {
h, ok := batchrepr.ReadHeader(data)
if !ok {
return base.CorruptionErrorf("invalid batch")
}
b.data = data
b.count = uint64(binary.LittleEndian.Uint32(b.countData()))
b.count = uint64(h.Count)
var err error
if b.db != nil {
// Only track memTableSize for batches that will be committed to the DB.
Expand Down Expand Up @@ -1391,9 +1388,9 @@ func (b *Batch) init(size int) {
n *= 2
}
if cap(b.data) < n {
b.data = rawalloc.New(batchHeaderLen, n)
b.data = rawalloc.New(batchrepr.HeaderLen, n)
}
b.data = b.data[:batchHeaderLen]
b.data = b.data[:batchrepr.HeaderLen]
clear(b.data) // Zero the sequence number in the header
}

Expand Down Expand Up @@ -1423,7 +1420,7 @@ func (b *Batch) Reset() {
b.data = nil
} else {
// Otherwise, reset the buffer for re-use.
b.data = b.data[:batchHeaderLen]
b.data = b.data[:batchrepr.HeaderLen]
clear(b.data)
}
}
Expand All @@ -1432,18 +1429,6 @@ func (b *Batch) Reset() {
}
}

// seqNumData returns the 8 byte little-endian sequence number. Zero means that
// the batch has not yet been applied.
func (b *Batch) seqNumData() []byte {
return b.data[:8]
}

// countData returns the 4 byte little-endian count data. "\xff\xff\xff\xff"
// means that the batch is invalid.
func (b *Batch) countData() []byte {
return b.data[8:12]
}

func (b *Batch) grow(n int) {
newSize := len(b.data) + n
if uint64(newSize) >= maxBatchSize {
Expand All @@ -1462,17 +1447,17 @@ func (b *Batch) grow(n int) {
}

func (b *Batch) setSeqNum(seqNum uint64) {
binary.LittleEndian.PutUint64(b.seqNumData(), seqNum)
batchrepr.SetSeqNum(b.data, seqNum)
}

// SeqNum returns the batch sequence number which is applied to the first
// record in the batch. The sequence number is incremented for each subsequent
// record. It returns zero if the batch is empty.
func (b *Batch) SeqNum() uint64 {
if len(b.data) == 0 {
b.init(batchHeaderLen)
b.init(batchrepr.HeaderLen)
}
return binary.LittleEndian.Uint64(b.seqNumData())
return batchrepr.ReadSeqNum(b.data)
}

func (b *Batch) setCount(v uint32) {
Expand All @@ -1485,52 +1470,18 @@ func (b *Batch) setCount(v uint32) {
// batch isn't applied to the memtable.
func (b *Batch) Count() uint32 {
if b.count > math.MaxUint32 {
panic(ErrInvalidBatch)
panic(batchrepr.ErrInvalidBatch)
}
return uint32(b.count)
}

// Reader returns a BatchReader for the current batch contents. If the batch is
// mutated, the new entries will not be visible to the reader.
func (b *Batch) Reader() BatchReader {
// Reader returns a batchrepr.Reader for the current batch contents. If the
// batch is mutated, the new entries will not be visible to the reader.
func (b *Batch) Reader() batchrepr.Reader {
if len(b.data) == 0 {
b.init(batchHeaderLen)
}
return b.data[batchHeaderLen:]
}

func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
// TODO(jackson): This will index out of bounds if there's no varint or an
// invalid varint (eg, a single 0xff byte). Correcting will add a bit of
// overhead. We could avoid that overhead whenever len(data) >=
// binary.MaxVarint32?

var v uint32
var n int
ptr := unsafe.Pointer(&data[0])
if a := *((*uint8)(ptr)); a < 128 {
v = uint32(a)
n = 1
} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
v = uint32(b)<<7 | uint32(a)
n = 2
} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 3
} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 4
} else {
d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 5
b.init(batchrepr.HeaderLen)
}

data = data[n:]
if v > uint32(len(data)) {
return nil, nil, false
}
return data[v:], data[:v], true
return batchrepr.Read(b.data)
}

// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
Expand All @@ -1553,48 +1504,6 @@ func (b *Batch) CommitStats() BatchCommitStats {
return b.commitStats
}

// BatchReader iterates over the entries contained in a batch.
type BatchReader []byte

// ReadBatch constructs a BatchReader from a batch representation. The
// header is not validated. ReadBatch returns a new batch reader and the
// count of entries contained within the batch.
func ReadBatch(repr []byte) (r BatchReader, count uint32) {
if len(repr) <= batchHeaderLen {
return nil, count
}
count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen])
return repr[batchHeaderLen:], count
}

// Next returns the next entry in this batch, if there is one. If the reader has
// reached the end of the batch, Next returns ok=false and a nil error. If the
// batch is corrupt and the next entry is illegible, Next returns ok=false and a
// non-nil error.
func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
if len(*r) == 0 {
return 0, nil, nil, false, nil
}
kind = InternalKeyKind((*r)[0])
if kind > InternalKeyKindMax {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
}
*r, ukey, ok = batchDecodeStr((*r)[1:])
if !ok {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
}
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
*r, value, ok = batchDecodeStr(*r)
if !ok {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
}
}
return kind, ukey, value, true, nil
}

// Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
// two in sync.
type batchIter struct {
Expand Down Expand Up @@ -1723,7 +1632,7 @@ func (i *batchIter) value() []byte {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
_, value, ok := batchDecodeStr(data[keyEnd:])
_, value, ok := batchrepr.DecodeStr(data[keyEnd:])
if !ok {
return nil
}
Expand Down Expand Up @@ -1812,10 +1721,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error
}
var rangeDelOffsets []flushableBatchEntry
var rangeKeyOffsets []flushableBatchEntry
if len(b.data) > batchHeaderLen {
if len(b.data) > batchrepr.HeaderLen {
// Non-empty batch.
var index uint32
for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ {
for iter := batchrepr.Read(b.data); len(iter) > 0; index++ {
offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
kind, key, _, ok, err := iter.Next()
if !ok {
Expand Down Expand Up @@ -1990,7 +1899,7 @@ func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0

// inuseBytes is part of the flushable interface.
func (b *flushableBatch) inuseBytes() uint64 {
return uint64(len(b.data) - batchHeaderLen)
return uint64(len(b.data) - batchrepr.HeaderLen)
}

// totalBytes is part of the flushable interface.
Expand Down Expand Up @@ -2191,7 +2100,7 @@ func (i *flushableBatchIter) value() base.LazyValue {
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
keyEnd := i.offsets[i.index].keyEnd
_, value, ok = batchDecodeStr(i.data[keyEnd:])
_, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
if !ok {
i.err = base.CorruptionErrorf("corrupted batch")
return base.LazyValue{}
Expand Down
Loading

0 comments on commit 826bf6a

Please sign in to comment.