diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index df1692d1f..e17cba65c 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -15,7 +15,7 @@ import ( func TestVersionIndexedStoreGetters(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) @@ -41,16 +41,25 @@ func TestVersionIndexedStoreGetters(t *testing.T) { require.Equal(t, []byte("value1"), val3) // test deleted value written to MVS but not parent store - mvs.Delete(0, 2, []byte("delKey")) + mvs.SetWriteset(0, 2, map[string][]byte{ + "delKey": nil, + }) parentKVStore.Set([]byte("delKey"), []byte("value4")) valDel := vis.Get([]byte("delKey")) require.Nil(t, valDel) require.False(t, vis.Has([]byte("delKey"))) // set different key in MVS - for various indices - mvs.Set(0, 2, []byte("key3"), []byte("value3")) - mvs.Set(2, 1, []byte("key3"), []byte("value4")) - mvs.SetEstimate(5, 0, []byte("key3")) + mvs.SetWriteset(0, 2, map[string][]byte{ + "delKey": nil, + "key3": []byte("value3"), + }) + mvs.SetWriteset(2, 1, map[string][]byte{ + "key3": []byte("value4"), + }) + mvs.SetEstimatedWriteset(5, 0, map[string][]byte{ + "key3": nil, + }) // read the key that falls down to MVS val4 := vis.Get([]byte("key3")) @@ -89,7 +98,7 @@ func TestVersionIndexedStoreGetters(t *testing.T) { func TestVersionIndexedStoreSetters(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) @@ -97,7 +106,9 @@ func TestVersionIndexedStoreSetters(t *testing.T) { vis.Set([]byte("key1"), []byte("value1")) require.Equal(t, []byte("value1"), vis.GetWriteset()["key1"]) - mvs.Set(0, 1, []byte("key2"), []byte("value2")) + mvs.SetWriteset(0, 1, map[string][]byte{ + "key2": []byte("value2"), + }) vis.Delete([]byte("key2")) require.Nil(t, vis.Get([]byte("key2"))) // because the delete should be at the writeset level, we should not have populated the readset @@ -112,7 +123,7 @@ func TestVersionIndexedStoreSetters(t *testing.T) { func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) @@ -129,11 +140,13 @@ func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) { func TestVersionIndexedStoreWrite(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) - mvs.Set(0, 1, []byte("key3"), []byte("value3")) + mvs.SetWriteset(0, 1, map[string][]byte{ + "key3": []byte("value3"), + }) require.False(t, mvs.Has(3, []byte("key1"))) require.False(t, mvs.Has(3, []byte("key2"))) @@ -154,11 +167,13 @@ func TestVersionIndexedStoreWrite(t *testing.T) { func TestVersionIndexedStoreWriteEstimates(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) - mvs.Set(0, 1, []byte("key3"), []byte("value3")) + mvs.SetWriteset(0, 1, map[string][]byte{ + "key3": []byte("value3"), + }) require.False(t, mvs.Has(3, []byte("key1"))) require.False(t, mvs.Has(3, []byte("key2"))) @@ -179,7 +194,7 @@ func TestVersionIndexedStoreWriteEstimates(t *testing.T) { func TestVersionIndexedStoreValidation(t *testing.T) { mem := dbadapter.Store{DB: dbm.NewMemDB()} parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) // initialize a new VersionIndexedStore abortC := make(chan scheduler.Abort) vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC) @@ -187,9 +202,12 @@ func TestVersionIndexedStoreValidation(t *testing.T) { parentKVStore.Set([]byte("key4"), []byte("value4")) parentKVStore.Set([]byte("key5"), []byte("value5")) parentKVStore.Set([]byte("deletedKey"), []byte("foo")) - mvs.Set(0, 1, []byte("key1"), []byte("value1")) - mvs.Set(0, 1, []byte("key2"), []byte("value2")) - mvs.Delete(0, 1, []byte("deletedKey")) + + mvs.SetWriteset(0, 1, map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "deletedKey": nil, + }) // load those into readset vis.Get([]byte("key1")) @@ -202,32 +220,52 @@ func TestVersionIndexedStoreValidation(t *testing.T) { // everything checks out, so we should be able to validate successfully require.True(t, vis.ValidateReadset()) // modify underlying transaction key that is unrelated - mvs.Set(1, 1, []byte("key3"), []byte("value3")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + }) // should still have valid readset require.True(t, vis.ValidateReadset()) // modify underlying transaction key that is related - mvs.Set(1, 1, []byte("key1"), []byte("value1_b")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1_b"), + }) // should now have invalid readset require.False(t, vis.ValidateReadset()) // reset so readset is valid again - mvs.Set(1, 1, []byte("key1"), []byte("value1")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1"), + }) require.True(t, vis.ValidateReadset()) // mvs has a value that was initially read from parent - mvs.Set(1, 2, []byte("key4"), []byte("value4_b")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1"), + "key4": []byte("value4_b"), + }) require.False(t, vis.ValidateReadset()) // reset key - mvs.Set(1, 2, []byte("key4"), []byte("value4")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1"), + "key4": []byte("value4"), + }) require.True(t, vis.ValidateReadset()) // mvs has a value that was initially read from parent - BUT in a later tx index - mvs.Set(4, 2, []byte("key4"), []byte("value4_c")) + mvs.SetWriteset(4, 2, map[string][]byte{ + "key4": []byte("value4_c"), + }) // readset should remain valid require.True(t, vis.ValidateReadset()) // mvs has an estimate - mvs.SetEstimate(1, 2, []byte("key2")) + mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ + "key2": nil, + }) // readset should be invalid now - but via abort channel write go func() { vis.ValidateReadset() @@ -236,10 +274,20 @@ func TestVersionIndexedStoreValidation(t *testing.T) { require.Equal(t, 1, abort.DependentTxIdx) // test key deleted later - mvs.Delete(1, 1, []byte("key2")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1"), + "key4": []byte("value4"), + "key2": nil, + }) require.False(t, vis.ValidateReadset()) // reset key2 - mvs.Set(1, 1, []byte("key2"), []byte("value2")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key3": []byte("value3"), + "key1": []byte("value1"), + "key4": []byte("value4"), + "key2": []byte("value2"), + }) // lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic parentKVStore.Set([]byte("keyDNE"), []byte("foobar")) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 3aa4800f3..08c45204b 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -1,27 +1,31 @@ package multiversion import ( + "bytes" "sort" "sync" + "time" "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" ) type MultiVersionStore interface { GetLatest(key []byte) (value MultiVersionValueItem) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) - Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets - SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets - Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets Has(index int, key []byte) bool - WriteLatestToStore(parentStore types.KVStore) + WriteLatestToStore() SetWriteset(index int, incarnation int, writeset WriteSet) InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) GetAllWritesetKeys() map[int][]string + SetReadset(index int, readset ReadSet) + GetReadset(index int) ReadSet + ValidateTransactionState(index int) []int } type WriteSet map[string][]byte +type ReadSet map[string][]byte var _ MultiVersionStore = (*Store)(nil) @@ -32,12 +36,17 @@ type Store struct { // TODO: do we need to support iterators as well similar to how cachekv does it - yes txWritesetKeys map[int][]string // map of tx index -> writeset keys + txReadSets map[int]ReadSet + + parentStore types.KVStore } -func NewMultiVersionStore() *Store { +func NewMultiVersionStore(parentStore types.KVStore) *Store { return &Store{ multiVersionMap: make(map[string]MultiVersionValue), txWritesetKeys: make(map[int][]string), + txReadSets: make(map[int]ReadSet), + parentStore: parentStore, } } @@ -99,16 +108,6 @@ func (s *Store) tryInitMultiVersionItem(keyString string) { } } -// Set implements MultiVersionStore. -func (s *Store) Set(index int, incarnation int, key []byte, value []byte) { - s.mtx.Lock() - defer s.mtx.Unlock() - - keyString := string(key) - s.tryInitMultiVersionItem(keyString) - s.multiVersionMap[keyString].Set(index, incarnation, value) -} - func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { writeset := make(map[string][]byte) if newWriteSet != nil { @@ -135,6 +134,7 @@ func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { } // SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store. +// TODO: returns a list of NEW keys added func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) { s.mtx.Lock() defer s.mtx.Unlock() @@ -153,7 +153,7 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) { s.multiVersionMap[key].Set(index, incarnation, value) } } - sort.Strings(writeSetKeys) + sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice s.txWritesetKeys[index] = writeSetKeys } @@ -198,27 +198,63 @@ func (s *Store) GetAllWritesetKeys() map[int][]string { return s.txWritesetKeys } -// SetEstimate implements MultiVersionStore. -func (s *Store) SetEstimate(index int, incarnation int, key []byte) { +func (s *Store) SetReadset(index int, readset ReadSet) { s.mtx.Lock() defer s.mtx.Unlock() - keyString := string(key) - s.tryInitMultiVersionItem(keyString) - s.multiVersionMap[keyString].SetEstimate(index, incarnation) + s.txReadSets[index] = readset } -// Delete implements MultiVersionStore. -func (s *Store) Delete(index int, incarnation int, key []byte) { - s.mtx.Lock() - defer s.mtx.Unlock() +func (s *Store) GetReadset(index int) ReadSet { + s.mtx.RLock() + defer s.mtx.RUnlock() - keyString := string(key) - s.tryInitMultiVersionItem(keyString) - s.multiVersionMap[keyString].Delete(index, incarnation) + return s.txReadSets[index] +} + +func (s *Store) ValidateTransactionState(index int) []int { + defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate") + conflictSet := map[int]struct{}{} + + // validate readset + readset := s.GetReadset(index) + // iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store + for key, value := range readset { + // get the latest value from the multiversion store + latestValue := s.GetLatestBeforeIndex(index, []byte(key)) + if latestValue == nil { + // TODO: maybe we don't even do this check? + parentVal := s.parentStore.Get([]byte(key)) + if !bytes.Equal(parentVal, value) { + panic("there shouldn't be readset conflicts with parent kv store, since it shouldn't change") + } + } else { + // if estimate, mark as conflict index + if latestValue.IsEstimate() { + conflictSet[latestValue.Index()] = struct{}{} + } else if latestValue.IsDeleted() { + if value != nil { + // conflict + conflictSet[latestValue.Index()] = struct{}{} + } + } else if !bytes.Equal(latestValue.Value(), value) { + conflictSet[latestValue.Index()] = struct{}{} + } + } + } + // TODO: validate iterateset + + // convert conflictset into sorted indices + conflictIndices := make([]int, 0, len(conflictSet)) + for index := range conflictSet { + conflictIndices = append(conflictIndices, index) + } + + sort.Ints(conflictIndices) + return conflictIndices } -func (s *Store) WriteLatestToStore(parentStore types.KVStore) { +func (s *Store) WriteLatestToStore() { s.mtx.Lock() defer s.mtx.Unlock() @@ -245,11 +281,11 @@ func (s *Store) WriteLatestToStore(parentStore types.KVStore) { // be sure if the underlying store might do a save with the byteslice or // not. Once we get confirmation that .Delete is guaranteed not to // save the byteslice, then we can assume only a read-only copy is sufficient. - parentStore.Delete([]byte(key)) + s.parentStore.Delete([]byte(key)) continue } if mvValue.Value() != nil { - parentStore.Set([]byte(key), mvValue.Value()) + s.parentStore.Set([]byte(key), mvValue.Value()) } } } diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 732a5a6ba..bb56d1e71 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -10,25 +10,38 @@ import ( ) func TestMultiVersionStore(t *testing.T) { - store := multiversion.NewMultiVersionStore() + store := multiversion.NewMultiVersionStore(nil) // Test Set and GetLatest - store.Set(1, 1, []byte("key1"), []byte("value1")) - store.Set(2, 1, []byte("key1"), []byte("value2")) - store.Set(3, 1, []byte("key2"), []byte("value3")) + store.SetWriteset(1, 1, map[string][]byte{ + "key1": []byte("value1"), + }) + store.SetWriteset(2, 1, map[string][]byte{ + "key1": []byte("value2"), + }) + store.SetWriteset(3, 1, map[string][]byte{ + "key2": []byte("value3"), + }) + require.Equal(t, []byte("value2"), store.GetLatest([]byte("key1")).Value()) require.Equal(t, []byte("value3"), store.GetLatest([]byte("key2")).Value()) // Test SetEstimate - store.SetEstimate(4, 1, []byte("key1")) + store.SetEstimatedWriteset(4, 1, map[string][]byte{ + "key1": nil, + }) require.True(t, store.GetLatest([]byte("key1")).IsEstimate()) // Test Delete - store.Delete(5, 1, []byte("key1")) + store.SetWriteset(5, 1, map[string][]byte{ + "key1": nil, + }) require.True(t, store.GetLatest([]byte("key1")).IsDeleted()) // Test GetLatestBeforeIndex - store.Set(6, 1, []byte("key1"), []byte("value4")) + store.SetWriteset(6, 1, map[string][]byte{ + "key1": []byte("value4"), + }) require.True(t, store.GetLatestBeforeIndex(5, []byte("key1")).IsEstimate()) require.Equal(t, []byte("value4"), store.GetLatestBeforeIndex(7, []byte("key1")).Value()) @@ -39,16 +52,18 @@ func TestMultiVersionStore(t *testing.T) { } func TestMultiVersionStoreHasLaterValue(t *testing.T) { - store := multiversion.NewMultiVersionStore() + store := multiversion.NewMultiVersionStore(nil) - store.Set(5, 1, []byte("key1"), []byte("value2")) + store.SetWriteset(5, 1, map[string][]byte{ + "key1": []byte("value2"), + }) require.Nil(t, store.GetLatestBeforeIndex(4, []byte("key1"))) require.Equal(t, []byte("value2"), store.GetLatestBeforeIndex(6, []byte("key1")).Value()) } func TestMultiVersionStoreKeyDNE(t *testing.T) { - store := multiversion.NewMultiVersionStore() + store := multiversion.NewMultiVersionStore(nil) require.Nil(t, store.GetLatest([]byte("key1"))) require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1"))) @@ -58,18 +73,24 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) { func TestMultiVersionStoreWriteToParent(t *testing.T) { // initialize cachekv store parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(parentKVStore) parentKVStore.Set([]byte("key2"), []byte("value0")) parentKVStore.Set([]byte("key4"), []byte("value4")) - mvs.Set(1, 1, []byte("key1"), []byte("value1")) - mvs.Set(2, 1, []byte("key1"), []byte("value2")) - mvs.Set(3, 1, []byte("key2"), []byte("value3")) - mvs.Delete(1, 1, []byte("key3")) - mvs.Delete(1, 1, []byte("key4")) + mvs.SetWriteset(1, 1, map[string][]byte{ + "key1": []byte("value1"), + "key3": nil, + "key4": nil, + }) + mvs.SetWriteset(2, 1, map[string][]byte{ + "key1": []byte("value2"), + }) + mvs.SetWriteset(3, 1, map[string][]byte{ + "key2": []byte("value3"), + }) - mvs.WriteLatestToStore(parentKVStore) + mvs.WriteLatestToStore() // assert state in parent store require.Equal(t, []byte("value2"), parentKVStore.Get([]byte("key1"))) @@ -78,13 +99,18 @@ func TestMultiVersionStoreWriteToParent(t *testing.T) { require.False(t, parentKVStore.Has([]byte("key4"))) // verify no-op if mvs contains ESTIMATE - mvs.SetEstimate(1, 2, []byte("key5")) - mvs.WriteLatestToStore(parentKVStore) + mvs.SetEstimatedWriteset(1, 2, map[string][]byte{ + "key1": []byte("value1"), + "key3": nil, + "key4": nil, + "key5": nil, + }) + mvs.WriteLatestToStore() require.False(t, parentKVStore.Has([]byte("key5"))) } func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { - mvs := multiversion.NewMultiVersionStore() + mvs := multiversion.NewMultiVersionStore(nil) writeset := make(map[string][]byte) writeset["key1"] = []byte("value1") @@ -140,3 +166,66 @@ func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { require.Equal(t, []string{"key4", "key5"}, writesetKeys[3]) } + +func TestMultiVersionStoreValidateState(t *testing.T) { + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore(parentKVStore) + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key3"), []byte("value3")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + + writeset := make(multiversion.WriteSet) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + mvs.SetWriteset(1, 2, writeset) + + readset := make(multiversion.ReadSet) + readset["key1"] = []byte("value1") + readset["key2"] = []byte("value2") + readset["key3"] = nil + readset["key4"] = []byte("value4") + readset["key5"] = []byte("value5") + mvs.SetReadset(5, readset) + + // assert no readset is valid + conflicts := mvs.ValidateTransactionState(4) + require.Empty(t, conflicts) + + // assert readset index 5 is valid + conflicts = mvs.ValidateTransactionState(5) + require.Empty(t, conflicts) + + // introduce conflict + mvs.SetWriteset(2, 1, map[string][]byte{ + "key3": []byte("value6"), + }) + + // expect index 2 to be returned + conflicts = mvs.ValidateTransactionState(5) + require.Equal(t, []int{2}, conflicts) + + // add a conflict due to deletion + mvs.SetWriteset(3, 1, map[string][]byte{ + "key1": nil, + }) + + // expect indices 2 and 3 to be returned + conflicts = mvs.ValidateTransactionState(5) + require.Equal(t, []int{2, 3}, conflicts) + + // add a conflict due to estimate + mvs.SetEstimatedWriteset(4, 1, map[string][]byte{ + "key2": []byte("test"), + }) + + // expect indices 2, 3, and 4to be returned + conflicts = mvs.ValidateTransactionState(5) + require.Equal(t, []int{2, 3, 4}, conflicts) + + // assert panic for parent store mismatch + parentKVStore.Set([]byte("key5"), []byte("value6")) + require.Panics(t, func() { mvs.ValidateTransactionState(5) }) +}