diff --git a/pkg/ccl/storageccl/engineccl/rocksdb.go b/pkg/ccl/storageccl/engineccl/rocksdb.go index ab84ea03cc16..a54726f622d9 100644 --- a/pkg/ccl/storageccl/engineccl/rocksdb.go +++ b/pkg/ccl/storageccl/engineccl/rocksdb.go @@ -52,11 +52,11 @@ func VerifyBatchRepr( for r.Next() { switch r.BatchType() { case engine.BatchTypeValue: - mvccKey, err := engine.DecodeKey(r.UnsafeKey()) + mvccKey, err := r.MVCCKey() if err != nil { return enginepb.MVCCStats{}, errors.Wrapf(err, "verifying key/value checksums") } - v := roachpb.Value{RawBytes: r.UnsafeValue()} + v := roachpb.Value{RawBytes: r.Value()} if err := v.Verify(mvccKey.Key); err != nil { return enginepb.MVCCStats{}, err } diff --git a/pkg/storage/engine/batch.go b/pkg/storage/engine/batch.go index 7d25a8e3054b..60abd6e630ea 100644 --- a/pkg/storage/engine/batch.go +++ b/pkg/storage/engine/batch.go @@ -15,12 +15,9 @@ package engine import ( - "bytes" "encoding/binary" "fmt" - "io" - "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/pkg/errors" ) @@ -347,18 +344,18 @@ func DecodeKey(encodedKey []byte) (MVCCKey, error) { // for r.Next() { // switch r.BatchType() { // case BatchTypeDeletion: -// fmt.Printf("delete(%x)", r.UnsafeKey()) +// fmt.Printf("delete(%x)", r.Key()) // case BatchTypeValue: -// fmt.Printf("put(%x,%x)", r.UnsafeKey(), r.UnsafeValue()) +// fmt.Printf("put(%x,%x)", r.Key(), r.Value()) // case BatchTypeMerge: -// fmt.Printf("merge(%x,%x)", r.UnsafeKey(), r.UnsafeValue()) +// fmt.Printf("merge(%x,%x)", r.Key(), r.Value()) // } // } // if err != nil { // return nil // } type RocksDBBatchReader struct { - buf *bytes.Reader + repr []byte // The error encountered during iterator, if any err error @@ -366,9 +363,6 @@ type RocksDBBatchReader struct { // The total number of entries, decoded from the batch header count uint32 - // For allocation avoidance - alloc bufalloc.ByteAllocator - // The following all represent the current entry and are updated by Next. // `value` is not applicable for BatchTypeDeletion. offset int @@ -380,22 +374,19 @@ type RocksDBBatchReader struct { // NewRocksDBBatchReader creates a RocksDBBatchReader from the given repr and // verifies the header. func NewRocksDBBatchReader(repr []byte) (*RocksDBBatchReader, error) { - // Set offset to -1 so the first call to Next will increment it to 0. - r := RocksDBBatchReader{buf: bytes.NewReader(repr), offset: -1} - - var seq uint64 - if err := binary.Read(r.buf, binary.LittleEndian, &seq); err != nil { - return nil, err + if len(repr) < headerSize { + return nil, errors.Errorf("batch repr too small: %d < %d", len(repr), headerSize) } + seq := binary.LittleEndian.Uint64(repr[:countPos]) if seq != 0 { return nil, errors.Errorf("bad sequence: expected 0, but found %d", seq) } - if err := binary.Read(r.buf, binary.LittleEndian, &r.count); err != nil { - return nil, err - } - - return &r, nil + // Set offset to -1 so the first call to Next will increment it to 0. + r := &RocksDBBatchReader{repr: repr, offset: -1} + r.count = binary.LittleEndian.Uint32(repr[countPos:headerSize]) + r.repr = r.repr[headerSize:] + return r, nil } // Count returns the declared number of entries in the batch. @@ -413,18 +404,21 @@ func (r *RocksDBBatchReader) BatchType() BatchType { return r.typ } -// UnsafeKey returns the key of the current batch entry. The memory is -// invalidated on the next call to Next. -func (r *RocksDBBatchReader) UnsafeKey() []byte { +// Key returns the key of the current batch entry. +func (r *RocksDBBatchReader) Key() []byte { return r.key } -// UnsafeValue returns the value of the current batch entry. The memory is -// invalidated on the next call to Next. UnsafeValue panics if the BatchType is -// BatchTypeDeleted. -func (r *RocksDBBatchReader) UnsafeValue() []byte { +// MVCCKey returns the MVCC key of the current batch entry. +func (r *RocksDBBatchReader) MVCCKey() (MVCCKey, error) { + return DecodeKey(r.key) +} + +// Value returns the value of the current batch entry. Value panics if the +// BatchType is BatchTypeDeleted. +func (r *RocksDBBatchReader) Value() []byte { if r.typ == BatchTypeDeletion { - panic("cannot call UnsafeValue on a deletion entry") + panic("cannot call Value on a deletion entry") } return r.value } @@ -437,21 +431,15 @@ func (r *RocksDBBatchReader) Next() bool { } r.offset++ - typ, err := r.buf.ReadByte() - if err == io.EOF { + if len(r.repr) == 0 { if r.offset < int(r.count) { r.err = errors.Errorf("invalid batch: expected %d entries but found %d", r.count, r.offset) } return false - } else if err != nil { - r.err = err - return false } - // Reset alloc. - r.alloc = r.alloc[:0] - - r.typ = BatchType(typ) + r.typ = BatchType(r.repr[0]) + r.repr = r.repr[1:] switch r.typ { case BatchTypeDeletion: if r.key, r.err = r.varstring(); r.err != nil { @@ -472,30 +460,27 @@ func (r *RocksDBBatchReader) Next() bool { return false } default: - r.err = errors.Errorf("unexpected type %d", typ) + r.err = errors.Errorf("unexpected type %d", r.typ) return false } return true } func (r *RocksDBBatchReader) varstring() ([]byte, error) { - n, err := binary.ReadUvarint(r.buf) - if err != nil { - return nil, err + v, n := binary.Uvarint(r.repr) + if n <= 0 { + return nil, fmt.Errorf("unable to decode uvarint") } - if n == 0 { + r.repr = r.repr[n:] + if v == 0 { return nil, nil } - - var s []byte - r.alloc, s = r.alloc.Alloc(int(n), 0) - c, err := r.buf.Read(s) - if err != nil { - return nil, err - } - if c != int(n) { - return nil, fmt.Errorf("expected %d bytes, but found %d", n, c) + if v > uint64(len(r.repr)) { + return nil, fmt.Errorf("malformed varstring, expected %d bytes, but only %d remaining", + v, len(r.repr)) } + s := r.repr[:v] + r.repr = r.repr[v:] return s, nil } diff --git a/pkg/storage/engine/batch_test.go b/pkg/storage/engine/batch_test.go index c3daae30bf4f..f106a0ccfaf2 100644 --- a/pkg/storage/engine/batch_test.go +++ b/pkg/storage/engine/batch_test.go @@ -261,12 +261,12 @@ func TestBatchRepr(t *testing.T) { } switch r.BatchType() { case BatchTypeDeletion: - ops = append(ops, fmt.Sprintf("delete(%s)", string(r.UnsafeKey()))) + ops = append(ops, fmt.Sprintf("delete(%s)", string(r.Key()))) case BatchTypeValue: - ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.UnsafeKey()), string(r.UnsafeValue()))) + ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.Key()), string(r.Value()))) case BatchTypeMerge: // The merge value is a protobuf and not easily displayable. - ops = append(ops, fmt.Sprintf("merge(%s)", string(r.UnsafeKey()))) + ops = append(ops, fmt.Sprintf("merge(%s)", string(r.Key()))) } } if err != nil { @@ -1140,7 +1140,7 @@ func TestDecodeKey(t *testing.T) { if !r.Next() { t.Fatalf("could not get the first entry: %+v", r.Error()) } - decodedKey, err := DecodeKey(r.UnsafeKey()) + decodedKey, err := DecodeKey(r.Key()) if err != nil { t.Fatalf("unexpected err: %+v", err) }