From 93aed9cb538df3162cec57e5ea73f4826c75978b Mon Sep 17 00:00:00 2001 From: galaio <12880651+galaio@users.noreply.github.com> Date: Mon, 29 Jul 2024 15:53:24 +0800 Subject: [PATCH] pevm: support delay gas fee calculation & Uts; (#11) * pevm: support delay gas fee calculation; txdag: check gas fee receiver; tests: support PEVM+TxDAG UTs; * txdag: skip some cost time operation; tests: fix some broken UTs; --------- Co-authored-by: galaio --- cmd/evm/blockrunner.go | 2 +- core/blockchain.go | 4 +-- core/parallel_state_processor.go | 34 +++++++++++++++--- core/state/statedb.go | 39 ++++++++++++++------- core/state/statedb_test.go | 2 +- core/state_processor.go | 12 ++++--- core/state_transition.go | 40 +++++++++++++++++++-- core/types/dag.go | 39 +++++---------------- core/types/dag_test.go | 6 ++-- core/types/mvstates.go | 11 ++++-- eth/backend.go | 2 +- miner/worker.go | 8 ++--- tests/block_test.go | 60 +++++++++++++++++++++++++++++--- tests/block_test_util.go | 8 +++-- 14 files changed, 189 insertions(+), 78 deletions(-) diff --git a/cmd/evm/blockrunner.go b/cmd/evm/blockrunner.go index c5d836e0ea..196e86d591 100644 --- a/cmd/evm/blockrunner.go +++ b/cmd/evm/blockrunner.go @@ -92,7 +92,7 @@ func blockTestCmd(ctx *cli.Context) error { fmt.Println(string(state.Dump(nil))) } } - }); err != nil { + }, "", true); err != nil { return fmt.Errorf("test %v: %w", name, err) } } diff --git a/core/blockchain.go b/core/blockchain.go index ecdb3044a9..c5078159bd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2837,7 +2837,7 @@ func (bc *BlockChain) TxDAGEnabled() bool { return bc.enableTxDAG } -func (bc *BlockChain) EnableTxDAGGeneration(output string) { +func (bc *BlockChain) SetupTxDAGGeneration(output string) { bc.enableTxDAG = true if len(output) == 0 { return @@ -2846,7 +2846,7 @@ func (bc *BlockChain) EnableTxDAGGeneration(output string) { var err error bc.txDAGMapping, err = readTxDAGMappingFromFile(output) if err != nil { - log.Error("read TxDAG err", err) + log.Error("read TxDAG err", "err", err) } // write handler diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 05a1f9bdff..93c56f5c31 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -50,6 +50,7 @@ type ParallelStateProcessor struct { targetStage2Count int // when executed txNUM reach it, enter stage2 RT confirm nextStage2TxIndex int disableStealTx bool + delayGasFee bool // it is provided by TxDAG } func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int) *ParallelStateProcessor { @@ -187,6 +188,8 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { // 3. TODO(galaio) it need to schedule the slow dep tx path properly; // 4. TODO(galaio) it is unfriendly for cross slot deps, maybe we can delay dispatch when tx cross in slots, it may increase PEVM parallelism; func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest, txDAG types.TxDAG) { + p.disableStealTx = false + p.delayGasFee = false // only support PlainTxDAG dispatch now. if txDAG == nil || txDAG.Type() != types.PlainTxDAGType { p.doStaticDispatch(txReqs) @@ -213,6 +216,7 @@ func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest, // it's unnecessary to enable slot steal mechanism, opt the steal mechanism later; p.disableStealTx = true + p.delayGasFee = true } // Benefits of StaticDispatch: @@ -331,7 +335,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR slotDB.SetTxContext(txReq.tx.Hash(), txReq.txIndex) - evm, result, err := applyTransactionStageExecution(txReq.msg, gpSlot, slotDB, vmenv) + evm, result, err := applyTransactionStageExecution(txReq.msg, gpSlot, slotDB, vmenv, p.delayGasFee) txResult := ParallelTxResult{ executedIndex: execNum, slotIndex: slotIndex, @@ -660,7 +664,19 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga } resultTxIndex := result.txReq.txIndex - + delayGasFee := result.result.delayFees + // add delayed gas fee + if delayGasFee != nil { + if delayGasFee.TipFee != nil { + result.slotDB.AddBalance(delayGasFee.Coinbase, delayGasFee.TipFee) + } + if delayGasFee.BaseFee != nil { + result.slotDB.AddBalance(params.OptimismBaseFeeRecipient, delayGasFee.BaseFee) + } + if delayGasFee.L1Fee != nil { + result.slotDB.AddBalance(params.OptimismL1FeeRecipient, delayGasFee.L1Fee) + } + } var root []byte header := result.txReq.block.Header() @@ -669,7 +685,7 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga result.slotDB.FinaliseForParallel(isByzantium || isEIP158, statedb) // merge slotDB into mainDB - statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex) + statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex, result.result.delayFees) // Do IntermediateRoot after mergeSlotDB. if !isByzantium { @@ -887,13 +903,21 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat return receipts, allLogs, *usedGas, nil } -func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.ParallelStateDB, evm *vm.EVM) (*vm.EVM, *ExecutionResult, error) { +func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.ParallelStateDB, evm *vm.EVM, delayGasFee bool) (*vm.EVM, *ExecutionResult, error) { // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg) evm.Reset(txContext, statedb) // Apply the transaction to the current state (included in the env). - result, err := ApplyMessage(evm, msg, gp) + var ( + result *ExecutionResult + err error + ) + if delayGasFee { + result, err = ApplyMessageDelayGasFee(evm, msg, gp) + } else { + result, err = ApplyMessage(evm, msg, gp) + } if err != nil { return nil, nil, err diff --git a/core/state/statedb.go b/core/state/statedb.go index e236256d59..f9d3faf132 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -60,6 +60,13 @@ type StateObjectSyncMap struct { sync.Map } +type DelayedGasFee struct { + BaseFee *big.Int + TipFee *big.Int + L1Fee *big.Int + Coinbase common.Address +} + func (s *StateObjectSyncMap) LoadStateObject(addr common.Address) (*stateObject, bool) { so, ok := s.Load(addr) if !ok { @@ -239,9 +246,10 @@ type StateDB struct { logSize uint // parallel EVM related - rwSet *types.RWSet - mvStates *types.MVStates - es *types.ExeStat + rwSet *types.RWSet + mvStates *types.MVStates + stat *types.ExeStat + rwRecordFlag bool // Preimages occurred seen by VM in the scope of block. preimages map[common.Hash][]byte @@ -2370,6 +2378,7 @@ func (s *StateDB) BeforeTxTransition() { s.rwSet = types.NewRWSet(types.StateVersion{ TxIndex: s.txIndex, }) + s.rwRecordFlag = true } func (s *StateDB) BeginTxStat(index int) { @@ -2379,7 +2388,9 @@ func (s *StateDB) BeginTxStat(index int) { if s.mvStates == nil { return } - s.es = types.NewExeStat(index).Begin() + if metrics.EnabledExpensive { + s.stat = types.NewExeStat(index).Begin() + } } func (s *StateDB) StopTxStat(usedGas uint64) { @@ -2390,8 +2401,8 @@ func (s *StateDB) StopTxStat(usedGas uint64) { return } // record stat first - if s.es != nil { - s.es.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet())) + if metrics.EnabledExpensive && s.stat != nil { + s.stat.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet())) } } @@ -2399,7 +2410,7 @@ func (s *StateDB) RecordRead(key types.RWKey, val interface{}) { if s.isParallel && s.parallel.isSlotDB { return } - if s.mvStates == nil || s.rwSet == nil { + if !s.rwRecordFlag { return } // TODO: read from MVStates, record with ver @@ -2412,7 +2423,7 @@ func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) { if s.isParallel && s.parallel.isSlotDB { return } - if s.mvStates == nil || s.rwSet == nil { + if !s.rwRecordFlag { return } s.rwSet.RecordWrite(key, val) @@ -2424,13 +2435,14 @@ func (s *StateDB) ResetMVStates(txCount int) { } s.mvStates = types.NewMVStates(txCount) s.rwSet = nil + s.rwRecordFlag = false } func (s *StateDB) FinaliseRWSet() error { if s.isParallel && s.parallel.isSlotDB { return nil } - if s.mvStates == nil || s.rwSet == nil { + if !s.rwRecordFlag { return nil } if metrics.EnabledExpensive { @@ -2468,7 +2480,8 @@ func (s *StateDB) FinaliseRWSet() error { } } - return s.mvStates.FulfillRWSet(s.rwSet, s.es) + s.rwRecordFlag = false + return s.mvStates.FulfillRWSet(s.rwSet, s.stat) } func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) { @@ -2498,7 +2511,7 @@ func (s *StateDB) deleteStateObjectsDestruct(addr common.Address) { delete(s.stateObjectsDestruct, addr) } -func (s *StateDB) MVStates2TxDAG() (types.TxDAG, map[int]*types.ExeStat) { +func (s *StateDB) ResolveTxDAG(gasFeeReceivers []common.Address) (types.TxDAG, map[int]*types.ExeStat) { if s.isParallel && s.parallel.isSlotDB { return nil, nil } @@ -2511,7 +2524,7 @@ func (s *StateDB) MVStates2TxDAG() (types.TxDAG, map[int]*types.ExeStat) { }(time.Now()) } - return s.mvStates.ResolveTxDAG(), s.mvStates.Stats() + return s.mvStates.ResolveTxDAG(gasFeeReceivers), s.mvStates.Stats() } func (s *StateDB) MVStates() *types.MVStates { @@ -2600,7 +2613,7 @@ func (s *StateDB) AddrPrefetch(slotDb *ParallelStateDB) { // MergeSlotDB is for Parallel execution mode, when the transaction has been // finalized(dirty -> pending) on execution slot, the execution results should be // merged back to the main StateDB. -func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receipt, txIndex int) *StateDB { +func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receipt, txIndex int, fees *DelayedGasFee) *StateDB { s.SetTxContext(slotDb.thash, slotDb.txIndex) for s.nextRevisionId < slotDb.nextRevisionId { diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index ee8ed7da0b..35cfe36fad 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -1541,7 +1541,7 @@ func TestMergeSlotDB(t *testing.T) { newSlotDb.SelfDestruct(addr) newSlotDb.Finalise(true) - changeList := oldSlotDb.MergeSlotDB(newSlotDb, &types.Receipt{}, 0) + changeList := oldSlotDb.MergeSlotDB(newSlotDb, &types.Receipt{}, 0, nil) if ok := changeList.getDeletedStateObject(addr); ok == nil || !ok.selfDestructed { t.Fatalf("address should exist in StateObjectSuicided") diff --git a/core/state_processor.go b/core/state_processor.go index 9dff86489f..2498448a0f 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -22,6 +22,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -125,10 +127,12 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg if p.bc.enableTxDAG { // TODO(galaio): append dag into block body, TxDAGPerformance will print metrics when profile is enabled // compare input TxDAG when it enable in consensus - dag, exrStats := statedb.MVStates2TxDAG() - fmt.Print(types.EvaluateTxDAGPerformance(dag, exrStats)) - //log.Info("Process result", "block", block.NumberU64(), "txDAG", dag) - // try write txDAG into file + dag, extraStats := statedb.ResolveTxDAG([]common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) + log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag) + if metrics.EnabledExpensive { + types.EvaluateTxDAGPerformance(dag, extraStats) + } + // try to write txDAG into file if p.bc.txDAGWriteCh != nil && dag != nil { p.bc.txDAGWriteCh <- TxDAGOutputItem{ blockNumber: block.NumberU64(), diff --git a/core/state_transition.go b/core/state_transition.go index 4a95d0c5f1..45a3003950 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -19,6 +19,7 @@ package core import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/core/state" "math" "math/big" "time" @@ -40,6 +41,7 @@ type ExecutionResult struct { RefundedGas uint64 // Total gas refunded after execution Err error // Any error encountered during the execution(listed in core/vm/errors.go) ReturnData []byte // Returned data from evm(function result or data supplied with revert opcode) + delayFees *state.DelayedGasFee } // Unwrap returns the internal evm error which allows us for further @@ -196,6 +198,12 @@ func ApplyMessage(evm *vm.EVM, msg *Message, gp *GasPool) (*ExecutionResult, err return NewStateTransition(evm, msg, gp).TransitionDb() } +func ApplyMessageDelayGasFee(evm *vm.EVM, msg *Message, gp *GasPool) (*ExecutionResult, error) { + transition := NewStateTransition(evm, msg, gp) + transition.delayGasFee = true + return transition.TransitionDb() +} + // StateTransition represents a state transition. // // == The State Transitioning Model @@ -225,6 +233,7 @@ type StateTransition struct { initialGas uint64 state vm.StateDB evm *vm.EVM + delayGasFee bool } // NewStateTransition initialises and returns a new state transition object. @@ -540,11 +549,17 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { }, nil } + var ( + tipFee *big.Int + baseFee *big.Int + l1Fee *big.Int + ) effectiveTip := msg.GasPrice if rules.IsLondon { effectiveTip = cmath.BigMin(msg.GasTipCap, new(big.Int).Sub(msg.GasFeeCap, st.evm.Context.BaseFee)) } + // delay gas fee calculation, provide from TxDAG if st.evm.Config.NoBaseFee && msg.GasFeeCap.Sign() == 0 && msg.GasTipCap.Sign() == 0 { // Skip fee payment when NoBaseFee is set and the fee fields // are 0. This avoids a negative effectiveTip being applied to @@ -552,13 +567,22 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { } else { fee := new(big.Int).SetUint64(st.gasUsed()) fee.Mul(fee, effectiveTip) - st.state.AddBalance(st.evm.Context.Coinbase, fee) + if st.delayGasFee { + tipFee = fee + } else { + st.state.AddBalance(st.evm.Context.Coinbase, fee) + } } // Check that we are post bedrock to enable op-geth to be able to create pseudo pre-bedrock blocks (these are pre-bedrock, but don't follow l2 geth rules) // Note optimismConfig will not be nil if rules.IsOptimismBedrock is true if optimismConfig := st.evm.ChainConfig().Optimism; optimismConfig != nil && rules.IsOptimismBedrock && !st.msg.IsDepositTx { - st.state.AddBalance(params.OptimismBaseFeeRecipient, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.evm.Context.BaseFee)) + fee := new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.evm.Context.BaseFee) + if st.delayGasFee { + baseFee = fee + } else { + st.state.AddBalance(params.OptimismBaseFeeRecipient, fee) + } var l1Cost *big.Int if st.msg.GasPrice.Cmp(big.NewInt(0)) == 0 && st.evm.ChainConfig().IsWright(st.evm.Context.Time) { l1Cost = big.NewInt(0) @@ -566,7 +590,11 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { l1Cost = st.evm.Context.L1CostFunc(st.msg.RollupCostData, st.evm.Context.Time) } if l1Cost != nil { - st.state.AddBalance(params.OptimismL1FeeRecipient, l1Cost) + if st.delayGasFee { + l1Fee = l1Cost + } else { + st.state.AddBalance(params.OptimismL1FeeRecipient, l1Cost) + } } } return &ExecutionResult{ @@ -574,6 +602,12 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { RefundedGas: gasRefund, Err: vmerr, ReturnData: ret, + delayFees: &state.DelayedGasFee{ + TipFee: tipFee, + BaseFee: baseFee, + L1Fee: l1Fee, + Coinbase: st.evm.Context.Coinbase, + }, }, nil } diff --git a/core/types/dag.go b/core/types/dag.go index dbd7cf9ea8..1cbd87e431 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -29,7 +29,6 @@ type TxDAG interface { DelayGasDistribution() bool // TxDep query TxDeps from TxDAG - // TODO(galaio): txDAG must convert to dependency relation TxDep(int) TxDep // TxCount return tx count @@ -269,27 +268,18 @@ var ( longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil) serialTimeTimer = metrics.NewRegisteredTimer("dag/serialtime", nil) totalTxMeter = metrics.NewRegisteredMeter("dag/txcnt", nil) - totalNoDepMeter = metrics.NewRegisteredMeter("dag/nodepcntcnt", nil) - total2DepMeter = metrics.NewRegisteredMeter("dag/2depcntcnt", nil) - total4DepMeter = metrics.NewRegisteredMeter("dag/4depcntcnt", nil) - total8DepMeter = metrics.NewRegisteredMeter("dag/8depcntcnt", nil) - total16DepMeter = metrics.NewRegisteredMeter("dag/16depcntcnt", nil) - total32DepMeter = metrics.NewRegisteredMeter("dag/32depcntcnt", nil) + totalNoDepMeter = metrics.NewRegisteredMeter("dag/nodepcnt", nil) + total2DepMeter = metrics.NewRegisteredMeter("dag/2depcnt", nil) + total4DepMeter = metrics.NewRegisteredMeter("dag/4depcnt", nil) + total8DepMeter = metrics.NewRegisteredMeter("dag/8depcnt", nil) + total16DepMeter = metrics.NewRegisteredMeter("dag/16depcnt", nil) + total32DepMeter = metrics.NewRegisteredMeter("dag/32depcnt", nil) ) -func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { +func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { if len(stats) != dag.TxCount() || dag.TxCount() == 0 { - return "" + return } - sb := strings.Builder{} - //sb.WriteString("TxDAG:\n") - //for i, dep := range dag.TxDeps { - // if stats[i].mustSerialFlag { - // continue - // } - // sb.WriteString(fmt.Sprintf("%v: %v\n", i, dep.TxIndexes)) - //} - //sb.WriteString("Parallel Execution Path:\n") paths := travelTxDAGExecutionPaths(dag) // Attention: this is based on best schedule, it will reduce a lot by executing previous txs in parallel // It assumes that there is no parallel thread limit @@ -347,8 +337,6 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { txGases[i] += stats[i].usedGas txReads[i] += stats[i].readCount - //sb.WriteString(fmt.Sprintf("Tx%v, %.2fms|%vgas|%vreads\npath: %v\n", i, float64(txTimes[i].Microseconds())/1000, txGases[i], txReads[i], path)) - //sb.WriteString(fmt.Sprintf("%v: %v\n", i, path)) // try to find max gas if txGases[i] > maxGas { maxGas = txGases[i] @@ -360,8 +348,6 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { } } - sb.WriteString(fmt.Sprintf("LargestGasPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxGasIndex].Microseconds())/1000, txGases[maxGasIndex], txReads[maxGasIndex], paths[maxGasIndex])) - sb.WriteString(fmt.Sprintf("LongestTimePath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxTimeIndex].Microseconds())/1000, txGases[maxTimeIndex], txReads[maxTimeIndex], paths[maxTimeIndex])) longestTimeTimer.Update(txTimes[maxTimeIndex]) longestGasTimer.Update(txTimes[maxGasIndex]) // serial path @@ -380,16 +366,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { sGas += stat.usedGas sRead += stat.readCount } - if sTime == 0 { - return "" - } - sb.WriteString(fmt.Sprintf("SerialPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(sTime.Microseconds())/1000, sGas, sRead, sPath)) - maxParaTime := txTimes[maxTimeIndex] - sb.WriteString(fmt.Sprintf("Estimated saving: %.2fms, %.2f%%, %.2fX, noDepCnt: %v|%.2f%%\n", - float64((sTime-maxParaTime).Microseconds())/1000, float64(sTime-maxParaTime)/float64(sTime)*100, - float64(sTime)/float64(maxParaTime), noDepdencyCount, float64(noDepdencyCount)/float64(txCount)*100)) serialTimeTimer.Update(sTime) - return sb.String() } // travelTxDAGTargetPath will print target execution path diff --git a/core/types/dag_test.go b/core/types/dag_test.go index bfda2de6e3..b83da5f5fb 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -33,7 +33,7 @@ func TestEvaluateTxDAG(t *testing.T) { stats[i].WithSerialFlag() } } - t.Log(EvaluateTxDAGPerformance(dag, stats)) + EvaluateTxDAGPerformance(dag, stats) } func TestSimpleMVStates2TxDAG(t *testing.T) { @@ -50,7 +50,7 @@ func TestSimpleMVStates2TxDAG(t *testing.T) { ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) - dag := ms.ResolveTxDAG() + dag := ms.ResolveTxDAG(nil) require.Equal(t, mockSimpleDAG(), dag) t.Log(dag) } @@ -71,7 +71,7 @@ func TestSystemTxMVStates2TxDAG(t *testing.T) { ms.rwSets[10] = mockRWSet(10, []string{"0x10"}, []string{"0x10"}).WithSerialFlag() ms.rwSets[11] = mockRWSet(11, []string{"0x11"}, []string{"0x11"}).WithSerialFlag() - dag := ms.ResolveTxDAG() + dag := ms.ResolveTxDAG(nil) require.Equal(t, mockSystemTxDAG(), dag) t.Log(dag) } diff --git a/core/types/mvstates.go b/core/types/mvstates.go index 2584807467..5b7580df1b 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -339,7 +339,7 @@ func (s *MVStates) ReadState(key RWKey) (interface{}, bool) { } // FulfillRWSet it can execute as async, and rwSet & stat must guarantee read-only -// TODO(galaio): try to generate TxDAG, when fulfill RWSet +// try to generate TxDAG, when fulfill RWSet // TODO(galaio): support flag to stat execution as optional func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { log.Debug("FulfillRWSet", "s.len", len(s.rwSets), "cur", rwSet.ver.TxIndex, "reads", len(rwSet.readSet), "writes", len(rwSet.writeSet)) @@ -382,7 +382,6 @@ func (s *MVStates) resolveDepsCache(index int, rwSet *RWSet) { if _, ok := s.rwSets[prev]; !ok { continue } - // TODO: check if there are RW with system address for gas delay calculation // check if there has written op before i if checkDependency(s.rwSets[prev].writeSet, rwSet.readSet) { s.depsCache[index].add(prev) @@ -420,10 +419,16 @@ func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, w } // ResolveTxDAG generate TxDAG from RWSets -func (s *MVStates) ResolveTxDAG() TxDAG { +func (s *MVStates) ResolveTxDAG(gasFeeReceivers []common.Address) TxDAG { rwSets := s.RWSets() txDAG := NewPlainTxDAG(len(rwSets)) for i := len(rwSets) - 1; i >= 0; i-- { + // check if there are RW with gas fee receiver for gas delay calculation + for _, addr := range gasFeeReceivers { + if _, ok := rwSets[i].readSet[AccountStateKey(addr, AccountSelf)]; ok { + return NewEmptyTxDAG() + } + } txDAG.TxDeps[i].TxIndexes = []uint64{} if rwSets[i].mustSerial { txDAG.TxDeps[i].Relation = 1 diff --git a/eth/backend.go b/eth/backend.go index d1fa93277e..bfb9a8c86c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -270,7 +270,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } if config.EnableParallelTxDAG { - eth.blockchain.EnableTxDAGGeneration(config.ParallelTxDAGFile) + eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile) } if chainConfig := eth.blockchain.Config(); chainConfig.Optimism != nil { // config.Genesis.Config.ChainID cannot be used because it's based on CLI flags only, thus default to mainnet L1 config.NetworkId = chainConfig.ChainID.Uint64() // optimism defaults eth network ID to chain ID diff --git a/miner/worker.go b/miner/worker.go index 0a732f117d..34b94acf16 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1271,7 +1271,7 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { // Because the TxDAG appends after sidecar, so we only enable after cancun if w.chain.TxDAGEnabled() && w.chainConfig.IsCancun(block.Number(), block.Time()) && w.chainConfig.Optimism == nil { - txDAG, _ := work.state.MVStates2TxDAG() + txDAG, _ := work.state.ResolveTxDAG([]common.Address{work.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) rawTxDAG, err := types.EncodeTxDAG(txDAG) if err != nil { return &newPayloadResult{err: err} @@ -1281,7 +1281,7 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { // TODO(galaio): need hardfork if w.chain.TxDAGEnabled() && w.chainConfig.Optimism != nil { - txDAG, _ := work.state.MVStates2TxDAG() + txDAG, _ := work.state.ResolveTxDAG([]common.Address{work.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) rawTxDAG, err := types.EncodeTxDAG(txDAG) if err != nil { return &newPayloadResult{err: err} @@ -1401,7 +1401,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti for i := len(env.txs); i < len(block.Transactions()); i++ { env.state.RecordSystemTxRWSet(i) } - txDAG, _ := env.state.MVStates2TxDAG() + txDAG, _ := env.state.ResolveTxDAG([]common.Address{env.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) rawTxDAG, err := types.EncodeTxDAG(txDAG) if err != nil { return err @@ -1411,7 +1411,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // TODO(galaio): need hardfork if w.chain.TxDAGEnabled() && w.chainConfig.Optimism != nil { - txDAG, _ := env.state.MVStates2TxDAG() + txDAG, _ := env.state.ResolveTxDAG([]common.Address{env.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) rawTxDAG, err := types.EncodeTxDAG(txDAG) if err != nil { return err diff --git a/tests/block_test.go b/tests/block_test.go index 9a1c1c519a..c540cc0cdc 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -17,7 +17,10 @@ package tests import ( + "fmt" "math/rand" + "os" + "path/filepath" "runtime" "testing" @@ -26,6 +29,38 @@ import ( "github.com/ethereum/go-ethereum/common" ) +func TestBlockchainWithTxDAG(t *testing.T) { + bt := new(testMatcher) + // General state tests are 'exported' as blockchain tests, but we can run them natively. + // For speedier CI-runs, the line below can be uncommented, so those are skipped. + // For now, in hardfork-times (Berlin), we run the tests both as StateTests and + // as blockchain tests, since the latter also covers things like receipt root + bt.skipLoad(`^GeneralStateTests/`) + + // Skip random failures due to selfish mining test + bt.skipLoad(`.*bcForgedTest/bcForkUncle\.json`) + + // Slow tests + bt.slow(`.*bcExploitTest/DelegateCallSpam.json`) + bt.slow(`.*bcExploitTest/ShanghaiLove.json`) + bt.slow(`.*bcExploitTest/SuicideIssue.json`) + bt.slow(`.*/bcForkStressTest/`) + bt.slow(`.*/bcGasPricerTest/RPC_API_Test.json`) + bt.slow(`.*/bcWalletTest/`) + + // Very slow test + bt.skipLoad(`.*/stTimeConsuming/.*`) + // test takes a lot for time and goes easily OOM because of sha3 calculation on a huge range, + // using 4.6 TGas + bt.skipLoad(`.*randomStatetest94.json.*`) + + bt.walk(t, blockTestDir, func(t *testing.T, name string, test *BlockTest) { + if runtime.GOARCH == "386" && runtime.GOOS == "windows" && rand.Int63()%2 == 0 { + t.Skip("test (randomly) skipped on 32-bit windows") + } + execBlockTestWithTxDAG(t, bt, test) + }) +} func TestBlockchain(t *testing.T) { bt := new(testMatcher) // General state tests are 'exported' as blockchain tests, but we can run them natively. @@ -74,20 +109,37 @@ func TestExecutionSpec(t *testing.T) { }) } +func execBlockTestWithTxDAG(t *testing.T, bt *testMatcher, test *BlockTest) { + txDAGFile := filepath.Join(os.TempDir(), fmt.Sprintf("test_txdag_%s.csv", t.Name())) + if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, nil, txDAGFile, false)); err != nil { + t.Errorf("test in path mode with snapshotter failed: %v", err) + return + } + + // run again with dagFile + if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, nil, txDAGFile, true)); err != nil { + t.Errorf("test in path mode with snapshotter failed: %v", err) + return + } + + // clean + os.Remove(txDAGFile) +} + func execBlockTest(t *testing.T, bt *testMatcher, test *BlockTest) { - if err := bt.checkFailure(t, test.Run(false, rawdb.HashScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(false, rawdb.HashScheme, nil, nil, "", true)); err != nil { t.Errorf("test in hash mode without snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(true, rawdb.HashScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(true, rawdb.HashScheme, nil, nil, "", true)); err != nil { t.Errorf("test in hash mode with snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(false, rawdb.PathScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(false, rawdb.PathScheme, nil, nil, "", true)); err != nil { t.Errorf("test in path mode without snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, nil, "", true)); err != nil { t.Errorf("test in path mode with snapshotter failed: %v", err) return } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 2d2b490411..837d257ef0 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -108,7 +108,7 @@ type btHeaderMarshaling struct { ExcessBlobGas *math.HexOrDecimal64 } -func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, postCheck func(error, *core.BlockChain)) (result error) { +func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, postCheck func(error, *core.BlockChain), dagFile string, enableParallel bool) (result error) { config, ok := Forks[t.json.Network] if !ok { return UnsupportedForkError{t.json.Network} @@ -150,7 +150,7 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, po cache.SnapshotWait = true } chain, err := core.NewBlockChain(db, cache, gspec, nil, engine, vm.Config{ - EnableParallelExec: true, + EnableParallelExec: enableParallel, ParallelTxNum: 4, Tracer: tracer, }, nil, nil) @@ -158,7 +158,9 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, po return err } defer chain.Stop() - + if len(dagFile) > 0 { + chain.SetupTxDAGGeneration(dagFile) + } validBlocks, err := t.insertBlocks(chain) if err != nil { return err