From 5b937781f6aefd2b4ed000c58d8551a55be3a4a8 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 20:14:38 -0500 Subject: [PATCH 1/9] implement iterateset validation --- store/multiversion/memiterator.go | 54 ++++- store/multiversion/mergeiterator.go | 9 +- store/multiversion/mvkv.go | 113 +++++++--- store/multiversion/mvkv_test.go | 11 + store/multiversion/store.go | 118 +++++++++- store/multiversion/store_test.go | 307 +++++++++++++++++++++++++- store/multiversion/trackediterator.go | 41 ++++ 7 files changed, 600 insertions(+), 53 deletions(-) create mode 100644 store/multiversion/trackediterator.go diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index 17ffdd0d6..4aa13a9e0 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/types" occtypes "github.com/cosmos/cosmos-sdk/types/occ" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" ) // Iterates over iterKVCache items. @@ -14,15 +15,17 @@ type memIterator struct { types.Iterator mvStore MultiVersionStore - writeset map[string][]byte + writeset WriteSet index int abortChannel chan occtypes.Abort + ReadsetHandler } func (store *VersionIndexedStore) newMemIterator( start, end []byte, items *dbm.MemDB, ascending bool, + readsetHandler ReadsetHandler, ) *memIterator { var iter types.Iterator var err error @@ -41,11 +44,12 @@ func (store *VersionIndexedStore) newMemIterator( } return &memIterator{ - Iterator: iter, - mvStore: store.multiVersionStore, - index: store.transactionIndex, - abortChannel: store.abortChannel, - writeset: store.GetWriteset(), + Iterator: iter, + mvStore: store.multiVersionStore, + index: store.transactionIndex, + abortChannel: store.abortChannel, + writeset: store.GetWriteset(), + ReadsetHandler: readsetHandler, } } @@ -66,9 +70,47 @@ func (mi *memIterator) Value() []byte { mi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) } + // need to update readset // if we have a deleted value, return nil if val.IsDeleted() { + mi.ReadsetHandler.UpdateReadSet(key, nil) return nil } + mi.ReadsetHandler.UpdateReadSet(key, val.Value()) return val.Value() } + +func (store *Store) newMVSValidationIterator( + index int, + start, end []byte, + items *dbm.MemDB, + ascending bool, + writeset WriteSet, +) (iterator *memIterator, abortChannel chan scheduler.Abort) { + var iter types.Iterator + var err error + + if ascending { + iter, err = items.Iterator(start, end) + } else { + iter, err = items.ReverseIterator(start, end) + } + + if err != nil { + if iter != nil { + iter.Close() + } + panic(err) + } + + abortChannel = make(chan scheduler.Abort, 1) + + return &memIterator{ + Iterator: iter, + mvStore: store, + index: index, + abortChannel: abortChannel, + ReadsetHandler: NoOpHandler{}, + writeset: writeset, + }, abortChannel +} diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go index c0a9d23ef..3b5cee741 100644 --- a/store/multiversion/mergeiterator.go +++ b/store/multiversion/mergeiterator.go @@ -16,6 +16,7 @@ type mvsMergeIterator struct { parent types.Iterator cache types.Iterator ascending bool + ReadsetHandler } var _ types.Iterator = (*mvsMergeIterator)(nil) @@ -23,11 +24,13 @@ var _ types.Iterator = (*mvsMergeIterator)(nil) func NewMVSMergeIterator( parent, cache types.Iterator, ascending bool, + readsetHandler ReadsetHandler, ) *mvsMergeIterator { iter := &mvsMergeIterator{ - parent: parent, - cache: cache, - ascending: ascending, + parent: parent, + cache: cache, + ascending: ascending, + ReadsetHandler: readsetHandler, } return iter diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go index b96338c1e..9addc84f7 100644 --- a/store/multiversion/mvkv.go +++ b/store/multiversion/mvkv.go @@ -12,12 +12,70 @@ import ( dbm "github.com/tendermint/tm-db" ) +// exposes a handler for adding items to readset, useful for iterators +type ReadsetHandler interface { + UpdateReadSet(key []byte, value []byte) +} + +type NoOpHandler struct{} + +func (NoOpHandler) UpdateReadSet(key []byte, value []byte) {} + +// exposes a handler for adding items to iterateset, to be called upon iterator close +type IterateSetHandler interface { + UpdateIterateSet(*iterationTracker) +} + +type iterationTracker struct { + startKey []byte // start of the iteration range + endKey []byte // end of the iteration range + earlyStopKey []byte // key that caused early stop + iteratedKeys [][]byte // list of keys that were iterated TODO: this is necessary over just a count because we ned to know if key A was removed and B added which would still have the same count, maybe not, because we add iterated keys to readset, maybe its fine? + ascending bool + + writeset WriteSet + + // TODO: is it possible that terimation is affected by keys later in iteration that weren't reached? eg. number of keys affecting iteration? + // TODO: i believe to get number of keys the iteration would need to be done fully so its not a concern? + + // TODO: maybe we need to store keys served from writeset for the transaction? that way if theres OTHER keys within the writeset and the iteration range, and were written to the writeset later, we can discriminate between the groups? + // keysServedFromWriteset map[string]struct{} + + // actually its simpler to just store a copy of the writeset at the time of iterator creation +} + +func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) *iterationTracker { + copyWriteset := make(WriteSet, len(writeset)) + + for key, value := range writeset { + copyWriteset[key] = value + } + + return &iterationTracker{ + startKey: startKey, + endKey: endKey, + iteratedKeys: [][]byte{}, + ascending: ascending, + writeset: copyWriteset, + } +} + +func (item *iterationTracker) AddKey(key []byte) { + item.iteratedKeys = append(item.iteratedKeys, key) +} + +func (item *iterationTracker) SetEarlyStopKey(key []byte) { + item.earlyStopKey = key +} + // Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index type VersionIndexedStore struct { mtx sync.Mutex // used for tracking reads and writes for eventual validation + persistence into multi-version store - readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store) - writeset map[string][]byte // contains the key -> value mapping for all keys written to the store + // TODO: does this need sync.Map? + readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store) + writeset map[string][]byte // contains the key -> value mapping for all keys written to the store + iterateset []*iterationTracker // TODO: need to add iterateset here as well // dirty keys that haven't been sorted yet for iteration @@ -36,11 +94,14 @@ type VersionIndexedStore struct { } var _ types.KVStore = (*VersionIndexedStore)(nil) +var _ ReadsetHandler = (*VersionIndexedStore)(nil) +var _ IterateSetHandler = (*VersionIndexedStore)(nil) func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore { return &VersionIndexedStore{ readset: make(map[string][]byte), writeset: make(map[string][]byte), + iterateset: []*iterationTracker{}, dirtySet: make(map[string]struct{}), sortedStore: dbm.NewMemDB(), parent: parent, @@ -97,7 +158,7 @@ func (store *VersionIndexedStore) Get(key []byte) []byte { } // if we didn't find it in the multiversion store, then we want to check the parent store + add to readset parentValue := store.parent.Get(key) - store.updateReadSet(key, parentValue) + store.UpdateReadSet(key, parentValue) return parentValue } @@ -107,7 +168,7 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV if mvsValue.IsDeleted() { value = nil } - store.updateReadSet([]byte(strKey), value) + store.UpdateReadSet([]byte(strKey), value) return value } @@ -201,40 +262,22 @@ func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iter func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator { store.mtx.Lock() defer store.mtx.Unlock() - // TODO: ideally we persist writeset keys into a sorted btree for later use - // make a set of total keys across mvkv and mvs to iterate - keysToIterate := make(map[string]struct{}) - for key := range store.writeset { - keysToIterate[key] = struct{}{} - } + // get the sorted keys from MVS // TODO: ideally we take advantage of mvs keys already being sorted - // get the multiversion store sorted keys - writesetMap := store.multiVersionStore.GetAllWritesetKeys() - for i := 0; i < store.transactionIndex; i++ { - // add all the writesets keys up until current index - for _, key := range writesetMap[i] { - keysToIterate[key] = struct{}{} - } - } // TODO: ideally merge btree and mvs keys into a single sorted btree + memDB := store.multiVersionStore.CollectIteratorItems(store.transactionIndex) - // TODO: this is horribly inefficient, fix this - sortedKeys := make([]string, len(keysToIterate)) - for key := range keysToIterate { - sortedKeys = append(sortedKeys, key) - } - sort.Strings(sortedKeys) - - memDB := dbm.NewMemDB() - for _, key := range sortedKeys { + // TODO: ideally we persist writeset keys into a sorted btree for later use + // make a set of total keys across mvkv and mvs to iterate + for key := range store.writeset { memDB.Set([]byte(key), []byte{}) } var parent, memIterator types.Iterator // make a memIterator - memIterator = store.newMemIterator(start, end, memDB, ascending) + memIterator = store.newMemIterator(start, end, memDB, ascending, store) if ascending { parent = store.parent.Iterator(start, end) @@ -242,8 +285,13 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b parent = store.parent.ReverseIterator(start, end) } + mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store) + + iterationTracker := NewIterationTracker(start, end, ascending, store.writeset) + trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store) + // mergeIterator - return NewMVSMergeIterator(parent, memIterator, ascending) + return trackedIterator } @@ -297,10 +345,15 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset) } -func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) { +func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) { // add to readset keyStr := string(key) store.readset[keyStr] = value // add to dirty set store.dirtySet[keyStr] = struct{}{} } + +func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker *iterationTracker) { + // append to iterateset + store.iterateset = append(store.iterateset, iterationTracker) +} diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index 23bcc472a..b8aaf4090 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -354,6 +354,17 @@ func TestIterator(t *testing.T) { require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3) iter3.Close() + vis.Set([]byte("key6"), []byte("value6")) + // iterate over the keys, writeset being the last of the iteration range + iter4 := vis.Iterator([]byte("000"), []byte("key7")) + vals4 := []string{} + defer iter4.Close() + for ; iter4.Valid(); iter4.Next() { + vals4 = append(vals4, string(iter4.Value())) + } + require.Equal(t, []string{"value1", "value2", "value3", "valueNew", "value5", "value6"}, vals4) + iter4.Close() + // add an estimate to MVS mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ "key2": []byte("value1_b"), diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 08c45204b..43c6ed4ad 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -8,6 +8,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" + db "github.com/tendermint/tm-db" ) type MultiVersionStore interface { @@ -19,13 +20,17 @@ type MultiVersionStore interface { InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) GetAllWritesetKeys() map[int][]string + CollectIteratorItems(index int) *db.MemDB SetReadset(index int, readset ReadSet) GetReadset(index int) ReadSet - ValidateTransactionState(index int) []int + SetIterateset(index int, iterateset Iterateset) + GetIterateset(index int) Iterateset + ValidateTransactionState(index int) (bool, []int) } type WriteSet map[string][]byte type ReadSet map[string][]byte +type Iterateset []iterationTracker var _ MultiVersionStore = (*Store)(nil) @@ -37,6 +42,7 @@ type Store struct { txWritesetKeys map[int][]string // map of tx index -> writeset keys txReadSets map[int]ReadSet + txIterateSets map[int]Iterateset parentStore types.KVStore } @@ -46,6 +52,7 @@ func NewMultiVersionStore(parentStore types.KVStore) *Store { multiVersionMap: make(map[string]MultiVersionValue), txWritesetKeys: make(map[int][]string), txReadSets: make(map[int]ReadSet), + txIterateSets: make(map[int]Iterateset), parentStore: parentStore, } } @@ -212,9 +219,108 @@ func (s *Store) GetReadset(index int) ReadSet { return s.txReadSets[index] } -func (s *Store) ValidateTransactionState(index int) []int { +func (s *Store) SetIterateset(index int, iterateset Iterateset) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.txIterateSets[index] = iterateset +} + +func (s *Store) GetIterateset(index int) Iterateset { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.txIterateSets[index] +} + +// CollectIteratorItems implements MultiVersionStore. It will return a memDB containing all of the keys present in the multiversion store within the iteration range prior to (exclusive of) the index. +func (s *Store) CollectIteratorItems(index int) *db.MemDB { + sortedItems := db.NewMemDB() + + // get all writeset keys prior to index + keys := s.GetAllWritesetKeys() + for i := 0; i < index; i++ { + indexedWriteset := keys[i] + // TODO: do we want to exclude keys out of the range or just let the iterator handle it? + for _, key := range indexedWriteset { + // TODO: inefficient because (logn) for each key + rebalancing? maybe theres a better way to add to a tree to reduce rebalancing overhead + sortedItems.Set([]byte(key), []byte{}) + } + } + return sortedItems +} + +func (s *Store) validateIterator(index int, iterationTracker iterationTracker) bool { + // TODO: what if we added a key LATER within the transaction AFTER iteration. in that case, it won't be in expected Keys, but would be in the iteration range, causing issue? + // collect items from multiversion store + sortedItems := s.CollectIteratorItems(index) + // add the iterationtracker writeset keys to the sorted items + for key, _ := range iterationTracker.writeset { + sortedItems.Set([]byte(key), []byte{}) + } + + var parentIter types.Iterator + + iter, abortChannel := s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, sortedItems, iterationTracker.ascending, iterationTracker.writeset) + if iterationTracker.ascending { + parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey) + } else { + parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey) + } + // create a new MVSMergeiterator + mergeIterator := NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{}) + defer mergeIterator.Close() + keys := iterationTracker.iteratedKeys + + validChan := make(chan bool, 1) + + // listen for abort while iterating + go func(expectedKeys [][]byte, returnChan chan bool) { + for ; mergeIterator.Valid(); mergeIterator.Next() { + if len(expectedKeys) == 0 { + // if we have no more expected keys, then the iterator is invalid + returnChan <- false + return + } + key := mergeIterator.Key() + if !bytes.Equal(key, expectedKeys[0]) { + // if key isn't equal, that means that the iterator is invalid + returnChan <- false + return + } + expectedKeys = expectedKeys[1:] + + // if our iterator key was the early stop, then we can break + if bytes.Equal(key, iterationTracker.earlyStopKey) { + returnChan <- true + return + } + } + returnChan <- !(len(expectedKeys) > 0) + }(keys, validChan) + + select { + case <-abortChannel: + // if we get an abort, then we know that the iterator is invalid + return false + case valid := <-validChan: + return valid + } +} + +// TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so? +func (s *Store) ValidateTransactionState(index int) (bool, []int) { defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate") conflictSet := map[int]struct{}{} + valid := true + + // TODO: validate iterateset + // TODO: can we parallelize for all iterators? + iterateset := s.GetIterateset(index) + for _, iterationTracker := range iterateset { + iteratorValid := s.validateIterator(index, iterationTracker) + valid = valid && iteratorValid + } // validate readset readset := s.GetReadset(index) @@ -235,14 +341,14 @@ func (s *Store) ValidateTransactionState(index int) []int { } else if latestValue.IsDeleted() { if value != nil { // conflict - conflictSet[latestValue.Index()] = struct{}{} + // TODO: would we want to return early? + valid = false } } else if !bytes.Equal(latestValue.Value(), value) { - conflictSet[latestValue.Index()] = struct{}{} + valid = false } } } - // TODO: validate iterateset // convert conflictset into sorted indices conflictIndices := make([]int, 0, len(conflictSet)) @@ -251,7 +357,7 @@ func (s *Store) ValidateTransactionState(index int) []int { } sort.Ints(conflictIndices) - return conflictIndices + return valid, conflictIndices } func (s *Store) WriteLatestToStore() { diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index bb56d1e71..15333d5cc 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -191,11 +191,13 @@ func TestMultiVersionStoreValidateState(t *testing.T) { mvs.SetReadset(5, readset) // assert no readset is valid - conflicts := mvs.ValidateTransactionState(4) + valid, conflicts := mvs.ValidateTransactionState(4) + require.True(t, valid) require.Empty(t, conflicts) // assert readset index 5 is valid - conflicts = mvs.ValidateTransactionState(5) + valid, conflicts = mvs.ValidateTransactionState(5) + require.True(t, valid) require.Empty(t, conflicts) // introduce conflict @@ -204,8 +206,9 @@ func TestMultiVersionStoreValidateState(t *testing.T) { }) // expect index 2 to be returned - conflicts = mvs.ValidateTransactionState(5) - require.Equal(t, []int{2}, conflicts) + valid, conflicts = mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Empty(t, conflicts) // add a conflict due to deletion mvs.SetWriteset(3, 1, map[string][]byte{ @@ -213,8 +216,9 @@ func TestMultiVersionStoreValidateState(t *testing.T) { }) // expect indices 2 and 3 to be returned - conflicts = mvs.ValidateTransactionState(5) - require.Equal(t, []int{2, 3}, conflicts) + valid, conflicts = mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Empty(t, conflicts) // add a conflict due to estimate mvs.SetEstimatedWriteset(4, 1, map[string][]byte{ @@ -222,10 +226,297 @@ func TestMultiVersionStoreValidateState(t *testing.T) { }) // expect indices 2, 3, and 4to be returned - conflicts = mvs.ValidateTransactionState(5) - require.Equal(t, []int{2, 3, 4}, conflicts) + valid, conflicts = mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Equal(t, []int{4}, conflicts) // assert panic for parent store mismatch parentKVStore.Set([]byte("key5"), []byte("value6")) require.Panics(t, func() { mvs.ValidateTransactionState(5) }) } + +func TestMVSIteratorValidation(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // test basic iteration + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationWithEstimate(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + writeset2 := make(multiversion.WriteSet) + writeset2["key2"] = []byte("value2") + mvs.SetEstimatedWriteset(2, 2, writeset2) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be invalid + valid, conflicts := mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Equal(t, []int{2}, conflicts) +} + +func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + + // deletion of 2 and introduction of 3 + writeset2 := make(multiversion.WriteSet) + writeset2["key2"] = nil + writeset2["key3"] = []byte("valueX") + mvs.SetWriteset(2, 2, writeset2) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be invalid + valid, conflicts := mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationWithKeyAdded(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, make(multiversion.WriteSet)) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + + // addition of key6 + writeset2 := make(multiversion.WriteSet) + writeset2["key6"] = []byte("value6") + mvs.SetWriteset(2, 2, writeset2) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be invalid + valid, conflicts := mvs.ValidateTransactionState(5) + require.False(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationWithWritesetValues(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // addition of key6 IN the transaction - but BEFORE the iteration occurred + writeset2 := make(multiversion.WriteSet) + writeset2["key6"] = []byte("value6") + mvs.SetWriteset(5, 2, writeset2) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, writeset2) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + iterationTracker.AddKey([]byte("key6")) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationWithWritesetValuesSetAfterIteration(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // addition of key6 IN the transaction - but BEFORE the iteration occurred + writeset2 := make(multiversion.WriteSet) + writeset2["key6"] = []byte("value6") + mvs.SetWriteset(5, 2, writeset2) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, make(multiversion.WriteSet)) + iterationTracker.AddKey([]byte("key1")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key5")) + // no key6 because the iteration was performed BEFORE the write + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationReverse(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // addition of key6 IN the transaction - but BEFORE the iteration occurred + writeset2 := make(multiversion.WriteSet) + writeset2["key6"] = []byte("value6") + mvs.SetWriteset(5, 2, writeset2) + + iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), false, writeset2) + iterationTracker.AddKey([]byte("key6")) + iterationTracker.AddKey([]byte("key5")) + iterationTracker.AddKey([]byte("key4")) + iterationTracker.AddKey([]byte("key2")) + iterationTracker.AddKey([]byte("key1")) + + mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} diff --git a/store/multiversion/trackediterator.go b/store/multiversion/trackediterator.go new file mode 100644 index 000000000..d537a3023 --- /dev/null +++ b/store/multiversion/trackediterator.go @@ -0,0 +1,41 @@ +package multiversion + +import "github.com/cosmos/cosmos-sdk/store/types" + +// tracked iterator is a wrapper around an existing iterator to track the iterator progress and monitor which keys are iterated. +type trackedIterator struct { + types.Iterator + + iterateset *iterationTracker + IterateSetHandler +} + +// TODO: test + +func NewTrackedIterator(iter types.Iterator, iterationTracker *iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator { + return &trackedIterator{ + Iterator: iter, + iterateset: iterationTracker, + IterateSetHandler: iterateSetHandler, + } +} + +// Close calls first updates the iterateset from the iterator, and then calls iterator.Close() +func (ti *trackedIterator) Close() error { + // TODO: if there are more keys to the iterator, then we consider it early stopped? + if ti.Iterator.Valid() { + // TODO: test whether reaching end of iteration range means valid is true or false + ti.iterateset.SetEarlyStopKey(ti.Iterator.Key()) + } + // Update iterate set + ti.IterateSetHandler.UpdateIterateSet(ti.iterateset) + return ti.Iterator.Close() +} + +// Key calls the iterator.Key() and adds the key to the iterateset, then returns the key from the iterator +func (ti *trackedIterator) Key() []byte { + key := ti.Iterator.Key() + // add key to the tracker + ti.iterateset.AddKey(key) + return key +} From ddb92a89997a7e054373d142f07b976c13b5ca01 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 22:55:13 -0500 Subject: [PATCH 2/9] fix early stop detection in tracked iterator and update tests --- store/multiversion/memiterator.go | 5 +- store/multiversion/mvkv.go | 27 +-- store/multiversion/store.go | 11 +- store/multiversion/store_test.go | 229 +++++++++++++++----------- store/multiversion/trackediterator.go | 20 ++- 5 files changed, 178 insertions(+), 114 deletions(-) diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index 4aa13a9e0..cd1c575d3 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -5,7 +5,6 @@ import ( "github.com/cosmos/cosmos-sdk/store/types" occtypes "github.com/cosmos/cosmos-sdk/types/occ" - scheduler "github.com/cosmos/cosmos-sdk/types/occ" ) // Iterates over iterKVCache items. @@ -86,7 +85,7 @@ func (store *Store) newMVSValidationIterator( items *dbm.MemDB, ascending bool, writeset WriteSet, -) (iterator *memIterator, abortChannel chan scheduler.Abort) { +) (iterator *memIterator, abortChannel chan occtypes.Abort) { var iter types.Iterator var err error @@ -103,7 +102,7 @@ func (store *Store) newMVSValidationIterator( panic(err) } - abortChannel = make(chan scheduler.Abort, 1) + abortChannel = make(chan occtypes.Abort, 1) return &memIterator{ Iterator: iter, diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go index 9addc84f7..1b2f947c1 100644 --- a/store/multiversion/mvkv.go +++ b/store/multiversion/mvkv.go @@ -23,14 +23,14 @@ func (NoOpHandler) UpdateReadSet(key []byte, value []byte) {} // exposes a handler for adding items to iterateset, to be called upon iterator close type IterateSetHandler interface { - UpdateIterateSet(*iterationTracker) + UpdateIterateSet(iterationTracker) } type iterationTracker struct { - startKey []byte // start of the iteration range - endKey []byte // end of the iteration range - earlyStopKey []byte // key that caused early stop - iteratedKeys [][]byte // list of keys that were iterated TODO: this is necessary over just a count because we ned to know if key A was removed and B added which would still have the same count, maybe not, because we add iterated keys to readset, maybe its fine? + startKey []byte // start of the iteration range + endKey []byte // end of the iteration range + earlyStopKey []byte // key that caused early stop + iteratedKeys map[string]struct{} // TODO: is a map okay because the ordering will be enforced when we replay the iterator? ascending bool writeset WriteSet @@ -44,24 +44,24 @@ type iterationTracker struct { // actually its simpler to just store a copy of the writeset at the time of iterator creation } -func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) *iterationTracker { +func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) iterationTracker { copyWriteset := make(WriteSet, len(writeset)) for key, value := range writeset { copyWriteset[key] = value } - return &iterationTracker{ + return iterationTracker{ startKey: startKey, endKey: endKey, - iteratedKeys: [][]byte{}, + iteratedKeys: make(map[string]struct{}), ascending: ascending, writeset: copyWriteset, } } func (item *iterationTracker) AddKey(key []byte) { - item.iteratedKeys = append(item.iteratedKeys, key) + item.iteratedKeys[string(key)] = struct{}{} } func (item *iterationTracker) SetEarlyStopKey(key []byte) { @@ -75,7 +75,7 @@ type VersionIndexedStore struct { // TODO: does this need sync.Map? readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store) writeset map[string][]byte // contains the key -> value mapping for all keys written to the store - iterateset []*iterationTracker + iterateset Iterateset // TODO: need to add iterateset here as well // dirty keys that haven't been sorted yet for iteration @@ -101,7 +101,7 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion return &VersionIndexedStore{ readset: make(map[string][]byte), writeset: make(map[string][]byte), - iterateset: []*iterationTracker{}, + iterateset: []iterationTracker{}, dirtySet: make(map[string]struct{}), sortedStore: dbm.NewMemDB(), parent: parent, @@ -336,6 +336,8 @@ func (store *VersionIndexedStore) WriteToMultiVersionStore() { defer store.mtx.Unlock() defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset) + store.multiVersionStore.SetReadset(store.transactionIndex, store.readset) + store.multiVersionStore.SetIterateset(store.transactionIndex, store.iterateset) } func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { @@ -343,6 +345,7 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { defer store.mtx.Unlock() defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset) + // TODO: do we need to write readset and iterateset in this case? I don't think so since if this is called it means we aren't doing validation } func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) { @@ -353,7 +356,7 @@ func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) { store.dirtySet[keyStr] = struct{}{} } -func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker *iterationTracker) { +func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) { // append to iterateset store.iterateset = append(store.iterateset, iterationTracker) } diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 43c6ed4ad..12344f76b 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -255,7 +255,7 @@ func (s *Store) validateIterator(index int, iterationTracker iterationTracker) b // collect items from multiversion store sortedItems := s.CollectIteratorItems(index) // add the iterationtracker writeset keys to the sorted items - for key, _ := range iterationTracker.writeset { + for key := range iterationTracker.writeset { sortedItems.Set([]byte(key), []byte{}) } @@ -275,7 +275,7 @@ func (s *Store) validateIterator(index int, iterationTracker iterationTracker) b validChan := make(chan bool, 1) // listen for abort while iterating - go func(expectedKeys [][]byte, returnChan chan bool) { + go func(expectedKeys map[string]struct{}, returnChan chan bool) { for ; mergeIterator.Valid(); mergeIterator.Next() { if len(expectedKeys) == 0 { // if we have no more expected keys, then the iterator is invalid @@ -283,12 +283,13 @@ func (s *Store) validateIterator(index int, iterationTracker iterationTracker) b return } key := mergeIterator.Key() - if !bytes.Equal(key, expectedKeys[0]) { - // if key isn't equal, that means that the iterator is invalid + if _, ok := expectedKeys[string(key)]; !ok { + // if key isn't found returnChan <- false return } - expectedKeys = expectedKeys[1:] + //remove from expected keys + delete(expectedKeys, string(key)) // if our iterator key was the early stop, then we can break if bytes.Equal(key, iterationTracker.earlyStopKey) { diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 15333d5cc..d5cfef4ad 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -1,10 +1,12 @@ package multiversion_test import ( + "bytes" "testing" "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/multiversion" + "github.com/cosmos/cosmos-sdk/types/occ" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" ) @@ -238,6 +240,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) { func TestMVSIteratorValidation(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -250,22 +253,15 @@ func TestMVSIteratorValidation(t *testing.T) { writeset["key3"] = nil mvs.SetWriteset(1, 2, writeset) - readset := make(multiversion.ReadSet) - readset["key1"] = []byte("value1") - readset["key2"] = []byte("value2") - readset["key3"] = nil - readset["key4"] = []byte("value4") - readset["key5"] = []byte("value5") - mvs.SetReadset(5, readset) - // test basic iteration - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) + iter := vis.ReverseIterator([]byte("key1"), []byte("key6")) + for ; iter.Valid(); iter.Next() { + // read value + iter.Value() + } + iter.Close() + vis.WriteToMultiVersionStore() - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be valid valid, conflicts := mvs.ValidateTransactionState(5) require.True(t, valid) @@ -275,6 +271,7 @@ func TestMVSIteratorValidation(t *testing.T) { func TestMVSIteratorValidationWithEstimate(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -287,25 +284,18 @@ func TestMVSIteratorValidationWithEstimate(t *testing.T) { writeset["key3"] = nil mvs.SetWriteset(1, 2, writeset) + iter := vis.Iterator([]byte("key1"), []byte("key6")) + for ; iter.Valid(); iter.Next() { + // read value + iter.Value() + } + iter.Close() + vis.WriteToMultiVersionStore() + writeset2 := make(multiversion.WriteSet) writeset2["key2"] = []byte("value2") mvs.SetEstimatedWriteset(2, 2, writeset2) - readset := make(multiversion.ReadSet) - readset["key1"] = []byte("value1") - readset["key2"] = []byte("value2") - readset["key3"] = nil - readset["key4"] = []byte("value4") - readset["key5"] = []byte("value5") - mvs.SetReadset(5, readset) - - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) - - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be invalid valid, conflicts := mvs.ValidateTransactionState(5) require.False(t, valid) @@ -315,6 +305,7 @@ func TestMVSIteratorValidationWithEstimate(t *testing.T) { func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -327,19 +318,13 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { writeset["key3"] = nil mvs.SetWriteset(1, 2, writeset) - readset := make(multiversion.ReadSet) - readset["key1"] = []byte("value1") - readset["key2"] = []byte("value2") - readset["key3"] = nil - readset["key4"] = []byte("value4") - readset["key5"] = []byte("value5") - mvs.SetReadset(5, readset) - - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key6"), true, make(multiversion.WriteSet)) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) + iter := vis.Iterator([]byte("key1"), []byte("key6")) + for ; iter.Valid(); iter.Next() { + // read value + iter.Value() + } + iter.Close() + vis.WriteToMultiVersionStore() // deletion of 2 and introduction of 3 writeset2 := make(multiversion.WriteSet) @@ -347,7 +332,6 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { writeset2["key3"] = []byte("valueX") mvs.SetWriteset(2, 2, writeset2) - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be invalid valid, conflicts := mvs.ValidateTransactionState(5) require.False(t, valid) @@ -357,6 +341,7 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { func TestMVSIteratorValidationWithKeyAdded(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -369,26 +354,19 @@ func TestMVSIteratorValidationWithKeyAdded(t *testing.T) { writeset["key3"] = nil mvs.SetWriteset(1, 2, writeset) - readset := make(multiversion.ReadSet) - readset["key1"] = []byte("value1") - readset["key2"] = []byte("value2") - readset["key3"] = nil - readset["key4"] = []byte("value4") - readset["key5"] = []byte("value5") - mvs.SetReadset(5, readset) - - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, make(multiversion.WriteSet)) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + // read value + iter.Value() + } + iter.Close() + vis.WriteToMultiVersionStore() // addition of key6 writeset2 := make(multiversion.WriteSet) writeset2["key6"] = []byte("value6") mvs.SetWriteset(2, 2, writeset2) - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be invalid valid, conflicts := mvs.ValidateTransactionState(5) require.False(t, valid) @@ -398,6 +376,38 @@ func TestMVSIteratorValidationWithKeyAdded(t *testing.T) { func TestMVSIteratorValidationWithWritesetValues(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + // set a key BEFORE iteration occurred + vis.Set([]byte("key6"), []byte("value6")) + + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + } + iter.Close() + vis.WriteToMultiVersionStore() + + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationWithWritesetValuesSetAfterIteration(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -418,28 +428,26 @@ func TestMVSIteratorValidationWithWritesetValues(t *testing.T) { readset["key5"] = []byte("value5") mvs.SetReadset(5, readset) - // addition of key6 IN the transaction - but BEFORE the iteration occurred - writeset2 := make(multiversion.WriteSet) - writeset2["key6"] = []byte("value6") - mvs.SetWriteset(5, 2, writeset2) + // no key6 because the iteration was performed BEFORE the write + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + } + iter.Close() - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, writeset2) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) - iterationTracker.AddKey([]byte("key6")) + // write key 6 AFTER iterator went + vis.Set([]byte("key6"), []byte("value6")) + vis.WriteToMultiVersionStore() - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be valid valid, conflicts := mvs.ValidateTransactionState(5) require.True(t, valid) require.Empty(t, conflicts) } -func TestMVSIteratorValidationWithWritesetValuesSetAfterIteration(t *testing.T) { +func TestMVSIteratorValidationReverse(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -460,28 +468,25 @@ func TestMVSIteratorValidationWithWritesetValuesSetAfterIteration(t *testing.T) readset["key5"] = []byte("value5") mvs.SetReadset(5, readset) - // addition of key6 IN the transaction - but BEFORE the iteration occurred - writeset2 := make(multiversion.WriteSet) - writeset2["key6"] = []byte("value6") - mvs.SetWriteset(5, 2, writeset2) + // set a key BEFORE iteration occurred + vis.Set([]byte("key6"), []byte("value6")) - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), true, make(multiversion.WriteSet)) - iterationTracker.AddKey([]byte("key1")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key5")) - // no key6 because the iteration was performed BEFORE the write + iter := vis.ReverseIterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + } + iter.Close() + vis.WriteToMultiVersionStore() - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be valid valid, conflicts := mvs.ValidateTransactionState(5) require.True(t, valid) require.Empty(t, conflicts) } -func TestMVSIteratorValidationReverse(t *testing.T) { +func TestMVSIteratorValidationEarlyStop(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key3"), []byte("value3")) @@ -499,22 +504,62 @@ func TestMVSIteratorValidationReverse(t *testing.T) { readset["key2"] = []byte("value2") readset["key3"] = nil readset["key4"] = []byte("value4") - readset["key5"] = []byte("value5") mvs.SetReadset(5, readset) - // addition of key6 IN the transaction - but BEFORE the iteration occurred + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + // read the value and see if we want to break + if bytes.Equal(iter.Key(), []byte("key4")) { + break + } + } + iter.Close() + vis.WriteToMultiVersionStore() + + // removal of key5 - but irrelevant because of early stop writeset2 := make(multiversion.WriteSet) - writeset2["key6"] = []byte("value6") - mvs.SetWriteset(5, 2, writeset2) + writeset2["key5"] = nil + mvs.SetWriteset(2, 2, writeset2) + + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +// TODO: what about early stop with a new key added in the range? - especially if its the last key that we stopped at? +func TestMVSIteratorValidationEarlyStopAtEndOfRange(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) - iterationTracker := multiversion.NewIterationTracker([]byte("key1"), []byte("key7"), false, writeset2) - iterationTracker.AddKey([]byte("key6")) - iterationTracker.AddKey([]byte("key5")) - iterationTracker.AddKey([]byte("key4")) - iterationTracker.AddKey([]byte("key2")) - iterationTracker.AddKey([]byte("key1")) + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + // test basic iteration + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + // read the value and see if we want to break + if bytes.Equal(iter.Key(), []byte("key5")) { + break + } + } + iter.Close() + vis.WriteToMultiVersionStore() + + // add key6 + writeset2 := make(multiversion.WriteSet) + writeset2["key6"] = []byte("value6") + mvs.SetWriteset(2, 2, writeset2) - mvs.SetIterateset(5, multiversion.Iterateset{*iterationTracker}) // should be valid valid, conflicts := mvs.ValidateTransactionState(5) require.True(t, valid) diff --git a/store/multiversion/trackediterator.go b/store/multiversion/trackediterator.go index d537a3023..361d848cb 100644 --- a/store/multiversion/trackediterator.go +++ b/store/multiversion/trackediterator.go @@ -6,13 +6,13 @@ import "github.com/cosmos/cosmos-sdk/store/types" type trackedIterator struct { types.Iterator - iterateset *iterationTracker + iterateset iterationTracker IterateSetHandler } // TODO: test -func NewTrackedIterator(iter types.Iterator, iterationTracker *iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator { +func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator { return &trackedIterator{ Iterator: iter, iterateset: iterationTracker, @@ -39,3 +39,19 @@ func (ti *trackedIterator) Key() []byte { ti.iterateset.AddKey(key) return key } + +// Value calls the iterator.Key() and adds the key to the iterateset, then returns the value from the iterator +func (ti *trackedIterator) Value() []byte { + key := ti.Iterator.Key() + // add key to the tracker + ti.iterateset.AddKey(key) + return ti.Iterator.Value() +} + +func (ti *trackedIterator) Next() { + // add current key to the tracker + key := ti.Iterator.Key() + ti.iterateset.AddKey(key) + // call next + ti.Iterator.Next() +} From 65369e32ddd4eae6d15a70790fa6f472c339b5ff Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 23:36:03 -0500 Subject: [PATCH 3/9] resolved race with iterator close and removed some TODOs --- store/multiversion/memiterator.go | 7 +++-- store/multiversion/store.go | 45 +++++++++++++++---------------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index cd1c575d3..7985590ea 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -85,7 +85,8 @@ func (store *Store) newMVSValidationIterator( items *dbm.MemDB, ascending bool, writeset WriteSet, -) (iterator *memIterator, abortChannel chan occtypes.Abort) { + abortChannel chan occtypes.Abort, +) *memIterator { var iter types.Iterator var err error @@ -102,8 +103,6 @@ func (store *Store) newMVSValidationIterator( panic(err) } - abortChannel = make(chan occtypes.Abort, 1) - return &memIterator{ Iterator: iter, mvStore: store, @@ -111,5 +110,5 @@ func (store *Store) newMVSValidationIterator( abortChannel: abortChannel, ReadsetHandler: NoOpHandler{}, writeset: writeset, - }, abortChannel + } } diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 12344f76b..9c45d8992 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -8,6 +8,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" + occtypes "github.com/cosmos/cosmos-sdk/types/occ" db "github.com/tendermint/tm-db" ) @@ -250,32 +251,29 @@ func (s *Store) CollectIteratorItems(index int) *db.MemDB { return sortedItems } -func (s *Store) validateIterator(index int, iterationTracker iterationTracker) bool { - // TODO: what if we added a key LATER within the transaction AFTER iteration. in that case, it won't be in expected Keys, but would be in the iteration range, causing issue? +func (s *Store) validateIterator(index int, tracker iterationTracker) bool { // collect items from multiversion store sortedItems := s.CollectIteratorItems(index) // add the iterationtracker writeset keys to the sorted items - for key := range iterationTracker.writeset { + for key := range tracker.writeset { sortedItems.Set([]byte(key), []byte{}) } - - var parentIter types.Iterator - - iter, abortChannel := s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, sortedItems, iterationTracker.ascending, iterationTracker.writeset) - if iterationTracker.ascending { - parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey) - } else { - parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey) - } - // create a new MVSMergeiterator - mergeIterator := NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{}) - defer mergeIterator.Close() - keys := iterationTracker.iteratedKeys - - validChan := make(chan bool, 1) + validChannel := make(chan bool, 1) + abortChannel := make(chan occtypes.Abort, 1) // listen for abort while iterating - go func(expectedKeys map[string]struct{}, returnChan chan bool) { + go func(iterationTracker iterationTracker, items *db.MemDB, returnChan chan bool, abortChan chan occtypes.Abort) { + var parentIter types.Iterator + expectedKeys := iterationTracker.iteratedKeys + iter := s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, items, iterationTracker.ascending, iterationTracker.writeset, abortChan) + if iterationTracker.ascending { + parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey) + } else { + parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey) + } + // create a new MVSMergeiterator + mergeIterator := NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{}) + defer mergeIterator.Close() for ; mergeIterator.Valid(); mergeIterator.Next() { if len(expectedKeys) == 0 { // if we have no more expected keys, then the iterator is invalid @@ -288,7 +286,7 @@ func (s *Store) validateIterator(index int, iterationTracker iterationTracker) b returnChan <- false return } - //remove from expected keys + // remove from expected keys delete(expectedKeys, string(key)) // if our iterator key was the early stop, then we can break @@ -298,13 +296,12 @@ func (s *Store) validateIterator(index int, iterationTracker iterationTracker) b } } returnChan <- !(len(expectedKeys) > 0) - }(keys, validChan) - + }(tracker, sortedItems, validChannel, abortChannel) select { case <-abortChannel: // if we get an abort, then we know that the iterator is invalid return false - case valid := <-validChan: + case valid := <-validChannel: return valid } } @@ -315,7 +312,6 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { conflictSet := map[int]struct{}{} valid := true - // TODO: validate iterateset // TODO: can we parallelize for all iterators? iterateset := s.GetIterateset(index) for _, iterationTracker := range iterateset { @@ -338,6 +334,7 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { } else { // if estimate, mark as conflict index if latestValue.IsEstimate() { + valid = false conflictSet[latestValue.Index()] = struct{}{} } else if latestValue.IsDeleted() { if value != nil { From ebcc614d5755fb5b39d1afa411cbfadc98c6d7f1 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 23:41:03 -0500 Subject: [PATCH 4/9] Address comments --- store/multiversion/memiterator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index 7985590ea..43e8e306b 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -72,10 +72,10 @@ func (mi *memIterator) Value() []byte { // need to update readset // if we have a deleted value, return nil if val.IsDeleted() { - mi.ReadsetHandler.UpdateReadSet(key, nil) + defer mi.ReadsetHandler.UpdateReadSet(key, nil) return nil } - mi.ReadsetHandler.UpdateReadSet(key, val.Value()) + defer mi.ReadsetHandler.UpdateReadSet(key, val.Value()) return val.Value() } From 7cfcbe77a98beb1104df489fff805ee882a49b2d Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 23:41:14 -0500 Subject: [PATCH 5/9] Address comments --- store/multiversion/store.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 9c45d8992..45ad2e727 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -241,7 +241,10 @@ func (s *Store) CollectIteratorItems(index int) *db.MemDB { // get all writeset keys prior to index keys := s.GetAllWritesetKeys() for i := 0; i < index; i++ { - indexedWriteset := keys[i] + indexedWriteset, ok := keys[i] + if !ok { + continue + } // TODO: do we want to exclude keys out of the range or just let the iterator handle it? for _, key := range indexedWriteset { // TODO: inefficient because (logn) for each key + rebalancing? maybe theres a better way to add to a tree to reduce rebalancing overhead From 8d72fc4ad0cff7eb90710f3636258fcfdb4f7696 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Wed, 18 Oct 2023 23:48:24 -0500 Subject: [PATCH 6/9] update test --- store/multiversion/mvkv_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index b8aaf4090..44304fd50 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -321,6 +321,12 @@ func TestIterator(t *testing.T) { // iterate over the keys - exclusive on key5 iter := vis.Iterator([]byte("000"), []byte("key5")) + + // verify domain is superset + start, end := iter.Domain() + require.Equal(t, []byte("000"), start) + require.Equal(t, []byte("key5"), end) + vals := []string{} defer iter.Close() for ; iter.Valid(); iter.Next() { From 828ad57856e3d5c03bee5faccc8f5f298981c5ca Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Thu, 19 Oct 2023 08:40:52 -0500 Subject: [PATCH 7/9] add todo --- store/multiversion/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 45ad2e727..1759dd464 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -358,6 +358,7 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { } sort.Ints(conflictIndices) + // TODO: maybe we have an indicator for the case where all of the validations are valid EXCEPT for some estimates encountered, in which case we may not want to re-execute, simply revalidate once the ESTIMATES are cleared return valid, conflictIndices } From 34ee51f20ab3bfaf264ce367344140496376c605 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Thu, 19 Oct 2023 09:00:55 -0500 Subject: [PATCH 8/9] Add validation condition where estimates don't invalidate the readset, since we can validate again --- store/multiversion/store.go | 3 +-- store/multiversion/store_test.go | 40 +++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 1759dd464..77573f417 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -335,9 +335,8 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { panic("there shouldn't be readset conflicts with parent kv store, since it shouldn't change") } } else { - // if estimate, mark as conflict index + // if estimate, mark as conflict index - but don't invalidate if latestValue.IsEstimate() { - valid = false conflictSet[latestValue.Index()] = struct{}{} } else if latestValue.IsDeleted() { if value != nil { diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index d5cfef4ad..84e9f77ac 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -207,7 +207,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) { "key3": []byte("value6"), }) - // expect index 2 to be returned + // expect failure with empty conflicts valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) require.Empty(t, conflicts) @@ -217,7 +217,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) { "key1": nil, }) - // expect indices 2 and 3 to be returned + // expect failure with empty conflicts valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) require.Empty(t, conflicts) @@ -227,7 +227,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) { "key2": []byte("test"), }) - // expect indices 2, 3, and 4to be returned + // expect index 4 to be returned valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) require.Equal(t, []int{4}, conflicts) @@ -237,6 +237,40 @@ func TestMultiVersionStoreValidateState(t *testing.T) { require.Panics(t, func() { mvs.ValidateTransactionState(5) }) } +func TestMVSValidationWithOnlyEstimate(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // add a conflict due to estimate + mvs.SetEstimatedWriteset(4, 1, map[string][]byte{ + "key2": []byte("test"), + }) + + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Equal(t, []int{4}, conflicts) + +} + func TestMVSIteratorValidation(t *testing.T) { parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} mvs := multiversion.NewMultiVersionStore(parentKVStore) From 690034d3e1b075bf758c39182bac0e9eee404979 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Thu, 19 Oct 2023 09:01:12 -0500 Subject: [PATCH 9/9] remove todo --- store/multiversion/store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 77573f417..0d16f12d6 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -357,7 +357,6 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { } sort.Ints(conflictIndices) - // TODO: maybe we have an indicator for the case where all of the validations are valid EXCEPT for some estimates encountered, in which case we may not want to re-execute, simply revalidate once the ESTIMATES are cleared return valid, conflictIndices }