Skip to content

Commit

Permalink
[occ] Implement iterator for mvkv (#329)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This implements Iterator and ReverseIterator for mvkv for the KVStore
interface. The memiterator will be composed of versionindexedstore and
multiversionstore, and will yield values in a cascading fashion firstly
from the writeset, and then second from the multiversion store.

This still needs optimization to persisted sorted keys instead of
reconstructing sorted keys each time.

## Testing performed to validate your change
Unit test to verify basic functionality
  • Loading branch information
udpatil committed Jan 2, 2024
1 parent 7dcdf66 commit f935dac
Show file tree
Hide file tree
Showing 4 changed files with 457 additions and 15 deletions.
74 changes: 74 additions & 0 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package multiversion

import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/types"
scheduler "github.com/cosmos/cosmos-sdk/types/occ"
)

// Iterates over iterKVCache items.
// if key is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset map[string][]byte
index int
abortChannel chan scheduler.Abort
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
) *memIterator {
var iter types.Iterator
var err error

if ascending {
iter, err = items.Iterator(start, end)
} else {
iter, err = items.ReverseIterator(start, end)
}

if err != nil {
if iter != nil {
iter.Close()
}
panic(err)
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
}
}

// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator
func (mi *memIterator) Value() []byte {
key := mi.Iterator.Key()

// try fetch from writeset - return if exists
if val, ok := mi.writeset[string(key)]; ok {
return val
}

// get the value from the multiversion store
val := mi.mvStore.GetLatestBeforeIndex(mi.index, key)

// if we have an estiamte, write to abort channel
if val.IsEstimate() {
mi.abortChannel <- scheduler.NewEstimateAbort(val.Index())
}

// if we have a deleted value, return nil
if val.IsDeleted() {
return nil
}
return val.Value()
}
256 changes: 256 additions & 0 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package multiversion

import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
)

// mvsMergeIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
// had been deleted (but not deleted in the parent).
// If the cache iterator has the same key as the parent, the
// cache shadows (overrides) the parent.
type mvsMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
}

var _ types.Iterator = (*mvsMergeIterator)(nil)

func NewMVSMergeIterator(
parent, cache types.Iterator,
ascending bool,
) *mvsMergeIterator {
iter := &mvsMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
}

return iter
}

// Domain implements Iterator.
// It returns the union of the iter.Parent doman, and the iter.Cache domain.
// If the domains are disjoint, this includes the domain in between them as well.
func (iter *mvsMergeIterator) Domain() (start, end []byte) {
startP, endP := iter.parent.Domain()
startC, endC := iter.cache.Domain()

if iter.compare(startP, startC) < 0 {
start = startP
} else {
start = startC
}

if iter.compare(endP, endC) < 0 {
end = endC
} else {
end = endP
}

return start, end
}

// Valid implements Iterator.
func (iter *mvsMergeIterator) Valid() bool {
return iter.skipUntilExistsOrInvalid()
}

// Next implements Iterator
func (iter *mvsMergeIterator) Next() {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the next cache item.
if !iter.parent.Valid() {
iter.cache.Next()
return
}

// If cache is invalid, get the next parent item.
if !iter.cache.Valid() {
iter.parent.Next()
return
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}

// Key implements Iterator
func (iter *mvsMergeIterator) Key() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache key.
if !iter.parent.Valid() {
return iter.cache.Key()
}

// If cache is invalid, get the parent key.
if !iter.cache.Valid() {
return iter.parent.Key()
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return keyP
case 0: // parent == cache
return keyP
case 1: // parent > cache
return keyC
default:
panic("invalid compare result")
}
}

// Value implements Iterator
func (iter *mvsMergeIterator) Value() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
return value
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
return value
default:
panic("invalid comparison result")
}
}

// Close implements Iterator
func (iter *mvsMergeIterator) Close() error {
if err := iter.parent.Close(); err != nil {
// still want to close cache iterator regardless
iter.cache.Close()
return err
}

return iter.cache.Close()
}

// Error returns an error if the mvsMergeIterator is invalid defined by the
// Valid method.
func (iter *mvsMergeIterator) Error() error {
if !iter.Valid() {
return errors.New("invalid mvsMergeIterator")
}

return nil
}

// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *mvsMergeIterator) assertValid() {
if err := iter.Error(); err != nil {
panic(err)
}
}

// Like bytes.Compare but opposite if not ascending.
func (iter *mvsMergeIterator) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
}

return bytes.Compare(a, b) * -1
}

// Skip all delete-items from the cache w/ `key < until`. After this function,
// current cache item is a non-delete-item, or `until <= key`.
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *mvsMergeIterator) skipCacheDeletes(until []byte) {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
}
}

// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
// Returns whether the iterator is valid.
func (iter *mvsMergeIterator) skipUntilExistsOrInvalid() bool {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
}
// Parent is valid.
if !iter.cache.Valid() {
return true
}
// Parent is valid, cache is valid.

// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()

switch iter.compare(keyP, keyC) {
case -1: // parent < cache.
return true

case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()

continue
}
// Cache is not a delete.

return true // cache exists.
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.skipCacheDeletes(keyP)
continue
}
// Cache is not a delete.

return true // cache exists.
}
}
}
Loading

0 comments on commit f935dac

Please sign in to comment.