From 41d58374e1f1c96fb66ccb96eb1fc18c4060ea10 Mon Sep 17 00:00:00 2001
From: Uday Patil
Date: Thu, 19 Oct 2023 09:51:08 -0500
Subject: [PATCH] [occ] Iterateset tracking and validation implementation (#337)

## Describe your changes and provide context

This implements a tracked iterator that keeps track of which keys have been iterated, and also saves metadata about the iteration for LATER validation. The iterator is replayed during validation, and if any new keys appear or any keys go missing within the iteration range, validation fails. The actual values served by the iterator are covered by readset validation.

Additionally, the early stop behavior allows the iterateset to be sensitive ONLY to changes to the keys available WITHIN the iteration range. In the event that we perform iteration and THEN write a key within the range of iteration, this will not fail validation, because we take a snapshot of the mvkv writeset at the moment of iteration; when we replay the iterator, we populate it with the writeset from that time, so we appropriately replicate the original iterator behavior.

In the case that we encounter an ESTIMATE, we have to terminate the iterator validation and mark it as failed, because it is impossible to know whether that ESTIMATE represents a value change or a delete, since the latter would affect the keys available for iteration.

This change also implements handlers that iterators receive for updating the readset and iterateset in `mvkv`.

## Testing performed to validate your change

Unit tests for various iteration scenarios. A short usage sketch of the tracked-iterator flow is appended after the diff.
---
 store/multiversion/memiterator.go     |  52 +++-
 store/multiversion/mergeiterator.go   |   9 +-
 store/multiversion/mvkv.go            | 116 ++++++--
 store/multiversion/mvkv_test.go       |  17 ++
 store/multiversion/store.go           | 120 +++++++-
 store/multiversion/store_test.go      | 392 +++++++++++++++++++++++++-
 store/multiversion/trackediterator.go |  57 ++++
 7 files changed, 706 insertions(+), 57 deletions(-)
 create mode 100644 store/multiversion/trackediterator.go

diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go
index 17ffdd0d6..43e8e306b 100644
--- a/store/multiversion/memiterator.go
+++ b/store/multiversion/memiterator.go
@@ -14,15 +14,17 @@ type memIterator struct {
 	types.Iterator
 
 	mvStore      MultiVersionStore
-	writeset     map[string][]byte
+	writeset     WriteSet
 	index        int
 	abortChannel chan occtypes.Abort
+	ReadsetHandler
 }
 
 func (store *VersionIndexedStore) newMemIterator(
 	start, end []byte,
 	items *dbm.MemDB,
 	ascending bool,
+	readsetHandler ReadsetHandler,
 ) *memIterator {
 	var iter types.Iterator
 	var err error
@@ -41,11 +43,12 @@ func (store *VersionIndexedStore) newMemIterator(
 	}
 
 	return &memIterator{
-		Iterator:     iter,
-		mvStore:      store.multiVersionStore,
-		index:        store.transactionIndex,
-		abortChannel: store.abortChannel,
-		writeset:     store.GetWriteset(),
+		Iterator:       iter,
+		mvStore:        store.multiVersionStore,
+		index:          store.transactionIndex,
+		abortChannel:   store.abortChannel,
+		writeset:       store.GetWriteset(),
+		ReadsetHandler: readsetHandler,
 	}
 }
 
@@ -66,9 +69,46 @@ func (mi *memIterator) Value() []byte {
 		mi.abortChannel <- occtypes.NewEstimateAbort(val.Index())
 	}
 
+	// need to update readset
 	// if we have a deleted value, return nil
 	if val.IsDeleted() {
+		defer mi.ReadsetHandler.UpdateReadSet(key, nil)
 		return nil
 	}
+	defer mi.ReadsetHandler.UpdateReadSet(key, val.Value())
 	return val.Value()
 }
+
+func (store *Store) newMVSValidationIterator(
+	index int,
+	start, end []byte,
+	items *dbm.MemDB,
+	ascending bool,
+	writeset WriteSet,
+	abortChannel chan occtypes.Abort,
+) *memIterator {
+	var iter types.Iterator
+	var err error
+
+	if ascending {
+		iter, err = items.Iterator(start, end)
+	} else {
+		iter, err = items.ReverseIterator(start, end)
+	}
+
+	if err != nil {
+		if iter != nil {
+			iter.Close()
+		}
+		panic(err)
+	}
+
+	return &memIterator{
+		Iterator:       iter,
+		mvStore:        store,
+		index:          index,
+		abortChannel:   abortChannel,
+		ReadsetHandler: NoOpHandler{},
+		writeset:       writeset,
+	}
+}
diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go
index c0a9d23ef..3b5cee741 100644
--- a/store/multiversion/mergeiterator.go
+++ b/store/multiversion/mergeiterator.go
@@ -16,6 +16,7 @@ type mvsMergeIterator struct {
 	parent    types.Iterator
 	cache     types.Iterator
 	ascending bool
+	ReadsetHandler
 }
 
 var _ types.Iterator = (*mvsMergeIterator)(nil)
@@ -23,11 +24,13 @@ var _ types.Iterator = (*mvsMergeIterator)(nil)
 func NewMVSMergeIterator(
 	parent, cache types.Iterator,
 	ascending bool,
+	readsetHandler ReadsetHandler,
 ) *mvsMergeIterator {
 	iter := &mvsMergeIterator{
-		parent:    parent,
-		cache:     cache,
-		ascending: ascending,
+		parent:         parent,
+		cache:          cache,
+		ascending:      ascending,
+		ReadsetHandler: readsetHandler,
 	}
 
 	return iter
diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go
index b96338c1e..1b2f947c1 100644
--- a/store/multiversion/mvkv.go
+++ b/store/multiversion/mvkv.go
@@ -12,12 +12,70 @@ import (
 	dbm "github.com/tendermint/tm-db"
 )
 
+// exposes a handler for adding items to the readset, useful for iterators
+type ReadsetHandler interface {
+	UpdateReadSet(key []byte, value []byte)
+}
+
+type NoOpHandler struct{}
+
+func (NoOpHandler) UpdateReadSet(key []byte, value []byte) {}
+
+// exposes a handler for adding items to the iterateset, to be called upon iterator close
+type IterateSetHandler interface {
+	UpdateIterateSet(iterationTracker)
+}
+
+type iterationTracker struct {
+	startKey     []byte              // start of the iteration range
+	endKey       []byte              // end of the iteration range
+	earlyStopKey []byte              // key that caused early stop
+	iteratedKeys map[string]struct{} // TODO: is a map okay because the ordering will be enforced when we replay the iterator?
+	ascending    bool
+
+	writeset WriteSet
+
+	// TODO: is it possible that termination is affected by keys later in the iteration that weren't reached? e.g. the number of keys affecting iteration?
+	// TODO: I believe that to get the number of keys, the iteration would need to be done fully, so it's not a concern?
+
+	// TODO: maybe we need to store the keys served from the writeset for the transaction? That way, if there are OTHER keys within the writeset and the iteration range that were written to the writeset later, we can discriminate between the groups?
+	// keysServedFromWriteset map[string]struct{}
+
+	// actually it's simpler to just store a copy of the writeset at the time of iterator creation
+}
+
+func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) iterationTracker {
+	copyWriteset := make(WriteSet, len(writeset))
+
+	for key, value := range writeset {
+		copyWriteset[key] = value
+	}
+
+	return iterationTracker{
+		startKey:     startKey,
+		endKey:       endKey,
+		iteratedKeys: make(map[string]struct{}),
+		ascending:    ascending,
+		writeset:     copyWriteset,
+	}
+}
+
+func (item *iterationTracker) AddKey(key []byte) {
+	item.iteratedKeys[string(key)] = struct{}{}
+}
+
+func (item *iterationTracker) SetEarlyStopKey(key []byte) {
+	item.earlyStopKey = key
+}
+
 // Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index
 type VersionIndexedStore struct {
 	mtx sync.Mutex
 	// used for tracking reads and writes for eventual validation + persistence into multi-version store
-	readset  map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
-	writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
+	// TODO: does this need sync.Map?
+	readset    map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
+	writeset   map[string][]byte // contains the key -> value mapping for all keys written to the store
+	iterateset Iterateset
 	// TODO: need to add iterateset here as well
 
 	// dirty keys that haven't been sorted yet for iteration
@@ -36,11 +94,14 @@ type VersionIndexedStore struct {
 }
 
 var _ types.KVStore = (*VersionIndexedStore)(nil)
+var _ ReadsetHandler = (*VersionIndexedStore)(nil)
+var _ IterateSetHandler = (*VersionIndexedStore)(nil)
 
 func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore {
 	return &VersionIndexedStore{
 		readset:     make(map[string][]byte),
 		writeset:    make(map[string][]byte),
+		iterateset:  []iterationTracker{},
 		dirtySet:    make(map[string]struct{}),
 		sortedStore: dbm.NewMemDB(),
 		parent:      parent,
@@ -97,7 +158,7 @@ func (store *VersionIndexedStore) Get(key []byte) []byte {
 	}
 	// if we didn't find it in the multiversion store, then we want to check the parent store + add to readset
 	parentValue := store.parent.Get(key)
-	store.updateReadSet(key, parentValue)
+	store.UpdateReadSet(key, parentValue)
 	return parentValue
 }
 
@@ -107,7 +168,7 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV
 	if mvsValue.IsDeleted() {
 		value = nil
 	}
-	store.updateReadSet([]byte(strKey), value)
+	store.UpdateReadSet([]byte(strKey), value)
 	return value
 }
 
@@ -201,40 +262,22 @@ func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iter
 func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator {
 	store.mtx.Lock()
 	defer store.mtx.Unlock()
-	// TODO: ideally we persist writeset keys into a sorted btree for later use
-	// make a set of total keys across mvkv and mvs to iterate
-	keysToIterate := make(map[string]struct{})
-	for key := range store.writeset {
-		keysToIterate[key] = struct{}{}
-	}
+
+	// get the sorted keys from MVS
 	// TODO: ideally we take advantage of mvs keys already being sorted
-	// get the multiversion store sorted keys
-	writesetMap := store.multiVersionStore.GetAllWritesetKeys()
-	for i := 0; i < store.transactionIndex; i++ {
-		// add all the writesets keys up until current index
-		for _, key := range writesetMap[i] {
-			keysToIterate[key] = struct{}{}
-		}
-	}
 	// TODO: ideally merge btree and mvs keys into a single sorted btree
+	memDB := store.multiVersionStore.CollectIteratorItems(store.transactionIndex)
 
-	// TODO: this is horribly inefficient, fix this
-	sortedKeys := make([]string, len(keysToIterate))
-	for key := range keysToIterate {
-		sortedKeys = append(sortedKeys, key)
-	}
-	sort.Strings(sortedKeys)
-
-	memDB := dbm.NewMemDB()
-	for _, key := range sortedKeys {
+	// TODO: ideally we persist writeset keys into a sorted btree for later use
+	// add the writeset keys to the MVS keys so we iterate over the total set across mvkv and mvs
+	for key := range store.writeset {
 		memDB.Set([]byte(key), []byte{})
 	}
 
 	var parent, memIterator types.Iterator
 
 	// make a memIterator
-	memIterator = store.newMemIterator(start, end, memDB, ascending)
+	memIterator = store.newMemIterator(start, end, memDB, ascending, store)
 
 	if ascending {
 		parent = store.parent.Iterator(start, end)
@@ -242,8 +285,13 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b
 		parent = store.parent.ReverseIterator(start, end)
 	}
 
+	mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store)
+
+	iterationTracker := NewIterationTracker(start, end, ascending, store.writeset)
+	trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store)
+
 	// mergeIterator
-	return NewMVSMergeIterator(parent, memIterator, ascending)
+	return trackedIterator
 }
 
@@ -288,6 +336,8 @@ func (store *VersionIndexedStore) WriteToMultiVersionStore() {
 	defer store.mtx.Unlock()
 	defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
 	store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset)
+	store.multiVersionStore.SetReadset(store.transactionIndex, store.readset)
+	store.multiVersionStore.SetIterateset(store.transactionIndex, store.iterateset)
 }
 
 func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
@@ -295,12 +345,18 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
 	defer store.mtx.Unlock()
 	defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
 	store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset)
+	// TODO: do we need to write readset and iterateset in this case? I don't think so, since if this is called it means we aren't doing validation
 }
 
-func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) {
+func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
 	// add to readset
 	keyStr := string(key)
 	store.readset[keyStr] = value
 	// add to dirty set
 	store.dirtySet[keyStr] = struct{}{}
 }
+
+func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) {
+	// append to iterateset
+	store.iterateset = append(store.iterateset, iterationTracker)
+}
diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go
index 23bcc472a..44304fd50 100644
--- a/store/multiversion/mvkv_test.go
+++ b/store/multiversion/mvkv_test.go
@@ -321,6 +321,12 @@ func TestIterator(t *testing.T) {
 
 	// iterate over the keys - exclusive on key5
 	iter := vis.Iterator([]byte("000"), []byte("key5"))
+
+	// verify domain is superset
+	start, end := iter.Domain()
+	require.Equal(t, []byte("000"), start)
+	require.Equal(t, []byte("key5"), end)
+
 	vals := []string{}
 	defer iter.Close()
 	for ; iter.Valid(); iter.Next() {
@@ -354,6 +360,17 @@ func TestIterator(t *testing.T) {
 	require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3)
 	iter3.Close()
 
+	vis.Set([]byte("key6"), []byte("value6"))
+	// iterate over the keys, with the writeset key being the last key in the iteration range
+	iter4 := vis.Iterator([]byte("000"), []byte("key7"))
+	vals4 := []string{}
+	defer iter4.Close()
+	for ; iter4.Valid(); iter4.Next() {
+		vals4 = append(vals4, string(iter4.Value()))
+	}
+	require.Equal(t, []string{"value1", "value2", "value3", "valueNew", "value5", "value6"}, vals4)
+	iter4.Close()
+
 	// add an estimate to MVS
 	mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
 		"key2": []byte("value1_b"),
diff --git a/store/multiversion/store.go b/store/multiversion/store.go
index 08c45204b..0d16f12d6 100644
--- a/store/multiversion/store.go
+++ b/store/multiversion/store.go
@@ -8,6 +8,8 @@ import (
 
 	"github.com/cosmos/cosmos-sdk/store/types"
 	"github.com/cosmos/cosmos-sdk/telemetry"
+	occtypes "github.com/cosmos/cosmos-sdk/types/occ"
+	db "github.com/tendermint/tm-db"
 )
 
 type MultiVersionStore interface {
@@ -19,13 +21,17 @@ type MultiVersionStore interface {
 	InvalidateWriteset(index int, incarnation int)
 	SetEstimatedWriteset(index int, incarnation int, writeset WriteSet)
 	GetAllWritesetKeys() map[int][]string
+	CollectIteratorItems(index int) *db.MemDB
 	SetReadset(index int, readset ReadSet)
 	GetReadset(index int) ReadSet
-	ValidateTransactionState(index int) []int
+	SetIterateset(index int, iterateset Iterateset)
+	GetIterateset(index int) Iterateset
+	ValidateTransactionState(index int) (bool, []int)
 }
 
 type WriteSet map[string][]byte
 type ReadSet map[string][]byte
+type Iterateset []iterationTracker
 
 var _ MultiVersionStore = (*Store)(nil)
 
@@ -37,6 +43,7 @@ type Store struct {
 
 	txWritesetKeys map[int][]string // map of tx index -> writeset keys
 	txReadSets     map[int]ReadSet
+	txIterateSets  map[int]Iterateset
 
 	parentStore types.KVStore
 }
@@ -46,6 +53,7 @@ func NewMultiVersionStore(parentStore types.KVStore) *Store {
 		multiVersionMap: make(map[string]MultiVersionValue),
 		txWritesetKeys:  make(map[int][]string),
 		txReadSets:      make(map[int]ReadSet),
+		txIterateSets:   make(map[int]Iterateset),
 		parentStore:     parentStore,
 	}
 }
@@ -212,9 +220,107 @@ func (s *Store) GetReadset(index int) ReadSet {
 	return s.txReadSets[index]
 }
 
-func (s *Store) ValidateTransactionState(index int) []int {
+func (s *Store) SetIterateset(index int, iterateset Iterateset) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	s.txIterateSets[index] = iterateset
+}
+
+func (s *Store) GetIterateset(index int) Iterateset {
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+
+	return s.txIterateSets[index]
+}
+
+// CollectIteratorItems implements MultiVersionStore. It will return a memDB containing all of the keys present in the multiversion store within the iteration range prior to (exclusive of) the index.
+func (s *Store) CollectIteratorItems(index int) *db.MemDB {
+	sortedItems := db.NewMemDB()
+
+	// get all writeset keys prior to index
+	keys := s.GetAllWritesetKeys()
+	for i := 0; i < index; i++ {
+		indexedWriteset, ok := keys[i]
+		if !ok {
+			continue
+		}
+		// TODO: do we want to exclude keys out of the range or just let the iterator handle it?
+		for _, key := range indexedWriteset {
+			// TODO: inefficient because (logn) for each key + rebalancing? maybe there's a better way to add to a tree to reduce rebalancing overhead
+			sortedItems.Set([]byte(key), []byte{})
+		}
+	}
+	return sortedItems
+}
+
+func (s *Store) validateIterator(index int, tracker iterationTracker) bool {
+	// collect items from multiversion store
+	sortedItems := s.CollectIteratorItems(index)
+	// add the iterationtracker writeset keys to the sorted items
+	for key := range tracker.writeset {
+		sortedItems.Set([]byte(key), []byte{})
+	}
+	validChannel := make(chan bool, 1)
+	abortChannel := make(chan occtypes.Abort, 1)
+
+	// listen for abort while iterating
+	go func(iterationTracker iterationTracker, items *db.MemDB, returnChan chan bool, abortChan chan occtypes.Abort) {
+		var parentIter types.Iterator
+		expectedKeys := iterationTracker.iteratedKeys
+		iter := s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, items, iterationTracker.ascending, iterationTracker.writeset, abortChan)
+		if iterationTracker.ascending {
+			parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey)
+		} else {
+			parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey)
+		}
+		// create a new MVSMergeIterator
+		mergeIterator := NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{})
+		defer mergeIterator.Close()
+		for ; mergeIterator.Valid(); mergeIterator.Next() {
+			if len(expectedKeys) == 0 {
+				// if we have no more expected keys, then the iterator is invalid
+				returnChan <- false
+				return
+			}
+			key := mergeIterator.Key()
+			if _, ok := expectedKeys[string(key)]; !ok {
+				// if key isn't found
+				returnChan <- false
+				return
+			}
+			// remove from expected keys
+			delete(expectedKeys, string(key))
+
+			// if our iterator key was the early stop, then we can break
+			if bytes.Equal(key, iterationTracker.earlyStopKey) {
+				returnChan <- true
+				return
+			}
+		}
+		returnChan <- !(len(expectedKeys) > 0)
+	}(tracker, sortedItems, validChannel, abortChannel)
+	select {
+	case <-abortChannel:
+		// if we get an abort, then we know that the iterator is invalid
+		return false
+	case valid := <-validChannel:
+		return valid
+	}
+}
+
+// TODO: do we want to return bool + []int, where bool indicates whether it was valid and []int indicates only the indices we need to wait on due to estimates? - yes, I think so?
+func (s *Store) ValidateTransactionState(index int) (bool, []int) {
 	defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate")
 	conflictSet := map[int]struct{}{}
+	valid := true
+
+	// TODO: can we parallelize validation across all iterators?
+	iterateset := s.GetIterateset(index)
+	for _, iterationTracker := range iterateset {
+		iteratorValid := s.validateIterator(index, iterationTracker)
+		valid = valid && iteratorValid
+	}
 
 	// validate readset
 	readset := s.GetReadset(index)
 				panic("there shouldn't be readset conflicts with parent kv store, since it shouldn't change")
 			}
 		} else {
-			// if estimate, mark as conflict index
+			// if estimate, mark as conflict index - but don't invalidate
 			if latestValue.IsEstimate() {
 				conflictSet[latestValue.Index()] = struct{}{}
 			} else if latestValue.IsDeleted() {
 				if value != nil {
 					// conflict
-					conflictSet[latestValue.Index()] = struct{}{}
+					// TODO: would we want to return early?
+					valid = false
 				}
 			} else if !bytes.Equal(latestValue.Value(), value) {
-				conflictSet[latestValue.Index()] = struct{}{}
+				valid = false
 			}
 		}
 	}
-	// TODO: validate iterateset
 
 	// convert conflictset into sorted indices
 	conflictIndices := make([]int, 0, len(conflictSet))
 	for index := range conflictSet {
 		conflictIndices = append(conflictIndices, index)
 	}
 
 	sort.Ints(conflictIndices)
-	return conflictIndices
+	return valid, conflictIndices
 }
 
 func (s *Store) WriteLatestToStore() {
diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go
index bb56d1e71..84e9f77ac 100644
--- a/store/multiversion/store_test.go
+++ b/store/multiversion/store_test.go
@@ -1,10 +1,12 @@
 package multiversion_test
 
 import (
+	"bytes"
 	"testing"
 
 	"github.com/cosmos/cosmos-sdk/store/dbadapter"
 	"github.com/cosmos/cosmos-sdk/store/multiversion"
+	"github.com/cosmos/cosmos-sdk/types/occ"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
 )
@@ -191,11 +193,13 @@ func TestMultiVersionStoreValidateState(t *testing.T) {
 	mvs.SetReadset(5, readset)
 
 	// assert no readset is valid
-	conflicts := mvs.ValidateTransactionState(4)
+	valid, conflicts := mvs.ValidateTransactionState(4)
+	require.True(t, valid)
 	require.Empty(t, conflicts)
 
 	// assert readset index 5 is valid
-	conflicts = mvs.ValidateTransactionState(5)
+	valid, conflicts = mvs.ValidateTransactionState(5)
+	require.True(t, valid)
 	require.Empty(t, conflicts)
 
 	// introduce conflict
 	mvs.SetWriteset(2, 1, map[string][]byte{
 		"key3": []byte("value6"),
 	})
 
-	// expect index 2 to be returned
-	conflicts = mvs.ValidateTransactionState(5)
-	require.Equal(t, []int{2}, conflicts)
+	// expect failure with empty conflicts
+	valid, conflicts = mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Empty(t, conflicts)
 
 	// add a conflict due to deletion
 	mvs.SetWriteset(3, 1, map[string][]byte{
 		"key1": nil,
 	})
 
-	// expect indices 2 and 3 to be returned
-	conflicts = mvs.ValidateTransactionState(5)
-	require.Equal(t, []int{2, 3}, conflicts)
+	// expect failure with empty conflicts
+	valid, conflicts = mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Empty(t, conflicts)
 
 	// add a conflict due to estimate
 	mvs.SetEstimatedWriteset(4, 1, map[string][]byte{
 		"key2": []byte("test"),
 	})
 
-	// expect indices 2, 3, and 4to be returned
-	conflicts = mvs.ValidateTransactionState(5)
-	require.Equal(t, []int{2, 3, 4}, conflicts)
+	// expect index 4 to be returned
+	valid, conflicts = mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Equal(t, []int{4}, conflicts)
 
 	// assert panic for parent store mismatch
 	parentKVStore.Set([]byte("key5"), []byte("value6"))
 	require.Panics(t, func() { mvs.ValidateTransactionState(5) })
 }
+
+func TestMVSValidationWithOnlyEstimate(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	readset := make(multiversion.ReadSet)
+	readset["key1"] = []byte("value1")
+	readset["key2"] = []byte("value2")
+	readset["key3"] = nil
+	readset["key4"] = []byte("value4")
+	readset["key5"] = []byte("value5")
+	mvs.SetReadset(5, readset)
+
+	// add a conflict due to estimate
+	mvs.SetEstimatedWriteset(4, 1, map[string][]byte{
+		"key2": []byte("test"),
+	})
+
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.True(t, valid)
+	require.Equal(t, []int{4}, conflicts)
+
+}
+
+func TestMVSIteratorValidation(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	// test basic iteration
+	iter := vis.ReverseIterator([]byte("key1"), []byte("key6"))
+	for ; iter.Valid(); iter.Next() {
+		// read value
+		iter.Value()
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	// should be valid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.True(t, valid)
+	require.Empty(t, conflicts)
+}
+
+func TestMVSIteratorValidationWithEstimate(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	iter := vis.Iterator([]byte("key1"), []byte("key6"))
+	for ; iter.Valid(); iter.Next() {
+		// read value
+		iter.Value()
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	writeset2 := make(multiversion.WriteSet)
+	writeset2["key2"] = []byte("value2")
+	mvs.SetEstimatedWriteset(2, 2, writeset2)
+
+	// should be invalid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Equal(t, []int{2}, conflicts)
+}
+
+func TestMVSIteratorValidationWithKeySwitch(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	iter := vis.Iterator([]byte("key1"), []byte("key6"))
+	for ; iter.Valid(); iter.Next() {
+		// read value
+		iter.Value()
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	// deletion of key2 and introduction of key3
+	writeset2 := make(multiversion.WriteSet)
+	writeset2["key2"] = nil
+	writeset2["key3"] = []byte("valueX")
+	mvs.SetWriteset(2, 2, writeset2)
+
+	// should be invalid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Empty(t, conflicts)
+}
+
+func TestMVSIteratorValidationWithKeyAdded(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	iter := vis.Iterator([]byte("key1"), []byte("key7"))
+	for ; iter.Valid(); iter.Next() {
+		// read value
+		iter.Value()
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	// addition of key6
+	writeset2 := make(multiversion.WriteSet)
+	writeset2["key6"] = []byte("value6")
+	mvs.SetWriteset(2, 2, writeset2)
+
+	// should be invalid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.False(t, valid)
+	require.Empty(t, conflicts)
+}
+
+func TestMVSIteratorValidationWithWritesetValues(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	// set a key BEFORE iteration occurred
+	vis.Set([]byte("key6"), []byte("value6"))
+
+	iter := vis.Iterator([]byte("key1"), []byte("key7"))
+	for ; iter.Valid(); iter.Next() {
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	// should be valid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.True(t, valid)
+	require.Empty(t, conflicts)
+}
+
+func TestMVSIteratorValidationWithWritesetValuesSetAfterIteration(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
[]byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // no key6 because the iteration was performed BEFORE the write + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + } + iter.Close() + + // write key 6 AFTER iterator went + vis.Set([]byte("key6"), []byte("value6")) + vis.WriteToMultiVersionStore() + + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationReverse(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // set a key BEFORE iteration occurred + vis.Set([]byte("key6"), []byte("value6")) + + iter := vis.ReverseIterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + } + iter.Close() + vis.WriteToMultiVersionStore() + + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +func TestMVSIteratorValidationEarlyStop(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort)) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + mvs.SetReadset(5, readset) + + iter := vis.Iterator([]byte("key1"), []byte("key7")) + for ; iter.Valid(); iter.Next() { + // read the value and see if we want to break + if bytes.Equal(iter.Key(), []byte("key4")) { + break + } + } + iter.Close() + vis.WriteToMultiVersionStore() + + // removal of key5 - but irrelevant because of early stop + writeset2 := make(multiversion.WriteSet) + writeset2["key5"] = nil + mvs.SetWriteset(2, 2, writeset2) + + // should be valid + valid, conflicts := mvs.ValidateTransactionState(5) + require.True(t, valid) + require.Empty(t, conflicts) +} + +// TODO: what about early stop with a new key added in the range? 
+// TODO: what about early stop with a new key added in the range? - especially if it's the last key that we stopped at?
+func TestMVSIteratorValidationEarlyStopAtEndOfRange(t *testing.T) {
+	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
+	mvs := multiversion.NewMultiVersionStore(parentKVStore)
+	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))
+
+	parentKVStore.Set([]byte("key2"), []byte("value0"))
+	parentKVStore.Set([]byte("key3"), []byte("value3"))
+	parentKVStore.Set([]byte("key4"), []byte("value4"))
+	parentKVStore.Set([]byte("key5"), []byte("value5"))
+
+	writeset := make(multiversion.WriteSet)
+	writeset["key1"] = []byte("value1")
+	writeset["key2"] = []byte("value2")
+	writeset["key3"] = nil
+	mvs.SetWriteset(1, 2, writeset)
+
+	// test basic iteration
+	iter := vis.Iterator([]byte("key1"), []byte("key7"))
+	for ; iter.Valid(); iter.Next() {
+		// read the value and see if we want to break
+		if bytes.Equal(iter.Key(), []byte("key5")) {
+			break
+		}
+	}
+	iter.Close()
+	vis.WriteToMultiVersionStore()
+
+	// add key6
+	writeset2 := make(multiversion.WriteSet)
+	writeset2["key6"] = []byte("value6")
+	mvs.SetWriteset(2, 2, writeset2)
+
+	// should be valid
+	valid, conflicts := mvs.ValidateTransactionState(5)
+	require.True(t, valid)
+	require.Empty(t, conflicts)
+}
diff --git a/store/multiversion/trackediterator.go b/store/multiversion/trackediterator.go
new file mode 100644
index 000000000..361d848cb
--- /dev/null
+++ b/store/multiversion/trackediterator.go
@@ -0,0 +1,57 @@
+package multiversion
+
+import "github.com/cosmos/cosmos-sdk/store/types"
+
+// tracked iterator is a wrapper around an existing iterator to track the iterator progress and monitor which keys are iterated.
+type trackedIterator struct {
+	types.Iterator
+
+	iterateset iterationTracker
+	IterateSetHandler
+}
+
+// TODO: test
+
+func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator {
+	return &trackedIterator{
+		Iterator:          iter,
+		iterateset:        iterationTracker,
+		IterateSetHandler: iterateSetHandler,
+	}
+}
+
+// Close first updates the iterateset from the iterator, and then calls iterator.Close()
+func (ti *trackedIterator) Close() error {
+	// TODO: if there are more keys to the iterator, then we consider it early stopped?
+	if ti.Iterator.Valid() {
+		// TODO: test whether reaching the end of the iteration range means valid is true or false
+		ti.iterateset.SetEarlyStopKey(ti.Iterator.Key())
+	}
+	// Update iterate set
+	ti.IterateSetHandler.UpdateIterateSet(ti.iterateset)
+	return ti.Iterator.Close()
+}
+
+// Key calls iterator.Key() and adds the key to the iterateset, then returns the key from the iterator
+func (ti *trackedIterator) Key() []byte {
+	key := ti.Iterator.Key()
+	// add key to the tracker
+	ti.iterateset.AddKey(key)
+	return key
+}
+
+// Value calls iterator.Key() and adds the key to the iterateset, then returns the value from the iterator
+func (ti *trackedIterator) Value() []byte {
+	key := ti.Iterator.Key()
+	// add key to the tracker
+	ti.iterateset.AddKey(key)
+	return ti.Iterator.Value()
+}
+
+func (ti *trackedIterator) Next() {
+	// add current key to the tracker
+	key := ti.Iterator.Key()
+	ti.iterateset.AddKey(key)
+	// call next
+	ti.Iterator.Next()
+}
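For reference, here is a minimal end-to-end sketch of the tracked-iterator flow above, mirroring the unit tests in the diff. The constructors and method signatures are taken directly from the patch; the `main` scaffolding and the sample keys (including the hypothetical `key15`) are illustrative only:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/cosmos/cosmos-sdk/store/dbadapter"
	"github.com/cosmos/cosmos-sdk/store/multiversion"
	"github.com/cosmos/cosmos-sdk/types/occ"
	dbm "github.com/tendermint/tm-db"
)

func main() {
	// committed parent state
	parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
	for _, k := range []string{"key2", "key3", "key4", "key5"} {
		parentKVStore.Set([]byte(k), []byte("value-"+k))
	}

	// multiversion store, plus a version-indexed store for tx index 5, incarnation 1
	mvs := multiversion.NewMultiVersionStore(parentKVStore)
	vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 5, 1, make(chan occ.Abort))

	// iterate with an early stop at key4: the tracked iterator records every key it
	// serves, and Close() snapshots key4 as the earlyStopKey into the iterateset
	iter := vis.Iterator([]byte("key1"), []byte("key7"))
	for ; iter.Valid(); iter.Next() {
		_ = iter.Value() // served values are captured by the readset handler
		if bytes.Equal(iter.Key(), []byte("key4")) {
			break
		}
	}
	iter.Close()

	// persist the writeset, readset, and iterateset for tx 5
	vis.WriteToMultiVersionStore()

	// a lower-indexed tx touching a key BEYOND the early stop is irrelevant...
	mvs.SetWriteset(2, 1, map[string][]byte{"key5": nil})
	valid, conflicts := mvs.ValidateTransactionState(5)
	fmt.Println(valid, conflicts) // true []

	// ...but a new key WITHIN the replayed range fails validation
	mvs.SetWriteset(3, 1, map[string][]byte{"key15": []byte("valueX")})
	valid, conflicts = mvs.ValidateTransactionState(5)
	fmt.Println(valid, conflicts) // false []
}
```

Replay stops as soon as the recorded earlyStopKey is reached, which is why the `key5` deletion above is invisible to validation, while `key15` (which sorts between `key1` and `key2`) surfaces during replay as a key that was never iterated and therefore invalidates the transaction.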