Skip to content

Commit

Permalink
Remove megabundles as they are no longer needed (ethereum#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruteri authored and avalonche committed Feb 6, 2023
1 parent 5d394c9 commit cb82f07
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 228 deletions.
11 changes: 2 additions & 9 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ jobs:
uses: actions/checkout@v2
with:
repository: flashbots/mev-geth-demo
ref: no-megabundles
path: e2e

- run: cd e2e && yarn install
Expand All @@ -71,17 +72,9 @@ jobs:
cd e2e
GETH=`pwd`/../build/bin/geth ./run.sh &
# Second node, not mining
P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh &
P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 AUTH_PORT=8552 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
DATADIR1=datadir DATADIR2=datadir2 GETH=`pwd`/../build/bin/geth ./peer_nodes.sh
sleep 15
yarn run demo-private-tx
pkill -9 geth || true
- name: Run megabundle-only node checking for reverts
run: |
cd e2e
# Disable bundle workers
MINER_ARGS='--miner.etherbase=0xd912aecb07e9f4e1ea8e6b4779e7fb6aa1c3e4d8 --miner.trustedrelays=0xfb11e78C4DaFec86237c2862441817701fdf197F --mine --miner.threads=2 --miner.maxmergedbundles=0' GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
yarn run e2e-reverting-megabundle
pkill -9 geth || true
53 changes: 0 additions & 53 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,59 +633,6 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
return nil
}

// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already.
func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
pool.mu.Lock()
defer pool.mu.Unlock()

fromTrustedRelay := false
for _, trustedAddr := range pool.config.TrustedRelays {
if relayAddr == trustedAddr {
fromTrustedRelay = true
}
}
if !fromTrustedRelay {
return errors.New("megabundle from non-trusted address")
}

megabundle := types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
}

pool.megabundles[relayAddr] = megabundle

for _, hook := range pool.NewMegabundleHooks {
go hook(relayAddr, &megabundle)
}

return nil
}

// GetMegabundle returns the latest megabundle submitted by a given relay.
func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) {
pool.mu.Lock()
defer pool.mu.Unlock()

megabundle, ok := pool.megabundles[relayAddr]
if !ok {
return types.MevBundle{}, errors.New("No megabundle found")
}
if megabundle.BlockNumber.Cmp(blockNumber) != 0 {
return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints")
}
if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints")
}
if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints")
}
return megabundle, nil
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 0 additions & 4 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,6 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions,
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
var txs types.Transactions
Expand Down
74 changes: 0 additions & 74 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,25 +2140,6 @@ type SendBundleArgs struct {
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
}

// SendMegabundleArgs represents the arguments for a SendMegabundle call.
type SendMegabundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
BlockNumber uint64 `json:"blockNumber"`
MinTimestamp *uint64 `json:"minTimestamp"`
MaxTimestamp *uint64 `json:"maxTimestamp"`
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
RelaySignature hexutil.Bytes `json:"relaySignature"`
}

// UnsignedMegabundle is used for serialization and subsequent digital signing.
type UnsignedMegabundle struct {
Txs []hexutil.Bytes
BlockNumber uint64
MinTimestamp uint64
MaxTimestamp uint64
RevertingTxHashes []common.Hash
}

// SendBundle will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error {
Expand Down Expand Up @@ -2189,61 +2170,6 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs
return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes)
}

// Recovers the Ethereum address of the trusted relay that signed the megabundle.
func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) {
megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes}
if args.MinTimestamp != nil {
megabundle.MinTimestamp = *args.MinTimestamp
} else {
megabundle.MinTimestamp = 0
}
if args.MaxTimestamp != nil {
megabundle.MaxTimestamp = *args.MaxTimestamp
} else {
megabundle.MaxTimestamp = 0
}
rlpEncoding, _ := rlp.EncodeToBytes(megabundle)
signature := args.RelaySignature
signature[64] -= 27 // account for Ethereum V
recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature)
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*recoveredPubkey), nil
}

// SendMegabundle will add the signed megabundle to one of the workers for evaluation.
func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error {
log.Info("Received a Megabundle request", "signature", args.RelaySignature)
var txs types.Transactions
if len(args.Txs) == 0 {
return errors.New("megabundle missing txs")
}
if args.BlockNumber == 0 {
return errors.New("megabundle missing blockNumber")
}
for _, encodedTx := range args.Txs {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(encodedTx); err != nil {
return err
}
txs = append(txs, tx)
}
var minTimestamp, maxTimestamp uint64
if args.MinTimestamp != nil {
minTimestamp = *args.MinTimestamp
}
if args.MaxTimestamp != nil {
maxTimestamp = *args.MaxTimestamp
}
relayAddr, err := RecoverRelayAddress(args)
log.Info("Megabundle", "relayAddr", relayAddr, "err", err)
if err != nil {
return err
}
return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr)
}

// BundleAPI offers an API for accepting bundled transactions
type BundleAPI struct {
b Backend
Expand Down
1 change: 0 additions & 1 deletion internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type Backend interface {
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error
SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error
SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
Expand Down
5 changes: 0 additions & 5 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,6 @@ web3._extend({
params: 3,
inputFormatter: [web3._extend.formatters.inputCallFormatter, web3._extend.formatters.inputDefaultBlockNumberFormatter, null],
}),
new web3._extend.Method({
name: 'sendMegabundle',
call: 'eth_sendMegabundle',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
43 changes: 8 additions & 35 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (w *multiWorker) GetSealingBlockAsync(parent common.Hash, timestamp uint64,
for _, worker := range append(w.workers, w.regularWorker) {
resCh, errCh, err := worker.getSealingBlock(parent, timestamp, coinbase, gasLimit, random, noTxs, noExtra)
if err != nil {
log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "isMegabundleWorker", worker.flashbots.isMegabundleWorker, "#bundles", worker.flashbots.maxMergedBundles)
log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "#bundles", worker.flashbots.maxMergedBundles)
continue
}
resChans = append(resChans, resChPair{resCh, errCh})
Expand Down Expand Up @@ -153,38 +153,12 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
for i := 1; i <= config.MaxMergedBundles; i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: false,
queue: queue,
maxMergedBundles: i,
isFlashbots: true,
queue: queue,
maxMergedBundles: i,
}))
}

relayWorkerMap := make(map[common.Address]*worker)

for i := 0; i < len(config.TrustedRelays); i++ {
relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
})
workers = append(workers, relayWorker)
relayWorkerMap[config.TrustedRelays[i]] = relayWorker
}

eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) {
worker, found := relayWorkerMap[relayAddr]
if !found {
return
}

select {
case worker.newMegabundleCh <- megabundle:
default:
}
})

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
return &multiWorker{
regularWorker: regularWorker,
Expand All @@ -193,9 +167,8 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
}

type flashbotsData struct {
isFlashbots bool
isMegabundleWorker bool
queue chan *task
maxMergedBundles int
relayAddr common.Address
isFlashbots bool
queue chan *task
maxMergedBundles int
relayAddr common.Address
}
53 changes: 6 additions & 47 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,9 @@ type task struct {
block *types.Block
createdAt time.Time

profit *big.Int
isFlashbots bool
worker int
isMegabundle bool
profit *big.Int
isFlashbots bool
worker int
}

const (
Expand Down Expand Up @@ -233,7 +232,6 @@ type worker struct {
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
newMegabundleCh chan *types.MevBundle

wg sync.WaitGroup

Expand Down Expand Up @@ -583,11 +581,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)

case <-w.newMegabundleCh:
if w.isRunning() {
commit(true, commitInterruptNone)
}

case <-timer.C:
// If sealing is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
Expand Down Expand Up @@ -796,7 +789,7 @@ func (w *worker) taskLoop() {
// Interrupt previous sealing operation
interrupt()
stopCh, prev = make(chan struct{}), sealHash
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle)
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker)
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
Expand Down Expand Up @@ -1304,7 +1297,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
return err
}
}
if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker {
if w.flashbots.isFlashbots {
bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
Expand All @@ -1325,40 +1318,6 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
}
env.profit.Add(env.profit, bundle.ethSentToCoinbase)
}
if w.flashbots.isMegabundleWorker {
megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time)
log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err)
if err != nil {
return err // no valid megabundle for this relay, nothing to do
}

// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
// Megabundles API focuses on speed and runs everything in one cycle.
coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)
if err := w.commitBundle(env, megabundle.Txs, interrupt); err != nil {
log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "err", err)
return err
}
var txStatuses = map[common.Hash]bool{}
for _, receipt := range env.receipts {
txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful
}
for _, tx := range megabundle.Txs {
status, ok := txStatuses[tx.Hash()]
if !ok {
log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash())
return errors.New("no tx receipt after megabundle simulation")
}
if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) {
log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash())
return errors.New("megabundle contains failing tx")
}
}
coinbaseBalanceAfter := env.state.GetBalance(env.coinbase)
coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore)
env.profit = coinbaseDelta
log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit))
}

if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
Expand Down Expand Up @@ -1519,7 +1478,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
// If we're post merge, just ignore
if !w.isTTDReached(block.Header()) {
select {
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}:
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}:
w.unconfirmed.Shift(block.NumberU64() - 1)

fees := totalFees(block, env.receipts)
Expand Down

0 comments on commit cb82f07

Please sign in to comment.