Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fjord: Add Brotli channel compression support #10358

Merged
merged 37 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a727046
wip
cody-wang-cb Apr 26, 2024
9e1673b
wip
cody-wang-cb Apr 30, 2024
e732b21
fix
cody-wang-cb May 1, 2024
6566283
fix
cody-wang-cb May 1, 2024
4eb83ea
fix
cody-wang-cb May 1, 2024
934ed5e
fix
cody-wang-cb May 1, 2024
ec984a3
address some of the bots comments
cody-wang-cb May 1, 2024
a5ba98e
use version bit of 1
cody-wang-cb May 2, 2024
940f2bc
fix lint
cody-wang-cb May 2, 2024
642933e
adding compression type
cody-wang-cb May 2, 2024
8bd2eb6
update batch reader
cody-wang-cb May 2, 2024
314e99d
abstract span channel compressor
cody-wang-cb May 3, 2024
fee3e44
test and singular batch compressor
cody-wang-cb May 5, 2024
633aae2
fix
cody-wang-cb May 5, 2024
bf3675a
lint
cody-wang-cb May 5, 2024
1efd338
move channel compressor as interface
cody-wang-cb May 7, 2024
42ade28
add base class
cody-wang-cb May 7, 2024
6e09caf
fix go mod
cody-wang-cb May 8, 2024
e5f7e6c
test fixes
roberto-bayardo May 8, 2024
1730bfc
Merge pull request #4 from roberto-bayardo/tmp
cody-wang-cb May 8, 2024
c5f28f3
address comments
cody-wang-cb May 9, 2024
aade67d
fix
cody-wang-cb May 9, 2024
c69be77
merge from develop
cody-wang-cb May 9, 2024
ca47d0f
fix
cody-wang-cb May 10, 2024
729616f
revert channel builder test
cody-wang-cb May 10, 2024
7aaa4ed
revert ratio compressor test
cody-wang-cb May 10, 2024
da699a9
add checks to accept brotli only post fjord
cody-wang-cb May 10, 2024
abdea7d
revemo unnecessary in test
cody-wang-cb May 10, 2024
3ad55be
Merge branch 'develop' into cody/brotli-impl
cody-wang-cb May 10, 2024
5f1b198
fix forge-std
cody-wang-cb May 10, 2024
7298161
gofmt
cody-wang-cb May 10, 2024
aa0823b
address comments
cody-wang-cb May 11, 2024
010ed14
remove methods in compressor
cody-wang-cb May 11, 2024
d508412
fix error msg
cody-wang-cb May 11, 2024
7ecd688
Merge branch 'develop' into cody/brotli-impl
cody-wang-cb May 13, 2024
be6629e
add compression algo flag to optional flags
cody-wang-cb May 13, 2024
eadc24e
add Clone() function
cody-wang-cb May 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/BurntSushi/toml v1.3.2
github.com/andybalholm/brotli v1.1.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
Expand Down Expand Up @@ -64,7 +65,6 @@ require (
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
}
var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo, cfg.CompressorConfig.CompressLevel)
} else {
co, err = derive.NewSingularChannelOut(c)
}
Expand Down
168 changes: 101 additions & 67 deletions op-batcher/batcher/channel_builder_test.go
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1
channelConfig.TargetNumFrames = 1000
channelConfig.InitNoneCompressor()
channelConfig.InitNoneCompressor("brotli", 10)
sebastianst marked this conversation as resolved.
Show resolved Hide resolved

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
Expand Down Expand Up @@ -430,54 +430,72 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 5
channelConfig.BatchType = derive.SpanBatchType
channelConfig.InitRatioCompressor(1)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(t, err)
require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames())

// Calling OutputFrames without having called [AddBlock]
// should return no error
require.NoError(t, cb.OutputFrames())

// There should be no ready bytes yet
require.Equal(t, 0, cb.co.ReadyBytes())

// fill up
for {
err = addMiniBlock(cb)
if err == nil {
require.False(t, cb.IsFull())
// There should be no ready bytes until the channel is full
require.Equal(t, cb.co.ReadyBytes(), 0)
} else {
require.ErrorIs(t, err, derive.ErrCompressorFull)
break
}
testCases := []struct {
testName string
algo string
compressLevel int
targetNumFrames int
}{
{testName: "Span Batch output frames with zlib", algo: "zlib", compressLevel: 9, targetNumFrames: 5},
{testName: "Span Batch output frames with brotli", algo: "brotli", compressLevel: 10, targetNumFrames: 1}, // to fill faster
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}

require.True(t, cb.IsFull())
// Check how many ready bytes
require.GreaterOrEqual(t,
cb.co.ReadyBytes()+derive.FrameV0OverHeadSize,
int(channelConfig.MaxFrameSize))
require.Equal(t, 0, cb.PendingFrames())

// We should be able to output the frames
require.NoError(t, cb.OutputFrames())

// There should be several frames in the channel builder now
require.Greater(t, cb.PendingFrames(), 1)
for i := 0; i < cb.numFrames-1; i++ {
require.Len(t, cb.frames[i].data, int(channelConfig.MaxFrameSize))
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = tc.targetNumFrames
channelConfig.BatchType = derive.SpanBatchType
channelConfig.InitRatioCompressor(1, tc.algo, tc.compressLevel)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(t, err)
require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames())

// Calling OutputFrames without having called [AddBlock]
// should return no error
require.NoError(t, cb.OutputFrames())

// There should be no ready bytes yet
require.Equal(t, 0, cb.co.ReadyBytes())

// fill up
for {
err = addMiniBlock(cb)
if err == nil {
if cb.IsFull() {
// this happens when the data exactly fills the channel
break
}
require.False(t, cb.IsFull())
// There should be no ready bytes until the channel is full
require.Equal(t, cb.co.ReadyBytes(), 0)
} else {
require.ErrorIs(t, err, derive.ErrCompressorFull)
break
}
}

require.True(t, cb.IsFull())
// Check how many ready bytes
require.GreaterOrEqual(t,
cb.co.ReadyBytes()+derive.FrameV0OverHeadSize,
int(channelConfig.MaxFrameSize))
require.Equal(t, 0, cb.PendingFrames())

// We should be able to output the frames
require.NoError(t, cb.OutputFrames())

// There should be several frames in the channel builder now
require.Greater(t, cb.PendingFrames(), 1)
for i := 0; i < cb.numFrames-1; i++ {
require.Len(t, cb.frames[i].data, int(channelConfig.MaxFrameSize))
}
require.LessOrEqual(t, len(cb.frames[len(cb.frames)-1].data), int(channelConfig.MaxFrameSize))
})
}
require.LessOrEqual(t, len(cb.frames[len(cb.frames)-1].data), int(channelConfig.MaxFrameSize))
}

// ChannelBuilder_MaxRLPBytesPerChannel tests the [ChannelBuilder.OutputFrames]
Expand All @@ -486,7 +504,7 @@ func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) {
t.Parallel()
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.InitNoneCompressor()
channelConfig.InitNoneCompressor("brotli", 10)
channelConfig.BatchType = batchType

// Construct the channel builder
Expand All @@ -504,7 +522,7 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1
channelConfig.TargetNumFrames = math.MaxUint16 + 1
channelConfig.InitRatioCompressor(.1)
channelConfig.InitRatioCompressor(.1, "brotli", 10)
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
channelConfig.BatchType = batchType

rng := rand.New(rand.NewSource(123))
Expand Down Expand Up @@ -546,27 +564,43 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {
TargetNumFrames: 1,
BatchType: derive.SpanBatchType,
}
cfg.InitShadowCompressor()

cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)
testCases := []struct {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
testName string
algo string
compressLevel int
targetNumFrames int
}{
{testName: "Full shadow compressor with zlib", algo: "zlib", compressLevel: 9},
{testName: "Full shadow compressor with brotli", algo: "brotli", compressLevel: 10},
}

rng := rand.New(rand.NewSource(420))
a := dtest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)
_, err = cb.AddBlock(a)
require.NoError(err)
_, err = cb.AddBlock(a)
require.ErrorIs(err, derive.ErrCompressorFull)
// without fix, adding the second block would succeed and then adding a
// third block would fail with full error and the compressor would be full.
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
cfg.InitShadowCompressor(tc.algo, tc.compressLevel)

require.NoError(cb.OutputFrames())
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

rng := rand.New(rand.NewSource(420))
a := dtest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)

for {
_, err = cb.AddBlock(a)
if err != nil {
require.ErrorIs(err, derive.ErrCompressorFull)
break
}
}

require.True(cb.HasFrame())
f := cb.NextFrame()
require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame
require.NoError(cb.OutputFrames())
require.True(cb.HasFrame())
f := cb.NextFrame()
require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame

require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix
require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix
})
}
}

func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
Expand All @@ -577,7 +611,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 2
// Configure the Input Threshold params so we observe a full channel
channelConfig.InitRatioCompressor(1)
channelConfig.InitRatioCompressor(1, "brotli", 10)
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
Expand Down Expand Up @@ -700,7 +734,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = tnf
cfg.BatchType = batchType
cfg.InitShadowCompressor()
cfg.InitShadowCompressor("brotli", 10)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

Expand Down Expand Up @@ -782,7 +816,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16
cfg.BatchType = batchType
cfg.InitRatioCompressor(1.0)
cfg.InitRatioCompressor(1.0, "brotli", 10)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err, "NewChannelBuilder")

Expand Down
16 changes: 9 additions & 7 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,27 @@ type ChannelConfig struct {
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
// comprKind can be the empty string, in which case the default compressor will
// be used.
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo string, compressLevel int) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
cc.CompressorConfig = compressor.Config{
// Compressor output size needs to account for frame encoding overhead
TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
ApproxComprRatio: approxComprRatio,
Kind: comprKind,
CompressionAlgo: compressionAlgo,
CompressLevel: compressLevel,
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo string, compressLevel int) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo, compressLevel)
}

func (cc *ChannelConfig) InitShadowCompressor() {
cc.InitCompressorConfig(0, compressor.ShadowKind)
func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo string, compressLevel int) {
cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo, compressLevel)
}

func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind)
func (cc *ChannelConfig) InitNoneCompressor(compressionAlgo string, compressLevel int) {
cc.InitCompressorConfig(0, compressor.NoneKind, compressionAlgo, compressLevel)
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func defaultTestChannelConfig() ChannelConfig {
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
}
c.InitRatioCompressor(0.4)
c.InitRatioCompressor(0.4, "brotli", 10)
return c
}

Expand Down
10 changes: 5 additions & 5 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig
TargetNumFrames: 1,
BatchType: batchType,
}
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, "brotli", 10)
return cfg
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
cfg.ChannelTimeout = 10
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, "brotli", 10)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)

// Channel Manager state should be empty by default
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
ChannelTimeout: 1000,
TargetNumFrames: 100,
}
cfg.InitNoneCompressor()
cfg.InitNoneCompressor("brotli", 10)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})

Expand Down Expand Up @@ -397,7 +397,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, batchType)
cfg.TargetNumFrames = 1000
cfg.InitNoneCompressor()
cfg.InitNoneCompressor("brotli", 10)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})

Expand Down Expand Up @@ -445,7 +445,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
const maxChannelDuration = 15
cfg := channelManagerTestConfig(1000, derive.SpanBatchType)
cfg.MaxChannelDuration = maxChannelDuration
cfg.InitNoneCompressor()
cfg.InitNoneCompressor("brotli", 10)

for _, tt := range []struct {
name string
Expand Down
14 changes: 14 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type CLIConfig struct {
// Type of compressor to use. Must be one of [compressor.KindKeys].
Compressor string

// Type of compression algorithm to use. Must be one of [zlib, brotli]
CompressionAlgo string

// Levels of compression to use. E.g. 1-11 for brotli, 0-9 for zlib
CompressLevel int

// If Stopped is true, the batcher starts stopped and won't start batching right away.
// Batching needs to be started via an admin RPC.
Stopped bool
Expand Down Expand Up @@ -124,6 +130,12 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
if c.CompressionAlgo == "" {
return errors.New("must set compression algo")
}
if (c.CompressionAlgo == "zlib" && (c.CompressLevel < 0 || c.CompressLevel > 9)) || (c.CompressionAlgo == "brotli" && (c.CompressLevel < 1 || c.CompressLevel > 11)) {
return fmt.Errorf("invalid compression level %v for %v", c.CompressLevel, c.CompressionAlgo)
}
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
Expand Down Expand Up @@ -168,6 +180,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
CompressionAlgo: ctx.String(flags.CompressionAlgoFlag.Name),
CompressLevel: ctx.Int(flags.CompressLevelFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
Expand Down
4 changes: 3 additions & 1 deletion op-batcher/batcher/config_test.go
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func validBatcherConfig() batcher.CLIConfig {
MetricsConfig: metrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
// The compressor config is not checked in config.Check()
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
RPC: rpc.DefaultCLIConfig(),
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: "brotli",
CompressLevel: 9,
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading