Skip to content

Commit

Permalink
Merge pull request cockroachdb#18914 from petermattis/pmattis/rocksdb…
Browse files Browse the repository at this point in the history
…-batch-reader

storage/engine: reduce allocations in RocksDBBatchReader
  • Loading branch information
petermattis authored Sep 30, 2017
2 parents a710e14 + e27f872 commit 081f0ba
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 58 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/engineccl/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func VerifyBatchRepr(
for r.Next() {
switch r.BatchType() {
case engine.BatchTypeValue:
mvccKey, err := engine.DecodeKey(r.UnsafeKey())
mvccKey, err := r.MVCCKey()
if err != nil {
return enginepb.MVCCStats{}, errors.Wrapf(err, "verifying key/value checksums")
}
v := roachpb.Value{RawBytes: r.UnsafeValue()}
v := roachpb.Value{RawBytes: r.Value()}
if err := v.Verify(mvccKey.Key); err != nil {
return enginepb.MVCCStats{}, err
}
Expand Down
89 changes: 37 additions & 52 deletions pkg/storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
package engine

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -347,28 +344,25 @@ func DecodeKey(encodedKey []byte) (MVCCKey, error) {
// for r.Next() {
// switch r.BatchType() {
// case BatchTypeDeletion:
// fmt.Printf("delete(%x)", r.UnsafeKey())
// fmt.Printf("delete(%x)", r.Key())
// case BatchTypeValue:
// fmt.Printf("put(%x,%x)", r.UnsafeKey(), r.UnsafeValue())
// fmt.Printf("put(%x,%x)", r.Key(), r.Value())
// case BatchTypeMerge:
// fmt.Printf("merge(%x,%x)", r.UnsafeKey(), r.UnsafeValue())
// fmt.Printf("merge(%x,%x)", r.Key(), r.Value())
// }
// }
// if err := r.Error(); err != nil {
// return err
// }
type RocksDBBatchReader struct {
buf *bytes.Reader
repr []byte

// The error encountered during iteration, if any.
err error

// The total number of entries, decoded from the batch header
count uint32

// For allocation avoidance
alloc bufalloc.ByteAllocator

// The following all represent the current entry and are updated by Next.
// `value` is not applicable for BatchTypeDeletion.
offset int
Expand All @@ -380,22 +374,19 @@ type RocksDBBatchReader struct {
// NewRocksDBBatchReader creates a RocksDBBatchReader from the given repr and
// verifies the header.
func NewRocksDBBatchReader(repr []byte) (*RocksDBBatchReader, error) {
// Set offset to -1 so the first call to Next will increment it to 0.
r := RocksDBBatchReader{buf: bytes.NewReader(repr), offset: -1}

var seq uint64
if err := binary.Read(r.buf, binary.LittleEndian, &seq); err != nil {
return nil, err
if len(repr) < headerSize {
return nil, errors.Errorf("batch repr too small: %d < %d", len(repr), headerSize)
}
seq := binary.LittleEndian.Uint64(repr[:countPos])
if seq != 0 {
return nil, errors.Errorf("bad sequence: expected 0, but found %d", seq)
}

if err := binary.Read(r.buf, binary.LittleEndian, &r.count); err != nil {
return nil, err
}

return &r, nil
// Set offset to -1 so the first call to Next will increment it to 0.
r := &RocksDBBatchReader{repr: repr, offset: -1}
r.count = binary.LittleEndian.Uint32(repr[countPos:headerSize])
r.repr = r.repr[headerSize:]
return r, nil
}

// Count returns the declared number of entries in the batch.
Expand All @@ -413,18 +404,21 @@ func (r *RocksDBBatchReader) BatchType() BatchType {
return r.typ
}

// UnsafeKey returns the key of the current batch entry. The memory is
// invalidated on the next call to Next.
func (r *RocksDBBatchReader) UnsafeKey() []byte {
// Key returns the key of the current batch entry. The reader's internal key
// slice is returned directly, without copying.
func (r *RocksDBBatchReader) Key() []byte {
return r.key
}

// UnsafeValue returns the value of the current batch entry. The memory is
// invalidated on the next call to Next. UnsafeValue panics if the BatchType is
// BatchTypeDeleted.
func (r *RocksDBBatchReader) UnsafeValue() []byte {
// MVCCKey returns the MVCC key of the current batch entry, obtained by passing
// the entry's raw key to DecodeKey. Any error from DecodeKey is returned
// unchanged.
func (r *RocksDBBatchReader) MVCCKey() (MVCCKey, error) {
return DecodeKey(r.key)
}

// Value returns the value of the current batch entry. Value panics if the
// BatchType is BatchTypeDeletion.
func (r *RocksDBBatchReader) Value() []byte {
if r.typ == BatchTypeDeletion {
panic("cannot call UnsafeValue on a deletion entry")
panic("cannot call Value on a deletion entry")
}
return r.value
}
Expand All @@ -437,21 +431,15 @@ func (r *RocksDBBatchReader) Next() bool {
}

r.offset++
typ, err := r.buf.ReadByte()
if err == io.EOF {
if len(r.repr) == 0 {
if r.offset < int(r.count) {
r.err = errors.Errorf("invalid batch: expected %d entries but found %d", r.count, r.offset)
}
return false
} else if err != nil {
r.err = err
return false
}

// Reset alloc.
r.alloc = r.alloc[:0]

r.typ = BatchType(typ)
r.typ = BatchType(r.repr[0])
r.repr = r.repr[1:]
switch r.typ {
case BatchTypeDeletion:
if r.key, r.err = r.varstring(); r.err != nil {
Expand All @@ -472,30 +460,27 @@ func (r *RocksDBBatchReader) Next() bool {
return false
}
default:
r.err = errors.Errorf("unexpected type %d", typ)
r.err = errors.Errorf("unexpected type %d", r.typ)
return false
}
return true
}

func (r *RocksDBBatchReader) varstring() ([]byte, error) {
n, err := binary.ReadUvarint(r.buf)
if err != nil {
return nil, err
v, n := binary.Uvarint(r.repr)
if n <= 0 {
return nil, fmt.Errorf("unable to decode uvarint")
}
if n == 0 {
r.repr = r.repr[n:]
if v == 0 {
return nil, nil
}

var s []byte
r.alloc, s = r.alloc.Alloc(int(n), 0)
c, err := r.buf.Read(s)
if err != nil {
return nil, err
}
if c != int(n) {
return nil, fmt.Errorf("expected %d bytes, but found %d", n, c)
if v > uint64(len(r.repr)) {
return nil, fmt.Errorf("malformed varstring, expected %d bytes, but only %d remaining",
v, len(r.repr))
}
s := r.repr[:v]
r.repr = r.repr[v:]
return s, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ func TestBatchRepr(t *testing.T) {
}
switch r.BatchType() {
case BatchTypeDeletion:
ops = append(ops, fmt.Sprintf("delete(%s)", string(r.UnsafeKey())))
ops = append(ops, fmt.Sprintf("delete(%s)", string(r.Key())))
case BatchTypeValue:
ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.UnsafeKey()), string(r.UnsafeValue())))
ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.Key()), string(r.Value())))
case BatchTypeMerge:
// The merge value is a protobuf and not easily displayable.
ops = append(ops, fmt.Sprintf("merge(%s)", string(r.UnsafeKey())))
ops = append(ops, fmt.Sprintf("merge(%s)", string(r.Key())))
}
}
if err != nil {
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func TestDecodeKey(t *testing.T) {
if !r.Next() {
t.Fatalf("could not get the first entry: %+v", r.Error())
}
decodedKey, err := DecodeKey(r.UnsafeKey())
decodedKey, err := DecodeKey(r.Key())
if err != nil {
t.Fatalf("unexpected err: %+v", err)
}
Expand Down

0 comments on commit 081f0ba

Please sign in to comment.