Skip to content

Commit

Permalink
fix(store/v2): Fix PebbleDB Iteration Edge Cases (#18948)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderbez authored Jan 10, 2024
1 parent 78ce70d commit 0b12995
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
13 changes: 9 additions & 4 deletions store/storage/pebbledb/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,25 @@ func (f mvccKeyFormatter) Format(s fmt.State, verb rune) {

// SplitMVCCKey accepts an MVCC key and returns the "user" key, the MVCC version,
// and a boolean indicating if the provided key is an MVCC key.
//
// The final byte of mvccKey is interpreted as the length of the version suffix
// (tsLen). The user key is everything before that suffix; the version is the
// suffix minus its first byte. When tsLen is zero the key carries no version:
// version is nil and ok is still true. An empty input, or one whose declared
// suffix length exceeds the bytes available, yields (nil, nil, false).
//
// Note, the returned slices never alias mvccKey: they point into a private copy.
// The argument typically comes from an iterator's Key() method, whose backing
// buffer is presumably only valid until the iterator is repositioned, so
// retaining sub-slices of it directly would not be safe.
func SplitMVCCKey(mvccKey []byte) (key, version []byte, ok bool) {
	if len(mvccKey) == 0 {
		return nil, nil, false
	}

	n := len(mvccKey) - 1
	tsLen := int(mvccKey[n])
	if n < tsLen {
		return nil, nil, false
	}

	// Copy only after validation so malformed keys do not cost an allocation.
	mvccKeyCopy := bytes.Clone(mvccKey)

	// Cap the capacity of the user key so that an append by the caller cannot
	// overwrite the version bytes that share the same backing array.
	keyEnd := n - tsLen
	key = mvccKeyCopy[:keyEnd:keyEnd]
	if tsLen > 0 {
		// Skip the first byte of the suffix and drop the trailing length byte.
		version = mvccKeyCopy[keyEnd+1 : n]
	}

	return key, version, true
}
Expand Down
37 changes: 28 additions & 9 deletions store/storage/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func (itr *iterator) Value() []byte {
}

func (itr *iterator) Next() {
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}

var next bool
if itr.reverse {
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}

// Since PebbleDB has no PrevPrefix API, we must manually seek to the next
// key that is lexicographically less than the current key.
next = itr.source.SeekLT(MVCCEncode(currKey, 0))
Expand All @@ -132,7 +132,7 @@ func (itr *iterator) Next() {

// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
// so we seek back to the latest desired version, s.t. the version <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
Expand All @@ -147,10 +147,29 @@ func (itr *iterator) Next() {
return
}

// Move the iterator to the closest version to the desired version, so we
// Move the iterator to the closest version of the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))

tmpKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}

// There exist cases where the SeekLT() call moved us back to the same key
// we started at, so we must move to the next key, i.e. two keys forward.
if bytes.Equal(tmpKey, currKey) {
if itr.source.NextPrefix() {
itr.Next()
} else {
itr.valid = false
return
}
}

// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
Expand Down
44 changes: 44 additions & 0 deletions store/storage/storage_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,54 @@ func (s *StorageTestSuite) TestDatabase_IteratorMultiVersion() {
i = (i + 1) % 10
count++
}

s.Require().Equal(10, count)
s.Require().NoError(itr.Error())
}

// TestDatabaseIterator_SkipVersion writes keyC, keyE and keyF at one version,
// then rewrites keyC and adds keyD at two later versions, and finally iterates
// at a version that lies between the first batch and the later writes. It
// asserts that the iterator visits exactly three distinct keys.
func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() {
	// Untyped constants so they adopt whatever integer type the DB API expects.
	const (
		baseVersion    = 58827506 // keyC, keyE and keyF are first written here
		readVersion    = 58831525 // iteration snapshot; between base and later writes
		rewriteVersion = 58833605 // keyC is overwritten here
		lateVersion    = 58833606 // keyD is first written here
	)

	db, err := s.NewDB(s.T().TempDir())
	s.Require().NoError(err)

	defer db.Close()

	// Initial writes, one changeset per key, all at the same version.
	changes := store.NewChangesetWithPairs(map[string]store.KVPairs{
		storeKey1: {{Key: []byte("keyC"), Value: []byte("value003")}},
	})
	s.Require().NoError(db.ApplyChangeset(baseVersion, changes))

	changes = store.NewChangesetWithPairs(map[string]store.KVPairs{
		storeKey1: {{Key: []byte("keyE"), Value: []byte("value000")}},
	})
	s.Require().NoError(db.ApplyChangeset(baseVersion, changes))

	changes = store.NewChangesetWithPairs(map[string]store.KVPairs{
		storeKey1: {{Key: []byte("keyF"), Value: []byte("value000")}},
	})
	s.Require().NoError(db.ApplyChangeset(baseVersion, changes))

	// Writes at versions above the one we will read at.
	changes = store.NewChangesetWithPairs(map[string]store.KVPairs{
		storeKey1: {{Key: []byte("keyC"), Value: []byte("value004")}},
	})
	s.Require().NoError(db.ApplyChangeset(rewriteVersion, changes))

	changes = store.NewChangesetWithPairs(map[string]store.KVPairs{
		storeKey1: {{Key: []byte("keyD"), Value: []byte("value006")}},
	})
	s.Require().NoError(db.ApplyChangeset(lateVersion, changes))

	itr, err := db.Iterator(storeKey1, readVersion, []byte("key"), nil)
	s.Require().NoError(err)
	defer itr.Close()

	// Collect distinct keys; a set is used so repeated visits are not double counted.
	seen := make(map[string]struct{})
	for itr.Valid() {
		seen[string(itr.Key())] = struct{}{}
		itr.Next()
	}

	s.Require().Equal(3, len(seen))
}

func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)
Expand Down

0 comments on commit 0b12995

Please sign in to comment.