Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batchrepr: new package #3232

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 33 additions & 124 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/batchskl"
"github.com/cockroachdb/pebble/internal/humanize"
Expand All @@ -29,8 +30,6 @@ import (
)

const (
batchCountOffset = 8
batchHeaderLen = 12
batchInitialSize = 1 << 10 // 1 KB
batchMaxRetainedSize = 1 << 20 // 1 MB
invalidBatchCount = 1<<32 - 1
Expand All @@ -42,7 +41,7 @@ const (
var ErrNotIndexed = errors.New("pebble: batch not indexed")

// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
var ErrInvalidBatch = batchrepr.ErrInvalidBatch

// ErrBatchTooLarge indicates that a batch has reached or exceeded the maximum permitted batch size (maxBatchSize).
var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
Expand Down Expand Up @@ -452,7 +451,7 @@ func (b *Batch) release() {

func (b *Batch) refreshMemTableSize() error {
b.memTableSize = 0
if len(b.data) < batchHeaderLen {
if len(b.data) < batchrepr.HeaderLen {
return nil
}

Expand Down Expand Up @@ -498,23 +497,23 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
if len(batch.data) == 0 {
return nil
}
if len(batch.data) < batchHeaderLen {
if len(batch.data) < batchrepr.HeaderLen {
return ErrInvalidBatch
}

offset := len(b.data)
if offset == 0 {
b.init(offset)
offset = batchHeaderLen
offset = batchrepr.HeaderLen
}
b.data = append(b.data, batch.data[batchHeaderLen:]...)
b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)

b.setCount(b.Count() + batch.Count())

if b.db != nil || b.index != nil {
// Only iterate over the new entries if we need to track memTableSize or in
// order to update the index.
for iter := BatchReader(b.data[offset:]); len(iter) > 0; {
for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
kind, key, value, ok, err := iter.Next()
if !ok {
Expand Down Expand Up @@ -580,7 +579,7 @@ func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind Interna
panic("pebble: batch already committing")
}
if len(b.data) == 0 {
b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen)
b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
}
b.count++
b.memTableSize += memTableEntrySize(keyLen, valueLen)
Expand Down Expand Up @@ -632,7 +631,7 @@ func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
panic("pebble: batch already committing")
}
if len(b.data) == 0 {
b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen)
b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
}
b.count++
b.memTableSize += memTableEntrySize(keyLen, 0)
Expand Down Expand Up @@ -1101,37 +1100,35 @@ func (b *Batch) ingestSST(fileNum base.FileNum) {

// Empty returns true if the batch is empty, and false otherwise.
func (b *Batch) Empty() bool {
return len(b.data) <= batchHeaderLen
return batchrepr.IsEmpty(b.data)
}

// Len returns the current size of the batch in bytes.
func (b *Batch) Len() int {
if len(b.data) <= batchHeaderLen {
return batchHeaderLen
}
return len(b.data)
return max(batchrepr.HeaderLen, len(b.data))
}

// Repr returns the underlying batch representation. It is not safe to modify
// the contents. Reset() will not change the contents of the returned value,
// though any other mutation operation may do so.
func (b *Batch) Repr() []byte {
if len(b.data) == 0 {
b.init(batchHeaderLen)
b.init(batchrepr.HeaderLen)
}
binary.LittleEndian.PutUint32(b.countData(), b.Count())
batchrepr.SetCount(b.data, b.Count())
return b.data
}

// SetRepr sets the underlying batch representation. The batch takes ownership
// of the supplied slice. It is not safe to modify it afterwards until the
// Batch is no longer in use.
func (b *Batch) SetRepr(data []byte) error {
if len(data) < batchHeaderLen {
h, ok := batchrepr.ReadHeader(data)
if !ok {
return base.CorruptionErrorf("invalid batch")
}
b.data = data
b.count = uint64(binary.LittleEndian.Uint32(b.countData()))
b.count = uint64(h.Count)
var err error
if b.db != nil {
// Only track memTableSize for batches that will be committed to the DB.
Expand Down Expand Up @@ -1391,9 +1388,9 @@ func (b *Batch) init(size int) {
n *= 2
}
if cap(b.data) < n {
b.data = rawalloc.New(batchHeaderLen, n)
b.data = rawalloc.New(batchrepr.HeaderLen, n)
}
b.data = b.data[:batchHeaderLen]
b.data = b.data[:batchrepr.HeaderLen]
clear(b.data) // Zero the sequence number in the header
}

Expand Down Expand Up @@ -1423,7 +1420,7 @@ func (b *Batch) Reset() {
b.data = nil
} else {
// Otherwise, reset the buffer for re-use.
b.data = b.data[:batchHeaderLen]
b.data = b.data[:batchrepr.HeaderLen]
clear(b.data)
}
}
Expand All @@ -1432,18 +1429,6 @@ func (b *Batch) Reset() {
}
}

// seqNumData returns the leading bytes of the batch buffer that hold the
// sequence number: 8 bytes, little-endian. A zero sequence number means the
// batch has not been applied yet.
func (b *Batch) seqNumData() []byte {
	const seqNumLen = 8
	return b.data[0:seqNumLen]
}

// countData returns the 4 bytes of the batch buffer that hold the
// little-endian entry count. The value "\xff\xff\xff\xff" marks the batch as
// invalid.
func (b *Batch) countData() []byte {
	const (
		countStart = 8
		countEnd   = 12
	)
	return b.data[countStart:countEnd]
}

func (b *Batch) grow(n int) {
newSize := len(b.data) + n
if uint64(newSize) >= maxBatchSize {
Expand All @@ -1462,17 +1447,17 @@ func (b *Batch) grow(n int) {
}

func (b *Batch) setSeqNum(seqNum uint64) {
binary.LittleEndian.PutUint64(b.seqNumData(), seqNum)
batchrepr.SetSeqNum(b.data, seqNum)
}

// SeqNum returns the batch sequence number which is applied to the first
// record in the batch. The sequence number is incremented for each subsequent
// record. It returns zero if the batch is empty.
func (b *Batch) SeqNum() uint64 {
if len(b.data) == 0 {
b.init(batchHeaderLen)
b.init(batchrepr.HeaderLen)
}
return binary.LittleEndian.Uint64(b.seqNumData())
return batchrepr.ReadSeqNum(b.data)
}

func (b *Batch) setCount(v uint32) {
Expand All @@ -1485,52 +1470,18 @@ func (b *Batch) setCount(v uint32) {
// batch isn't applied to the memtable.
func (b *Batch) Count() uint32 {
if b.count > math.MaxUint32 {
panic(ErrInvalidBatch)
panic(batchrepr.ErrInvalidBatch)
}
return uint32(b.count)
}

// Reader returns a BatchReader for the current batch contents. If the batch is
// mutated, the new entries will not be visible to the reader.
func (b *Batch) Reader() BatchReader {
// Reader returns a batchrepr.Reader for the current batch contents. If the
// batch is mutated, the new entries will not be visible to the reader.
func (b *Batch) Reader() batchrepr.Reader {
if len(b.data) == 0 {
b.init(batchHeaderLen)
}
return b.data[batchHeaderLen:]
}

func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
// TODO(jackson): This will index out of bounds if there's no varint or an
// invalid varint (eg, a single 0xff byte). Correcting will add a bit of
// overhead. We could avoid that overhead whenever len(data) >=
// binary.MaxVarint32?

var v uint32
var n int
ptr := unsafe.Pointer(&data[0])
if a := *((*uint8)(ptr)); a < 128 {
v = uint32(a)
n = 1
} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
v = uint32(b)<<7 | uint32(a)
n = 2
} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 3
} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 4
} else {
d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
n = 5
b.init(batchrepr.HeaderLen)
}

data = data[n:]
if v > uint32(len(data)) {
return nil, nil, false
}
return data[v:], data[:v], true
return batchrepr.Read(b.data)
}

// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
Expand All @@ -1553,48 +1504,6 @@ func (b *Batch) CommitStats() BatchCommitStats {
return b.commitStats
}

// BatchReader iterates over the entries contained in a batch. It is a byte
// slice holding the not-yet-consumed entries; ReadBatch produces one from the
// bytes that follow the batch header, and each successful call to Next
// re-slices the reader past the entry it just decoded.
type BatchReader []byte

// ReadBatch wraps a serialized batch representation in a BatchReader. The
// header is not validated. It returns a reader positioned at the first entry
// after the header, together with the entry count stored in the header. When
// repr is no longer than the header, the returned reader is nil and the count
// is zero.
func ReadBatch(repr []byte) (r BatchReader, count uint32) {
	if len(repr) > batchHeaderLen {
		r = repr[batchHeaderLen:]
		count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen])
	}
	return r, count
}

// Next decodes and returns the entry at the front of the reader and advances
// the reader past it.
//
// Results:
//   - ok=true, err=nil: an entry was decoded; value is only populated for key
//     kinds that carry a value.
//   - ok=false, err=nil: the reader is exhausted.
//   - ok=false, err!=nil: the batch is corrupt and the next entry could not be
//     decoded; err wraps ErrInvalidBatch.
func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
	buf := *r
	if len(buf) == 0 {
		// Clean end of the batch.
		return 0, nil, nil, false, nil
	}

	// The first byte of every entry is its key kind.
	tag := buf[0]
	if InternalKeyKind(tag) > InternalKeyKindMax {
		return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", tag)
	}
	kind = InternalKeyKind(tag)

	// The length-prefixed user key follows the kind byte.
	var decoded bool
	buf, ukey, decoded = batchDecodeStr(buf[1:])
	*r = buf
	if !decoded {
		return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
	}

	// Only some kinds are followed by a length-prefixed value.
	var carriesValue bool
	switch kind {
	case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
		InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
		InternalKeyKindDeleteSized:
		carriesValue = true
	}
	if !carriesValue {
		return kind, ukey, nil, true, nil
	}

	buf, value, decoded = batchDecodeStr(buf)
	*r = buf
	if !decoded {
		return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
	}
	return kind, ukey, value, true, nil
}

// Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
// two in sync.
type batchIter struct {
Expand Down Expand Up @@ -1723,7 +1632,7 @@ func (i *batchIter) value() []byte {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
_, value, ok := batchDecodeStr(data[keyEnd:])
_, value, ok := batchrepr.DecodeStr(data[keyEnd:])
if !ok {
return nil
}
Expand Down Expand Up @@ -1812,10 +1721,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error
}
var rangeDelOffsets []flushableBatchEntry
var rangeKeyOffsets []flushableBatchEntry
if len(b.data) > batchHeaderLen {
if len(b.data) > batchrepr.HeaderLen {
// Non-empty batch.
var index uint32
for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ {
for iter := batchrepr.Read(b.data); len(iter) > 0; index++ {
offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
kind, key, _, ok, err := iter.Next()
if !ok {
Expand Down Expand Up @@ -1990,7 +1899,7 @@ func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0

// inuseBytes is part of the flushable interface.
func (b *flushableBatch) inuseBytes() uint64 {
return uint64(len(b.data) - batchHeaderLen)
return uint64(len(b.data) - batchrepr.HeaderLen)
}

// totalBytes is part of the flushable interface.
Expand Down Expand Up @@ -2191,7 +2100,7 @@ func (i *flushableBatchIter) value() base.LazyValue {
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
keyEnd := i.offsets[i.index].keyEnd
_, value, ok = batchDecodeStr(i.data[keyEnd:])
_, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
if !ok {
i.err = base.CorruptionErrorf("corrupted batch")
return base.LazyValue{}
Expand Down
Loading
Loading