Skip to content

Commit

Permalink
feat: initial goroutine blob submission implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf committed Aug 22, 2024
1 parent e51e887 commit ed95b85
Showing 1 changed file with 37 additions and 23 deletions.
60 changes: 37 additions & 23 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (l *BatchSubmitter) loop() {

receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
var daWaitGroup sync.WaitGroup

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
Expand Down Expand Up @@ -334,8 +335,11 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
if !l.Txmgr.IsClosed() {
l.Log.Info("Wait for pure DA writes, not L1 txs")
daWaitGroup.Wait()
l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
queue.Wait()
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -369,7 +373,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -426,14 +430,14 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -483,7 +487,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand All @@ -503,7 +507,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
if err = l.sendTransaction(ctx, txdata, queue, receiptsCh, daWaitGroup); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
return nil
Expand Down Expand Up @@ -550,9 +554,33 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if !txdata.asBlob && l.Config.UseAltDA {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while to post the txdata to the DA Provider.
go func() {
comm, err := l.AltDA.SetInput(ctx, txdata.CallData())
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return
}
l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
// signal altda commitment tx with TxDataVersion1
candidate := l.calldataTxCandidate(comm.TxData())
l.queueTx(txdata, false, candidate, queue, receiptsCh)
}()
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}

var candidate *txmgr.TxCandidate
if txdata.asBlob {
Expand All @@ -568,21 +596,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
comm, err := l.AltDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return nil
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// signal AltDA commitment tx with TxDataVersion1
data = comm.TxData()
}
candidate = l.calldataTxCandidate(data)
candidate = l.calldataTxCandidate(txdata.CallData())
}

l.queueTx(txdata, false, candidate, queue, receiptsCh)
Expand Down

0 comments on commit ed95b85

Please sign in to comment.