Skip to content

Commit

Permalink
Merge pull request #256 from cockroachdb/spencerkimball/snapshot-engine
Browse files Browse the repository at this point in the history
Reimplement snapshots as a new implementation of Engine interface.
  • Loading branch information
spencerkimball committed Jan 15, 2015
2 parents d7efcaa + 0c3d473 commit cf09deb
Show file tree
Hide file tree
Showing 16 changed files with 378 additions and 462 deletions.
5 changes: 2 additions & 3 deletions kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestKVDBCoverage(t *testing.T) {
containsReq.Key = key
containsResp := &proto.ContainsResponse{}
if err := kvClient.Call(proto.Contains, containsReq, containsResp); err != nil || containsResp.Error != nil {
t.Fatal("%s, %s", err, containsResp.GoError())
t.Fatalf("%s, %s", err, containsResp.GoError())
}
if !containsResp.Exists {
t.Error("expected contains to be true")
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestKVDBCoverage(t *testing.T) {
t.Fatalf("%s, %s", err, delResp.GoError())
}
if err := kvClient.Call(proto.Contains, containsReq, containsResp); err != nil || containsResp.Error != nil {
t.Fatal("%s, %s", err, containsResp.GoError())
t.Fatalf("%s, %s", err, containsResp.GoError())
}
if containsResp.Exists {
t.Error("expected contains to be false after delete")
Expand Down Expand Up @@ -170,7 +170,6 @@ func TestKVDBInternalMethods(t *testing.T) {
{proto.InternalHeartbeatTxn, &proto.InternalHeartbeatTxnRequest{}, &proto.InternalHeartbeatTxnResponse{}},
{proto.InternalPushTxn, &proto.InternalPushTxnRequest{}, &proto.InternalPushTxnResponse{}},
{proto.InternalResolveIntent, &proto.InternalResolveIntentRequest{}, &proto.InternalResolveIntentResponse{}},
{proto.InternalSnapshotCopy, &proto.InternalSnapshotCopyRequest{}, &proto.InternalSnapshotCopyResponse{}},
{proto.InternalMerge, &proto.InternalMergeRequest{}, &proto.InternalMergeResponse{}},
}
// Verify non-public methods experience bad request errors.
Expand Down
23 changes: 7 additions & 16 deletions proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ var AllMethods = stringSet{
InternalHeartbeatTxn: struct{}{},
InternalPushTxn: struct{}{},
InternalResolveIntent: struct{}{},
InternalSnapshotCopy: struct{}{},
InternalMerge: struct{}{},
}

Expand Down Expand Up @@ -131,20 +130,18 @@ var InternalMethods = stringSet{
InternalHeartbeatTxn: struct{}{},
InternalPushTxn: struct{}{},
InternalResolveIntent: struct{}{},
InternalSnapshotCopy: struct{}{},
InternalMerge: struct{}{},
}

// ReadMethods specifies the set of methods which read and return data.
var ReadMethods = stringSet{
Contains: struct{}{},
Get: struct{}{},
ConditionalPut: struct{}{},
Increment: struct{}{},
Scan: struct{}{},
ReapQueue: struct{}{},
InternalRangeLookup: struct{}{},
InternalSnapshotCopy: struct{}{},
Contains: struct{}{},
Get: struct{}{},
ConditionalPut: struct{}{},
Increment: struct{}{},
Scan: struct{}{},
ReapQueue: struct{}{},
InternalRangeLookup: struct{}{},
}

// WriteMethods specifies the set of methods which write data.
Expand Down Expand Up @@ -347,8 +344,6 @@ func MethodForRequest(req Request) (string, error) {
return InternalPushTxn, nil
case *InternalResolveIntentRequest:
return InternalResolveIntent, nil
case *InternalSnapshotCopyRequest:
return InternalSnapshotCopy, nil
case *InternalMergeRequest:
return InternalMerge, nil
}
Expand Down Expand Up @@ -402,8 +397,6 @@ func CreateArgs(method string) (Request, error) {
return &InternalPushTxnRequest{}, nil
case InternalResolveIntent:
return &InternalResolveIntentRequest{}, nil
case InternalSnapshotCopy:
return &InternalSnapshotCopyRequest{}, nil
case InternalMerge:
return &InternalMergeRequest{}, nil
}
Expand Down Expand Up @@ -447,8 +440,6 @@ func CreateReply(method string) (Response, error) {
return &InternalPushTxnResponse{}, nil
case InternalResolveIntent:
return &InternalResolveIntentResponse{}, nil
case InternalSnapshotCopy:
return &InternalSnapshotCopyResponse{}, nil
case InternalMerge:
return &InternalMergeResponse{}, nil
}
Expand Down
4 changes: 0 additions & 4 deletions proto/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ const (
// InternalResolveIntent resolves existing write intents for a key or
// key range.
InternalResolveIntent = "InternalResolveIntent"
// InternalSnapshotCopy scans the key range specified by start key through
// end key up to some maximum number of results from the given snapshot_id.
// It will create a snapshot if snapshot_id is empty.
InternalSnapshotCopy = "InternalSnapshotCopy"
// InternalMerge merges a given value into the specified key. Merge is a
// high-performance operation provided by underlying data storage for values
// which are accumulated over several writes. Because it is not
Expand Down
24 changes: 1 addition & 23 deletions proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,6 @@ message InternalResolveIntentResponse {
optional ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// An InternalSnapshotCopyRequest is arguments to the InternalSnapshotCopy()
// method. It specifies the start and end keys for the scan and the
// maximum number of results from the given snapshot_id. It will create
// a snapshot if snapshot_id is empty.
message InternalSnapshotCopyRequest {
optional RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// Optional, a new snapshot will be created if it is empty.
optional string snapshot_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SnapshotID"];
// Must be > 0.
optional int64 max_results = 3 [(gogoproto.nullable) = false];
}

// An InternalSnapshotCopyResponse is the return value from the
// InternalSnapshotCopy() method.
message InternalSnapshotCopyResponse {
optional ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
optional string snapshot_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SnapshotID"];
// Empty if no rows were scanned.
repeated RawKeyValue rows = 3 [(gogoproto.nullable) = false];
}

// An InternalMergeRequest contains arguments to the InternalMerge() method. It
// specifies a key and a value which should be merged into the existing value at
// that key.
Expand Down Expand Up @@ -190,8 +169,7 @@ message InternalRaftCommandUnion {
optional InternalHeartbeatTxnRequest internal_heartbeat_txn = 32;
optional InternalPushTxnRequest internal_push_txn = 33;
optional InternalResolveIntentRequest internal_resolve_intent = 34;
optional InternalSnapshotCopyRequest internal_snapshot_copy = 35;
optional InternalMergeRequest internal_merge_response = 36;
optional InternalMergeRequest internal_merge_response = 35;
}

// An InternalRaftCommand is a command which can be serialized and
Expand Down
5 changes: 0 additions & 5 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,6 @@ func (n *Node) InternalResolveIntent(args *proto.InternalResolveIntentRequest, r
return n.executeCmd(proto.InternalResolveIntent, args, reply)
}

// InternalSnapshotCopy .
func (n *Node) InternalSnapshotCopy(args *proto.InternalSnapshotCopyRequest, reply *proto.InternalSnapshotCopyResponse) error {
return n.executeCmd(proto.InternalSnapshotCopy, args, reply)
}

// InternalMerge .
func (n *Node) InternalMerge(args *proto.InternalMergeRequest, reply *proto.InternalMergeResponse) error {
return n.executeCmd(proto.InternalMerge, args, reply)
Expand Down
25 changes: 5 additions & 20 deletions storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,6 @@ func (b *Batch) Capacity() (StoreCapacity, error) {
func (b *Batch) SetGCTimeouts(minTxnTS, minRCacheTS int64) {
}

// CreateSnapshot returns an error if called on a Batch.
func (b *Batch) CreateSnapshot(snapshotID string) error {
return util.Errorf("cannot create a snapshot from a Batch")
}

// ReleaseSnapshot returns an error if called on a Batch.
func (b *Batch) ReleaseSnapshot(snapshotID string) error {
return util.Errorf("cannot release a snapshot from a Batch")
}

// GetSnapshot returns an error if called on a Batch.
func (b *Batch) GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error) {
return nil, util.Errorf("cannot get with a snapshot from a Batch")
}

// IterateSnapshot returns an error if called on a Batch.
func (b *Batch) IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error {
return util.Errorf("cannot iterate with a snapshot from a Batch")
}

// ApproximateSize returns an error if called on a Batch.
func (b *Batch) ApproximateSize(start, end proto.EncodedKey) (uint64, error) {
return 0, util.Errorf("cannot get approximate size from a Batch")
Expand All @@ -240,6 +220,11 @@ func (b *Batch) NewIterator() Iterator {
return newBatchIterator(b.engine, &b.updates)
}

// NewSnapshot returns nil if called on a Batch.
func (b *Batch) NewSnapshot() Engine {
return nil
}

// NewBatch returns a new Batch instance wrapping same underlying engine.
func (b *Batch) NewBatch() Engine {
return &Batch{engine: b.engine}
Expand Down
38 changes: 6 additions & 32 deletions storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ type Iterator interface {

// Engine is the interface that wraps the core operations of a
// key/value store.
// TODO(Jiang-Ming,Spencer): Remove some of the *Snapshot methods and have
// their non-snapshot counterparts accept a snapshotID which is used unless
// empty.
type Engine interface {
// Start initializes and starts the engine.
Start() error
Expand Down Expand Up @@ -115,36 +112,26 @@ type Engine interface {
// Rows with timestamps less than the associated value will be GC'd
// during compaction.
SetGCTimeouts(minTxnTS, minRCacheTS int64)
// CreateSnapshot creates a snapshot handle from engine.
CreateSnapshot(snapshotID string) error
// ReleaseSnapshot releases the existing snapshot handle for the
// given snapshotID.
ReleaseSnapshot(snapshotID string) error
// GetSnapshot returns the value for the given key from the given
// snapshotID, nil otherwise.
GetSnapshot(key proto.EncodedKey, snapshotID string) ([]byte, error)
// IterateSnapshot scans from start to end keys, visiting at
// most max key/value pairs from the specified snapshot ID.
IterateSnapshot(start, end proto.EncodedKey, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error
// ApproximateSize returns the approximate number of bytes the engine is
// using to store data for the given range of keys.
ApproximateSize(start, end proto.EncodedKey) (uint64, error)
// NewIterator returns a new instance of an Iterator over this
// engine. The caller must invoke Iterator.Close() when finished with
// the iterator to free resources.
NewIterator() Iterator
// NewSnapshot returns a new instance of a read-only snapshot
// engine. Snapshots are instantaneous and, as long as they're
// released relatively quickly, inexpensive. Snapshots are released
// by invoking Stop(). Note that snapshots must not be used after the
// original engine has been stopped.
NewSnapshot() Engine
// NewBatch returns a new instance of a batched engine which wraps
// this engine. Batched engines accumulate all mutations and apply
// them atomically on a call to Commit().
NewBatch() Engine
// Commit atomically applies any batched updates to the underlying
// engine. This is a noop unless the engine was created via NewBatch().
Commit() error

// TODO(petermattis): Remove the WriteBatch functionality from this
// interface. Add a "NewSnapshot() Engine" method which works
// similarly to NewBatch and remove CreateSnapshot, GetSnapshot,
// IterateSnapshot.
}

// A BatchDelete is a delete operation executed as part of an atomic batch.
Expand Down Expand Up @@ -258,19 +245,6 @@ func Scan(engine Engine, start, end proto.EncodedKey, max int64) ([]proto.RawKey
return kvs, err
}

// ScanSnapshot scans using the given snapshot ID.
func ScanSnapshot(engine Engine, start, end proto.EncodedKey, max int64, snapshotID string) ([]proto.RawKeyValue, error) {
var kvs []proto.RawKeyValue
err := engine.IterateSnapshot(start, end, snapshotID, func(kv proto.RawKeyValue) (bool, error) {
if max != 0 && int64(len(kvs)) >= max {
return true, nil
}
kvs = append(kvs, kv)
return false, nil
})
return kvs, err
}

// ClearRange removes a set of entries, from start (inclusive) to end
// (exclusive). This function returns the number of entries
// removed. Either all entries within the range will be deleted, or
Expand Down
Loading

0 comments on commit cf09deb

Please sign in to comment.