op-batcher: non blocking da #213

Closed
wants to merge 4 commits

Conversation


@karlb karlb commented Aug 13, 2024

Avoid blocking the op-batcher when writes to the DA layer are very slow (as is the case for the EigenDA proxy). This is done by moving the DA writes to separate goroutines.

See https://www.notion.so/clabsco/Non-blocking-DA-in-op-batcher-923aeb5ae3a24948875e7e041a9ad202
See https://github.com/celo-org/celo-blockchain-planning/issues/532

Slow writes to the DA (e.g. writing to the EigenDA proxy) will block the main loop. To avoid this, the DA writes are moved to separate goroutines managed by an errgroup. If one write fails, all writes within the same publishStateToL1 call are aborted, which is similar to the previous error handling.
sendTransaction calls (which write to the DA, not the L1, despite the naming) communicate through Queue.Send and BatchSubmitter.recordFailedTx, so moving them to a goroutine works fine.
publishAndWait is meant to wait until all remaining data is written. Only waiting for the txs in the queue won't work, since those are only created after the data has been written to the DA.
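For illustration, here is a minimal, self-contained sketch of the pattern described above. It is not the PR's actual diff: writeToDA, the frame handling, and the simplified publishStateToL1 signature are assumptions made for the example.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// writeToDA stands in for the slow write to the DA layer (e.g. the EigenDA proxy).
func writeToDA(ctx context.Context, frame []byte) error {
	select {
	case <-time.After(2 * time.Second): // simulate a slow DA write
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// publishStateToL1 hands each frame to a goroutine and returns immediately,
// so the main loop is never blocked by a slow DA.
func publishStateToL1(ctx context.Context, group *errgroup.Group, frames [][]byte) {
	for _, frame := range frames {
		frame := frame // capture the loop variable (needed before Go 1.22)
		group.Go(func() error {
			return writeToDA(ctx, frame)
		})
	}
}

func main() {
	// One errgroup per publish call: if one write fails, the shared context
	// cancels the other writes started by the same call.
	group, gctx := errgroup.WithContext(context.Background())
	publishStateToL1(gctx, group, [][]byte{[]byte("frame-0"), []byte("frame-1")})

	// publishAndWait: block until every outstanding DA write has finished,
	// since the L1 txs are only created after the data is on the DA.
	if err := group.Wait(); err != nil {
		fmt.Println("DA write failed:", err)
	}
}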
@karlb karlb marked this pull request as ready for review August 16, 2024 14:37
@ezdac ezdac self-assigned this Aug 19, 2024

@ezdac ezdac left a comment


Looks good! Nice and easy solution using the WaitGroup instead of some return value channel communication.

I added some comments and nit-picks.

op-batcher/batcher/driver.go (outdated; resolved)
op-batcher/batcher/driver.go (outdated; resolved)
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
daWaitGroup.Add(1)
errGroup.Go(func() error {

  1. Regarding the L1 tip in this function above:
	l1tip, err := l.l1Tip(ctx)
	if err != nil {
		l.Log.Error("Failed to query L1 tip", "err", err)
		return err
	}

This will potentially make a lot of successive eth_getHeaderByNumber requests to query the latest block, since we spawn the DA send asynchronously and iterate quickly through the outer for loop that calls publishTxToL1.
I don't think it's too important, but if we want to limit this without using a push-based notification for new headers, we could sleep a little after the errGroup.Go call. This could also help avoid spamming the DA node when there is a persistent error returned from PlasmaDA.SetInput.

  2. You raised the question in terms of resource caps:

I think this could easily be added via errgroup.SetLimit. This would make the .Go call block when the limit is reached, which is behaviour similar to before. We could set it to a high, arbitrary number at first and observe the behaviour. A log message for the limit case could be created with something like:

if spawned := errGroup.TryGo( ... ); !spawned {
  log.Info("limit reached")
  errGroup.Go( ... )
}
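For reference, a runnable sketch of that suggestion; the cap of 4 and the daWrite stand-in are illustrative assumptions, not values from the PR.

package main

import (
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var group errgroup.Group
	group.SetLimit(4) // cap on concurrent DA writes; a generous, arbitrary value to start with

	daWrite := func() error {
		time.Sleep(time.Second) // stand-in for a slow DA write
		return nil
	}

	for i := 0; i < 10; i++ {
		// TryGo reports whether a goroutine slot was free; if not, log and
		// fall back to the blocking Go call, mirroring the pre-PR behaviour
		// of blocking the loop when capacity is exhausted.
		if spawned := group.TryGo(daWrite); !spawned {
			log.Println("DA write limit reached, blocking until a slot frees up")
			group.Go(daWrite)
		}
	}
	if err := group.Wait(); err != nil {
		log.Println("DA write failed:", err)
	}
}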


For 1., regarding "this could also help avoid spamming the DA node when there is a persistent error returned from PlasmaDA.SetInput": I observed this exact behavior in the current op-batcher code, see ethereum-optimism#11534.

For 2., agreed; we will need to set a limit since the disperser has a rate limiter per sender anyway.

Author


I would like to deviate from the limiting offered by the errgroup in one way. Instead of blocking when creating a new goroutine, I would like to skip creating new writes on a higher level (maybe not call publishStateToL1 when the limit is reached), so that the op-batcher main loop keeps running even in that case.

I assume that blocking the main loop for a longer time will invite all kinds of different problems, so unless we can ensure that the blocking is short, we should always return to the main loop quickly.

If we can expect somewhat quick confirmations for all DAs soon (30s confirmations for EigenDA were mentioned), this might be overengineering, though.

The errgroups were overlapping, and it was not clear why an error should only cancel goroutines from a single publishStateToL1 call. It also allows limiting the total number of goroutines spawned that way (although this introduces the same blocking possibility again).
The only error handling we do is logging, so we can log directly in the goroutine and avoid resetting the errgroup, which feels like it goes against the errgroup concept.
If we only want to limit the number of goroutines, we can add our own semaphore. That would also allow us to skip adding new goroutines without blocking in errgroup.Group.Go.
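A rough sketch of that semaphore idea, purely illustrative: the cap, the frame loop, and daWrite are assumptions, not code from this PR.

package main

import (
	"log"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	const maxPendingDAWrites = 4 // illustrative cap, not an existing config value
	sem := semaphore.NewWeighted(maxPendingDAWrites)
	var wg sync.WaitGroup

	daWrite := func(frame int) error {
		time.Sleep(time.Second) // stand-in for a slow DA write
		return nil
	}

	for frame := 0; frame < 10; frame++ {
		// TryAcquire never blocks: when the cap is reached we simply skip
		// spawning a new write, so the main loop keeps running and can retry
		// the data on a later pass.
		if !sem.TryAcquire(1) {
			log.Println("DA write limit reached, deferring frame", frame)
			continue
		}
		wg.Add(1)
		go func(frame int) {
			defer wg.Done()
			defer sem.Release(1)
			if err := daWrite(frame); err != nil {
				log.Println("DA write failed:", err) // log here; no errgroup to reset
			}
		}(frame)
	}

	wg.Wait() // publishAndWait-style barrier before shutdown
}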

@ezdac ezdac left a comment


Looks good like that. Limiting the number of goroutines can be added after this PR.


@samlaf samlaf left a comment


Hey guys :) samlaf from Eigenlayer here. We're working on the exact same thing. Teddy wrote to you guys about starting a collaboration channel to merge our efforts.

I like this waitGroup solution. We were doing something much lower level, but we had completely forgotten about the L2 reorg possibility (your waitGroup approach allows flushing the channels when a reorg happens). Here's our current (also super preliminary) solution we were considering, which has the goroutine much lower in the driver event loop.

One (potential?) issue I see with your current implementation is that this error is no longer returned to the outer for loop, which breaks the current logic of waiting for the next poll interval upon error. I'm not sure this is a problem... since frames don't have to be sent in order. But it's something to keep in mind when we try to upstream this, since it affects the normal (non-alt-DA) route logic.
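To illustrate the concern about the error no longer reaching the outer loop, here is a minimal sketch; the names and the channel-based plumbing are hypothetical, not taken from either PR, and only show how an async DA failure could still trigger the back-off-until-next-poll behaviour.

package main

import (
	"errors"
	"log"
	"time"
)

func main() {
	// daErrs lets the outer poll loop observe failures from asynchronous DA
	// writes, so it can still back off until the next poll interval on error.
	daErrs := make(chan error, 16)

	daWrite := func() error { return errors.New("DA write failed") } // stand-in

	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	for i := 0; i < 5; i++ {
		<-ticker.C
		select {
		case err := <-daErrs:
			// An earlier async write failed: log and wait for the next tick
			// instead of immediately publishing more data.
			log.Println("previous DA write failed, backing off:", err)
			continue
		default:
		}
		go func() {
			if err := daWrite(); err != nil {
				daErrs <- err
			}
		}()
	}
}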


@mcortesi mcortesi left a comment


I read the comments; I've mentioned daQueue or da ... things.

We need a better name to clearly define responsibilities.

One option I can think of is that we have the BatchSubmitter, and the BatchSubmitter talks to a BatchPublisher.

So what we're creating is a BatchPublisher, and if we then use a queue, it could be called publishingQueue.

@@ -268,6 +268,7 @@ func (l *BatchSubmitter) loop() {

receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
var daWaitGroup sync.WaitGroup


I'd create an interface similar to txmgr.Queue to hide the implementation details of how you're doing this.


Similarly, we should have a config option like l.Config.MaxPendingTransactions but for DA.

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
waitGroup.Add(1)


I would replace this with a daQueue.send(), and again, hide away the details of how you manage this.

@@ -289,8 +290,11 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)


In this new architecture, the mainLoop talks to the daQueue, which then talks to the queue.

With that in mind, I think if we have a DaPublisher (similar to the TxManager), we can abstract away all details of the TxManager... all in all, the TxManager is an implementation detail of the DaPublisher.

From the BatchSubmitter's perspective, it doesn't need to know about the TxManager. It only needs to be able to "publish txData", and for each txData to be published it needs a "receipt"... a result to know whether publishing was successful or a failure.

Also, it's better to have the "publisher" logic in its own class, outside the BatchSubmitter, so sendTransaction should be moved out. That would give us more control over what "state" is accessible, and we wouldn't have two goroutines accessing state concurrently for things that don't support that.
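As one possible reading of this suggestion, here is a hypothetical shape for the DaPublisher abstraction; none of these names exist in the PR, and txData and daReceipt are placeholders for the real batcher types.

package batcher

import "context"

// txData and daReceipt are placeholders for the real batcher types.
type txData struct {
	frames [][]byte
}

type daReceipt struct {
	ID  string
	Err error
}

// DaPublisher hides both the DA write and the underlying TxManager from the
// BatchSubmitter, mirroring how txmgr.Queue hides transaction management.
type DaPublisher interface {
	// Publish hands txData off for (possibly asynchronous) publishing and
	// returns immediately; results arrive on the receipts channel.
	Publish(ctx context.Context, data txData, receipts chan<- daReceipt) error
	// Wait blocks until all pending publishes have completed (for the
	// publishAndWait / shutdown path).
	Wait()
}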

@mcortesi

Hey guys :) samlaf from Eigenlayer here. We're working on the exact same thing. Teddy wrote to you guys about starting a collaboration channel to merge our efforts.

I like this waitGroup solution. We were doing something much lower level, but we had completely forgotten about the L2 reorg possibility (your waitGroup approach allows flushing the channels when a reorg happens). Here's our current (also super preliminary) solution we were considering, which has the goroutine much lower in the driver event loop.

One (potential?) issue I see with your current implementation is that this error is no longer returned to the outer for loop, which breaks the current logic of waiting for the next poll interval upon error. I'm not sure this is a problem... since frames don't have to be sent in order. But it's something to keep in mind when we try to upstream this, since it affects the normal (non-alt-DA) route logic.

@samlaf Thanks for the review.

I agree with you that we should capture and return errors. The logic should be the same as before. Maybe there's a way to only run in a goroutine in the plasma case. To me, all the code from sendTransaction should be moved to a BatchPublisher, and whether this does things async or sync can depend on what it's actually doing.


karlb commented Aug 23, 2024

Thanks for all the feedback!

One (potential?) issue I see with your current implementation is that this error is no longer returned to the outer for loop, which breaks the current logic of waiting for the next poll interval upon error. I'm not sure this is a problem... since frames don't have to be sent in order. But it's something to keep in mind when we try to upstream this, since it affects the normal (non-alt-DA) route logic.

Error handling is one of the main points I wanted to have a discussion about. For that specific error, this PR has two earlier versions where it is handled by putting the goroutines in an errgroup and fetching the errgroup's error:

  • One where I tried to stay as close to the original error handling as possible by having one errgroup per publishTxToL1 call (eba6f86)
  • And one where there is a single errgroup for all goroutines spawned for DA writes (7be9273)

But overall I felt these were too many concurrency primitives for not enough benefit, so I trimmed it down and wanted to talk about what the actual intention in the different error cases is, and at which point we want to abstract over the different DAs (do we parallelize all DA access, only alt-DA, or only EigenDA?).
Some points relevant to this are:

  • Do we want to cancel other DA writes when one fails? (And what does it mean to fail in this case? Failing plasma writes are not returned as errors from sendTransaction.)
  • How do we want to limit processing? How many parallel goroutines, and how quickly and how often do we want to retry which operation?
  • What classes of failures do we have?
    • Failing DA writes retried via recordFailedTx
    • Other expected occasional failures. The logic of how the data is eventually written in these cases is less obvious.
    • Some errors must indicate bugs (or at least unplanned cases). Should we still just log and retry forever on those, or is there something else we should do (slow down retries, stop operation to avoid corrupting state, etc.)?

I hope that after some discussion I can do something better than my first attempts at just mimicking the original error handling closely.


github-actions bot commented Sep 7, 2024

This PR is stale because it has been open 14 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions bot added the Stale label Sep 7, 2024
@github-actions github-actions bot closed this Sep 12, 2024

karlb commented Sep 13, 2024

Closed in favour of the upstream PR ethereum-optimism#11698
