Skip to content

Commit

Permalink
Merge pull request #124 from cockroachdb/spencerkimball/fix-random-runs
Browse files Browse the repository at this point in the history
Replace Engine.Scan with Engine.Iterate
  • Loading branch information
spencerkimball committed Oct 14, 2014
2 parents 00f206a + e8db4f0 commit 80379ed
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 221 deletions.
124 changes: 68 additions & 56 deletions storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package engine

import (
"bytes"

"code.google.com/p/biogo.store/llrb"
gogoproto "code.google.com/p/gogoprotobuf/proto"
"github.com/cockroachdb/cockroach/proto"
Expand Down Expand Up @@ -99,70 +97,84 @@ func (b *Batch) Get(key Key) ([]byte, error) {
return b.engine.Get(key)
}

// Scan scans from both the updates tree and the underlying engine
// and combines the results, up to max.
func (b *Batch) Scan(start, end Key, max int64) ([]proto.RawKeyValue, error) {
// First, get up to max key value pairs from the wrapped engine.
engs, err := b.engine.Scan(start, end, max)
if err != nil {
return nil, err
}
engIdx := 0
var kvs []proto.RawKeyValue

// Now, scan the updates tree for the same range, combining as we go
// up to max entries.
b.updates.DoRange(func(n llrb.Comparable) (done bool) {
// First add all values from engs slice less than the current updates key.
for engIdx < len(engs) && bytes.Compare(engs[engIdx].Key, n.(proto.KeyGetter).KeyGet()) < 0 {
kvs = append(kvs, engs[engIdx])
engIdx++
if max != 0 && int64(len(kvs)) >= max {
return true
}
}
engKV := proto.RawKeyValue{Key: KeyMax}
if engIdx < len(engs) {
engKV = engs[engIdx]
}
// iterateUpdates scans the updates tree from start to end, invoking f
// on each value until f returns false or an error.
func (b *Batch) iterateUpdates(start, end Key, f func(proto.RawKeyValue) (bool, error)) (bool, error) {
var done bool
var err error
// Scan the updates tree for the key range, merging as we go.
b.updates.DoRange(func(n llrb.Comparable) bool {
switch t := n.(type) {
case BatchDelete: // On delete, just skip the corresponding engine entry.
if bytes.Equal(t.Key, engKV.Key) {
engIdx++
}
case BatchDelete: // On delete, skip.
case BatchPut: // On put, override the corresponding engine entry.
if bytes.Equal(t.Key, engKV.Key) {
engIdx++
}
kvs = append(kvs, t.RawKeyValue)
done, err = f(t.RawKeyValue)
case BatchMerge: // On merge, merge with corresponding engine entry.
var existingBytes []byte
if bytes.Equal(t.Key, engKV.Key) {
existingBytes = engKV.Value
engIdx++
}
kv := proto.RawKeyValue{Key: t.Key}
kv.Value, err = goMerge(existingBytes, t.Value)
if err != nil { // break out of DoRange on error.
return true
kv.Value, err = goMerge([]byte(nil), t.Value)
if err == nil {
done, err = f(kv)
}
kvs = append(kvs, kv)
}
return max != 0 && int64(len(kvs)) >= max
return done || err != nil
}, proto.RawKeyValue{Key: start}, proto.RawKeyValue{Key: end})
return done, err
}

// Check for common case of no matches in the updates map.
if len(kvs) == 0 {
return engs[engIdx:], err
}
// Otherwise, append remaining entries in engs up to max.
lastIdx := int64(len(engs))
if max != 0 {
if (lastIdx - int64(engIdx)) > max-int64(len(kvs)) {
lastIdx = max - int64(len(kvs)-engIdx)
// Iterate invokes f on key/value pairs merged from the underlying
// engine and pending batch updates. If f returns done or an error,
// the iteration ends and propagates the error.
//
// TODO(spencer): this implementation could benefit from an
// iterator-style interface to the update map. If/when one is
// provided by the llrb implementation it should be used here
// to make this code more efficient.
func (b *Batch) Iterate(start, end Key, f func(proto.RawKeyValue) (bool, error)) error {
last := start
if err := b.engine.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) {
// Merge iteration from updates tree at each key/value.
done, err := b.iterateUpdates(last, kv.Key, f)
last = Key(kv.Key).Next()
if !done && err == nil {
val := b.updates.Get(proto.RawKeyValue{Key: kv.Key})
if val != nil {
switch t := val.(type) {
case BatchDelete:
case BatchPut:
f(t.RawKeyValue)
case BatchMerge:
mergedKV := proto.RawKeyValue{Key: t.Key}
mergedKV.Value, err = goMerge(kv.Value, t.Value)
if err == nil {
done, err = f(mergedKV)
}
}
} else {
done, err = f(kv)
}
}
return done, err
}); err != nil {
return err
}
return append(kvs, engs[engIdx:lastIdx]...), err
// Final iteration from updates tree.
if _, err := b.iterateUpdates(last, end, f); err != nil {
return err
}
return nil
}

// Scan scans from both the updates tree and the underlying engine
// and combines the results, up to max.
func (b *Batch) Scan(start, end Key, max int64) ([]proto.RawKeyValue, error) {
var kvs []proto.RawKeyValue
err := b.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) {
if max != 0 && int64(len(kvs)) >= max {
return true, nil
}
kvs = append(kvs, kv)
return false, nil
})
return kvs, err
}

// Clear stores the key as a BatchDelete in the updates tree.
Expand Down
33 changes: 30 additions & 3 deletions storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestBatchBasics(t *testing.T) {
{Key: Key("b"), Value: []byte("value")},
{Key: Key("c"), Value: appender("foo")},
}
kvs, err := e.Scan(KeyMin, KeyMax, 0)
kvs, err := Scan(e, KeyMin, KeyMax, 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -81,7 +81,7 @@ func TestBatchBasics(t *testing.T) {
if err := b.Commit(); err != nil {
t.Fatal(err)
}
kvs, err = e.Scan(KeyMin, KeyMax, 0)
kvs, err = Scan(e, KeyMin, KeyMax, 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestBatchScan(t *testing.T) {
t.Fatal(err)
}
for i, scan := range scans {
kvs, err := e.Scan(scan.start, scan.end, scan.max)
kvs, err := Scan(e, scan.start, scan.end, scan.max)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -323,6 +323,33 @@ func TestBatchScanWithDelete(t *testing.T) {
}
}

// TestBatchScanMaxWithDeleted verifies that if a deletion
// in the updates map shadows an entry from the engine, the
// max on a scan is still reached.
func TestBatchScanMaxWithDeleted(t *testing.T) {
e := NewInMem(proto.Attributes{}, 1<<20)
b := NewBatch(e)
// Write two values.
if err := e.Put(Key("a"), []byte("value1")); err != nil {
t.Fatal(err)
}
if err := e.Put(Key("b"), []byte("value2")); err != nil {
t.Fatal(err)
}
// Now, delete "a" in batch.
if err := b.Clear(Key("a")); err != nil {
t.Fatal(err)
}
// A scan with max=1 should scan "b".
kvs, err := b.Scan(KeyMin, KeyMax, 1)
if err != nil {
t.Fatal(err)
}
if len(kvs) != 1 || !bytes.Equal(kvs[0].Key, []byte("b")) {
t.Errorf("expected scan of \"b\"; got %v", kvs)
}
}

// TestBatchConcurrency verifies operation of batch when the
// underlying engine has concurrent modifications to overlapping
// keys. This should never happen with the way Cockroach uses
Expand Down
78 changes: 37 additions & 41 deletions storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ type Engine interface {
Put(key Key, value []byte) error
// Get returns the value for the given key, nil otherwise.
Get(key Key) ([]byte, error)
// Scan returns up to max key/value objects starting from
// start (inclusive) and ending at end (non-inclusive).
// Specify max=0 for unbounded scans.
Scan(start, end Key, max int64) ([]proto.RawKeyValue, error)
// Iterate scans from start to end keys, visiting at most max
// key/value pairs. On each key value pair, the function f is
// invoked. If f returns an error or if the scan itself encounters
// an error, the iteration will stop and return f.
Iterate(start, end Key, f func(proto.RawKeyValue) (bool, error)) error
// Clear removes the item from the db with the given key.
// Note that clear actually removes entries from the storage
// engine, rather than inserting tombstones.
Expand Down Expand Up @@ -88,11 +89,9 @@ type Engine interface {
// GetSnapshot returns the value for the given key from the given
// snapshotID, nil otherwise.
GetSnapshot(key Key, snapshotID string) ([]byte, error)
// ScanSnapshot returns up to max key/value objects starting from
// start (inclusive) and ending at end (non-inclusive) from the
// given snapshotID.
// Specify max=0 for unbounded scans.
ScanSnapshot(start, end Key, max int64, snapshotID string) ([]proto.RawKeyValue, error)
// IterateSnapshot scans from start to end keys, visiting at
// most max key/value pairs from the specified snapshot ID.
IterateSnapshot(start, end Key, snapshotID string, f func(proto.RawKeyValue) (bool, error)) error
// ApproximateSize returns the approximate number of bytes the engine is
// using to store data for the given range of keys.
ApproximateSize(start, end Key) (uint64, error)
Expand Down Expand Up @@ -213,6 +212,34 @@ func Increment(engine Engine, key Key, inc int64) (int64, error) {
return r, nil
}

// Scan returns up to max key/value objects starting from
// start (inclusive) and ending at end (non-inclusive).
// Specify max=0 for unbounded scans.
func Scan(engine Engine, start, end Key, max int64) ([]proto.RawKeyValue, error) {
var kvs []proto.RawKeyValue
err := engine.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) {
if max != 0 && int64(len(kvs)) >= max {
return true, nil
}
kvs = append(kvs, kv)
return false, nil
})
return kvs, err
}

// ScanSnapshot scans using the given snapshot ID.
func ScanSnapshot(engine Engine, start, end Key, max int64, snapshotID string) ([]proto.RawKeyValue, error) {
var kvs []proto.RawKeyValue
err := engine.IterateSnapshot(start, end, snapshotID, func(kv proto.RawKeyValue) (bool, error) {
if max != 0 && int64(len(kvs)) >= max {
return true, nil
}
kvs = append(kvs, kv)
return false, nil
})
return kvs, err
}

// ClearRange removes a set of entries, from start (inclusive)
// to end (exclusive), up to max entries. If max is 0, all
// entries between start and end are deleted. This function
Expand All @@ -222,7 +249,7 @@ func Increment(engine Engine, key Key, inc int64) (int64, error) {
// removes entries from the storage engine, rather than inserting
// tombstones.
func ClearRange(engine Engine, start, end Key, max int64) (int, error) {
scanned, err := engine.Scan(start, end, max)
scanned, err := Scan(engine, start, end, max)

if err != nil {
return 0, err
Expand All @@ -241,34 +268,3 @@ func ClearRange(engine Engine, start, end Key, max int64) (int, error) {
}
return numElements, nil
}

// iterateRange scans the given key range using the underlying engine
// in blocks of at most chunkSize results, invoking f for each chunk
// read, until there are no more results. An error is returned if an
// underlying scan returns an error, or if f does. The read is
// executed against a snapshot if the specified snapshotID is
// non-empty.
func iterateRange(eng Engine, startKey, endKey Key, chunkSize int64,
snapshotID string, f func([]proto.RawKeyValue) error) error {
var kvs []proto.RawKeyValue
var err error
hasSnap := len(snapshotID) > 0
for {
if hasSnap {
kvs, err = eng.ScanSnapshot(startKey, endKey, chunkSize, snapshotID)
} else {
kvs, err = eng.Scan(startKey, endKey, chunkSize)
}
if err != nil {
return err
}
if err = f(kvs); err != nil {
return err
}
if len(kvs) == 0 {
break
}
startKey = Key(kvs[len(kvs)-1].Key).Next()
}
return nil
}
14 changes: 7 additions & 7 deletions storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,20 +296,20 @@ func TestEngineScan1(t *testing.T) {
}
sort.Strings(sortedKeys)

keyvals, err := engine.Scan([]byte("chinese"), []byte("german"), 0)
keyvals, err := Scan(engine, []byte("chinese"), []byte("german"), 0)
if err != nil {
t.Fatalf("could not run scan: %v", err)
}
ensureRangeEqual(t, sortedKeys[1:4], keyMap, keyvals)

// Check an end of range which does not equal an existing key.
keyvals, err = engine.Scan([]byte("chinese"), []byte("german1"), 0)
keyvals, err = Scan(engine, []byte("chinese"), []byte("german1"), 0)
if err != nil {
t.Fatalf("could not run scan: %v", err)
}
ensureRangeEqual(t, sortedKeys[1:5], keyMap, keyvals)

keyvals, err = engine.Scan([]byte("chinese"), []byte("german"), 2)
keyvals, err = Scan(engine, []byte("chinese"), []byte("german"), 2)
if err != nil {
t.Fatalf("could not run scan: %v", err)
}
Expand All @@ -320,7 +320,7 @@ func TestEngineScan1(t *testing.T) {
// a special case in engine.scan, that's why we test it here.
startKeys := []Key{Key("cat"), Key("")}
for _, startKey := range startKeys {
keyvals, err := engine.Scan(startKey, KeyMax, 0)
keyvals, err := Scan(engine, startKey, KeyMax, 0)
if err != nil {
t.Fatalf("could not run scan: %v", err)
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestEngineIncrement(t *testing.T) {
}

func verifyScan(start, end Key, max int64, expKeys []Key, engine Engine, t *testing.T) {
kvs, err := engine.Scan(start, end, max)
kvs, err := Scan(engine, start, end, max)
if err != nil {
t.Errorf("scan %q-%q: expected no error, but got %s", string(start), string(end), err)
}
Expand Down Expand Up @@ -496,8 +496,8 @@ func TestSnapshot(t *testing.T) {
valSnapshot, val1)
}

keyvals, _ := engine.Scan(key, KeyMax, 0)
keyvalsSnapshot, error := engine.ScanSnapshot(key, KeyMax, 0, snapshotID)
keyvals, _ := Scan(engine, key, KeyMax, 0)
keyvalsSnapshot, error := ScanSnapshot(engine, key, KeyMax, 0, snapshotID)
if error != nil {
t.Fatalf("error : %s", error)
}
Expand Down
Loading

0 comments on commit 80379ed

Please sign in to comment.