From b3d359f3f76fcb6085260645333bc7d41f7da266 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Fri, 6 Oct 2023 13:05:43 -0500 Subject: [PATCH 1/6] Add MVS write latest and update writesets --- store/multiversion/store.go | 90 ++++++++++++++++++++++++++++++-- store/multiversion/store_test.go | 71 +++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 3 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index b52c6af1a..d0c75dc14 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -1,7 +1,10 @@ package multiversion import ( + "sort" "sync" + + "github.com/cosmos/cosmos-sdk/store/types" ) type MultiVersionStore interface { @@ -11,6 +14,10 @@ type MultiVersionStore interface { SetEstimate(index int, incarnation int, key []byte) Delete(index int, incarnation int, key []byte) Has(index int, key []byte) bool + WriteLatestToStore(parentStore types.KVStore) + SetWriteset(index int, incarnation int, writeset map[string][]byte) + InvalidateWriteset(index int, incarnation int) + SetEstimatedWriteset(index int, incarnation int, writeset map[string][]byte) // TODO: do we want to add helper functions for validations with readsets / applying writesets ? } @@ -18,14 +25,15 @@ type Store struct { mtx sync.RWMutex // map that stores the key -> MultiVersionValue mapping for accessing from a given key multiVersionMap map[string]MultiVersionValue - // TODO: do we need to add something here to persist readsets for later validation - // TODO: we need to support iterators as well similar to how cachekv does it - // TODO: do we need secondary indexing on index -> keys - this way if we need to abort we can replace those keys with ESTIMATE values? - maybe this just means storing writeset + // TODO: do we need to support iterators as well similar to how cachekv does it - yes + + txWritesets map[int]map[string][]byte // map of tx index -> writeset } func NewMultiVersionStore() *Store { return &Store{ multiVersionMap: make(map[string]MultiVersionValue), + txWritesets: make(map[int]map[string][]byte), } } @@ -97,6 +105,49 @@ func (s *Store) Set(index int, incarnation int, key []byte, value []byte) { s.multiVersionMap[keyString].Set(index, incarnation, value) } +// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store. +func (s *Store) SetWriteset(index int, incarnation int, writeset map[string][]byte) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.txWritesets[index] = writeset + for key, value := range writeset { + s.tryInitMultiVersionItem(key) + if value == nil { // TODO: safe assumption? + // delete if nil value + s.multiVersionMap[key].Delete(index, incarnation) + } else { + s.multiVersionMap[key].Set(index, incarnation, value) + } + } +} + +// InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs +func (s *Store) InvalidateWriteset(index int, incarnation int) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if _, ok := s.txWritesets[index]; ok { + for key := range s.txWritesets[index] { + // invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level + s.tryInitMultiVersionItem(key) // this SHOULD no-op because we're invalidating existing keys + s.multiVersionMap[key].SetEstimate(index, incarnation) + } + s.txWritesets[index] = nil + } +} + +// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating +func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset map[string][]byte) { + s.mtx.Lock() + defer s.mtx.Unlock() + + for key := range writeset { + s.tryInitMultiVersionItem(key) + s.multiVersionMap[key].SetEstimate(index, incarnation) + } +} + // SetEstimate implements MultiVersionStore. func (s *Store) SetEstimate(index int, incarnation int, key []byte) { s.mtx.Lock() @@ -118,3 +169,36 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { } var _ MultiVersionStore = (*Store)(nil) + +func (s *Store) WriteLatestToStore(parentStore types.KVStore) { + // TODO: this shouldn't affect gas because the gas meter wrap happens further within transaction handling - but lets confirm + s.mtx.Lock() + defer s.mtx.Unlock() + + // sort the keys + keys := make([]string, 0, len(s.multiVersionMap)) + for key := range s.multiVersionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + mvValue, _ := s.multiVersionMap[key].GetLatest() + // we shouldn't have any ESTIMATE values when performing the write, because all transactions should be complete by this point + if mvValue.IsEstimate() { + panic("should not have any estimate values when writing to parent store") + } + // if the value is deleted, then delete it from the parent store + if mvValue.IsDeleted() { + // We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot + // be sure if the underlying store might do a save with the byteslice or + // not. Once we get confirmation that .Delete is guaranteed not to + // save the byteslice, then we can assume only a read-only copy is sufficient. + parentStore.Delete([]byte(key)) + continue + } + if mvValue.Value() != nil { + parentStore.Set([]byte(key), mvValue.Value()) + } + } +} diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 91465c435..7b25dcfb0 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -3,8 +3,10 @@ package multiversion_test import ( "testing" + "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/multiversion" "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" ) func TestMultiVersionStore(t *testing.T) { @@ -52,3 +54,72 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) { require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1"))) require.False(t, store.Has(0, []byte("key1"))) } + +func TestMultiVersionStoreWriteToParent(t *testing.T) { + // initialize cachekv store + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore() + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + + mvs.Set(1, 1, []byte("key1"), []byte("value1")) + mvs.Set(2, 1, []byte("key1"), []byte("value2")) + mvs.Set(3, 1, []byte("key2"), []byte("value3")) + mvs.Delete(1, 1, []byte("key3")) + mvs.Delete(1, 1, []byte("key4")) + + mvs.WriteLatestToStore(parentKVStore) + + // assert state in parent store + require.Equal(t, []byte("value2"), parentKVStore.Get([]byte("key1"))) + require.Equal(t, []byte("value3"), parentKVStore.Get([]byte("key2"))) + require.False(t, parentKVStore.Has([]byte("key3"))) + require.False(t, parentKVStore.Has([]byte("key4"))) + + // verify panic if mvs contains ESTIMATE + mvs.SetEstimate(1, 2, []byte("key5")) + + require.Panics(t, func() { + mvs.WriteLatestToStore(parentKVStore) + }) +} + +func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { + mvs := multiversion.NewMultiVersionStore() + + writeset := make(map[string][]byte) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + + mvs.SetWriteset(1, 2, writeset) + require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value()) + require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value()) + require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted()) + + writeset2 := make(map[string][]byte) + writeset2["key1"] = []byte("value3") + + mvs.SetWriteset(2, 1, writeset2) + require.Equal(t, []byte("value3"), mvs.GetLatest([]byte("key1")).Value()) + + // invalidate writeset1 + mvs.InvalidateWriteset(1, 2) + + // verify estimates + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key1")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key2")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key3")).IsEstimate()) + + // third writeset + writeset3 := make(map[string][]byte) + writeset3["key4"] = []byte("foo") + writeset3["key5"] = nil + + // write the writeset directly as estimate + mvs.SetEstimatedWriteset(3, 1, writeset3) + + require.True(t, mvs.GetLatest([]byte("key4")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key5")).IsEstimate()) +} From 62b20e118ac4cb198c39e4c5c7b4a8fdaad25f58 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Fri, 6 Oct 2023 13:06:21 -0500 Subject: [PATCH 2/6] remove TODOs --- store/multiversion/store.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index d0c75dc14..f9b4f3856 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -18,7 +18,6 @@ type MultiVersionStore interface { SetWriteset(index int, incarnation int, writeset map[string][]byte) InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset map[string][]byte) - // TODO: do we want to add helper functions for validations with readsets / applying writesets ? } type Store struct { @@ -113,7 +112,7 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset map[string][]by s.txWritesets[index] = writeset for key, value := range writeset { s.tryInitMultiVersionItem(key) - if value == nil { // TODO: safe assumption? + if value == nil { // delete if nil value s.multiVersionMap[key].Delete(index, incarnation) } else { @@ -171,7 +170,6 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { var _ MultiVersionStore = (*Store)(nil) func (s *Store) WriteLatestToStore(parentStore types.KVStore) { - // TODO: this shouldn't affect gas because the gas meter wrap happens further within transaction handling - but lets confirm s.mtx.Lock() defer s.mtx.Unlock() From 5492d4fb8f1de7000034aafd29c9fba8feb8087e Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Fri, 6 Oct 2023 15:52:44 -0500 Subject: [PATCH 3/6] Allow removal of old writesets --- store/multiversion/data_structures.go | 34 ++++++- store/multiversion/data_structures_test.go | 28 ++++++ store/multiversion/store.go | 108 ++++++++++++++++++--- store/multiversion/store_test.go | 18 +++- 4 files changed, 169 insertions(+), 19 deletions(-) diff --git a/store/multiversion/data_structures.go b/store/multiversion/data_structures.go index c4ca7b995..cba10d0f4 100644 --- a/store/multiversion/data_structures.go +++ b/store/multiversion/data_structures.go @@ -14,10 +14,12 @@ const ( type MultiVersionValue interface { GetLatest() (value MultiVersionValueItem, found bool) + GetLatestNonEstimate() (value MultiVersionValueItem, found bool) GetLatestBeforeIndex(index int) (value MultiVersionValueItem, found bool) Set(index int, incarnation int, value []byte) SetEstimate(index int, incarnation int) Delete(index int, incarnation int) + Remove(index int) } type MultiVersionValueItem interface { @@ -42,8 +44,6 @@ func NewMultiVersionItem() *multiVersionItem { } // GetLatest returns the latest written value to the btree, and returns a boolean indicating whether it was found. -// -// A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { item.mtx.RLock() defer item.mtx.RUnlock() @@ -56,6 +56,29 @@ func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { return valueItem, true } +// GetLatestNonEstimate returns the latest written value that isn't an ESTIMATE and returns a boolean indicating whether it was found. +// This can be used when we want to write finalized values, since ESTIMATEs can be considered to be irrelevant at that point +func (item *multiVersionItem) GetLatestNonEstimate() (MultiVersionValueItem, bool) { + item.mtx.RLock() + defer item.mtx.RUnlock() + + var vItem *valueItem + var found bool + item.valueTree.Descend(func(bTreeItem btree.Item) bool { + // only return if non-estimate + item := bTreeItem.(*valueItem) + if item.IsEstimate() { + // if estimate, continue + return true + } + // else we want to return + vItem = item + found = true + return false + }) + return vItem, found +} + // GetLatest returns the latest written value to the btree prior to the index passed in, and returns a boolean indicating whether it was found. // // A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. @@ -95,6 +118,13 @@ func (item *multiVersionItem) Delete(index int, incarnation int) { item.valueTree.ReplaceOrInsert(deletedItem) } +func (item *multiVersionItem) Remove(index int) { + item.mtx.Lock() + defer item.mtx.Unlock() + + item.valueTree.Delete(&valueItem{index: index}) +} + func (item *multiVersionItem) SetEstimate(index int, incarnation int) { item.mtx.Lock() defer item.mtx.Unlock() diff --git a/store/multiversion/data_structures_test.go b/store/multiversion/data_structures_test.go index 31696d366..fccc26a8b 100644 --- a/store/multiversion/data_structures_test.go +++ b/store/multiversion/data_structures_test.go @@ -198,3 +198,31 @@ func TestMultiversionItemEstimate(t *testing.T) { require.True(t, found) require.Equal(t, one, value.Value()) } + +func TestMultiversionItemRemove(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.Set(1, 0, []byte("one")) + mvItem.Set(2, 0, []byte("two")) + + mvItem.Remove(2) + value, found := mvItem.GetLatest() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) +} + +func TestMultiversionItemGetLatestNonEstimate(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.SetEstimate(3, 0) + + value, found := mvItem.GetLatestNonEstimate() + require.False(t, found) + require.Nil(t, value) + + mvItem.Set(1, 0, []byte("one")) + value, found = mvItem.GetLatestNonEstimate() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) + +} diff --git a/store/multiversion/store.go b/store/multiversion/store.go index f9b4f3856..a6ebfde50 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -14,25 +14,31 @@ type MultiVersionStore interface { SetEstimate(index int, incarnation int, key []byte) Delete(index int, incarnation int, key []byte) Has(index int, key []byte) bool + Iterator(index int, start, end []byte) types.Iterator + ReverseIterator(index int, start, end []byte) types.Iterator WriteLatestToStore(parentStore types.KVStore) - SetWriteset(index int, incarnation int, writeset map[string][]byte) + SetWriteset(index int, incarnation int, writeset WriteSet) InvalidateWriteset(index int, incarnation int) - SetEstimatedWriteset(index int, incarnation int, writeset map[string][]byte) + SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) } +type WriteSet map[string][]byte + +var _ MultiVersionStore = (*Store)(nil) + type Store struct { mtx sync.RWMutex // map that stores the key -> MultiVersionValue mapping for accessing from a given key multiVersionMap map[string]MultiVersionValue // TODO: do we need to support iterators as well similar to how cachekv does it - yes - txWritesets map[int]map[string][]byte // map of tx index -> writeset + txWritesetKeys map[int][]string // map of tx index -> writeset keys } func NewMultiVersionStore() *Store { return &Store{ multiVersionMap: make(map[string]MultiVersionValue), - txWritesets: make(map[int]map[string][]byte), + txWritesetKeys: make(map[int][]string), } } @@ -104,13 +110,42 @@ func (s *Store) Set(index int, incarnation int, key []byte, value []byte) { s.multiVersionMap[keyString].Set(index, incarnation, value) } +func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { + writeset := make(map[string][]byte) + if newWriteSet != nil { + // if non-nil writeset passed in, we can use that to optimize removals + writeset = newWriteSet + } + // if there is already a writeset existing, we should remove that fully + if keys, ok := s.txWritesetKeys[index]; ok { + // we need to delete all of the keys in the writeset from the multiversion store + for _, key := range keys { + // small optimization to check if the new writeset is going to write this key, if so, we can leave it behind + if _, ok := writeset[key]; ok { + // we don't need to remove this key because it will be overwritten anyways - saves the operation of removing + rebalancing underlying btree + continue + } + // remove from the appropriate item if present in multiVersionMap + if val, ok := s.multiVersionMap[key]; ok { + val.Remove(index) + } + } + } + // unset the writesetKeys for this index + s.txWritesetKeys[index] = nil +} + // SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store. -func (s *Store) SetWriteset(index int, incarnation int, writeset map[string][]byte) { +func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) { s.mtx.Lock() defer s.mtx.Unlock() - s.txWritesets[index] = writeset + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) for key, value := range writeset { + writeSetKeys = append(writeSetKeys, key) s.tryInitMultiVersionItem(key) if value == nil { // delete if nil value @@ -119,6 +154,8 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset map[string][]by s.multiVersionMap[key].Set(index, incarnation, value) } } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys } // InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs @@ -126,25 +163,33 @@ func (s *Store) InvalidateWriteset(index int, incarnation int) { s.mtx.Lock() defer s.mtx.Unlock() - if _, ok := s.txWritesets[index]; ok { - for key := range s.txWritesets[index] { + if keys, ok := s.txWritesetKeys[index]; ok { + for _, key := range keys { // invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level s.tryInitMultiVersionItem(key) // this SHOULD no-op because we're invalidating existing keys s.multiVersionMap[key].SetEstimate(index, incarnation) } - s.txWritesets[index] = nil } + // we leave the writeset in place because we'll need it for key removal later if/when we replace with a new writeset } // SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating -func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset map[string][]byte) { +func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) { s.mtx.Lock() defer s.mtx.Unlock() + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) + // still need to save the writeset so we can remove the elements later: for key := range writeset { + writeSetKeys = append(writeSetKeys, key) s.tryInitMultiVersionItem(key) s.multiVersionMap[key].SetEstimate(index, incarnation) } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys } // SetEstimate implements MultiVersionStore. @@ -167,7 +212,42 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { s.multiVersionMap[keyString].Delete(index, incarnation) } -var _ MultiVersionStore = (*Store)(nil) +// Iterator implements MultiVersionStore. +func (store *Store) Iterator(index int, start, end []byte) types.Iterator { + // return store.iterator(index, start, end, true) + panic("unimplemented") +} + +// ReverseIterator implements MultiVersionStore. +func (store *Store) ReverseIterator(index int, start, end []byte) types.Iterator { + // return store.iterator(index, start, end, false) + panic("unimplemented") +} + +// func (store *Store) iterator(index int, start, end []byte, ascending bool) types.Iterator { +// store.mtx.Lock() +// defer store.mtx.Unlock() + +// var parent, cache types.Iterator + +// if ascending { +// parent = store.parent.Iterator(start, end) +// } else { +// parent = store.parent.ReverseIterator(start, end) +// } +// defer func() { +// if err := recover(); err != nil { +// // close out parent iterator, then reraise panic +// if parent != nil { +// parent.Close() +// } +// panic(err) +// } +// }() +// store.dirtyItems(start, end) +// cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending, store.eventManager, store.storeKey) +// return NewCacheMergeIterator(parent, cache, ascending, store.eventManager, store.storeKey) +// } func (s *Store) WriteLatestToStore(parentStore types.KVStore) { s.mtx.Lock() @@ -181,7 +261,11 @@ func (s *Store) WriteLatestToStore(parentStore types.KVStore) { sort.Strings(keys) for _, key := range keys { - mvValue, _ := s.multiVersionMap[key].GetLatest() + mvValue, found := s.multiVersionMap[key].GetLatestNonEstimate() + if !found { + // this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip + continue + } // we shouldn't have any ESTIMATE values when performing the write, because all transactions should be complete by this point if mvValue.IsEstimate() { panic("should not have any estimate values when writing to parent store") diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 7b25dcfb0..4bcddd9a9 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -77,12 +77,10 @@ func TestMultiVersionStoreWriteToParent(t *testing.T) { require.False(t, parentKVStore.Has([]byte("key3"))) require.False(t, parentKVStore.Has([]byte("key4"))) - // verify panic if mvs contains ESTIMATE + // verify no-op if mvs contains ESTIMATE mvs.SetEstimate(1, 2, []byte("key5")) - - require.Panics(t, func() { - mvs.WriteLatestToStore(parentKVStore) - }) + mvs.WriteLatestToStore(parentKVStore) + require.False(t, parentKVStore.Has([]byte("key5"))) } func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { @@ -122,4 +120,14 @@ func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { require.True(t, mvs.GetLatest([]byte("key4")).IsEstimate()) require.True(t, mvs.GetLatest([]byte("key5")).IsEstimate()) + + // try replacing writeset1 to verify old keys removed + writeset1_b := make(map[string][]byte) + writeset1_b["key1"] = []byte("value4") + + mvs.SetWriteset(1, 2, writeset1_b) + require.Equal(t, []byte("value4"), mvs.GetLatestBeforeIndex(2, []byte("key1")).Value()) + require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key2"))) + require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key3"))) + } From 1978bc7e921f579f1bb33e6b631f780f4b48cb36 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Fri, 6 Oct 2023 16:47:38 -0500 Subject: [PATCH 4/6] update tests and add GetWritesetKeys --- store/multiversion/store.go | 57 ++++++++------------------------ store/multiversion/store_test.go | 11 +++++- 2 files changed, 23 insertions(+), 45 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index a6ebfde50..6ce2ebb55 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -10,16 +10,15 @@ import ( type MultiVersionStore interface { GetLatest(key []byte) (value MultiVersionValueItem) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) - Set(index int, incarnation int, key []byte, value []byte) - SetEstimate(index int, incarnation int, key []byte) - Delete(index int, incarnation int, key []byte) + Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets + SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets + Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets Has(index int, key []byte) bool - Iterator(index int, start, end []byte) types.Iterator - ReverseIterator(index int, start, end []byte) types.Iterator WriteLatestToStore(parentStore types.KVStore) SetWriteset(index int, incarnation int, writeset WriteSet) InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) + GetWritesetKeys() map[int][]string } type WriteSet map[string][]byte @@ -54,7 +53,7 @@ func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) { } val, found := s.multiVersionMap[keyString].GetLatest() if !found { - return nil // this shouldn't be possible + return nil // this is possible IF there is are writeset that are then removed for that key } return val } @@ -192,6 +191,13 @@ func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteS s.txWritesetKeys[index] = writeSetKeys } +// GetWritesetKeys implements MultiVersionStore. +func (s *Store) GetWritesetKeys() map[int][]string { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.txWritesetKeys +} + // SetEstimate implements MultiVersionStore. func (s *Store) SetEstimate(index int, incarnation int, key []byte) { s.mtx.Lock() @@ -212,43 +218,6 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { s.multiVersionMap[keyString].Delete(index, incarnation) } -// Iterator implements MultiVersionStore. -func (store *Store) Iterator(index int, start, end []byte) types.Iterator { - // return store.iterator(index, start, end, true) - panic("unimplemented") -} - -// ReverseIterator implements MultiVersionStore. -func (store *Store) ReverseIterator(index int, start, end []byte) types.Iterator { - // return store.iterator(index, start, end, false) - panic("unimplemented") -} - -// func (store *Store) iterator(index int, start, end []byte, ascending bool) types.Iterator { -// store.mtx.Lock() -// defer store.mtx.Unlock() - -// var parent, cache types.Iterator - -// if ascending { -// parent = store.parent.Iterator(start, end) -// } else { -// parent = store.parent.ReverseIterator(start, end) -// } -// defer func() { -// if err := recover(); err != nil { -// // close out parent iterator, then reraise panic -// if parent != nil { -// parent.Close() -// } -// panic(err) -// } -// }() -// store.dirtyItems(start, end) -// cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending, store.eventManager, store.storeKey) -// return NewCacheMergeIterator(parent, cache, ascending, store.eventManager, store.storeKey) -// } - func (s *Store) WriteLatestToStore(parentStore types.KVStore) { s.mtx.Lock() defer s.mtx.Unlock() @@ -266,7 +235,7 @@ func (s *Store) WriteLatestToStore(parentStore types.KVStore) { // this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip continue } - // we shouldn't have any ESTIMATE values when performing the write, because all transactions should be complete by this point + // we shouldn't have any ESTIMATE values when performing the write, because we read the latest non-estimate values only if mvValue.IsEstimate() { panic("should not have any estimate values when writing to parent store") } diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 4bcddd9a9..776fa80db 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -128,6 +128,15 @@ func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { mvs.SetWriteset(1, 2, writeset1_b) require.Equal(t, []byte("value4"), mvs.GetLatestBeforeIndex(2, []byte("key1")).Value()) require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key2"))) - require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key3"))) + // verify that GetLatest for key3 returns nil - because of removal from writeset + require.Nil(t, mvs.GetLatest([]byte("key3"))) + + // verify output for GetWritesetKeys + writesetKeys := mvs.GetWritesetKeys() + // we have 3 writesets + require.Equal(t, 3, len(writesetKeys)) + require.Equal(t, []string{"key1"}, writesetKeys[1]) + require.Equal(t, []string{"key1"}, writesetKeys[2]) + require.Equal(t, []string{"key4", "key5"}, writesetKeys[3]) } From a1a21495616eb736d979a90d6e8a5d4853cc3528 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Fri, 6 Oct 2023 16:48:06 -0500 Subject: [PATCH 5/6] rename function --- store/multiversion/store.go | 4 ++-- store/multiversion/store_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 6ce2ebb55..76af0fbea 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -18,7 +18,7 @@ type MultiVersionStore interface { SetWriteset(index int, incarnation int, writeset WriteSet) InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) - GetWritesetKeys() map[int][]string + GetAllWritesetKeys() map[int][]string } type WriteSet map[string][]byte @@ -192,7 +192,7 @@ func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteS } // GetWritesetKeys implements MultiVersionStore. -func (s *Store) GetWritesetKeys() map[int][]string { +func (s *Store) GetAllWritesetKeys() map[int][]string { s.mtx.RLock() defer s.mtx.RUnlock() return s.txWritesetKeys diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 776fa80db..732a5a6ba 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -131,8 +131,8 @@ func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { // verify that GetLatest for key3 returns nil - because of removal from writeset require.Nil(t, mvs.GetLatest([]byte("key3"))) - // verify output for GetWritesetKeys - writesetKeys := mvs.GetWritesetKeys() + // verify output for GetAllWritesetKeys + writesetKeys := mvs.GetAllWritesetKeys() // we have 3 writesets require.Equal(t, 3, len(writesetKeys)) require.Equal(t, []string{"key1"}, writesetKeys[1]) From 17f058ec2fe3d4a39c6df07d12e63ebf7f2ba667 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Mon, 9 Oct 2023 09:36:20 -0500 Subject: [PATCH 6/6] use map delete instead of nil assigment --- store/multiversion/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 76af0fbea..3aa4800f3 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -131,7 +131,7 @@ func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { } } // unset the writesetKeys for this index - s.txWritesetKeys[index] = nil + delete(s.txWritesetKeys, index) } // SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store.