op-batcher: adjust error handling on pending-channels after close #7683

Merged: 7 commits, Dec 18, 2023
6 changes: 5 additions & 1 deletion op-batcher/batcher/channel_builder.go
@@ -333,7 +333,11 @@ func (c *channelBuilder) setFullErr(err error) {
// frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error {
if c.IsFull() {
return c.closeAndOutputAllFrames()
err := c.closeAndOutputAllFrames()
if err != nil {
return fmt.Errorf("error while closing full channel (full reason: %w): %w", c.FullErr(), err)
}
return nil
}
return c.outputReadyFrames()
}
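A side note on the double %w in the new error above: since Go 1.20, fmt.Errorf may wrap multiple errors, and errors.Is matches any of them, so callers can still test for either the full-reason or the close error. A small standalone illustration (not part of this diff):

package main

import (
	"errors"
	"fmt"
)

var errFull = errors.New("channel full")
var errClose = errors.New("close failed")

func main() {
	err := fmt.Errorf("error while closing full channel (full reason: %w): %w", errFull, errClose)
	fmt.Println(errors.Is(err, errFull))  // true: the full-reason is wrapped
	fmt.Println(errors.Is(err, errClose)) // true: the close error is wrapped too
}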
36 changes: 31 additions & 5 deletions op-batcher/batcher/channel_manager.go
@@ -337,9 +337,15 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
}
}

// Close closes the current pending channel, if one exists, outputs any remaining frames,
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")

// Close clears any pending channels that are not in-flight already, to leave a clean derivation state.
// Close then marks the remaining current open channel, if any, as "full" so it can be submitted as well.
Review comment (Contributor):
It seems like we're marking non-full channels as full specifically so they'll be handled in a certain way. Is that an appropriate use of the marking? I see elsewhere that we emit errors about a "error while closing full channel", and worry that will be confusing if the channel isn't actually full.
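For context on this thread, a minimal sketch of what the force-close path can look like, assuming the channel builder records a dedicated sentinel (such as the ErrTerminated checked in the tests below) as its full-reason; the actual channelBuilder.Close in this repo may differ in detail:

// ErrTerminated would mark a channel that was closed because the batcher is
// shutting down, not because it actually hit its size limits.
var ErrTerminated = errors.New("channel terminated")

// Close force-marks a non-full channel as full, using ErrTerminated as the
// reason, so OutputFrames and log messages can tell the two cases apart.
func (c *channelBuilder) Close() {
	if !c.IsFull() {
		c.setFullErr(ErrTerminated)
	}
}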

// Close does NOT immediately output frames for the current remaining channel:
// as this might error, due to limitations on a single channel.
// Instead, this is part of the pending-channel submission work: after closing,
// the caller SHOULD drain pending channels by generating TxData repeatedly until there is none left (io.EOF).
// An ErrPendingAfterClose error will be returned if there are any remaining pending channels to submit.
func (s *channelManager) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -348,19 +348,39 @@ func (s *channelManager) Close() error {
}

s.closed = true
s.log.Info("Channel manager is closing")

// Any pending state can be proactively cleared if there are no submitted transactions
for _, ch := range s.channelQueue {
if ch.NoneSubmitted() {
s.log.Info("Channel has no past or pending submission and might as well not exist, channel will be dropped", "channel", ch.ID(), "")
s.removePendingChannel(ch)
} else {
s.log.Info("Channel is in-flight and will need to be submitted after close", "channel", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
}
}
s.log.Info("Reviewed all pending channels on close", "remaining", len(s.channelQueue))

if s.currentChannel == nil {
return nil
}

s.currentChannel.Close()
// If the channel is already full, we don't need to close it or output frames.
// This would already have happened in TxData.
if !s.currentChannel.IsFull() {
// Force-close the remaining open channel early (if not already closed):
// it will be marked as "full" due to service termination.
s.currentChannel.Close()

// Final outputFrames call in case there was unflushed data in the compressor.
if err := s.outputFrames(); err != nil {
return fmt.Errorf("outputting frames during close: %w", err)
}
}

return s.outputFrames()
if s.currentChannel.HasFrame() {
// Make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
return nil
}
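Per the doc comment on Close, callers are expected to drain any remaining channel data after an ErrPendingAfterClose result by repeatedly requesting TxData until io.EOF. A minimal sketch of that drain loop; drainAfterClose and the submit callback are illustrative only (the real batcher does this via publishStateToL1 and TxConfirmed/TxFailed):

// drainAfterClose closes the channel manager and then submits all remaining
// pending channel data. submit is a hypothetical callback that publishes one
// batcher transaction and reports its confirmation back to the manager.
func drainAfterClose(m *channelManager, submit func(txData) error) error {
	if err := m.Close(); err != nil && !errors.Is(err, ErrPendingAfterClose) {
		return err
	}
	for {
		data, err := m.TxData(eth.BlockID{})
		if errors.Is(err, io.EOF) {
			return nil // nothing left to submit
		}
		if err != nil {
			return err
		}
		if err := submit(data); err != nil {
			return err
		}
	}
}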
95 changes: 80 additions & 15 deletions op-batcher/batcher/channel_manager_test.go
@@ -261,7 +261,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {

a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
@@ -304,7 +304,7 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
@@ -321,14 +321,14 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 10000,
ChannelTimeout: 1000,
MaxFrameSize: 10_000,
ChannelTimeout: 1_000,
CompressorConfig: compressor.Config{
TargetNumFrames: 1,
TargetFrameSize: 10000,
TargetFrameSize: 10_000,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
@@ -339,32 +339,97 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {

numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)

err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")

txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

m.Close()
require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")

txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")

err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}

// ChannelManager_Close_PartiallyPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, where a block is still waiting
// inside the compressor to be flushed.
//
// This test runs only for singular batches on purpose.
// The SpanChannelOut writes full span batches to the compressor for
// every new block that's added, so NonCompressor cannot be used to
// set up a scenario where data is only partially flushed.
// Couldn't get the test to work even with modifying NonCompressor
// to flush half-way through writing to the compressor...
func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlError)
const framesize = 2200
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: framesize,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: framesize,
ApproxComprRatio: 1.0,
Kind: "none",
},
},
&defaultTestRollupConfig,
)
m.Clear()

numTx := 3 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)

require.NoError(m.AddL2Block(a), "adding 1st L2 block")
require.NoError(m.AddL2Block(b), "adding 2nd L2 block")

// Inside TxData, the two blocks queued above are written to the compressor.
// The NonCompressor will flush the first, but not the second block, when
// adding the second block, setting up the test with a partially flushed
// compressor.
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

// ensure no new ready data before closing
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected unclosed channel manager to only return a single frame")

require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")
require.NotNil(m.currentChannel)
require.ErrorIs(m.currentChannel.FullErr(), ErrTerminated, "Expected current channel to be terminated by Close")

txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())

m.TxConfirmed(txdata.ID(), eth.BlockID{})

_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
@@ -408,7 +408,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {

m.TxFailed(txdata.ID())

m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")

_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
15 changes: 13 additions & 2 deletions op-batcher/batcher/driver.go
@@ -264,7 +264,11 @@ func (l *BatchSubmitter) loop() {
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager to handle L2 reorg, but pending channel(s) remain to be submitted")
} else {
l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.state.Clear()
@@ -274,11 +278,18 @@
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
// This removes any never-submitted pending channels, so these do not have to be drained with transactions.
// Any remaining unfinished channel is terminated, so its data gets submitted.
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager, but pending channel(s) remain to be submitted")
} else {
l.Log.Error("Error closing the channel manager", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.Log.Info("Finished publishing all remaining channel data")
return
}
}
8 changes: 6 additions & 2 deletions op-batcher/compressor/compressors.go
@@ -6,12 +6,16 @@ import (

type FactoryFunc func(Config) (derive.Compressor, error)

const RatioKind = "ratio"
const ShadowKind = "shadow"
const (
RatioKind = "ratio"
ShadowKind = "shadow"
NoneKind = "none"
)

var Kinds = map[string]FactoryFunc{
RatioKind: NewRatioCompressor,
ShadowKind: NewShadowCompressor,
NoneKind: NewNonCompressor,
}

var KindKeys []string
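With NoneKind registered, a compressor is still constructed through the same factory map as the other kinds. A small sketch of the lookup from a caller's perspective; the helper itself is illustrative and uses only the Config fields that appear in the tests:

// newCompressorFromKind resolves a compressor kind (e.g. "ratio", "shadow",
// "none") against the registered factories and builds an instance from cfg.
func newCompressorFromKind(kind string, cfg compressor.Config) (derive.Compressor, error) {
	factory, ok := compressor.Kinds[kind]
	if !ok {
		return nil, fmt.Errorf("unknown compressor kind %q, known kinds: %v", kind, compressor.KindKeys)
	}
	return factory(cfg)
}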
77 changes: 77 additions & 0 deletions op-batcher/compressor/non_compressor.go
@@ -0,0 +1,77 @@
package compressor

import (
"bytes"
"compress/zlib"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

type NonCompressor struct {
config Config

buf bytes.Buffer
compress *zlib.Writer

fullErr error
}

// NewNonCompressor creates a new derive.Compressor implementation that doesn't
// compress by using zlib.NoCompression.
// It flushes to the underlying buffer any data from a prior write call.
// This is very suboptimal behavior and should only be used in tests.
// The NonCompressor can be used in tests to create a partially flushed channel.
// If the output buffer size after a write exceeds TargetFrameSize*TargetNumFrames,
// the compressor is marked as full, but the write succeeds.
func NewNonCompressor(config Config) (derive.Compressor, error) {
c := &NonCompressor{
config: config,
}

var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.NoCompression)
if err != nil {
return nil, err
}
return c, nil
}

func (t *NonCompressor) Write(p []byte) (int, error) {
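// Flush whatever previous Write calls left buffered in the zlib writer into
// t.buf; the new p written below stays buffered until the next Write or an
// explicit Flush, which is what produces a partially flushed channel in tests.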
if err := t.compress.Flush(); err != nil {
return 0, err
}
n, err := t.compress.Write(p)
if err != nil {
return 0, err
}
if uint64(t.buf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
t.fullErr = derive.CompressorFullErr
}
return n, nil
}

func (t *NonCompressor) Close() error {
return t.compress.Close()
}

func (t *NonCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}

func (t *NonCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.fullErr = nil
}

func (t *NonCompressor) Len() int {
return t.buf.Len()
}

func (t *NonCompressor) Flush() error {
return t.compress.Flush()
}

func (t *NonCompressor) FullErr() error {
return t.fullErr
}
39 changes: 39 additions & 0 deletions op-batcher/compressor/non_compressor_test.go
@@ -0,0 +1,39 @@
package compressor

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestNonCompressor(t *testing.T) {
require := require.New(t)
c, err := NewNonCompressor(Config{
TargetFrameSize: 1000,
TargetNumFrames: 100,
})
require.NoError(err)

const dlen = 100
data := make([]byte, dlen)
rng := rand.New(rand.NewSource(42))
rng.Read(data)

n, err := c.Write(data)
require.NoError(err)
require.Equal(n, dlen)
l0 := c.Len()
require.Less(l0, dlen)
require.Equal(7, l0)
c.Flush()
l1 := c.Len()
require.Greater(l1, l0)
require.Greater(l1, dlen)

n, err = c.Write(data)
require.NoError(err)
require.Equal(n, dlen)
l2 := c.Len()
require.Equal(l1+5, l2)
}