Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: op-batcher auto switch to economic DA type #209

Merged
merged 15 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"sync"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -225,6 +226,21 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) {
s.mu.Lock()
defer s.mu.Unlock()
switch targetDAType {
case flags.BlobsType:
s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1
s.cfg.MultiFrameTxs = true
case flags.CalldataType:
s.cfg.MaxFrameSize = CallDataMaxTxSize - 1
s.cfg.MultiFrameTxs = false
default:
s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String())
}
}

// registerL1Block registers the given block at the pending channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.CheckTimeout(l1Head.Number)
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (c *CLIConfig) Check() error {
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
Expand Down
169 changes: 168 additions & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,39 @@ import (
"io"
"math/big"
_ "net/http/pprof"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

const LimitLoadBlocksOneTime uint64 = 30

// Auto DA params
const DATypeSwitchThrehold int = 3
const CallDataMaxTxSize uint64 = 120000
const ApproximateGasPerCallDataTx int64 = 1934892
const MaxBlobsNumberPerTx int64 = 6

var ErrBatcherNotRunning = errors.New("batcher is not running")

type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
}

type L2Client interface {
Expand All @@ -47,6 +61,7 @@ type DriverSetup struct {
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
PlasmaDA *plasma.DAClient
AutoSwitchDA bool
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand All @@ -68,6 +83,9 @@ type BatchSubmitter struct {
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef

// addressReservedError is recorded from L1 txpool, which may occur when switch DA type
addressReservedError atomic.Bool

state *channelManager
}

Expand Down Expand Up @@ -155,10 +173,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
} else if start.Number >= end.Number {
return errors.New("start number is >= end number")
}
// Limit the max loaded blocks one time
endNumber := end.Number
if endNumber-start.Number > LimitLoadBlocksOneTime {
endNumber = start.Number + LimitLoadBlocksOneTime
}

var latestBlock *types.Block
// Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ {
for i := start.Number + 1; i < endNumber+1; i++ {
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
Expand Down Expand Up @@ -272,6 +295,72 @@ func (l *BatchSubmitter) loop() {
}
}()

economicDATypeCh := make(chan flags.DataAvailabilityType)
if l.AutoSwitchDA {
// start auto choose economic DA type processing loop
economicDALoopDone := make(chan struct{})
defer close(economicDALoopDone) // shut down auto DA loop
go func() {
economicDAType := flags.BlobsType
l.Metr.RecordAutoChoosedDAType(economicDAType)
switchCount := 0
economicDATicker := time.NewTicker(5 * time.Second)
defer economicDATicker.Stop()
addressReservedErrorTicker := time.NewTicker(time.Second)
defer addressReservedErrorTicker.Stop()
for {
select {
case <-economicDATicker.C:
newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx)
if err != nil {
l.Log.Error("getEconomicDAType failed: %w", err)
continue
}
if newEconomicDAType != economicDAType {
switchCount++
} else {
switchCount = 0
}
if switchCount >= DATypeSwitchThrehold {
l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String())
start := time.Now()
economicDAType = newEconomicDAType
switchCount = 0
economicDATypeCh <- economicDAType
l.Log.Info("finish economic switch", "duration", time.Since(start))
l.Metr.RecordAutoChoosedDAType(economicDAType)
l.Metr.RecordEconomicAutoSwitchCount()
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
}
case <-addressReservedErrorTicker.C:
if l.addressReservedError.Load() {
if economicDAType == flags.BlobsType {
economicDAType = flags.CalldataType
l.Log.Info("start resolve addressReservedError switch", "from type", flags.BlobsType.String(), "to type", flags.CalldataType.String())
} else if economicDAType == flags.CalldataType {
economicDAType = flags.BlobsType
l.Log.Info("start resolve addressReservedError switch", "from type", flags.CalldataType.String(), "to type", flags.BlobsType.String())
} else {
l.Log.Crit("invalid DA type in economic switch loop", "invalid type", economicDAType.String())
}
switchCount = 0
start := time.Now()
economicDATypeCh <- economicDAType
l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start))
l.Metr.RecordAutoChoosedDAType(economicDAType)
l.Metr.RecordReservedErrorSwitchCount()
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
time.Sleep(time.Second) // stop to let last addressRservedError handled first
l.addressReservedError.Store(false)
}
case <-economicDALoopDone:
l.Log.Info("auto DA processing loop done")
return
}
}
}()
}

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -302,6 +391,24 @@ func (l *BatchSubmitter) loop() {
continue
}
l.publishStateToL1(queue, receiptsCh)
case targetDAType := <-economicDATypeCh:
l.lastStoredBlock = eth.BlockID{}
// close current state to prepare for switch
err := l.state.Close()
if err != nil {
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err)
}
}
// on DA type switch we want to publish all pending state then wait until each result clears before resetting
// the state.
publishAndWait()
l.clearState(l.shutdownCtx)
// switch action after clear state
l.switchDAType(targetDAType)
continue
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand All @@ -324,6 +431,54 @@ func (l *BatchSubmitter) loop() {
}
}

func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) {
sCtx, sCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer sCancel()
gasPrice, err := l.L1Client.SuggestGasTipCap(sCtx)
if err != nil {
return "", fmt.Errorf("getEconomicDAType failed to fetch the suggested gas tip cap: %w", err)
}
calldataCost := big.NewInt(0).Mul(big.NewInt(MaxBlobsNumberPerTx*ApproximateGasPerCallDataTx), gasPrice)

hCtx, hCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer hCancel()
header, err := l.L1Client.HeaderByNumber(hCtx, nil)
if err != nil {
return "", fmt.Errorf("getEconomicDAType failed to fetch the latest header: %w", err)
}
if header.ExcessBlobGas == nil {
return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", header)
}
blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas)
blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(int64(params.TxGas)), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice))

l.Metr.RecordEstimatedCalldataTypeFee(calldataCost)
l.Metr.RecordEstimatedBlobTypeFee(blobCost)
if calldataCost.Cmp(blobCost) < 0 {
l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
return flags.CalldataType, nil
}
l.Log.Info("Economic DA type is blobs", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
return flags.BlobsType, nil
}

func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) {
krish-nr marked this conversation as resolved.
Show resolved Hide resolved
switch targetDAType {
case flags.BlobsType:
l.Config.UseBlobs = true
l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1
l.ChannelConfig.MultiFrameTxs = true
l.state.SwitchDAType(targetDAType)
case flags.CalldataType:
l.Config.UseBlobs = false
l.ChannelConfig.MaxFrameSize = CallDataMaxTxSize - 1
l.ChannelConfig.MultiFrameTxs = false
l.state.SwitchDAType(targetDAType)
default:
l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String())
}
}

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
Expand Down Expand Up @@ -525,6 +680,9 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.Log.Warn("Transaction failed to send", logFields(id, err)...)
l.state.TxFailed(id)
if errStringMatch(err, txmgr.ErrAlreadyReserved) && l.AutoSwitchDA {
l.addressReservedError.Store(true)
}
}

func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
Expand Down Expand Up @@ -560,3 +718,12 @@ func logFields(xs ...any) (fs []any) {
}
return fs
}

func errStringMatch(err, target error) bool {
if err == nil && target == nil {
return true
} else if err == nil || target == nil {
return false
}
return strings.Contains(err.Error(), target.Error())
}
8 changes: 5 additions & 3 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPlasmaDA(cfg); err != nil {
return fmt.Errorf("failed to init plasma DA: %w", err)
}
bs.initDriver()
bs.initDriver(cfg)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
}

switch cfg.DataAvailabilityType {
case flags.BlobsType:
case flags.BlobsType, flags.AutoType:
welkin22 marked this conversation as resolved.
Show resolved Hide resolved
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
Expand Down Expand Up @@ -228,6 +228,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
return fmt.Errorf("invalid channel configuration: %w", err)
}
bs.Log.Info("Initialized channel-config",
"da_type", cfg.DataAvailabilityType.String(),
"use_blobs", bs.UseBlobs,
"use_plasma", bs.UsePlasma,
"max_frame_size", cc.MaxFrameSize,
Expand Down Expand Up @@ -286,7 +287,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDriver() {
func (bs *BatcherService) initDriver(cfg *CLIConfig) {
bs.driver = NewBatchSubmitter(DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
Expand All @@ -297,6 +298,7 @@ func (bs *BatcherService) initDriver() {
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
PlasmaDA: bs.PlasmaDA,
AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType,
})
}

Expand Down
2 changes: 2 additions & 0 deletions op-batcher/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const (
// data availability types
CalldataType DataAvailabilityType = "calldata"
BlobsType DataAvailabilityType = "blobs"
AutoType DataAvailabilityType = "auto"
)

var DataAvailabilityTypes = []DataAvailabilityType{
CalldataType,
BlobsType,
AutoType,
}

func (kind DataAvailabilityType) String() string {
Expand Down
Loading
Loading