diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go
index 551e5d96e6721..98956a3f334b6 100644
--- a/op-batcher/batcher/channel_builder.go
+++ b/op-batcher/batcher/channel_builder.go
@@ -321,7 +321,11 @@ func (c *channelBuilder) setFullErr(err error) {
 // frames will be created, possibly with a small leftover frame.
 func (c *channelBuilder) OutputFrames() error {
 	if c.IsFull() {
-		return c.closeAndOutputAllFrames()
+		err := c.closeAndOutputAllFrames()
+		if err != nil {
+			return fmt.Errorf("error while closing full channel (full reason: %w): %w", c.FullErr(), err)
+		}
+		return err
 	}
 	return c.outputReadyFrames()
 }
diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index 052b9dbe1e3c9..6c5d8f5c9dbc8 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -332,9 +332,15 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 	}
 }
 
-// Close closes the current pending channel, if one exists, outputs any remaining frames,
-// and prevents the creation of any new channels.
-// Any outputted frames still need to be published.
+var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
+
+// Close clears any pending channels that are not already in-flight, to leave a clean derivation state.
+// Close then marks the remaining current open channel, if any, as "full" so it can be submitted as well.
+// Close does NOT immediately output frames for the current remaining channel,
+// as this might error due to limitations on a single channel.
+// Instead, this is part of the pending-channel submission work: after closing,
+// the caller SHOULD drain pending channels by generating TxData repeatedly until there is none left (io.EOF).
+// An ErrPendingAfterClose error will be returned if there are any remaining pending channels to submit.
 func (s *channelManager) Close() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -343,19 +349,29 @@ func (s *channelManager) Close() error {
 	}
 	s.closed = true
+	s.log.Info("Channel manager is closing")
 
 	// Any pending state can be proactively cleared if there are no submitted transactions
 	for _, ch := range s.channelQueue {
 		if ch.NoneSubmitted() {
+			s.log.Info("Channel has no past or pending submission, channel will be dropped", "channel", ch.ID())
 			s.removePendingChannel(ch)
+		} else {
+			s.log.Info("Channel is in-flight and will need to be submitted after close", "channel", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
 		}
 	}
+	s.log.Info("Reviewed all pending channels on close", "remaining", len(s.channelQueue))
 
 	if s.currentChannel == nil {
 		return nil
 	}
 
+	// Force-close the remaining open channel early (if not already closed):
+	// it will be marked as "full" due to service termination.
 	s.currentChannel.Close()
 
-	return s.outputFrames()
+	// We do not call s.outputFrames() here:
+	// it would only error if the remaining channel data cannot be submitted in one go due to channel limits.
+	// Instead, make it clear to the caller that there is remaining pending work.
+	return ErrPendingAfterClose
 }
 
diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go
index 4621dde1bdb98..2ab07a5b4cdd5 100644
--- a/op-batcher/batcher/channel_manager_test.go
+++ b/op-batcher/batcher/channel_manager_test.go
@@ -213,7 +213,7 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
 
 	a, _ := derivetest.RandomL2Block(rng, 4)
 
-	m.Close()
+	require.NoError(m.Close(), "Expected to close channel manager gracefully")
 
 	err := m.AddL2Block(a)
 	require.NoError(err, "Failed to add L2 block")
@@ -252,7 +252,7 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
 	_, err = m.TxData(eth.BlockID{})
 	require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
 
-	m.Close()
+	require.NoError(m.Close(), "Expected to close channel manager gracefully")
 
 	err = m.AddL2Block(b)
 	require.NoError(err, "Failed to add L2 block")
@@ -286,13 +286,15 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
 
 	txdata, err := m.TxData(eth.BlockID{})
 	require.NoError(err, "Expected channel manager to produce valid tx data")
+	log.Info("generated first tx data", "len", txdata.Len())
 
 	m.TxConfirmed(txdata.ID(), eth.BlockID{})
 
-	m.Close()
+	require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")
 
 	txdata, err = m.TxData(eth.BlockID{})
 	require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
+	log.Info("generated more tx data", "len", txdata.Len())
 
 	m.TxConfirmed(txdata.ID(), eth.BlockID{})
 
@@ -340,7 +342,7 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
 
 	m.TxFailed(txdata.ID())
 
-	m.Close()
+	require.NoError(m.Close(), "Expected to close channel manager gracefully")
 
 	_, err = m.TxData(eth.BlockID{})
 	require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index bcffef6a9edf2..eb86df1e666c8 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -256,7 +256,11 @@ func (l *BatchSubmitter) loop() {
 			if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
 				err := l.state.Close()
 				if err != nil {
-					l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
+					if errors.Is(err, ErrPendingAfterClose) {
+						l.Log.Warn("closed channel manager to handle L2 reorg, but pending channel(s) remain to be submitted")
+					} else {
+						l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
+					}
 				}
 				l.publishStateToL1(queue, receiptsCh, true)
 				l.state.Clear()
@@ -266,11 +270,18 @@ func (l *BatchSubmitter) loop() {
 		case r := <-receiptsCh:
 			l.handleReceipt(r)
 		case <-l.shutdownCtx.Done():
+			// Closing drops any pending channels that were never submitted, so they do not have to be drained.
+			// Channels that already have submitted transactions are kept and their remaining data is published below.
 			err := l.state.Close()
 			if err != nil {
-				l.Log.Error("error closing the channel manager", "err", err)
+				if errors.Is(err, ErrPendingAfterClose) {
+					l.Log.Warn("closed channel manager, but pending channel(s) remain to be submitted")
+				} else {
+					l.Log.Error("error closing the channel manager", "err", err)
+				}
 			}
 			l.publishStateToL1(queue, receiptsCh, true)
+			l.Log.Info("finished publishing all remaining channel data")
 			return
 		}
 	}
diff --git a/op-node/rollup/derive/channel_out.go b/op-node/rollup/derive/channel_out.go
index 63e7cc38bc62d..6ffe2e9f99b0c 100644
--- a/op-node/rollup/derive/channel_out.go
+++ b/op-node/rollup/derive/channel_out.go
@@ -16,6 +16,7 @@ import (
 var ErrMaxFrameSizeTooSmall = errors.New("maxSize is too small to fit the fixed frame overhead")
 var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx")
 var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit")
+var ErrChannelOutAlreadyClosed = errors.New("channel-out already closed")
 
 // FrameV0OverHeadSize is the absolute minimum size of a frame.
 // This is the fixed overhead frame size, calculated as specified
@@ -96,7 +97,7 @@ func (co *ChannelOut) Reset() error {
 // should be closed and a new one should be made.
 func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
 	if co.closed {
-		return 0, errors.New("already closed")
+		return 0, ErrChannelOutAlreadyClosed
 	}
 
 	batch, _, err := BlockToBatch(block)
@@ -116,7 +117,7 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
 // the batch data with AddBlock.
 func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
 	if co.closed {
-		return 0, errors.New("already closed")
+		return 0, ErrChannelOutAlreadyClosed
 	}
 
 	// We encode to a temporary buffer to determine the encoded length to
@@ -160,7 +161,7 @@ func (co *ChannelOut) FullErr() error {
 
 func (co *ChannelOut) Close() error {
 	if co.closed {
-		return errors.New("already closed")
+		return ErrChannelOutAlreadyClosed
 	}
 	co.closed = true
 	return co.compress.Close()
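
For context on the new Close contract, here is a minimal sketch (not part of this diff) of how a caller is expected to drain remaining channel data after Close returns ErrPendingAfterClose. It assumes it lives in the batcher package; drainAfterClose and the send callback are hypothetical illustration helpers, and the txData parameter type is assumed to be what TxData returns, while Close, TxData, TxFailed, TxConfirmed, and ErrPendingAfterClose are the identifiers used above.

// drainAfterClose is a hypothetical helper illustrating the documented contract:
// close the manager first, then request TxData repeatedly until io.EOF.
func drainAfterClose(m *channelManager, l1head eth.BlockID, send func(txData) error) error {
	if err := m.Close(); err != nil && !errors.Is(err, ErrPendingAfterClose) {
		return err // a real close failure, not just "pending work remains"
	}
	for {
		txdata, err := m.TxData(l1head)
		if errors.Is(err, io.EOF) {
			return nil // nothing left to submit
		}
		if err != nil {
			return err
		}
		if err := send(txdata); err != nil {
			m.TxFailed(txdata.ID()) // requeue the data so a later attempt can retry it
			return err
		}
		m.TxConfirmed(txdata.ID(), l1head)
	}
}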
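
And a side note on the channel_builder.go change: fmt.Errorf with two %w verbs wraps both errors (supported since Go 1.20), so callers can still match either the full-reason error or the output error with errors.Is. A small standalone illustration with stand-in errors, unrelated to the batcher types:

package main

import (
	"errors"
	"fmt"
)

var errFullReason = errors.New("max frames reached")   // stand-in for c.FullErr()
var errOutput = errors.New("failed to output frames")  // stand-in for the closeAndOutputAllFrames error

func main() {
	// Wrapping two errors with %w (Go 1.20+) keeps both in the unwrap tree.
	err := fmt.Errorf("error while closing full channel (full reason: %w): %w", errFullReason, errOutput)
	fmt.Println(errors.Is(err, errFullReason)) // true
	fmt.Println(errors.Is(err, errOutput))     // true
}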