From cee7d1fb02cf17d06a71e71ef070c139be3ed406 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 8 Apr 2024 16:41:30 -0500 Subject: [PATCH 01/41] Improve blocks re-execution and make it compatible with --init.then-quit --- blocks_reexecutor/blocks_reexecutor.go | 51 ++++++++++++++++++-------- cmd/nitro/nitro.go | 26 ++++++++++--- system_tests/blocks_reexecutor_test.go | 26 +++++-------- 3 files changed, 65 insertions(+), 38 deletions(-) diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index bb6de00cad..bedea37776 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -42,10 +42,9 @@ func (c *Config) Validate() error { } var DefaultConfig = Config{ - Enable: false, - Mode: "random", - Room: runtime.NumCPU(), - BlocksPerThread: 10000, + Enable: false, + Mode: "random", + Room: runtime.NumCPU(), } var TestConfig = Config{ @@ -84,25 +83,38 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block start = chainStart end = chainEnd } - if start < chainStart { - log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis") + if start < chainStart || start > chainEnd { + log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart) start = chainStart } - if end > chainEnd { - log.Warn("state reexecutor's end block number is greater than latest, resetting to latest") + if end > chainEnd || end < chainStart { + log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd) end = chainEnd } if c.Mode == "random" && end != start { - if c.BlocksPerThread > end-start { - c.BlocksPerThread = end - start + // Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly + rng := uint64(10000) + if c.BlocksPerThread != 0 { + rng = c.BlocksPerThread + } + if rng > end-start { + rng = end - start } - start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1))) - end = start + c.BlocksPerThread + start += uint64(rand.Intn(int(end - start - rng + 1))) + end = start + rng } - // inclusive of block reexecution [start, end] + // Inclusive of block reexecution [start, end] if start > 0 { start-- } + // Divide work equally among available threads + if c.BlocksPerThread == 0 { + c.BlocksPerThread = 10000 + work := (end - start) / uint64(c.Room) + if work > 0 { + c.BlocksPerThread = work + } + } return &BlocksReExecutor{ config: c, blockchain: blockchain, @@ -125,11 +137,13 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB } // we don't use state release pattern here // TODO do we want to use release pattern here? - startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1) + startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1) if err != nil { s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err) return s.startBlock } + // NoOp + defer release() start = startHeader.Number.Uint64() s.LaunchThread(func(ctx context.Context) { _, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil) @@ -169,9 +183,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) { log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end) } -func (s *BlocksReExecutor) Start(ctx context.Context) { +func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) { s.StopWaiter.Start(ctx, s) - s.LaunchThread(s.Impl) + s.LaunchThread(func(ctx context.Context) { + s.Impl(ctx) + if done != nil { + close(done) + } + }) } func (s *BlocksReExecutor) StopAndWait() { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 997adf9369..59241204f1 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -494,6 +494,25 @@ func mainImpl() int { return 1 } + fatalErrChan := make(chan error, 10) + + var blocksReExecutor *blocksreexecutor.BlocksReExecutor + if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil { + blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan) + if nodeConfig.Init.ThenQuit { + success := make(chan struct{}) + blocksReExecutor.Start(ctx, success) + deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() }) + select { + case err := <-fatalErrChan: + log.Error("shutting down due to fatal error", "err", err) + defer log.Error("shut down due to fatal error", "err", err) + return 1 + case <-success: + } + } + } + if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 { return 0 } @@ -514,8 +533,6 @@ func mainImpl() int { return 1 } - fatalErrChan := make(chan error, 10) - var valNode *valnode.ValidationNode if sameProcessValidationNodeEnabled { valNode, err = valnode.CreateValidationNode( @@ -644,9 +661,8 @@ func mainImpl() int { // remove previous deferFuncs, StopAndWait closes database and blockchain. deferFuncs = []func(){func() { currentNode.StopAndWait() }} } - if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil { - blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan) - blocksReExecutor.Start(ctx) + if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit { + blocksReExecutor.Start(ctx, nil) deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() }) } diff --git a/system_tests/blocks_reexecutor_test.go b/system_tests/blocks_reexecutor_test.go index c2941ddcc4..66690d1427 100644 --- a/system_tests/blocks_reexecutor_test.go +++ b/system_tests/blocks_reexecutor_test.go @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) { } } + // Reexecute blocks at mode full success := make(chan struct{}) + executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan) + executorFull.Start(ctx, success) - // Reexecute blocks at mode full - go func() { - executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan) - executorFull.StopWaiter.Start(ctx, executorFull) - executorFull.Impl(ctx) - executorFull.StopAndWait() - success <- struct{}{} - }() select { case err := <-feedErrChan: t.Errorf("error occurred: %v", err) @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) { } // Reexecute blocks at mode random - go func() { - c := &blocksreexecutor.TestConfig - c.Mode = "random" - executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan) - executorRandom.StopWaiter.Start(ctx, executorRandom) - executorRandom.Impl(ctx) - executorRandom.StopAndWait() - success <- struct{}{} - }() + success = make(chan struct{}) + c := &blocksreexecutor.TestConfig + c.Mode = "random" + executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan) + executorRandom.Start(ctx, success) + select { case err := <-feedErrChan: t.Errorf("error occurred: %v", err) From f9055c93622bd32fe2f5049bd89600236c4ee689 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Sat, 20 Apr 2024 02:26:52 +0200 Subject: [PATCH 02/41] add pebble extra options --- arbnode/dataposter/storage_test.go | 3 +- cmd/conf/database.go | 123 +++++++++++++++++++++++++++-- cmd/nitro/init.go | 12 +-- cmd/nitro/nitro.go | 5 +- cmd/pruning/pruning.go | 8 +- execution/gethexec/node.go | 2 +- go-ethereum | 2 +- system_tests/common_test.go | 11 ++- system_tests/das_test.go | 6 +- system_tests/pruning_test.go | 6 +- system_tests/staterecovery_test.go | 4 +- 11 files changed, 152 insertions(+), 30 deletions(-) diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go index f98c120f38..343efac3c7 100644 --- a/arbnode/dataposter/storage_test.go +++ b/arbnode/dataposter/storage_test.go @@ -19,6 +19,7 @@ import ( "github.com/offchainlabs/nitro/arbnode/dataposter/redis" "github.com/offchainlabs/nitro/arbnode/dataposter/slice" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" + "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/signature" @@ -44,7 +45,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage { t.Helper() - db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true) + db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions()) if err != nil { t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err) } diff --git a/cmd/conf/database.go b/cmd/conf/database.go index b049375d66..be0c630fa9 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -8,17 +8,21 @@ import ( "os" "path" "path/filepath" + "runtime" + "time" + "github.com/ethereum/go-ethereum/ethdb/pebble" flag "github.com/spf13/pflag" ) type PersistentConfig struct { - GlobalConfig string `koanf:"global-config"` - Chain string `koanf:"chain"` - LogDir string `koanf:"log-dir"` - Handles int `koanf:"handles"` - Ancient string `koanf:"ancient"` - DBEngine string `koanf:"db-engine"` + GlobalConfig string `koanf:"global-config"` + Chain string `koanf:"chain"` + LogDir string `koanf:"log-dir"` + Handles int `koanf:"handles"` + Ancient string `koanf:"ancient"` + DBEngine string `koanf:"db-engine"` + Pebble PebbleConfig `koanf:"pebble"` } var PersistentConfigDefault = PersistentConfig{ @@ -28,6 +32,7 @@ var PersistentConfigDefault = PersistentConfig{ Handles: 512, Ancient: "", DBEngine: "leveldb", + Pebble: PebbleConfigDefault, } func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -37,6 +42,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database") f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened") f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')") + PebbleConfigAddOptions(prefix+".pebble", f) } func (c *PersistentConfig) ResolveDirectoryNames() error { @@ -96,3 +102,108 @@ func (c *PersistentConfig) Validate() error { } return nil } + +type PebbleConfig struct { + BytesPerSync int `koanf:"bytes-per-sync"` + L0CompactionFileThreshold int `koanf:"l0-compaction-file-threshold"` + L0CompactionThreshold int `koanf:"l0-compaction-threshold"` + L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"` + LBaseMaxBytes int64 `koanf:"l-base-max-bytes"` + MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"` + DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"` + WALBytesPerSync int `koanf:"wal-bytes-per-sync"` + WALDir string `koanf:"wal-dir"` + WALMinSyncInterval int `koanf:"wal-min-sync-interval"` + TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` + Experimental PebbleExperimentalConfig `koaf:"experimental"` +} + +var PebbleConfigDefault = PebbleConfig{ + BytesPerSync: 0, // pebble default will be used + L0CompactionFileThreshold: 0, // pebble default will be used + L0CompactionThreshold: 0, // pebble default will be used + L0StopWritesThreshold: 0, // pebble default will be used + LBaseMaxBytes: 0, // pebble default will be used + MaxConcurrentCompactions: runtime.NumCPU(), + DisableAutomaticCompactions: false, + WALBytesPerSync: 0, // pebble default will be used + WALDir: "", // default will use same dir as for sstables + WALMinSyncInterval: 0, // pebble default will be used + TargetByteDeletionRate: 0, // pebble default will be used + Experimental: PebbleExperimentalConfigDefault, +} + +func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Int(prefix+".bytes-per-sync", PebbleConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background (0 = pebble default)") + f.Int(prefix+".l0-compaction-file-threshold", PebbleConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction (0 = pebble default)") + f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction (0 = pebble default)") + f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") + f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") + f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions (0 = pebble default)") + f.Bool(prefix+".disable-automatic-compactions", PebbleConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") + f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)") + f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") + f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") + PebbleExperimentalConfigAddOptions(".experimental", f) +} + +type PebbleExperimentalConfig struct { + L0CompactionConcurrency int `koanf:"l0-compaction-concurrency"` + CompactionDebtConcurrency uint64 `koanf:"compaction-debt-concurrency"` + ReadCompactionRate int64 `koanf:"read-compaction-rate"` + ReadSamplingMultiplier int64 `koanf:"read-sampling-multiplier"` + MaxWriterConcurrency int `koanf:"max-writer-concurrency"` + ForceWriterParallelism bool `koanf:"force-writer-parallelism"` +} + +var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ + L0CompactionConcurrency: 0, + CompactionDebtConcurrency: 0, + ReadCompactionRate: 0, + ReadSamplingMultiplier: -1, + MaxWriterConcurrency: 0, + ForceWriterParallelism: false, +} + +func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions. (0 = pebble default)") + f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen. (0 = pebble default)") + f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate") + f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB). (0 = pebble default)") + f.Int(prefix+".max-writer-concurrency", PebbleExperimentalConfigDefault.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.") + f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.") +} + +func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { + var maxConcurrentCompactions func() int + if c.MaxConcurrentCompactions > 0 { + maxConcurrentCompactions = func() int { return c.MaxConcurrentCompactions } + } + var walMinSyncInterval func() time.Duration + if c.WALMinSyncInterval > 0 { + walMinSyncInterval = func() time.Duration { + return time.Microsecond * time.Duration(c.WALMinSyncInterval) + } + } + return &pebble.ExtraOptions{ + BytesPerSync: c.BytesPerSync, + L0CompactionFileThreshold: c.L0CompactionFileThreshold, + L0CompactionThreshold: c.L0CompactionThreshold, + L0StopWritesThreshold: c.L0StopWritesThreshold, + LBaseMaxBytes: c.LBaseMaxBytes, + MaxConcurrentCompactions: maxConcurrentCompactions, + DisableAutomaticCompactions: c.DisableAutomaticCompactions, + WALBytesPerSync: c.WALBytesPerSync, + WALDir: c.WALDir, + WALMinSyncInterval: walMinSyncInterval, + TargetByteDeletionRate: c.TargetByteDeletionRate, + Experimental: pebble.ExtraOptionsExperimental{ + L0CompactionConcurrency: c.Experimental.L0CompactionConcurrency, + CompactionDebtConcurrency: c.Experimental.CompactionDebtConcurrency, + ReadCompactionRate: c.Experimental.ReadCompactionRate, + ReadSamplingMultiplier: c.Experimental.ReadSamplingMultiplier, + MaxWriterConcurrency: c.Experimental.MaxWriterConcurrency, + ForceWriterParallelism: c.Experimental.ForceWriterParallelism, + }, + } +} diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index 6ebfec3bb1..2bae2d9e11 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -159,19 +159,19 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo return nil } -func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) { +func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) { if !config.Init.Force { - if readOnlyDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", 0, 0, "", "l2chaindata/", true); err == nil { + if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions()); err == nil { if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil { readOnlyDb.Close() if !arbmath.BigEquals(chainConfig.ChainID, chainId) { return nil, nil, fmt.Errorf("database has chain ID %v but config has chain ID %v (are you sure this database is for the right chain?)", chainConfig.ChainID, chainId) } - chainDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false) + chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions()) if err != nil { return chainDb, nil, err } - err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired()) + err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired()) if err != nil { return chainDb, nil, fmt.Errorf("error pruning: %w", err) } @@ -219,7 +219,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo var initDataReader statetransfer.InitDataReader = nil - chainDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false) + chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions()) if err != nil { return chainDb, nil, err } @@ -367,7 +367,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo return chainDb, l2BlockChain, err } - err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired()) + err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired()) if err != nil { return chainDb, nil, fmt.Errorf("error pruning: %w", err) } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 79ecd51ac2..f70d16a25a 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -177,6 +177,7 @@ func mainImpl() int { nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) nodeConfig.GraphQL.Apply(&stackConf) + if nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } @@ -476,7 +477,7 @@ func mainImpl() int { } } - chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), l1Client, rollupAddrs) + chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), &nodeConfig.Persistent, l1Client, rollupAddrs) if l2BlockChain != nil { deferFuncs = append(deferFuncs, func() { l2BlockChain.Stop() }) } @@ -487,7 +488,7 @@ func mainImpl() int { return 1 } - arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, nodeConfig.Persistent.Pebble.ExtraOptions()) deferFuncs = append(deferFuncs, func() { closeDb(arbDb, "arbDb") }) if err != nil { log.Error("failed to open database", "err", err) diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index c483526aa1..363126a49f 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -80,12 +80,12 @@ func (r *importantRoots) addHeader(header *types.Header, overwrite bool) error { var hashListRegex = regexp.MustCompile("^(0x)?[0-9a-fA-F]{64}(,(0x)?[0-9a-fA-F]{64})*$") // Finds important roots to retain while proving -func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) ([]common.Hash, error) { +func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) ([]common.Hash, error) { chainConfig := gethexec.TryReadStoredChainConfig(chainDb) if chainConfig == nil { return nil, errors.New("database doesn't have a chain config (was this node initialized?)") } - arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", true) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", true, persistentConfig.Pebble.ExtraOptions()) if err != nil { return nil, err } @@ -232,11 +232,11 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return roots.roots, nil } -func PruneChainDb(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) error { +func PruneChainDb(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) error { if initConfig.Prune == "" { return pruner.RecoverPruning(stack.InstanceDir(), chainDb) } - root, err := findImportantRoots(ctx, chainDb, stack, initConfig, cacheConfig, l1Client, rollupAddrs, validatorRequired) + root, err := findImportantRoots(ctx, chainDb, stack, initConfig, cacheConfig, persistentConfig, l1Client, rollupAddrs, validatorRequired) if err != nil { return fmt.Errorf("failed to find root to retain for pruning: %w", err) } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 54f9ed6fe1..284934245b 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -216,7 +216,7 @@ func CreateExecutionNode( var classicOutbox *ClassicOutboxRetriever if l2BlockChain.Config().ArbitrumChainParams.GenesisBlockNum > 0 { - classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true) + classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true) // TODO can we skip using ExtraOptions here? if err != nil { log.Warn("Classic Msg Database not found", "err", err) classicOutbox = nil diff --git a/go-ethereum b/go-ethereum index daccadb06c..935cb21640 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit daccadb06c7bd9ad7e86c74f33ea39d897f0ece4 +Subproject commit 935cb216402c9693faf86d75a7fbb045109ed4a3 diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 7f9f4844fd..4bcf1349e2 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -21,6 +21,7 @@ import ( "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/cmd/chaininfo" + "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/deploy" @@ -718,9 +719,10 @@ func createL2BlockChainWithStackConfig( stack, err = node.New(stackConfig) Require(t, err) - chainDb, err := stack.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false) + // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) - arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) initReader := statetransfer.NewMemoryInitDataReader(&l2info.ArbInitData) @@ -922,9 +924,10 @@ func Create2ndNodeWithConfig( l2stack, err := node.New(stackConfig) Require(t, err) - l2chainDb, err := l2stack.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false) + // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs + l2chainDb, err := l2stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) - l2arbDb, err := l2stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false) + l2arbDb, err := l2stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) initReader := statetransfer.NewMemoryInitDataReader(l2InitData) diff --git a/system_tests/das_test.go b/system_tests/das_test.go index c4a3c453d8..be0ef9c957 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -25,6 +25,7 @@ import ( "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/blsSignatures" + "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/execution/gethexec" @@ -175,10 +176,11 @@ func TestDASRekey(t *testing.T) { l2stackA, err := node.New(stackConfig) Require(t, err) - l2chainDb, err := l2stackA.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false) + // TODO get pebble.ExtraOptions from conf.PersistentConfig + l2chainDb, err := l2stackA.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) - l2arbDb, err := l2stackA.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false) + l2arbDb, err := l2stackA.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) l2blockchain, err := gethexec.GetBlockChain(l2chainDb, nil, chainConfig, gethexec.ConfigDefaultTest().TxLookupLimit) diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go index 8efc8653e6..e83c350804 100644 --- a/system_tests/pruning_test.go +++ b/system_tests/pruning_test.go @@ -65,7 +65,8 @@ func TestPruning(t *testing.T) { stack, err := node.New(builder.l2StackConfig) Require(t, err) defer stack.Close() - chainDb, err := stack.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false) + // TODO get pebble.ExtraOptions from conf.PersistentConfig + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) defer chainDb.Close() chainDbEntriesBeforePruning := countStateEntries(chainDb) @@ -89,7 +90,8 @@ func TestPruning(t *testing.T) { initConfig := conf.InitConfigDefault initConfig.Prune = "full" coreCacheConfig := gethexec.DefaultCacheConfigFor(stack, &builder.execConfig.Caching) - err = pruning.PruneChainDb(ctx, chainDb, stack, &initConfig, coreCacheConfig, builder.L1.Client, *builder.L2.ConsensusNode.DeployInfo, false) + persistentConfig := conf.PersistentConfigDefault + err = pruning.PruneChainDb(ctx, chainDb, stack, &initConfig, coreCacheConfig, &persistentConfig, builder.L1.Client, *builder.L2.ConsensusNode.DeployInfo, false) Require(t, err) for _, key := range testKeys { diff --git a/system_tests/staterecovery_test.go b/system_tests/staterecovery_test.go index 632e748da8..9dc1081a7b 100644 --- a/system_tests/staterecovery_test.go +++ b/system_tests/staterecovery_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/trie" + "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/cmd/staterecovery" "github.com/offchainlabs/nitro/execution/gethexec" ) @@ -49,7 +50,8 @@ func TestRectreateMissingStates(t *testing.T) { stack, err := node.New(builder.l2StackConfig) Require(t, err) defer stack.Close() - chainDb, err := stack.OpenDatabase("l2chaindata", 0, 0, "l2chaindata/", false) + // TODO get pebble.ExtraOptions from conf.PersistentConfig + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) Require(t, err) defer chainDb.Close() cacheConfig := gethexec.DefaultCacheConfigFor(stack, &gethexec.DefaultCachingConfig) From 43d6e82020468299029f96426f20dd98635380b2 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Mon, 22 Apr 2024 16:20:57 +0200 Subject: [PATCH 03/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 935cb21640..9e62e652e2 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 935cb216402c9693faf86d75a7fbb045109ed4a3 +Subproject commit 9e62e652e211a47ad1c71a428b4a7ea6b96ae710 From 8981880ff6d341a9961dcaa0ee4466ee872de339 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Mon, 22 Apr 2024 16:47:21 +0200 Subject: [PATCH 04/41] fix koanf prefix --- cmd/conf/database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index be0c630fa9..8e3759ee73 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -115,7 +115,7 @@ type PebbleConfig struct { WALDir string `koanf:"wal-dir"` WALMinSyncInterval int `koanf:"wal-min-sync-interval"` TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` - Experimental PebbleExperimentalConfig `koaf:"experimental"` + Experimental PebbleExperimentalConfig `koanf:"experimental"` } var PebbleConfigDefault = PebbleConfig{ @@ -144,7 +144,7 @@ func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)") f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") - PebbleExperimentalConfigAddOptions(".experimental", f) + PebbleExperimentalConfigAddOptions(prefix+".experimental", f) } type PebbleExperimentalConfig struct { From 9d128ea332bbd987b739add12f6d142953595645 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Tue, 23 Apr 2024 15:30:03 +0200 Subject: [PATCH 05/41] add missing koanf pebble flag --- cmd/conf/database.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index 8e3759ee73..9264baa843 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -144,6 +144,7 @@ func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)") f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") + f.Int(prefix+".target-byte-deletion-rate", PebbleConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") PebbleExperimentalConfigAddOptions(prefix+".experimental", f) } From 7be2e34314ac234149e7408663781ce160817bcf Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Tue, 23 Apr 2024 19:40:19 +0200 Subject: [PATCH 06/41] add pebble layers config --- cmd/conf/database.go | 19 +++++++++++++++++++ go-ethereum | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index 9264baa843..fdf8eed565 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -116,6 +116,8 @@ type PebbleConfig struct { WALMinSyncInterval int `koanf:"wal-min-sync-interval"` TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` Experimental PebbleExperimentalConfig `koanf:"experimental"` + TargetFileSize int64 `koanf:"target-file-size"` + TargetFileSizeEqualLayers bool `koanf:"target-file-size-equal-layers"` } var PebbleConfigDefault = PebbleConfig{ @@ -131,6 +133,8 @@ var PebbleConfigDefault = PebbleConfig{ WALMinSyncInterval: 0, // pebble default will be used TargetByteDeletionRate: 0, // pebble default will be used Experimental: PebbleExperimentalConfigDefault, + TargetFileSize: 2 * 1024 * 1024, + TargetFileSizeEqualLayers: true, } func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -146,6 +150,8 @@ func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") f.Int(prefix+".target-byte-deletion-rate", PebbleConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") PebbleExperimentalConfigAddOptions(prefix+".experimental", f) + f.Int64(prefix+".target-file-size", PebbleConfigDefault.TargetFileSize, "target file size for the level 0") + f.Bool(prefix+".target-file-size-equal-layers", PebbleConfigDefault.TargetFileSizeEqualLayers, "if true same target-file-size will be uses for all layers, otherwise target size for layer n = 2 * target size for layer n - 1") } type PebbleExperimentalConfig struct { @@ -186,6 +192,18 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { return time.Microsecond * time.Duration(c.WALMinSyncInterval) } } + var levels []pebble.ExtraLevelOptions + if c.TargetFileSize > 0 { + if c.TargetFileSizeEqualLayers { + for i := 0; i < 7; i++ { + levels = append(levels, pebble.ExtraLevelOptions{TargetFileSize: c.TargetFileSize}) + } + } else { + for i := 0; i < 7; i++ { + levels = append(levels, pebble.ExtraLevelOptions{TargetFileSize: c.TargetFileSize << i}) + } + } + } return &pebble.ExtraOptions{ BytesPerSync: c.BytesPerSync, L0CompactionFileThreshold: c.L0CompactionFileThreshold, @@ -206,5 +224,6 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { MaxWriterConcurrency: c.Experimental.MaxWriterConcurrency, ForceWriterParallelism: c.Experimental.ForceWriterParallelism, }, + Levels: levels, } } diff --git a/go-ethereum b/go-ethereum index 9e62e652e2..d6428a6842 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 9e62e652e211a47ad1c71a428b4a7ea6b96ae710 +Subproject commit d6428a6842a8c7d39821e74662fe3e0af34babd7 From 95422f94ecfd17e2a5e3be49efe1d3fd605d51d6 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 24 Apr 2024 15:15:58 +0200 Subject: [PATCH 07/41] add pebble block size and index block size options --- cmd/conf/database.go | 40 +++++++++++++++++++++++++++------------- go-ethereum | 2 +- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index fdf8eed565..bdaf8c1b73 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -5,6 +5,7 @@ package conf import ( "fmt" + "math" "os" "path" "path/filepath" @@ -116,8 +117,12 @@ type PebbleConfig struct { WALMinSyncInterval int `koanf:"wal-min-sync-interval"` TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` Experimental PebbleExperimentalConfig `koanf:"experimental"` - TargetFileSize int64 `koanf:"target-file-size"` - TargetFileSizeEqualLayers bool `koanf:"target-file-size-equal-layers"` + + // level specific + BlockSize int `koanf:"block-size"` + IndexBlockSize int `koanf:"index-block-size"` + TargetFileSize int64 `koanf:"target-file-size"` + TargetFileSizeEqualLevels bool `koanf:"target-file-size-equal-levels"` } var PebbleConfigDefault = PebbleConfig{ @@ -133,8 +138,10 @@ var PebbleConfigDefault = PebbleConfig{ WALMinSyncInterval: 0, // pebble default will be used TargetByteDeletionRate: 0, // pebble default will be used Experimental: PebbleExperimentalConfigDefault, + BlockSize: 4096, + IndexBlockSize: 4096, TargetFileSize: 2 * 1024 * 1024, - TargetFileSizeEqualLayers: true, + TargetFileSizeEqualLevels: true, } func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -149,9 +156,11 @@ func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") f.Int(prefix+".target-byte-deletion-rate", PebbleConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") + f.Int(prefix+".block-size", PebbleConfigDefault.BlockSize, "target uncompressed size in bytes of each table block") + f.Int(prefix+".index-block-size", PebbleConfigDefault.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32)) PebbleExperimentalConfigAddOptions(prefix+".experimental", f) f.Int64(prefix+".target-file-size", PebbleConfigDefault.TargetFileSize, "target file size for the level 0") - f.Bool(prefix+".target-file-size-equal-layers", PebbleConfigDefault.TargetFileSizeEqualLayers, "if true same target-file-size will be uses for all layers, otherwise target size for layer n = 2 * target size for layer n - 1") + f.Bool(prefix+".target-file-size-equal-levels", PebbleConfigDefault.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1") } type PebbleExperimentalConfig struct { @@ -193,16 +202,16 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { } } var levels []pebble.ExtraLevelOptions - if c.TargetFileSize > 0 { - if c.TargetFileSizeEqualLayers { - for i := 0; i < 7; i++ { - levels = append(levels, pebble.ExtraLevelOptions{TargetFileSize: c.TargetFileSize}) - } - } else { - for i := 0; i < 7; i++ { - levels = append(levels, pebble.ExtraLevelOptions{TargetFileSize: c.TargetFileSize << i}) - } + for i := 0; i < 7; i++ { + targetFileSize := c.TargetFileSize + if !c.TargetFileSizeEqualLevels { + targetFileSize = targetFileSize << i } + levels = append(levels, pebble.ExtraLevelOptions{ + BlockSize: c.BlockSize, + IndexBlockSize: c.IndexBlockSize, + TargetFileSize: targetFileSize, + }) } return &pebble.ExtraOptions{ BytesPerSync: c.BytesPerSync, @@ -227,3 +236,8 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { Levels: levels, } } + +func (c *PebbleConfig) Validate() error { + // TODO + return nil +} diff --git a/go-ethereum b/go-ethereum index d6428a6842..509f1114ed 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit d6428a6842a8c7d39821e74662fe3e0af34babd7 +Subproject commit 509f1114edd9d4e367cedfe4011ceed5766e3f07 From 69d65fedd619a44e186be83de134e6ae3681d63c Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 24 Apr 2024 15:38:25 +0200 Subject: [PATCH 08/41] add MemTableStopWritesThreshold pebble option --- cmd/conf/database.go | 4 ++++ go-ethereum | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index bdaf8c1b73..59a7cafd51 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -110,6 +110,7 @@ type PebbleConfig struct { L0CompactionThreshold int `koanf:"l0-compaction-threshold"` L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"` LBaseMaxBytes int64 `koanf:"l-base-max-bytes"` + MemTableStopWritesThreshold int `koanf:"mem-table-stop-writes-threshold"` MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"` DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"` WALBytesPerSync int `koanf:"wal-bytes-per-sync"` @@ -131,6 +132,7 @@ var PebbleConfigDefault = PebbleConfig{ L0CompactionThreshold: 0, // pebble default will be used L0StopWritesThreshold: 0, // pebble default will be used LBaseMaxBytes: 0, // pebble default will be used + MemTableStopWritesThreshold: 2, MaxConcurrentCompactions: runtime.NumCPU(), DisableAutomaticCompactions: false, WALBytesPerSync: 0, // pebble default will be used @@ -150,6 +152,7 @@ func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction (0 = pebble default)") f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") + f.Int(prefix+".mem-table-stop-writes-threshold", PebbleConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions (0 = pebble default)") f.Bool(prefix+".disable-automatic-compactions", PebbleConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)") @@ -219,6 +222,7 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { L0CompactionThreshold: c.L0CompactionThreshold, L0StopWritesThreshold: c.L0StopWritesThreshold, LBaseMaxBytes: c.LBaseMaxBytes, + MemTableStopWritesThreshold: c.MemTableStopWritesThreshold, MaxConcurrentCompactions: maxConcurrentCompactions, DisableAutomaticCompactions: c.DisableAutomaticCompactions, WALBytesPerSync: c.WALBytesPerSync, diff --git a/go-ethereum b/go-ethereum index 509f1114ed..040c6f7870 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 509f1114edd9d4e367cedfe4011ceed5766e3f07 +Subproject commit 040c6f787056826112340ce0b4e5b8d43503f20a From c835fea2a840120adfc3459933a7118ae5219265 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 24 Apr 2024 16:02:57 +0200 Subject: [PATCH 09/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 040c6f7870..5e8d11c191 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 040c6f787056826112340ce0b4e5b8d43503f20a +Subproject commit 5e8d11c191c4b88e53ca53e69b7854efe89487fd From 04b16998573bcb0ae4bf10c6dd316e7eda004000 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 25 Apr 2024 00:48:59 +0200 Subject: [PATCH 10/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 5e8d11c191..07d08fede3 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 5e8d11c191c4b88e53ca53e69b7854efe89487fd +Subproject commit 07d08fede3e5e8bbfbdb3797fad08d94f8c7699a From 08ece6f85c9bf9c2ef393ab7d1cdcf5d53f7cac7 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 25 Apr 2024 01:01:52 +0200 Subject: [PATCH 11/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 07d08fede3..31dcc54970 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 07d08fede3e5e8bbfbdb3797fad08d94f8c7699a +Subproject commit 31dcc54970876a09e13820a4a7334f39af38157d From 028fd31cc45f3948420ea6cd76e40251a177edd6 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 25 Apr 2024 02:29:22 +0200 Subject: [PATCH 12/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 31dcc54970..a67aac7029 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 31dcc54970876a09e13820a4a7334f39af38157d +Subproject commit a67aac7029db022dd0e078783809e2fedf20de53 From 6d5343d5e2882a30b2fcae7d17f02591289c0f26 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Fri, 26 Apr 2024 23:23:01 +0200 Subject: [PATCH 13/41] update pebble options descriptions --- cmd/conf/database.go | 20 ++++++++++---------- go-ethereum | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index 59a7cafd51..1c8b673dd3 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -147,15 +147,15 @@ var PebbleConfigDefault = PebbleConfig{ } func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".bytes-per-sync", PebbleConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background (0 = pebble default)") - f.Int(prefix+".l0-compaction-file-threshold", PebbleConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction (0 = pebble default)") - f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction (0 = pebble default)") - f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") - f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)") + f.Int(prefix+".bytes-per-sync", PebbleConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background") + f.Int(prefix+".l0-compaction-file-threshold", PebbleConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction") + f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction") + f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached") + f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") f.Int(prefix+".mem-table-stop-writes-threshold", PebbleConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") - f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions (0 = pebble default)") + f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions") f.Bool(prefix+".disable-automatic-compactions", PebbleConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") - f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)") + f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud") f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") f.Int(prefix+".target-byte-deletion-rate", PebbleConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") @@ -185,10 +185,10 @@ var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ } func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions. (0 = pebble default)") - f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen. (0 = pebble default)") + f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions.") + f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen.") f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate") - f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB). (0 = pebble default)") + f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).") f.Int(prefix+".max-writer-concurrency", PebbleExperimentalConfigDefault.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.") f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.") } diff --git a/go-ethereum b/go-ethereum index a67aac7029..9f39f194d0 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit a67aac7029db022dd0e078783809e2fedf20de53 +Subproject commit 9f39f194d0a5b1ab1a47b1d4f83cd112f18dc4b3 From 3919a6e8175588c78b8ae7b8bd5f2e3e6ae84253 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 2 May 2024 17:39:12 +0200 Subject: [PATCH 14/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 9f39f194d0..3ecb5979ae 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 9f39f194d0a5b1ab1a47b1d4f83cd112f18dc4b3 +Subproject commit 3ecb5979ae489902c97d7146209c35071d167be6 From 991f07d2e3e6c8d1d74368977f463b21d73dc59e Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 2 May 2024 18:05:05 +0200 Subject: [PATCH 15/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 3ecb5979ae..1aaeef7598 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 3ecb5979ae489902c97d7146209c35071d167be6 +Subproject commit 1aaeef75987a3d4379cf7d876cdf1526d8701884 From 39d33c7a88e01a1e5ca77f2c2ff45d06ede45498 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Sat, 4 May 2024 00:27:13 +0200 Subject: [PATCH 16/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 1aaeef7598..ac85a19d5f 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 1aaeef75987a3d4379cf7d876cdf1526d8701884 +Subproject commit ac85a19d5f56231076d5bab95504d666b084fa3b From 04b9b373b6fb4e529c6a0b27d6fc847de97ee35d Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 7 May 2024 16:01:27 -0500 Subject: [PATCH 17/41] Block reexecutor should not try to reexecute genesis block --- blocks_reexecutor/blocks_reexecutor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index bedea37776..0ad4337e0f 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -104,7 +104,8 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block end = start + rng } // Inclusive of block reexecution [start, end] - if start > 0 { + // Do not reexecute genesis block i,e chainStart + if start > 0 && start != chainStart { start-- } // Divide work equally among available threads From 90374dc51277dd99c6fce14737f0cd17e4406f29 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Tue, 14 May 2024 12:32:23 +0200 Subject: [PATCH 18/41] add subdirectory to wal-dir to avoid filename collision between dbs, move most pebble options to experimental section --- arbnode/dataposter/storage_test.go | 2 +- cmd/conf/database.go | 171 +++++++++++++++++------------ cmd/nitro/init.go | 6 +- cmd/nitro/nitro.go | 2 +- cmd/pruning/pruning.go | 2 +- system_tests/common_test.go | 8 +- system_tests/das_test.go | 4 +- system_tests/pruning_test.go | 2 +- system_tests/staterecovery_test.go | 2 +- 9 files changed, 113 insertions(+), 86 deletions(-) diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go index 343efac3c7..e2aa321e0d 100644 --- a/arbnode/dataposter/storage_test.go +++ b/arbnode/dataposter/storage_test.go @@ -45,7 +45,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage { t.Helper() - db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions("pebble")) if err != nil { t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err) } diff --git a/cmd/conf/database.go b/cmd/conf/database.go index 1c8b673dd3..d60ee51c5b 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -101,81 +101,85 @@ func (c *PersistentConfig) Validate() error { if c.DBEngine != "leveldb" && c.DBEngine != "pebble" { return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb" or "pebble"`, c.DBEngine) } + if c.DBEngine == "pebble" { + if err := c.Pebble.Validate(); err != nil { + return err + } + } return nil } type PebbleConfig struct { - BytesPerSync int `koanf:"bytes-per-sync"` - L0CompactionFileThreshold int `koanf:"l0-compaction-file-threshold"` - L0CompactionThreshold int `koanf:"l0-compaction-threshold"` - L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"` - LBaseMaxBytes int64 `koanf:"l-base-max-bytes"` - MemTableStopWritesThreshold int `koanf:"mem-table-stop-writes-threshold"` - MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"` - DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"` - WALBytesPerSync int `koanf:"wal-bytes-per-sync"` - WALDir string `koanf:"wal-dir"` - WALMinSyncInterval int `koanf:"wal-min-sync-interval"` - TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` - Experimental PebbleExperimentalConfig `koanf:"experimental"` + MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"` + Experimental PebbleExperimentalConfig `koanf:"experimental"` +} + +var PebbleConfigDefault = PebbleConfig{ + MaxConcurrentCompactions: runtime.NumCPU(), + Experimental: PebbleExperimentalConfigDefault, +} + +func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions") + PebbleExperimentalConfigAddOptions(prefix+".experimental", f) +} + +func (c *PebbleConfig) Validate() error { + if c.MaxConcurrentCompactions < 1 { + return fmt.Errorf("invalid .max-concurrent-compactions value: %d, has to be greater then 0", c.MaxConcurrentCompactions) + } + if err := c.Experimental.Validate(); err != nil { + return err + } + return nil +} + +type PebbleExperimentalConfig struct { + BytesPerSync int `koanf:"bytes-per-sync"` + L0CompactionFileThreshold int `koanf:"l0-compaction-file-threshold"` + L0CompactionThreshold int `koanf:"l0-compaction-threshold"` + L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"` + LBaseMaxBytes int64 `koanf:"l-base-max-bytes"` + MemTableStopWritesThreshold int `koanf:"mem-table-stop-writes-threshold"` + DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"` + WALBytesPerSync int `koanf:"wal-bytes-per-sync"` + WALDir string `koanf:"wal-dir"` + WALMinSyncInterval int `koanf:"wal-min-sync-interval"` + TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"` // level specific BlockSize int `koanf:"block-size"` IndexBlockSize int `koanf:"index-block-size"` TargetFileSize int64 `koanf:"target-file-size"` TargetFileSizeEqualLevels bool `koanf:"target-file-size-equal-levels"` + + // pebble experimental + L0CompactionConcurrency int `koanf:"l0-compaction-concurrency"` + CompactionDebtConcurrency uint64 `koanf:"compaction-debt-concurrency"` + ReadCompactionRate int64 `koanf:"read-compaction-rate"` + ReadSamplingMultiplier int64 `koanf:"read-sampling-multiplier"` + MaxWriterConcurrency int `koanf:"max-writer-concurrency"` + ForceWriterParallelism bool `koanf:"force-writer-parallelism"` } -var PebbleConfigDefault = PebbleConfig{ +var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ BytesPerSync: 0, // pebble default will be used L0CompactionFileThreshold: 0, // pebble default will be used L0CompactionThreshold: 0, // pebble default will be used L0StopWritesThreshold: 0, // pebble default will be used LBaseMaxBytes: 0, // pebble default will be used MemTableStopWritesThreshold: 2, - MaxConcurrentCompactions: runtime.NumCPU(), DisableAutomaticCompactions: false, WALBytesPerSync: 0, // pebble default will be used WALDir: "", // default will use same dir as for sstables WALMinSyncInterval: 0, // pebble default will be used TargetByteDeletionRate: 0, // pebble default will be used - Experimental: PebbleExperimentalConfigDefault, - BlockSize: 4096, - IndexBlockSize: 4096, - TargetFileSize: 2 * 1024 * 1024, - TargetFileSizeEqualLevels: true, -} -func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".bytes-per-sync", PebbleConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background") - f.Int(prefix+".l0-compaction-file-threshold", PebbleConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction") - f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction") - f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached") - f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") - f.Int(prefix+".mem-table-stop-writes-threshold", PebbleConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") - f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions") - f.Bool(prefix+".disable-automatic-compactions", PebbleConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") - f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud") - f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") - f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") - f.Int(prefix+".target-byte-deletion-rate", PebbleConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") - f.Int(prefix+".block-size", PebbleConfigDefault.BlockSize, "target uncompressed size in bytes of each table block") - f.Int(prefix+".index-block-size", PebbleConfigDefault.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32)) - PebbleExperimentalConfigAddOptions(prefix+".experimental", f) - f.Int64(prefix+".target-file-size", PebbleConfigDefault.TargetFileSize, "target file size for the level 0") - f.Bool(prefix+".target-file-size-equal-levels", PebbleConfigDefault.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1") -} - -type PebbleExperimentalConfig struct { - L0CompactionConcurrency int `koanf:"l0-compaction-concurrency"` - CompactionDebtConcurrency uint64 `koanf:"compaction-debt-concurrency"` - ReadCompactionRate int64 `koanf:"read-compaction-rate"` - ReadSamplingMultiplier int64 `koanf:"read-sampling-multiplier"` - MaxWriterConcurrency int `koanf:"max-writer-concurrency"` - ForceWriterParallelism bool `koanf:"force-writer-parallelism"` -} + BlockSize: 4096, + IndexBlockSize: 4096, + TargetFileSize: 2 * 1024 * 1024, + TargetFileSizeEqualLevels: true, -var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ L0CompactionConcurrency: 0, CompactionDebtConcurrency: 0, ReadCompactionRate: 0, @@ -185,6 +189,22 @@ var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ } func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Int(prefix+".bytes-per-sync", PebbleExperimentalConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background") + f.Int(prefix+".l0-compaction-file-threshold", PebbleExperimentalConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction") + f.Int(prefix+".l0-compaction-threshold", PebbleExperimentalConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction") + f.Int(prefix+".l0-stop-writes-threshold", PebbleExperimentalConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached") + f.Int64(prefix+".l-base-max-bytes", PebbleExperimentalConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") + f.Int(prefix+".mem-table-stop-writes-threshold", PebbleExperimentalConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") + f.Bool(prefix+".disable-automatic-compactions", PebbleExperimentalConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") + f.Int(prefix+".wal-bytes-per-sync", PebbleExperimentalConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud") + f.String(prefix+".wal-dir", PebbleExperimentalConfigDefault.WALDir, "absolute path of directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") + f.Int(prefix+".wal-min-sync-interval", PebbleExperimentalConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") + f.Int(prefix+".target-byte-deletion-rate", PebbleExperimentalConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") + f.Int(prefix+".block-size", PebbleExperimentalConfigDefault.BlockSize, "target uncompressed size in bytes of each table block") + f.Int(prefix+".index-block-size", PebbleExperimentalConfigDefault.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32)) + f.Int64(prefix+".target-file-size", PebbleExperimentalConfigDefault.TargetFileSize, "target file size for the level 0") + f.Bool(prefix+".target-file-size-equal-levels", PebbleExperimentalConfigDefault.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1") + f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions.") f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen.") f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate") @@ -193,42 +213,54 @@ func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.") } -func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { +func (c *PebbleExperimentalConfig) Validate() error { + if !filepath.IsAbs(c.WALDir) { + return fmt.Errorf("invalid .wal-dir directory (%s) - has to be an absolute path", c.WALDir) + } + // TODO + return nil +} + +func (c *PebbleConfig) ExtraOptions(namespace string) *pebble.ExtraOptions { var maxConcurrentCompactions func() int if c.MaxConcurrentCompactions > 0 { maxConcurrentCompactions = func() int { return c.MaxConcurrentCompactions } } var walMinSyncInterval func() time.Duration - if c.WALMinSyncInterval > 0 { + if c.Experimental.WALMinSyncInterval > 0 { walMinSyncInterval = func() time.Duration { - return time.Microsecond * time.Duration(c.WALMinSyncInterval) + return time.Microsecond * time.Duration(c.Experimental.WALMinSyncInterval) } } var levels []pebble.ExtraLevelOptions for i := 0; i < 7; i++ { - targetFileSize := c.TargetFileSize - if !c.TargetFileSizeEqualLevels { + targetFileSize := c.Experimental.TargetFileSize + if !c.Experimental.TargetFileSizeEqualLevels { targetFileSize = targetFileSize << i } levels = append(levels, pebble.ExtraLevelOptions{ - BlockSize: c.BlockSize, - IndexBlockSize: c.IndexBlockSize, + BlockSize: c.Experimental.BlockSize, + IndexBlockSize: c.Experimental.IndexBlockSize, TargetFileSize: targetFileSize, }) } + walDir := c.Experimental.WALDir + if walDir != "" { + walDir = path.Join(walDir, namespace) + } return &pebble.ExtraOptions{ - BytesPerSync: c.BytesPerSync, - L0CompactionFileThreshold: c.L0CompactionFileThreshold, - L0CompactionThreshold: c.L0CompactionThreshold, - L0StopWritesThreshold: c.L0StopWritesThreshold, - LBaseMaxBytes: c.LBaseMaxBytes, - MemTableStopWritesThreshold: c.MemTableStopWritesThreshold, + BytesPerSync: c.Experimental.BytesPerSync, + L0CompactionFileThreshold: c.Experimental.L0CompactionFileThreshold, + L0CompactionThreshold: c.Experimental.L0CompactionThreshold, + L0StopWritesThreshold: c.Experimental.L0StopWritesThreshold, + LBaseMaxBytes: c.Experimental.LBaseMaxBytes, + MemTableStopWritesThreshold: c.Experimental.MemTableStopWritesThreshold, MaxConcurrentCompactions: maxConcurrentCompactions, - DisableAutomaticCompactions: c.DisableAutomaticCompactions, - WALBytesPerSync: c.WALBytesPerSync, - WALDir: c.WALDir, + DisableAutomaticCompactions: c.Experimental.DisableAutomaticCompactions, + WALBytesPerSync: c.Experimental.WALBytesPerSync, + WALDir: walDir, WALMinSyncInterval: walMinSyncInterval, - TargetByteDeletionRate: c.TargetByteDeletionRate, + TargetByteDeletionRate: c.Experimental.TargetByteDeletionRate, Experimental: pebble.ExtraOptionsExperimental{ L0CompactionConcurrency: c.Experimental.L0CompactionConcurrency, CompactionDebtConcurrency: c.Experimental.CompactionDebtConcurrency, @@ -240,8 +272,3 @@ func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions { Levels: levels, } } - -func (c *PebbleConfig) Validate() error { - // TODO - return nil -} diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index 9362154ec0..31ce4b91ea 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -172,13 +172,13 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) { if !config.Init.Force { - if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions()); err == nil { + if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil { if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil { readOnlyDb.Close() if !arbmath.BigEquals(chainConfig.ChainID, chainId) { return nil, nil, fmt.Errorf("database has chain ID %v but config has chain ID %v (are you sure this database is for the right chain?)", chainConfig.ChainID, chainId) } - chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions()) + chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions("l2chaindata")) if err != nil { return chainDb, nil, err } @@ -230,7 +230,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo var initDataReader statetransfer.InitDataReader = nil - chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions()) + chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions("l2chaindata")) if err != nil { return chainDb, nil, err } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 277afa302a..4ee042d477 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -489,7 +489,7 @@ func mainImpl() int { return 1 } - arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, nodeConfig.Persistent.Pebble.ExtraOptions()) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, nodeConfig.Persistent.Pebble.ExtraOptions("arbitrumdata")) deferFuncs = append(deferFuncs, func() { closeDb(arbDb, "arbDb") }) if err != nil { log.Error("failed to open database", "err", err) diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index 363126a49f..72e7d2c516 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -85,7 +85,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node if chainConfig == nil { return nil, errors.New("database doesn't have a chain config (was this node initialized?)") } - arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", true, persistentConfig.Pebble.ExtraOptions()) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", true, persistentConfig.Pebble.ExtraOptions("arbitrumdata")) if err != nil { return nil, err } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index fd63eb9431..4aa8581bd0 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -774,9 +774,9 @@ func createL2BlockChainWithStackConfig( Require(t, err) // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs - chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) - arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) Require(t, err) initReader := statetransfer.NewMemoryInitDataReader(&l2info.ArbInitData) @@ -979,9 +979,9 @@ func Create2ndNodeWithConfig( Require(t, err) // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs - l2chainDb, err := l2stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + l2chainDb, err := l2stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) - l2arbDb, err := l2stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + l2arbDb, err := l2stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) Require(t, err) initReader := statetransfer.NewMemoryInitDataReader(l2InitData) diff --git a/system_tests/das_test.go b/system_tests/das_test.go index 7495b9a13e..2febadb3d2 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -179,10 +179,10 @@ func TestDASRekey(t *testing.T) { Require(t, err) // TODO get pebble.ExtraOptions from conf.PersistentConfig - l2chainDb, err := l2stackA.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + l2chainDb, err := l2stackA.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) - l2arbDb, err := l2stackA.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + l2arbDb, err := l2stackA.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) Require(t, err) l2blockchain, err := gethexec.GetBlockChain(l2chainDb, nil, chainConfig, gethexec.ConfigDefaultTest().TxLookupLimit) diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go index e83c350804..d2453887ee 100644 --- a/system_tests/pruning_test.go +++ b/system_tests/pruning_test.go @@ -66,7 +66,7 @@ func TestPruning(t *testing.T) { Require(t, err) defer stack.Close() // TODO get pebble.ExtraOptions from conf.PersistentConfig - chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) defer chainDb.Close() chainDbEntriesBeforePruning := countStateEntries(chainDb) diff --git a/system_tests/staterecovery_test.go b/system_tests/staterecovery_test.go index 9dc1081a7b..459a6e3ee8 100644 --- a/system_tests/staterecovery_test.go +++ b/system_tests/staterecovery_test.go @@ -51,7 +51,7 @@ func TestRectreateMissingStates(t *testing.T) { Require(t, err) defer stack.Close() // TODO get pebble.ExtraOptions from conf.PersistentConfig - chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions()) + chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) defer chainDb.Close() cacheConfig := gethexec.DefaultCacheConfigFor(stack, &gethexec.DefaultCachingConfig) From d949c071bee0a9969955dc97834bdd168cabcd95 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Tue, 14 May 2024 12:41:53 +0200 Subject: [PATCH 19/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index ac85a19d5f..6d23a7b7e6 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit ac85a19d5f56231076d5bab95504d666b084fa3b +Subproject commit 6d23a7b7e6a99701adf1f69701ad367dec61c08c From cb72afd4ba1977b41bc4587901bea052a31e9f54 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Tue, 14 May 2024 17:16:28 +0200 Subject: [PATCH 20/41] set PebbleConfig defaults to geth / pebble defaults --- cmd/conf/database.go | 34 +++++++++++++++++----------------- cmd/nitro/nitro.go | 1 - 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/cmd/conf/database.go b/cmd/conf/database.go index d60ee51c5b..57674ba7f1 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -163,27 +163,27 @@ type PebbleExperimentalConfig struct { } var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ - BytesPerSync: 0, // pebble default will be used - L0CompactionFileThreshold: 0, // pebble default will be used - L0CompactionThreshold: 0, // pebble default will be used - L0StopWritesThreshold: 0, // pebble default will be used - LBaseMaxBytes: 0, // pebble default will be used + BytesPerSync: 512 << 10, // 512 KB + L0CompactionFileThreshold: 500, + L0CompactionThreshold: 4, + L0StopWritesThreshold: 12, + LBaseMaxBytes: 64 << 20, // 64 MB MemTableStopWritesThreshold: 2, DisableAutomaticCompactions: false, - WALBytesPerSync: 0, // pebble default will be used - WALDir: "", // default will use same dir as for sstables - WALMinSyncInterval: 0, // pebble default will be used - TargetByteDeletionRate: 0, // pebble default will be used + WALBytesPerSync: 0, // no background syncing + WALDir: "", // use same dir as for sstables + WALMinSyncInterval: 0, // no artificial delay + TargetByteDeletionRate: 0, // deletion pacing disabled - BlockSize: 4096, - IndexBlockSize: 4096, - TargetFileSize: 2 * 1024 * 1024, + BlockSize: 4 << 10, // 4 KB + IndexBlockSize: 4 << 10, // 4 KB + TargetFileSize: 2 << 20, // 2 MB TargetFileSizeEqualLevels: true, - L0CompactionConcurrency: 0, - CompactionDebtConcurrency: 0, - ReadCompactionRate: 0, - ReadSamplingMultiplier: -1, + L0CompactionConcurrency: 10, + CompactionDebtConcurrency: 1 << 30, // 1GB + ReadCompactionRate: 16000, // see ReadSamplingMultiplier comment + ReadSamplingMultiplier: -1, // geth default, disables read sampling and disables read triggered compaction MaxWriterConcurrency: 0, ForceWriterParallelism: false, } @@ -196,7 +196,7 @@ func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int64(prefix+".l-base-max-bytes", PebbleExperimentalConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") f.Int(prefix+".mem-table-stop-writes-threshold", PebbleExperimentalConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") f.Bool(prefix+".disable-automatic-compactions", PebbleExperimentalConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") - f.Int(prefix+".wal-bytes-per-sync", PebbleExperimentalConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud") + f.Int(prefix+".wal-bytes-per-sync", PebbleExperimentalConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the background") f.String(prefix+".wal-dir", PebbleExperimentalConfigDefault.WALDir, "absolute path of directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") f.Int(prefix+".wal-min-sync-interval", PebbleExperimentalConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") f.Int(prefix+".target-byte-deletion-rate", PebbleExperimentalConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 434f36eeb2..9cf2a1a136 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -177,7 +177,6 @@ func mainImpl() int { nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) nodeConfig.GraphQL.Apply(&stackConf) - if nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } From 03ee1dc52e2f163b569b17b36edb96c65a04d9c2 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 14 May 2024 10:38:30 -0500 Subject: [PATCH 21/41] address PR comments --- blocks_reexecutor/blocks_reexecutor.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index 0ad4337e0f..a03b29fefd 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -25,6 +25,8 @@ type Config struct { EndBlock uint64 `koanf:"end-block"` Room int `koanf:"room"` BlocksPerThread uint64 `koanf:"blocks-per-thread"` + + blocksPerThread uint64 } func (c *Config) Validate() error { @@ -35,8 +37,13 @@ func (c *Config) Validate() error { if c.EndBlock < c.StartBlock { return errors.New("invalid block range for blocks re-execution") } - if c.Room == 0 { - return errors.New("room for blocks re-execution cannot be zero") + if c.Room < 0 { + return errors.New("room for blocks re-execution should be greater than 0") + } + if c.BlocksPerThread != 0 { + c.blocksPerThread = c.BlocksPerThread + } else { + c.blocksPerThread = 10000 } return nil } @@ -52,6 +59,7 @@ var TestConfig = Config{ Mode: "full", Room: runtime.NumCPU(), BlocksPerThread: 10, + blocksPerThread: 10, } func ConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -93,10 +101,7 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block } if c.Mode == "random" && end != start { // Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly - rng := uint64(10000) - if c.BlocksPerThread != 0 { - rng = c.BlocksPerThread - } + rng := c.blocksPerThread if rng > end-start { rng = end - start } @@ -108,12 +113,11 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block if start > 0 && start != chainStart { start-- } - // Divide work equally among available threads + // Divide work equally among available threads when BlocksPerThread is zero if c.BlocksPerThread == 0 { - c.BlocksPerThread = 10000 work := (end - start) / uint64(c.Room) if work > 0 { - c.BlocksPerThread = work + c.blocksPerThread = work } } return &BlocksReExecutor{ @@ -132,12 +136,10 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block // LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 { - start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread) + start := arbmath.SaturatingUSub(currentBlock, s.config.blocksPerThread) if start < s.startBlock { start = s.startBlock } - // we don't use state release pattern here - // TODO do we want to use release pattern here? startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1) if err != nil { s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err) From e35c1c0d43cdc45a467767acdac326b1c8adf2ed Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 15 May 2024 21:02:31 +0200 Subject: [PATCH 22/41] clean up TODOs --- system_tests/common_test.go | 2 -- system_tests/das_test.go | 1 - system_tests/pruning_test.go | 1 - system_tests/staterecovery_test.go | 1 - 4 files changed, 5 deletions(-) diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 7a5296516e..f8ba4c8b77 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -773,7 +773,6 @@ func createL2BlockChainWithStackConfig( stack, err = node.New(stackConfig) Require(t, err) - // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) @@ -978,7 +977,6 @@ func Create2ndNodeWithConfig( l2stack, err := node.New(stackConfig) Require(t, err) - // TODO get pebble.ExtraOptions from conf.PersistentConfig when opening the DBs l2chainDb, err := l2stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) l2arbDb, err := l2stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) diff --git a/system_tests/das_test.go b/system_tests/das_test.go index 2febadb3d2..11d887315a 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -178,7 +178,6 @@ func TestDASRekey(t *testing.T) { l2stackA, err := node.New(stackConfig) Require(t, err) - // TODO get pebble.ExtraOptions from conf.PersistentConfig l2chainDb, err := l2stackA.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go index d2453887ee..041781ac48 100644 --- a/system_tests/pruning_test.go +++ b/system_tests/pruning_test.go @@ -65,7 +65,6 @@ func TestPruning(t *testing.T) { stack, err := node.New(builder.l2StackConfig) Require(t, err) defer stack.Close() - // TODO get pebble.ExtraOptions from conf.PersistentConfig chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) defer chainDb.Close() diff --git a/system_tests/staterecovery_test.go b/system_tests/staterecovery_test.go index 459a6e3ee8..a20cffc787 100644 --- a/system_tests/staterecovery_test.go +++ b/system_tests/staterecovery_test.go @@ -50,7 +50,6 @@ func TestRectreateMissingStates(t *testing.T) { stack, err := node.New(builder.l2StackConfig) Require(t, err) defer stack.Close() - // TODO get pebble.ExtraOptions from conf.PersistentConfig chainDb, err := stack.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) Require(t, err) defer chainDb.Close() From b426fdd5ec0cd6ade88de0eadd17caf7202083cb Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 15 May 2024 21:26:21 +0200 Subject: [PATCH 23/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 6d23a7b7e6..5b7b36a339 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 6d23a7b7e6a99701adf1f69701ad367dec61c08c +Subproject commit 5b7b36a339ac28d708bca072dc5ec8189ceadac2 From 459106163d80806e556f2c23c896cc74acd0c1d3 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 21:34:04 +0200 Subject: [PATCH 24/41] Gracefully shutdown consumer on interrupts --- pubsub/consumer.go | 48 ++++++++++++++++++++++++++++++++++++++++--- pubsub/pubsub_test.go | 8 +++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7a5078ee00..af1345f059 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,6 +5,10 @@ import ( "encoding/json" "errors" "fmt" + "os" + "os/signal" + "sync/atomic" + "syscall" "time" "github.com/ethereum/go-ethereum/log" @@ -46,6 +50,10 @@ type Consumer[Request any, Response any] struct { redisStream string redisGroup string cfg *ConsumerConfig + // terminating indicates whether interrupt was received, in which case + // consumer should clean up for graceful shutdown. + terminating atomic.Bool + signals chan os.Signal } type Message[Request any] struct { @@ -57,29 +65,51 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream if streamName == "" { return nil, fmt.Errorf("redis stream name cannot be empty") } - consumer := &Consumer[Request, Response]{ + return &Consumer[Request, Response]{ id: uuid.NewString(), client: client, redisStream: streamName, redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, - } - return consumer, nil + terminating: atomic.Bool{}, + signals: make(chan os.Signal, 1), + }, nil } // Start starts the consumer to iteratively perform heartbeat in configured intervals. func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) + c.listenForInterrupt() c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { + if !c.terminating.Load() { + log.Trace("Consumer is terminating, stopping heartbeat update") + return time.Hour + } c.heartBeat(ctx) return c.cfg.KeepAliveTimeout / 10 }, ) } +// listenForInterrupt launches a thread that notifies the channel when interrupt +// is received. +func (c *Consumer[Request, Response]) listenForInterrupt() { + signal.Notify(c.signals, syscall.SIGINT, syscall.SIGTERM) + c.StopWaiter.LaunchThread(func(ctx context.Context) { + select { + case sig := <-c.signals: + log.Info("Received interrup", "signal", sig.String()) + case <-ctx.Done(): + log.Info("Context is done", "error", ctx.Err()) + } + c.deleteHeartBeat(ctx) + }) +} + func (c *Consumer[Request, Response]) StopAndWait() { c.StopWaiter.StopAndWait() + c.deleteHeartBeat(c.GetContext()) } func heartBeatKey(id string) string { @@ -90,6 +120,18 @@ func (c *Consumer[Request, Response]) heartBeatKey() string { return heartBeatKey(c.id) } +// deleteHeartBeat deletes the heartbeat to indicate it is being shut down. +func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) { + c.terminating.Store(true) + if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil { + l := log.Info + if ctx.Err() != nil { + l = log.Error + } + l("Deleting heardbeat", "consumer", c.id, "error", err) + } +} + // heartBeat updates the heartBeat key indicating aliveness. func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) { if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 31f6d9e20a..11407e686f 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "sort" "testing" @@ -232,7 +233,12 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - consumers[i].StopAndWait() + // Terminate half of the consumers, send interrupt to others. + if i%2 == 0 { + consumers[i].StopAndWait() + } else { + consumers[i].signals <- os.Interrupt + } } } From f18419b0fb92bf523668d1272c1bd9764c5015ed Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 22:48:37 +0200 Subject: [PATCH 25/41] Delete heartbeat before stopAndWait --- pubsub/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index af1345f059..d74d4ef1b2 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -108,8 +108,8 @@ func (c *Consumer[Request, Response]) listenForInterrupt() { } func (c *Consumer[Request, Response]) StopAndWait() { - c.StopWaiter.StopAndWait() c.deleteHeartBeat(c.GetContext()) + c.StopWaiter.StopAndWait() } func heartBeatKey(id string) string { From 3373c4a5b84010a7cd5f3b47a7f43de5cf46f476 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 23:18:47 +0200 Subject: [PATCH 26/41] Fix test --- pubsub/consumer.go | 1 - pubsub/pubsub_test.go | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index d74d4ef1b2..97ab004764 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -108,7 +108,6 @@ func (c *Consumer[Request, Response]) listenForInterrupt() { } func (c *Consumer[Request, Response]) StopAndWait() { - c.deleteHeartBeat(c.GetContext()) c.StopWaiter.StopAndWait() } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 11407e686f..9111c5cf66 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -7,6 +7,7 @@ import ( "os" "sort" "testing" + "time" "github.com/ethereum/go-ethereum/log" "github.com/go-redis/redis/v8" @@ -202,6 +203,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques } func TestRedisProduce(t *testing.T) { + log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) t.Parallel() for _, tc := range []struct { name string @@ -213,7 +215,7 @@ func TestRedisProduce(t *testing.T) { }, { name: "some consumers killed, others should take over their work", - killConsumers: false, + killConsumers: true, }, } { t.Run(tc.name, func(t *testing.T) { @@ -233,7 +235,7 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - // Terminate half of the consumers, send interrupt to others. + //Terminate half of the consumers, send interrupt to others. if i%2 == 0 { consumers[i].StopAndWait() } else { @@ -242,6 +244,7 @@ func TestRedisProduce(t *testing.T) { } } + time.Sleep(time.Second) gotMessages, wantResponses := consume(ctx, t, consumers) gotResponses, err := awaitResponses(ctx, promises) if err != nil { From 3fd412e78e725b0bc850e800741b16effab491b3 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 23:46:56 +0200 Subject: [PATCH 27/41] Fix lint --- pubsub/pubsub_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 9111c5cf66..85314dc29a 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -235,7 +235,7 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - //Terminate half of the consumers, send interrupt to others. + // Terminate half of the consumers, send interrupt to others. if i%2 == 0 { consumers[i].StopAndWait() } else { From 6884188d20b089f9320b6fc26bad6d049583364f Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 17 May 2024 13:21:12 -0500 Subject: [PATCH 28/41] address PR comments --- blocks_reexecutor/blocks_reexecutor.go | 46 ++++++++++++-------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index a03b29fefd..f58e0ce00f 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -25,8 +25,6 @@ type Config struct { EndBlock uint64 `koanf:"end-block"` Room int `koanf:"room"` BlocksPerThread uint64 `koanf:"blocks-per-thread"` - - blocksPerThread uint64 } func (c *Config) Validate() error { @@ -40,11 +38,6 @@ func (c *Config) Validate() error { if c.Room < 0 { return errors.New("room for blocks re-execution should be greater than 0") } - if c.BlocksPerThread != 0 { - c.blocksPerThread = c.BlocksPerThread - } else { - c.blocksPerThread = 10000 - } return nil } @@ -59,7 +52,6 @@ var TestConfig = Config{ Mode: "full", Room: runtime.NumCPU(), BlocksPerThread: 10, - blocksPerThread: 10, } func ConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -73,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { type BlocksReExecutor struct { stopwaiter.StopWaiter - config *Config - blockchain *core.BlockChain - stateFor arbitrum.StateForHeaderFunction - done chan struct{} - fatalErrChan chan error - startBlock uint64 - currentBlock uint64 + config *Config + blockchain *core.BlockChain + stateFor arbitrum.StateForHeaderFunction + done chan struct{} + fatalErrChan chan error + startBlock uint64 + currentBlock uint64 + blocksPerThread uint64 } func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor { @@ -99,9 +92,13 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd) end = chainEnd } + blocksPerThread := uint64(10000) + if c.BlocksPerThread != 0 { + blocksPerThread = c.BlocksPerThread + } if c.Mode == "random" && end != start { // Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly - rng := c.blocksPerThread + rng := blocksPerThread if rng > end-start { rng = end - start } @@ -117,16 +114,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block if c.BlocksPerThread == 0 { work := (end - start) / uint64(c.Room) if work > 0 { - c.blocksPerThread = work + blocksPerThread = work } } return &BlocksReExecutor{ - config: c, - blockchain: blockchain, - currentBlock: end, - startBlock: start, - done: make(chan struct{}, c.Room), - fatalErrChan: fatalErrChan, + config: c, + blockchain: blockchain, + currentBlock: end, + startBlock: start, + blocksPerThread: blocksPerThread, + done: make(chan struct{}, c.Room), + fatalErrChan: fatalErrChan, stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) { state, err := blockchain.StateAt(header.Root) return state, arbitrum.NoopStateRelease, err @@ -136,7 +134,7 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block // LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 { - start := arbmath.SaturatingUSub(currentBlock, s.config.blocksPerThread) + start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread) if start < s.startBlock { start = s.startBlock } From aaf4d1c8ce1baa12d14b3becaf51510fb687d654 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 21 May 2024 21:03:29 -0500 Subject: [PATCH 29/41] Fix off-by-one in data poster nonce check --- arbnode/dataposter/data_poster.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index fb35ac3c8d..34ca9e1483 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -857,24 +857,23 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed - var latestBlockNumber, prevBlockNumber, reorgResistantNonce uint64 if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent { - latestBlockNumber, err = p.client.BlockNumber(ctx) + latestBlockNumber, err := p.client.BlockNumber(ctx) if err != nil { return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } - prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1) - reorgResistantNonce, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) + prevBlockNumber := arbmath.SaturatingUSub(latestBlockNumber, 1) + reorgResistantTxCount, err := p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) if err != nil { return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } - if precedingTx.FullTx.Nonce() > reorgResistantNonce { - log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent) + if newTx.FullTx.Nonce() > reorgResistantTxCount { + log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount) return nil } } else { - log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantNonce", reorgResistantNonce) + log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent) } } } From 345e828b430efff7b66d401abe21759cc0af3abc Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 21 May 2024 21:41:53 -0500 Subject: [PATCH 30/41] Always log when sending previously unsent tx --- arbnode/dataposter/data_poster.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 34ca9e1483..399bc19dbd 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -857,13 +857,14 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed + var latestBlockNumber, prevBlockNumber, reorgResistantTxCount uint64 if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent { - latestBlockNumber, err := p.client.BlockNumber(ctx) + latestBlockNumber, err = p.client.BlockNumber(ctx) if err != nil { return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } - prevBlockNumber := arbmath.SaturatingUSub(latestBlockNumber, 1) - reorgResistantTxCount, err := p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) + prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1) + reorgResistantTxCount, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) if err != nil { return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } @@ -872,9 +873,8 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount) return nil } - } else { - log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent) } + log.Debug("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount) } } From 8737d5ccca5c252797af89906f1c5840df93d6ee Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 21 May 2024 21:45:39 -0500 Subject: [PATCH 31/41] Improve previouslySent check --- arbnode/dataposter/data_poster.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 399bc19dbd..5aaef959d8 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -851,7 +851,8 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti // different type with a lower nonce. // If we decide not to send this tx yet, just leave it queued and with Sent set to false. // The resending/repricing loop in DataPoster.Start will keep trying. - if !newTx.Sent && newTx.FullTx.Nonce() > 0 { + previouslySent := newTx.Sent || (prevTx != nil && prevTx.Sent) // if we've previously sent this nonce + if !previouslySent && newTx.FullTx.Nonce() > 0 { precedingTx, err := p.queue.Get(ctx, arbmath.SaturatingUSub(newTx.FullTx.Nonce(), 1)) if err != nil { return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) From 65f8bb569adeb743f645412df0e4c80346920c39 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Wed, 22 May 2024 13:53:13 +0200 Subject: [PATCH 32/41] update geth --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 5b7b36a339..07f6d7a8c1 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 5b7b36a339ac28d708bca072dc5ec8189ceadac2 +Subproject commit 07f6d7a8c149f8752aa8deef4598cfd184a37e94 From 5570fb3e57c574b2004aca9ecd3ad1831bd5ee4c Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 22 May 2024 14:18:09 +0200 Subject: [PATCH 33/41] Drop listenForInterrupt, since stopAndWait is already called on sigint --- pubsub/consumer.go | 22 +--------------------- pubsub/pubsub_test.go | 10 +++------- 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 97ab004764..0288c19e45 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,10 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "os" - "os/signal" "sync/atomic" - "syscall" "time" "github.com/ethereum/go-ethereum/log" @@ -53,7 +50,6 @@ type Consumer[Request any, Response any] struct { // terminating indicates whether interrupt was received, in which case // consumer should clean up for graceful shutdown. terminating atomic.Bool - signals chan os.Signal } type Message[Request any] struct { @@ -72,14 +68,12 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, terminating: atomic.Bool{}, - signals: make(chan os.Signal, 1), }, nil } // Start starts the consumer to iteratively perform heartbeat in configured intervals. func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) - c.listenForInterrupt() c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { if !c.terminating.Load() { @@ -92,22 +86,8 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { ) } -// listenForInterrupt launches a thread that notifies the channel when interrupt -// is received. -func (c *Consumer[Request, Response]) listenForInterrupt() { - signal.Notify(c.signals, syscall.SIGINT, syscall.SIGTERM) - c.StopWaiter.LaunchThread(func(ctx context.Context) { - select { - case sig := <-c.signals: - log.Info("Received interrup", "signal", sig.String()) - case <-ctx.Done(): - log.Info("Context is done", "error", ctx.Err()) - } - c.deleteHeartBeat(ctx) - }) -} - func (c *Consumer[Request, Response]) StopAndWait() { + c.deleteHeartBeat(c.GetParentContext()) c.StopWaiter.StopAndWait() } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 85314dc29a..cdf5fa1ef6 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -232,15 +232,11 @@ func TestRedisProduce(t *testing.T) { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { + consumers[i].Start(ctx) if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - // Terminate half of the consumers, send interrupt to others. - if i%2 == 0 { - consumers[i].StopAndWait() - } else { - consumers[i].signals <- os.Interrupt - } + consumers[i].StopAndWait() } } @@ -252,7 +248,7 @@ func TestRedisProduce(t *testing.T) { } producer.StopAndWait() for _, c := range consumers { - c.StopWaiter.StopAndWait() + c.StopAndWait() } got, err := mergeValues(gotMessages) if err != nil { From 9a866114c9ea15e6efd0bf4452dfa0ec67cdb3b8 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 22 May 2024 14:55:52 +0200 Subject: [PATCH 34/41] Fix test --- pubsub/pubsub_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index cdf5fa1ef6..72504602e3 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -285,6 +285,7 @@ func TestRedisReproduceDisabled(t *testing.T) { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { + consumers[i].Start(ctx) if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } From 0ce93785e406c3375cb1931297b4e9580e4faf4f Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 22 May 2024 08:47:57 -0600 Subject: [PATCH 35/41] block_validator: fail but dont segfault if no validator --- staker/block_validator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 027ee78248..5a511920f2 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -791,8 +791,9 @@ validationsLoop: } for _, moduleRoot := range wasmRoots { if v.chosenValidator[moduleRoot] == nil { - v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot)) - continue + notFoundErr := fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot) + v.possiblyFatal(notFoundErr) + return nil, notFoundErr } if v.chosenValidator[moduleRoot].Room() == 0 { log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot) @@ -1107,7 +1108,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { } } if v.chosenValidator[root] == nil { - log.Error("validator not found", "WasmModuleRoot", root) + return fmt.Errorf("cannot validate WasmModuleRoot %v", root) } } } From 16c95d730f4614625c9d5a10b88984d06aaac645 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 17:28:11 +0200 Subject: [PATCH 36/41] Delete heartbeat after stopAndWait --- pubsub/consumer.go | 2 +- system_tests/block_validator_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 0288c19e45..4b51d24f2d 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -87,8 +87,8 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { } func (c *Consumer[Request, Response]) StopAndWait() { - c.deleteHeartBeat(c.GetParentContext()) c.StopWaiter.StopAndWait() + c.deleteHeartBeat(c.GetParentContext()) } func heartBeatKey(id string) string { diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index dfd892a079..debd6d4c7c 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -72,7 +72,7 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops redisURL := "" if useRedisStreams { redisURL = redisutil.CreateTestRedis(ctx, t) - validatorConfig.BlockValidator.RedisValidationClientConfig = redis.DefaultValidationClientConfig + validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL } From 8504c5c0ba8303fdf18ce8efc0f94b1e81b47f00 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 23 May 2024 08:54:02 -0700 Subject: [PATCH 37/41] Update blocks_reexecutor/blocks_reexecutor.go Co-authored-by: Maciej Kulawik <10907694+magicxyyz@users.noreply.github.com> --- blocks_reexecutor/blocks_reexecutor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index f58e0ce00f..1e4a06fe90 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -35,7 +35,7 @@ func (c *Config) Validate() error { if c.EndBlock < c.StartBlock { return errors.New("invalid block range for blocks re-execution") } - if c.Room < 0 { + if c.Room <= 0 { return errors.New("room for blocks re-execution should be greater than 0") } return nil From 56fc8d4b867351b9d2ed7714360389dd5d5b76ee Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 20:50:19 +0200 Subject: [PATCH 38/41] Drop terminating atomic bool --- pubsub/consumer.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 4b51d24f2d..c9590de8e6 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -47,9 +46,6 @@ type Consumer[Request any, Response any] struct { redisStream string redisGroup string cfg *ConsumerConfig - // terminating indicates whether interrupt was received, in which case - // consumer should clean up for graceful shutdown. - terminating atomic.Bool } type Message[Request any] struct { @@ -67,7 +63,6 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream redisStream: streamName, redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, - terminating: atomic.Bool{}, }, nil } @@ -76,10 +71,6 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { - if !c.terminating.Load() { - log.Trace("Consumer is terminating, stopping heartbeat update") - return time.Hour - } c.heartBeat(ctx) return c.cfg.KeepAliveTimeout / 10 }, @@ -101,7 +92,6 @@ func (c *Consumer[Request, Response]) heartBeatKey() string { // deleteHeartBeat deletes the heartbeat to indicate it is being shut down. func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) { - c.terminating.Store(true) if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil { l := log.Info if ctx.Err() != nil { From e44cfafd05066c250a98c1960240ba88a1087a11 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 23 May 2024 16:54:43 -0600 Subject: [PATCH 39/41] testnode: update pin --- nitro-testnode | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nitro-testnode b/nitro-testnode index e530842e58..c334820b2d 160000 --- a/nitro-testnode +++ b/nitro-testnode @@ -1 +1 @@ -Subproject commit e530842e583e2f3543f97a71c3a7cb53f8a10814 +Subproject commit c334820b2dba6dfa4078f81ed242afbbccc19c91 From 570c31d51d692610607096b4f4a4e92ffa5538d0 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 23 May 2024 16:20:07 -0700 Subject: [PATCH 40/41] update geth pin --- go-ethereum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-ethereum b/go-ethereum index 07f6d7a8c1..f45f6d7560 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 07f6d7a8c149f8752aa8deef4598cfd184a37e94 +Subproject commit f45f6d75601626daf108aa62ea6cb1549d91c528 From dd27ef17d584d607e0972ac7cd12c734ebf1462d Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 23 May 2024 23:31:18 -0500 Subject: [PATCH 41/41] Allow 0x prefix for allowed-wasm-module-roots flag --- cmd/nitro/nitro.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 26aedfbfb7..427974b34f 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -453,7 +453,7 @@ func mainImpl() int { if len(allowedWasmModuleRoots) > 0 { moduleRootMatched := false for _, root := range allowedWasmModuleRoots { - bytes, err := hex.DecodeString(root) + bytes, err := hex.DecodeString(strings.TrimPrefix(root, "0x")) if err == nil { if common.HexToHash(root) == common.BytesToHash(bytes) { moduleRootMatched = true