Skip to content

Commit

Permalink
This pr adds bounds checking (global and block) for singleLevelIterat…
Browse files Browse the repository at this point in the history
…or when returning a value.

Fixes: cockroachdb#2593
Release note: None
  • Loading branch information
raggar committed Jun 15, 2023
1 parent 898fb2f commit 24be613
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ type singleLevelIterator struct {
// iteration.
vState *virtualState
// endKeyInclusive is set to force the iterator to treat the upper field as
// inclusive while iterating instead of exclusive. This is used for virtual
// sstable iteration where the file bounds are inclusive.
// inclusive while iterating instead of exclusive.
endKeyInclusive bool
index blockIter
data blockIter
Expand Down Expand Up @@ -464,6 +463,24 @@ func (i *singleLevelIterator) init(
return nil
}

// Helper function to check if keys returned from iterator are within global and virtual bounds.
func (i *singleLevelIterator) maybeVerifyKey(
iKey *InternalKey, val base.LazyValue,
) (*InternalKey, base.LazyValue) {
// maybeVerify key is only used for virtual sstable iterators.
if invariants.Enabled && i.vState != nil && iKey != nil {
key := iKey.UserKey

uc, vuc := i.cmp(key, i.upper), i.cmp(key, i.vState.upper.UserKey)
lc, vlc := i.cmp(key, i.lower), i.cmp(key, i.vState.lower.UserKey)

if (!i.vState.upper.IsExclusiveSentinel() && (uc == 0 || vuc == 0)) || (uc > 0 || vuc > 0 || lc < 0 || vlc < 0) {
panic(fmt.Sprintf("key: %s out of bounds of singleLevelIterator", key))
}
}
return iKey, val
}

// setupForCompaction sets up the singleLevelIterator for use with compactionIter.
// Currently, it skips readahead ramp-up. It should be called after init is called.
func (i *singleLevelIterator) setupForCompaction() {
Expand Down Expand Up @@ -931,8 +948,7 @@ func (i *singleLevelIterator) SeekPrefixGE(
key = i.lower
}
}
k, v := i.seekPrefixGE(prefix, key, flags, i.useFilter)
return k, v
return i.seekPrefixGE(prefix, key, flags, i.useFilter)
}

func (i *singleLevelIterator) seekPrefixGE(
Expand Down Expand Up @@ -996,7 +1012,7 @@ func (i *singleLevelIterator) seekPrefixGE(
i.boundsCmp = 0
i.positionedUsingLatestBounds = true
k, value = i.seekGEHelper(key, boundsCmp, flags)
return k, value
return i.maybeVerifyKey(k, value)
}

// virtualLast should only be called if i.vReader != nil.
Expand Down Expand Up @@ -1143,7 +1159,7 @@ func (i *singleLevelIterator) SeekLT(
// be chosen as "compleu". The SeekGE in the index block will then point
// us to the block containing "complexion". If this happens, we want the
// last key from the previous data block.
return i.skipBackward()
return i.maybeVerifyKey(i.skipBackward())
}

// First implements internalIterator.First, as documented in the pebble
Expand All @@ -1163,6 +1179,7 @@ func (i *singleLevelIterator) First() (*InternalKey, base.LazyValue) {
}
i.positionedUsingLatestBounds = true
i.maybeFilteredKeysSingleLevel = false

return i.firstInternal()
}

Expand Down Expand Up @@ -1375,8 +1392,9 @@ func (i *singleLevelIterator) NextPrefix(succKey []byte) (*InternalKey, base.Laz
return nil, base.LazyValue{}
}
}
return key, val
return i.maybeVerifyKey(key, val)
}

return i.skipForward()
}

Expand Down Expand Up @@ -1445,7 +1463,7 @@ func (i *singleLevelIterator) skipForward() (*InternalKey, base.LazyValue) {
return nil, base.LazyValue{}
}
}
return key, val
return i.maybeVerifyKey(key, val)
}
}
return nil, base.LazyValue{}
Expand Down Expand Up @@ -1488,7 +1506,7 @@ func (i *singleLevelIterator) skipBackward() (*InternalKey, base.LazyValue) {
i.exhaustedBounds = -1
return nil, base.LazyValue{}
}
return key, val
return i.maybeVerifyKey(key, val)
}
return nil, base.LazyValue{}
}
Expand Down Expand Up @@ -2324,7 +2342,7 @@ func (i *twoLevelIterator) SeekLT(
}
if result == loadBlockOK {
if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil {
return ikey, val
return i.maybeVerifyKey(ikey, val)
}
// Fall through to skipBackward since the singleLevelIterator did
// not have any blocks that satisfy the block interval
Expand All @@ -2338,7 +2356,7 @@ func (i *twoLevelIterator) SeekLT(
}
if result == loadBlockOK {
if ikey, val := i.singleLevelIterator.SeekLT(key, flags); ikey != nil {
return ikey, val
return i.maybeVerifyKey(ikey, val)
}
// Fall through to skipBackward since the singleLevelIterator did
// not have any blocks that satisfy the block interval
Expand Down Expand Up @@ -2519,7 +2537,7 @@ func (i *twoLevelIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyVa
}
}
} else if key, val := i.singleLevelIterator.SeekGE(succKey, base.SeekGEFlagsNone); key != nil {
return key, val
return i.maybeVerifyKey(key, val)
}
return i.skipForward()
}
Expand Down Expand Up @@ -2557,7 +2575,7 @@ func (i *twoLevelIterator) skipForward() (*InternalKey, base.LazyValue) {
}
if result == loadBlockOK {
if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil {
return ikey, val
return i.maybeVerifyKey(ikey, val)
}
// Next iteration will return if singleLevelIterator set
// exhaustedBounds = +1.
Expand Down Expand Up @@ -2598,7 +2616,7 @@ func (i *twoLevelIterator) skipBackward() (*InternalKey, base.LazyValue) {
}
if result == loadBlockOK {
if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil {
return ikey, val
return i.maybeVerifyKey(ikey, val)
}
// Next iteration will return if singleLevelIterator set
// exhaustedBounds = -1.
Expand Down

0 comments on commit 24be613

Please sign in to comment.