Skip to content

Commit

Permalink
[occ] Occ multiversion store (#326)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This adds in functionality to write the latest multiversion values to
another store (to be used for writing to parent after transaction
execution), and also adds in helpers for writeset management such as
setting, invalidating, and setting estimated writesets.

## Testing performed to validate your change
Unit testing for added functionality
  • Loading branch information
udpatil committed Oct 17, 2023
1 parent eef2149 commit 30fb602
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 11 deletions.
34 changes: 32 additions & 2 deletions store/multiversion/data_structures.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ const (

type MultiVersionValue interface {
GetLatest() (value MultiVersionValueItem, found bool)
GetLatestNonEstimate() (value MultiVersionValueItem, found bool)
GetLatestBeforeIndex(index int) (value MultiVersionValueItem, found bool)
Set(index int, incarnation int, value []byte)
SetEstimate(index int, incarnation int)
Delete(index int, incarnation int)
Remove(index int)
}

type MultiVersionValueItem interface {
Expand All @@ -42,8 +44,6 @@ func NewMultiVersionItem() *multiVersionItem {
}

// GetLatest returns the latest written value to the btree, and returns a boolean indicating whether it was found.
//
// A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit.
func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) {
item.mtx.RLock()
defer item.mtx.RUnlock()
Expand All @@ -56,6 +56,29 @@ func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) {
return valueItem, true
}

// GetLatestNonEstimate returns the latest written value that isn't an ESTIMATE and returns a boolean indicating whether it was found.
// This can be used when we want to write finalized values, since ESTIMATEs can be considered to be irrelevant at that point
func (item *multiVersionItem) GetLatestNonEstimate() (MultiVersionValueItem, bool) {
item.mtx.RLock()
defer item.mtx.RUnlock()

var vItem *valueItem
var found bool
item.valueTree.Descend(func(bTreeItem btree.Item) bool {
// only return if non-estimate
item := bTreeItem.(*valueItem)
if item.IsEstimate() {
// if estimate, continue
return true
}
// else we want to return
vItem = item
found = true
return false
})
return vItem, found
}

// GetLatest returns the latest written value to the btree prior to the index passed in, and returns a boolean indicating whether it was found.
//
// A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit.
Expand Down Expand Up @@ -95,6 +118,13 @@ func (item *multiVersionItem) Delete(index int, incarnation int) {
item.valueTree.ReplaceOrInsert(deletedItem)
}

func (item *multiVersionItem) Remove(index int) {
item.mtx.Lock()
defer item.mtx.Unlock()

item.valueTree.Delete(&valueItem{index: index})
}

func (item *multiVersionItem) SetEstimate(index int, incarnation int) {
item.mtx.Lock()
defer item.mtx.Unlock()
Expand Down
28 changes: 28 additions & 0 deletions store/multiversion/data_structures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,31 @@ func TestMultiversionItemEstimate(t *testing.T) {
require.True(t, found)
require.Equal(t, one, value.Value())
}

func TestMultiversionItemRemove(t *testing.T) {
mvItem := mv.NewMultiVersionItem()

mvItem.Set(1, 0, []byte("one"))
mvItem.Set(2, 0, []byte("two"))

mvItem.Remove(2)
value, found := mvItem.GetLatest()
require.True(t, found)
require.Equal(t, []byte("one"), value.Value())
}

func TestMultiversionItemGetLatestNonEstimate(t *testing.T) {
mvItem := mv.NewMultiVersionItem()

mvItem.SetEstimate(3, 0)

value, found := mvItem.GetLatestNonEstimate()
require.False(t, found)
require.Nil(t, value)

mvItem.Set(1, 0, []byte("one"))
value, found = mvItem.GetLatestNonEstimate()
require.True(t, found)
require.Equal(t, []byte("one"), value.Value())

}
153 changes: 144 additions & 9 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,43 @@
package multiversion

import (
"sort"
"sync"

"github.com/cosmos/cosmos-sdk/store/types"
)

type MultiVersionStore interface {
GetLatest(key []byte) (value MultiVersionValueItem)
GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem)
Set(index int, incarnation int, key []byte, value []byte)
SetEstimate(index int, incarnation int, key []byte)
Delete(index int, incarnation int, key []byte)
Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets
SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Has(index int, key []byte) bool
// TODO: do we want to add helper functions for validations with readsets / applying writesets ?
WriteLatestToStore(parentStore types.KVStore)
SetWriteset(index int, incarnation int, writeset WriteSet)
InvalidateWriteset(index int, incarnation int)
SetEstimatedWriteset(index int, incarnation int, writeset WriteSet)
GetAllWritesetKeys() map[int][]string
}

type WriteSet map[string][]byte

var _ MultiVersionStore = (*Store)(nil)

type Store struct {
mtx sync.RWMutex
// map that stores the key -> MultiVersionValue mapping for accessing from a given key
multiVersionMap map[string]MultiVersionValue
// TODO: do we need to add something here to persist readsets for later validation
// TODO: we need to support iterators as well similar to how cachekv does it
// TODO: do we need secondary indexing on index -> keys - this way if we need to abort we can replace those keys with ESTIMATE values? - maybe this just means storing writeset
// TODO: do we need to support iterators as well similar to how cachekv does it - yes

txWritesetKeys map[int][]string // map of tx index -> writeset keys
}

func NewMultiVersionStore() *Store {
return &Store{
multiVersionMap: make(map[string]MultiVersionValue),
txWritesetKeys: make(map[int][]string),
}
}

Expand All @@ -41,7 +53,7 @@ func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) {
}
val, found := s.multiVersionMap[keyString].GetLatest()
if !found {
return nil // this shouldn't be possible
return nil // this is possible IF there is are writeset that are then removed for that key
}
return val
}
Expand Down Expand Up @@ -97,6 +109,95 @@ func (s *Store) Set(index int, incarnation int, key []byte, value []byte) {
s.multiVersionMap[keyString].Set(index, incarnation, value)
}

func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
writeset := make(map[string][]byte)
if newWriteSet != nil {
// if non-nil writeset passed in, we can use that to optimize removals
writeset = newWriteSet
}
// if there is already a writeset existing, we should remove that fully
if keys, ok := s.txWritesetKeys[index]; ok {
// we need to delete all of the keys in the writeset from the multiversion store
for _, key := range keys {
// small optimization to check if the new writeset is going to write this key, if so, we can leave it behind
if _, ok := writeset[key]; ok {
// we don't need to remove this key because it will be overwritten anyways - saves the operation of removing + rebalancing underlying btree
continue
}
// remove from the appropriate item if present in multiVersionMap
if val, ok := s.multiVersionMap[key]; ok {
val.Remove(index)
}
}
}
// unset the writesetKeys for this index
delete(s.txWritesetKeys, index)
}

// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store.
func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
s.mtx.Lock()
defer s.mtx.Unlock()

// remove old writeset if it exists
s.removeOldWriteset(index, writeset)

writeSetKeys := make([]string, 0, len(writeset))
for key, value := range writeset {
writeSetKeys = append(writeSetKeys, key)
s.tryInitMultiVersionItem(key)
if value == nil {
// delete if nil value
s.multiVersionMap[key].Delete(index, incarnation)
} else {
s.multiVersionMap[key].Set(index, incarnation, value)
}
}
sort.Strings(writeSetKeys)
s.txWritesetKeys[index] = writeSetKeys
}

// InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs
func (s *Store) InvalidateWriteset(index int, incarnation int) {
s.mtx.Lock()
defer s.mtx.Unlock()

if keys, ok := s.txWritesetKeys[index]; ok {
for _, key := range keys {
// invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level
s.tryInitMultiVersionItem(key) // this SHOULD no-op because we're invalidating existing keys
s.multiVersionMap[key].SetEstimate(index, incarnation)
}
}
// we leave the writeset in place because we'll need it for key removal later if/when we replace with a new writeset
}

// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating
func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) {
s.mtx.Lock()
defer s.mtx.Unlock()

// remove old writeset if it exists
s.removeOldWriteset(index, writeset)

writeSetKeys := make([]string, 0, len(writeset))
// still need to save the writeset so we can remove the elements later:
for key := range writeset {
writeSetKeys = append(writeSetKeys, key)
s.tryInitMultiVersionItem(key)
s.multiVersionMap[key].SetEstimate(index, incarnation)
}
sort.Strings(writeSetKeys)
s.txWritesetKeys[index] = writeSetKeys
}

// GetWritesetKeys implements MultiVersionStore.
func (s *Store) GetAllWritesetKeys() map[int][]string {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.txWritesetKeys
}

// SetEstimate implements MultiVersionStore.
func (s *Store) SetEstimate(index int, incarnation int, key []byte) {
s.mtx.Lock()
Expand All @@ -117,4 +218,38 @@ func (s *Store) Delete(index int, incarnation int, key []byte) {
s.multiVersionMap[keyString].Delete(index, incarnation)
}

var _ MultiVersionStore = (*Store)(nil)
func (s *Store) WriteLatestToStore(parentStore types.KVStore) {
s.mtx.Lock()
defer s.mtx.Unlock()

// sort the keys
keys := make([]string, 0, len(s.multiVersionMap))
for key := range s.multiVersionMap {
keys = append(keys, key)
}
sort.Strings(keys)

for _, key := range keys {
mvValue, found := s.multiVersionMap[key].GetLatestNonEstimate()
if !found {
// this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip
continue
}
// we shouldn't have any ESTIMATE values when performing the write, because we read the latest non-estimate values only
if mvValue.IsEstimate() {
panic("should not have any estimate values when writing to parent store")
}
// if the value is deleted, then delete it from the parent store
if mvValue.IsDeleted() {
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
parentStore.Delete([]byte(key))
continue
}
if mvValue.Value() != nil {
parentStore.Set([]byte(key), mvValue.Value())
}
}
}
88 changes: 88 additions & 0 deletions store/multiversion/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package multiversion_test
import (
"testing"

"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/multiversion"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
)

func TestMultiVersionStore(t *testing.T) {
Expand Down Expand Up @@ -52,3 +54,89 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) {
require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1")))
require.False(t, store.Has(0, []byte("key1")))
}

func TestMultiVersionStoreWriteToParent(t *testing.T) {
// initialize cachekv store
parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()}
mvs := multiversion.NewMultiVersionStore()

parentKVStore.Set([]byte("key2"), []byte("value0"))
parentKVStore.Set([]byte("key4"), []byte("value4"))

mvs.Set(1, 1, []byte("key1"), []byte("value1"))
mvs.Set(2, 1, []byte("key1"), []byte("value2"))
mvs.Set(3, 1, []byte("key2"), []byte("value3"))
mvs.Delete(1, 1, []byte("key3"))
mvs.Delete(1, 1, []byte("key4"))

mvs.WriteLatestToStore(parentKVStore)

// assert state in parent store
require.Equal(t, []byte("value2"), parentKVStore.Get([]byte("key1")))
require.Equal(t, []byte("value3"), parentKVStore.Get([]byte("key2")))
require.False(t, parentKVStore.Has([]byte("key3")))
require.False(t, parentKVStore.Has([]byte("key4")))

// verify no-op if mvs contains ESTIMATE
mvs.SetEstimate(1, 2, []byte("key5"))
mvs.WriteLatestToStore(parentKVStore)
require.False(t, parentKVStore.Has([]byte("key5")))
}

func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) {
mvs := multiversion.NewMultiVersionStore()

writeset := make(map[string][]byte)
writeset["key1"] = []byte("value1")
writeset["key2"] = []byte("value2")
writeset["key3"] = nil

mvs.SetWriteset(1, 2, writeset)
require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value())
require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value())
require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted())

writeset2 := make(map[string][]byte)
writeset2["key1"] = []byte("value3")

mvs.SetWriteset(2, 1, writeset2)
require.Equal(t, []byte("value3"), mvs.GetLatest([]byte("key1")).Value())

// invalidate writeset1
mvs.InvalidateWriteset(1, 2)

// verify estimates
require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key1")).IsEstimate())
require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key2")).IsEstimate())
require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key3")).IsEstimate())

// third writeset
writeset3 := make(map[string][]byte)
writeset3["key4"] = []byte("foo")
writeset3["key5"] = nil

// write the writeset directly as estimate
mvs.SetEstimatedWriteset(3, 1, writeset3)

require.True(t, mvs.GetLatest([]byte("key4")).IsEstimate())
require.True(t, mvs.GetLatest([]byte("key5")).IsEstimate())

// try replacing writeset1 to verify old keys removed
writeset1_b := make(map[string][]byte)
writeset1_b["key1"] = []byte("value4")

mvs.SetWriteset(1, 2, writeset1_b)
require.Equal(t, []byte("value4"), mvs.GetLatestBeforeIndex(2, []byte("key1")).Value())
require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key2")))
// verify that GetLatest for key3 returns nil - because of removal from writeset
require.Nil(t, mvs.GetLatest([]byte("key3")))

// verify output for GetAllWritesetKeys
writesetKeys := mvs.GetAllWritesetKeys()
// we have 3 writesets
require.Equal(t, 3, len(writesetKeys))
require.Equal(t, []string{"key1"}, writesetKeys[1])
require.Equal(t, []string{"key1"}, writesetKeys[2])
require.Equal(t, []string{"key4", "key5"}, writesetKeys[3])

}

0 comments on commit 30fb602

Please sign in to comment.