diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go new file mode 100644 index 000000000..a3fb74323 --- /dev/null +++ b/store/multiversion/memiterator.go @@ -0,0 +1,74 @@ +package multiversion + +import ( + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/types" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" +) + +// Iterates over iterKVCache items. +// if key is nil, means it was deleted. +// Implements Iterator. +type memIterator struct { + types.Iterator + + mvStore MultiVersionStore + writeset map[string][]byte + index int + abortChannel chan scheduler.Abort +} + +func (store *VersionIndexedStore) newMemIterator( + start, end []byte, + items *dbm.MemDB, + ascending bool, +) *memIterator { + var iter types.Iterator + var err error + + if ascending { + iter, err = items.Iterator(start, end) + } else { + iter, err = items.ReverseIterator(start, end) + } + + if err != nil { + if iter != nil { + iter.Close() + } + panic(err) + } + + return &memIterator{ + Iterator: iter, + mvStore: store.multiVersionStore, + index: store.transactionIndex, + abortChannel: store.abortChannel, + writeset: store.GetWriteset(), + } +} + +// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator +func (mi *memIterator) Value() []byte { + key := mi.Iterator.Key() + + // try fetch from writeset - return if exists + if val, ok := mi.writeset[string(key)]; ok { + return val + } + + // get the value from the multiversion store + val := mi.mvStore.GetLatestBeforeIndex(mi.index, key) + + // if we have an estiamte, write to abort channel + if val.IsEstimate() { + mi.abortChannel <- scheduler.NewEstimateAbort(val.Index()) + } + + // if we have a deleted value, return nil + if val.IsDeleted() { + return nil + } + return val.Value() +} diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go new file mode 100644 index 000000000..c0a9d23ef --- /dev/null +++ b/store/multiversion/mergeiterator.go @@ -0,0 +1,256 @@ +package multiversion + +import ( + "bytes" + "errors" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +// mvsMergeIterator merges a parent Iterator and a cache Iterator. +// The cache iterator may return nil keys to signal that an item +// had been deleted (but not deleted in the parent). +// If the cache iterator has the same key as the parent, the +// cache shadows (overrides) the parent. +type mvsMergeIterator struct { + parent types.Iterator + cache types.Iterator + ascending bool +} + +var _ types.Iterator = (*mvsMergeIterator)(nil) + +func NewMVSMergeIterator( + parent, cache types.Iterator, + ascending bool, +) *mvsMergeIterator { + iter := &mvsMergeIterator{ + parent: parent, + cache: cache, + ascending: ascending, + } + + return iter +} + +// Domain implements Iterator. +// It returns the union of the iter.Parent doman, and the iter.Cache domain. +// If the domains are disjoint, this includes the domain in between them as well. +func (iter *mvsMergeIterator) Domain() (start, end []byte) { + startP, endP := iter.parent.Domain() + startC, endC := iter.cache.Domain() + + if iter.compare(startP, startC) < 0 { + start = startP + } else { + start = startC + } + + if iter.compare(endP, endC) < 0 { + end = endC + } else { + end = endP + } + + return start, end +} + +// Valid implements Iterator. +func (iter *mvsMergeIterator) Valid() bool { + return iter.skipUntilExistsOrInvalid() +} + +// Next implements Iterator +func (iter *mvsMergeIterator) Next() { + iter.skipUntilExistsOrInvalid() + iter.assertValid() + + // If parent is invalid, get the next cache item. + if !iter.parent.Valid() { + iter.cache.Next() + return + } + + // If cache is invalid, get the next parent item. + if !iter.cache.Valid() { + iter.parent.Next() + return + } + + // Both are valid. Compare keys. + keyP, keyC := iter.parent.Key(), iter.cache.Key() + switch iter.compare(keyP, keyC) { + case -1: // parent < cache + iter.parent.Next() + case 0: // parent == cache + iter.parent.Next() + iter.cache.Next() + case 1: // parent > cache + iter.cache.Next() + } +} + +// Key implements Iterator +func (iter *mvsMergeIterator) Key() []byte { + iter.skipUntilExistsOrInvalid() + iter.assertValid() + + // If parent is invalid, get the cache key. + if !iter.parent.Valid() { + return iter.cache.Key() + } + + // If cache is invalid, get the parent key. + if !iter.cache.Valid() { + return iter.parent.Key() + } + + // Both are valid. Compare keys. + keyP, keyC := iter.parent.Key(), iter.cache.Key() + + cmp := iter.compare(keyP, keyC) + switch cmp { + case -1: // parent < cache + return keyP + case 0: // parent == cache + return keyP + case 1: // parent > cache + return keyC + default: + panic("invalid compare result") + } +} + +// Value implements Iterator +func (iter *mvsMergeIterator) Value() []byte { + iter.skipUntilExistsOrInvalid() + iter.assertValid() + + // If parent is invalid, get the cache value. + if !iter.parent.Valid() { + value := iter.cache.Value() + return value + } + + // If cache is invalid, get the parent value. + if !iter.cache.Valid() { + value := iter.parent.Value() + return value + } + + // Both are valid. Compare keys. + keyP, keyC := iter.parent.Key(), iter.cache.Key() + + cmp := iter.compare(keyP, keyC) + switch cmp { + case -1: // parent < cache + value := iter.parent.Value() + return value + case 0, 1: // parent >= cache + value := iter.cache.Value() + return value + default: + panic("invalid comparison result") + } +} + +// Close implements Iterator +func (iter *mvsMergeIterator) Close() error { + if err := iter.parent.Close(); err != nil { + // still want to close cache iterator regardless + iter.cache.Close() + return err + } + + return iter.cache.Close() +} + +// Error returns an error if the mvsMergeIterator is invalid defined by the +// Valid method. +func (iter *mvsMergeIterator) Error() error { + if !iter.Valid() { + return errors.New("invalid mvsMergeIterator") + } + + return nil +} + +// If not valid, panics. +// NOTE: May have side-effect of iterating over cache. +func (iter *mvsMergeIterator) assertValid() { + if err := iter.Error(); err != nil { + panic(err) + } +} + +// Like bytes.Compare but opposite if not ascending. +func (iter *mvsMergeIterator) compare(a, b []byte) int { + if iter.ascending { + return bytes.Compare(a, b) + } + + return bytes.Compare(a, b) * -1 +} + +// Skip all delete-items from the cache w/ `key < until`. After this function, +// current cache item is a non-delete-item, or `until <= key`. +// If the current cache item is not a delete item, does nothing. +// If `until` is nil, there is no limit, and cache may end up invalid. +// CONTRACT: cache is valid. +func (iter *mvsMergeIterator) skipCacheDeletes(until []byte) { + for iter.cache.Valid() && + iter.cache.Value() == nil && + (until == nil || iter.compare(iter.cache.Key(), until) < 0) { + iter.cache.Next() + } +} + +// Fast forwards cache (or parent+cache in case of deleted items) until current +// item exists, or until iterator becomes invalid. +// Returns whether the iterator is valid. +func (iter *mvsMergeIterator) skipUntilExistsOrInvalid() bool { + for { + // If parent is invalid, fast-forward cache. + if !iter.parent.Valid() { + iter.skipCacheDeletes(nil) + return iter.cache.Valid() + } + // Parent is valid. + if !iter.cache.Valid() { + return true + } + // Parent is valid, cache is valid. + + // Compare parent and cache. + keyP := iter.parent.Key() + keyC := iter.cache.Key() + + switch iter.compare(keyP, keyC) { + case -1: // parent < cache. + return true + + case 0: // parent == cache. + // Skip over if cache item is a delete. + valueC := iter.cache.Value() + if valueC == nil { + iter.parent.Next() + iter.cache.Next() + + continue + } + // Cache is not a delete. + + return true // cache exists. + case 1: // cache < parent + // Skip over if cache item is a delete. + valueC := iter.cache.Value() + if valueC == nil { + iter.skipCacheDeletes(keyP) + continue + } + // Cache is not a delete. + + return true // cache exists. + } + } +} diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go index 697561355..b96338c1e 100644 --- a/store/multiversion/mvkv.go +++ b/store/multiversion/mvkv.go @@ -20,8 +20,6 @@ type VersionIndexedStore struct { writeset map[string][]byte // contains the key -> value mapping for all keys written to the store // TODO: need to add iterateset here as well - // TODO: do we need this? - I think so? / maybe we just treat `nil` value in the writeset as a delete - deleted *sync.Map // dirty keys that haven't been sorted yet for iteration dirtySet map[string]struct{} // used for iterators - populated at the time of iterator instantiation @@ -43,7 +41,6 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion return &VersionIndexedStore{ readset: make(map[string][]byte), writeset: make(map[string][]byte), - deleted: &sync.Map{}, dirtySet: make(map[string]struct{}), sortedStore: dbm.NewMemDB(), parent: parent, @@ -191,12 +188,63 @@ func (store *VersionIndexedStore) Set(key []byte, value []byte) { // Iterator implements types.KVStore. func (v *VersionIndexedStore) Iterator(start []byte, end []byte) dbm.Iterator { - panic("unimplemented") + return v.iterator(start, end, true) } // ReverseIterator implements types.KVStore. func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iterator { - panic("unimplemented") + return v.iterator(start, end, false) +} + +// TODO: still needs iterateset tracking +// Iterator implements types.KVStore. +func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator { + store.mtx.Lock() + defer store.mtx.Unlock() + // TODO: ideally we persist writeset keys into a sorted btree for later use + // make a set of total keys across mvkv and mvs to iterate + keysToIterate := make(map[string]struct{}) + for key := range store.writeset { + keysToIterate[key] = struct{}{} + } + + // TODO: ideally we take advantage of mvs keys already being sorted + // get the multiversion store sorted keys + writesetMap := store.multiVersionStore.GetAllWritesetKeys() + for i := 0; i < store.transactionIndex; i++ { + // add all the writesets keys up until current index + for _, key := range writesetMap[i] { + keysToIterate[key] = struct{}{} + } + } + // TODO: ideally merge btree and mvs keys into a single sorted btree + + // TODO: this is horribly inefficient, fix this + sortedKeys := make([]string, len(keysToIterate)) + for key := range keysToIterate { + sortedKeys = append(sortedKeys, key) + } + sort.Strings(sortedKeys) + + memDB := dbm.NewMemDB() + for _, key := range sortedKeys { + memDB.Set([]byte(key), []byte{}) + } + + var parent, memIterator types.Iterator + + // make a memIterator + memIterator = store.newMemIterator(start, end, memDB, ascending) + + if ascending { + parent = store.parent.Iterator(start, end) + } else { + parent = store.parent.ReverseIterator(start, end) + } + + // mergeIterator + return NewMVSMergeIterator(parent, memIterator, ascending) + } // GetStoreType implements types.KVStore. @@ -230,11 +278,6 @@ func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirt keyStr := string(key) store.writeset[keyStr] = value - if deleted { - store.deleted.Store(keyStr, struct{}{}) - } else { - store.deleted.Delete(keyStr) - } if dirty { store.dirtySet[keyStr] = struct{}{} } @@ -261,8 +304,3 @@ func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) { // add to dirty set store.dirtySet[keyStr] = struct{}{} } - -func (store *VersionIndexedStore) isDeleted(key string) bool { - _, ok := store.deleted.Load(key) - return ok -} diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index e17cba65c..23bcc472a 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -296,3 +296,77 @@ func TestVersionIndexedStoreValidation(t *testing.T) { vis.ValidateReadset() }) } + +func TestIterator(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore(parentKVStore) + // initialize a new VersionIndexedStore + abortC := make(chan scheduler.Abort) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC) + + // set some initial values + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + parentKVStore.Set([]byte("deletedKey"), []byte("foo")) + mvs.SetWriteset(0, 1, map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "deletedKey": nil, + }) + // add an estimate to MVS + mvs.SetEstimatedWriteset(3, 1, map[string][]byte{ + "key3": []byte("value1_b"), + }) + + // iterate over the keys - exclusive on key5 + iter := vis.Iterator([]byte("000"), []byte("key5")) + vals := []string{} + defer iter.Close() + for ; iter.Valid(); iter.Next() { + vals = append(vals, string(iter.Value())) + } + require.Equal(t, []string{"value1", "value2", "value4"}, vals) + iter.Close() + + // test reverse iteration + vals2 := []string{} + iter2 := vis.ReverseIterator([]byte("000"), []byte("key6")) + defer iter2.Close() + for ; iter2.Valid(); iter2.Next() { + vals2 = append(vals2, string(iter2.Value())) + } + // has value5 because of end being key6 + require.Equal(t, []string{"value5", "value4", "value2", "value1"}, vals2) + iter2.Close() + + // add items to writeset + vis.Set([]byte("key3"), []byte("value3")) + vis.Set([]byte("key4"), []byte("valueNew")) + + // iterate over the keys - exclusive on key5 + iter3 := vis.Iterator([]byte("000"), []byte("key5")) + vals3 := []string{} + defer iter3.Close() + for ; iter3.Valid(); iter3.Next() { + vals3 = append(vals3, string(iter3.Value())) + } + require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3) + iter3.Close() + + // add an estimate to MVS + mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ + "key2": []byte("value1_b"), + }) + + go func() { + // new iter + iter4 := vis.Iterator([]byte("000"), []byte("key5")) + defer iter4.Close() + for ; iter4.Valid(); iter4.Next() { + } + }() + abort := <-abortC // read the abort from the channel + require.Equal(t, 1, abort.DependentTxIdx) + +}