Skip to content

Commit

Permalink
Merge pull request #13864 from bdarnell/span-set-engine
Browse files Browse the repository at this point in the history
storage: Enforce that all keys accessed are declared in SpanSet
  • Loading branch information
bdarnell authored Mar 16, 2017
2 parents bc8b96c + 0edec1c commit 833b6f3
Show file tree
Hide file tree
Showing 16 changed files with 1,017 additions and 146 deletions.
6 changes: 6 additions & 0 deletions pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ func evalWriteBatch(
if err != nil {
return storage.EvalResult{}, err
}
// If this is a SpanSetIterator, we have to unwrap it because
// ClearIterRange needs a plain rocksdb iterator (and can't unwrap
// it itself because of import cycles).
// TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't
// yet work with read-write batches.
if ssi, ok := iter.(*storage.SpanSetIterator); ok {
iter = ssi.Iterator()
}
if err := batch.ClearIterRange(iter, mvccStartKey, mvccEndKey); err != nil {
return storage.EvalResult{}, err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,18 @@ func (s Span) Overlaps(o Span) bool {
return bytes.Compare(s.EndKey, o.Key) > 0 && bytes.Compare(s.Key, o.EndKey) < 0
}

// Contains returns whether the receiver contains the given span.
func (s Span) Contains(o Span) bool {
if len(s.EndKey) == 0 && len(o.EndKey) == 0 {
return s.Key.Equal(o.Key)
} else if len(s.EndKey) == 0 {
return false
} else if len(o.EndKey) == 0 {
return bytes.Compare(o.Key, s.Key) >= 0 && bytes.Compare(o.Key, s.EndKey) < 0
}
return bytes.Compare(s.Key, o.Key) <= 0 && bytes.Compare(s.EndKey, o.EndKey) >= 0
}

// Spans is a slice of spans.
type Spans []Span

Expand Down
35 changes: 35 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,41 @@ func TestSpanOverlaps(t *testing.T) {
}
}

// TestSpanContains verifies methods to check whether a key
// or key range is contained within the span.
func TestSpanContains(t *testing.T) {
s := Span{Key: []byte("a"), EndKey: []byte("b")}

testData := []struct {
start, end []byte
contains bool
}{
// Single keys.
{[]byte("a"), nil, true},
{[]byte("aa"), nil, true},
{[]byte("`"), nil, false},
{[]byte("b"), nil, false},
{[]byte("c"), nil, false},
// Key ranges.
{[]byte("a"), []byte("b"), true},
{[]byte("a"), []byte("aa"), true},
{[]byte("aa"), []byte("b"), true},
{[]byte("0"), []byte("9"), false},
{[]byte("`"), []byte("a"), false},
{[]byte("b"), []byte("bb"), false},
{[]byte("0"), []byte("bb"), false},
{[]byte("aa"), []byte("bb"), false},
// TODO(bdarnell): check for invalid ranges in Span.Contains?
//{[]byte("b"), []byte("a"), false},
}
for i, test := range testData {
if s.Contains(Span{test.start, test.end}) != test.contains {
t.Errorf("%d: expected span %q-%q within range to be %v",
i, test.start, test.end, test.contains)
}
}
}

// TestRSpanContains verifies methods to check whether a key
// or key range is contained within the span.
func TestRSpanContains(t *testing.T) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/storage/abort_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,20 @@ func fillUUID(b byte) uuid.UUID {
var txnIDMin = fillUUID('\x00')
var txnIDMax = fillUUID('\xff')

func abortCacheMinKey(rangeID roachpb.RangeID) roachpb.Key {
return keys.AbortCacheKey(rangeID, txnIDMin)
}

func (sc *AbortCache) min() roachpb.Key {
return keys.AbortCacheKey(sc.rangeID, txnIDMin)
return abortCacheMinKey(sc.rangeID)
}

func abortCacheMaxKey(rangeID roachpb.RangeID) roachpb.Key {
return keys.AbortCacheKey(rangeID, txnIDMax)
}

func (sc *AbortCache) max() roachpb.Key {
return keys.AbortCacheKey(sc.rangeID, txnIDMax)
return abortCacheMaxKey(sc.rangeID)
}

// ClearData removes all persisted items stored in the cache.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import "github.com/cockroachdb/cockroach/pkg/util/bufalloc"
func AllocIterKeyValue(
a bufalloc.ByteAllocator, iter Iterator,
) (bufalloc.ByteAllocator, MVCCKey, []byte) {
key := iter.unsafeKey()
key := iter.UnsafeKey()
a, key.Key = a.Copy(key.Key, 0)
value := iter.unsafeValue()
value := iter.UnsafeValue()
a, value = a.Copy(value, 0)
return a, key, value
}
11 changes: 6 additions & 5 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ type Iterator interface {
ValueProto(msg proto.Message) error
// unsafeKey returns the same value as Key, but the memory is invalidated on
// the next call to {Next,Prev,Seek,SeekReverse,Close}.
unsafeKey() MVCCKey
UnsafeKey() MVCCKey
// unsafeKey returns the same value as Value, but the memory is invalidated
// on the next call to {Next,Prev,Seek,SeekReverse,Close}.
unsafeValue() []byte
UnsafeValue() []byte
// Less returns true if the key the iterator is currently positioned at is
// less than the specified key.
Less(key MVCCKey) bool
Expand All @@ -95,10 +95,11 @@ type Reader interface {
// Distinct() batches release their parent batch for future use while
// Engines, Snapshots and Batches free the associated C++ resources.
Close()
// closed returns true if the reader has been closed or is not usable.
// Closed returns true if the reader has been closed or is not usable.
// Objects backed by this reader (e.g. Iterators) can check this to ensure
// that they are not using a closed engine.
closed() bool
// that they are not using a closed engine. Intended for use within package
// engine; exported to enable wrappers to exist in other packages.
Closed() bool
// Get returns the value for the given key, nil otherwise.
Get(key MVCCKey) ([]byte, error)
// GetProto fetches the value at the specified key and unmarshals it
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestEngineBatchStaleCachedIterator(t *testing.T) {

if iter.Valid() {
t.Fatalf("iterator unexpectedly valid: %v -> %v",
iter.unsafeKey(), iter.unsafeValue())
iter.UnsafeKey(), iter.UnsafeValue())
}
}

Expand Down
50 changes: 25 additions & 25 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func mvccGetMetadata(
return false, 0, 0, nil
}

unsafeKey := iter.unsafeKey()
unsafeKey := iter.UnsafeKey()
if !unsafeKey.Key.Equal(metaKey.Key) {
return false, 0, 0, nil
}
Expand All @@ -635,16 +635,16 @@ func mvccGetMetadata(
if err := iter.ValueProto(meta); err != nil {
return false, 0, 0, err
}
return true, int64(unsafeKey.EncodedSize()), int64(len(iter.unsafeValue())), nil
return true, int64(unsafeKey.EncodedSize()), int64(len(iter.UnsafeValue())), nil
}

meta.Reset()
// For values, the size of keys is always accounted for as
// mvccVersionTimestampSize. The size of the metadata key is
// accounted for separately.
meta.KeyBytes = mvccVersionTimestampSize
meta.ValBytes = int64(len(iter.unsafeValue()))
meta.Deleted = len(iter.unsafeValue()) == 0
meta.ValBytes = int64(len(iter.UnsafeValue()))
meta.Deleted = len(iter.UnsafeValue()) == 0
meta.Timestamp = unsafeKey.Timestamp
return true, int64(unsafeKey.EncodedSize()) - meta.KeyBytes, 0, nil
}
Expand Down Expand Up @@ -772,7 +772,7 @@ func mvccGetInternal(
return nil, ignoredIntents, safeValue, nil
}

unsafeKey := iter.unsafeKey()
unsafeKey := iter.UnsafeKey()
if !unsafeKey.Key.Equal(metaKey.Key) {
return nil, ignoredIntents, safeValue, nil
}
Expand All @@ -796,14 +796,14 @@ func mvccGetInternal(
// already been read above, so there's nothing left to do.
}

if len(iter.unsafeValue()) == 0 {
if len(iter.UnsafeValue()) == 0 {
// Value is deleted.
return nil, ignoredIntents, safeValue, nil
}

value := &buf.value
if allowedSafety == unsafeValue {
value.RawBytes = iter.unsafeValue()
value.RawBytes = iter.UnsafeValue()
} else {
value.RawBytes = iter.Value()
}
Expand Down Expand Up @@ -1428,7 +1428,7 @@ func MVCCDeleteRange(
// MVCCKey is unsafe and will be invalidated by the next call to
// Iterator.{Next,Prev,Seek,SeekReverse,Close}.
func getScanMeta(iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata) (MVCCKey, error) {
metaKey := iter.unsafeKey()
metaKey := iter.UnsafeKey()
if !metaKey.Less(encEndKey) {
return NilKey, iter.Error()
}
Expand All @@ -1439,8 +1439,8 @@ func getScanMeta(iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata)
// mvccVersionTimestampSize. The size of the metadata key is accounted for
// separately.
meta.KeyBytes = mvccVersionTimestampSize
meta.ValBytes = int64(len(iter.unsafeValue()))
meta.Deleted = len(iter.unsafeValue()) == 0
meta.ValBytes = int64(len(iter.UnsafeValue()))
meta.Deleted = len(iter.UnsafeValue()) == 0
return metaKey, nil
}
if err := iter.ValueProto(meta); err != nil {
Expand All @@ -1456,7 +1456,7 @@ func getScanMeta(iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata)
func getReverseScanMeta(
iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata,
) (MVCCKey, error) {
metaKey := iter.unsafeKey()
metaKey := iter.UnsafeKey()
// The metaKey < encEndKey is exceeding the boundary.
if metaKey.Less(encEndKey) {
return NilKey, iter.Error()
Expand All @@ -1476,15 +1476,15 @@ func getReverseScanMeta(
}

meta.Reset()
metaKey = iter.unsafeKey()
metaKey = iter.UnsafeKey()
meta.Timestamp = metaKey.Timestamp
if metaKey.IsValue() {
// For values, the size of keys is always account for as
// mvccVersionTimestampSize. The size of the metadata key is accounted
// for separately.
meta.KeyBytes = mvccVersionTimestampSize
meta.ValBytes = int64(len(iter.unsafeValue()))
meta.Deleted = len(iter.unsafeValue()) == 0
meta.ValBytes = int64(len(iter.UnsafeValue()))
meta.Deleted = len(iter.UnsafeValue()) == 0
return metaKey, nil
}
}
Expand Down Expand Up @@ -1699,7 +1699,7 @@ func MVCCIterate(
} else {
// This is subtle: mvccGetInternal might already have advanced us to the
// next key in which case we have to reset our position.
if !iter.unsafeKey().Key.Equal(metaKey.Key) {
if !iter.UnsafeKey().Key.Equal(metaKey.Key) {
iter.Seek(metaKey)
if iter.Valid() {
iter.Prev()
Expand All @@ -1720,7 +1720,7 @@ func MVCCIterate(
// the next key in which case we don't have to do anything. Only call
// NextKey() if the current key pointed to by the iterator is the same
// as the one at the top of the loop.
if iter.unsafeKey().Key.Equal(metaKey.Key) {
if iter.UnsafeKey().Key.Equal(metaKey.Key) {
iter.NextKey()
}
}
Expand Down Expand Up @@ -1937,7 +1937,7 @@ func mvccResolveWriteIntent(
iter.Seek(nextKey)

// If there is no other version, we should just clean up the key entirely.
if !iter.Valid() || !iter.unsafeKey().Key.Equal(intent.Key) {
if !iter.Valid() || !iter.UnsafeKey().Key.Equal(intent.Key) {
if err = engine.Clear(metaKey); err != nil {
return err
}
Expand All @@ -1948,12 +1948,12 @@ func mvccResolveWriteIntent(
return nil
}

unsafeIterKey := iter.unsafeKey()
unsafeIterKey := iter.UnsafeKey()
if !unsafeIterKey.IsValue() {
return errors.Errorf("expected an MVCC value key: %s", unsafeIterKey)
}
// Get the bytes for the next version so we have size for stat counts.
valueSize := int64(len(iter.unsafeValue()))
valueSize := int64(len(iter.UnsafeValue()))
// Update the keyMetadata with the next version.
buf.newMeta = enginepb.MVCCMetadata{
Deleted: valueSize == 0,
Expand Down Expand Up @@ -2032,14 +2032,14 @@ func MVCCResolveWriteIntentRangeUsingIter(

for num < max {
iterAndBuf.iter.Seek(nextKey)
if !iterAndBuf.iter.Valid() || !iterAndBuf.iter.unsafeKey().Less(encEndKey) {
if !iterAndBuf.iter.Valid() || !iterAndBuf.iter.UnsafeKey().Less(encEndKey) {
// No more keys exists in the given range.
break
}

// Manually copy the underlying bytes of the unsafe key. This construction
// reuses keyBuf across iterations.
key := iterAndBuf.iter.unsafeKey()
key := iterAndBuf.iter.UnsafeKey()
keyBuf = append(keyBuf[:0], key.Key...)
key.Key = keyBuf

Expand Down Expand Up @@ -2087,7 +2087,7 @@ func MVCCGarbageCollect(
continue
}
inlinedValue := meta.IsInline()
implicitMeta := iter.unsafeKey().IsValue()
implicitMeta := iter.UnsafeKey().IsValue()
// First, check whether all values of the key are being deleted.
if !gcKey.Timestamp.Less(meta.Timestamp) {
// For version keys, don't allow GC'ing the meta key if it's
Expand All @@ -2110,7 +2110,7 @@ func MVCCGarbageCollect(
}
}
if !implicitMeta {
if err := engine.Clear(iter.unsafeKey()); err != nil {
if err := engine.Clear(iter.UnsafeKey()); err != nil {
return err
}
}
Expand All @@ -2123,7 +2123,7 @@ func MVCCGarbageCollect(

// Now, iterate through all values, GC'ing ones which have expired.
for ; iter.Valid(); iter.Next() {
unsafeIterKey := iter.unsafeKey()
unsafeIterKey := iter.UnsafeKey()
if !unsafeIterKey.Key.Equal(encKey.Key) {
break
}
Expand All @@ -2133,7 +2133,7 @@ func MVCCGarbageCollect(
if !gcKey.Timestamp.Less(unsafeIterKey.Timestamp) {
if ms != nil {
ms.Add(updateStatsOnGC(gcKey.Key, mvccVersionTimestampSize,
int64(len(iter.unsafeValue())), nil, unsafeIterKey.Timestamp.WallTime,
int64(len(iter.UnsafeValue())), nil, unsafeIterKey.Timestamp.WallTime,
timestamp.WallTime))
}
if err := engine.Clear(unsafeIterKey); err != nil {
Expand Down
Loading

0 comments on commit 833b6f3

Please sign in to comment.