Skip to content

Commit

Permalink
have batcher submit appropriate cancellation transactions when mempoo…
Browse files Browse the repository at this point in the history
…l is blocked (#10941)

* have batcher submit appropriate cancellation transactions when mempool is blocked

* use a txRef type with a isCancel indicator instead of a magic channel id indicator
  • Loading branch information
roberto-bayardo authored Jul 18, 2024
1 parent 4b80464 commit b7f8188
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 50 deletions.
93 changes: 81 additions & 12 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"io"
"math/big"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
Expand All @@ -18,11 +20,26 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

var ErrBatcherNotRunning = errors.New("batcher is not running")
var (
	// ErrBatcherNotRunning is the sentinel error reporting that the batcher
	// is not running. Compare against it with errors.Is.
	ErrBatcherNotRunning = errors.New("batcher is not running")

	// emptyTxData is a txData made of exactly one frame whose payload is an
	// empty (non-nil) byte slice. It is the input handed to blobTxCandidate
	// when an empty blob transaction is needed.
	emptyTxData = txData{
		frames: []frameData{
			{data: []byte{}},
		},
	}
)

// txRef is the identifier attached to every transaction handed to the txmgr
// queue, and returned with its receipt.
type txRef struct {
	// id is the ID of the tx data the transaction was built from.
	id txID
	// isCancel is true for cancellation transactions sent to unblock the
	// txpool, and false for regular batch transactions.
	isCancel bool
}

type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
Expand All @@ -43,7 +60,7 @@ type DriverSetup struct {
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
Txmgr *txmgr.SimpleTxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
Expand Down Expand Up @@ -255,6 +272,20 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Submitted batch, but it is not valid
// Missed L2 block somehow.

// Txpool states tracked by the batcher loop. Possible state transitions:
//
//	TxpoolGood -> TxpoolBlocked:
//	  happens when ErrAlreadyReserved is ever returned by the TxMgr.
//	TxpoolBlocked -> TxpoolCancelPending:
//	  happens once the send loop detects the txpool is blocked, and results
//	  in attempting to send a cancellation transaction.
//	TxpoolCancelPending -> TxpoolGood:
//	  happens once the cancel transaction completes, whether successfully or
//	  in error.
const (
	// TxpoolGood means no blocking transaction has been detected.
	TxpoolGood int32 = iota
	// TxpoolBlocked means a send failed with ErrAlreadyReserved and no
	// cancellation transaction has been issued yet.
	TxpoolBlocked
	// TxpoolCancelPending means a cancellation transaction has been queued
	// and its result has not yet been received.
	TxpoolCancelPending
)

func (l *BatchSubmitter) loop() {
defer l.wg.Done()
if l.Config.WaitNodeSync {
Expand All @@ -265,16 +296,26 @@ func (l *BatchSubmitter) loop() {
}
}

receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

var txpoolState atomic.Int32
txpoolState.Store(TxpoolGood)
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
Expand All @@ -299,6 +340,15 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh)
}
if txpoolState.Load() != TxpoolGood {
continue
}
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
Expand Down Expand Up @@ -371,7 +421,7 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queueing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
Expand Down Expand Up @@ -428,7 +478,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand Down Expand Up @@ -478,9 +528,24 @@ func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error)
return status.SafeL2.L1Origin, nil
}

// cancelBlockingTx queues an empty transaction meant to cancel out the
// incompatible transaction stuck in the txpool.
//
// The cancellation deliberately uses the opposite transaction type from the
// one the batcher is configured to send: an empty calldata transaction when
// UseBlobs is set, and an empty blob transaction otherwise. The transaction is
// queued with isCancel set so that its receipt can be told apart from batch
// receipts. In the future an actual batch transaction might be sent instead of
// an empty one to avoid wasting the tx fee.
//
// It panics if the empty blob candidate cannot be constructed.
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
	var cancelTx *txmgr.TxCandidate
	if l.Config.UseBlobs {
		cancelTx = l.calldataTxCandidate([]byte{})
	} else {
		blobTx, err := l.blobTxCandidate(emptyTxData)
		if err != nil {
			panic(err) // this error should not happen
		}
		cancelTx = blobTx
	}

	l.Log.Warn("sending a cancellation transaction to unblock txpool")
	l.queueTx(txData{}, true, cancelTx, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

Expand Down Expand Up @@ -515,6 +580,11 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate = l.calldataTxCandidate(data)
}

l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}

func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
Expand All @@ -523,8 +593,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate.GasLimit = intrinsicGas
}

queue.Send(txdata.ID(), *candidate, receiptsCh)
return nil
queue.Send(txRef{txdata.ID(), isCancel}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
Expand All @@ -551,12 +620,12 @@ func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
}
}

func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
// Record TX Status
if r.Err != nil {
l.recordFailedTx(r.ID, r.Err)
l.recordFailedTx(r.ID.id, r.Err)
} else {
l.recordConfirmedTx(r.ID, r.Receipt)
l.recordConfirmedTx(r.ID.id, r.Receipt)
}
}

Expand Down
12 changes: 7 additions & 5 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BatcherService struct {
Metrics metrics.Metricer
L1Client *ethclient.Client
EndpointProvider dial.L2EndpointProvider
TxManager txmgr.TxManager
TxManager *txmgr.SimpleTxManager
PlasmaDA *plasma.DAClient

BatcherConfig
Expand Down Expand Up @@ -426,8 +426,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {

var _ cliapp.Lifecycle = (*BatcherService)(nil)

// Driver returns the handler on the batch-submitter driver element,
// to start/stop/restart the batch-submission work, for use in testing.
func (bs *BatcherService) Driver() rpc.BatcherDriver {
return bs.driver
// TestDriver wraps the service's batch-submitter driver in a
// TestBatchSubmitter, giving tests a handle to start/stop/restart the
// batch-submission work. It is intended for use only in testing. Each call
// returns a new wrapper around the same underlying driver.
func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
	testDriver := new(TestBatchSubmitter)
	testDriver.BatchSubmitter = bs.driver
	return testDriver
}
61 changes: 61 additions & 0 deletions op-batcher/batcher/test_batch_submitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package batcher

import (
"context"
"errors"
"strings"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"

"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// TestBatchSubmitter extends BatchSubmitter with helpers for jamming the
// txpool in tests.
type TestBatchSubmitter struct {
	// BatchSubmitter is the wrapped driver; its methods and fields are
	// promoted onto TestBatchSubmitter.
	*BatchSubmitter

	// ttm is the test tx manager used to send the jamming transaction. It is
	// nil until JamTxPool has created it, and WaitOnJammingTx relies on it.
	ttm *txmgr.TestTxManager
}

// JamTxPool is for testing ONLY. It sends a txpool-blocking transaction
// through a TestTxManager wrapping the batcher's tx manager.
//
// The jamming transaction is empty and uses the opposite type from the one the
// batcher is configured to send: calldata when UseBlobs is set, a blob
// transaction otherwise. Its gas limit is set to the intrinsic gas of its
// payload.
//
// This function must be called *before* the batcher starts submitting batches
// to ensure successful jamming; it returns an error if the batcher is already
// running. The batcher mutex is held for the duration of the call.
func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if l.running {
		return errors.New("tried to jam tx pool but batcher is already running")
	}

	var (
		jamTx *txmgr.TxCandidate
		err   error
	)
	if l.Config.UseBlobs {
		jamTx = l.calldataTxCandidate([]byte{})
	} else {
		jamTx, err = l.blobTxCandidate(emptyTxData)
		if err != nil {
			return err
		}
	}
	jamTx.GasLimit, err = core.IntrinsicGas(jamTx.TxData, nil, false, true, true, false)
	if err != nil {
		return err
	}

	l.ttm = &txmgr.TestTxManager{SimpleTxManager: l.Txmgr}
	l.Log.Info("sending txpool blocking test tx")
	if jamErr := l.ttm.JamTxPool(ctx, *jamTx); jamErr != nil {
		return jamErr
	}
	return nil
}

// WaitOnJammingTx waits on the jamming transaction sent by JamTxPool and
// checks that it ended the way a blocking transaction is expected to.
//
// It returns an error if the jamming transaction completed successfully, or if
// it failed with a message containing txpool.ErrAlreadyReserved's text. Any
// other failure is logged and treated as success. (Tests should expect the
// blocking transaction to result in error from the context being cancelled.)
func (l *TestBatchSubmitter) WaitOnJammingTx(ctx context.Context) error {
	err := l.ttm.WaitOnJammingTx(ctx)
	switch {
	case err == nil:
		return errors.New("txpool blocking tx didn't block!")
	case strings.Contains(err.Error(), txpool.ErrAlreadyReserved.Error()):
		// Matched on the message text rather than with errors.Is.
		return errors.New("txpool blocking tx failed because other tx in mempool is blocking it")
	}
	l.Log.Info("done waiting on jamming tx", "err", err)
	return nil
}
Loading

0 comments on commit b7f8188

Please sign in to comment.