diff --git a/Dockerfile b/Dockerfile index c8b36f0785..37226c397c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -264,6 +264,8 @@ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/nitro-val /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/seq-coordinator-manager /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/prover /usr/local/bin/ +COPY --from=node-builder /workspace/target/bin/dbconv /usr/local/bin/ +COPY ./scripts/convert-databases.bash /usr/local/bin/ COPY --from=machine-versions /workspace/machines /home/user/target/machines COPY ./scripts/validate-wasm-module-root.sh . RUN ./validate-wasm-module-root.sh /home/user/target/machines /usr/local/bin/prover diff --git a/Makefile b/Makefile index dc8927dd22..e46bdbbe61 100644 --- a/Makefile +++ b/Makefile @@ -157,7 +157,7 @@ all: build build-replay-env test-gen-proofs @touch .make/all .PHONY: build -build: $(patsubst %,$(output_root)/bin/%, nitro deploy relay daserver datool seq-coordinator-invalidate nitro-val seq-coordinator-manager) +build: $(patsubst %,$(output_root)/bin/%, nitro deploy relay daserver datool seq-coordinator-invalidate nitro-val seq-coordinator-manager dbconv) @printf $(done) .PHONY: build-node-deps @@ -310,6 +310,9 @@ $(output_root)/bin/nitro-val: $(DEP_PREDICATE) build-node-deps $(output_root)/bin/seq-coordinator-manager: $(DEP_PREDICATE) build-node-deps go build $(GOLANG_PARAMS) -o $@ "$(CURDIR)/cmd/seq-coordinator-manager" +$(output_root)/bin/dbconv: $(DEP_PREDICATE) build-node-deps + go build $(GOLANG_PARAMS) -o $@ "$(CURDIR)/cmd/dbconv" + # recompile wasm, but don't change timestamp unless files differ $(replay_wasm): $(DEP_PREDICATE) $(go_source) .make/solgen mkdir -p `dirname $(replay_wasm)` diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 98c19ce361..a582b64ffa 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -39,6 +39,7 @@ type SeqCoordinator struct { redisutil.RedisCoordinator + sync *SyncMonitor streamer *TransactionStreamer sequencer execution.ExecutionSequencer delayedSequencer *DelayedSequencer @@ -69,9 +70,10 @@ type SeqCoordinatorConfig struct { SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"` ReleaseRetries int `koanf:"release-retries"` // Max message per poll. - MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"` - MyUrl string `koanf:"my-url"` - Signer signature.SignVerifyConfig `koanf:"signer"` + MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"` + MyUrl string `koanf:"my-url"` + DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"` + Signer signature.SignVerifyConfig `koanf:"signer"` } func (c *SeqCoordinatorConfig) Url() string { @@ -95,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown") f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind") f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen") + f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis") signature.SignVerifyConfigAddOptions(prefix+".signer", f) } @@ -104,7 +107,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ RedisUrl: "", LockoutDuration: time.Minute, LockoutSpare: 30 * time.Second, - SeqNumDuration: 24 * time.Hour, + SeqNumDuration: 10 * 24 * time.Hour, UpdateInterval: 250 * time.Millisecond, HandoffTimeout: 30 * time.Second, SafeShutdownDelay: 5 * time.Second, @@ -112,23 +115,25 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ RetryInterval: 50 * time.Millisecond, MsgPerPoll: 2000, MyUrl: redisutil.INVALID_URL, + DeleteFinalizedMsgs: true, Signer: signature.DefaultSignVerifyConfig, } var TestSeqCoordinatorConfig = SeqCoordinatorConfig{ - Enable: false, - RedisUrl: "", - LockoutDuration: time.Second * 2, - LockoutSpare: time.Millisecond * 10, - SeqNumDuration: time.Minute * 10, - UpdateInterval: time.Millisecond * 10, - HandoffTimeout: time.Millisecond * 200, - SafeShutdownDelay: time.Millisecond * 100, - ReleaseRetries: 4, - RetryInterval: time.Millisecond * 3, - MsgPerPoll: 20, - MyUrl: redisutil.INVALID_URL, - Signer: signature.DefaultSignVerifyConfig, + Enable: false, + RedisUrl: "", + LockoutDuration: time.Second * 2, + LockoutSpare: time.Millisecond * 10, + SeqNumDuration: time.Minute * 10, + UpdateInterval: time.Millisecond * 10, + HandoffTimeout: time.Millisecond * 200, + SafeShutdownDelay: time.Millisecond * 100, + ReleaseRetries: 4, + RetryInterval: time.Millisecond * 3, + MsgPerPoll: 20, + MyUrl: redisutil.INVALID_URL, + DeleteFinalizedMsgs: true, + Signer: signature.DefaultSignVerifyConfig, } func NewSeqCoordinator( @@ -149,6 +154,7 @@ func NewSeqCoordinator( } coordinator := &SeqCoordinator{ RedisCoordinator: *redisCoordinator, + sync: sync, streamer: streamer, sequencer: sequencer, config: config, @@ -338,6 +344,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC return nil } +func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result() + if err != nil { + return 0, err + } + return c.signedBytesToMsgCount(ctx, []byte(resStr)) +} + func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) { resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result() if errors.Is(err, redis.Nil) { @@ -473,6 +487,17 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } // Was, and still is, the active sequencer + if c.config.DeleteFinalizedMsgs { + // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key + finalized, err := c.sync.GetFinalizedMsgCount(ctx) + if err != nil { + log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) + } else if finalized == 0 { + log.Warn("SyncMonitor returned zero finalizedMessageCount") + } else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { + log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) + } + } // We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5) if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) { @@ -492,6 +517,62 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } +func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error { + deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error { + if len(keys) > 0 { + // To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations + // next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower. + // In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized + batchDeleteCount := 1000 + for i := len(keys); i > 0; i -= batchDeleteCount { + if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil { + return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err) + } + } + } + finalizedBytes, err := c.msgCountToSignedBytes(finalized) + if err != nil { + return err + } + if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil { + return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err) + } + return nil + } + prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) + if errors.Is(err, redis.Nil) { + var keys []string + for msg := finalized - 1; msg > 0; msg-- { + exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() + if err != nil { + // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another + return err + } + if exists == 0 { + break + } + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) + return deleteMsgsAndUpdateFinalizedMsgCount(keys) + } else if err != nil { + return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) + } + remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client) + if err != nil { + return fmt.Errorf("cannot get remote message count: %w", err) + } + msgToDelete := min(finalized, remoteMsgCount) + if prevFinalized < msgToDelete { + var keys []string + for msg := prevFinalized; msg < msgToDelete; msg++ { + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + return deleteMsgsAndUpdateFinalizedMsgCount(keys) + } + return nil +} + func (c *SeqCoordinator) update(ctx context.Context) time.Duration { chosenSeq, err := c.RecommendSequencerWantingLockout(ctx) if err != nil { @@ -522,19 +603,24 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Error("cannot read message count", "err", err) return c.config.UpdateInterval } + remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) + if err != nil { + loglevel := log.Error + if errors.Is(err, redis.Nil) { + loglevel = log.Debug + } + loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) + } remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { log.Warn("cannot get remote message count", "err", err) return c.retryAfterRedisError() } - readUntil := remoteMsgCount - if readUntil > localMsgCount+c.config.MsgPerPoll { - readUntil = localMsgCount + c.config.MsgPerPoll - } + readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount) var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error - for msgToRead < readUntil { + for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { var resString string resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() if msgReadErr != nil { diff --git a/arbnode/seq_coordinator_atomic_test.go b/arbnode/seq_coordinator_test.go similarity index 57% rename from arbnode/seq_coordinator_atomic_test.go rename to arbnode/seq_coordinator_test.go index 9b9d9dea81..6498543f3a 100644 --- a/arbnode/seq_coordinator_atomic_test.go +++ b/arbnode/seq_coordinator_test.go @@ -156,3 +156,94 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { } } + +func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + coordConfig := TestSeqCoordinatorConfig + coordConfig.LockoutDuration = time.Millisecond * 100 + coordConfig.LockoutSpare = time.Millisecond * 10 + coordConfig.Signer.ECDSA.AcceptSequencer = false + coordConfig.Signer.SymmetricFallback = true + coordConfig.Signer.SymmetricSign = true + coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true + coordConfig.Signer.Symmetric.SigningKey = "" + + nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil) + Require(t, err) + + redisUrl := redisutil.CreateTestRedis(ctx, t) + coordConfig.RedisUrl = redisUrl + + config := coordConfig + config.MyUrl = "test" + redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl) + Require(t, err) + coordinator := &SeqCoordinator{ + RedisCoordinator: *redisCoordinator, + config: config, + signer: nullSigner, + } + + // Add messages to redis + var keys []string + msgBytes, err := coordinator.msgCountToSignedBytes(0) + Require(t, err) + for i := arbutil.MessageIndex(1); i <= 10; i++ { + err = coordinator.Client.Set(ctx, redisutil.MessageKeyFor(i), msgBytes, time.Hour).Err() + Require(t, err) + err = coordinator.Client.Set(ctx, redisutil.MessageSigKeyFor(i), msgBytes, time.Hour).Err() + Require(t, err) + keys = append(keys, redisutil.MessageKeyFor(i), redisutil.MessageSigKeyFor(i)) + } + // Set msgCount key + msgCountBytes, err := coordinator.msgCountToSignedBytes(11) + Require(t, err) + err = coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err() + Require(t, err) + exists, err := coordinator.Client.Exists(ctx, keys...).Result() + Require(t, err) + if exists != 20 { + t.Fatal("couldn't find all messages and signatures in redis") + } + + // Set finalizedMsgCount and delete finalized messages + err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 5) + Require(t, err) + + // Check if messages and signatures were deleted successfully + exists, err = coordinator.Client.Exists(ctx, keys[:8]...).Result() + Require(t, err) + if exists != 0 { + t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted") + } + + // Check if finalizedMsgCount was set to correct value + finalized, err := coordinator.getRemoteFinalizedMsgCount(ctx) + Require(t, err) + if finalized != 5 { + t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", finalized) + } + + // Try deleting finalized messages when theres already a finalizedMsgCount + err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7) + Require(t, err) + exists, err = coordinator.Client.Exists(ctx, keys[8:12]...).Result() + Require(t, err) + if exists != 0 { + t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted") + } + finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx) + Require(t, err) + if finalized != 7 { + t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized) + } + + // Check that non-finalized messages are still available in redis + exists, err = coordinator.Client.Exists(ctx, keys[12:]...).Result() + Require(t, err) + if exists != 8 { + t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available") + } +} diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index d3b9a7e1c6..5ab1ede2d6 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -72,6 +72,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex { return s.syncTarget } +func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + if s.inboxReader != nil && s.inboxReader.l1Reader != nil { + return s.inboxReader.GetFinalizedMsgCount(ctx) + } + return 0, nil +} + func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { msgCount, err := s.txStreamer.GetMessageCount() if err != nil { diff --git a/cmd/chaininfo/arbitrum_chain_info.json b/cmd/chaininfo/arbitrum_chain_info.json index 7d47d13e84..524433a7b5 100644 --- a/cmd/chaininfo/arbitrum_chain_info.json +++ b/cmd/chaininfo/arbitrum_chain_info.json @@ -164,7 +164,7 @@ "EnableArbOS": true, "AllowDebugPrecompiles": true, "DataAvailabilityCommittee": false, - "InitialArbOSVersion": 11, + "InitialArbOSVersion": 31, "InitialChainOwner": "0x0000000000000000000000000000000000000000", "GenesisBlockNum": 0 } @@ -196,7 +196,7 @@ "EnableArbOS": true, "AllowDebugPrecompiles": true, "DataAvailabilityCommittee": true, - "InitialArbOSVersion": 11, + "InitialArbOSVersion": 31, "InitialChainOwner": "0x0000000000000000000000000000000000000000", "GenesisBlockNum": 0 } diff --git a/cmd/conf/database.go b/cmd/conf/database.go index a75cca77d5..af18bacd57 100644 --- a/cmd/conf/database.go +++ b/cmd/conf/database.go @@ -43,7 +43,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database") f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened") f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use. If set to empty string the database type will be autodetected and if no pre-existing database is found it will default to creating new pebble database ('leveldb', 'pebble' or '' = auto-detect)") - PebbleConfigAddOptions(prefix+".pebble", f) + PebbleConfigAddOptions(prefix+".pebble", f, &PersistentConfigDefault.Pebble) } func (c *PersistentConfig) ResolveDirectoryNames() error { @@ -120,9 +120,9 @@ var PebbleConfigDefault = PebbleConfig{ Experimental: PebbleExperimentalConfigDefault, } -func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions") - PebbleExperimentalConfigAddOptions(prefix+".experimental", f) +func PebbleConfigAddOptions(prefix string, f *flag.FlagSet, defaultConfig *PebbleConfig) { + f.Int(prefix+".max-concurrent-compactions", defaultConfig.MaxConcurrentCompactions, "maximum number of concurrent compactions") + PebbleExperimentalConfigAddOptions(prefix+".experimental", f, &defaultConfig.Experimental) } func (c *PebbleConfig) Validate() error { @@ -189,29 +189,29 @@ var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{ ForceWriterParallelism: false, } -func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".bytes-per-sync", PebbleExperimentalConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background") - f.Int(prefix+".l0-compaction-file-threshold", PebbleExperimentalConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction") - f.Int(prefix+".l0-compaction-threshold", PebbleExperimentalConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction") - f.Int(prefix+".l0-stop-writes-threshold", PebbleExperimentalConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached") - f.Int64(prefix+".l-base-max-bytes", PebbleExperimentalConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") - f.Int(prefix+".mem-table-stop-writes-threshold", PebbleExperimentalConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") - f.Bool(prefix+".disable-automatic-compactions", PebbleExperimentalConfigDefault.DisableAutomaticCompactions, "disables automatic compactions") - f.Int(prefix+".wal-bytes-per-sync", PebbleExperimentalConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the background") - f.String(prefix+".wal-dir", PebbleExperimentalConfigDefault.WALDir, "absolute path of directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") - f.Int(prefix+".wal-min-sync-interval", PebbleExperimentalConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") - f.Int(prefix+".target-byte-deletion-rate", PebbleExperimentalConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") - f.Int(prefix+".block-size", PebbleExperimentalConfigDefault.BlockSize, "target uncompressed size in bytes of each table block") - f.Int(prefix+".index-block-size", PebbleExperimentalConfigDefault.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32)) - f.Int64(prefix+".target-file-size", PebbleExperimentalConfigDefault.TargetFileSize, "target file size for the level 0") - f.Bool(prefix+".target-file-size-equal-levels", PebbleExperimentalConfigDefault.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1") +func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet, defaultConfig *PebbleExperimentalConfig) { + f.Int(prefix+".bytes-per-sync", defaultConfig.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background") + f.Int(prefix+".l0-compaction-file-threshold", defaultConfig.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction") + f.Int(prefix+".l0-compaction-threshold", defaultConfig.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction") + f.Int(prefix+".l0-stop-writes-threshold", defaultConfig.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached") + f.Int64(prefix+".l-base-max-bytes", defaultConfig.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.") + f.Int(prefix+".mem-table-stop-writes-threshold", defaultConfig.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables") + f.Bool(prefix+".disable-automatic-compactions", defaultConfig.DisableAutomaticCompactions, "disables automatic compactions") + f.Int(prefix+".wal-bytes-per-sync", defaultConfig.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the background") + f.String(prefix+".wal-dir", defaultConfig.WALDir, "absolute path of directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables") + f.Int(prefix+".wal-min-sync-interval", defaultConfig.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.") + f.Int(prefix+".target-byte-deletion-rate", defaultConfig.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).") + f.Int(prefix+".block-size", defaultConfig.BlockSize, "target uncompressed size in bytes of each table block") + f.Int(prefix+".index-block-size", defaultConfig.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32)) + f.Int64(prefix+".target-file-size", defaultConfig.TargetFileSize, "target file size for the level 0") + f.Bool(prefix+".target-file-size-equal-levels", defaultConfig.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1") - f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions.") - f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen.") - f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate") - f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).") - f.Int(prefix+".max-writer-concurrency", PebbleExperimentalConfigDefault.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.") - f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.") + f.Int(prefix+".l0-compaction-concurrency", defaultConfig.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions.") + f.Uint64(prefix+".compaction-debt-concurrency", defaultConfig.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen.") + f.Int64(prefix+".read-compaction-rate", defaultConfig.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate") + f.Int64(prefix+".read-sampling-multiplier", defaultConfig.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).") + f.Int(prefix+".max-writer-concurrency", defaultConfig.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.") + f.Bool(prefix+".force-writer-parallelism", defaultConfig.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.") } func (c *PebbleExperimentalConfig) Validate() error { diff --git a/cmd/conf/init.go b/cmd/conf/init.go index a3b5504077..d88bcdd241 100644 --- a/cmd/conf/init.go +++ b/cmd/conf/init.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ethereum/go-ethereum/log" - "github.com/offchainlabs/nitro/execution/gethexec" "github.com/spf13/pflag" ) @@ -55,7 +54,7 @@ var InitConfigDefault = InitConfig{ Prune: "", PruneBloomSize: 2048, PruneThreads: runtime.NumCPU(), - PruneTrieCleanCache: gethexec.DefaultCachingConfig.TrieCleanCache, + PruneTrieCleanCache: 600, RecreateMissingStateFrom: 0, // 0 = disabled RebuildLocalWasm: true, ReorgToBatch: -1, diff --git a/cmd/dbconv/dbconv/config.go b/cmd/dbconv/dbconv/config.go new file mode 100644 index 0000000000..74623bc264 --- /dev/null +++ b/cmd/dbconv/dbconv/config.go @@ -0,0 +1,95 @@ +package dbconv + +import ( + "errors" + "fmt" + + "github.com/offchainlabs/nitro/cmd/conf" + "github.com/offchainlabs/nitro/cmd/genericconf" + flag "github.com/spf13/pflag" +) + +type DBConfig struct { + Data string `koanf:"data"` + DBEngine string `koanf:"db-engine"` + Handles int `koanf:"handles"` + Cache int `koanf:"cache"` + Namespace string `koanf:"namespace"` + Pebble conf.PebbleConfig `koanf:"pebble"` +} + +var DBConfigDefaultDst = DBConfig{ + DBEngine: "pebble", + Handles: conf.PersistentConfigDefault.Handles, + Cache: 2048, // 2048 MB + Namespace: "dstdb/", + Pebble: conf.PebbleConfigDefault, +} + +var DBConfigDefaultSrc = DBConfig{ + DBEngine: "leveldb", + Handles: conf.PersistentConfigDefault.Handles, + Cache: 2048, // 2048 MB + Namespace: "srcdb/", +} + +func DBConfigAddOptions(prefix string, f *flag.FlagSet, defaultConfig *DBConfig) { + f.String(prefix+".data", defaultConfig.Data, "directory of stored chain state") + f.String(prefix+".db-engine", defaultConfig.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')") + f.Int(prefix+".handles", defaultConfig.Handles, "number of files to be open simultaneously") + f.Int(prefix+".cache", defaultConfig.Cache, "the capacity(in megabytes) of the data caching") + f.String(prefix+".namespace", defaultConfig.Namespace, "metrics namespace") + conf.PebbleConfigAddOptions(prefix+".pebble", f, &defaultConfig.Pebble) +} + +type DBConvConfig struct { + Src DBConfig `koanf:"src"` + Dst DBConfig `koanf:"dst"` + IdealBatchSize int `koanf:"ideal-batch-size"` + Convert bool `koanf:"convert"` + Compact bool `koanf:"compact"` + Verify string `koanf:"verify"` + LogLevel string `koanf:"log-level"` + LogType string `koanf:"log-type"` + Metrics bool `koanf:"metrics"` + MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` +} + +var DefaultDBConvConfig = DBConvConfig{ + Src: DBConfigDefaultSrc, + Dst: DBConfigDefaultDst, + IdealBatchSize: 100 * 1024 * 1024, // 100 MB + Convert: false, + Compact: false, + Verify: "", + LogLevel: "INFO", + LogType: "plaintext", + Metrics: false, + MetricsServer: genericconf.MetricsServerConfigDefault, +} + +func DBConvConfigAddOptions(f *flag.FlagSet) { + DBConfigAddOptions("src", f, &DefaultDBConvConfig.Src) + DBConfigAddOptions("dst", f, &DefaultDBConvConfig.Dst) + f.Int("ideal-batch-size", DefaultDBConvConfig.IdealBatchSize, "ideal write batch size") + f.Bool("convert", DefaultDBConvConfig.Convert, "enables conversion step") + f.Bool("compact", DefaultDBConvConfig.Compact, "enables compaction step") + f.String("verify", DefaultDBConvConfig.Verify, "enables verification step (\"\" = disabled, \"keys\" = only keys, \"full\" = keys and values)") + f.String("log-level", DefaultDBConvConfig.LogLevel, "log level, valid values are CRIT, ERROR, WARN, INFO, DEBUG, TRACE") + f.String("log-type", DefaultDBConvConfig.LogType, "log type (plaintext or json)") + f.Bool("metrics", DefaultDBConvConfig.Metrics, "enable metrics") + genericconf.MetricsServerAddOptions("metrics-server", f) +} + +func (c *DBConvConfig) Validate() error { + if c.Verify != "keys" && c.Verify != "full" && c.Verify != "" { + return fmt.Errorf("Invalid verify mode: %v", c.Verify) + } + if !c.Convert && c.Verify == "" && !c.Compact { + return errors.New("nothing to be done, conversion, verification and compaction disabled") + } + if c.IdealBatchSize <= 0 { + return fmt.Errorf("Invalid ideal batch size: %d, has to be greater then 0", c.IdealBatchSize) + } + return nil +} diff --git a/cmd/dbconv/dbconv/dbconv.go b/cmd/dbconv/dbconv/dbconv.go new file mode 100644 index 0000000000..6a97df31c0 --- /dev/null +++ b/cmd/dbconv/dbconv/dbconv.go @@ -0,0 +1,172 @@ +package dbconv + +import ( + "bytes" + "context" + "errors" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/util/dbutil" +) + +type DBConverter struct { + config *DBConvConfig + stats Stats +} + +func NewDBConverter(config *DBConvConfig) *DBConverter { + return &DBConverter{ + config: config, + } +} + +func openDB(config *DBConfig, name string, readonly bool) (ethdb.Database, error) { + db, err := rawdb.Open(rawdb.OpenOptions{ + Type: config.DBEngine, + Directory: config.Data, + // we don't open freezer, it doesn't need to be converted as it has format independent of db-engine + // note: user needs to handle copying/moving the ancient directory + AncientsDirectory: "", + Namespace: config.Namespace, + Cache: config.Cache, + Handles: config.Handles, + ReadOnly: readonly, + PebbleExtraOptions: config.Pebble.ExtraOptions(name), + }) + if err != nil { + return nil, err + } + if err := dbutil.UnfinishedConversionCheck(db); err != nil { + if closeErr := db.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + return nil, err + } + + return db, nil +} + +func (c *DBConverter) Convert(ctx context.Context) error { + var err error + src, err := openDB(&c.config.Src, "src", true) + if err != nil { + return err + } + defer src.Close() + dst, err := openDB(&c.config.Dst, "dst", false) + if err != nil { + return err + } + defer dst.Close() + c.stats.Reset() + log.Info("Converting database", "src", c.config.Src.Data, "dst", c.config.Dst.Data, "db-engine", c.config.Dst.DBEngine) + if err = dbutil.PutUnfinishedConversionCanary(dst); err != nil { + return err + } + it := src.NewIterator(nil, nil) + defer it.Release() + batch := dst.NewBatch() + entriesInBatch := 0 + for it.Next() && ctx.Err() == nil { + if err = batch.Put(it.Key(), it.Value()); err != nil { + return err + } + entriesInBatch++ + if batchSize := batch.ValueSize(); batchSize >= c.config.IdealBatchSize { + if err = batch.Write(); err != nil { + return err + } + c.stats.LogEntries(int64(entriesInBatch)) + c.stats.LogBytes(int64(batchSize)) + batch.Reset() + entriesInBatch = 0 + } + } + if err = ctx.Err(); err == nil { + batchSize := batch.ValueSize() + if err = batch.Write(); err != nil { + return err + } + c.stats.LogEntries(int64(entriesInBatch)) + c.stats.LogBytes(int64(batchSize)) + } + if err == nil { + if err = dbutil.DeleteUnfinishedConversionCanary(dst); err != nil { + return err + } + } + return err +} + +func (c *DBConverter) CompactDestination() error { + dst, err := openDB(&c.config.Dst, "dst", false) + if err != nil { + return err + } + defer dst.Close() + start := time.Now() + log.Info("Compacting destination database", "dst", c.config.Dst.Data) + if err := dst.Compact(nil, nil); err != nil { + return err + } + log.Info("Compaction done", "elapsed", time.Since(start)) + return nil +} + +func (c *DBConverter) Verify(ctx context.Context) error { + if c.config.Verify == "keys" { + log.Info("Starting quick verification - verifying only keys existence") + } else if c.config.Verify == "full" { + log.Info("Starting full verification - verifying keys and values") + } + var err error + src, err := openDB(&c.config.Src, "src", true) + if err != nil { + return err + } + defer src.Close() + + dst, err := openDB(&c.config.Dst, "dst", true) + if err != nil { + return err + } + defer dst.Close() + + c.stats.Reset() + it := src.NewIterator(nil, nil) + defer it.Release() + for it.Next() && ctx.Err() == nil { + switch c.config.Verify { + case "keys": + has, err := dst.Has(it.Key()) + if err != nil { + return fmt.Errorf("Failed to check key existence in destination db, key: %v, err: %w", it.Key(), err) + } + if !has { + return fmt.Errorf("Missing key in destination db, key: %v", it.Key()) + } + c.stats.LogBytes(int64(len(it.Key()))) + case "full": + dstValue, err := dst.Get(it.Key()) + if err != nil { + return err + } + if !bytes.Equal(dstValue, it.Value()) { + return fmt.Errorf("Value mismatch for key: %v, src value: %v, dst value: %s", it.Key(), it.Value(), dstValue) + } + c.stats.LogBytes(int64(len(it.Key()) + len(dstValue))) + default: + return fmt.Errorf("Invalid verify config value: %v", c.config.Verify) + } + c.stats.LogEntries(1) + } + return ctx.Err() +} + +func (c *DBConverter) Stats() *Stats { + return &c.stats +} diff --git a/cmd/dbconv/dbconv/dbconv_test.go b/cmd/dbconv/dbconv/dbconv_test.go new file mode 100644 index 0000000000..f31dd68618 --- /dev/null +++ b/cmd/dbconv/dbconv/dbconv_test.go @@ -0,0 +1,72 @@ +package dbconv + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/util/testhelpers" +) + +func TestConversion(t *testing.T) { + _ = testhelpers.InitTestLog(t, log.LvlTrace) + oldDBConfig := DBConfigDefaultSrc + oldDBConfig.Data = t.TempDir() + + newDBConfig := DBConfigDefaultDst + newDBConfig.Data = t.TempDir() + + func() { + oldDb, err := openDB(&oldDBConfig, "", false) + defer oldDb.Close() + Require(t, err) + err = oldDb.Put([]byte{}, []byte{0xde, 0xed, 0xbe, 0xef}) + Require(t, err) + for i := 0; i < 20; i++ { + err = oldDb.Put([]byte{byte(i)}, []byte{byte(i + 1)}) + Require(t, err) + } + }() + + config := DefaultDBConvConfig + config.Src = oldDBConfig + config.Dst = newDBConfig + config.IdealBatchSize = 5 + config.Verify = "full" + conv := NewDBConverter(&config) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := conv.Convert(ctx) + Require(t, err) + + err = conv.Verify(ctx) + Require(t, err) + + // check if new database doesn't have any extra keys + oldDb, err := openDB(&oldDBConfig, "", true) + Require(t, err) + defer oldDb.Close() + newDb, err := openDB(&newDBConfig, "", true) + Require(t, err) + defer newDb.Close() + it := newDb.NewIterator(nil, nil) + defer it.Release() + for it.Next() { + has, err := oldDb.Has(it.Key()) + Require(t, err) + if !has { + Fail(t, "Unexpected key in the converted db, key:", it.Key()) + } + } +} + +func Require(t *testing.T, err error, printables ...interface{}) { + t.Helper() + testhelpers.RequireImpl(t, err, printables...) +} + +func Fail(t *testing.T, printables ...interface{}) { + t.Helper() + testhelpers.FailImpl(t, printables...) +} diff --git a/cmd/dbconv/dbconv/stats.go b/cmd/dbconv/dbconv/stats.go new file mode 100644 index 0000000000..729a408f38 --- /dev/null +++ b/cmd/dbconv/dbconv/stats.go @@ -0,0 +1,96 @@ +package dbconv + +import ( + "sync/atomic" + "time" +) + +type Stats struct { + entries atomic.Int64 + bytes atomic.Int64 + + startTimestamp int64 + prevEntries int64 + prevBytes int64 + prevEntriesTimestamp int64 + prevBytesTimestamp int64 +} + +func (s *Stats) Reset() { + now := time.Now().UnixNano() + s.entries.Store(0) + s.bytes.Store(0) + s.startTimestamp = now + s.prevEntries = 0 + s.prevBytes = 0 + s.prevEntriesTimestamp = now + s.prevBytesTimestamp = now +} + +func (s *Stats) LogEntries(entries int64) { + s.entries.Add(entries) +} + +func (s *Stats) Entries() int64 { + return s.entries.Load() +} + +func (s *Stats) LogBytes(bytes int64) { + s.bytes.Add(bytes) +} + +func (s *Stats) Bytes() int64 { + return s.bytes.Load() +} + +func (s *Stats) Elapsed() time.Duration { + now := time.Now().UnixNano() + dt := now - s.startTimestamp + return time.Duration(dt) +} + +// not thread safe vs itself +func (s *Stats) EntriesPerSecond() float64 { + now := time.Now().UnixNano() + current := s.Entries() + dt := now - s.prevEntriesTimestamp + if dt == 0 { + dt = 1 + } + de := current - s.prevEntries + s.prevEntries = current + s.prevEntriesTimestamp = now + return float64(de) * 1e9 / float64(dt) +} + +// not thread safe vs itself +func (s *Stats) BytesPerSecond() float64 { + now := time.Now().UnixNano() + current := s.Bytes() + dt := now - s.prevBytesTimestamp + if dt == 0 { + dt = 1 + } + db := current - s.prevBytes + s.prevBytes = current + s.prevBytesTimestamp = now + return float64(db) * 1e9 / float64(dt) +} + +func (s *Stats) AverageEntriesPerSecond() float64 { + now := time.Now().UnixNano() + dt := now - s.startTimestamp + if dt == 0 { + dt = 1 + } + return float64(s.Entries()) * 1e9 / float64(dt) +} + +func (s *Stats) AverageBytesPerSecond() float64 { + now := time.Now().UnixNano() + dt := now - s.startTimestamp + if dt == 0 { + dt = 1 + } + return float64(s.Bytes()) * 1e9 / float64(dt) +} diff --git a/cmd/dbconv/main.go b/cmd/dbconv/main.go new file mode 100644 index 0000000000..c0b5c8f8e4 --- /dev/null +++ b/cmd/dbconv/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/exp" + "github.com/offchainlabs/nitro/cmd/dbconv/dbconv" + "github.com/offchainlabs/nitro/cmd/genericconf" + "github.com/offchainlabs/nitro/cmd/util/confighelpers" + flag "github.com/spf13/pflag" +) + +func parseDBConv(args []string) (*dbconv.DBConvConfig, error) { + f := flag.NewFlagSet("dbconv", flag.ContinueOnError) + dbconv.DBConvConfigAddOptions(f) + k, err := confighelpers.BeginCommonParse(f, args) + if err != nil { + return nil, err + } + var config dbconv.DBConvConfig + if err := confighelpers.EndCommonParse(k, &config); err != nil { + return nil, err + } + return &config, config.Validate() +} + +func printSampleUsage(name string) { + fmt.Printf("Sample usage: %s --help \n\n", name) +} + +func printProgress(conv *dbconv.DBConverter) { + stats := conv.Stats() + fmt.Printf("Progress:\n") + fmt.Printf("\tprocessed entries: %d\n", stats.Entries()) + fmt.Printf("\tprocessed data (MB): %d\n", stats.Bytes()/1024/1024) + fmt.Printf("\telapsed:\t%v\n", stats.Elapsed()) + fmt.Printf("\tcurrent:\t%.3e entries/s\t%.3f MB/s\n", stats.EntriesPerSecond()/1000, stats.BytesPerSecond()/1024/1024) + fmt.Printf("\taverage:\t%.3e entries/s\t%.3f MB/s\n", stats.AverageEntriesPerSecond()/1000, stats.AverageBytesPerSecond()/1024/1024) +} + +func main() { + args := os.Args[1:] + config, err := parseDBConv(args) + if err != nil { + confighelpers.PrintErrorAndExit(err, printSampleUsage) + } + + err = genericconf.InitLog(config.LogType, config.LogLevel, &genericconf.FileLoggingConfig{Enable: false}, nil) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing logging: %v\n", err) + os.Exit(1) + } + + if config.Metrics { + go metrics.CollectProcessMetrics(config.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", config.MetricsServer.Addr, config.MetricsServer.Port)) + } + + conv := dbconv.NewDBConverter(config) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ticker := time.NewTicker(10 * time.Second) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + printProgress(conv) + case <-ctx.Done(): + return + } + } + }() + + if config.Convert { + err = conv.Convert(ctx) + if err != nil { + log.Error("Conversion error", "err", err) + os.Exit(1) + } + stats := conv.Stats() + log.Info("Conversion finished.", "entries", stats.Entries(), "MB", stats.Bytes()/1024/1024, "avg Ke/s", stats.AverageEntriesPerSecond()/1000, "avg MB/s", stats.AverageBytesPerSecond()/1024/1024, "elapsed", stats.Elapsed()) + } + + if config.Compact { + ticker.Stop() + err = conv.CompactDestination() + if err != nil { + log.Error("Compaction error", "err", err) + os.Exit(1) + } + } + + if config.Verify != "" { + ticker.Reset(10 * time.Second) + err = conv.Verify(ctx) + if err != nil { + log.Error("Verification error", "err", err) + os.Exit(1) + } + stats := conv.Stats() + log.Info("Verification completed successfully.", "elapsed", stats.Elapsed()) + } +} diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index a958572458..c364da5932 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -46,6 +46,7 @@ import ( "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/statetransfer" "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/dbutil" ) var notFoundError = errors.New("file not found") @@ -396,16 +397,6 @@ func checkEmptyDatabaseDir(dir string, force bool) error { return nil } -var pebbleNotExistErrorRegex = regexp.MustCompile("pebble: database .* does not exist") - -func isPebbleNotExistError(err error) bool { - return pebbleNotExistErrorRegex.MatchString(err.Error()) -} - -func isLeveldbNotExistError(err error) bool { - return os.IsNotExist(err) -} - func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) { if !config.Init.Force { if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, config.Persistent.Ancient, "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil { @@ -418,10 +409,16 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo if err != nil { return nil, nil, err } + if err := dbutil.UnfinishedConversionCheck(chainData); err != nil { + return nil, nil, fmt.Errorf("l2chaindata unfinished database conversion check error: %w", err) + } wasmDb, err := stack.OpenDatabaseWithExtraOptions("wasm", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, "wasm/", false, persistentConfig.Pebble.ExtraOptions("wasm")) if err != nil { return nil, nil, err } + if err := dbutil.UnfinishedConversionCheck(wasmDb); err != nil { + return nil, nil, fmt.Errorf("wasm unfinished database conversion check error: %w", err) + } chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb, 1) _, err = rawdb.ParseStateScheme(cacheConfig.StateScheme, chainDb) if err != nil { @@ -480,8 +477,8 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo return chainDb, l2BlockChain, nil } readOnlyDb.Close() - } else if !isLeveldbNotExistError(err) && !isPebbleNotExistError(err) { - // we only want to continue if the error is pebble or leveldb not exist error + } else if !dbutil.IsNotExistError(err) { + // we only want to continue if the database does not exist return nil, nil, fmt.Errorf("Failed to open database: %w", err) } } diff --git a/cmd/nitro/init_test.go b/cmd/nitro/init_test.go index 0797ac9b46..95a4b208d4 100644 --- a/cmd/nitro/init_test.go +++ b/cmd/nitro/init_test.go @@ -286,38 +286,6 @@ func startFileServer(t *testing.T, ctx context.Context, dir string) string { return addr } -func testIsNotExistError(t *testing.T, dbEngine string, isNotExist func(error) bool) { - stackConf := node.DefaultConfig - stackConf.DataDir = t.TempDir() - stackConf.DBEngine = dbEngine - stack, err := node.New(&stackConf) - if err != nil { - t.Fatalf("Failed to created test stack: %v", err) - } - defer stack.Close() - readonly := true - _, err = stack.OpenDatabaseWithExtraOptions("test", 16, 16, "", readonly, nil) - if err == nil { - t.Fatal("Opening non-existent database did not fail") - } - if !isNotExist(err) { - t.Fatalf("Failed to classify error as not exist error - internal implementation of OpenDatabaseWithExtraOptions might have changed, err: %v", err) - } - err = errors.New("some other error") - if isNotExist(err) { - t.Fatalf("Classified other error as not exist, err: %v", err) - } -} - -func TestIsNotExistError(t *testing.T) { - t.Run("TestIsPebbleNotExistError", func(t *testing.T) { - testIsNotExistError(t, "pebble", isPebbleNotExistError) - }) - t.Run("TestIsLeveldbNotExistError", func(t *testing.T) { - testIsNotExistError(t, "leveldb", isLeveldbNotExistError) - }) -} - func TestEmptyDatabaseDir(t *testing.T) { testCases := []struct { name string diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 2c7d07cf3b..ab6bf3181d 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -62,6 +62,7 @@ import ( "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/staker/validatorwallet" "github.com/offchainlabs/nitro/util/colors" + "github.com/offchainlabs/nitro/util/dbutil" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/iostat" "github.com/offchainlabs/nitro/util/rpcclient" @@ -237,6 +238,10 @@ func mainImpl() int { if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer { log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer) } + if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable { + log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url") + return 1 + } var dataSigner signature.DataSignerFunc var l1TransactionOptsValidator *bind.TransactOpts @@ -494,6 +499,10 @@ func mainImpl() int { log.Error("database is corrupt; delete it and try again", "database-directory", stack.InstanceDir()) return 1 } + if err := dbutil.UnfinishedConversionCheck(arbDb); err != nil { + log.Error("arbitrumdata unfinished conversion check error", "err", err) + return 1 + } fatalErrChan := make(chan error, 10) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 6624188cbd..af40b4b3f7 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -22,6 +22,7 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" + "github.com/offchainlabs/nitro/util/dbutil" "github.com/offchainlabs/nitro/util/headerreader" flag "github.com/spf13/pflag" ) @@ -181,11 +182,16 @@ func CreateExecutionNode( var classicOutbox *ClassicOutboxRetriever if l2BlockChain.Config().ArbitrumChainParams.GenesisBlockNum > 0 { - classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true) // TODO can we skip using ExtraOptions here? - if err != nil { + classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true) + if dbutil.IsNotExistError(err) { log.Warn("Classic Msg Database not found", "err", err) classicOutbox = nil + } else if err != nil { + return nil, fmt.Errorf("Failed to open classic-msg database: %w", err) } else { + if err := dbutil.UnfinishedConversionCheck(classicMsgDb); err != nil { + return nil, fmt.Errorf("classic-msg unfinished database conversion check error: %w", err) + } classicOutbox = NewClassicOutboxRetriever(classicMsgDb) } } diff --git a/scripts/convert-databases.bash b/scripts/convert-databases.bash new file mode 100755 index 0000000000..bd898c2c98 --- /dev/null +++ b/scripts/convert-databases.bash @@ -0,0 +1,278 @@ +#!/usr/bin/env bash + +DEFAULT_DBCONV=/usr/local/bin/dbconv +DEFAULT_SRC=/home/user/.arbitrum/arb1/nitro + +dbconv=$DEFAULT_DBCONV +src=$DEFAULT_SRC +dst= +force=false +skip_existing=false +clean="failed" + +l2chaindata_status="not started" +l2chaindata_ancient_status="not started" +arbitrumdata_status="not started" +wasm_status="not started" +classicmsg_status="not started" + +checkMissingValue () { + if [[ $1 -eq 0 || $2 == -* ]]; then + echo "missing $3 argument value" + exit 1 + fi +} + +printStatus() { + echo "== Conversion status:" + echo " l2chaindata database: $l2chaindata_status" + echo " l2chaindata database freezer (ancient): $l2chaindata_ancient_status" + echo " arbitrumdata database: $arbitrumdata_status" + echo " wasm database: $wasm_status" + echo " classic-msg database: $classicmsg_status" +} + +printUsage() { +echo Usage: $0 \[OPTIONS..\] + echo + echo OPTIONS: + echo "--dbconv dbconv binary path (default: \"$DEFAULT_DBCONV\")" + echo "--src directory containing source databases (default: \"$DEFAULT_SRC\")" + echo "--dst destination directory" + echo "--force remove destination directory if it exists" + echo "--skip-existing skip convertion of databases which directories already exist in the destination directory" + echo "--clean sets what should be removed in case of error, possible values:" + echo " \"failed\" - remove database which conversion failed (default)" + echo " \"none\" - remove nothing, leave unfinished and potentially corrupted databases" + echo " \"all\" - remove whole destination directory" +} + +removeDir() { + cmd="rm -r \"$1\"" + echo $cmd + eval $cmd + return $? +} + +cleanup() { + case $clean in + all) + echo "== Removing destination directory" + removeDir "$dst" + ;; + failed) + echo "== Note: removing only failed destination directory" + dstdir=$(echo $dst/$1 | tr -s /) + removeDir "$dstdir" + ;; + none) + echo "== Warning: not removing destination directories, the destination databases might be incomplete and/or corrupted!" + ;; + *) + # shouldn't happen + echo "Script error, invalid --clean flag value: $clean" + exit 1 + ;; + + esac +} + +while [[ $# -gt 0 ]]; do + case $1 in + --dbconv) + shift + checkMissingValue $# "$1" "--dbconv" + dbconv=$1 + shift + ;; + --src) + shift + checkMissingValue $# "$1" "--src" + src=$1 + shift + ;; + --dst) + shift + checkMissingValue $# "$1" "--dst" + dst=$1 + shift + ;; + --force) + force=true + shift + ;; + --skip-existing) + skip_existing=true + shift + ;; + --clean) + shift + checkMissingValue $# "$1" "--clean" + clean=$1 + shift + ;; + --help) + printUsage + exit 0 + ;; + *) + printUsage + exit 0 + esac +done + +if $force && $skip_existing; then + echo Error: Cannot use both --force and --skipexisting + printUsage + exit 1 +fi + +if [ $clean != "all" ] && [ $clean != "failed" ] && [ $clean != "none" ] ; then + echo Error: Invalid --clean value: $clean + printUsage + exit 1 +fi + +if ! [ -e "$dbconv" ]; then + echo Error: Invalid dbconv binary path: "$dbconv" does not exist + exit 1 +fi + +if ! [ -n "$dst" ]; then + echo Error: Missing destination directory \(\-\-dst\) + printUsage + exit 1 +fi + +if ! [ -d "$src" ]; then + echo Error: Invalid source directory: \""$src"\" is missing + exit 1 +fi + +src=$(realpath "$src") + +if ! [ -d "$src"/l2chaindata ]; then + echo Error: Invalid source directory: \""$src"/l2chaindata\" is missing + exit 1 +fi + +if ! [ -d "$src"/l2chaindata/ancient ]; then + echo Error: Invalid source directory: \""$src"/l2chaindata/ancient\" is missing + exit 1 +fi + +if ! [ -d "$src"/arbitrumdata ]; then + echo Error: Invalid source directory: missing "$src/arbitrumdata" directory + exit 1 +fi + +if [ -e "$dst" ] && ! $skip_existing; then + if $force; then + echo == Warning! Destination already exists, --force is set, removing all files under path: "$dst" + removeDir "$dst" + if [ $? -ne 0 ]; then + echo Error: failed to remove "$dst" + exit 1 + fi + else + echo Error: invalid destination path: "$dst" already exists + exit 1 + fi +fi + +convert_result= +convert () { + srcdir=$(echo $src/$1 | tr -s /) + dstdir=$(echo $dst/$1 | tr -s /) + if ! [ -e $dstdir ]; then + echo "== Converting $1 db" + cmd="$dbconv --src.db-engine=leveldb --src.data \"$srcdir\" --dst.db-engine=pebble --dst.data \"$dstdir\" --convert --compact" + echo $cmd + eval $cmd + if [ $? -ne 0 ]; then + cleanup $1 + convert_result="FAILED" + return 1 + fi + convert_result="converted" + return 0 + else + if $skip_existing; then + echo "== Note: $dstdir directory already exists, skipping conversion (--skip-existing flag is set)" + convert_result="skipped" + return 0 + else + convert_result="FAILED ($dstdir already exists)" + return 1 + fi + fi +} + +convert "l2chaindata" +res=$? +l2chaindata_status=$convert_result +if [ $res -ne 0 ]; then + printStatus + exit 1 +fi + +if ! [ -e "$dst"/l2chaindata/ancient ]; then + ancient_src=$(echo "$src"/l2chaindata/ancient | tr -s /) + ancient_dst=$(echo "$dst"/l2chaindata/ | tr -s /) + echo "== Copying l2chaindata ancients" + cmd="cp -r \"$ancient_src\" \"$ancient_dst\"" + echo $cmd + eval $cmd + if [ $? -ne 0 ]; then + l2chaindata_ancient_status="FAILED (failed to copy)" + cleanup "l2chaindata" + printStatus + exit 1 + fi + l2chaindata_ancient_status="copied" +else + if $skip_existing; then + echo "== Note: l2chaindata/ancient directory already exists, skipping copy (--skip-existing flag is set)" + l2chaindata_ancient_status="skipped" + else + # unreachable, we already had to remove root directory + echo script error, reached unreachable + exit 1 + fi +fi + +convert "arbitrumdata" +res=$? +arbitrumdata_status=$convert_result +if [ $res -ne 0 ]; then + printStatus + exit 1 +fi + +if [ -e $src/wasm ]; then + convert "wasm" + res=$? + wasm_status=$convert_result + if [ $res -ne 0 ]; then + printStatus + exit 1 + fi +else + echo "== Note: Source directory does not contain wasm database." + wasm_status="not found in source directory" +fi + +if [ -e $src/classic-msg ]; then + convert "classic-msg" + res=$? + classicmsg_status=$convert_result + if [ $res -ne 0 ]; then + printStatus + exit 1 + fi +else + echo "== Note: Source directory does not contain classic-msg database." + classicmsg_status="not found in source directory" +fi + +printStatus diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 8ad0832633..e14eb45a27 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -458,7 +458,7 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) { } b.L2.cleanup() - l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, b.L2Info, b.dataDir, b.chainConfig, &b.execConfig.Caching) + l2info, stack, chainDb, arbDb, blockchain := createL2BlockChainWithStackConfig(t, b.L2Info, b.dataDir, b.chainConfig, b.initMessage, b.l2StackConfig, &b.execConfig.Caching) execConfigFetcher := func() *gethexec.Config { return b.execConfig } execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher) diff --git a/system_tests/db_conversion_test.go b/system_tests/db_conversion_test.go new file mode 100644 index 0000000000..aca28262cb --- /dev/null +++ b/system_tests/db_conversion_test.go @@ -0,0 +1,125 @@ +package arbtest + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/trie" + "github.com/offchainlabs/nitro/cmd/dbconv/dbconv" + "github.com/offchainlabs/nitro/util/arbmath" +) + +func TestDatabaseConversion(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.l2StackConfig.DBEngine = "leveldb" + builder.l2StackConfig.Name = "testl2" + // currently only HashScheme supports archive mode + if builder.execConfig.Caching.StateScheme == rawdb.HashScheme { + builder.execConfig.Caching.Archive = true + } + cleanup := builder.Build(t) + dataDir := builder.dataDir + cleanupDone := false + defer func() { // TODO we should be able to call cleanup twice, rn it gets stuck then + if !cleanupDone { + cleanup() + } + }() + builder.L2Info.GenerateAccount("User2") + var txs []*types.Transaction + for i := uint64(0); i < 200; i++ { + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil) + txs = append(txs, tx) + err := builder.L2.Client.SendTransaction(ctx, tx) + Require(t, err) + } + for _, tx := range txs { + _, err := builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + } + block, err := builder.L2.Client.BlockByNumber(ctx, nil) + Require(t, err) + user2Balance := builder.L2.GetBalance(t, builder.L2Info.GetAddress("User2")) + ownerBalance := builder.L2.GetBalance(t, builder.L2Info.GetAddress("Owner")) + + cleanup() + cleanupDone = true + t.Log("stopped first node") + + instanceDir := filepath.Join(dataDir, builder.l2StackConfig.Name) + for _, dbname := range []string{"l2chaindata", "arbitrumdata", "wasm"} { + err := os.Rename(filepath.Join(instanceDir, dbname), filepath.Join(instanceDir, fmt.Sprintf("%s_old", dbname))) + Require(t, err) + t.Log("converting:", dbname) + convConfig := dbconv.DefaultDBConvConfig + convConfig.Src.Data = path.Join(instanceDir, fmt.Sprintf("%s_old", dbname)) + convConfig.Dst.Data = path.Join(instanceDir, dbname) + conv := dbconv.NewDBConverter(&convConfig) + err = conv.Convert(ctx) + Require(t, err) + } + + builder.l2StackConfig.DBEngine = "pebble" + builder.nodeConfig.ParentChainReader.Enable = false + builder.withL1 = false + builder.L2.cleanup = func() {} + builder.RestartL2Node(t) + t.Log("restarted the node") + + blockAfterRestart, err := builder.L2.Client.BlockByNumber(ctx, nil) + Require(t, err) + user2BalanceAfterRestart := builder.L2.GetBalance(t, builder.L2Info.GetAddress("User2")) + ownerBalanceAfterRestart := builder.L2.GetBalance(t, builder.L2Info.GetAddress("Owner")) + if block.Hash() != blockAfterRestart.Hash() { + t.Fatal("block hash mismatch") + } + if !arbmath.BigEquals(user2Balance, user2BalanceAfterRestart) { + t.Fatal("unexpected User2 balance, have:", user2BalanceAfterRestart, "want:", user2Balance) + } + if !arbmath.BigEquals(ownerBalance, ownerBalanceAfterRestart) { + t.Fatal("unexpected Owner balance, have:", ownerBalanceAfterRestart, "want:", ownerBalance) + } + + bc := builder.L2.ExecNode.Backend.ArbInterface().BlockChain() + current := bc.CurrentBlock() + if current == nil { + Fatal(t, "failed to get current block header") + } + triedb := bc.StateCache().TrieDB() + visited := 0 + i := uint64(0) + // don't query historical blocks when PathSchem is used + if builder.execConfig.Caching.StateScheme == rawdb.PathScheme { + i = current.Number.Uint64() + } + for ; i <= current.Number.Uint64(); i++ { + header := bc.GetHeaderByNumber(i) + _, err := bc.StateAt(header.Root) + Require(t, err) + tr, err := trie.New(trie.TrieID(header.Root), triedb) + Require(t, err) + it, err := tr.NodeIterator(nil) + Require(t, err) + for it.Next(true) { + visited++ + } + Require(t, it.Error()) + } + t.Log("visited nodes:", visited) + + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil) + err = builder.L2.Client.SendTransaction(ctx, tx) + Require(t, err) + _, err = builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + +} diff --git a/util/dbutil/dbutil.go b/util/dbutil/dbutil.go index a1eb6ce208..ca0f5aaaeb 100644 --- a/util/dbutil/dbutil.go +++ b/util/dbutil/dbutil.go @@ -5,8 +5,12 @@ package dbutil import ( "errors" + "fmt" + "os" + "regexp" "github.com/cockroachdb/pebble" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/syndtr/goleveldb/leveldb" ) @@ -14,3 +18,38 @@ import ( func IsErrNotFound(err error) bool { return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) || errors.Is(err, memorydb.ErrMemorydbNotFound) } + +var pebbleNotExistErrorRegex = regexp.MustCompile("pebble: database .* does not exist") + +func isPebbleNotExistError(err error) bool { + return pebbleNotExistErrorRegex.MatchString(err.Error()) +} + +func isLeveldbNotExistError(err error) bool { + return os.IsNotExist(err) +} + +func IsNotExistError(err error) bool { + return isLeveldbNotExistError(err) || isPebbleNotExistError(err) +} + +var unfinishedConversionCanaryKey = []byte("unfinished-conversion-canary-key") + +func PutUnfinishedConversionCanary(db ethdb.KeyValueStore) error { + return db.Put(unfinishedConversionCanaryKey, []byte{1}) +} + +func DeleteUnfinishedConversionCanary(db ethdb.KeyValueStore) error { + return db.Delete(unfinishedConversionCanaryKey) +} + +func UnfinishedConversionCheck(db ethdb.KeyValueStore) error { + unfinished, err := db.Has(unfinishedConversionCanaryKey) + if err != nil { + return fmt.Errorf("Failed to check UnfinishedConversionCanaryKey existence: %w", err) + } + if unfinished { + return errors.New("Unfinished conversion canary key detected") + } + return nil +} diff --git a/util/dbutil/dbutil_test.go b/util/dbutil/dbutil_test.go new file mode 100644 index 0000000000..b28f8a2c23 --- /dev/null +++ b/util/dbutil/dbutil_test.go @@ -0,0 +1,46 @@ +package dbutil + +import ( + "errors" + "testing" + + "github.com/ethereum/go-ethereum/node" +) + +func testIsNotExistError(t *testing.T, dbEngine string, isNotExist func(error) bool) { + stackConf := node.DefaultConfig + stackConf.DataDir = t.TempDir() + stackConf.DBEngine = dbEngine + stack, err := node.New(&stackConf) + if err != nil { + t.Fatalf("Failed to created test stack: %v", err) + } + defer stack.Close() + readonly := true + _, err = stack.OpenDatabaseWithExtraOptions("test", 16, 16, "", readonly, nil) + if err == nil { + t.Fatal("Opening non-existent database did not fail") + } + if !isNotExist(err) { + t.Fatalf("Failed to classify error as not exist error - internal implementation of OpenDatabaseWithExtraOptions might have changed, err: %v", err) + } + err = errors.New("some other error") + if isNotExist(err) { + t.Fatalf("Classified other error as not exist, err: %v", err) + } +} + +func TestIsNotExistError(t *testing.T) { + t.Run("TestIsPebbleNotExistError", func(t *testing.T) { + testIsNotExistError(t, "pebble", isPebbleNotExistError) + }) + t.Run("TestIsLeveldbNotExistError", func(t *testing.T) { + testIsNotExistError(t, "leveldb", isLeveldbNotExistError) + }) + t.Run("TestIsNotExistErrorWithPebble", func(t *testing.T) { + testIsNotExistError(t, "pebble", IsNotExistError) + }) + t.Run("TestIsNotExistErrorWithLeveldb", func(t *testing.T) { + testIsNotExistError(t, "leveldb", IsNotExistError) + }) +} diff --git a/util/redisutil/redis_coordinator.go b/util/redisutil/redis_coordinator.go index 59e3b0e0f9..2c12ffec50 100644 --- a/util/redisutil/redis_coordinator.go +++ b/util/redisutil/redis_coordinator.go @@ -13,12 +13,13 @@ import ( "github.com/offchainlabs/nitro/arbutil" ) -const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only -const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key -const PRIORITIES_KEY string = "coordinator.priorities" // Read only -const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self -const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN -const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN +const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only +const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key +const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key +const PRIORITIES_KEY string = "coordinator.priorities" // Read only +const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self +const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN +const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN const WANTS_LOCKOUT_VAL string = "OK" const INVALID_VAL string = "INVALID" const INVALID_URL string = ""