FIX: redundant execution and incorrect merge of dirty objects (bnb-chain#12)

Fix 3 issues:
  - Re-execution now happens only against a newer version of the baseDB, removing
    redundant execution, and the retry against the same conflicted baseIndex is dropped.
  - Incorrect merging of dirty objects in addrStateChangeInSlot caused an incorrect
    data.Root to be copied from an obsolete stateDB; this fix handles created,
    stateChanged and deleted objects separately.
  - stateObject.GetCommittedState now also checks the mainDB for object deletion.

Co-authored-by: Sunny <[email protected]>
DavidZangNR and sunny2022da committed Oct 11, 2024
1 parent ff2d6d0 commit f616f3b
Showing 4 changed files with 162 additions and 90 deletions.
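The first fix gates re-execution on a per-transaction conflictIndex, so a conflicted tx is only retried once the merged base state has advanced past the version it conflicted with. Below is a minimal, self-contained sketch of that gating pattern; the names txRequest, shouldExecute and recordConflict are illustrative, not the repository's API.

// Minimal gating sketch (not repository code; Go 1.19+ for atomic.Int32).
package main

import (
	"fmt"
	"sync/atomic"
)

type txRequest struct {
	txIndex       int
	conflictIndex atomic.Int32 // highest base tx index known to conflict; -2 means "never conflicted"
}

// shouldExecute reports whether the tx may run against the state merged up to mergedTxIndex.
func shouldExecute(req *txRequest, mergedTxIndex int32) bool {
	// If the merged base has not moved past the conflicting version,
	// re-executing would only reproduce the same conflict.
	return mergedTxIndex > req.conflictIndex.Load()
}

// recordConflict raises conflictIndex to the base index the failed run was built on.
func recordConflict(req *txRequest, baseTxIndex int32) {
	for {
		cur := req.conflictIndex.Load()
		if cur >= baseTxIndex || req.conflictIndex.CompareAndSwap(cur, baseTxIndex) {
			return
		}
	}
}

func main() {
	req := &txRequest{txIndex: 5}
	req.conflictIndex.Store(-2)

	fmt.Println(shouldExecute(req, 3)) // true: never conflicted yet
	recordConflict(req, 3)             // executed against base 3 and conflicted
	fmt.Println(shouldExecute(req, 3)) // false: the base has not advanced
	fmt.Println(shouldExecute(req, 4)) // true: a newer base version is available
}

This mirrors the intent of the conflictIndex field added to ParallelTxRequest in the diff below; the real scheduling and CAS details live in executeInSlot and toConfirmTxIndex.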
110 changes: 77 additions & 33 deletions core/parallel_state_processor.go
@@ -105,9 +105,10 @@ type ParallelTxRequest struct {
usedGas *uint64
curTxChan chan int
systemAddrRedo bool
runnable int32 // 0: not runnable, 1: runnable
runnable int32 // 0: not runnable (currently executing), 1: runnable (on hold, can be scheduled)
executedNum atomic.Int32
retryNum int32
conflictIndex atomic.Int32
}

// to create and start the execution slot goroutines
@@ -302,9 +303,16 @@ func (p *ParallelStateProcessor) switchSlot(slotIndex int) {
}

func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult {
mIndex := p.mergedTxIndex.Load()
conflictIndex := txReq.conflictIndex.Load()
if mIndex <= conflictIndex {
// The conflicting transaction has not been merged yet; re-executing now would hit
// the same conflict, so put the request back on hold and skip this run.
atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1)
return nil
}
execNum := txReq.executedNum.Add(1)
slotDB := state.NewSlotDB(txReq.baseStateDB, txReq.txIndex, int(p.mergedTxIndex.Load()), p.unconfirmedDBs)

slotDB := state.NewSlotDB(txReq.baseStateDB, txReq.txIndex, int(mIndex), p.unconfirmedDBs)
blockContext := NewEVMBlockContext(txReq.block.Header(), p.bc, nil, p.config, slotDB) // can share blockContext within a block for efficiency
txContext := NewEVMTxContext(txReq.msg)
vmenv := vm.NewEVM(blockContext, txContext, slotDB, p.config, txReq.vmConfig)
@@ -341,6 +349,21 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
p.unconfirmedDBs.Store(txReq.txIndex, slotDB)
} else {
// the transaction failed at check(nonce or balance), actually it has not been executed yet.
// the error here can be either expected or unexpected:
// expected - the execution is correct and the error is a normal result
// unexpected - the execution incorrectly accessed state because of parallelization.
// In both cases, rerun with the next version of the stateDB; it is wasteful and buggy
// to rerun with the same version. Therefore, treat it as a conflict and rerun,
// leaving the result to the conflict check.
// Reload conflictIndex, as it may have been updated by the conflict checker or by
// other execution slots. Use the old mIndex so that a retry can pick up the newer
// one updated by the merging thread during execution.
conflictIndex = txReq.conflictIndex.Load()
if conflictIndex < mIndex {
if txReq.conflictIndex.CompareAndSwap(conflictIndex, mIndex) {
log.Debug("Update conflictIndex in execution because of error, new conflictIndex: %d", conflictIndex)
}
}
atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1)
// the error could be caused by unconfirmed balance reference,
// the balance could insufficient to pay its gas limit, which cause it preCheck.buyGas() failed
@@ -397,6 +420,14 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
valid := p.toConfirmTxIndexResult(targetResult, isStage2)
if !valid {
staticSlotIndex := targetResult.txReq.staticSlotIndex // it is better to run the TxReq in its static dispatch slot
conflictBase := targetResult.slotDB.BaseTxIndex()
conflictIndex := targetResult.txReq.conflictIndex.Load()
if conflictIndex < int32(conflictBase) {
if targetResult.txReq.conflictIndex.CompareAndSwap(conflictIndex, int32(conflictBase)) {
// updated successfully
log.Debug("Update conflict index", "conflictIndex", conflictIndex, "conflictBase", conflictBase)
}
}
if isStage2 {
atomic.CompareAndSwapInt32(&targetResult.txReq.runnable, 0, 1) // needs redo
p.debugConflictRedoNum++
@@ -406,30 +437,32 @@ }
}

if len(p.pendingConfirmResults[targetTxIndex]) == 0 { // this is the last result to check, and it is not valid
blockTxCount := targetResult.txReq.block.Transactions().Len()
// This means that the tx has been executed more than blockTxCount times, so it exits with the error.
// TODO-dav: p.mergedTxIndex+2 may be more reasonable? - this is buggy for expected exit
if targetResult.txReq.txIndex == int(p.mergedTxIndex.Load())+1 {
// txReq is the next to merge
if atomic.LoadInt32(&targetResult.txReq.retryNum) <= int32(blockTxCount)+3000 {
atomic.AddInt32(&targetResult.txReq.retryNum, 1)
// conflict retry
} else {
// retry many times and still conflict, either the tx is expected to be wrong, or something wrong.
if targetResult.err != nil {
if true { // TODO: delete the printf
fmt.Printf("!!!!!!!!!!! Parallel execution exited with error!!!!!, txIndex:%d, err: %v\n", targetResult.txReq.txIndex, targetResult.err)
}
return targetResult
if targetResult.txReq.txIndex == int(p.mergedTxIndex.Load())+1 &&
targetResult.slotDB.BaseTxIndex() == int(p.mergedTxIndex.Load()) {
/*
// txReq is the next to merge
if atomic.LoadInt32(&targetResult.txReq.retryNum) <= int32(blockTxCount)+3000 {
atomic.AddInt32(&targetResult.txReq.retryNum, 1)
// conflict retry
} else {
// abnormal exit with conflict error, need check the parallel algorithm
targetResult.err = ErrParallelUnexpectedConflict
if true {
fmt.Printf("!!!!!!!!!!! Parallel execution exited unexpected conflict!!!!!, txIndex:%d\n", targetResult.txReq.txIndex)
}
return targetResult
*/
// retried many times and still conflicting: either the tx is expected to fail, or something is wrong.
if targetResult.err != nil {
if false { // TODO: delete the printf
fmt.Printf("!!!!!!!!!!! Parallel execution exited with error!!!!!, txIndex:%d, err: %v\n", targetResult.txReq.txIndex, targetResult.err)
}
return targetResult
} else {
// abnormal exit with a conflict error; need to check the parallel algorithm
targetResult.err = ErrParallelUnexpectedConflict
if false {
fmt.Printf("!!!!!!!!!!! Parallel execution exited unexpected conflict!!!!!, txIndex:%d\n", targetResult.txReq.txIndex)
}
return targetResult
}
//}
}
atomic.CompareAndSwapInt32(&targetResult.txReq.runnable, 0, 1) // needs redo
p.debugConflictRedoNum++
@@ -508,11 +541,13 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
}
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
// not swapped: txReq.runnable == 0
//fmt.Printf("Dav -- runInLoop, - not runnable - TxREQ: %d\n", txReq.txIndex)
continue
}
// fmt.Printf("Dav -- runInLoop, - executeInSlot - TxREQ: %d\n", txReq.txIndex)
p.txResultChan <- p.executeInSlot(slotIndex, txReq)
res := p.executeInSlot(slotIndex, txReq)
if res == nil {
continue
}
p.txResultChan <- res
// fmt.Printf("Dav -- runInLoop, - loopbody tail - TxREQ: %d\n", txReq.txIndex)
}
// switched to the other slot.
@@ -531,7 +566,6 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
if atomic.LoadInt32(&curSlot.activatedType) != slotType {
interrupted = true
// fmt.Printf("Dav -- stealLoop, - activatedType - TxREQ: %d\n", stealTxReq.txIndex)

break
}

@@ -542,7 +576,11 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
continue
}
// fmt.Printf("Dav -- stealLoop, - executeInSlot - TxREQ: %d\n", stealTxReq.txIndex)
p.txResultChan <- p.executeInSlot(slotIndex, stealTxReq)
res := p.executeInSlot(slotIndex, stealTxReq)
if res == nil {
continue
}
p.txResultChan <- res
// fmt.Printf("Dav -- stealLoop, - loopbody tail - TxREQ: %d\n", stealTxReq.txIndex)
}
}
@@ -625,15 +663,20 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga

var root []byte
header := result.txReq.block.Header()
if p.config.IsByzantium(header.Number) {
result.slotDB.FinaliseForParallel(true, statedb)
} else {
root = result.slotDB.IntermediateRootForSlotDB(p.config.IsEIP158(header.Number), statedb).Bytes()
}
result.receipt.PostState = root

isByzantium := p.config.IsByzantium(header.Number)
isEIP158 := p.config.IsEIP158(header.Number)
result.slotDB.FinaliseForParallel(isByzantium || isEIP158, statedb)

// merge slotDB into mainDB
statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex)

// Do IntermediateRoot after mergeSlotDB.
if !isByzantium {
root = statedb.IntermediateRoot(isEIP158).Bytes()
}
result.receipt.PostState = root

if resultTxIndex != int(p.mergedTxIndex.Load())+1 {
log.Error("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex,
"p.mergedTxIndex", p.mergedTxIndex.Load())
@@ -729,6 +772,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
retryNum: 0,
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
p.allTxReqs = append(p.allTxReqs, txReq)
}
// set up stage2 enter criteria
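The confirm path above now finalises the slot DB, merges it into the main DB, and only then computes the intermediate root on the main DB. The sketch below shows that ordering under simplified, hypothetical types; slotState and mainState are illustrative, not the state package's API.

// Ordering sketch only (hypothetical types, not the repository code).
package main

import "fmt"

type slotState struct{ dirty map[string]string }

type mainState struct{ merged map[string]string }

// finalise would settle the slot DB's pending writes; a no-op placeholder here.
func (s *slotState) finalise() {}

// merge copies the slot's dirty entries into the main state.
func (m *mainState) merge(s *slotState) {
	for k, v := range s.dirty {
		m.merged[k] = v
	}
}

// intermediateRoot stands in for the real trie-root computation over the main DB.
func (m *mainState) intermediateRoot() string {
	return fmt.Sprintf("root-over-%d-keys", len(m.merged))
}

func main() {
	db := &mainState{merged: map[string]string{"a": "1"}}
	slot := &slotState{dirty: map[string]string{"b": "2"}}

	slot.finalise()                    // 1. finalise the slot DB first
	db.merge(slot)                     // 2. merge slot changes into the main DB
	fmt.Println(db.intermediateRoot()) // 3. only now compute the root, so it covers "a" and "b"
}

Computing the root before the merge is what let an obsolete slot view leak into the result; moving IntermediateRoot after MergeSlotDB, as the diff above does, keeps the post-state root consistent with the merged dirty objects.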
11 changes: 8 additions & 3 deletions core/state/parallel_statedb.go
@@ -94,11 +94,13 @@ func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash
}
}
valMain := mainDB.GetState(addr, key)

if !bytes.Equal(val.Bytes(), valMain.Bytes()) {
log.Debug("hasKvConflict is invalid", "addr", addr,
"key", key, "valSlot", val,
"valMain", valMain, "SlotIndex", slotDB.parallel.SlotIndex,
"txIndex", slotDB.txIndex, "baseTxIndex", slotDB.parallel.baseTxIndex)

return true // return false, Range will be terminated.
}
return false
@@ -656,8 +658,6 @@ func (s *ParallelStateDB) GetState(addr common.Address, hash common.Hash) common
// it could be suicided within this SlotDB?
// it should be able to get state from suicided address within a Tx:
// e.g. within a transaction: call addr:suicide -> get state: should be ok
// return common.Hash{}
log.Info("ParallelStateDB GetState suicided", "addr", addr, "hash", hash)

if dirtyObj == nil {
log.Error("ParallelStateDB GetState access untouched object after create, may check create2")
@@ -1285,7 +1285,6 @@ func (slotDB *ParallelStateDB) IsParallelReadsValid(isStage2 bool) bool {
}
}
}

/* can not use mainDB.GetNonce() because we do not want to record the stateObject */
var nonceMain uint64 = 0
mainObj := mainDB.getStateObjectNoUpdate(addr)
@@ -1296,6 +1295,7 @@ func (slotDB *ParallelStateDB) IsParallelReadsValid(isStage2 bool) bool {
log.Debug("IsSlotDBReadsValid nonce read is invalid", "addr", addr,
"nonceSlot", nonceSlot, "nonceMain", nonceMain, "SlotIndex", slotDB.parallel.SlotIndex,
"txIndex", slotDB.txIndex, "baseTxIndex", slotDB.parallel.baseTxIndex)

return false
}
}
@@ -1395,6 +1395,7 @@ func (slotDB *ParallelStateDB) IsParallelReadsValid(isStage2 bool) bool {
}
}
}

if isStage2 { // stage2 skip check code, or state, since they are likely unchanged.
return true
}
@@ -1596,7 +1597,11 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S
}
}

if obj.created {
s.parallel.createdObjectRecord[addr] = struct{}{}
}
obj.created = false

s.stateObjectsPending[addr] = struct{}{}
s.stateObjectsDirty[addr] = struct{}{}

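The FinaliseForParallel change above records which dirty objects were created within this slot, so the later merge can treat created, state-changed and deleted objects separately. Below is a rough sketch of that bookkeeping; the slotDB and dirtyObject types here are illustrative only, not the repository's ParallelStateDB.

// Bookkeeping sketch with illustrative types; not the repository code.
package main

import "fmt"

type dirtyObject struct {
	created bool
	deleted bool
}

type slotDB struct {
	dirties       map[string]*dirtyObject
	createdRecord map[string]struct{} // addresses created within this slot's tx
	pending       map[string]struct{}
	dirty         map[string]struct{}
}

// finalise marks every dirty object as pending and remembers which ones were created
// in this slot, so the merge step can handle created objects separately from
// state-changed or deleted ones.
func (s *slotDB) finalise() {
	for addr, obj := range s.dirties {
		if obj.created {
			s.createdRecord[addr] = struct{}{}
		}
		obj.created = false
		s.pending[addr] = struct{}{}
		s.dirty[addr] = struct{}{}
	}
}

func main() {
	db := &slotDB{
		dirties:       map[string]*dirtyObject{"0xabc": {created: true}, "0xdef": {}},
		createdRecord: map[string]struct{}{},
		pending:       map[string]struct{}{},
		dirty:         map[string]struct{}{},
	}
	db.finalise()
	_, isNew := db.createdRecord["0xabc"]
	fmt.Println(isNew) // true: the merge step will know 0xabc was created in this slot
}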
48 changes: 30 additions & 18 deletions core/state/state_object.go
@@ -349,23 +349,33 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
return value
}

// Add-Dav:
// Need to confirm the object is not destructed in unconfirmed db and resurrected in this tx.
// otherwise there is an issue for cases like:
// B0: TX0 --> createAccount @addr1 -- merged into DB
// B1: Tx1 and Tx2
// Tx1 account@addr1, setState(key0), setState(key1) selfDestruct -- unconfirmed
// Tx2 recreate account@addr2, setState(key0) -- executing
// TX2 GetState(addr2, key1) ---
// key1 is never set after recurrsect, and should not return state in trie as it destructed in unconfirmed
// TODO - dav: do we need try storages from unconfirmedDB? - currently not because conflict detection need it for get from mainDB.
obj, exist := s.dbItf.GetStateObjectFromUnconfirmedDB(s.address)
if exist {
if obj.deleted || obj.selfDestructed {
return common.Hash{}
if s.db.isParallel && s.db.parallel.isSlotDB {
// Add-Dav:
// Need to confirm the object has not been destructed in an unconfirmed DB and resurrected in this tx.
// Otherwise there is an issue for cases like:
// B0: TX0 --> createAccount @addr1 -- merged into DB
// B1: Tx1 and Tx2
//   Tx1 account@addr1, setState(key0), setState(key1), selfDestruct -- unconfirmed
//   Tx2 recreate account@addr1, setState(key0) -- executing
//   Tx2 GetState(addr1, key1) ---
//     key1 is never set after the resurrection and must not return the state in the trie,
//     since the account was destructed in an unconfirmed tx.
// TODO - dav: do we need to try storages from unconfirmedDB? - currently not, because conflict detection needs it to read from mainDB.
obj, exist := s.dbItf.GetStateObjectFromUnconfirmedDB(s.address)
if exist {
if obj.deleted || obj.selfDestructed {
return common.Hash{}
}
}
}

// Also check whether the object exists in the mainDB and is already deleted there.
pdb := s.db.parallel.baseStateDB
obj, exist = pdb.getStateObjectFromStateObjects(s.address)
if exist {
if obj.deleted || obj.selfDestructed {
return common.Hash{}
}
}
}
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
// database about any storage values. The only possible alternatives are:
@@ -526,7 +536,6 @@ func (s *stateObject) updateTrie() (Trie, error) {
maindb.accountStorageParallelLock.Lock()
defer maindb.accountStorageParallelLock.Unlock()
}

// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false)

@@ -549,6 +558,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
maindb.setError(err)
return nil, err
}

// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, s.pendingStorage.Length())
dirtyStorage := make(map[common.Hash][]byte)
@@ -594,6 +604,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
go func() {
defer wg.Done()
maindb.StorageMux.Lock()
defer maindb.StorageMux.Unlock()
// The snapshot storage map for the object
storage = maindb.storages[s.addrHash]
if storage == nil {
@@ -606,7 +617,6 @@ func (s *stateObject) updateTrie() (Trie, error) {
origin = make(map[common.Hash][]byte)
maindb.storagesOrigin[s.address] = origin
}
maindb.StorageMux.Unlock()
for key, value := range dirtyStorage {
khash := crypto.HashData(hasher, key[:])

@@ -636,6 +646,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
if maindb.prefetcher != nil {
maindb.prefetcher.used(s.addrHash, s.data.Root, usedStorage)
}

s.pendingStorage = newStorage(s.isParallel) // reset pending map
return tr, nil
/*
@@ -714,8 +725,9 @@ func (s *stateObject) updateRoot() {
// is occurred or there is not change in the trie.
// TODO: The trieParallelLock seems heavy, can we remove it?
s.db.trieParallelLock.Lock()
defer s.db.trieParallelLock.Unlock()

tr, err := s.updateTrie()
s.db.trieParallelLock.Unlock()
if err != nil || tr == nil {
return
}
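The GetCommittedState change above adds a second deletion check against the main/base DB on top of the existing unconfirmed-DB check. The sketch below shows that lookup order under assumed, illustrative types; object, parallelDB and getCommittedState are not the repository's API.

// Lookup-order sketch with assumed types; not the repository code.
package main

import "fmt"

type object struct {
	deleted        bool
	selfDestructed bool
	storage        map[string]string
}

type parallelDB struct {
	unconfirmed map[string]*object // results of not-yet-confirmed transactions
	base        map[string]*object // the main/base state DB
}

// getCommittedState returns the empty value when the account was destructed either
// in an unconfirmed tx or already in the base DB, instead of reading stale trie state.
func (d *parallelDB) getCommittedState(addr, key string) string {
	if obj, ok := d.unconfirmed[addr]; ok && (obj.deleted || obj.selfDestructed) {
		return "" // destructed in an unconfirmed tx: storage must read as empty
	}
	if obj, ok := d.base[addr]; ok {
		if obj.deleted || obj.selfDestructed {
			return "" // deleted in the base DB: do not fall back to committed storage
		}
		return obj.storage[key]
	}
	return ""
}

func main() {
	d := &parallelDB{
		unconfirmed: map[string]*object{"addr1": {selfDestructed: true}},
		base:        map[string]*object{"addr1": {storage: map[string]string{"key1": "v"}}},
	}
	fmt.Println(d.getCommittedState("addr1", "key1") == "") // true: the upstream destruction wins
}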
