From f935dacddca6eb756ec6367b35af94835576a312 Mon Sep 17 00:00:00 2001
From: Uday Patil
Date: Tue, 17 Oct 2023 13:15:23 -0500
Subject: [PATCH] [occ] Implement iterator for mvkv (#329)

## Describe your changes and provide context
This implements Iterator and ReverseIterator for mvkv to satisfy the KVStore
interface. The memIterator is composed from the VersionIndexedStore's writeset
and the MultiVersionStore, and yields values in a cascading fashion: first from
the writeset, then from the multiversion store. This still needs optimization to
persist sorted keys instead of reconstructing the sorted key set on each
iterator instantiation.

## Testing performed to validate your change
Unit test to verify basic functionality
---
 store/multiversion/memiterator.go   |  74 ++++++++
 store/multiversion/mergeiterator.go | 256 ++++++++++++++++++++++++++++
 store/multiversion/mvkv.go          |  68 ++++++--
 store/multiversion/mvkv_test.go     |  74 ++++++++
 4 files changed, 457 insertions(+), 15 deletions(-)
 create mode 100644 store/multiversion/memiterator.go
 create mode 100644 store/multiversion/mergeiterator.go

diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go
new file mode 100644
index 000000000..a3fb74323
--- /dev/null
+++ b/store/multiversion/memiterator.go
@@ -0,0 +1,74 @@
+package multiversion
+
+import (
+	dbm "github.com/tendermint/tm-db"
+
+	"github.com/cosmos/cosmos-sdk/store/types"
+	scheduler "github.com/cosmos/cosmos-sdk/types/occ"
+)
+
+// memIterator iterates over the key set materialized from the writeset and
+// the multiversion store; a nil value means the key was deleted.
+// Implements Iterator.
+type memIterator struct {
+	types.Iterator
+
+	mvStore      MultiVersionStore
+	writeset     map[string][]byte
+	index        int
+	abortChannel chan scheduler.Abort
+}
+
+func (store *VersionIndexedStore) newMemIterator(
+	start, end []byte,
+	items *dbm.MemDB,
+	ascending bool,
+) *memIterator {
+	var iter types.Iterator
+	var err error
+
+	if ascending {
+		iter, err = items.Iterator(start, end)
+	} else {
+		iter, err = items.ReverseIterator(start, end)
+	}
+
+	if err != nil {
+		if iter != nil {
+			iter.Close()
+		}
+		panic(err)
+	}
+
+	return &memIterator{
+		Iterator:     iter,
+		mvStore:      store.multiVersionStore,
+		index:        store.transactionIndex,
+		abortChannel: store.abortChannel,
+		writeset:     store.GetWriteset(),
+	}
+}
+
+// Value fetches the value from the writeset if present, otherwise from the multiversion store; the parent store is handled by the merge iterator
+func (mi *memIterator) Value() []byte {
+	key := mi.Iterator.Key()
+
+	// try fetch from writeset - return if exists
+	if val, ok := mi.writeset[string(key)]; ok {
+		return val
+	}
+
+	// get the value from the multiversion store
+	val := mi.mvStore.GetLatestBeforeIndex(mi.index, key)
+
+	// if we have an estimate, write to the abort channel
+	if val.IsEstimate() {
+		mi.abortChannel <- scheduler.NewEstimateAbort(val.Index())
+	}
+
+	// if we have a deleted value, return nil
+	if val.IsDeleted() {
+		return nil
+	}
+	return val.Value()
+}
diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go
new file mode 100644
index 000000000..c0a9d23ef
--- /dev/null
+++ b/store/multiversion/mergeiterator.go
@@ -0,0 +1,256 @@
+package multiversion
+
+import (
+	"bytes"
+	"errors"
+
+	"github.com/cosmos/cosmos-sdk/store/types"
+)
+
+// mvsMergeIterator merges a parent Iterator and a cache Iterator.
+// The cache iterator may return nil values to signal that an item
+// has been deleted (but not deleted in the parent).
+// If the cache iterator has the same key as the parent, the
+// cache shadows (overrides) the parent.
+type mvsMergeIterator struct {
+	parent    types.Iterator
+	cache     types.Iterator
+	ascending bool
+}
+
+var _ types.Iterator = (*mvsMergeIterator)(nil)
+
+func NewMVSMergeIterator(
+	parent, cache types.Iterator,
+	ascending bool,
+) *mvsMergeIterator {
+	iter := &mvsMergeIterator{
+		parent:    parent,
+		cache:     cache,
+		ascending: ascending,
+	}
+
+	return iter
+}
+
+// Domain implements Iterator.
+// It returns the union of the iter.Parent domain and the iter.Cache domain.
+// If the domains are disjoint, this includes the domain in between them as well.
+func (iter *mvsMergeIterator) Domain() (start, end []byte) {
+	startP, endP := iter.parent.Domain()
+	startC, endC := iter.cache.Domain()
+
+	if iter.compare(startP, startC) < 0 {
+		start = startP
+	} else {
+		start = startC
+	}
+
+	if iter.compare(endP, endC) < 0 {
+		end = endC
+	} else {
+		end = endP
+	}
+
+	return start, end
+}
+
+// Valid implements Iterator.
+func (iter *mvsMergeIterator) Valid() bool {
+	return iter.skipUntilExistsOrInvalid()
+}
+
+// Next implements Iterator
+func (iter *mvsMergeIterator) Next() {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the next cache item.
+	if !iter.parent.Valid() {
+		iter.cache.Next()
+		return
+	}
+
+	// If cache is invalid, get the next parent item.
+	if !iter.cache.Valid() {
+		iter.parent.Next()
+		return
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+	switch iter.compare(keyP, keyC) {
+	case -1: // parent < cache
+		iter.parent.Next()
+	case 0: // parent == cache
+		iter.parent.Next()
+		iter.cache.Next()
+	case 1: // parent > cache
+		iter.cache.Next()
+	}
+}
+
+// Key implements Iterator
+func (iter *mvsMergeIterator) Key() []byte {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the cache key.
+	if !iter.parent.Valid() {
+		return iter.cache.Key()
+	}
+
+	// If cache is invalid, get the parent key.
+	if !iter.cache.Valid() {
+		return iter.parent.Key()
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+
+	cmp := iter.compare(keyP, keyC)
+	switch cmp {
+	case -1: // parent < cache
+		return keyP
+	case 0: // parent == cache
+		return keyP
+	case 1: // parent > cache
+		return keyC
+	default:
+		panic("invalid compare result")
+	}
+}
+
+// Value implements Iterator
+func (iter *mvsMergeIterator) Value() []byte {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the cache value.
+	if !iter.parent.Valid() {
+		value := iter.cache.Value()
+		return value
+	}
+
+	// If cache is invalid, get the parent value.
+	if !iter.cache.Valid() {
+		value := iter.parent.Value()
+		return value
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+
+	cmp := iter.compare(keyP, keyC)
+	switch cmp {
+	case -1: // parent < cache
+		value := iter.parent.Value()
+		return value
+	case 0, 1: // parent >= cache
+		value := iter.cache.Value()
+		return value
+	default:
+		panic("invalid comparison result")
+	}
+}
+
+// Close implements Iterator
+func (iter *mvsMergeIterator) Close() error {
+	if err := iter.parent.Close(); err != nil {
+		// still want to close the cache iterator regardless
+		iter.cache.Close()
+		return err
+	}
+
+	return iter.cache.Close()
+}
+
+// Error returns an error if the mvsMergeIterator is invalid, as defined by
+// the Valid method.
+func (iter *mvsMergeIterator) Error() error {
+	if !iter.Valid() {
+		return errors.New("invalid mvsMergeIterator")
+	}
+
+	return nil
+}
+
+// If not valid, panics.
+// NOTE: May have side-effect of iterating over cache.
+func (iter *mvsMergeIterator) assertValid() {
+	if err := iter.Error(); err != nil {
+		panic(err)
+	}
+}
+
+// Like bytes.Compare but opposite if not ascending.
+func (iter *mvsMergeIterator) compare(a, b []byte) int {
+	if iter.ascending {
+		return bytes.Compare(a, b)
+	}
+
+	return bytes.Compare(a, b) * -1
+}
+
+// Skip all delete-items from the cache w/ `key < until`. After this function,
+// the current cache item is a non-delete-item, or `until <= key`.
+// If the current cache item is not a delete item, does nothing.
+// If `until` is nil, there is no limit, and cache may end up invalid.
+// CONTRACT: cache is valid.
+func (iter *mvsMergeIterator) skipCacheDeletes(until []byte) {
+	for iter.cache.Valid() &&
+		iter.cache.Value() == nil &&
+		(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
+		iter.cache.Next()
+	}
+}
+
+// Fast forwards cache (or parent+cache in case of deleted items) until the
+// current item exists, or until the iterator becomes invalid.
+// Returns whether the iterator is valid.
+func (iter *mvsMergeIterator) skipUntilExistsOrInvalid() bool {
+	for {
+		// If parent is invalid, fast-forward cache.
+		if !iter.parent.Valid() {
+			iter.skipCacheDeletes(nil)
+			return iter.cache.Valid()
+		}
+		// Parent is valid.
+		if !iter.cache.Valid() {
+			return true
+		}
+		// Parent is valid, cache is valid.
+
+		// Compare parent and cache.
+		keyP := iter.parent.Key()
+		keyC := iter.cache.Key()
+
+		switch iter.compare(keyP, keyC) {
+		case -1: // parent < cache.
+			return true
+
+		case 0: // parent == cache.
+			// Skip over if cache item is a delete.
+			valueC := iter.cache.Value()
+			if valueC == nil {
+				iter.parent.Next()
+				iter.cache.Next()
+
+				continue
+			}
+			// Cache is not a delete.
+
+			return true // cache exists.
+		case 1: // cache < parent
+			// Skip over if cache item is a delete.
+			valueC := iter.cache.Value()
+			if valueC == nil {
+				iter.skipCacheDeletes(keyP)
+				continue
+			}
+			// Cache is not a delete.
+
+			return true // cache exists.
+		}
+	}
+}
diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go
index 697561355..b96338c1e 100644
--- a/store/multiversion/mvkv.go
+++ b/store/multiversion/mvkv.go
@@ -20,8 +20,6 @@ type VersionIndexedStore struct {
 	writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
 	// TODO: need to add iterateset here as well
-	// TODO: do we need this? - I think so? / maybe we just treat `nil` value in the writeset as a delete
-	deleted *sync.Map
 	// dirty keys that haven't been sorted yet for iteration
 	dirtySet map[string]struct{}
 	// used for iterators - populated at the time of iterator instantiation
@@ -43,7 +41,6 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion
 	return &VersionIndexedStore{
 		readset:     make(map[string][]byte),
 		writeset:    make(map[string][]byte),
-		deleted:     &sync.Map{},
 		dirtySet:    make(map[string]struct{}),
 		sortedStore: dbm.NewMemDB(),
 		parent:      parent,
@@ -191,12 +188,63 @@ func (store *VersionIndexedStore) Set(key []byte, value []byte) {
 
 // Iterator implements types.KVStore.
 func (v *VersionIndexedStore) Iterator(start []byte, end []byte) dbm.Iterator {
-	panic("unimplemented")
+	return v.iterator(start, end, true)
 }
 
 // ReverseIterator implements types.KVStore.
 func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iterator {
-	panic("unimplemented")
+	return v.iterator(start, end, false)
+}
+
+// TODO: still needs iterateset tracking
+// iterator is the shared implementation backing Iterator and ReverseIterator.
+func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator {
+	store.mtx.Lock()
+	defer store.mtx.Unlock()
+	// TODO: ideally we persist writeset keys into a sorted btree for later use
+	// make a set of total keys across mvkv and mvs to iterate
+	keysToIterate := make(map[string]struct{})
+	for key := range store.writeset {
+		keysToIterate[key] = struct{}{}
+	}
+
+	// TODO: ideally we take advantage of mvs keys already being sorted
+	// get the multiversion store sorted keys
+	writesetMap := store.multiVersionStore.GetAllWritesetKeys()
+	for i := 0; i < store.transactionIndex; i++ {
+		// add all the writeset keys up until the current index
+		for _, key := range writesetMap[i] {
+			keysToIterate[key] = struct{}{}
+		}
+	}
+	// TODO: ideally merge btree and mvs keys into a single sorted btree
+
+	// TODO: this is horribly inefficient, fix this
+	sortedKeys := make([]string, 0, len(keysToIterate))
+	for key := range keysToIterate {
+		sortedKeys = append(sortedKeys, key)
+	}
+	sort.Strings(sortedKeys)
+
+	memDB := dbm.NewMemDB()
+	for _, key := range sortedKeys {
+		memDB.Set([]byte(key), []byte{})
+	}
+
+	var parent, memIterator types.Iterator
+
+	// make a memIterator
+	memIterator = store.newMemIterator(start, end, memDB, ascending)
+
+	if ascending {
+		parent = store.parent.Iterator(start, end)
+	} else {
+		parent = store.parent.ReverseIterator(start, end)
+	}
+
+	// merge the parent iterator with the memIterator (cache shadows parent)
+	return NewMVSMergeIterator(parent, memIterator, ascending)
+
 }
 
 // GetStoreType implements types.KVStore.
@@ -230,11 +278,6 @@ func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirt keyStr := string(key) store.writeset[keyStr] = value - if deleted { - store.deleted.Store(keyStr, struct{}{}) - } else { - store.deleted.Delete(keyStr) - } if dirty { store.dirtySet[keyStr] = struct{}{} } @@ -261,8 +304,3 @@ func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) { // add to dirty set store.dirtySet[keyStr] = struct{}{} } - -func (store *VersionIndexedStore) isDeleted(key string) bool { - _, ok := store.deleted.Load(key) - return ok -} diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index e17cba65c..23bcc472a 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -296,3 +296,77 @@ func TestVersionIndexedStoreValidation(t *testing.T) { vis.ValidateReadset() }) } + +func TestIterator(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore(parentKVStore) + // initialize a new VersionIndexedStore + abortC := make(chan scheduler.Abort) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC) + + // set some initial values + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + parentKVStore.Set([]byte("deletedKey"), []byte("foo")) + mvs.SetWriteset(0, 1, map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "deletedKey": nil, + }) + // add an estimate to MVS + mvs.SetEstimatedWriteset(3, 1, map[string][]byte{ + "key3": []byte("value1_b"), + }) + + // iterate over the keys - exclusive on key5 + iter := vis.Iterator([]byte("000"), []byte("key5")) + vals := []string{} + defer iter.Close() + for ; iter.Valid(); iter.Next() { + vals = append(vals, string(iter.Value())) + } + require.Equal(t, []string{"value1", "value2", "value4"}, vals) + iter.Close() + + // test reverse iteration + vals2 := []string{} + iter2 := vis.ReverseIterator([]byte("000"), []byte("key6")) + defer iter2.Close() + for ; iter2.Valid(); iter2.Next() { + vals2 = append(vals2, string(iter2.Value())) + } + // has value5 because of end being key6 + require.Equal(t, []string{"value5", "value4", "value2", "value1"}, vals2) + iter2.Close() + + // add items to writeset + vis.Set([]byte("key3"), []byte("value3")) + vis.Set([]byte("key4"), []byte("valueNew")) + + // iterate over the keys - exclusive on key5 + iter3 := vis.Iterator([]byte("000"), []byte("key5")) + vals3 := []string{} + defer iter3.Close() + for ; iter3.Valid(); iter3.Next() { + vals3 = append(vals3, string(iter3.Value())) + } + require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3) + iter3.Close() + + // add an estimate to MVS + mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ + "key2": []byte("value1_b"), + }) + + go func() { + // new iter + iter4 := vis.Iterator([]byte("000"), []byte("key5")) + defer iter4.Close() + for ; iter4.Valid(); iter4.Next() { + } + }() + abort := <-abortC // read the abort from the channel + require.Equal(t, 1, abort.DependentTxIdx) + +}
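
Usage sketch for reviewers (illustrative only, not part of the patch): the snippet below mirrors the setup used in TestIterator above and shows the cascading read order the new iterator relies on: a key is resolved first from the transaction's own writeset, then from earlier transactions' writesets in the multiversion store, and finally from the parent store. Package paths and constructor signatures are assumed to be the same ones the test imports.

```go
package main

import (
	"fmt"

	dbm "github.com/tendermint/tm-db"

	"github.com/cosmos/cosmos-sdk/store/cachekv"
	"github.com/cosmos/cosmos-sdk/store/dbadapter"
	"github.com/cosmos/cosmos-sdk/store/multiversion"
	"github.com/cosmos/cosmos-sdk/store/types"
	scheduler "github.com/cosmos/cosmos-sdk/types/occ"
)

func main() {
	// Same wiring as TestIterator above (assumed constructors).
	mem := dbadapter.Store{DB: dbm.NewMemDB()}
	parent := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
	mvs := multiversion.NewMultiVersionStore(parent)
	abortC := make(chan scheduler.Abort, 1)

	// Parent holds "a"; an earlier transaction (index 0) wrote "b";
	// this transaction (index 2) shadows "a" in its own writeset.
	parent.Set([]byte("a"), []byte("parent"))
	mvs.SetWriteset(0, 1, map[string][]byte{"b": []byte("mvs")})

	vis := multiversion.NewVersionIndexedStore(parent, mvs, 2, 2, abortC)
	vis.Set([]byte("a"), []byte("writeset"))

	iter := vis.Iterator([]byte("a"), []byte("z"))
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		// Expected: a -> "writeset" (writeset wins over parent), b -> "mvs".
		fmt.Printf("%s -> %s\n", iter.Key(), iter.Value())
	}
}
```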