From 9e42a3b8c5a8e155f6b5cd14008ea196905a7cf4 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Tue, 10 Oct 2023 14:30:14 -0500 Subject: [PATCH] [occ] MVKV store implementation and tests (#323) ## Describe your changes and provide context This implements an mvkv store that will manage access from a transaction execution to the underlying multiversion store and underlying parent store if the multiversion store doesn't have that key. It will first serve any reads from its own writeset and readset, but if it does have to fall through to multiversion store or parent store, it will add those values to the readset. ## Testing performed to validate your change Unit tests --- store/multiversion/mvkv.go | 268 ++++++++++++++++++++++++++++++++ store/multiversion/mvkv_test.go | 250 +++++++++++++++++++++++++++++ types/occ/scheduler.go | 20 +++ 3 files changed, 538 insertions(+) create mode 100644 store/multiversion/mvkv.go create mode 100644 store/multiversion/mvkv_test.go create mode 100644 types/occ/scheduler.go diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go new file mode 100644 index 000000000..697561355 --- /dev/null +++ b/store/multiversion/mvkv.go @@ -0,0 +1,268 @@ +package multiversion + +import ( + "io" + "sort" + "sync" + "time" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" + dbm "github.com/tendermint/tm-db" +) + +// Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index +type VersionIndexedStore struct { + mtx sync.Mutex + // used for tracking reads and writes for eventual validation + persistence into multi-version store + readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store) + writeset map[string][]byte // contains the key -> value mapping for all keys written to the store + // TODO: need to add iterateset here as well + + // TODO: do we need this? - I think so? / maybe we just treat `nil` value in the writeset as a delete + deleted *sync.Map + // dirty keys that haven't been sorted yet for iteration + dirtySet map[string]struct{} + // used for iterators - populated at the time of iterator instantiation + // TODO: when we want to perform iteration, we need to move all the dirty keys (writeset and readset) into the sortedTree and then combine with the iterators for the underlying stores + sortedStore *dbm.MemDB // always ascending sorted + // parent stores (both multiversion and underlying parent store) + multiVersionStore MultiVersionStore + parent types.KVStore + // transaction metadata for versioned operations + transactionIndex int + incarnation int + // have abort channel here for aborting transactions + abortChannel chan scheduler.Abort +} + +var _ types.KVStore = (*VersionIndexedStore)(nil) + +func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore { + return &VersionIndexedStore{ + readset: make(map[string][]byte), + writeset: make(map[string][]byte), + deleted: &sync.Map{}, + dirtySet: make(map[string]struct{}), + sortedStore: dbm.NewMemDB(), + parent: parent, + multiVersionStore: multiVersionStore, + transactionIndex: transactionIndex, + incarnation: incarnation, + abortChannel: abortChannel, + } +} + +// GetReadset returns the readset +func (store *VersionIndexedStore) GetReadset() map[string][]byte { + return store.readset +} + +// GetWriteset returns the writeset +func (store *VersionIndexedStore) GetWriteset() map[string][]byte { + return store.writeset +} + +// Get implements types.KVStore. +func (store *VersionIndexedStore) Get(key []byte) []byte { + // first try to get from writeset cache, if cache miss, then try to get from multiversion store, if that misses, then get from parent store + // if the key is in the cache, return it + + // don't have RW mutex because we have to update readset + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "get") + + types.AssertValidKey(key) + strKey := string(key) + // first check the MVKV writeset, and return that value if present + cacheValue, ok := store.writeset[strKey] + if ok { + // return the value from the cache, no need to update any readset stuff + return cacheValue + } + // read the readset to see if the value exists - and return if applicable + if readsetVal, ok := store.readset[strKey]; ok { + return readsetVal + } + + // if we didn't find it, then we want to check the multivalue store + add to readset if applicable + mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key) + if mvsValue != nil { + if mvsValue.IsEstimate() { + store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index()) + return nil + } else { + // This handles both detecting readset conflicts and updating readset if applicable + return store.parseValueAndUpdateReadset(strKey, mvsValue) + } + } + // if we didn't find it in the multiversion store, then we want to check the parent store + add to readset + parentValue := store.parent.Get(key) + store.updateReadSet(key, parentValue) + return parentValue +} + +// This functions handles reads with deleted items and values and verifies that the data is consistent to what we currently have in the readset (IF we have a readset value for that key) +func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsValue MultiVersionValueItem) []byte { + value := mvsValue.Value() + if mvsValue.IsDeleted() { + value = nil + } + store.updateReadSet([]byte(strKey), value) + return value +} + +// This function iterates over the readset, validating that the values in the readset are consistent with the values in the multiversion store and underlying parent store, and returns a boolean indicating validity +func (store *VersionIndexedStore) ValidateReadset() bool { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "validate_readset") + + // sort the readset keys - this is so we have consistent behavior when theres varying conflicts within the readset (eg. read conflict vs estimate) + readsetKeys := make([]string, 0, len(store.readset)) + for key := range store.readset { + readsetKeys = append(readsetKeys, key) + } + sort.Strings(readsetKeys) + + // iterate over readset keys and values + for _, strKey := range readsetKeys { + key := []byte(strKey) + value := store.readset[strKey] + mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key) + if mvsValue != nil { + if mvsValue.IsEstimate() { + // if we see an estimate, that means that we need to abort and rerun + store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index()) + return false + } else { + if mvsValue.IsDeleted() { + // check for `nil` + if value != nil { + return false + } + } else { + // check for equality + if string(value) != string(mvsValue.Value()) { + return false + } + } + } + continue // value is valid, continue to next key + } + + parentValue := store.parent.Get(key) + if string(parentValue) != string(value) { + // this shouldnt happen because if we have a conflict it should always happen within multiversion store + panic("we shouldn't ever have a readset conflict in parent store") + } + // value was correct, we can continue to the next value + } + return true +} + +// Delete implements types.KVStore. +func (store *VersionIndexedStore) Delete(key []byte) { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete") + + types.AssertValidKey(key) + store.setValue(key, nil, true, true) +} + +// Has implements types.KVStore. +func (store *VersionIndexedStore) Has(key []byte) bool { + // necessary locking happens within store.Get + return store.Get(key) != nil +} + +// Set implements types.KVStore. +func (store *VersionIndexedStore) Set(key []byte, value []byte) { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set") + + types.AssertValidKey(key) + store.setValue(key, value, false, true) +} + +// Iterator implements types.KVStore. +func (v *VersionIndexedStore) Iterator(start []byte, end []byte) dbm.Iterator { + panic("unimplemented") +} + +// ReverseIterator implements types.KVStore. +func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iterator { + panic("unimplemented") +} + +// GetStoreType implements types.KVStore. +func (v *VersionIndexedStore) GetStoreType() types.StoreType { + return v.parent.GetStoreType() +} + +// CacheWrap implements types.KVStore. +func (*VersionIndexedStore) CacheWrap(storeKey types.StoreKey) types.CacheWrap { + panic("CacheWrap not supported for version indexed store") +} + +// CacheWrapWithListeners implements types.KVStore. +func (*VersionIndexedStore) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + panic("CacheWrapWithListeners not supported for version indexed store") +} + +// CacheWrapWithTrace implements types.KVStore. +func (*VersionIndexedStore) CacheWrapWithTrace(storeKey types.StoreKey, w io.Writer, tc types.TraceContext) types.CacheWrap { + panic("CacheWrapWithTrace not supported for version indexed store") +} + +// GetWorkingHash implements types.KVStore. +func (v *VersionIndexedStore) GetWorkingHash() ([]byte, error) { + panic("should never attempt to get working hash from version indexed store") +} + +// Only entrypoint to mutate writeset +func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirty bool) { + types.AssertValidKey(key) + + keyStr := string(key) + store.writeset[keyStr] = value + if deleted { + store.deleted.Store(keyStr, struct{}{}) + } else { + store.deleted.Delete(keyStr) + } + if dirty { + store.dirtySet[keyStr] = struct{}{} + } +} + +func (store *VersionIndexedStore) WriteToMultiVersionStore() { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") + store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset) +} + +func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") + store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset) +} + +func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) { + // add to readset + keyStr := string(key) + store.readset[keyStr] = value + // add to dirty set + store.dirtySet[keyStr] = struct{}{} +} + +func (store *VersionIndexedStore) isDeleted(key string) bool { + _, ok := store.deleted.Load(key) + return ok +} diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go new file mode 100644 index 000000000..df1692d1f --- /dev/null +++ b/store/multiversion/mvkv_test.go @@ -0,0 +1,250 @@ +package multiversion_test + +import ( + "testing" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" + "github.com/cosmos/cosmos-sdk/store/types" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +func TestVersionIndexedStoreGetters(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // mock a value in the parent store + parentKVStore.Set([]byte("key1"), []byte("value1")) + + // read key that doesn't exist + val := vis.Get([]byte("key2")) + require.Nil(t, val) + require.False(t, vis.Has([]byte("key2"))) + + // read key that falls down to parent store + val2 := vis.Get([]byte("key1")) + require.Equal(t, []byte("value1"), val2) + require.True(t, vis.Has([]byte("key1"))) + // verify value now in readset + require.Equal(t, []byte("value1"), vis.GetReadset()["key1"]) + + // read the same key that should now be served from the readset (can be verified by setting a different value for the key in the parent store) + parentKVStore.Set([]byte("key1"), []byte("value2")) // realistically shouldn't happen, modifying to verify readset access + val3 := vis.Get([]byte("key1")) + require.True(t, vis.Has([]byte("key1"))) + require.Equal(t, []byte("value1"), val3) + + // test deleted value written to MVS but not parent store + mvs.Delete(0, 2, []byte("delKey")) + parentKVStore.Set([]byte("delKey"), []byte("value4")) + valDel := vis.Get([]byte("delKey")) + require.Nil(t, valDel) + require.False(t, vis.Has([]byte("delKey"))) + + // set different key in MVS - for various indices + mvs.Set(0, 2, []byte("key3"), []byte("value3")) + mvs.Set(2, 1, []byte("key3"), []byte("value4")) + mvs.SetEstimate(5, 0, []byte("key3")) + + // read the key that falls down to MVS + val4 := vis.Get([]byte("key3")) + // should equal value3 because value4 is later than the key in question + require.Equal(t, []byte("value3"), val4) + require.True(t, vis.Has([]byte("key3"))) + + // try a read that falls through to MVS with a later tx index + vis2 := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 3, 2, make(chan scheduler.Abort)) + val5 := vis2.Get([]byte("key3")) + // should equal value3 because value4 is later than the key in question + require.Equal(t, []byte("value4"), val5) + require.True(t, vis2.Has([]byte("key3"))) + + // test estimate values writing to abortChannel + abortChannel := make(chan scheduler.Abort) + vis3 := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 6, 2, abortChannel) + go func() { + vis3.Get([]byte("key3")) + }() + abort := <-abortChannel // read the abort from the channel + require.Equal(t, 5, abort.DependentTxIdx) + require.Equal(t, scheduler.ErrReadEstimate, abort.Err) + + vis.Set([]byte("key4"), []byte("value4")) + // verify proper response for GET + val6 := vis.Get([]byte("key4")) + require.True(t, vis.Has([]byte("key4"))) + require.Equal(t, []byte("value4"), val6) + // verify that its in the writeset + require.Equal(t, []byte("value4"), vis.GetWriteset()["key4"]) + // verify that its not in the readset + require.Nil(t, vis.GetReadset()["key4"]) +} + +func TestVersionIndexedStoreSetters(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // test simple set + vis.Set([]byte("key1"), []byte("value1")) + require.Equal(t, []byte("value1"), vis.GetWriteset()["key1"]) + + mvs.Set(0, 1, []byte("key2"), []byte("value2")) + vis.Delete([]byte("key2")) + require.Nil(t, vis.Get([]byte("key2"))) + // because the delete should be at the writeset level, we should not have populated the readset + require.Zero(t, len(vis.GetReadset())) + + // try setting the value again, and then read + vis.Set([]byte("key2"), []byte("value3")) + require.Equal(t, []byte("value3"), vis.Get([]byte("key2"))) + require.Zero(t, len(vis.GetReadset())) +} + +func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // asserts panics where appropriate + require.Panics(t, func() { vis.CacheWrap(types.NewKVStoreKey("mock")) }) + require.Panics(t, func() { vis.CacheWrapWithListeners(types.NewKVStoreKey("mock"), nil) }) + require.Panics(t, func() { vis.CacheWrapWithTrace(types.NewKVStoreKey("mock"), nil, nil) }) + require.Panics(t, func() { vis.GetWorkingHash() }) + + // assert properly returns store type + require.Equal(t, types.StoreTypeDB, vis.GetStoreType()) +} + +func TestVersionIndexedStoreWrite(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + mvs.Set(0, 1, []byte("key3"), []byte("value3")) + + require.False(t, mvs.Has(3, []byte("key1"))) + require.False(t, mvs.Has(3, []byte("key2"))) + require.True(t, mvs.Has(3, []byte("key3"))) + + // write some keys + vis.Set([]byte("key1"), []byte("value1")) + vis.Set([]byte("key2"), []byte("value2")) + vis.Delete([]byte("key3")) + + vis.WriteToMultiVersionStore() + + require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value()) + require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value()) + require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted()) +} + +func TestVersionIndexedStoreWriteEstimates(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + mvs.Set(0, 1, []byte("key3"), []byte("value3")) + + require.False(t, mvs.Has(3, []byte("key1"))) + require.False(t, mvs.Has(3, []byte("key2"))) + require.True(t, mvs.Has(3, []byte("key3"))) + + // write some keys + vis.Set([]byte("key1"), []byte("value1")) + vis.Set([]byte("key2"), []byte("value2")) + vis.Delete([]byte("key3")) + + vis.WriteEstimatesToMultiVersionStore() + + require.True(t, mvs.GetLatest([]byte("key1")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key2")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key3")).IsEstimate()) +} + +func TestVersionIndexedStoreValidation(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + abortC := make(chan scheduler.Abort) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC) + // set some initial values + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + parentKVStore.Set([]byte("deletedKey"), []byte("foo")) + mvs.Set(0, 1, []byte("key1"), []byte("value1")) + mvs.Set(0, 1, []byte("key2"), []byte("value2")) + mvs.Delete(0, 1, []byte("deletedKey")) + + // load those into readset + vis.Get([]byte("key1")) + vis.Get([]byte("key2")) + vis.Get([]byte("key4")) + vis.Get([]byte("key5")) + vis.Get([]byte("keyDNE")) + vis.Get([]byte("deletedKey")) + + // everything checks out, so we should be able to validate successfully + require.True(t, vis.ValidateReadset()) + // modify underlying transaction key that is unrelated + mvs.Set(1, 1, []byte("key3"), []byte("value3")) + // should still have valid readset + require.True(t, vis.ValidateReadset()) + + // modify underlying transaction key that is related + mvs.Set(1, 1, []byte("key1"), []byte("value1_b")) + // should now have invalid readset + require.False(t, vis.ValidateReadset()) + // reset so readset is valid again + mvs.Set(1, 1, []byte("key1"), []byte("value1")) + require.True(t, vis.ValidateReadset()) + + // mvs has a value that was initially read from parent + mvs.Set(1, 2, []byte("key4"), []byte("value4_b")) + require.False(t, vis.ValidateReadset()) + // reset key + mvs.Set(1, 2, []byte("key4"), []byte("value4")) + require.True(t, vis.ValidateReadset()) + + // mvs has a value that was initially read from parent - BUT in a later tx index + mvs.Set(4, 2, []byte("key4"), []byte("value4_c")) + // readset should remain valid + require.True(t, vis.ValidateReadset()) + + // mvs has an estimate + mvs.SetEstimate(1, 2, []byte("key2")) + // readset should be invalid now - but via abort channel write + go func() { + vis.ValidateReadset() + }() + abort := <-abortC // read the abort from the channel + require.Equal(t, 1, abort.DependentTxIdx) + + // test key deleted later + mvs.Delete(1, 1, []byte("key2")) + require.False(t, vis.ValidateReadset()) + // reset key2 + mvs.Set(1, 1, []byte("key2"), []byte("value2")) + + // lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic + parentKVStore.Set([]byte("keyDNE"), []byte("foobar")) + require.Equal(t, []byte("foobar"), parentKVStore.Get([]byte("keyDNE"))) + require.Panics(t, func() { + vis.ValidateReadset() + }) +} diff --git a/types/occ/scheduler.go b/types/occ/scheduler.go new file mode 100644 index 000000000..3905be395 --- /dev/null +++ b/types/occ/scheduler.go @@ -0,0 +1,20 @@ +package scheduler + +import "errors" + +var ( + ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting") +) + +// define the return struct for abort due to conflict +type Abort struct { + DependentTxIdx int + Err error +} + +func NewEstimateAbort(dependentTxIdx int) Abort { + return Abort{ + DependentTxIdx: dependentTxIdx, + Err: ErrReadEstimate, + } +}