From 1e602463a634b7a94786a945ac1b8dc581ff29f8 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Tue, 10 Oct 2023 08:36:35 -0500 Subject: [PATCH] [occ] Occ multiversion store (#326) ## Describe your changes and provide context This adds in functionality to write the latest multiversion values to another store (to be used for writing to parent after transaction execution), and also adds in helpers for writeset management such as setting, invalidating, and setting estimated writesets. ## Testing performed to validate your change Unit testing for added functionality --- store/multiversion/data_structures.go | 34 ++++- store/multiversion/data_structures_test.go | 28 ++++ store/multiversion/store.go | 153 +++++++++++++++++++-- store/multiversion/store_test.go | 88 ++++++++++++ 4 files changed, 292 insertions(+), 11 deletions(-) diff --git a/store/multiversion/data_structures.go b/store/multiversion/data_structures.go index c4ca7b995..cba10d0f4 100644 --- a/store/multiversion/data_structures.go +++ b/store/multiversion/data_structures.go @@ -14,10 +14,12 @@ const ( type MultiVersionValue interface { GetLatest() (value MultiVersionValueItem, found bool) + GetLatestNonEstimate() (value MultiVersionValueItem, found bool) GetLatestBeforeIndex(index int) (value MultiVersionValueItem, found bool) Set(index int, incarnation int, value []byte) SetEstimate(index int, incarnation int) Delete(index int, incarnation int) + Remove(index int) } type MultiVersionValueItem interface { @@ -42,8 +44,6 @@ func NewMultiVersionItem() *multiVersionItem { } // GetLatest returns the latest written value to the btree, and returns a boolean indicating whether it was found. -// -// A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { item.mtx.RLock() defer item.mtx.RUnlock() @@ -56,6 +56,29 @@ func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { return valueItem, true } +// GetLatestNonEstimate returns the latest written value that isn't an ESTIMATE and returns a boolean indicating whether it was found. +// This can be used when we want to write finalized values, since ESTIMATEs can be considered to be irrelevant at that point +func (item *multiVersionItem) GetLatestNonEstimate() (MultiVersionValueItem, bool) { + item.mtx.RLock() + defer item.mtx.RUnlock() + + var vItem *valueItem + var found bool + item.valueTree.Descend(func(bTreeItem btree.Item) bool { + // only return if non-estimate + item := bTreeItem.(*valueItem) + if item.IsEstimate() { + // if estimate, continue + return true + } + // else we want to return + vItem = item + found = true + return false + }) + return vItem, found +} + // GetLatest returns the latest written value to the btree prior to the index passed in, and returns a boolean indicating whether it was found. // // A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. @@ -95,6 +118,13 @@ func (item *multiVersionItem) Delete(index int, incarnation int) { item.valueTree.ReplaceOrInsert(deletedItem) } +func (item *multiVersionItem) Remove(index int) { + item.mtx.Lock() + defer item.mtx.Unlock() + + item.valueTree.Delete(&valueItem{index: index}) +} + func (item *multiVersionItem) SetEstimate(index int, incarnation int) { item.mtx.Lock() defer item.mtx.Unlock() diff --git a/store/multiversion/data_structures_test.go b/store/multiversion/data_structures_test.go index 31696d366..fccc26a8b 100644 --- a/store/multiversion/data_structures_test.go +++ b/store/multiversion/data_structures_test.go @@ -198,3 +198,31 @@ func TestMultiversionItemEstimate(t *testing.T) { require.True(t, found) require.Equal(t, one, value.Value()) } + +func TestMultiversionItemRemove(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.Set(1, 0, []byte("one")) + mvItem.Set(2, 0, []byte("two")) + + mvItem.Remove(2) + value, found := mvItem.GetLatest() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) +} + +func TestMultiversionItemGetLatestNonEstimate(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.SetEstimate(3, 0) + + value, found := mvItem.GetLatestNonEstimate() + require.False(t, found) + require.Nil(t, value) + + mvItem.Set(1, 0, []byte("one")) + value, found = mvItem.GetLatestNonEstimate() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) + +} diff --git a/store/multiversion/store.go b/store/multiversion/store.go index b52c6af1a..3aa4800f3 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -1,31 +1,43 @@ package multiversion import ( + "sort" "sync" + + "github.com/cosmos/cosmos-sdk/store/types" ) type MultiVersionStore interface { GetLatest(key []byte) (value MultiVersionValueItem) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) - Set(index int, incarnation int, key []byte, value []byte) - SetEstimate(index int, incarnation int, key []byte) - Delete(index int, incarnation int, key []byte) + Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets + SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets + Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets Has(index int, key []byte) bool - // TODO: do we want to add helper functions for validations with readsets / applying writesets ? + WriteLatestToStore(parentStore types.KVStore) + SetWriteset(index int, incarnation int, writeset WriteSet) + InvalidateWriteset(index int, incarnation int) + SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) + GetAllWritesetKeys() map[int][]string } +type WriteSet map[string][]byte + +var _ MultiVersionStore = (*Store)(nil) + type Store struct { mtx sync.RWMutex // map that stores the key -> MultiVersionValue mapping for accessing from a given key multiVersionMap map[string]MultiVersionValue - // TODO: do we need to add something here to persist readsets for later validation - // TODO: we need to support iterators as well similar to how cachekv does it - // TODO: do we need secondary indexing on index -> keys - this way if we need to abort we can replace those keys with ESTIMATE values? - maybe this just means storing writeset + // TODO: do we need to support iterators as well similar to how cachekv does it - yes + + txWritesetKeys map[int][]string // map of tx index -> writeset keys } func NewMultiVersionStore() *Store { return &Store{ multiVersionMap: make(map[string]MultiVersionValue), + txWritesetKeys: make(map[int][]string), } } @@ -41,7 +53,7 @@ func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) { } val, found := s.multiVersionMap[keyString].GetLatest() if !found { - return nil // this shouldn't be possible + return nil // this is possible IF there is are writeset that are then removed for that key } return val } @@ -97,6 +109,95 @@ func (s *Store) Set(index int, incarnation int, key []byte, value []byte) { s.multiVersionMap[keyString].Set(index, incarnation, value) } +func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { + writeset := make(map[string][]byte) + if newWriteSet != nil { + // if non-nil writeset passed in, we can use that to optimize removals + writeset = newWriteSet + } + // if there is already a writeset existing, we should remove that fully + if keys, ok := s.txWritesetKeys[index]; ok { + // we need to delete all of the keys in the writeset from the multiversion store + for _, key := range keys { + // small optimization to check if the new writeset is going to write this key, if so, we can leave it behind + if _, ok := writeset[key]; ok { + // we don't need to remove this key because it will be overwritten anyways - saves the operation of removing + rebalancing underlying btree + continue + } + // remove from the appropriate item if present in multiVersionMap + if val, ok := s.multiVersionMap[key]; ok { + val.Remove(index) + } + } + } + // unset the writesetKeys for this index + delete(s.txWritesetKeys, index) +} + +// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store. +func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) + for key, value := range writeset { + writeSetKeys = append(writeSetKeys, key) + s.tryInitMultiVersionItem(key) + if value == nil { + // delete if nil value + s.multiVersionMap[key].Delete(index, incarnation) + } else { + s.multiVersionMap[key].Set(index, incarnation, value) + } + } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys +} + +// InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs +func (s *Store) InvalidateWriteset(index int, incarnation int) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if keys, ok := s.txWritesetKeys[index]; ok { + for _, key := range keys { + // invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level + s.tryInitMultiVersionItem(key) // this SHOULD no-op because we're invalidating existing keys + s.multiVersionMap[key].SetEstimate(index, incarnation) + } + } + // we leave the writeset in place because we'll need it for key removal later if/when we replace with a new writeset +} + +// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating +func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) + // still need to save the writeset so we can remove the elements later: + for key := range writeset { + writeSetKeys = append(writeSetKeys, key) + s.tryInitMultiVersionItem(key) + s.multiVersionMap[key].SetEstimate(index, incarnation) + } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys +} + +// GetWritesetKeys implements MultiVersionStore. +func (s *Store) GetAllWritesetKeys() map[int][]string { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.txWritesetKeys +} + // SetEstimate implements MultiVersionStore. func (s *Store) SetEstimate(index int, incarnation int, key []byte) { s.mtx.Lock() @@ -117,4 +218,38 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { s.multiVersionMap[keyString].Delete(index, incarnation) } -var _ MultiVersionStore = (*Store)(nil) +func (s *Store) WriteLatestToStore(parentStore types.KVStore) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // sort the keys + keys := make([]string, 0, len(s.multiVersionMap)) + for key := range s.multiVersionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + mvValue, found := s.multiVersionMap[key].GetLatestNonEstimate() + if !found { + // this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip + continue + } + // we shouldn't have any ESTIMATE values when performing the write, because we read the latest non-estimate values only + if mvValue.IsEstimate() { + panic("should not have any estimate values when writing to parent store") + } + // if the value is deleted, then delete it from the parent store + if mvValue.IsDeleted() { + // We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot + // be sure if the underlying store might do a save with the byteslice or + // not. Once we get confirmation that .Delete is guaranteed not to + // save the byteslice, then we can assume only a read-only copy is sufficient. + parentStore.Delete([]byte(key)) + continue + } + if mvValue.Value() != nil { + parentStore.Set([]byte(key), mvValue.Value()) + } + } +} diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 91465c435..732a5a6ba 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -3,8 +3,10 @@ package multiversion_test import ( "testing" + "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/multiversion" "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" ) func TestMultiVersionStore(t *testing.T) { @@ -52,3 +54,89 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) { require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1"))) require.False(t, store.Has(0, []byte("key1"))) } + +func TestMultiVersionStoreWriteToParent(t *testing.T) { + // initialize cachekv store + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore() + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + + mvs.Set(1, 1, []byte("key1"), []byte("value1")) + mvs.Set(2, 1, []byte("key1"), []byte("value2")) + mvs.Set(3, 1, []byte("key2"), []byte("value3")) + mvs.Delete(1, 1, []byte("key3")) + mvs.Delete(1, 1, []byte("key4")) + + mvs.WriteLatestToStore(parentKVStore) + + // assert state in parent store + require.Equal(t, []byte("value2"), parentKVStore.Get([]byte("key1"))) + require.Equal(t, []byte("value3"), parentKVStore.Get([]byte("key2"))) + require.False(t, parentKVStore.Has([]byte("key3"))) + require.False(t, parentKVStore.Has([]byte("key4"))) + + // verify no-op if mvs contains ESTIMATE + mvs.SetEstimate(1, 2, []byte("key5")) + mvs.WriteLatestToStore(parentKVStore) + require.False(t, parentKVStore.Has([]byte("key5"))) +} + +func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { + mvs := multiversion.NewMultiVersionStore() + + writeset := make(map[string][]byte) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + + mvs.SetWriteset(1, 2, writeset) + require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value()) + require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value()) + require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted()) + + writeset2 := make(map[string][]byte) + writeset2["key1"] = []byte("value3") + + mvs.SetWriteset(2, 1, writeset2) + require.Equal(t, []byte("value3"), mvs.GetLatest([]byte("key1")).Value()) + + // invalidate writeset1 + mvs.InvalidateWriteset(1, 2) + + // verify estimates + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key1")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key2")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key3")).IsEstimate()) + + // third writeset + writeset3 := make(map[string][]byte) + writeset3["key4"] = []byte("foo") + writeset3["key5"] = nil + + // write the writeset directly as estimate + mvs.SetEstimatedWriteset(3, 1, writeset3) + + require.True(t, mvs.GetLatest([]byte("key4")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key5")).IsEstimate()) + + // try replacing writeset1 to verify old keys removed + writeset1_b := make(map[string][]byte) + writeset1_b["key1"] = []byte("value4") + + mvs.SetWriteset(1, 2, writeset1_b) + require.Equal(t, []byte("value4"), mvs.GetLatestBeforeIndex(2, []byte("key1")).Value()) + require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key2"))) + // verify that GetLatest for key3 returns nil - because of removal from writeset + require.Nil(t, mvs.GetLatest([]byte("key3"))) + + // verify output for GetAllWritesetKeys + writesetKeys := mvs.GetAllWritesetKeys() + // we have 3 writesets + require.Equal(t, 3, len(writesetKeys)) + require.Equal(t, []string{"key1"}, writesetKeys[1]) + require.Equal(t, []string{"key1"}, writesetKeys[2]) + require.Equal(t, []string{"key4", "key5"}, writesetKeys[3]) + +}