From 03750d480970f27c672a47b3ed2b734e8e414a9a Mon Sep 17 00:00:00 2001 From: bendanzhentan <455462586@qq.com> Date: Wed, 16 Aug 2023 17:22:07 +0800 Subject: [PATCH] feat: check channel timed out before data submission --- op-batcher/batcher/channel_manager.go | 33 ++++++++++++++++++++-- op-batcher/batcher/channel_manager_test.go | 12 ++++---- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index b43fff7ae4..4687313a49 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -154,6 +154,16 @@ func (s *channelManager) pendingChannelIsTimedOut() bool { return max-min >= s.cfg.ChannelTimeout } +func (s *channelManager) pendingChannelMinConfirmedInclusionBlock() uint64 { + min := uint64(math.MaxUint64) + for _, inclusionBlock := range s.confirmedTransactions { + if inclusionBlock.Number < min { + min = inclusionBlock.Number + } + } + return min +} + // pendingChannelIsFullySubmitted returns true if the channel has been fully submitted. func (s *channelManager) pendingChannelIsFullySubmitted() bool { if s.pendingChannel == nil { @@ -163,12 +173,26 @@ func (s *channelManager) pendingChannelIsFullySubmitted() bool { } // nextTxData pops off s.datas & handles updating the internal state -func (s *channelManager) nextTxData() (txData, error) { +func (s *channelManager) nextTxData(l1Head eth.BlockID) (txData, error) { if s.pendingChannel == nil || !s.pendingChannel.HasFrame() { s.log.Trace("no next tx data") return txData{}, io.EOF // TODO: not enough data error instead } + // If the channel has timed out relative to the current L1 head and there are still frames not submitted, + // cancel the channel, because even if these frames are submitted, the channel has still timed out and + // will discard by OP Node's derivation pipeline. + if s.pendingChannel.HasFrame() { + minConfirmed := s.pendingChannelMinConfirmedInclusionBlock() + if l1Head.Number > minConfirmed && l1Head.Number-minConfirmed >= s.cfg.MaxChannelDuration { + s.metr.RecordChannelTimedOut(s.pendingChannel.ID()) + s.log.Warn("Channel timed out", "id", s.pendingChannel.ID(), "rest frames", s.pendingChannel.NumFrames()) + s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) + s.clearPendingChannel() + return txData{}, io.EOF + } + } + frame := s.pendingChannel.NextFrame() txdata := txData{frame} id := txdata.ID() @@ -189,7 +213,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { // Short circuit if there is a pending frame or the channel manager is closed. if dataPending || s.closed { - return s.nextTxData() + return s.nextTxData(l1Head) } // No pending frame, so we have to add new blocks to the channel @@ -221,7 +245,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { return txData{}, err } - return s.nextTxData() + return s.nextTxData(l1Head) } func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error { @@ -269,6 +293,9 @@ func (s *channelManager) processBlocks() error { } else if err != nil { return fmt.Errorf("adding block[%d] to channel builder: %w", i, err) } + blockID := eth.BlockID{Number: block.NumberU64(), Hash: block.Hash()} + s.log.Info("Added block to channel", "channel", s.pendingChannel.ID(), "block", blockID) + blocksAdded += 1 latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info) s.metr.RecordL2BlockInChannel(block) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 47a6ad0127..bb5f7f7067 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -125,7 +125,7 @@ func TestChannelManagerNextTxData(t *testing.T) { m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) // Nil pending channel should return EOF - returnedTxData, err := m.nextTxData() + returnedTxData, err := m.nextTxData(eth.BlockID{}) require.ErrorIs(t, err, io.EOF) require.Equal(t, txData{}, returnedTxData) @@ -133,7 +133,7 @@ func TestChannelManagerNextTxData(t *testing.T) { // The nextTxData function should still return EOF // since the pending channel has no frames require.NoError(t, m.ensurePendingChannel(eth.BlockID{})) - returnedTxData, err = m.nextTxData() + returnedTxData, err = m.nextTxData(eth.BlockID{}) require.ErrorIs(t, err, io.EOF) require.Equal(t, txData{}, returnedTxData) @@ -150,7 +150,7 @@ func TestChannelManagerNextTxData(t *testing.T) { require.Equal(t, 1, m.pendingChannel.NumFrames()) // Now the nextTxData function should return the frame - returnedTxData, err = m.nextTxData() + returnedTxData, err = m.nextTxData(eth.BlockID{}) expectedTxData := txData{frame} expectedChannelID := expectedTxData.ID() require.NoError(t, err) @@ -209,7 +209,7 @@ func TestChannelManager_Clear(t *testing.T) { require.NoError(m.processBlocks()) require.NoError(m.pendingChannel.co.Flush()) require.NoError(m.pendingChannel.OutputFrames()) - _, err := m.nextTxData() + _, err := m.nextTxData(l1BlockID) require.NoError(err) require.Len(m.blocks, 0) require.Equal(newL1Tip, m.tip) @@ -260,7 +260,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) { } m.pendingChannel.PushFrame(frame) require.Equal(t, 1, m.pendingChannel.NumFrames()) - returnedTxData, err := m.nextTxData() + returnedTxData, err := m.nextTxData(eth.BlockID{}) expectedTxData := txData{frame} expectedChannelID := expectedTxData.ID() require.NoError(t, err) @@ -308,7 +308,7 @@ func TestChannelManagerTxFailed(t *testing.T) { } m.pendingChannel.PushFrame(frame) require.Equal(t, 1, m.pendingChannel.NumFrames()) - returnedTxData, err := m.nextTxData() + returnedTxData, err := m.nextTxData(eth.BlockID{}) expectedTxData := txData{frame} expectedChannelID := expectedTxData.ID() require.NoError(t, err)