Skip to content

Commit

Permalink
feat: check channel timed out before data submission
Browse files Browse the repository at this point in the history
  • Loading branch information
bendanzhentan committed Aug 17, 2023
1 parent e363912 commit 03750d4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
33 changes: 30 additions & 3 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ func (s *channelManager) pendingChannelIsTimedOut() bool {
return max-min >= s.cfg.ChannelTimeout
}

func (s *channelManager) pendingChannelMinConfirmedInclusionBlock() uint64 {
min := uint64(math.MaxUint64)
for _, inclusionBlock := range s.confirmedTransactions {
if inclusionBlock.Number < min {
min = inclusionBlock.Number
}
}
return min
}

// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channelManager) pendingChannelIsFullySubmitted() bool {
if s.pendingChannel == nil {
Expand All @@ -163,12 +173,26 @@ func (s *channelManager) pendingChannelIsFullySubmitted() bool {
}

// nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData() (txData, error) {
func (s *channelManager) nextTxData(l1Head eth.BlockID) (txData, error) {
if s.pendingChannel == nil || !s.pendingChannel.HasFrame() {
s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead
}

// If the channel has timed out relative to the current L1 head and there are still frames not submitted,
// cancel the channel, because even if these frames are submitted, the channel has still timed out and
// will discard by OP Node's derivation pipeline.
if s.pendingChannel.HasFrame() {
minConfirmed := s.pendingChannelMinConfirmedInclusionBlock()
if l1Head.Number > minConfirmed && l1Head.Number-minConfirmed >= s.cfg.MaxChannelDuration {
s.metr.RecordChannelTimedOut(s.pendingChannel.ID())
s.log.Warn("Channel timed out", "id", s.pendingChannel.ID(), "rest frames", s.pendingChannel.NumFrames())
s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
s.clearPendingChannel()
return txData{}, io.EOF
}
}

frame := s.pendingChannel.NextFrame()
txdata := txData{frame}
id := txdata.ID()
Expand All @@ -189,7 +213,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {

// Short circuit if there is a pending frame or the channel manager is closed.
if dataPending || s.closed {
return s.nextTxData()
return s.nextTxData(l1Head)
}

// No pending frame, so we have to add new blocks to the channel
Expand Down Expand Up @@ -221,7 +245,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, err
}

return s.nextTxData()
return s.nextTxData(l1Head)
}

func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
Expand Down Expand Up @@ -269,6 +293,9 @@ func (s *channelManager) processBlocks() error {
} else if err != nil {
return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
}
blockID := eth.BlockID{Number: block.NumberU64(), Hash: block.Hash()}
s.log.Info("Added block to channel", "channel", s.pendingChannel.ID(), "block", blockID)

blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
s.metr.RecordL2BlockInChannel(block)
Expand Down
12 changes: 6 additions & 6 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ func TestChannelManagerNextTxData(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})

// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData()
returnedTxData, err := m.nextTxData(eth.BlockID{})
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)

// Set the pending channel
// The nextTxData function should still return EOF
// since the pending channel has no frames
require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
returnedTxData, err = m.nextTxData()
returnedTxData, err = m.nextTxData(eth.BlockID{})
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)

Expand All @@ -150,7 +150,7 @@ func TestChannelManagerNextTxData(t *testing.T) {
require.Equal(t, 1, m.pendingChannel.NumFrames())

// Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData()
returnedTxData, err = m.nextTxData(eth.BlockID{})
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestChannelManager_Clear(t *testing.T) {
require.NoError(m.processBlocks())
require.NoError(m.pendingChannel.co.Flush())
require.NoError(m.pendingChannel.OutputFrames())
_, err := m.nextTxData()
_, err := m.nextTxData(l1BlockID)
require.NoError(err)
require.Len(m.blocks, 0)
require.Equal(newL1Tip, m.tip)
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) {
}
m.pendingChannel.PushFrame(frame)
require.Equal(t, 1, m.pendingChannel.NumFrames())
returnedTxData, err := m.nextTxData()
returnedTxData, err := m.nextTxData(eth.BlockID{})
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestChannelManagerTxFailed(t *testing.T) {
}
m.pendingChannel.PushFrame(frame)
require.Equal(t, 1, m.pendingChannel.NumFrames())
returnedTxData, err := m.nextTxData()
returnedTxData, err := m.nextTxData(eth.BlockID{})
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
Expand Down

0 comments on commit 03750d4

Please sign in to comment.