From ce5fb8dbc2ed5b73a031227d1ca2a79cf3f88f7f Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Wed, 14 Jan 2015 18:45:53 -0500 Subject: [PATCH 1/2] Reimplement snapshots as a new implementation of Engine interface. This fixes a TODO in the code and helpful to upcoming changes to implement asynchronous scans of ranges for custodial work. Without, more Engine method variants taking snapshot IDs would be necessary. This change is more in keeping with how we do Batch engines and is simpler. Also remove the InternalSnapshotCopy method, which will be replaced by an in-Raft mechanism whereby the leader feeds the snapshot directly to a requesting follower. --- kv/db_test.go | 5 +- proto/api.go | 23 ++--- proto/internal.go | 4 - proto/internal.proto | 24 +---- server/node.go | 5 - storage/engine/batch.go | 25 +---- storage/engine/engine.go | 38 ++------ storage/engine/engine_test.go | 14 +-- storage/engine/in_mem.go | 144 ++++++++++++++-------------- storage/engine/keys.go | 3 - storage/engine/mvcc.go | 9 +- storage/engine/mvcc_test.go | 38 +++----- storage/engine/rocksdb.go | 173 +++++++++++++++++++++------------- storage/range.go | 45 +-------- storage/range_test.go | 126 ------------------------- storage/store.go | 15 +-- 16 files changed, 231 insertions(+), 460 deletions(-) diff --git a/kv/db_test.go b/kv/db_test.go index 9ed2587f22bd..97eefaed916c 100644 --- a/kv/db_test.go +++ b/kv/db_test.go @@ -62,7 +62,7 @@ func TestKVDBCoverage(t *testing.T) { containsReq.Key = key containsResp := &proto.ContainsResponse{} if err := kvClient.Call(proto.Contains, containsReq, containsResp); err != nil || containsResp.Error != nil { - t.Fatal("%s, %s", err, containsResp.GoError()) + t.Fatalf("%s, %s", err, containsResp.GoError()) } if !containsResp.Exists { t.Error("expected contains to be true") @@ -109,7 +109,7 @@ func TestKVDBCoverage(t *testing.T) { t.Fatalf("%s, %s", err, delResp.GoError()) } if err := kvClient.Call(proto.Contains, containsReq, containsResp); err != nil || 
containsResp.Error != nil { - t.Fatal("%s, %s", err, containsResp.GoError()) + t.Fatalf("%s, %s", err, containsResp.GoError()) } if containsResp.Exists { t.Error("expected contains to be false after delete") @@ -170,7 +170,6 @@ func TestKVDBInternalMethods(t *testing.T) { {proto.InternalHeartbeatTxn, &proto.InternalHeartbeatTxnRequest{}, &proto.InternalHeartbeatTxnResponse{}}, {proto.InternalPushTxn, &proto.InternalPushTxnRequest{}, &proto.InternalPushTxnResponse{}}, {proto.InternalResolveIntent, &proto.InternalResolveIntentRequest{}, &proto.InternalResolveIntentResponse{}}, - {proto.InternalSnapshotCopy, &proto.InternalSnapshotCopyRequest{}, &proto.InternalSnapshotCopyResponse{}}, {proto.InternalMerge, &proto.InternalMergeRequest{}, &proto.InternalMergeResponse{}}, } // Verify non-public methods experience bad request errors. diff --git a/proto/api.go b/proto/api.go index 50e83809cb50..296bed11045a 100644 --- a/proto/api.go +++ b/proto/api.go @@ -102,7 +102,6 @@ var AllMethods = stringSet{ InternalHeartbeatTxn: struct{}{}, InternalPushTxn: struct{}{}, InternalResolveIntent: struct{}{}, - InternalSnapshotCopy: struct{}{}, InternalMerge: struct{}{}, } @@ -131,20 +130,18 @@ var InternalMethods = stringSet{ InternalHeartbeatTxn: struct{}{}, InternalPushTxn: struct{}{}, InternalResolveIntent: struct{}{}, - InternalSnapshotCopy: struct{}{}, InternalMerge: struct{}{}, } // ReadMethods specifies the set of methods which read and return data. var ReadMethods = stringSet{ - Contains: struct{}{}, - Get: struct{}{}, - ConditionalPut: struct{}{}, - Increment: struct{}{}, - Scan: struct{}{}, - ReapQueue: struct{}{}, - InternalRangeLookup: struct{}{}, - InternalSnapshotCopy: struct{}{}, + Contains: struct{}{}, + Get: struct{}{}, + ConditionalPut: struct{}{}, + Increment: struct{}{}, + Scan: struct{}{}, + ReapQueue: struct{}{}, + InternalRangeLookup: struct{}{}, } // WriteMethods specifies the set of methods which write data. 
@@ -336,8 +333,6 @@ func MethodForRequest(req Request) (string, error) { return InternalPushTxn, nil case *InternalResolveIntentRequest: return InternalResolveIntent, nil - case *InternalSnapshotCopyRequest: - return InternalSnapshotCopy, nil case *InternalMergeRequest: return InternalMerge, nil } @@ -391,8 +386,6 @@ func CreateArgs(method string) (Request, error) { return &InternalPushTxnRequest{}, nil case InternalResolveIntent: return &InternalResolveIntentRequest{}, nil - case InternalSnapshotCopy: - return &InternalSnapshotCopyRequest{}, nil case InternalMerge: return &InternalMergeRequest{}, nil } @@ -436,8 +429,6 @@ func CreateReply(method string) (Response, error) { return &InternalPushTxnResponse{}, nil case InternalResolveIntent: return &InternalResolveIntentResponse{}, nil - case InternalSnapshotCopy: - return &InternalSnapshotCopyResponse{}, nil case InternalMerge: return &InternalMergeResponse{}, nil } diff --git a/proto/internal.go b/proto/internal.go index 4b4bc604bca4..827627765f81 100644 --- a/proto/internal.go +++ b/proto/internal.go @@ -43,10 +43,6 @@ const ( // InternalResolveIntent resolves existing write intents for a key or // key range. InternalResolveIntent = "InternalResolveIntent" - // InternalSnapshotCopy scans the key range specified by start key through - // end key up to some maximum number of results from the given snapshot_id. - // It will create a snapshot if snapshot_id is empty. - InternalSnapshotCopy = "InternalSnapshotCopy" // InternalMerge merges a given value into the specified key. Merge is a // high-performance operation provided by underlying data storage for values // which are accumulated over several writes. 
Because it is not diff --git a/proto/internal.proto b/proto/internal.proto index 8ad85f67b06b..b17c1279cdca 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -111,27 +111,6 @@ message InternalResolveIntentResponse { optional ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; } -// An InternalSnapshotCopyRequest is arguments to the InternalSnapshotCopy() -// method. It specifies the start and end keys for the scan and the -// maximum number of results from the given snapshot_id. It will create -// a snapshot if snapshot_id is empty. -message InternalSnapshotCopyRequest { - optional RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; - // Optional, a new snapshot will be created if it is empty. - optional string snapshot_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SnapshotID"]; - // Must be > 0. - optional int64 max_results = 3 [(gogoproto.nullable) = false]; -} - -// An InternalSnapshotCopyResponse is the return value from the -// InternalSnapshotCopy() method. -message InternalSnapshotCopyResponse { - optional ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; - optional string snapshot_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SnapshotID"]; - // Empty if no rows were scanned. - repeated RawKeyValue rows = 3 [(gogoproto.nullable) = false]; -} - // An InternalMergeRequest contains arguments to the InternalMerge() method. It // specifies a key and a value which should be merged into the existing value at // that key. 
@@ -190,8 +169,7 @@ message InternalRaftCommandUnion { optional InternalHeartbeatTxnRequest internal_heartbeat_txn = 32; optional InternalPushTxnRequest internal_push_txn = 33; optional InternalResolveIntentRequest internal_resolve_intent = 34; - optional InternalSnapshotCopyRequest internal_snapshot_copy = 35; - optional InternalMergeRequest internal_merge_response = 36; + optional InternalMergeRequest internal_merge_response = 35; } // An InternalRaftCommand is a command which can be serialized and diff --git a/server/node.go b/server/node.go index ce1c5bdbc8a1..ea3d22329fb2 100644 --- a/server/node.go +++ b/server/node.go @@ -476,11 +476,6 @@ func (n *Node) InternalResolveIntent(args *proto.InternalResolveIntentRequest, r return n.executeCmd(proto.InternalResolveIntent, args, reply) } -// InternalSnapshotCopy . -func (n *Node) InternalSnapshotCopy(args *proto.InternalSnapshotCopyRequest, reply *proto.InternalSnapshotCopyResponse) error { - return n.executeCmd(proto.InternalSnapshotCopy, args, reply) -} - // InternalMerge . func (n *Node) InternalMerge(args *proto.InternalMergeRequest, reply *proto.InternalMergeResponse) error { return n.executeCmd(proto.InternalMerge, args, reply) diff --git a/storage/engine/batch.go b/storage/engine/batch.go index 7cdceb81b2ae..a6f587ef8075 100644 --- a/storage/engine/batch.go +++ b/storage/engine/batch.go @@ -209,26 +209,6 @@ func (b *Batch) Capacity() (StoreCapacity, error) { func (b *Batch) SetGCTimeouts(minTxnTS, minRCacheTS int64) { } -// CreateSnapshot returns an error if called on a Batch. -func (b *Batch) CreateSnapshot(snapshotID string) error { - return util.Errorf("cannot create a snapshot from a Batch") -} - -// ReleaseSnapshot returns an error if called on a Batch. -func (b *Batch) ReleaseSnapshot(snapshotID string) error { - return util.Errorf("cannot release a snapshot from a Batch") -} - -// GetSnapshot returns an error if called on a Batch. 
-func (b *Batch) GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error) { - return nil, util.Errorf("cannot get with a snapshot from a Batch") -} - -// IterateSnapshot returns an error if called on a Batch. -func (b *Batch) IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error { - return util.Errorf("cannot iterate with a snapshot from a Batch") -} - // ApproximateSize returns an error if called on a Batch. func (b *Batch) ApproximateSize(start, end proto.EncodedKey) (uint64, error) { return 0, util.Errorf("cannot get approximate size from a Batch") @@ -240,6 +220,11 @@ func (b *Batch) NewIterator() Iterator { return newBatchIterator(b.engine, &b.updates) } +// NewSnapshot returns nil if called on a Batch. +func (b *Batch) NewSnapshot() Engine { + return nil +} + // NewBatch returns a new Batch instance wrapping same underlying engine. func (b *Batch) NewBatch() Engine { return &Batch{engine: b.engine} diff --git a/storage/engine/engine.go b/storage/engine/engine.go index 9866e1755b58..2d64429e15e2 100644 --- a/storage/engine/engine.go +++ b/storage/engine/engine.go @@ -64,9 +64,6 @@ type Iterator interface { // Engine is the interface that wraps the core operations of a // key/value store. -// TODO(Jiang-Ming,Spencer): Remove some of the *Snapshot methods and have -// their non-snapshot counterparts accept a snapshotID which is used unless -// empty. type Engine interface { // Start initializes and starts the engine. Start() error @@ -115,17 +112,6 @@ type Engine interface { // Rows with timestamps less than the associated value will be GC'd // during compaction. SetGCTimeouts(minTxnTS, minRCacheTS int64) - // CreateSnapshot creates a snapshot handle from engine. - CreateSnapshot(snapshotID string) error - // ReleaseSnapshot releases the existing snapshot handle for the - // given snapshotID. 
- ReleaseSnapshot(snapshotID string) error - // GetSnapshot returns the value for the given key from the given - // snapshotID, nil otherwise. - GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error) - // IterateSnapshot scans from start to end keys, visiting at - // most max key/value pairs from the specified snapshot ID. - IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error // ApproximateSize returns the approximate number of bytes the engine is // using to store data for the given range of keys. ApproximateSize(start, end proto.EncodedKey) (uint64, error) @@ -133,6 +119,12 @@ type Engine interface { // engine. The caller must invoke Iterator.Close() when finished with // the iterator to free resources. NewIterator() Iterator + // NewSnapshot returns a new instance of a read-only snapshot + // engine. Snapshots are instantaneous and, as long as they're + // released relatively quickly, inexpensive. Snapshots are released + // by invoking Stop(). Note that snapshots must not be used after the + // original engine has been stopped. + NewSnapshot() Engine // NewBatch returns a new instance of a batched engine which wraps // this engine. Batched engines accumulate all mutations and apply // them atomically on a call to Commit(). @@ -140,11 +132,6 @@ type Engine interface { // Commit atomically applies any batched updates to the underlying // engine. This is a noop unless the engine was created via NewBatch(). Commit() error - - // TODO(petermattis): Remove the WriteBatch functionality from this - // interface. Add a "NewSnapshot() Engine" method which works - // similarly to NewBatch and remove CreateSnapshot, GetSnapshot, - // IterateSnapshot. } // A BatchDelete is a delete operation executed as part of an atomic batch. @@ -258,19 +245,6 @@ func Scan(engine Engine, start, end proto.EncodedKey, max int64) ([]proto.RawKey return kvs, err } -// ScanSnapshot scans using the given snapshot ID. 
-func ScanSnapshot(engine Engine, start, end proto.EncodedKey, max int64, snapshotID string) ([]proto.RawKeyValue, error) { - var kvs []proto.RawKeyValue - err := engine.IterateSnapshot(start, end, snapshotID, func(kv proto.RawKeyValue) (bool, error) { - if max != 0 && int64(len(kvs)) >= max { - return true, nil - } - kvs = append(kvs, kv) - return false, nil - }) - return kvs, err -} - // ClearRange removes a set of entries, from start (inclusive) to end // (exclusive). This function returns the number of entries // removed. Either all entries within the range will be deleted, or diff --git a/storage/engine/engine_test.go b/storage/engine/engine_test.go index bd7a1f92587b..74952f22fe44 100644 --- a/storage/engine/engine_test.go +++ b/storage/engine/engine_test.go @@ -506,16 +506,11 @@ func TestSnapshot(t *testing.T) { val, val1) } - snapshotID := strconv.FormatInt(1, 10) - error := engine.CreateSnapshot(snapshotID) - if error != nil { - t.Fatalf("error : %s", error) - } - + snap := engine.NewSnapshot() val2 := []byte("2") engine.Put(key, val2) val, _ = engine.Get(key) - valSnapshot, error := engine.GetSnapshot(key, snapshotID) + valSnapshot, error := snap.Get(key) if error != nil { t.Fatalf("error : %s", error) } @@ -529,7 +524,7 @@ func TestSnapshot(t *testing.T) { } keyvals, _ := Scan(engine, key, proto.EncodedKey(KeyMax), 0) - keyvalsSnapshot, error := ScanSnapshot(engine, key, proto.EncodedKey(KeyMax), 0, snapshotID) + keyvalsSnapshot, error := Scan(snap, key, proto.EncodedKey(KeyMax), 0) if error != nil { t.Fatalf("error : %s", error) } @@ -541,8 +536,7 @@ func TestSnapshot(t *testing.T) { t.Fatalf("the value %s in get result does not match the value %s in request", keyvalsSnapshot[0].Value, val1) } - - engine.ReleaseSnapshot(snapshotID) + snap.Stop() }, t) } diff --git a/storage/engine/in_mem.go b/storage/engine/in_mem.go index 979cae3de782..0d9e282cb576 100644 --- a/storage/engine/in_mem.go +++ b/storage/engine/in_mem.go @@ -28,6 +28,7 @@ import ( 
"code.google.com/p/biogo.store/llrb" "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/util" + "github.com/cockroachdb/cockroach/util/log" ) // TODO(petermattis): Remove this file. @@ -56,15 +57,13 @@ type InMem struct { maxBytes int64 usedBytes int64 data llrb.Tree - snapshots map[string]llrb.Tree } // NewInMem allocates and returns a new InMem object. func NewInMem(attrs proto.Attributes, maxBytes int64) *InMem { return &InMem{ - snapshots: map[string]llrb.Tree{}, - attrs: attrs, - maxBytes: maxBytes, + attrs: attrs, + maxBytes: maxBytes, } } @@ -82,48 +81,6 @@ func (in *InMem) Start() error { func (in *InMem) Stop() { } -// CreateSnapshot creates a snapshot handle from engine. -func (in *InMem) CreateSnapshot(snapshotID string) error { - in.Lock() - defer in.Unlock() - _, ok := in.snapshots[snapshotID] - if ok { - return util.Errorf("snapshotID %s already exists", snapshotID) - } - snapshotHandle := cloneTree(in.data) - in.snapshots[snapshotID] = snapshotHandle - return nil -} - -func cloneTree(a llrb.Tree) llrb.Tree { - var newTree = llrb.Tree{Count: a.Count} - newTree.Root = cloneNode(a.Root) - return newTree -} - -func cloneNode(a *llrb.Node) *llrb.Node { - if a == nil { - return nil - } - var newNode = &llrb.Node{Elem: a.Elem, Color: a.Color} - newNode.Left = cloneNode(a.Left) - newNode.Right = cloneNode(a.Right) - return newNode -} - -// ReleaseSnapshot releases the existing snapshot handle for the -// given snapshotID. -func (in *InMem) ReleaseSnapshot(snapshotID string) error { - in.Lock() - defer in.Unlock() - _, ok := in.snapshots[snapshotID] - if !ok { - return util.Errorf("snapshotID %s does not exist", snapshotID) - } - delete(in.snapshots, snapshotID) - return nil -} - // Attrs returns the list of attributes describing this engine. This // includes the disk type (always "mem") and potentially other labels // to identify important attributes of the engine. 
@@ -189,18 +146,6 @@ func (in *InMem) Get(key proto.EncodedKey) ([]byte, error) { return in.getLocked(key, in.data) } -// GetSnapshot returns the value for the given key from the given -// snapshotID, nil otherwise. -func (in *InMem) GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error) { - in.RLock() - defer in.RUnlock() - snapshotHandle, ok := in.snapshots[snapshotID] - if !ok { - return nil, util.Errorf("snapshotID %s does not exist", snapshotID) - } - return in.getLocked(key, snapshotHandle) -} - // getLocked performs a get operation assuming that the caller // is already holding the mutex. func (in *InMem) getLocked(key proto.EncodedKey, data llrb.Tree) ([]byte, error) { @@ -223,18 +168,6 @@ func (in *InMem) Iterate(start, end proto.EncodedKey, f func(proto.RawKeyValue) return in.iterateLocked(start, end, f, in.data) } -// Iterate iterates from start to end keys using snapshot ID, invoking -// f on each key/value pair. See engine.IterateSnapshot for details. -func (in *InMem) IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error { - in.RLock() - defer in.RUnlock() - snapshotHandle, ok := in.snapshots[snapshotID] - if !ok { - return util.Errorf("snapshotID %s does not exist", snapshotID) - } - return in.iterateLocked(start, end, f, snapshotHandle) -} - func (in *InMem) iterateLocked(start, end proto.EncodedKey, f func(proto.RawKeyValue) (bool, error), data llrb.Tree) error { if bytes.Compare(start, end) >= 0 { return nil @@ -342,6 +275,36 @@ func (in *InMem) NewIterator() Iterator { } } +// NewSnapshot creates a read-only snapshot engine from this engine. 
+func (in *InMem) NewSnapshot() Engine { + in.Lock() + defer in.Unlock() + return &inMemSnapshot{ + InMem: InMem{ + attrs: in.attrs, + maxBytes: in.maxBytes, + usedBytes: in.usedBytes, + data: cloneTree(in.data), + }, + } +} + +func cloneTree(a llrb.Tree) llrb.Tree { + var newTree = llrb.Tree{Count: a.Count} + newTree.Root = cloneNode(a.Root) + return newTree +} + +func cloneNode(a *llrb.Node) *llrb.Node { + if a == nil { + return nil + } + var newNode = &llrb.Node{Elem: a.Elem, Color: a.Color} + newNode.Left = cloneNode(a.Left) + newNode.Right = cloneNode(a.Right) + return newNode +} + // Returns a new Batch wrapping this in-memory engine. func (in *InMem) NewBatch() Engine { return &Batch{engine: in} @@ -352,6 +315,47 @@ func (in *InMem) Commit() error { return nil } +// This implementation restricts operation of the underlying in memory +// engine to read-only commands. +type inMemSnapshot struct { + InMem +} + +// Put is illegal for snapshot and returns an error. +func (in *inMemSnapshot) Put(key proto.EncodedKey, value []byte) error { + return util.Errorf("cannot Put to a snapshot") +} + +// Clear is illegal for snapshot and returns an error. +func (in *inMemSnapshot) Clear(key proto.EncodedKey) error { + return util.Errorf("cannot Clear from a snapshot") +} + +// WriteBatch is illegal for snapshot and returns an error. +func (in *inMemSnapshot) WriteBatch([]interface{}) error { + return util.Errorf("cannot WriteBatch to a snapshot") +} + +// Merge is illegal for snapshot and returns an error. +func (in *inMemSnapshot) Merge(key proto.EncodedKey, value []byte) error { + return util.Errorf("cannot Merge to a snapshot") +} + +// SetGCTimeouts is a noop for a snapshot. +func (in *inMemSnapshot) SetGCTimeouts(minTxnTS, minRCacheTS int64) { +} + +// NewBatch is illegal for a snapshot; it logs an error and returns nil. 
+func (in *inMemSnapshot) NewBatch() Engine { + log.Errorf("cannot create a NewBatch from a snapshot") + return nil +} + +// Commit is illegal for snapshot and returns an error. +func (in *inMemSnapshot) Commit() error { + return util.Errorf("cannot Commit to a snapshot") +} + // This implementation is not very efficient because the biogo LLRB // API supports iterations, not iterators. Every call to Next() is // O(logN). But since the in-memory engine is really only good for diff --git a/storage/engine/keys.go b/storage/engine/keys.go index a67e810cbf36..99d5e1a68dd5 100644 --- a/storage/engine/keys.go +++ b/storage/engine/keys.go @@ -196,9 +196,6 @@ var ( // KeyLocalTransactionPrefix specifies the key prefix for // transaction records. The suffix is the transaction id. KeyLocalTransactionPrefix = MakeKey(KeyLocalPrefix, proto.Key("txn-")) - // KeyLocalSnapshotIDGenerator is a snapshot ID generator sequence. - // Snapshot IDs must be unique per store ID. - KeyLocalSnapshotIDGenerator = MakeKey(KeyLocalPrefix, proto.Key("ssid")) // KeyLocalMax is the end of the local key range. KeyLocalMax = KeyLocalPrefix.PrefixEnd() diff --git a/storage/engine/mvcc.go b/storage/engine/mvcc.go index 6b6dddebed24..cc50cc01dcbe 100644 --- a/storage/engine/mvcc.go +++ b/storage/engine/mvcc.go @@ -1034,13 +1034,12 @@ func isValidEncodedSplitKey(key proto.EncodedKey) bool { // MVCCFindSplitKey suggests a split key from the given user-space key // range that aims to roughly cut into half the total number of bytes -// used (in raw key and value byte strings) in both subranges. It will -// operate on a snapshot of the underlying engine if a snapshotID is -// given, and in that case may safely be invoked in a goroutine. +// used (in raw key and value byte strings) in both subranges. Specify +// a snapshot engine to safely invoke this method in a goroutine. // // The split key will never be chosen from the key ranges listed in // illegalSplitKeyRanges. 
-func MVCCFindSplitKey(engine Engine, raftID int64, key, endKey proto.Key, snapshotID string) (proto.Key, error) { +func MVCCFindSplitKey(engine Engine, raftID int64, key, endKey proto.Key) (proto.Key, error) { if key.Less(KeyLocalMax) { key = KeyLocalMax } @@ -1058,7 +1057,7 @@ func MVCCFindSplitKey(engine Engine, raftID int64, key, endKey proto.Key, snapsh bestSplitKey := encStartKey bestSplitDiff := int64(math.MaxInt64) - if err := engine.IterateSnapshot(encStartKey, encEndKey, snapshotID, func(kv proto.RawKeyValue) (bool, error) { + if err := engine.Iterate(encStartKey, encEndKey, func(kv proto.RawKeyValue) (bool, error) { // Is key within a legal key range? valid := isValidEncodedSplitKey(kv.Key) diff --git a/storage/engine/mvcc_test.go b/storage/engine/mvcc_test.go index 15fead2943ca..77f70b4ab4c0 100644 --- a/storage/engine/mvcc_test.go +++ b/storage/engine/mvcc_test.go @@ -762,7 +762,7 @@ func TestMVCCConditionalPut(t *testing.T) { } switch e := err.(type) { default: - t.Fatal("unexpected error %T", e) + t.Fatalf("unexpected error %T", e) case *proto.ConditionFailedError: if e.ActualValue != nil { t.Fatalf("expected missing actual value: %v", e.ActualValue) @@ -776,7 +776,7 @@ func TestMVCCConditionalPut(t *testing.T) { } switch e := err.(type) { default: - t.Fatal("unexpected error %T", e) + t.Fatalf("unexpected error %T", e) case *proto.ConditionFailedError: if e.ActualValue != nil { t.Fatalf("expected missing actual value: %v", e.ActualValue) @@ -796,7 +796,7 @@ func TestMVCCConditionalPut(t *testing.T) { } switch e := err.(type) { default: - t.Fatal("unexpected error %T", e) + t.Fatalf("unexpected error %T", e) case *proto.ConditionFailedError: if !bytes.Equal(e.ActualValue.Bytes, value1.Bytes) { t.Fatalf("the value %s in get result does not match the value %s in request", @@ -811,7 +811,7 @@ func TestMVCCConditionalPut(t *testing.T) { } switch e := err.(type) { default: - t.Fatal("unexpected error %T", e) + t.Fatalf("unexpected error %T", e) case 
*proto.ConditionFailedError: if !bytes.Equal(e.ActualValue.Bytes, value1.Bytes) { t.Fatalf("the value %s in get result does not match the value %s in request", @@ -1238,10 +1238,9 @@ func TestFindSplitKey(t *testing.T) { } } ms.MergeStats(engine, raftID, 0) // write stats - if err := engine.CreateSnapshot("snap1"); err != nil { - t.Fatal(err) - } - humanSplitKey, err := MVCCFindSplitKey(engine, raftID, KeyMin, KeyMax, "snap1") + snap := engine.NewSnapshot() + defer snap.Stop() + humanSplitKey, err := MVCCFindSplitKey(snap, raftID, KeyMin, KeyMax) if err != nil { t.Fatal(err) } @@ -1249,9 +1248,6 @@ func TestFindSplitKey(t *testing.T) { if diff := splitReservoirSize/2 - ind; diff > 1 || diff < -1 { t.Fatalf("wanted key #%d+-1, but got %d (diff %d)", ind+diff, ind, diff) } - if err := engine.ReleaseSnapshot("snap1"); err != nil { - t.Fatal(err) - } } // TestFindValidSplitKeys verifies split keys are located such that @@ -1341,12 +1337,11 @@ func TestFindValidSplitKeys(t *testing.T) { } } ms.MergeStats(engine, raftID, 0) // write stats - if err := engine.CreateSnapshot("snap1"); err != nil { - t.Fatal(err) - } + snap := engine.NewSnapshot() + defer snap.Stop() rangeStart := test.keys[0] rangeEnd := test.keys[len(test.keys)-1].Next() - splitKey, err := MVCCFindSplitKey(engine, raftID, rangeStart, rangeEnd, "snap1") + splitKey, err := MVCCFindSplitKey(snap, raftID, rangeStart, rangeEnd) if test.expError { if err == nil { t.Errorf("%d: expected error", i) @@ -1360,9 +1355,6 @@ func TestFindValidSplitKeys(t *testing.T) { if !splitKey.Equal(test.expSplit) { t.Errorf("%d: expected split key %q; got %q", i, test.expSplit, splitKey) } - if err := engine.ReleaseSnapshot("snap1"); err != nil { - t.Fatal(err) - } } } @@ -1428,10 +1420,9 @@ func TestFindBalancedSplitKeys(t *testing.T) { } } ms.MergeStats(engine, raftID, 0) // write stats - if err := engine.CreateSnapshot("snap1"); err != nil { - t.Fatal(err) - } - splitKey, err := MVCCFindSplitKey(engine, raftID, 
proto.Key("\x01"), proto.KeyMax, "snap1") + snap := engine.NewSnapshot() + defer snap.Stop() + splitKey, err := MVCCFindSplitKey(snap, raftID, proto.Key("\x01"), proto.KeyMax) if err != nil { t.Errorf("unexpected error: %s", err) continue @@ -1439,9 +1430,6 @@ func TestFindBalancedSplitKeys(t *testing.T) { if !splitKey.Equal(expKey) { t.Errorf("%d: expected split key %q; got %q", i, expKey, splitKey) } - if err := engine.ReleaseSnapshot("snap1"); err != nil { - t.Fatal(err) - } } } diff --git a/storage/engine/rocksdb.go b/storage/engine/rocksdb.go index 071093340a79..0379cb1f6e2e 100644 --- a/storage/engine/rocksdb.go +++ b/storage/engine/rocksdb.go @@ -29,7 +29,6 @@ import ( "errors" "flag" "fmt" - "sync" "syscall" "unsafe" @@ -51,17 +50,13 @@ type RocksDB struct { rdb *C.DBEngine attrs proto.Attributes // Attributes for this engine dir string // The data directory - - sync.Mutex // Protects the snapshots map. - snapshots map[string]*C.DBSnapshot // Map of snapshot handles by snapshot ID } // NewRocksDB allocates and returns a new RocksDB object. func NewRocksDB(attrs proto.Attributes, dir string) *RocksDB { return &RocksDB{ - snapshots: map[string]*C.DBSnapshot{}, - attrs: attrs, - dir: dir, + attrs: attrs, + dir: dir, } } @@ -114,39 +109,6 @@ func (r *RocksDB) Stop() { r.rdb = nil } -// CreateSnapshot creates a snapshot handle from engine. -func (r *RocksDB) CreateSnapshot(snapshotID string) error { - if r.rdb == nil { - return util.Errorf("RocksDB is not initialized yet") - } - r.Lock() - defer r.Unlock() - _, ok := r.snapshots[snapshotID] - if ok { - return util.Errorf("snapshotID %s already exists", snapshotID) - } - snapshotHandle := C.DBNewSnapshot(r.rdb) - r.snapshots[snapshotID] = snapshotHandle - return nil -} - -// ReleaseSnapshot releases the existing snapshot handle for the -// given snapshotID. 
-func (r *RocksDB) ReleaseSnapshot(snapshotID string) error { - if r.rdb == nil { - return util.Errorf("RocksDB is not initialized yet") - } - r.Lock() - defer r.Unlock() - snapshotHandle, ok := r.snapshots[snapshotID] - if !ok { - return util.Errorf("snapshotID %s does not exist", snapshotID) - } - C.DBSnapshotRelease(snapshotHandle) - delete(r.snapshots, snapshotID) - return nil -} - // Attrs returns the list of attributes describing this engine. This // may include a specification of disk type (e.g. hdd, ssd, fio, etc.) // and potentially other labels to identify important attributes of @@ -198,19 +160,6 @@ func (r *RocksDB) Get(key proto.EncodedKey) ([]byte, error) { return r.getInternal(key, nil) } -// GetSnapshot returns the value for the given key from the given -// snapshotID, nil otherwise. -func (r *RocksDB) GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error) { - r.Lock() - snapshotHandle, ok := r.snapshots[snapshotID] - if !ok { - return nil, util.Errorf("snapshotID %s does not exist", snapshotID) - } - r.Unlock() - - return r.getInternal(key, snapshotHandle) -} - // Get returns the value for the given key. func (r *RocksDB) getInternal(key proto.EncodedKey, snapshotHandle *C.DBSnapshot) ([]byte, error) { if len(key) == 0 { @@ -238,19 +187,6 @@ func (r *RocksDB) Iterate(start, end proto.EncodedKey, f func(proto.RawKeyValue) return r.iterateInternal(start, end, f, nil) } -// IterateSnapshot iterates from start to end keys, invoking f on -// each key/value pair. See engine.IterateSnapshot for details. 
-func (r *RocksDB) IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error { - r.Lock() - snapshotHandle, ok := r.snapshots[snapshotID] - if !ok { - return util.Errorf("snapshotID %s does not exist", snapshotID) - } - r.Unlock() - - return r.iterateInternal(start, end, f, snapshotHandle) -} - func (r *RocksDB) iterateInternal(start, end proto.EncodedKey, f func(proto.RawKeyValue) (bool, error), snapshotHandle *C.DBSnapshot) error { if bytes.Compare(start, end) >= 0 { @@ -428,6 +364,19 @@ func (r *RocksDB) NewIterator() Iterator { return newRocksDBIterator(r.rdb, nil) } +// NewSnapshot creates a snapshot handle from engine and returns a +// read-only rocksDBSnapshot engine. +func (r *RocksDB) NewSnapshot() Engine { + if r.rdb == nil { + log.Errorf("RocksDB is not initialized yet") + return nil + } + return &rocksDBSnapshot{ + parent: r, + handle: C.DBNewSnapshot(r.rdb), + } +} + // Returns a new Batch wrapping this rocksdb engine. func (r *RocksDB) NewBatch() Engine { return &Batch{engine: r} @@ -438,6 +387,98 @@ func (r *RocksDB) Commit() error { return nil } +type rocksDBSnapshot struct { + parent *RocksDB + handle *C.DBSnapshot +} + +// Start is a noop. +func (r *rocksDBSnapshot) Start() error { + return nil +} + +// Stop releases the snapshot handle. +func (r *rocksDBSnapshot) Stop() { + C.DBSnapshotRelease(r.handle) +} + +// Attrs returns the engine/store attributes. +func (r *rocksDBSnapshot) Attrs() proto.Attributes { + return r.parent.Attrs() +} + +// Put is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) Put(key proto.EncodedKey, value []byte) error { + return util.Errorf("cannot Put to a snapshot") +} + +// Get returns the value for the given key, nil otherwise using +// the snapshot handle. 
+func (r *rocksDBSnapshot) Get(key proto.EncodedKey) ([]byte, error) { + return r.parent.getInternal(key, r.handle) +} + +// Iterate iterates over the keys between start inclusive and end +// exclusive, invoking f() on each key/value pair using the snapshot +// handle. +func (r *rocksDBSnapshot) Iterate(start, end proto.EncodedKey, f func(proto.RawKeyValue) (bool, error)) error { + return r.parent.iterateInternal(start, end, f, r.handle) +} + +// Clear is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) Clear(key proto.EncodedKey) error { + return util.Errorf("cannot Clear from a snapshot") +} + +// WriteBatch is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) WriteBatch([]interface{}) error { + return util.Errorf("cannot WriteBatch to a snapshot") +} + +// Merge is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) Merge(key proto.EncodedKey, value []byte) error { + return util.Errorf("cannot Merge to a snapshot") +} + +// Capacity returns capacity details for the engine's available storage. +func (r *rocksDBSnapshot) Capacity() (StoreCapacity, error) { + return r.parent.Capacity() +} + +// SetGCTimeouts is a noop for a snapshot. +func (r *rocksDBSnapshot) SetGCTimeouts(minTxnTS, minRCacheTS int64) { +} + +// ApproximateSize returns the approximate number of bytes the engine is +// using to store data for the given range of keys. +func (r *rocksDBSnapshot) ApproximateSize(start, end proto.EncodedKey) (uint64, error) { + return r.parent.ApproximateSize(start, end) +} + +// NewIterator returns a new instance of an Iterator over the +// engine using the snapshot handle. +func (r *rocksDBSnapshot) NewIterator() Iterator { + return newRocksDBIterator(r.parent.rdb, r.handle) +} + +// NewSnapshot returns a new instance of a read-only snapshot +// from the original RocksDB instance. This will be an updated +// snapshot. 
+func (r *rocksDBSnapshot) NewSnapshot() Engine { + return r.parent.NewSnapshot() +} + +// NewBatch is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) NewBatch() Engine { + log.Errorf("cannot create a NewBatch from a snapshot") + return nil +} + +// Commit is illegal for snapshot and returns an error. +func (r *rocksDBSnapshot) Commit() error { + return util.Errorf("cannot Commit to a snapshot") +} + type rocksDBIterator struct { iter *C.DBIterator } diff --git a/storage/range.go b/storage/range.go index 36bdc99119d3..d2885cb35238 100644 --- a/storage/range.go +++ b/storage/range.go @@ -135,7 +135,7 @@ type RangeManager interface { SplitRange(origRng, newRng *Range) error AddRange(rng *Range) error RemoveRange(rng *Range) error - CreateSnapshot() (string, error) + NewSnapshot() engine.Engine ProposeRaftCommand(cmdIDKey, proto.InternalRaftCommand) } @@ -704,8 +704,6 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon r.InternalPushTxn(batch, args.(*proto.InternalPushTxnRequest), reply.(*proto.InternalPushTxnResponse)) case proto.InternalResolveIntent: r.InternalResolveIntent(batch, ms, args.(*proto.InternalResolveIntentRequest), reply.(*proto.InternalResolveIntentResponse)) - case proto.InternalSnapshotCopy: - r.InternalSnapshotCopy(r.rm.Engine(), args.(*proto.InternalSnapshotCopyRequest), reply.(*proto.InternalSnapshotCopyResponse)) case proto.InternalMerge: r.InternalMerge(batch, ms, args.(*proto.InternalMergeRequest), reply.(*proto.InternalMergeResponse)) default: @@ -1210,33 +1208,6 @@ func (r *Range) InternalResolveIntent(batch engine.Engine, ms *engine.MVCCStats, } } -// InternalSnapshotCopy scans the key range specified by start key through -// end key up to some maximum number of results from the given snapshot_id. -// It will create a snapshot if snapshot_id is empty. 
-func (r *Range) InternalSnapshotCopy(e engine.Engine, args *proto.InternalSnapshotCopyRequest, reply *proto.InternalSnapshotCopyResponse) { - if len(args.SnapshotID) == 0 { - snapshotID, err := r.rm.CreateSnapshot() - if err != nil { - reply.SetGoError(err) - return - } - args.SnapshotID = snapshotID - } - - kvs, err := engine.ScanSnapshot(e, proto.EncodedKey(args.Key), proto.EncodedKey(args.EndKey), args.MaxResults, args.SnapshotID) - if err != nil { - reply.SetGoError(err) - return - } - if len(kvs) == 0 { - err = e.ReleaseSnapshot(args.SnapshotID) - } - - reply.Rows = kvs - reply.SnapshotID = args.SnapshotID - reply.SetGoError(err) -} - // InternalMerge is used to merge a value into an existing key. Merge is an // efficient accumulation operation which is exposed by RocksDB, used by // Cockroach for the efficient accumulation of certain values. Due to the @@ -1307,16 +1278,10 @@ func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSpli // other commands. splitKey := proto.Key(args.SplitKey) if len(splitKey) == 0 { - snapshotID, err := r.rm.CreateSnapshot() - if err != nil { - reply.SetGoError(util.Errorf("unable to create snapshot: %s", err)) - return - } - splitKey, err = engine.MVCCFindSplitKey(r.rm.Engine(), r.Desc.RaftID, r.Desc.StartKey, r.Desc.EndKey, snapshotID) - if releaseErr := r.rm.Engine().ReleaseSnapshot(snapshotID); releaseErr != nil { - log.Errorf("unable to release snapshot: %s", releaseErr) - } - if err != nil { + snap := r.rm.NewSnapshot() + defer snap.Stop() + var err error + if splitKey, err = engine.MVCCFindSplitKey(snap, r.Desc.RaftID, r.Desc.StartKey, r.Desc.EndKey); err != nil { reply.SetGoError(util.Errorf("unable to determine split key: %s", err)) return } diff --git a/storage/range_test.go b/storage/range_test.go index 33a59117b11a..6d8858e0930e 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -485,25 +485,6 @@ func heartbeatArgs(txn *proto.Transaction, raftID int64, storeID int32) ( return 
args, reply } -// internalSnapshotCopyArgs returns a InternalSnapshotCopyRequest and -// InternalSnapshotCopyResponse pair addressed to the default replica -// for the specified key and endKey. -func internalSnapshotCopyArgs(key []byte, endKey []byte, maxResults int64, snapshotID string, raftID int64, storeID int32) ( - *proto.InternalSnapshotCopyRequest, *proto.InternalSnapshotCopyResponse) { - args := &proto.InternalSnapshotCopyRequest{ - RequestHeader: proto.RequestHeader{ - Key: key, - EndKey: endKey, - RaftID: raftID, - Replica: proto.Replica{StoreID: storeID}, - }, - SnapshotID: snapshotID, - MaxResults: maxResults, - } - reply := &proto.InternalSnapshotCopyResponse{} - return args, reply -} - // internalMergeArgs returns a InternalMergeRequest and InternalMergeResponse // pair addressed to the default replica for the specified key. The request will // contain the given proto.Value. @@ -835,113 +816,6 @@ func TestRangeIdempotence(t *testing.T) { } } -// TestRangeSnapshot. -func TestRangeSnapshot(t *testing.T) { - s, rng, _, clock, _ := createTestRangeWithClock(t) - defer s.Stop() - - key1 := []byte("a") - key2 := []byte("b") - val1 := []byte("1") - val2 := []byte("2") - val3 := []byte("3") - - pArgs, pReply := putArgs(key1, val1, 1, s.StoreID()) - pArgs.Timestamp = clock.Now() - err := rng.AddCmd(proto.Put, pArgs, pReply, true) - - pArgs, pReply = putArgs(key2, val2, 1, s.StoreID()) - pArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.Put, pArgs, pReply, true) - - gArgs, gReply := getArgs(key1, 1, s.StoreID()) - gArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.Get, gArgs, gReply, true) - - if err != nil { - t.Fatalf("error : %s", err) - } - if !bytes.Equal(gReply.Value.Bytes, val1) { - t.Fatalf("the value %s in get result does not match the value %s in request", - gReply.Value.Bytes, val1) - } - - iscArgs, iscReply := internalSnapshotCopyArgs(engine.MVCCEncodeKey(engine.KeyLocalPrefix.PrefixEnd()), engine.KeyMax, 50, "", 1, s.StoreID()) - 
iscArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.InternalSnapshotCopy, iscArgs, iscReply, true) - if err != nil { - t.Fatalf("error : %s", err) - } - snapshotID := iscReply.SnapshotID - expectedKey := engine.MVCCEncodeKey(key1) - expectedVal := getSerializedMVCCValue(&proto.Value{Bytes: val1}) - if len(iscReply.Rows) != 4 || - !bytes.Equal(iscReply.Rows[0].Key, expectedKey) || - !bytes.Equal(iscReply.Rows[1].Value, expectedVal) { - t.Fatalf("the value %v of key %v in get result does not match the value %v of key %v in request", - iscReply.Rows[1].Value, iscReply.Rows[0].Key, expectedVal, expectedKey) - } - - pArgs, pReply = putArgs(key2, val3, 1, s.StoreID()) - pArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.Put, pArgs, pReply, true) - - // Scan with the previous snapshot will get the old value val2 of key2. - iscArgs, iscReply = internalSnapshotCopyArgs(engine.MVCCEncodeKey(engine.KeyLocalPrefix.PrefixEnd()), engine.KeyMax, 50, snapshotID, 1, s.StoreID()) - iscArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.InternalSnapshotCopy, iscArgs, iscReply, true) - if err != nil { - t.Fatalf("error : %s", err) - } - expectedKey = engine.MVCCEncodeKey(key2) - expectedVal = getSerializedMVCCValue(&proto.Value{Bytes: val2}) - if len(iscReply.Rows) != 4 || - !bytes.Equal(iscReply.Rows[2].Key, expectedKey) || - !bytes.Equal(iscReply.Rows[3].Value, expectedVal) { - t.Fatalf("the value %v of key %v in get result does not match the value %v of key %v in request", - iscReply.Rows[3].Value, iscReply.Rows[2].Key, expectedVal, expectedKey) - } - snapshotLastKey := proto.Key(iscReply.Rows[3].Key) - - // Create a new snapshot to cover the latest value. 
- iscArgs, iscReply = internalSnapshotCopyArgs(engine.MVCCEncodeKey(engine.KeyLocalPrefix.PrefixEnd()), engine.KeyMax, 50, "", 1, s.StoreID()) - iscArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.InternalSnapshotCopy, iscArgs, iscReply, true) - if err != nil { - t.Fatalf("error : %s", err) - } - snapshotID2 := iscReply.SnapshotID - expectedKey = engine.MVCCEncodeKey(key2) - expectedVal = getSerializedMVCCValue(&proto.Value{Bytes: val3}) - // Expect one more mvcc version. - if len(iscReply.Rows) != 5 || - !bytes.Equal(iscReply.Rows[2].Key, expectedKey) || - !bytes.Equal(iscReply.Rows[3].Value, expectedVal) { - t.Fatalf("the value %v of key %v in get result does not match the value %v of key %v in request", - iscReply.Rows[3].Value, iscReply.Rows[2].Key, expectedVal, expectedKey) - } - snapshot2LastKey := proto.Key(iscReply.Rows[4].Key) - - iscArgs, iscReply = internalSnapshotCopyArgs(snapshotLastKey.PrefixEnd(), engine.KeyMax, 50, snapshotID, 1, s.StoreID()) - iscArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.InternalSnapshotCopy, iscArgs, iscReply, true) - if err != nil { - t.Fatalf("error : %s", err) - } - if len(iscReply.Rows) != 0 { - t.Fatalf("error : %d", len(iscReply.Rows)) - } - iscArgs, iscReply = internalSnapshotCopyArgs(snapshot2LastKey.PrefixEnd(), engine.KeyMax, 50, snapshotID2, 1, s.StoreID()) - iscArgs.Timestamp = clock.Now() - err = rng.AddCmd(proto.InternalSnapshotCopy, iscArgs, iscReply, true) - if err != nil { - t.Fatalf("error : %s", err) - } - if len(iscReply.Rows) != 0 { - t.Fatalf("error : %d", len(iscReply.Rows)) - } -} - // TestEndTransactionBeforeHeartbeat verifies that a transaction // can be committed/aborted before being heartbeat. 
func TestEndTransactionBeforeHeartbeat(t *testing.T) { diff --git a/storage/store.go b/storage/store.go index e8eb6d2766d2..ff9bcc2c689e 100644 --- a/storage/store.go +++ b/storage/store.go @@ -23,7 +23,6 @@ import ( "fmt" "net" "sort" - "strconv" "sync" "time" @@ -646,17 +645,9 @@ func (s *Store) RemoveRange(rng *Range) error { return nil } -// CreateSnapshot creates a new snapshot, named using an internal counter. -func (s *Store) CreateSnapshot() (string, error) { - s.mu.Lock() - defer s.mu.Unlock() - key := engine.KeyLocalSnapshotIDGenerator - candidateID, err := engine.MVCCIncrement(s.engine, nil, key, proto.ZeroTimestamp, nil, 1) - if err != nil { - return "", err - } - snapshotID := strconv.FormatInt(candidateID, 10) - return snapshotID, s.engine.CreateSnapshot(snapshotID) +// NewSnapshot creates a new snapshot engine. +func (s *Store) NewSnapshot() engine.Engine { + return s.engine.NewSnapshot() } // Attrs returns the attributes of the underlying store. From 0c3d47364396b7ae498b8026a782b2064e8a9454 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Thu, 15 Jan 2015 11:21:05 -0500 Subject: [PATCH 2/2] Add unittest verifying all methods of snapshot engines. Changed NewSnapshot behavior to log an error and return nil if called from a snapshot. 
--- storage/engine/engine_test.go | 146 +++++++++++++++++++++++++++++++++- storage/engine/in_mem.go | 10 ++- storage/engine/rocksdb.go | 11 ++- 3 files changed, 156 insertions(+), 11 deletions(-) diff --git a/storage/engine/engine_test.go b/storage/engine/engine_test.go index 74952f22fe44..13338d8c523d 100644 --- a/storage/engine/engine_test.go +++ b/storage/engine/engine_test.go @@ -51,13 +51,18 @@ func ensureRangeEqual(t *testing.T, sortedKeys []string, keyMap map[string][]byt } } +var ( + inMemAttrs = proto.Attributes{Attrs: []string{"mem"}} + rocksDBAttrs = proto.Attributes{Attrs: []string{"ssd"}} +) + // runWithAllEngines creates a new engine of each supported type and // invokes the supplied test func with each instance. func runWithAllEngines(test func(e Engine, t *testing.T), t *testing.T) { - inMem := NewInMem(proto.Attributes{}, 10<<20) + inMem := NewInMem(inMemAttrs, 10<<20) loc := fmt.Sprintf("%s/data_%d", os.TempDir(), time.Now().UnixNano()) - rocksdb := NewRocksDB(proto.Attributes{Attrs: []string{"ssd"}}, loc) + rocksdb := NewRocksDB(rocksDBAttrs, loc) err := rocksdb.Start() if err != nil { t.Fatalf("could not create new rocksdb db instance at %s: %v", loc, err) @@ -507,6 +512,8 @@ func TestSnapshot(t *testing.T) { } snap := engine.NewSnapshot() + defer snap.Stop() + val2 := []byte("2") engine.Put(key, val2) val, _ = engine.Get(key) @@ -536,7 +543,140 @@ func TestSnapshot(t *testing.T) { t.Fatalf("the value %s in get result does not match the value %s in request", keyvalsSnapshot[0].Value, val1) } - snap.Stop() + }, t) +} + +// TestSnapshotMethods verifies that snapshots allow only read-only +// engine operations. +func TestSnapshotMethods(t *testing.T) { + runWithAllEngines(func(engine Engine, t *testing.T) { + keys := [][]byte{[]byte("a"), []byte("b")} + vals := [][]byte{[]byte("1"), []byte("2")} + for i := range keys { + engine.Put(keys[i], vals[i]) + } + snap := engine.NewSnapshot() + defer snap.Stop() + + // Verify Attrs. 
+ var attrs proto.Attributes + switch engine.(type) { + case *InMem: + attrs = inMemAttrs + case *RocksDB: + attrs = rocksDBAttrs + } + if !reflect.DeepEqual(engine.Attrs(), attrs) { + t.Errorf("attrs mismatch; expected %+v, got %+v", attrs, engine.Attrs()) + } + + // Verify Put is error. + if err := snap.Put([]byte("c"), []byte("3")); err == nil { + t.Error("expected error on Put to snapshot") + } + + // Verify Get. + valSnapshot, err := snap.Get(keys[0]) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(vals[0], valSnapshot) { + t.Fatalf("the value %s in get result does not match the value %s in snapshot", + vals[0], valSnapshot) + } + + // Verify Scan. + keyvals, _ := Scan(engine, proto.EncodedKey(KeyMin), proto.EncodedKey(KeyMax), 0) + keyvalsSnapshot, err := Scan(snap, proto.EncodedKey(KeyMin), proto.EncodedKey(KeyMax), 0) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(keyvals, keyvalsSnapshot) { + t.Fatalf("the key/values %v in scan result does not match the value %s in snapshot", + keyvals, keyvalsSnapshot) + } + + // Verify Iterate. + index := 0 + if err := snap.Iterate(proto.EncodedKey(KeyMin), proto.EncodedKey(KeyMax), func(kv proto.RawKeyValue) (bool, error) { + if !bytes.Equal(kv.Key, keys[index]) || !bytes.Equal(kv.Value, vals[index]) { + t.Errorf("%d: key/value not equal between expected and snapshot: %s/%s, %s/%s", keys[index], vals[index], kv.Key, kv.Value) + } + index++ + return false, nil + }); err != nil { + t.Fatal(err) + } + + // Verify Clear is error. + if err := snap.Clear(keys[0]); err == nil { + t.Error("expected error on Clear to snapshot") + } + + // Verify WriteBatch is error. + if err := snap.WriteBatch([]interface{}{BatchDelete{proto.RawKeyValue{Key: keys[0]}}}); err == nil { + t.Error("expected error on WriteBatch to snapshot") + } + + // Verify Merge is error. + if err := snap.Merge([]byte("merge-key"), appender("x")); err == nil { + t.Error("expected error on Merge to snapshot") + } + + // Verify Capacity. 
+ capacity, err := engine.Capacity() + if err != nil { + t.Fatal(err) + } + capacitySnapshot, err := snap.Capacity() + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(capacity, capacitySnapshot) { + t.Errorf("expected capacities to be equal: %v != %v", capacity, capacitySnapshot) + } + + // Verify ApproximateSize. + approx, err := engine.ApproximateSize(proto.EncodedKey(KeyMin), proto.EncodedKey(KeyMax)) + if err != nil { + t.Fatal(err) + } + approxSnapshot, err := snap.ApproximateSize(proto.EncodedKey(KeyMin), proto.EncodedKey(KeyMax)) + if err != nil { + t.Fatal(err) + } + if approx != approxSnapshot { + t.Errorf("expected approx sizes to be equal: %d != %d", approx, approxSnapshot) + } + + // Write a new key to engine. + newKey := []byte("c") + newVal := []byte("3") + if err := engine.Put(newKey, newVal); err != nil { + t.Fatal(err) + } + + // Verify NewIterator still iterates over original snapshot. + iter := snap.NewIterator() + iter.Seek(newKey) + if iter.Valid() { + t.Error("expected invalid iterator when seeking to element which shouldn't be visible to snapshot") + } + + // Verify NewSnapshot returns nil. + if newSnap := snap.NewSnapshot(); newSnap != nil { + t.Error("expected NewSnapshot on snapshot to return nil; got %+v", newSnap) + } + + // Verify NewBatch returns nil. + if batch := snap.NewBatch(); batch != nil { + t.Error("expected NewBatch on snapshot to return nil; got %+v", batch) + } + + // Verify Commit is error. + if err := snap.Commit(); err == nil { + t.Error("expected error on Commit to snapshot") + } }, t) } diff --git a/storage/engine/in_mem.go b/storage/engine/in_mem.go index 0d9e282cb576..237c4bfc8e28 100644 --- a/storage/engine/in_mem.go +++ b/storage/engine/in_mem.go @@ -305,7 +305,7 @@ func cloneNode(a *llrb.Node) *llrb.Node { return newNode } -// Returns a new Batch wrapping this in-memory engine. +// NewBatch returns a new Batch wrapping this in-memory engine. 
func (in *InMem) NewBatch() Engine { return &Batch{engine: in} } @@ -345,7 +345,13 @@ func (in *inMemSnapshot) Merge(key proto.EncodedKey, value []byte) error { func (in *inMemSnapshot) SetGCTimeouts(minTxnTS, minRCacheTS int64) { } -// NewBatch is illegal for snapshot and returns an error. +// NewSnapshot is illegal for snapshot and returns nil. +func (in *inMemSnapshot) NewSnapshot() Engine { + log.Errorf("cannot create a NewSnapshot from a snapshot") + return nil +} + +// NewBatch is illegal for snapshot and returns nil. func (in *inMemSnapshot) NewBatch() Engine { log.Errorf("cannot create a NewBatch from a snapshot") return nil diff --git a/storage/engine/rocksdb.go b/storage/engine/rocksdb.go index 0379cb1f6e2e..507c63763f64 100644 --- a/storage/engine/rocksdb.go +++ b/storage/engine/rocksdb.go @@ -377,7 +377,7 @@ func (r *RocksDB) NewSnapshot() Engine { } } -// Returns a new Batch wrapping this rocksdb engine. +// NewBatch returns a new Batch wrapping this rocksdb engine. func (r *RocksDB) NewBatch() Engine { return &Batch{engine: r} } @@ -461,14 +461,13 @@ func (r *rocksDBSnapshot) NewIterator() Iterator { return newRocksDBIterator(r.parent.rdb, r.handle) } -// NewSnapshot returns a new instance of a read-only snapshot -// from the original RocksDB instance. This will be an updated -// snapshot. +// NewSnapshot is illegal for snapshot and returns nil. func (r *rocksDBSnapshot) NewSnapshot() Engine { - return r.parent.NewSnapshot() + log.Errorf("cannot create a NewSnapshot from a snapshot") + return nil } -// NewBatch is illegal for snapshot and returns an error. +// NewBatch is illegal for snapshot and returns nil. func (r *rocksDBSnapshot) NewBatch() Engine { log.Errorf("cannot create a NewBatch from a snapshot") return nil