Skip to content

Commit

Permalink
op-batcher: adjust error handling on pending-channels after close
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Oct 14, 2023
1 parent 8f7f94d commit c57fc21
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 14 deletions.
6 changes: 5 additions & 1 deletion op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ func (c *channelBuilder) setFullErr(err error) {
// frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error {
if c.IsFull() {
return c.closeAndOutputAllFrames()
err := c.closeAndOutputAllFrames()
if err != nil {
return fmt.Errorf("error while closing full channel (full reason: %w): %w", c.FullErr(), err)
}
return err
}
return c.outputReadyFrames()
}
Expand Down
24 changes: 20 additions & 4 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,15 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
}
}

// Close closes the current pending channel, if one exists, outputs any remaining frames,
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")

// Close clears any pending channels that are not in-flight already, to leave a clean derivation state.
// Close then marks the remaining current open channel, if any, as "full" so it can be submitted as well.
// Close does NOT immediately output frames for the current remaining channel:
// as this might error, due to limitations on a single channel.
// Instead, this is part of the pending-channel submission work: after closing,
// the caller SHOULD drain pending channels by generating TxData repeatedly until there is none left (io.EOF).
// A ErrPendingAfterClose error will be returned if there are any remaining pending channels to submit.
func (s *channelManager) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -343,19 +349,29 @@ func (s *channelManager) Close() error {
}

s.closed = true
s.log.Info("Channel manager is closing")

// Any pending state can be proactively cleared if there are no submitted transactions
for _, ch := range s.channelQueue {
if ch.NoneSubmitted() {
s.log.Info("Channel has no past or pending submission and might as well not exist, channel will be dropped", "channel", ch.ID(), "")
s.removePendingChannel(ch)
} else {
s.log.Info("Channel is in-flight and will need to be submitted after close", "channel", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
}
}
s.log.Info("reviewed all pending channels on close", "remaining", len(s.channelQueue))

if s.currentChannel == nil {
return nil
}

// Force-close the remaining open channel early (if not already closed):
// it will be marked as "full" due to service termination.
s.currentChannel.Close()

return s.outputFrames()
// We do not s.outputFrames():
// this will only error if we cannot submit it in one go due to channel limits.
// Instead, make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
10 changes: 6 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {

a, _ := derivetest.RandomL2Block(rng, 4)

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
Expand Down Expand Up @@ -286,13 +286,15 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {

txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

m.Close()
require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")

txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

Expand Down Expand Up @@ -340,7 +342,7 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {

m.TxFailed(txdata.ID())

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
Expand Down
15 changes: 13 additions & 2 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,11 @@ func (l *BatchSubmitter) loop() {
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("closed channel manager to handle L2 reorg, but pending channel(s) remain to be submitted")
} else {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.state.Clear()
Expand All @@ -266,11 +270,18 @@ func (l *BatchSubmitter) loop() {
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
// This removes any pending channels, so these do not have to be drained .
// But it also tries to write ev
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("closed channel manager, but pending channel(s) remain to be submitted")
} else {
l.Log.Error("error closing the channel manager", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.Log.Info("finished publishing all remaining channel data")
return
}
}
Expand Down
7 changes: 4 additions & 3 deletions op-node/rollup/derive/channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var ErrMaxFrameSizeTooSmall = errors.New("maxSize is too small to fit the fixed frame overhead")
var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx")
var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit")
var ErrChannelOutAlreadyClosed = errors.New("channel-out already closed")

// FrameV0OverHeadSize is the absolute minimum size of a frame.
// This is the fixed overhead frame size, calculated as specified
Expand Down Expand Up @@ -96,7 +97,7 @@ func (co *ChannelOut) Reset() error {
// should be closed and a new one should be made.
func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}

batch, _, err := BlockToBatch(block)
Expand All @@ -116,7 +117,7 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
// the batch data with AddBlock.
func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}

// We encode to a temporary buffer to determine the encoded length to
Expand Down Expand Up @@ -160,7 +161,7 @@ func (co *ChannelOut) FullErr() error {

func (co *ChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
return ErrChannelOutAlreadyClosed
}
co.closed = true
return co.compress.Close()
Expand Down

0 comments on commit c57fc21

Please sign in to comment.