Skip to content

Commit

Permalink
txdag: support write & read TxDAG from file; (#9)
Browse files Browse the repository at this point in the history
txdag: record txdag metrics;

txdag: opt txdag flag name;

Co-authored-by: galaio <[email protected]>
  • Loading branch information
2 people authored and sunny2022da committed Oct 11, 2024
1 parent 07d6a82 commit ff2d6d0
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 41 deletions.
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ var (
utils.RollupSuperchainUpgradesFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
configFileFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
Expand Down
21 changes: 21 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,19 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxDAGFlag = &cli.BoolFlag{
Name: "parallel.txdag",
Usage: "Enable the experimental parallel TxDAG generation, only valid in full sync mode (default = false)",
Category: flags.VMCategory,
}

ParallelTxDAGFileFlag = &cli.StringFlag{
Name: "parallel.txdagfile",
Usage: "It indicates the TxDAG file path",
Value: "./parallel-txdag-output.csv",
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
Expand Down Expand Up @@ -2017,6 +2030,14 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxNum = parallelNum
}

if ctx.IsSet(ParallelTxDAGFlag.Name) {
cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name)
}

if ctx.IsSet(ParallelTxDAGFileFlag.Name) {
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
}

if ctx.IsSet(VMOpcodeOptimizeFlag.Name) {
cfg.EnableOpcodeOptimizing = ctx.Bool(VMOpcodeOptimizeFlag.Name)
if cfg.EnableOpcodeOptimizing {
Expand Down
123 changes: 112 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
package core

import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"math/big"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -91,6 +96,9 @@ var (
triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil)
txDAGDispatchTimer = metrics.NewRegisteredTimer("chain/block/txdag/dispatch", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)

Expand Down Expand Up @@ -297,6 +305,9 @@ type BlockChain struct {
forker *ForkChoice
vmConfig vm.Config
parallelExecution bool
enableTxDAG bool
txDAGWriteCh chan TxDAGOutputItem
txDAGMapping map[uint64]types.TxDAG
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -1881,16 +1892,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

// TODO(galaio): use txDAG in some accelerate scenarios.
if len(block.TxDAG()) > 0 {
txDAG, err := types.DecodeTxDAG(block.TxDAG())
if err != nil {
return it.index, err
}
log.Info("Insert chain", "block", block.NumberU64(), "txDAG", txDAG.Type())
}
// TODO(galaio): use txDAG in some accelerate scenarios, like state pre-fetcher.
//if bc.enableTxDAG && len(block.TxDAG()) > 0 {
// txDAG, err := types.DecodeTxDAG(block.TxDAG())
// if err != nil {
// return it.index, err
// }
// log.Info("Insert chain", "block", block.NumberU64(), "txDAG", txDAG)
//}
// TODO(galaio): need hardfork
if bc.chainConfig.Optimism != nil && len(block.Header().Extra) > 0 {
if bc.enableTxDAG && bc.chainConfig.Optimism != nil && len(block.Header().Extra) > 0 {
txDAG, err := types.DecodeTxDAG(block.Header().Extra)
if err != nil {
return it.index, err
Expand Down Expand Up @@ -1952,8 +1963,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation
txDAGGenerateTimer.Update(statedb.TxDAGGenerate)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation

innerExecutionTimer.Update(DebugInnerExecutionDuration)

Expand Down Expand Up @@ -2689,3 +2701,92 @@ func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash,
func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}

func (bc *BlockChain) TxDAGEnabled() bool {
return bc.enableTxDAG
}

func (bc *BlockChain) EnableTxDAGGeneration(output string) {
bc.enableTxDAG = true
if len(output) == 0 {
return
}
// read TxDAG file, and cache in mem
var err error
bc.txDAGMapping, err = readTxDAGMappingFromFile(output)
if err != nil {
log.Error("read TxDAG err", err)
}

// write handler
bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000)
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output)
return
}
defer writeHandle.Close()
for {
select {
case <-bc.quit:
return
case item := <-bc.txDAGWriteCh:
if err := writeTxDAGToFile(writeHandle, item); err != nil {
log.Error("encode TxDAG err in OutputHandler", "err", err)
continue
}
}
}
}()
}

type TxDAGOutputItem struct {
blockNumber uint64
txDAG types.TxDAG
}

func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error {
var buf bytes.Buffer
buf.WriteString(strconv.FormatUint(item.blockNumber, 10))
buf.WriteByte(',')
enc, err := types.EncodeTxDAG(item.txDAG)
if err != nil {
return err
}
buf.WriteString(hex.EncodeToString(enc))
buf.WriteByte('\n')
_, err = writeHandle.Write(buf.Bytes())
return err
}

func readTxDAGMappingFromFile(output string) (map[uint64]types.TxDAG, error) {
file, err := os.Open(output)
if err != nil {
return nil, err
}
defer file.Close()

mapping := make(map[uint64]types.TxDAG)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
tokens := strings.Split(scanner.Text(), ",")
if len(tokens) != 2 {
return nil, errors.New("txDAG output contain wrong size")
}
num, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, err
}
enc, err := hex.DecodeString(tokens[1])
if err != nil {
return nil, err
}
txDAG, err := types.DecodeTxDAG(enc)
if err != nil {
return nil, err
}
mapping[uint64(num)] = txDAG
}
return mapping, nil
}
2 changes: 1 addition & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
}
mgaspsGauge.Update(int64(st.usedGas)*1000/int64(elapsed))
mgaspsGauge.Update(int64(st.usedGas) * 1000 / int64(elapsed))
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
Expand Down
32 changes: 32 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"math/big"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/consensus"
Expand Down Expand Up @@ -4717,3 +4720,32 @@ func TestEIP3651(t *testing.T) {
t.Fatalf("sender balance incorrect: expected %d, got %d", expected, actual)
}
}

func TestTxDAGFile_ReadWrite(t *testing.T) {
path := filepath.Join(os.TempDir(), "test.csv")
except := map[uint64]types.TxDAG{
0: types.NewEmptyTxDAG(),
1: makeEmptyPlainTxDAG(1),
2: makeEmptyPlainTxDAG(2),
}
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
require.NoError(t, err)
for num, dag := range except {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Close()

actual, err := readTxDAGMappingFromFile(path)
require.NoError(t, err)
for num, dag := range except {
require.Equal(t, dag, actual[num])
}
}

func makeEmptyPlainTxDAG(cnt int) *types.PlainTxDAG {
dag := types.NewPlainTxDAG(cnt)
for i := range dag.TxDeps {
dag.TxDeps[i].TxIndexes = make([]uint64, 0)
}
return dag
}
30 changes: 22 additions & 8 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package core
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand All @@ -11,10 +16,8 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"runtime"
"sync"
"sync/atomic"
)

const (
Expand Down Expand Up @@ -189,6 +192,11 @@ func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest,
return
}

if metrics.EnabledExpensive {
defer func(start time.Time) {
txDAGDispatchTimer.Update(time.Since(start))
}(time.Now())
}
// resolve isolate execution paths from TxDAG, it indicates the tx dispatch
paths := types.MergeTxDAGExecutionPaths(txDAG)
log.Info("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths))
Expand Down Expand Up @@ -735,14 +743,20 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
txDAG types.TxDAG
err error
)
if len(block.TxDAG()) != 0 {
txDAG, err = types.DecodeTxDAG(block.TxDAG())
if err != nil {
return nil, nil, 0, err
if p.bc.enableTxDAG {
if len(block.TxDAG()) != 0 {
txDAG, err = types.DecodeTxDAG(block.TxDAG())
if err != nil {
return nil, nil, 0, err
}
}
// load cache txDAG from file
if txDAG == nil && len(p.bc.txDAGMapping) > 0 {
txDAG = p.bc.txDAGMapping[block.NumberU64()]
}
}
// TODO(galaio): need hardfork
if p.bc.chainConfig.Optimism != nil && len(block.Header().Extra) > 0 {
if p.bc.enableTxDAG && p.bc.chainConfig.Optimism != nil && len(block.Header().Extra) > 0 {
txDAG, err = types.DecodeTxDAG(block.Header().Extra)
if err != nil {
return nil, nil, 0, err
Expand Down
11 changes: 11 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type StateDB struct {
TrieDBCommits time.Duration
TrieCommits time.Duration
CodeCommits time.Duration
TxDAGGenerate time.Duration

AccountUpdated int
StorageUpdated int
Expand Down Expand Up @@ -2419,6 +2420,11 @@ func (s *StateDB) FinaliseRWSet() error {
if s.mvStates == nil || s.rwSet == nil {
return nil
}
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.TxDAGGenerate += time.Since(start)
}(time.Now())
}
ver := types.StateVersion{
TxIndex: s.txIndex,
}
Expand Down Expand Up @@ -2486,6 +2492,11 @@ func (s *StateDB) MVStates2TxDAG() (types.TxDAG, map[int]*types.ExeStat) {
if s.mvStates == nil {
return types.NewEmptyTxDAG(), nil
}
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.TxDAGGenerate += time.Since(start)
}(time.Now())
}

return s.mvStates.ResolveTxDAG(), s.mvStates.Stats()
}
Expand Down
Loading

0 comments on commit ff2d6d0

Please sign in to comment.