Skip to content

Commit

Permalink
Add mem and merge iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
udpatil committed Oct 16, 2023
1 parent 93d475f commit 0d42b24
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 0 deletions.
74 changes: 74 additions & 0 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package multiversion

import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/types"
scheduler "github.com/cosmos/cosmos-sdk/types/occ"
)

// Iterates over iterKVCache items.
// if key is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset map[string][]byte
index int
abortChannel chan scheduler.Abort
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
) *memIterator {
var iter types.Iterator
var err error

if ascending {
iter, err = items.Iterator(start, end)
} else {
iter, err = items.ReverseIterator(start, end)
}

if err != nil {
if iter != nil {
iter.Close()
}
panic(err)
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
}
}

// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator
func (mi *memIterator) Value() []byte {
key := mi.Iterator.Key()

// try fetch from writeset - return if exists
if val, ok := mi.writeset[string(key)]; ok {
return val
}

// get the value from the multiversion store
val := mi.mvStore.GetLatestBeforeIndex(mi.index, key)

// if we have an estiamte, write to abort channel
if val.IsEstimate() {
mi.abortChannel <- scheduler.NewEstimateAbort(val.Index())
}

// if we have a deleted value, return nil
if val.IsDeleted() {
return nil
}
return val.Value()
}
256 changes: 256 additions & 0 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package multiversion

import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
)

// mvsMergeIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
// had been deleted (but not deleted in the parent).
// If the cache iterator has the same key as the parent, the
// cache shadows (overrides) the parent.
type mvsMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
}

var _ types.Iterator = (*mvsMergeIterator)(nil)

func NewMVSMergeIterator(
parent, cache types.Iterator,
ascending bool,
) *mvsMergeIterator {
iter := &mvsMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
}

return iter
}

// Domain implements Iterator.
// It returns the union of the iter.Parent doman, and the iter.Cache domain.
// If the domains are disjoint, this includes the domain in between them as well.
func (iter *mvsMergeIterator) Domain() (start, end []byte) {
startP, endP := iter.parent.Domain()
startC, endC := iter.cache.Domain()

if iter.compare(startP, startC) < 0 {
start = startP
} else {
start = startC
}

if iter.compare(endP, endC) < 0 {
end = endC
} else {
end = endP
}

return start, end
}

// Valid implements Iterator.
func (iter *mvsMergeIterator) Valid() bool {
return iter.skipUntilExistsOrInvalid()
}

// Next implements Iterator
func (iter *mvsMergeIterator) Next() {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the next cache item.
if !iter.parent.Valid() {
iter.cache.Next()
return
}

// If cache is invalid, get the next parent item.
if !iter.cache.Valid() {
iter.parent.Next()
return
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}

// Key implements Iterator
func (iter *mvsMergeIterator) Key() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache key.
if !iter.parent.Valid() {
return iter.cache.Key()
}

// If cache is invalid, get the parent key.
if !iter.cache.Valid() {
return iter.parent.Key()
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return keyP
case 0: // parent == cache
return keyP
case 1: // parent > cache
return keyC
default:
panic("invalid compare result")
}
}

// Value implements Iterator
func (iter *mvsMergeIterator) Value() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
return value
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
return value
default:
panic("invalid comparison result")
}
}

// Close implements Iterator
func (iter *mvsMergeIterator) Close() error {
if err := iter.parent.Close(); err != nil {
// still want to close cache iterator regardless
iter.cache.Close()
return err
}

return iter.cache.Close()
}

// Error returns an error if the mvsMergeIterator is invalid defined by the
// Valid method.
func (iter *mvsMergeIterator) Error() error {
if !iter.Valid() {
return errors.New("invalid mvsMergeIterator")
}

return nil
}

// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *mvsMergeIterator) assertValid() {
if err := iter.Error(); err != nil {
panic(err)
}
}

// Like bytes.Compare but opposite if not ascending.
func (iter *mvsMergeIterator) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
}

return bytes.Compare(a, b) * -1
}

// Skip all delete-items from the cache w/ `key < until`. After this function,
// current cache item is a non-delete-item, or `until <= key`.
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *mvsMergeIterator) skipCacheDeletes(until []byte) {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
}
}

// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
// Returns whether the iterator is valid.
func (iter *mvsMergeIterator) skipUntilExistsOrInvalid() bool {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
}
// Parent is valid.
if !iter.cache.Valid() {
return true
}
// Parent is valid, cache is valid.

// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()

switch iter.compare(keyP, keyC) {
case -1: // parent < cache.
return true

case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()

continue
}
// Cache is not a delete.

return true // cache exists.
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.skipCacheDeletes(keyP)
continue
}
// Cache is not a delete.

return true // cache exists.
}
}
}

0 comments on commit 0d42b24

Please sign in to comment.