contention issue fix (#21)
* remove finalise

* fix: update mainDB txIndex after merging slotDB

Otherwise the txIndex can be loaded before the changes made in mergeSlotDB.

* Fix: avoid updating the mainDB nonce in executeInSlot

It should use slotDB; otherwise it changes the stateObjects in mainDB, which causes a racing issue.

* Fix: remove the stateDB update during the conflict check

stateDB.GetState() updates the stateDB's stateObjects, which should not be modified when state is read only for the conflict check (a simplified sketch of this hazard follows the changed-files summary below).

---------

Co-authored-by: Sunny <[email protected]>
2 people authored and welkin22 committed Oct 22, 2024
1 parent 24763c4 commit fd2a810
Showing 4 changed files with 111 additions and 12 deletions.
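The common thread across these changes is that bookkeeping reads — conflict checks, deposit-transaction nonces, prefetch scans — must not mutate structures shared between slots. Below is a hypothetical, heavily simplified Go sketch of that hazard; none of the types are the real op-geth ones. A read that populates a shared cache needs an exclusive lock and contends with every other slot, while a no-update read does not.

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu     sync.RWMutex
	cached map[string]int // plays the role of stateObjects / originStorage
	disk   map[string]int // plays the role of the snapshot / trie
}

// Get returns a value and caches it; the write forces an exclusive lock
// (the kind of side effect a GetState-style read has on shared structures).
func (s *store) Get(k string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	if v, ok := s.cached[k]; ok {
		return v
	}
	v := s.disk[k]
	s.cached[k] = v // the mutation the conflict check must avoid
	return v
}

// GetNoUpdate returns the same value without mutating anything, so concurrent
// callers only need a shared lock (the role GetStateNoUpdate plays in this commit).
func (s *store) GetNoUpdate(k string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if v, ok := s.cached[k]; ok {
		return v
	}
	return s.disk[k]
}

func main() {
	s := &store{cached: map[string]int{}, disk: map[string]int{"slot": 7}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // concurrent conflict checks
		wg.Add(1)
		go func() { defer wg.Done(); _ = s.GetNoUpdate("slot") }()
	}
	wg.Wait()
	fmt.Println("shared cache untouched:", len(s.cached) == 0) // true
}
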
4 changes: 2 additions & 2 deletions core/parallel_state_processor.go
@@ -330,7 +330,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR

on := txReq.tx.Nonce()
if txReq.msg.IsDepositTx && p.config.IsOptimismRegolith(vmenv.Context.Time) {
- on = txReq.baseStateDB.GetNonce(txReq.msg.From)
+ on = slotDB.GetNonce(txReq.msg.From)
}

slotDB.SetTxContext(txReq.tx.Hash(), txReq.txIndex)
@@ -365,7 +365,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
conflictIndex = txReq.conflictIndex.Load()
if conflictIndex < mIndex {
if txReq.conflictIndex.CompareAndSwap(conflictIndex, mIndex) {
- log.Debug("Update conflictIndex in execution because of error, new conflictIndex: %d", conflictIndex)
+ log.Debug(fmt.Sprintf("Update conflictIndex in execution because of error: %s, new conflictIndex: %d", err.Error(), conflictIndex))
}
}
atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1)
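The second hunk only enriches the debug log with the error, but the surrounding code shows the raise-only pattern used for txReq.conflictIndex: a slot records a later conflict index only if its observed value is still current, so concurrent slots never lower it. A minimal, generic sketch of that pattern follows; the names are hypothetical, and it retries in a local loop where the repo instead retries at a higher level.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// raiseConflictIndex lifts idx to target unless another slot already recorded
// an equal or later conflict; it never lowers the value.
func raiseConflictIndex(idx *atomic.Int32, target int32) bool {
	for {
		cur := idx.Load()
		if cur >= target {
			return false // an equal or later conflict is already recorded
		}
		if idx.CompareAndSwap(cur, target) {
			return true
		}
		// Lost the race against another slot; reload and retry.
	}
}

func main() {
	var conflictIndex atomic.Int32
	conflictIndex.Store(-1)

	var wg sync.WaitGroup
	for _, m := range []int32{3, 7, 5} { // merge indexes reported by concurrent slots
		wg.Add(1)
		go func(m int32) { defer wg.Done(); raiseConflictIndex(&conflictIndex, m) }(m)
	}
	wg.Wait()
	fmt.Println("conflictIndex:", conflictIndex.Load()) // always 7
}
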
6 changes: 4 additions & 2 deletions core/state/parallel_statedb.go
@@ -93,7 +93,7 @@ func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash
}
}
}
- valMain := mainDB.GetState(addr, key)
+ valMain := mainDB.GetStateNoUpdate(addr, key)

if !bytes.Equal(val.Bytes(), valMain.Bytes()) {
log.Debug("hasKvConflict is invalid", "addr", addr,
@@ -1238,6 +1238,8 @@ func (s *ParallelStateDB) getKVFromUnconfirmedDB(addr common.Address, key common
if obj.deleted || obj.selfDestructed {
return common.Hash{}, true
}
+ // The dirty object in the unconfirmed DB will never be finalised or changed after execution,
+ // so no storageRecordsLock is required.
if val, exist := obj.dirtyStorage.GetValue(key); exist {
return val, true
}
@@ -1594,8 +1596,8 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S
if !s.isParallel || !s.parallel.isSlotDB {
obj.finalise(true) // Prefetch slots in the background
} else {
+ // Don't call finalise() here, so that the dirty objects in dirtyStorages stay unchanged, which avoids the contention issue.
obj.fixUpOriginAndResetPendingStorage()
- obj.finalise(false)
}
}

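Both the new comment in getKVFromUnconfirmedDB and the removal of finalise(false) in FinaliseForParallel lean on the same invariant: once a slot has finished executing, its dirty storage is published and then only read, never modified, so readers do not need storageRecordsLock. A tiny hypothetical sketch of that handoff discipline, not the repo's types:

package main

import (
	"fmt"
	"sync"
)

// execute builds a slot's dirty storage; after it returns, the map is treated as
// read-only, which is the invariant the removed finalise(false) call would have broken.
func execute(tx int) map[string]string {
	dirty := map[string]string{}
	dirty[fmt.Sprintf("key-%d", tx)] = "value" // writes happen only during execution
	return dirty
}

func main() {
	results := make(chan map[string]string, 4)

	var wg sync.WaitGroup
	for tx := 0; tx < 4; tx++ {
		wg.Add(1)
		go func(tx int) {
			defer wg.Done()
			results <- execute(tx) // the channel send publishes the map (happens-before)
		}(tx)
	}
	wg.Wait()
	close(results)

	// Conflict checks and merges may now read every dirty map without a lock,
	// because nothing mutates them after publication.
	total := 0
	for dirty := range results {
		total += len(dirty)
	}
	fmt.Println("entries read without locking:", total)
}
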
95 changes: 93 additions & 2 deletions core/state/state_object.go
@@ -861,8 +861,7 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
}

object.code = s.code
-
- // The lock is unnecessary since deepCopy only invoked at global phase. No concurrent racing.
+ // The lock is unnecessary since deepCopy is only invoked at the global phase, and on dirty objects that never change.
object.dirtyStorage = s.dirtyStorage.Copy()
object.originStorage = s.originStorage.Copy()
object.pendingStorage = s.pendingStorage.Copy()
@@ -883,6 +882,17 @@ func (s *stateObject) MergeSlotObject(db Database, dirtyObjs *stateObject, keys
// But here, it should be ok, since the KV should be changed and valid in the SlotDB,
s.setState(key, dirtyObjs.GetState(key))
}

// The dirtyObject may have new state accessed from Snap and Trie, so merge the origins.
dirtyObjs.originStorage.Range(func(keyItf, valueItf interface{}) bool {
key := keyItf.(common.Hash)
value := valueItf.(common.Hash)
// Skip noop changes, persist actual changes
if _, ok := s.originStorage.GetValue(key); !ok {
s.originStorage.StoreValue(key, value)
}
return true
})
}

//
@@ -982,6 +992,87 @@ func (s *stateObject) Root() common.Hash {
return s.data.Root
}

// GetStateNoUpdate retrieves a value from the account storage trie, but never updates the stateDB cache
func (s *stateObject) GetStateNoUpdate(key common.Hash) common.Hash {
// If we have a dirty value for this state entry, return it
value, dirty := s.dirtyStorage.GetValue(key)
if dirty {
return value
}
// Otherwise return the entry's original value
result := s.GetCommittedStateNoUpdate(key)
return result
}

// GetCommittedStateNoUpdate retrieves a value from the committed account storage trie, but never updates the
// stateDB cache (object.originStorage)
func (s *stateObject) GetCommittedStateNoUpdate(key common.Hash) common.Hash {
// If we have a pending write or clean cached, return that
// if value, pending := s.pendingStorage[key]; pending {
if value, pending := s.pendingStorage.GetValue(key); pending {
return value
}
if value, cached := s.originStorage.GetValue(key); cached {
return value
}

// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
// database about any storage values. The only possible alternatives are:
// 1) resurrect happened, and new slot values were set -- those should
// have been handled via pendingStorage above.
// 2) we don't have new values, and can deliver empty response back
//if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed {
s.db.stateObjectDestructLock.RLock()
if _, destructed := s.db.getStateObjectsDegetstruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex?
s.db.stateObjectDestructLock.RUnlock()
return common.Hash{}
}
s.db.stateObjectDestructLock.RUnlock()

// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
value common.Hash
)
if s.db.snap != nil {
start := time.Now()
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
}
if len(enc) > 0 {
_, content, _, err := rlp.Split(enc)
if err != nil {
s.db.setError(err)
}
value.SetBytes(content)
}
}
// If the snapshot is unavailable or reading from it fails, load from the database.
if s.db.snap == nil || err != nil {
start := time.Now()
tr, err := s.getTrie()
if err != nil {
s.db.setError(err)
return common.Hash{}
}
s.db.trieParallelLock.Lock()
val, err := tr.GetStorage(s.address, key.Bytes())
s.db.trieParallelLock.Unlock()
if metrics.EnabledExpensive {
s.db.StorageReads += time.Since(start)
}
if err != nil {
s.db.setError(err)
return common.Hash{}
}
value.SetBytes(val)
}
return value
}

// fixUpOriginAndResetPendingStorage is used for slot object only, the target is to fix up the origin storage of the
// object with the latest mainDB. And reset the pendingStorage as the execution recorded the changes in dirty and the
// dirties will be merged to pending at finalise. so the current pendingStorage contains obsoleted info mainly from
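The loop added to MergeSlotObject copies the slot object's originStorage entries into the main object, but only for keys the main object does not already track, so origins discovered from the snapshot or trie during slot execution survive the merge. A simplified sketch of that step, using a plain map instead of the repo's concurrent storage type; mergeOrigins is a hypothetical helper, not code from this commit.

package state

import "github.com/ethereum/go-ethereum/common"

// mergeOrigins keeps the main object's existing origin entries and only fills in
// keys the slot discovered from snapshot/trie reads during execution.
func mergeOrigins(main, slot map[common.Hash]common.Hash) {
	for key, value := range slot {
		if _, ok := main[key]; !ok {
			main[key] = value // skip keys mainDB already tracks; persist new origins
		}
	}
}
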
18 changes: 12 additions & 6 deletions core/state/statedb.go
@@ -826,6 +826,15 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
return nil
}

// GetStateNoUpdate retrieves a value from the given account's storage trie, but does not update the db.stateObjects cache.
func (s *StateDB) GetStateNoUpdate(addr common.Address, hash common.Hash) (ret common.Hash) {
object := s.getStateObjectNoUpdate(addr)
if object != nil {
return object.GetStateNoUpdate(hash)
}
return common.Hash{}
}

// getStateObjectNoUpdate is similar to getStateObject except that it does not
// update stateObjects records.
func (s *StateDB) getStateObjectNoUpdate(addr common.Address) *stateObject {
@@ -837,8 +846,6 @@ func (s *StateDB) getStateObjectNoUpdate(addr common.Address) *stateObject {
}

func (s *StateDB) getDeletedStateObjectNoUpdate(addr common.Address) *stateObject {
- s.RecordRead(types.AccountStateKey(addr, types.AccountSelf), struct{}{})
-
// Prefer live objects if any is available
if obj, _ := s.getStateObjectFromStateObjects(addr); obj != nil {
return obj
@@ -848,7 +855,6 @@ func (s *StateDB) getDeletedStateObjectNoUpdate(addr common.Address) *stateObjec
if !ok {
return nil
}
- // Insert into the live set
obj := newObject(s, s.isParallel, addr, data)
return obj
}
@@ -2593,6 +2599,7 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) {
if obj.deleted {
continue
}
+ obj.storageRecordsLock.RLock()
// copied from obj.finalise(true)
slotsToPrefetch := make([][]byte, 0, obj.dirtyStorage.Length())
obj.dirtyStorage.Range(func(key, value interface{}) bool {
@@ -2603,6 +2610,7 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) {
}
return true
})
+ obj.storageRecordsLock.RUnlock()
if s.prefetcher != nil && len(slotsToPrefetch) > 0 {
s.prefetcher.prefetch(obj.addrHash, obj.data.Root, obj.address, slotsToPrefetch)
}
@@ -2620,8 +2628,6 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) {
// merged back to the main StateDB.
func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receipt, txIndex int, fees *DelayedGasFee) *StateDB {

- s.SetTxContext(slotDb.thash, slotDb.txIndex)
-
for s.nextRevisionId < slotDb.nextRevisionId {
if len(slotDb.validRevisions) > 0 {
r := slotDb.validRevisions[s.nextRevisionId]
@@ -2823,7 +2829,7 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip
s.snapParallelLock.Unlock()
}
}
-
+ s.SetTxContext(slotDb.thash, slotDb.txIndex)
return s
}

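Moving s.SetTxContext(slotDb.thash, slotDb.txIndex) to the end of MergeSlotDB means the main DB's txIndex only advances after the slot's state has been merged, so anything that keys off txIndex can no longer observe the new index paired with pre-merge state. A hypothetical, simplified sketch of that publish-last ordering; none of these names come from the repo.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type mergedState struct {
	entries sync.Map     // key -> value written by already-merged transactions
	txIndex atomic.Int64 // highest merged transaction index, published last
}

// merge installs a slot's writes first and only then advances txIndex, mirroring
// how the commit moves SetTxContext to the end of MergeSlotDB.
func (m *mergedState) merge(txIndex int64, writes map[string]string) {
	for k, v := range writes {
		m.entries.Store(k, v)
	}
	m.txIndex.Store(txIndex) // readers that observe this index also observe the writes
}

// readIfMerged only trusts a value once txIndex already covers the wanted tx;
// because the index is stored after the writes, it cannot pair the new index with old state.
func (m *mergedState) readIfMerged(wantTx int64, key string) (string, bool) {
	if m.txIndex.Load() < wantTx {
		return "", false
	}
	v, ok := m.entries.Load(key)
	if !ok {
		return "", false
	}
	return v.(string), true
}

func main() {
	var m mergedState
	m.txIndex.Store(-1)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		m.merge(3, map[string]string{"balance:0xabc": "42"})
	}()
	wg.Wait()

	v, ok := m.readIfMerged(3, "balance:0xabc")
	fmt.Println(ok, v) // true 42
}
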
