Skip to content

Commit

Permalink
Add mvkv readset validation and write to mvs helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
udpatil committed Oct 6, 2023
1 parent 8bedb7b commit 1ad3c3e
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 5 deletions.
66 changes: 61 additions & 5 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package multiversion

import (
"io"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -102,7 +103,6 @@ func (store *VersionIndexedStore) Get(key []byte) []byte {
// if we didn't find it, then we want to check the multivalue store + add to readset if applicable
mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key)
if mvsValue != nil {
// found something,
if mvsValue.IsEstimate() {
store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index())
// TODO: is it safe to return nil here?
Expand All @@ -128,6 +128,55 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV
return value
}

// This function iterates over the readset, validating that the values in the readset are consistent with the values in the multiversion store and underlying parent store, and returns a boolean indicating validity
func (store *VersionIndexedStore) ValidateReadset() bool {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "validate_readset")

// sort the readset keys - this is so we have consistent behavior when theres varying conflicts within the readset (eg. read conflict vs estimate)
readsetKeys := make([]string, 0, len(store.readset))
for key := range store.readset {
readsetKeys = append(readsetKeys, key)
}
sort.Strings(readsetKeys)

// iterate over readset keys and values
for _, strKey := range readsetKeys {
key := []byte(strKey)
value := store.readset[strKey]
mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key)
if mvsValue != nil {
if mvsValue.IsEstimate() {
// if we see an estimate, that means that we need to abort and rerun
store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index())
return false
} else {
if mvsValue.IsDeleted() {
// check for `nil`
if value != nil {
return false
}
} else {
// check for equality
if string(value) != string(mvsValue.Value()) {
return false
}
}
}
continue // value is valid, continue to next key
}

parentValue := store.parent.Get(key)
if string(parentValue) != string(value) {
// this shouldnt happen because if we have a conflict it should always happen within multiversion store
panic("we shouldn't ever have a readset conflict in parent store")
}
// value was correct, we can continue to the next value
}
return true
}

// Delete implements types.KVStore.
func (store *VersionIndexedStore) Delete(key []byte) {
store.mtx.Lock()
Expand Down Expand Up @@ -207,10 +256,17 @@ func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirt
}

func (store *VersionIndexedStore) WriteToMultiVersionStore() {
// write the writeset to the multiversion store
for key, value := range store.writeset {
store.multiVersionStore.Set(store.transactionIndex, store.incarnation, []byte(key), value)
}
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset)
}

func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset)
}

func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) {
Expand Down
123 changes: 123 additions & 0 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,126 @@ func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) {
// assert properly returns store type
require.Equal(t, types.StoreTypeDB, vis.GetStoreType())
}

func TestVersionIndexedStoreWrite(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
require.True(t, mvs.Has(3, []byte("key3")))

// write some keys
vis.Set([]byte("key1"), []byte("value1"))
vis.Set([]byte("key2"), []byte("value2"))
vis.Delete([]byte("key3"))

vis.WriteToMultiVersionStore()

require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value())
require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value())
require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted())
}

func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
require.True(t, mvs.Has(3, []byte("key3")))

// write some keys
vis.Set([]byte("key1"), []byte("value1"))
vis.Set([]byte("key2"), []byte("value2"))
vis.Delete([]byte("key3"))

vis.WriteEstimatesToMultiVersionStore()

require.True(t, mvs.GetLatest([]byte("key1")).IsEstimate())
require.True(t, mvs.GetLatest([]byte("key2")).IsEstimate())
require.True(t, mvs.GetLatest([]byte("key3")).IsEstimate())
}

func TestVersionIndexedStoreValidation(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
// initialize a new VersionIndexedStore
abortC := make(chan scheduler.Abort)
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC)
// set some initial values
parentKVStore.Set([]byte("key4"), []byte("value4"))
parentKVStore.Set([]byte("key5"), []byte("value5"))
parentKVStore.Set([]byte("deletedKey"), []byte("foo"))
mvs.Set(0, 1, []byte("key1"), []byte("value1"))
mvs.Set(0, 1, []byte("key2"), []byte("value2"))
mvs.Delete(0, 1, []byte("deletedKey"))

// load those into readset
vis.Get([]byte("key1"))
vis.Get([]byte("key2"))
vis.Get([]byte("key4"))
vis.Get([]byte("key5"))
vis.Get([]byte("keyDNE"))
vis.Get([]byte("deletedKey"))

// everything checks out, so we should be able to validate successfully
require.True(t, vis.ValidateReadset())
// modify underlying transaction key that is unrelated
mvs.Set(1, 1, []byte("key3"), []byte("value3"))
// should still have valid readset
require.True(t, vis.ValidateReadset())

// modify underlying transaction key that is related
mvs.Set(1, 1, []byte("key1"), []byte("value1_b"))
// should now have invalid readset
require.False(t, vis.ValidateReadset())
// reset so readset is valid again
mvs.Set(1, 1, []byte("key1"), []byte("value1"))
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent
mvs.Set(1, 2, []byte("key4"), []byte("value4_b"))
require.False(t, vis.ValidateReadset())
// reset key
mvs.Set(1, 2, []byte("key4"), []byte("value4"))
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent - BUT in a later tx index
mvs.Set(4, 2, []byte("key4"), []byte("value4_c"))
// readset should remain valid
require.True(t, vis.ValidateReadset())

// mvs has an estimate
mvs.SetEstimate(1, 2, []byte("key2"))
// readset should be invalid now - but via abort channel write
go func() {
vis.ValidateReadset()
}()
abort := <-abortC // read the abort from the channel
require.Equal(t, 1, abort.DependentTxIdx)

// test key deleted later
mvs.Delete(1, 1, []byte("key2"))
require.False(t, vis.ValidateReadset())
// reset key2
mvs.Set(1, 1, []byte("key2"), []byte("value2"))

// lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic
parentKVStore.Set([]byte("keyDNE"), []byte("foobar"))
require.Equal(t, []byte("foobar"), parentKVStore.Get([]byte("keyDNE")))
require.Panics(t, func() {
vis.ValidateReadset()
})
}

0 comments on commit 1ad3c3e

Please sign in to comment.