From 64672e9c7c16ba9fe2e41974d1ece31175e85e8e Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 30 Oct 2023 15:37:14 -0400 Subject: [PATCH] Fix map access panic (#343) ## Describe your changes and provide context - `CollectIteratorItems` needs to hold an RLock to avoid a concurrent access panic ## Testing performed to validate your change - Reproduced through a sei-chain-side test (concurrent instantiates) --- store/multiversion/store.go | 45 +++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 7c14c6415..2ee1c31b9 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -206,7 +206,7 @@ func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteS s.txWritesetKeys[index] = writeSetKeys } -// GetWritesetKeys implements MultiVersionStore. +// GetAllWritesetKeys implements MultiVersionStore. func (s *Store) GetAllWritesetKeys() map[int][]string { s.mtx.RLock() defer s.mtx.RUnlock() @@ -243,10 +243,13 @@ func (s *Store) GetIterateset(index int) Iterateset { // CollectIteratorItems implements MultiVersionStore. It will return a memDB containing all of the keys present in the multiversion store within the iteration range prior to (exclusive of) the index. func (s *Store) CollectIteratorItems(index int) *db.MemDB { + s.mtx.RLock() + defer s.mtx.RUnlock() + sortedItems := db.NewMemDB() // get all writeset keys prior to index - keys := s.GetAllWritesetKeys() + keys := s.txWritesetKeys for i := 0; i < index; i++ { indexedWriteset, ok := keys[i] if !ok { @@ -316,21 +319,27 @@ func (s *Store) validateIterator(index int, tracker iterationTracker) bool { } } -// TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so? -func (s *Store) ValidateTransactionState(index int) (bool, []int) { - defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate") - conflictSet := map[int]struct{}{} - valid := true +func (s *Store) checkIteratorAtIndex(index int) bool { + s.mtx.RLock() + defer s.mtx.RUnlock() - // TODO: can we parallelize for all iterators? - iterateset := s.GetIterateset(index) + valid := true + iterateset := s.txIterateSets[index] for _, iterationTracker := range iterateset { iteratorValid := s.validateIterator(index, iterationTracker) valid = valid && iteratorValid } + return valid +} + +func (s *Store) checkReadsetAtIndex(index int) (bool, []int) { + s.mtx.RLock() + defer s.mtx.RUnlock() + + conflictSet := make(map[int]struct{}) + readset := s.txReadSets[index] + valid := true - // validate readset - readset := s.GetReadset(index) // iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store for key, value := range readset { // get the latest value from the multiversion store @@ -357,16 +366,28 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { } } - // convert conflictset into sorted indices conflictIndices := make([]int, 0, len(conflictSet)) for index := range conflictSet { conflictIndices = append(conflictIndices, index) } sort.Ints(conflictIndices) + return valid, conflictIndices } +// TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so? +func (s *Store) ValidateTransactionState(index int) (bool, []int) { + defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate") + + // TODO: can we parallelize for all iterators? + iteratorValid := s.checkIteratorAtIndex(index) + + readsetValid, conflictIndices := s.checkReadsetAtIndex(index) + + return iteratorValid && readsetValid, conflictIndices +} + func (s *Store) WriteLatestToStore() { s.mtx.Lock() defer s.mtx.Unlock()