Skip to content

Commit

Permalink
op-batcher: Add back outputFrames call to channelManager.Close
Browse files Browse the repository at this point in the history
Added a test that validates that this is needed in rare circumstances.
This happens in scenarios where a block is written to the compressor,
but not flushed yet to the output buffer. If we don't call outputFrames
in channelManager.Close, this test fails.
  • Loading branch information
sebastianst committed Dec 14, 2023
1 parent 930548c commit 7ad152a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 19 deletions.
26 changes: 18 additions & 8 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,22 @@ func (s *channelManager) Close() error {
return nil
}

// Force-close the remaining open channel early (if not already closed):
// it will be marked as "full" due to service termination.
s.currentChannel.Close()

// We do not s.outputFrames():
// this will only error if we cannot submit it in one go due to channel limits.
// Instead, make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
// If the channel is already full, we don't need to close it or output frames.
// This would already have happened in TxData.
if !s.currentChannel.IsFull() {
// Force-close the remaining open channel early (if not already closed):
// it will be marked as "full" due to service termination.
s.currentChannel.Close()

// Final outputFrames call in case there was unflushed data in the compressor.
if err := s.outputFrames(); err != nil {
return fmt.Errorf("outputting frames during close: %w", err)
}
}

if s.currentChannel.HasFrame() {
// Make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
return nil
}
85 changes: 74 additions & 11 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,14 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 10000,
ChannelTimeout: 1000,
MaxFrameSize: 10_000,
ChannelTimeout: 1_000,
CompressorConfig: compressor.Config{
TargetNumFrames: 1,
TargetFrameSize: 10000,
TargetFrameSize: 10_000,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
Expand All @@ -339,11 +339,6 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {

numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)

err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
Expand All @@ -365,8 +360,76 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")

err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}

// ChannelManager_Close_PartiallyPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, where a block is still waiting
// inside the compressor to be flushed.
//
// This test runs only for singular batches on purpose.
// The SpanChannelOut writes full span batches to the compressor for
// every new block that's added, so NonCompressor cannot be used to
// set up a scenario where data is only partially flushed.
// Couldn't get the test to work even with modifying NonCompressor
// to flush half-way through writing to the compressor...
func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlError)
const framesize = 2200
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: framesize,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: framesize,
ApproxComprRatio: 1.0,
Kind: "none",
},
},
&defaultTestRollupConfig,
)
m.Clear()

numTx := 3 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)

require.NoError(m.AddL2Block(a), "adding 1st L2 block")
require.NoError(m.AddL2Block(b), "adding 2nd L2 block")

// Inside TxData, the two blocks queued above are written to the compressor.
// The NonCompressor will flush the first, but not the second block, when
// adding the second block, setting up the test with a partially flushed
// compressor.
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

// ensure no new ready data before closing
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected unclosed channel manager to only return a single frame")

require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")
require.NotNil(m.currentChannel)
require.ErrorIs(m.currentChannel.FullErr(), ErrTerminated, "Expected current channel to be terminated by Close")

txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
Expand Down

0 comments on commit 7ad152a

Please sign in to comment.