Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize MVCCGet path. #459

Merged
merged 5 commits into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func verifyCleanup(key proto.Key, db *client.KV, eng engine.Engine, t *testing.T

if err := util.IsTrueWithin(func() bool {
meta := &proto.MVCCMetadata{}
ok, _, _, err := engine.GetProto(eng, engine.MVCCEncodeKey(key), meta)
ok, _, _, err := eng.GetProto(engine.MVCCEncodeKey(key), meta)
if err != nil {
t.Errorf("error getting MVCC metadata: %s", err)
}
Expand Down
23 changes: 23 additions & 0 deletions storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/biogo/store/llrb"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/util"
gogoproto "github.com/gogo/protobuf/proto"
)

// Batch wrap an instance of Engine and provides a limited subset of
Expand Down Expand Up @@ -89,6 +90,24 @@ func (b *Batch) Get(key proto.EncodedKey) ([]byte, error) {
return b.engine.Get(key)
}

// GetProto fetches the value at the specified key and unmarshals it.
func (b *Batch) GetProto(key proto.EncodedKey, msg gogoproto.Message) (
ok bool, keyBytes, valBytes int64, err error) {
var data []byte
if data, err = b.Get(key); err != nil || data == nil {
return
}
ok = true
if msg != nil {
if err = gogoproto.Unmarshal(data, msg); err != nil {
return
}
}
keyBytes = int64(len(key))
valBytes = int64(len(data))
return
}

// Iterate invokes f on key/value pairs merged from the underlying
// engine and pending batch updates. If f returns done or an error,
// the iteration ends and propagates the error.
Expand Down Expand Up @@ -309,6 +328,10 @@ func (bi *batchIterator) Value() []byte {
return bi.pending[0].Value
}

func (bi *batchIterator) ValueProto(msg gogoproto.Message) error {
return gogoproto.Unmarshal(bi.Value(), msg)
}

func (bi *batchIterator) Error() error {
return bi.err
}
Expand Down
6 changes: 3 additions & 3 deletions storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestBatchProto(t *testing.T) {
kv := &proto.RawKeyValue{Key: proto.EncodedKey("a"), Value: []byte("value")}
PutProto(b, proto.EncodedKey("proto"), kv)
getKV := &proto.RawKeyValue{}
ok, keySize, valSize, err := GetProto(b, proto.EncodedKey("proto"), getKV)
ok, keySize, valSize, err := b.GetProto(proto.EncodedKey("proto"), getKV)
if !ok || err != nil {
t.Fatalf("expected GetProto to success ok=%t: %s", ok, err)
}
Expand All @@ -216,14 +216,14 @@ func TestBatchProto(t *testing.T) {
t.Errorf("expected %v; got %v", kv, getKV)
}
// Before commit, proto will not be available via engine.
if ok, _, _, err := GetProto(e, proto.EncodedKey("proto"), getKV); ok || err != nil {
if ok, _, _, err := e.GetProto(proto.EncodedKey("proto"), getKV); ok || err != nil {
t.Fatalf("expected GetProto to fail ok=%t: %s", ok, err)
}
// Commit and verify the proto can be read directly from the engine.
if err := b.Commit(); err != nil {
t.Fatal(err)
}
if ok, _, _, err := GetProto(e, proto.EncodedKey("proto"), getKV); !ok || err != nil {
if ok, _, _, err := e.GetProto(proto.EncodedKey("proto"), getKV); !ok || err != nil {
t.Fatalf("expected GetProto to success ok=%t: %s", ok, err)
}
if !reflect.DeepEqual(getKV, kv) {
Expand Down
31 changes: 8 additions & 23 deletions storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Iterator interface {
Key() proto.EncodedKey
// Value returns the current value as a byte slice.
Value() []byte
// ValueProto unmarshals the value the iterator is currently
// pointing to using a protobuf decoder.
ValueProto(msg gogoproto.Message) error
// Error returns the error, if any, which the iterator encountered.
Error() error
}
Expand All @@ -77,6 +80,11 @@ type Engine interface {
Put(key proto.EncodedKey, value []byte) error
// Get returns the value for the given key, nil otherwise.
Get(key proto.EncodedKey) ([]byte, error)
// GetProto fetches the value at the specified key and unmarshals it
// using a protobuf decoder. Returns true on success or false if the
// key was not found. On success, returns the length in bytes of the
// key and the value.
GetProto(key proto.EncodedKey, msg gogoproto.Message) (ok bool, keyBytes, valBytes int64, err error)
// Iterate scans from start to end keys, visiting at most max
// key/value pairs. On each key value pair, the function f is
// invoked. If f returns an error or if the scan itself encounters
Expand Down Expand Up @@ -183,29 +191,6 @@ func PutProto(engine Engine, key proto.EncodedKey, msg gogoproto.Message) (keyBy
return
}

// GetProto fetches the value at the specified key and unmarshals it
// using a protobuf decoder. Returns true on success or false if the
// key was not found. On success, returns the length in bytes of the
// key and the value.
func GetProto(engine Engine, key proto.EncodedKey, msg gogoproto.Message) (ok bool, keyBytes, valBytes int64, err error) {
var data []byte
if data, err = engine.Get(key); err != nil {
return
}
if data == nil {
return
}
ok = true
if msg != nil {
if err = gogoproto.Unmarshal(data, msg); err != nil {
return
}
}
keyBytes = int64(len(key))
valBytes = int64(len(data))
return
}

// Increment fetches the varint encoded int64 value specified by key
// and adds "inc" to it then re-encodes as varint. The newly incremented
// value is returned.
Expand Down
Loading