From fd2a810fae8259a3c2efa190040d07be07617a6d Mon Sep 17 00:00:00 2001
From: DavidZang <110075234+DavidZangNR@users.noreply.github.com>
Date: Mon, 5 Aug 2024 09:47:53 +0800
Subject: [PATCH] contention issue fix (#21)

* remove finalise

* fix: update mainDB txIndex after merging slotDB

Otherwise the txIndex can be loaded before it is changed in MergeSlotDB.

* Fix: avoid updating mainDB nonce in executeInSlot

It should use slotDB; otherwise the stateObjects in mainDB are changed,
which causes a racing issue.

* Fix: remove stateDB update during conflict check

stateDB.GetState() updates the stateDB's stateObjects, which should not be
modified by a read that is only performed for the conflict check.

---------

Co-authored-by: Sunny
---
 core/parallel_state_processor.go |  4 +-
 core/state/parallel_statedb.go   |  6 +-
 core/state/state_object.go       | 95 +++++++++++++++++++++++++++++++-
 core/state/statedb.go            | 18 ++++--
 4 files changed, 111 insertions(+), 12 deletions(-)

diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go
index cc50e1b82e..b99d7de5e0 100644
--- a/core/parallel_state_processor.go
+++ b/core/parallel_state_processor.go
@@ -330,7 +330,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
 	on := txReq.tx.Nonce()
 	if txReq.msg.IsDepositTx && p.config.IsOptimismRegolith(vmenv.Context.Time) {
-		on = txReq.baseStateDB.GetNonce(txReq.msg.From)
+		on = slotDB.GetNonce(txReq.msg.From)
 	}
 	slotDB.SetTxContext(txReq.tx.Hash(), txReq.txIndex)
@@ -365,7 +365,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
 		conflictIndex = txReq.conflictIndex.Load()
 		if conflictIndex < mIndex {
 			if txReq.conflictIndex.CompareAndSwap(conflictIndex, mIndex) {
-				log.Debug("Update conflictIndex in execution because of error, new conflictIndex: %d", conflictIndex)
+				log.Debug(fmt.Sprintf("Update conflictIndex in execution because of error: %s, new conflictIndex: %d", err.Error(), conflictIndex))
 			}
 		}
 		atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1)
diff --git a/core/state/parallel_statedb.go b/core/state/parallel_statedb.go
index ff0d05a5f9..c3a049a750 100644
--- a/core/state/parallel_statedb.go
+++ b/core/state/parallel_statedb.go
@@ -93,7 +93,7 @@ func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash
 			}
 		}
 	}
-	valMain := mainDB.GetState(addr, key)
+	valMain := mainDB.GetStateNoUpdate(addr, key)
 	if !bytes.Equal(val.Bytes(), valMain.Bytes()) {
 		log.Debug("hasKvConflict is invalid", "addr", addr,
@@ -1238,6 +1238,8 @@ func (s *ParallelStateDB) getKVFromUnconfirmedDB(addr common.Address, key common
 			if obj.deleted || obj.selfDestructed {
 				return common.Hash{}, true
 			}
+			// The dirty object in the unconfirmed DB is never finalised or changed after execution,
+			// so no storageRecordsLock is required.
 			if val, exist := obj.dirtyStorage.GetValue(key); exist {
 				return val, true
 			}
@@ -1594,8 +1596,8 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S
 			if !s.isParallel || !s.parallel.isSlotDB {
 				obj.finalise(true) // Prefetch slots in the background
 			} else {
+				// Don't call finalise() here, so the dirty objects stay unchanged in dirtyStorage, which avoids the contention issue.
 				obj.fixUpOriginAndResetPendingStorage()
-				obj.finalise(false)
 			}
 		}
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 67e1b06205..161d2a81cb 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -861,8 +861,7 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
 	}
 	object.code = s.code
-
-	// The lock is unnecessary since deepCopy only invoked at global phase. No concurrent racing.
+	// The lock is unnecessary since deepCopy is only invoked at the global phase, with dirty objects that never change.
 	object.dirtyStorage = s.dirtyStorage.Copy()
 	object.originStorage = s.originStorage.Copy()
 	object.pendingStorage = s.pendingStorage.Copy()
@@ -883,6 +882,17 @@ func (s *stateObject) MergeSlotObject(db Database, dirtyObjs *stateObject, keys
 		// But here, it should be ok, since the KV should be changed and valid in the SlotDB,
 		s.setState(key, dirtyObjs.GetState(key))
 	}
+
+	// The dirtyObject may have new state accessed from Snap and Trie, so merge the origins.
+	dirtyObjs.originStorage.Range(func(keyItf, valueItf interface{}) bool {
+		key := keyItf.(common.Hash)
+		value := valueItf.(common.Hash)
+		// Skip noop changes, persist actual changes
+		if _, ok := s.originStorage.GetValue(key); !ok {
+			s.originStorage.StoreValue(key, value)
+		}
+		return true
+	})
 }
 
 //
@@ -982,6 +992,87 @@ func (s *stateObject) Root() common.Hash {
 	return s.data.Root
 }
 
+// GetStateNoUpdate retrieves a value from the account storage trie, but never updates the stateDB cache.
+func (s *stateObject) GetStateNoUpdate(key common.Hash) common.Hash {
+	// If we have a dirty value for this state entry, return it
+	value, dirty := s.dirtyStorage.GetValue(key)
+	if dirty {
+		return value
+	}
+	// Otherwise return the entry's original value
+	result := s.GetCommittedStateNoUpdate(key)
+	return result
+}
+
+// GetCommittedStateNoUpdate retrieves a value from the committed account storage trie, but never updates the
+// stateDB cache (object.originStorage).
+func (s *stateObject) GetCommittedStateNoUpdate(key common.Hash) common.Hash {
+	// If we have a pending write or clean cached, return that
+	// if value, pending := s.pendingStorage[key]; pending {
+	if value, pending := s.pendingStorage.GetValue(key); pending {
+		return value
+	}
+	if value, cached := s.originStorage.GetValue(key); cached {
+		return value
+	}
+
+	// If the object was destructed in *this* block (and potentially resurrected),
+	// the storage has been cleared out, and we should *not* consult the previous
+	// database about any storage values. The only possible alternatives are:
+	//   1) resurrect happened, and new slot values were set -- those should
+	//      have been handled via pendingStorage above.
+	//   2) we don't have new values, and can deliver empty response back
+	//if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed {
+	s.db.stateObjectDestructLock.RLock()
+	if _, destructed := s.db.getStateObjectsDegetstruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex?
+		s.db.stateObjectDestructLock.RUnlock()
+		return common.Hash{}
+	}
+	s.db.stateObjectDestructLock.RUnlock()
+
+	// If no live objects are available, attempt to use snapshots
+	var (
+		enc   []byte
+		err   error
+		value common.Hash
+	)
+	if s.db.snap != nil {
+		start := time.Now()
+		enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
+		if metrics.EnabledExpensive {
+			s.db.SnapshotStorageReads += time.Since(start)
+		}
+		if len(enc) > 0 {
+			_, content, _, err := rlp.Split(enc)
+			if err != nil {
+				s.db.setError(err)
+			}
+			value.SetBytes(content)
+		}
+	}
+	// If the snapshot is unavailable or reading from it fails, load from the database.
+	if s.db.snap == nil || err != nil {
+		start := time.Now()
+		tr, err := s.getTrie()
+		if err != nil {
+			s.db.setError(err)
+			return common.Hash{}
+		}
+		s.db.trieParallelLock.Lock()
+		val, err := tr.GetStorage(s.address, key.Bytes())
+		s.db.trieParallelLock.Unlock()
+		if metrics.EnabledExpensive {
+			s.db.StorageReads += time.Since(start)
+		}
+		if err != nil {
+			s.db.setError(err)
+			return common.Hash{}
+		}
+		value.SetBytes(val)
+	}
+	return value
+}
+
 // fixUpOriginAndResetPendingStorage is used for slot object only, the target is to fix up the origin storage of the
 // object with the latest mainDB. And reset the pendingStorage as the execution recorded the changes in dirty and the
 // dirties will be merged to pending at finalise. so the current pendingStorage contains obsoleted info mainly from
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 8ee3df823e..7fbcef7171 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -826,6 +826,15 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
 	return nil
 }
 
+// GetStateNoUpdate retrieves a value from the given account's storage trie, but does not update the db.stateObjects cache.
+func (s *StateDB) GetStateNoUpdate(addr common.Address, hash common.Hash) (ret common.Hash) {
+	object := s.getStateObjectNoUpdate(addr)
+	if object != nil {
+		return object.GetStateNoUpdate(hash)
+	}
+	return common.Hash{}
+}
+
 // getStateObjectNoUpdate is similar with getStateObject except that it does not
 // update stateObjects records.
 func (s *StateDB) getStateObjectNoUpdate(addr common.Address) *stateObject {
@@ -837,8 +846,6 @@ func (s *StateDB) getStateObjectNoUpdate(addr common.Address) *stateObject {
 }
 
 func (s *StateDB) getDeletedStateObjectNoUpdate(addr common.Address) *stateObject {
-	s.RecordRead(types.AccountStateKey(addr, types.AccountSelf), struct{}{})
-
 	// Prefer live objects if any is available
 	if obj, _ := s.getStateObjectFromStateObjects(addr); obj != nil {
 		return obj
@@ -848,7 +855,6 @@ func (s *StateDB) getDeletedStateObjectNoUpdate(addr common.Address) *stateObjec
 	if !ok {
 		return nil
 	}
-	// Insert into the live set
 	obj := newObject(s, s.isParallel, addr, data)
 	return obj
 }
@@ -2593,6 +2599,7 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) {
 		if obj.deleted {
 			continue
 		}
+		obj.storageRecordsLock.RLock()
 		// copied from obj.finalise(true)
 		slotsToPrefetch := make([][]byte, 0, obj.dirtyStorage.Length())
 		obj.dirtyStorage.Range(func(key, value interface{}) bool {
@@ -2603,6 +2610,7 @@
 			}
 			return true
 		})
+		obj.storageRecordsLock.RUnlock()
 		if s.prefetcher != nil && len(slotsToPrefetch) > 0 {
 			s.prefetcher.prefetch(obj.addrHash, obj.data.Root, obj.address, slotsToPrefetch)
 		}
@@ -2620,8 +2628,6 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) {
 // merged back to the main StateDB.
 func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receipt, txIndex int, fees *DelayedGasFee) *StateDB {
-	s.SetTxContext(slotDb.thash, slotDb.txIndex)
-
 	for s.nextRevisionId < slotDb.nextRevisionId {
 		if len(slotDb.validRevisions) > 0 {
 			r := slotDb.validRevisions[s.nextRevisionId]
@@ -2823,7 +2829,7 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip
 			s.snapParallelLock.Unlock()
 		}
 	}
-
+	s.SetTxContext(slotDb.thash, slotDb.txIndex)
 	return s
 }
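
The core idea behind the hasKvConflict change above is that a conflict check must read the main state without mutating the main DB's caches, since other slots may be reading those caches concurrently. The following standalone Go sketch illustrates that pattern only; it is not code from this repository. The names mainState, Get, GetNoUpdate, and hasConflict are hypothetical stand-ins for StateDB, GetState, GetStateNoUpdate, and hasKvConflict, and the map-backed "state" is a deliberate simplification of the real trie/snapshot/cache layering.

package main

import (
	"fmt"
	"sync"
)

// mainState mimics a main state DB whose normal read path populates a cache.
// That cache update is exactly the side effect a conflict check must avoid.
type mainState struct {
	mu      sync.RWMutex
	backend map[string]string // committed values (stand-in for trie/snapshot)
	cache   map[string]string // read cache normally filled by Get
}

// Get is the regular read path: it caches what it reads (has a side effect).
func (m *mainState) Get(key string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if v, ok := m.cache[key]; ok {
		return v
	}
	v := m.backend[key]
	m.cache[key] = v // cache mutation: unwanted during a concurrent conflict check
	return v
}

// GetNoUpdate reads the same data but never mutates the cache,
// so concurrent slot executions cannot race on cache writes.
func (m *mainState) GetNoUpdate(key string) string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if v, ok := m.cache[key]; ok {
		return v
	}
	return m.backend[key]
}

// hasConflict reports whether any value recorded during speculative execution
// differs from the current main state, using only side-effect-free reads.
func hasConflict(db *mainState, recordedReads map[string]string) bool {
	for key, recorded := range recordedReads {
		if db.GetNoUpdate(key) != recorded {
			return true
		}
	}
	return false
}

func main() {
	db := &mainState{
		backend: map[string]string{"slot0": "A"},
		cache:   map[string]string{},
	}
	recorded := map[string]string{"slot0": "A"} // what the slot DB saw during execution

	fmt.Println("conflict before base change:", hasConflict(db, recorded)) // false
	db.backend["slot0"] = "B"                                              // an earlier tx committed a new value
	fmt.Println("conflict after base change:", hasConflict(db, recorded))  // true
}

The same reasoning explains the lock usage in the diff: because GetNoUpdate-style reads only take a read lock and never write to shared maps, validation of many slots can proceed in parallel against the main DB without serializing on cache writes.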