diff --git a/Makefile b/Makefile index 2c20e4f..a959894 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,16 @@ build: go build -o bin/oasvlfy . +.PHONY: proto +proto: + protoc --go_out=./proto/p2p/v1 \ + ./proto/p2p/v1/*.proto + fmt: go fmt ./... +fmtproto: + clang-format -i ./proto/p2p/**/*.proto + test: go test -v ./... \ No newline at end of file diff --git a/cmd/config_loader.go b/cmd/config_loader.go index 0869e36..5982ceb 100644 --- a/cmd/config_loader.go +++ b/cmd/config_loader.go @@ -1,7 +1,7 @@ package cmd import ( - "io/ioutil" + "os" "strings" "github.com/oasysgames/oasys-optimism-verifier/config" @@ -139,7 +139,7 @@ func mustNewConfigLoader(cmd *cobra.Command) *configLoader { return opts } -func (opts *configLoader) load() (*config.Config, error) { +func (opts *configLoader) load(enableStrictValidation bool) (*config.Config, error) { // load config from the file if !opts.fromCli { path, err := opts.cmd.Flags().GetString(fileConfigFlag) @@ -147,12 +147,12 @@ func (opts *configLoader) load() (*config.Config, error) { return nil, err } - input, err := ioutil.ReadFile(path) + input, err := os.ReadFile(path) if err != nil { return nil, err } - conf, err := config.NewConfig(input) + conf, err := config.NewConfig(input, true) if err != nil { return nil, err } @@ -188,7 +188,7 @@ func (opts *configLoader) load() (*config.Config, error) { } } - if err := config.Validate(opts.cfg); err != nil { + if err := config.Validate(opts.cfg, enableStrictValidation); err != nil { return nil, err } diff --git a/cmd/config_loader_test.go b/cmd/config_loader_test.go index 46a21c1..06b5c3d 100644 --- a/cmd/config_loader_test.go +++ b/cmd/config_loader_test.go @@ -57,11 +57,11 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() { address: '0xD244F03CA3e99C6093f6cBEFBD2f4508244C59D4' password: %s plain: '0xebf3a7f5f805e02c0bbbd599acd5c881f40db22caa95127d4bf48e2dde5fd7bb' - + hub_layer: chain_id: 1 rpc: https://rpc.hub.example.com/ - + verse_layer: discovery: endpoint: https://discovery.example.com/ @@ -71,7 +71,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() { l1_contracts: StateCommitmentChain: '0x01E901F3c65fA7CBd4505F5eF3A88e4ce432e4B5' L2OutputOracle: '0x2489317FA6e003550111D5D196302Ba0879354e2' - + p2p: listens: - listen0 @@ -92,7 +92,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() { verifier: enable: true wallet: verifier - + submitter: enable: true confirmations: 10 @@ -123,7 +123,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() { }) rootCmd.Execute() - got, _ := globalConfigLoader.load() + got, _ := globalConfigLoader.load(false) s.Equal(want, got) } @@ -227,7 +227,8 @@ func (s *ConfigLoaderTestSuite) executeWithCliArgs(appendArgs []string) *config. 
}, appendArgs...)) cmd.Execute() - conf, _ := opts.load() + conf, err := opts.load(false) + s.Require().NoError(err) return conf } @@ -336,6 +337,8 @@ func (s *ConfigLoaderTestSuite) configWithMinCliArgs() *config.Config { StateCollectLimit: defaults["verifier.state_collect_limit"].(int), StateCollectTimeout: defaults["verifier.state_collect_timeout"].(time.Duration), OptimizeInterval: defaults["verifier.db_optimize_interval"].(time.Duration), + Confirmations: defaults["verifier.confirmations"].(int), + StartBlockOffset: defaults["verifier.start_block_offset"].(uint64), }, Submitter: config.Submitter{ Enable: false, diff --git a/cmd/ping.go b/cmd/ping.go index b561b41..e3e5c52 100644 --- a/cmd/ping.go +++ b/cmd/ping.go @@ -16,7 +16,7 @@ var pingCmd = &cobra.Command{ Short: "Send ping via P2P to specified peer", Long: "Send ping via P2P to specified peer", Run: func(cmd *cobra.Command, args []string) { - conf, err := globalConfigLoader.load() + conf, err := globalConfigLoader.load(true) if err != nil { util.Exit(1, "Failed to load configuration: %s\n", err) } diff --git a/cmd/start.go b/cmd/start.go index b1ae8ac..8652850 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -19,7 +19,6 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/oasysgames/oasys-optimism-verifier/beacon" "github.com/oasysgames/oasys-optimism-verifier/cmd/ipccmd" - "github.com/oasysgames/oasys-optimism-verifier/collector" "github.com/oasysgames/oasys-optimism-verifier/config" "github.com/oasysgames/oasys-optimism-verifier/contract/stakemanager" "github.com/oasysgames/oasys-optimism-verifier/database" @@ -29,7 +28,6 @@ import ( "github.com/oasysgames/oasys-optimism-verifier/metrics" "github.com/oasysgames/oasys-optimism-verifier/p2p" "github.com/oasysgames/oasys-optimism-verifier/submitter" - "github.com/oasysgames/oasys-optimism-verifier/util" "github.com/oasysgames/oasys-optimism-verifier/verifier" "github.com/oasysgames/oasys-optimism-verifier/verse" "github.com/oasysgames/oasys-optimism-verifier/version" @@ -83,7 +81,6 @@ func runStartCmd(cmd *cobra.Command, args []string) { }) // setup workers - s.mustSetupCollector() s.mustSetupVerifier() s.setupSubmitter() s.mustSetupBeacon() @@ -99,10 +96,6 @@ func runStartCmd(cmd *cobra.Command, args []string) { s.smcache.RefreshLoop(ctx, time.Hour) }() - // start workers - s.startCollector(ctx) - s.startVerifier(ctx) - s.startSubmitter(ctx) s.startVerseDiscovery(ctx) s.startBeacon(ctx) log.Info("All workers started") @@ -149,21 +142,19 @@ func runStartCmd(cmd *cobra.Command, args []string) { } type server struct { - wg sync.WaitGroup - conf *config.Config - db *database.Database - signers map[string]ethutil.Signer - hub ethutil.Client - smcache *stakemanager.Cache - p2p *p2p.Node - blockCollector *collector.BlockCollector - eventCollector *collector.EventCollector - verifier *verifier.Verifier - submitter *submitter.Submitter - bw *beacon.BeaconWorker - msvr *http.Server - psvr *http.Server - ipc *ipc.IPCServer + wg sync.WaitGroup + conf *config.Config + db *database.Database + signers map[string]ethutil.Signer + hub ethutil.Client + smcache *stakemanager.Cache + p2p *p2p.Node + verifier *verifier.Verifier + submitter *submitter.Submitter + bw *beacon.BeaconWorker + msvr *http.Server + psvr *http.Server + ipc *ipc.IPCServer } func mustNewServer(ctx context.Context) *server { @@ -173,7 +164,7 @@ func mustNewServer(ctx context.Context) *server { signers: map[string]ethutil.Signer{}, } - if s.conf, err = globalConfigLoader.load(); err != nil { + if s.conf, err = 
globalConfigLoader.load(true); err != nil { log.Crit("Failed to load configuration", "err", err) } @@ -219,7 +210,10 @@ func (s *server) mustStartMetrics(ctx context.Context) { go func() { // NOTE: Don't add wait group, as no need to guarantee the completion if err := metrics.ListenAndServe(ctx, s.msvr); err != nil { - log.Crit("Failed to start metrics server", "err", err) + // `ErrServerClosed` is thrown when `Shutdown` is intentionally called + if !errors.Is(err, http.ErrServerClosed) { + log.Crit("Failed to start metrics server", "err", err) + } } log.Info("Metrics server have exited listening", "addr", s.conf.Metrics.Listen) }() @@ -236,7 +230,10 @@ func (s *server) mustStartPprof(ctx context.Context) { go func() { // NOTE: Don't add wait group, as no need to guarantee the completion if err := ps.ListenAndServe(ctx, s.psvr); err != nil { - log.Crit("Failed to start pprof server", "err", err) + // `ErrServerClosed` is thrown when `Shutdown` is intentionally called + if !errors.Is(err, http.ErrServerClosed) { + log.Crit("Failed to start pprof server", "err", err) + } } log.Info("pprof server have exited listening", "addr", s.conf.Debug.Pprof.Listen) }() @@ -297,53 +294,8 @@ func (s *server) mustStartP2P(ctx context.Context, ipc *ipc.IPCServer) { go func() { defer s.wg.Done() - s.p2p.Start(ctx) - log.Info("P2P node has stopped, decrement wait group") - }() -} - -func (s *server) mustSetupCollector() { - if !s.conf.Verifier.Enable { - return - } - - signer, ok := s.signers[s.conf.Verifier.Wallet] - if !ok { - log.Crit("Wallet for the Verifier not found", "wallet", s.conf.Verifier.Wallet) - } - - s.blockCollector = collector.NewBlockCollector(&s.conf.Verifier, s.db, s.hub) - s.eventCollector = collector.NewEventCollector(&s.conf.Verifier, s.db, s.hub, signer.From()) -} - -func (s *server) startCollector(ctx context.Context) { - if s.blockCollector == nil || s.eventCollector == nil { - return - } - - s.wg.Add(1) - go func() { - defer func() { - defer s.wg.Done() - log.Info("Block collector has stopped, decrement wait group") - }() - - ticker := time.NewTicker(s.conf.Verifier.Interval) - defer ticker.Stop() - - log.Info("Block collector started", "interval", s.conf.Verifier.Interval, "block-limit", s.conf.Verifier.BlockLimit) - log.Info("Event collector started", "interval", s.conf.Verifier.Interval, "block-limit", s.conf.Verifier.BlockLimit) - - for { - select { - case <-ctx.Done(): - log.Info("Block collector stopped") - return - case <-ticker.C: - s.blockCollector.Work(ctx) - s.eventCollector.Work(ctx) - } - } + enableSubscriber := s.conf.Submitter.Enable + s.p2p.Start(ctx, enableSubscriber) }() } @@ -358,67 +310,7 @@ func (s *server) mustSetupVerifier() { } l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer) - s.verifier = verifier.NewVerifier(&s.conf.Verifier, s.db, l1Signer) -} - -func (s *server) startVerifier(ctx context.Context) { - if s.verifier == nil { - return - } - - s.wg.Add(1) - go func() { - defer func() { - defer s.wg.Done() - log.Info("Verifier has stopped, decrement wait group") - }() - - // Start verifier ticker - vTick := time.NewTicker(s.conf.Verifier.Interval) - defer vTick.Stop() - - // Subscribe new signature from validators - var ( - sub = s.verifier.SubscribeNewSignature(ctx) - subscribes = map[common.Address]*database.OptimismSignature{} - ) - defer sub.Cancel() - - // Publish new signature via p2p - debounce := time.NewTicker(time.Second * 5) - defer debounce.Stop() - - // Optimize database every hour - dbTick := 
util.NewTicker(s.conf.Verifier.OptimizeInterval, 1) - defer dbTick.Stop() - - log.Info("Verifier started", "signer", s.verifier.L1Signer().Signer()) - - for { - select { - case <-ctx.Done(): - log.Info("Verifier stopped") - return - case <-vTick.C: - s.verifier.Work(ctx) - case sig := <-sub.Next(): - subscribes[sig.Signer.Address] = sig - case <-debounce.C: - if len(subscribes) == 0 { - continue - } - var publishes []*database.OptimismSignature - for _, sig := range subscribes { - publishes = append(publishes, sig) - } - s.p2p.PublishSignatures(ctx, publishes) - subscribes = map[common.Address]*database.OptimismSignature{} - case <-dbTick.C: - log.Info("Optimize database") - s.db.OPSignature.RepairPreviousID(s.verifier.L1Signer().Signer()) - } - } - }() + s.verifier = verifier.NewVerifier(&s.conf.Verifier, s.db, s.p2p, l1Signer) } func (s *server) setupSubmitter() { @@ -429,48 +321,48 @@ func (s *server) setupSubmitter() { s.submitter = submitter.NewSubmitter(&s.conf.Submitter, s.db, s.smcache) } -func (s *server) startSubmitter(ctx context.Context) { - if s.submitter == nil { - return +func (s *server) startVerseDiscovery(ctx context.Context) { + if len(s.conf.VerseLayer.Directs) != 0 { + // read verses from the configuration + s.verseDiscoveryHandler(ctx, s.conf.VerseLayer.Directs) } - s.wg.Add(1) - go func() { - defer s.wg.Done() - - s.submitter.Start(ctx) - log.Info("Submitter has stopped, decrement wait group") - }() -} - -func (s *server) startVerseDiscovery(ctx context.Context) { if s.conf.VerseLayer.Discovery.Endpoint == "" { - // read verses from the configuration only, if the discovery endpoint is not set - s.verseDiscoveryHandler(s.conf.VerseLayer.Directs) + // Disable dinamically discovered verses, if the endpoint is not set return } // dinamically discovered verses - disc := config.NewVerseDiscovery( + disc, err := config.NewVerseDiscovery( + ctx, http.DefaultClient, s.conf.VerseLayer.Discovery.Endpoint, s.conf.VerseLayer.Discovery.RefreshInterval, ) + if err != nil { + log.Crit("Failed to construct verse discovery", "err", err) + } + + // Subscribed verses to verifier and submitter + discSub := disc.Subscribe(ctx) + + // synchronously try the first discovery + if err := disc.Work(ctx); err != nil { + // exit if the first discovery faild, because the following discovery highly likely fail + log.Crit("Failed to work verse discovery", "err", err) + } s.wg.Add(1) go func() { defer func() { defer s.wg.Done() + discSub.Cancel() log.Info("Verse discovery has stopped, decrement wait group") }() discTick := time.NewTicker(s.conf.VerseLayer.Discovery.RefreshInterval) defer discTick.Stop() - // Subscribed verses to verifier and submitter - sub := disc.Subscribe(ctx) - defer sub.Cancel() - log.Info("Verse discovery started", "endpoint", s.conf.VerseLayer.Discovery.Endpoint, "interval", s.conf.VerseLayer.Discovery.RefreshInterval) for { @@ -478,8 +370,8 @@ func (s *server) startVerseDiscovery(ctx context.Context) { case <-ctx.Done(): log.Info("Verse discovery stopped") return - case verses := <-sub.Next(): - s.verseDiscoveryHandler(verses) + case verses := <-discSub.Next(): + s.verseDiscoveryHandler(ctx, verses) case <-discTick.C: if err := disc.Work(ctx); err != nil { log.Error("Failed to work verse discovery", "err", err) @@ -489,7 +381,7 @@ func (s *server) startVerseDiscovery(ctx context.Context) { }() } -func (s *server) verseDiscoveryHandler(discovers []*config.Verse) { +func (s *server) verseDiscoveryHandler(ctx context.Context, discovers []*config.Verse) { if s.verifier == nil && 
s.submitter == nil { log.Warn("Both Verifier and Submitter are disabled") return @@ -509,7 +401,10 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) { verse verse.Verse verify common.Address } - var verses []*verse_ + var ( + verses []*verse_ + verseChainIDs []uint64 + ) for _, cfg := range discovers { for name, addr := range cfg.L1Contracts { if factory, ok := verseFactories[name]; ok { @@ -518,10 +413,13 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) { verse: factory(s.db, s.hub, common.HexToAddress(addr)), verify: verifyContracts[name], }) + verseChainIDs = append(verseChainIDs, cfg.ChainID) } } } + log.Info("Discovered verses", "count", len(verses), "chain-ids", verseChainIDs) + for _, x := range verses { // add verse to Verifier if s.verifier != nil && !s.verifier.HasTask(x.verse.RollupContract(), x.cfg.RPC) { @@ -529,7 +427,8 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) { if err != nil { log.Error("Failed to construct verse-layer client", "err", err) } else { - s.verifier.AddTask(x.verse.WithVerifiable(l2Client)) + log.Info("Add verse to Verifier", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract()) + s.verifier.AddTask(ctx, x.verse.WithVerifiable(l2Client), x.cfg.ChainID) } } @@ -546,8 +445,9 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) { continue } + log.Info("Add verse to Submitter", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract()) l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer) - s.submitter.AddTask(x.verse.WithTransactable(l1Signer, x.verify)) + s.submitter.AddTask(ctx, x.verse.WithTransactable(l1Signer, x.verify), x.cfg.ChainID) } } } diff --git a/cmd/status.go b/cmd/status.go index 115824d..c48f90c 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -11,7 +11,7 @@ var statusCmd = &cobra.Command{ Short: "Show status", Long: "Show status", Run: func(cmd *cobra.Command, args []string) { - conf, err := globalConfigLoader.load() + conf, err := globalConfigLoader.load(true) if err != nil { util.Exit(1, "Failed to load configuration: %s\n", err) } diff --git a/cmd/unlock.go b/cmd/unlock.go index a9da7dd..9e6f1dc 100644 --- a/cmd/unlock.go +++ b/cmd/unlock.go @@ -32,7 +32,7 @@ func init() { } func runUnlockCmd(cmd *cobra.Command, args []string) { - conf, err := globalConfigLoader.load() + conf, err := globalConfigLoader.load(true) if err != nil { util.Exit(1, "Failed to load configuration: %s\n", err) } diff --git a/collector/block_collector.go b/collector/block_collector.go index 44f233c..44a0025 100644 --- a/collector/block_collector.go +++ b/collector/block_collector.go @@ -22,6 +22,7 @@ type BlockCollector struct { log log.Logger } +// Deprecated: func NewBlockCollector( cfg *config.Verifier, db *database.Database, diff --git a/collector/event_collector.go b/collector/event_collector.go index d356f12..22c4339 100644 --- a/collector/event_collector.go +++ b/collector/event_collector.go @@ -24,6 +24,7 @@ type EventCollector struct { log log.Logger } +// Deprecated: func NewEventCollector( cfg *config.Verifier, db *database.Database, @@ -71,7 +72,7 @@ func (w *EventCollector) Work(ctx context.Context) { // collect event logs from hub-layer start, end := blocks[0], blocks[len(blocks)-1] - logs, err := w.hub.FilterLogs(ctx, verse.NewEventLogFilter(start.Number, end.Number)) + logs, err := w.hub.FilterLogs(ctx, verse.NewEventLogFilter(start.Number, end.Number, nil)) if err != nil { w.log.Error("Failed to fetch event 
logs from hub-layer", "start", start, "end", end, "err", err) diff --git a/config/config.go b/config/config.go index dc797b1..fffac62 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "errors" "path/filepath" "reflect" "strings" @@ -72,10 +73,15 @@ func Defaults() map[string]interface{} { "verifier.state_collect_limit": 1000, "verifier.state_collect_timeout": 15 * time.Second, "verifier.db_optimize_interval": time.Hour, + "verifier.confirmations": 3, // 3 confirmations are enough for later than v1.3.0 L1. + "verifier.start_block_offset": uint64(5760 * 2), // 2 days - "submitter.interval": 15 * time.Second, + // The minimum interval for Verse v0 is 15 seconds. + // On the other hand, the minimum interval for Verse v1 is 80 seconds. + // Balance the two by setting the default to 30 seconds. + "submitter.interval": 30 * time.Second, "submitter.concurrency": 50, - "submitter.confirmations": 6, + "submitter.confirmations": 3, // 3 confirmations are enough for later than v1.3.0 L1. "submitter.gas_multiplier": 1.1, "submitter.batch_size": 20, "submitter.max_gas": 5_000_000, @@ -105,7 +111,7 @@ func Defaults() map[string]interface{} { } // Build configuration. -func NewConfig(input []byte) (*Config, error) { +func NewConfig(input []byte, enableStrictValidation bool) (*Config, error) { k := koanf.New(".") // load default values @@ -126,7 +132,7 @@ func NewConfig(input []byte) (*Config, error) { } // run validation - if err := Validate(&conf); err != nil { + if err := Validate(&conf, enableStrictValidation); err != nil { return nil, err } @@ -150,8 +156,22 @@ func MustNewDefaultConfig() *Config { return &conf } -func Validate(conf *Config) error { - return validate.Struct(conf) +func Validate(conf *Config, strict bool) error { + if err := validate.Struct(conf); err != nil { + return err + } + if strict { + // validate verse discovery configuration + if conf.VerseLayer.Discovery.Endpoint == "" && len(conf.VerseLayer.Directs) == 0 { + return errors.New("either verse.discovery or verse.directs must be set") + } + // NOTE: Commented out because bootnode disable verifier and submitter + // validate verifier and submitter configuration + // if !conf.Verifier.Enable && !conf.Submitter.Enable { + // return errors.New("either verifier.enable or submitter.enable must be set") + // } + } + return nil } // App configuration. @@ -375,6 +395,13 @@ type Verifier struct { // Interval to optimize database. OptimizeInterval time.Duration `koanf:"db_optimize_interval"` + + // Number of confirmation blocks for transaction receipt. + Confirmations int + + // The number of start fetching events is offset from the current block. + // This offset is used at the first time to fetch events. 
+ StartBlockOffset uint64 `koanf:"start_block_offset"` } type Submitter struct { diff --git a/config/config_test.go b/config/config_test.go index e54b429..b717cd6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -28,11 +28,11 @@ func (s *ConfigTestSuite) TestNewConfig() { address: '0xBA3186c30Bb0d9e8c7924147238F82617C3fE729' password: /etc/passwd plain: '0x70ce1ba0e76547883c0999662d093dd3426d550ec783a6c775b0060bf4ee6d0f' - + hub_layer: chain_id: 12345 rpc: http://127.0.0.1:8545/ - + verse_layer: discovery: endpoint: http://127.0.0.1/api/v1/verse-layers.json @@ -43,7 +43,7 @@ func (s *ConfigTestSuite) TestNewConfig() { rpc: http://127.0.0.1:8545/ l1_contracts: StateCommitmentChain: '0x62b105FD57A11819f9E50892E18a354bd7c89937' - + p2p: listens: - listen0 @@ -80,7 +80,9 @@ func (s *ConfigTestSuite) TestNewConfig() { state_collect_limit: 5 state_collect_timeout: 1s db_optimize_interval: 2s - + confirmations: 4 + start_block_offset: 5760 + submitter: enable: true interval: 5s @@ -240,6 +242,8 @@ func (s *ConfigTestSuite) TestNewConfig() { StateCollectLimit: 5, StateCollectTimeout: time.Second, OptimizeInterval: time.Second * 2, + Confirmations: 4, + StartBlockOffset: 5760, }, Submitter: Submitter{ Enable: true, @@ -293,7 +297,7 @@ func (s *ConfigTestSuite) TestNewConfig() { }, } - got, _ := NewConfig(s.toBytes(input)) + got, _ := NewConfig(s.toBytes(input), false) s.Equal(want, got) } @@ -341,7 +345,7 @@ func (s *ConfigTestSuite) TestValidate() { "Config.metrics.listen": "hostname_port", } - _, err := NewConfig(s.toBytes(input)) + _, err := NewConfig(s.toBytes(input), false) gots := map[string]string{} for _, e := range err.(validator.ValidationErrors) { @@ -358,20 +362,21 @@ func (s *ConfigTestSuite) TestDefaultValues() { input := (` datastore: /tmp keystore: /tmp - + hub_layer: chain_id: 12345 rpc: http://127.0.0.1:8545/ - + verse_layer: discovery: endpoint: http://127.0.0.1/ - + p2p: listen: 127.0.0.1:20001 `) - got, _ := NewConfig(s.toBytes(input)) + got, err := NewConfig(s.toBytes(input), false) + s.NoError(err) s.Equal(time.Hour, got.VerseLayer.Discovery.RefreshInterval) @@ -409,10 +414,11 @@ func (s *ConfigTestSuite) TestDefaultValues() { s.Equal(1000, got.Verifier.StateCollectLimit) s.Equal(15*time.Second, got.Verifier.StateCollectTimeout) s.Equal(time.Hour, got.Verifier.OptimizeInterval) + s.Equal(3, got.Verifier.Confirmations) - s.Equal(15*time.Second, got.Submitter.Interval) + s.Equal(30*time.Second, got.Submitter.Interval) s.Equal(50, got.Submitter.Concurrency) - s.Equal(6, got.Submitter.Confirmations) + s.Equal(3, got.Submitter.Confirmations) s.Equal(1.1, got.Submitter.GasMultiplier) s.Equal(20, got.Submitter.BatchSize) s.Equal(uint64(5_000_000), got.Submitter.MaxGas) diff --git a/config/verse_discovery.go b/config/verse_discovery.go index a546ea6..2552c63 100644 --- a/config/verse_discovery.go +++ b/config/verse_discovery.go @@ -20,17 +20,23 @@ type VerseDiscovery struct { } func NewVerseDiscovery( + ctx context.Context, client *http.Client, url string, refreshInterval time.Duration, -) *VerseDiscovery { - return &VerseDiscovery{ +) (disc *VerseDiscovery, err error) { + disc = &VerseDiscovery{ client: client, url: url, refreshInterval: refreshInterval, topic: util.NewTopic(), log: log.New("worker", "verse-discovery"), } + // Commented out the initial fetch, as it will be done in the worker + // if _, err = disc.fetch(ctx); err != nil { + // return nil, fmt.Errorf("the inital verse discovery failed, make sure the url(%s) is reachable: %w", url, err) + // } + return 
} func (w *VerseDiscovery) Subscribe(ctx context.Context) *VerseSubscription { diff --git a/config/verse_discovery_test.go b/config/verse_discovery_test.go index 1fcd06d..f6eec2a 100644 --- a/config/verse_discovery_test.go +++ b/config/verse_discovery_test.go @@ -50,7 +50,8 @@ func (s *VerseDiscoveryTestSuite) TestDiscover() { }) // setup pubsub - discovery := NewVerseDiscovery(client, "https://example.com/", time.Second) + discovery, err := NewVerseDiscovery(context.Background(), client, "https://example.com/", time.Second) + s.Require().NoError(err) sub0 := discovery.Subscribe(context.Background()) sub1 := discovery.Subscribe(context.Background()) diff --git a/database/op_event.go b/database/op_event.go index 5f4add0..bd8ecd4 100644 --- a/database/op_event.go +++ b/database/op_event.go @@ -14,12 +14,12 @@ type OPEvent interface { idCol() string contractCol() string rollupIndexCol() string - assignEvent(contract *OptimismContract, e any) error Logger(base log.Logger) log.Logger GetContract() *OptimismContract GetRollupIndex() uint64 GetRollupHash() common.Hash + AssignEvent(contract *OptimismContract, e any) error } type OPEventConstraint[X any] interface { @@ -131,7 +131,7 @@ func (db *OPEventDB[T, PT]) Save(contract common.Address, e any) (OPEvent, error err := db.db.Transaction(func(txdb *Database) error { if c, err := txdb.OPContract.FindOrCreate(contract); err != nil { return err - } else if err := row.assignEvent(c, e); err != nil { + } else if err := row.AssignEvent(c, e); err != nil { return err } return txdb.rawdb.Create(&row).Error diff --git a/database/op_proposal.go b/database/op_proposal.go index be56d9e..b19f834 100644 --- a/database/op_proposal.go +++ b/database/op_proposal.go @@ -43,7 +43,7 @@ func (row *OpstackProposal) GetRollupHash() common.Hash { return crypto.Keccak256Hash(msg) } -func (row *OpstackProposal) assignEvent(contract *OptimismContract, e any) error { +func (row *OpstackProposal) AssignEvent(contract *OptimismContract, e any) error { t, ok := e.(*l2oo.OasysL2OutputOracleOutputProposed) if !ok { return errors.New("invalid event") diff --git a/database/op_signature.go b/database/op_signature.go index 30a4eeb..30402fe 100644 --- a/database/op_signature.go +++ b/database/op_signature.go @@ -3,6 +3,7 @@ package database import ( "bytes" "errors" + "fmt" "strings" "github.com/ethereum/go-ethereum/common" @@ -10,6 +11,11 @@ import ( "gorm.io/gorm" ) +const ( + DeleteOldsLimit = 1024 + FindUnverifiedBySignerLimit = 64 +) + type OptimismSignatureDB db func (db *OptimismSignatureDB) FindByID(id string) (*OptimismSignature, error) { @@ -128,6 +134,41 @@ func (db *OptimismSignatureDB) FindLatestsBySigner( return rows, nil } +func (db *OptimismSignatureDB) FindUnverifiedBySigner( + signer common.Address, + unverifiedIndex uint64, + contract *common.Address, + limit int, +) ([]*OptimismSignature, error) { + _signer, err := db.db.Signer.FindOrCreate(signer) + if err != nil { + return nil, err + } + + tx := db.rawdb. + Joins("Signer"). + Joins("Contract"). + Where("optimism_signatures.signer_id = ?", _signer.ID). + Where("optimism_signatures.batch_index >= ?", unverifiedIndex). + Order("optimism_signatures.batch_index"). 
+ Limit(limit) + + if contract != nil { + _contract, err := db.db.OPContract.FindOrCreate(*contract) + if err != nil { + return nil, err + } + tx = tx.Where("optimism_signatures.optimism_scc_id = ?", _contract.ID) + } + var rows []*OptimismSignature + tx = tx.Find(&rows) + + if tx.Error != nil { + return nil, tx.Error + } + return rows, nil +} + func (db *OptimismSignatureDB) Save( id, previousID *string, signer common.Address, @@ -205,7 +246,7 @@ func (db *OptimismSignatureDB) Save( return &created, nil } -// Delete signatures after the specified rollup index. +// Delete signatures after the specified rollup index by signer. func (db *OptimismSignatureDB) Deletes( signer common.Address, contract common.Address, @@ -240,6 +281,46 @@ func (db *OptimismSignatureDB) Deletes( return affected, nil } +// Delete signatures after the specified rollup index. +func (db *OptimismSignatureDB) DeleteOlds( + contract common.Address, + rollupIndex uint64, + limit int, +) (int64, error) { + _contract, err := db.db.OPContract.FindOrCreate(contract) + if err != nil { + return 0, fmt.Errorf("failed to find contract(%s) during delete old signatures: %w", contract.Hex(), err) + } + + var affected int64 + err = db.rawdb.Transaction(func(s *gorm.DB) error { + + var ids []string + tx := s. + Model(&OptimismSignature{}). + Where("optimism_signatures.optimism_scc_id = ?", _contract.ID). + Where("optimism_signatures.batch_index <= ?", rollupIndex). + Limit(limit). + Pluck("optimism_signatures.id", &ids) + if tx.Error != nil { + return tx.Error + } + + tx = s.Where("id IN (?)", ids).Delete(&OptimismSignature{}) + if tx.Error != nil { + return tx.Error + } + + affected = tx.RowsAffected + return nil + }) + if err != nil { + return -1, err + } + + return affected, nil +} + // for debug func (db *OptimismSignatureDB) SequentialFinder(startPrevID string) func() ([]*OptimismSignature, error) { var prevRows []*OptimismSignature diff --git a/database/op_state.go b/database/op_state.go index 46e033c..0c46aae 100644 --- a/database/op_state.go +++ b/database/op_state.go @@ -36,7 +36,7 @@ func (row *OptimismState) GetRollupHash() common.Hash { return row.BatchRoot } -func (row *OptimismState) assignEvent(contract *OptimismContract, e any) error { +func (row *OptimismState) AssignEvent(contract *OptimismContract, e any) error { t, ok := e.(*scc.SccStateBatchAppended) if !ok { return errors.New("invalid event") diff --git a/ethutil/client.go b/ethutil/client.go index 3a44df0..028b1b0 100644 --- a/ethutil/client.go +++ b/ethutil/client.go @@ -2,8 +2,13 @@ package ethutil import ( "context" + "errors" + "fmt" "math/big" + "strings" + "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -13,6 +18,11 @@ import ( "github.com/lmittmann/w3" "github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/w3types" + "golang.org/x/sync/semaphore" +) + +var ( + ErrTooManyRequests = errors.New("too many requests") ) type SignDataFn = func(hash []byte) (sig []byte, err error) @@ -27,6 +37,7 @@ type Client interface { ctx context.Context, hash common.Hash, ) (tx *types.Transaction, isPending bool, err error) + FilterLogsWithRateThottling(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) NewBatchHeaderClient() (BatchHeaderClient, error) GetProof(ctx context.Context, account common.Address, keys []string, blockNumber *big.Int) (*gethclient.AccountResult, error) } @@ -46,6 +57,8 @@ type client struct { 
url string rpc *rpc.Client + // used for API rate throttling. + sem *semaphore.Weighted } func NewClient(url string) (Client, error) { @@ -54,10 +67,14 @@ func NewClient(url string) (Client, error) { return nil, err } + // This is a magic number; it should be updated based on the network. + // The semaphore is used for log filtering rate throttling for now. + const concurrency = 2 return &client{ Client: ethclient.NewClient(c), url: url, rpc: c, + sem: semaphore.NewWeighted(concurrency), }, nil } @@ -72,6 +89,30 @@ func (c *client) TransactionByHash( return c.Client.TransactionByHash(ctx, hash) } +func (c *client) FilterLogsWithRateThottling(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error) { + if err = c.sem.Acquire(ctx, 1); err != nil { + // continue even if we can't acquire the semaphore. + fmt.Printf("***** WARN *****\nfailed to acquire semaphore in filter: %v\n", err) + } else { + defer c.sem.Release(1) + } + + logs, err = c.Client.FilterLogs(ctx, q) + if err != nil && strings.Contains(err.Error(), "too many requests") { + // sleep longer if the rate limit is reached. + time.Sleep(3 * time.Second) + err = fmt.Errorf("%w: %v", ErrTooManyRequests, err) + return + } + + // sleep if the filter range is big or not set. + if q.ToBlock == nil || q.FromBlock == nil || 1024 <= q.ToBlock.Sub(q.ToBlock, q.FromBlock).Uint64() { + time.Sleep(300 * time.Microsecond) + } + + return +} + func (c *client) NewBatchHeaderClient() (BatchHeaderClient, error) { client, err := w3.Dial(c.URL()) if err != nil { diff --git a/p2p/node.go b/p2p/node.go index 9a21a5c..51e75ce 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -34,7 +34,8 @@ import ( ) const ( - pubsubTopic = "/oasys-optimism-verifier/pubsub/1.0.0" + pubsubTopic = "/oasys-optimism-verifier/pubsub/1.0.0" + // Deprecated: streamProtocol = "/oasys-optimism-verifier/stream/1.0.0" ) @@ -156,25 +157,27 @@ func NewNode( return worker, nil } -func (w *Node) Start(ctx context.Context) { +func (w *Node) Start(ctx context.Context, enableSubscriber bool) { defer w.h.Close() defer w.topic.Close() defer w.sub.Cancel() + + // For backward compatibility (older than v1.1.0), we support the stream protocol.
w.h.SetStreamHandler(streamProtocol, w.newStreamHandler(ctx)) var ( - wg sync.WaitGroup - publishTicker = time.NewTicker(w.cfg.PublishInterval) - meterTicker = time.NewTicker(time.Second * 60) + wg sync.WaitGroup + meterTicker = time.NewTicker(time.Second * 60) ) - defer publishTicker.Stop() defer meterTicker.Stop() - wg.Add(1) - go func() { - defer wg.Done() - w.subscribeLoop(ctx) - }() + if enableSubscriber { + wg.Add(1) + go func() { + defer wg.Done() + w.subscribeLoop(ctx) + }() + } w.showBootstrapLog() @@ -184,8 +187,6 @@ func (w *Node) Start(ctx context.Context) { w.log.Info("P2P node stopping...") wg.Wait() return - case <-publishTicker.C: - w.publishLatestSignatures(ctx) case <-meterTicker.C: nwstat := newNetworkStatus(w.h) w.meterTCPConnections.Set(float64(nwstat.connections.tcp)) @@ -204,41 +205,6 @@ func (w *Node) Host() host.Host { return w.h } func (w *Node) Routing() routing.Routing { return w.dht } func (w *Node) HolePunchHelper() HolePunchHelper { return w.hpHelper } -func (w *Node) meterLoop(ctx context.Context) { - ticker := time.NewTicker(time.Second * 15) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - nwstat := newNetworkStatus(w.h) - w.meterTCPConnections.Set(float64(nwstat.connections.tcp)) - w.meterUDPConnections.Set(float64(nwstat.connections.udp)) - w.meterRelayConnections.Set(float64(nwstat.connections.relay)) - w.meterRelayHopStreams.Set(float64(nwstat.streams.hop)) - w.meterRelayStopStreams.Set(float64(nwstat.streams.stop)) - w.meterVerifierStreams.Set(float64(nwstat.streams.verifier)) - w.meterPeers.Set(float64(w.h.Peerstore().Peers().Len())) - } - } -} - -func (w *Node) publishLoop(ctx context.Context) { - ticker := time.NewTicker(w.cfg.PublishInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - w.publishLatestSignatures(ctx) - } - } -} - func (w *Node) subscribeLoop(ctx context.Context) { type job struct { ctx context.Context @@ -252,8 +218,11 @@ func (w *Node) subscribeLoop(ctx context.Context) { workers := util.NewWorkerGroup(100) procs := &sync.Map{} + w.log.Info("Start subscribing to pubsub messages") + for { - peer, msg, err := subscribe(ctx, w.sub, w.h.ID()) + var msg pb.PubSub + peer, err := subscribe(ctx, w.sub, w.h.ID(), &msg) if errors.Is(err, context.Canceled) { // worker stopped return @@ -369,7 +338,7 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub( ctx context.Context, sender peer.ID, remote *pb.OptimismSignature, -) { +) bool { signer := common.BytesToAddress(remote.Signer) logctx := []interface{}{ "peer", sender, @@ -381,97 +350,35 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub( if err := verifySignature(w.hubLayerChainID, remote); err != nil { w.log.Error("Invalid signature", append(logctx, "err", err)...) - return - } - - local, err := w.db.OPSignature.FindLatestsBySigner(signer, 1, 0) - if err != nil { - w.log.Error("Failed to find the latest signature", append(logctx, "err", err)...) - return - } else if len(local) > 0 && strings.Compare(local[0].ID, remote.Id) == 1 { - // fully synchronized or less than local - return + return false } - // open stream to peer - var s network.Stream - openStream := func() error { - if ss, err := w.openStream(ctx, sender); err != nil { - return err - } else { - s = ss - return nil - } + if _, err := w.db.OPSignature.FindByID(remote.Id); err == nil { + // duplicated + w.log.Debug("Duplicated signature", append(logctx, "id", remote.Id)...) 
+ return false + } else if !errors.Is(err, database.ErrNotFound) { + w.log.Error("Unexpected error happen during finding signature by id", append(logctx, "id", remote.Id, "err", err)...) + return false } - returned := make(chan any) - defer func() { close(returned) }() - go func() { - select { - case <-ctx.Done(): - // canceled because newer signature were received - case <-returned: - } - if s != nil { - w.closeStream(s) - } - }() - - var idAfter string - if len(local) == 0 { - w.log.Info("Request all signatures", logctx...) - } else { - if openStream() != nil { - return - } - if found, err := w.findCommonLatestSignature(ctx, s, signer); err == nil { - fsigner := common.BytesToAddress(found.Signer) - if fsigner != signer { - w.log.Error("Signer does not match", append(logctx, "found-signer", fsigner)...) - return - } - idAfter = found.Id - w.log.Info("Found common signature from peer", - "signer", signer, "id", found.Id, "previous-id", found.PreviousId) - } else if errors.Is(err, database.ErrNotFound) { - if localID, err := ulid.ParseStrict(local[0].ID); err == nil { - // Prevent out-of-sync by specifying the ID of 1 second ago - ms := localID.Time() - 1000 - idAfter = ulid.MustNew(ms, ulid.DefaultEntropy()).String() - logctx = append(logctx, "local-id", local[0].ID, "created-after", time.UnixMilli(int64(ms))) - } else { - w.log.Error("Failed to parse ULID", "local-id", local[0].ID, "err", err) - return - } - } else { - return - } - - w.log.Info("Request signatures", append(logctx, "id-after", idAfter)...) - } + w.log.Info("Received new signature", logctx...) - // send request to peer - m := &pb.Stream{ - Body: &pb.Stream_OptimismSignatureExchange{ - OptimismSignatureExchange: &pb.OptimismSignatureExchange{ - Requests: []*pb.OptimismSignatureExchange_Request{ - { - Signer: remote.Signer, - IdAfter: idAfter, - }, - }, - }, - }, - } - if s == nil && openStream() != nil { - return - } - if err = w.writeStream(s, m); err != nil { - w.log.Error("Failed to send signature request", "err", err) - return + // save signature + if _, err := w.db.OPSignature.Save( + &remote.Id, &remote.PreviousId, + signer, + common.BytesToAddress(remote.Contract), + remote.RollupIndex, + common.BytesToHash(remote.RollupHash), + remote.Approved, + database.BytesSignature(remote.Signature), + ); err != nil { + w.log.Error("Failed to save signature", append(logctx, "err", err)...) + return false } - w.handleOptimismSignatureExchangeResponses(ctx, s) + return true } func (w *Node) handleOptimismSignatureExchangeRequest( @@ -814,8 +721,6 @@ func (w *Node) PublishSignatures(ctx context.Context, rows []*database.OptimismS w.log.Error("Failed to publish latest signatures", "err", err) return } - - w.log.Info("Publish latest signatures", "len", len(rows)) } func (w *Node) openStream(ctx context.Context, peer peer.ID) (network.Stream, error) { @@ -992,7 +897,7 @@ func closeStream(s network.Stream) { } // Publish new message. 
-func publish(ctx context.Context, topic *ps.Topic, m *pb.PubSub) error { +func publish(ctx context.Context, topic *ps.Topic, m proto.Message) error { data, err := proto.Marshal(m) if err != nil { return fmt.Errorf("failed to marshal pubsub message: %w", err) @@ -1014,27 +919,27 @@ func subscribe( ctx context.Context, sub *ps.Subscription, self peer.ID, -) (peer.ID, *pb.PubSub, error) { + msg proto.Message, +) (peer.ID, error) { recv, err := sub.Next(ctx) if err != nil { - return "", nil, fmt.Errorf("failed to subscribe pubsub message: %w", err) + return "", fmt.Errorf("failed to subscribe pubsub message: %w", err) } if recv.ReceivedFrom == self || recv.GetFrom() == self { - return "", nil, errSelfMessage + return "", errSelfMessage } data, err := decompress(recv.Data) if err != nil { - return "", nil, fmt.Errorf("failed to decompress pubsub message: %w", err) + return "", fmt.Errorf("failed to decompress pubsub message: %w", err) } - var m pb.PubSub - if err = proto.Unmarshal(data, &m); err != nil { - return "", nil, fmt.Errorf("failed to unmarshal pubsub message: %w", err) + if err = proto.Unmarshal(data, msg); err != nil { + return "", fmt.Errorf("failed to unmarshal pubsub message: %w", err) } - return recv.GetFrom(), &m, nil + return recv.GetFrom(), nil } func verifySignature(hubLayerChainID *big.Int, sig *pb.OptimismSignature) error { diff --git a/p2p/node_test.go b/p2p/node_test.go index 8d05e1e..a15b3a5 100644 --- a/p2p/node_test.go +++ b/p2p/node_test.go @@ -20,7 +20,6 @@ import ( "github.com/oasysgames/oasys-optimism-verifier/testhelper" "github.com/oasysgames/oasys-optimism-verifier/testhelper/backend" "github.com/oasysgames/oasys-optimism-verifier/util" - "github.com/oklog/ulid/v2" "github.com/stretchr/testify/suite" ) @@ -137,170 +136,22 @@ func (s *NodeTestSuite) SetupTest() { } func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeFromPubSub() { - type want struct { - signer common.Address - idAfter string - } - type testcase struct { - msg *pb.OptimismSignature - want want - } - cases := []*testcase{ - { - toProtoBufSig(s.sigs[s.signer0][s.contract1][99]), - want{s.signer0, s.sigs[s.signer0][s.contract1][99].ID}, - }, - { - toProtoBufSig(s.sigs[s.signer1][s.contract1][199]), - want{s.signer1, s.sigs[s.signer1][s.contract1][199].ID}, - }, - { - &pb.OptimismSignature{ - Id: util.ULID(nil).String(), - PreviousId: util.ULID(nil).String(), - Signer: s.signer2[:], - Contract: s.contract2[:], - RollupIndex: 0, - RollupHash: s.genStateRoot(s.contract2[:], 0).Bytes(), - Approved: true, - }, - want{s.signer2, ""}, - }, - } - - lastcase := cases[len(cases)-1] - msg := ethutil.NewMessage( - s.b2.ChainID(), - common.BytesToAddress(lastcase.msg.Contract), - new(big.Int).SetUint64(lastcase.msg.RollupIndex), - util.BytesToBytes32(lastcase.msg.RollupHash), - lastcase.msg.Approved, - ) - sigbin, _ := msg.Signature(s.b2.SignData) - lastcase.msg.Signature = sigbin[:] - - // set assertion func to subscriber - var ( - reads = []*pb.Stream{} - wg sync.WaitGroup - ) - wg.Add(len(cases)) - s.node2.h.SetStreamHandler(streamProtocol, func(st network.Stream) { - defer wg.Done() - defer closeStream(st) - - for { - m, _ := readStream(st) - reads = append(reads, m) - - switch m.Body.(type) { - case *pb.Stream_FindCommonOptimismSignature: - writeStream(st, &pb.Stream{Body: &pb.Stream_FindCommonOptimismSignature{ - FindCommonOptimismSignature: &pb.FindCommonOptimismSignature{ - Found: nil, - }, - }}) - case *pb.Stream_OptimismSignatureExchange: - writeStream(st, eom) - case *pb.Stream_Eom: - return - 
} - } - }) - - // publish message - for _, tt := range cases { - go s.node1.handleOptimismSignatureExchangeFromPubSub(context.Background(), s.node2.h.ID(), tt.msg) - time.Sleep(time.Millisecond * 50) - } - wg.Wait() - - s.Len(reads, 16) - - // signer0 - gots0 := reads[0].GetFindCommonOptimismSignature().Locals - s.Len(gots0, 50) - s.Equal(gots0[0].Id, s.sigs[s.signer0][s.contract1][99].ID) - s.Equal(gots0[49].Id, s.sigs[s.signer0][s.contract1][50].ID) - - gots1 := reads[1].GetFindCommonOptimismSignature().Locals - s.Len(gots1, 50) - s.Equal(gots1[0].Id, s.sigs[s.signer0][s.contract1][49].ID) - s.Equal(gots1[49].Id, s.sigs[s.signer0][s.contract1][0].ID) - - gots2 := reads[2].GetFindCommonOptimismSignature().Locals - s.Len(gots2, 50) - s.Equal(gots2[0].Id, s.sigs[s.signer0][s.contract0][49].ID) - s.Equal(gots2[49].Id, s.sigs[s.signer0][s.contract0][0].ID) - - gots3 := reads[3].GetOptimismSignatureExchange().Requests - s.Len(gots3, 1) - s.Equal(cases[0].want.signer[:], gots3[0].Signer) - func() { - gt := ulid.MustParse(gots3[0].IdAfter) - wt := ulid.MustParse(cases[0].want.idAfter) - s.Equal(wt.Time()-1000, gt.Time()) - }() - - gots4 := reads[4].GetEom() - s.NotNil(gots4) - - // signer1 - gots5 := reads[5].GetFindCommonOptimismSignature().Locals - s.Len(gots5, 50) - s.Equal(gots5[0].Id, s.sigs[s.signer1][s.contract1][199].ID) - s.Equal(gots5[49].Id, s.sigs[s.signer1][s.contract1][150].ID) - - gots6 := reads[6].GetFindCommonOptimismSignature().Locals - s.Len(gots6, 50) - s.Equal(gots6[0].Id, s.sigs[s.signer1][s.contract1][149].ID) - s.Equal(gots6[49].Id, s.sigs[s.signer1][s.contract1][100].ID) - - gots7 := reads[7].GetFindCommonOptimismSignature().Locals - s.Len(gots7, 50) - s.Equal(gots7[0].Id, s.sigs[s.signer1][s.contract1][99].ID) - s.Equal(gots7[49].Id, s.sigs[s.signer1][s.contract1][50].ID) - - gots8 := reads[8].GetFindCommonOptimismSignature().Locals - s.Len(gots8, 50) - s.Equal(gots8[0].Id, s.sigs[s.signer1][s.contract1][49].ID) - s.Equal(gots8[49].Id, s.sigs[s.signer1][s.contract1][0].ID) - - gots9 := reads[9].GetFindCommonOptimismSignature().Locals - s.Len(gots9, 50) - s.Equal(gots9[0].Id, s.sigs[s.signer1][s.contract0][149].ID) - s.Equal(gots9[49].Id, s.sigs[s.signer1][s.contract0][100].ID) - - gots10 := reads[10].GetFindCommonOptimismSignature().Locals - s.Len(gots10, 50) - s.Equal(gots10[0].Id, s.sigs[s.signer1][s.contract0][99].ID) - s.Equal(gots10[49].Id, s.sigs[s.signer1][s.contract0][50].ID) - - gots11 := reads[11].GetFindCommonOptimismSignature().Locals - s.Len(gots11, 50) - s.Equal(gots11[0].Id, s.sigs[s.signer1][s.contract0][49].ID) - s.Equal(gots11[49].Id, s.sigs[s.signer1][s.contract0][0].ID) - - gots12 := reads[12].GetOptimismSignatureExchange().Requests - s.Len(gots12, 1) - s.Equal(cases[1].want.signer[:], gots12[0].Signer) - func() { - gt := ulid.MustParse(gots12[0].IdAfter) - wt := ulid.MustParse(cases[1].want.idAfter) - s.Equal(wt.Time()-1000, gt.Time()) - }() - - gots13 := reads[13].GetEom() - s.NotNil(gots13) - - // signer2 - gots14 := reads[14].GetOptimismSignatureExchange().Requests - s.Len(gots14, 1) - s.Equal(cases[2].want.signer[:], gots14[0].Signer) - s.Equal("", gots14[0].IdAfter) - - gots15 := reads[15].GetEom() - s.NotNil(gots15) + // succeed to save signature + msg := toProtoBufSig(s.sigs[s.signer0][s.contract0][49]) + saved := s.node2.handleOptimismSignatureExchangeFromPubSub(context.Background(), s.node1.h.ID(), msg) + s.True(saved) + got, err := s.node2.db.OPSignature.FindByID(msg.Id) + s.NoError(err) + s.Equal(msg.Id, got.ID) + + // saving duplicate 
signature + saved = s.node2.handleOptimismSignatureExchangeFromPubSub(context.Background(), s.node1.h.ID(), msg) + s.False(saved) + + // saving too old signature (no pruneRollupIndexDepth) + msg = toProtoBufSig(s.sigs[s.signer0][s.contract0][0]) + saved = s.node2.handleOptimismSignatureExchangeFromPubSub(context.Background(), s.node1.h.ID(), msg) + s.True(saved) } func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeRequests() { @@ -532,7 +383,8 @@ func (s *NodeTestSuite) TestPublishLatestSignatures() { go func() { defer cancel() - peerID, m, err := subscribe(ctx, s.node2.sub, s.node2.h.ID()) + var m pb.PubSub + peerID, err := subscribe(ctx, s.node2.sub, s.node2.h.ID(), &m) if err != nil { s.Fail(err.Error()) } diff --git a/p2p/p2p.go b/p2p/p2p.go index cfcc449..0381a92 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -7,7 +7,7 @@ import ( "crypto/rand" "errors" "fmt" - "io/ioutil" + "io" "strings" "github.com/ethereum/go-ethereum/log" @@ -347,7 +347,7 @@ func decompress(b []byte) ([]byte, error) { if err = r.Close(); err != nil { return nil, err } - if b, err = ioutil.ReadAll(r); err != nil { + if b, err = io.ReadAll(r); err != nil { return nil, err } return b, nil diff --git a/proto/.clang-format b/proto/.clang-format new file mode 100644 index 0000000..6d41fa6 --- /dev/null +++ b/proto/.clang-format @@ -0,0 +1,6 @@ +--- +Language: Proto +BasedOnStyle: Google +ColumnLimit: 1024 +AlignConsecutiveAssignments: true +AlignConsecutiveDeclarations: true diff --git a/proto/p2p/v1/message.proto b/proto/p2p/v1/message.proto index 1a06b89..f17483e 100644 --- a/proto/p2p/v1/message.proto +++ b/proto/p2p/v1/message.proto @@ -6,53 +6,53 @@ option go_package = "./gen"; message PubSub { oneof body { - bytes misc = 1; + bytes misc = 1; OptimismSignatureExchange optimism_signature_exchange = 2; } } message Stream { oneof body { - bytes misc = 1; - bytes eom = 2; - OptimismSignatureExchange optimism_signature_exchange = 3; + bytes misc = 1; + bytes eom = 2; + OptimismSignatureExchange optimism_signature_exchange = 3; FindCommonOptimismSignature find_common_optimism_signature = 4; } } message OptimismSignature { - string id = 1; - string previous_id = 2; - bytes signer = 3; - bytes contract = 4; + string id = 1; + string previous_id = 2; + bytes signer = 3; + bytes contract = 4; uint64 rollup_index = 5; - bytes rollup_hash = 6; - bool approved = 10; - bytes signature = 11; + bytes rollup_hash = 6; + bool approved = 10; + bytes signature = 11; // These are fields that do not exist in OPStack and will be removed. 
- uint64 batch_size = 7 [ deprecated = true ]; - uint64 prev_total_elements = 8 [ deprecated = true ]; - bytes extra_data = 9 [ deprecated = true ]; + uint64 batch_size = 7 [deprecated = true]; + uint64 prev_total_elements = 8 [deprecated = true]; + bytes extra_data = 9 [deprecated = true]; } message OptimismSignatureExchange { - repeated OptimismSignature latests = 1; - repeated Request requests = 2; + repeated OptimismSignature latests = 1; + repeated Request requests = 2; repeated OptimismSignature responses = 3; message Request { - bytes signer = 1; + bytes signer = 1; string id_after = 2; } } message FindCommonOptimismSignature { - repeated Local locals = 1; - optional OptimismSignature found = 2; + repeated Local locals = 1; + optional OptimismSignature found = 2; message Local { - string id = 1; + string id = 1; string previous_id = 2; } } diff --git a/submitter/signature_iter.go b/submitter/signature_iter.go index 11ead35..8f82919 100644 --- a/submitter/signature_iter.go +++ b/submitter/signature_iter.go @@ -109,3 +109,8 @@ func (err *StakeAmountShortage) Error() string { return fmt.Sprintf("stake amount shortage, required=%s actual=%s", fromWei(err.required), fromWei(err.actual)) } + +func (err *StakeAmountShortage) Is(target error) bool { + _, ok := target.(*StakeAmountShortage) + return ok +} diff --git a/submitter/submitter.go b/submitter/submitter.go index f0cdeef..c2f6378 100644 --- a/submitter/submitter.go +++ b/submitter/submitter.go @@ -1,6 +1,8 @@ package submitter import ( + "errors" + "fmt" "math/big" "sync" "time" @@ -25,6 +27,11 @@ const ( minTxGas = 24871 // Multicall minimum gas ) +var ( + ErrNoSignatures = errors.New("no signatures") + ErrAlreadyVerified = errors.New("already verified") +) + type Submitter struct { cfg *config.Submitter db *database.Database @@ -46,6 +53,7 @@ func NewSubmitter( } } +// Deprecated: func (w *Submitter) Start(ctx context.Context) { w.log.Info("Submitter started", "interval", w.cfg.Interval, @@ -61,6 +69,82 @@ func (w *Submitter) Start(ctx context.Context) { w.workLoop(ctx) } +func (w *Submitter) startSubmitter(ctx context.Context, contract common.Address, chainId uint64) { + var ( + tick = time.NewTicker(w.cfg.Interval) + duration = w.cfg.Interval + verifiedIndex *uint64 + resetDuration = func(target time.Duration) { + if duration == target { + return + } + duration = target + tick.Reset(duration) + } + ) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + w.log.Info("Submitting work stopped", "chainId", chainId) + return + case <-tick.C: + v, found := w.GetTask(contract) + if !found { + w.log.Info("Exit submitter loop as the task is evicted", "chainId", chainId) + return + } + nextIndex, err := w.work(ctx, v, verifiedIndex) + if errors.Is(err, verse.ErrNotSufficientConfirmations) { + w.log.Info("Not enough confirmations", "nextIndex", nextIndex, "chainId", chainId) + continue + } else if errors.Is(err, ErrNoSignatures) { + w.log.Info("No signatures to submit", "nextIndex", nextIndex, "chainId", chainId) + // Reset the ticker to the original interval + resetDuration(w.cfg.Interval) + continue + } else if errors.Is(err, &StakeAmountShortage{}) { + // Wait until enough signatures are collected + w.log.Info("Waiting for enough signatures", "nextIndex", nextIndex, "chainId", chainId) + // Reset the ticker to shorten the interval to be able to submit verify tx without waiting for the next interval + resetDuration(w.cfg.Interval / 10) + continue + } else if err == nil { + // Finally, succeeded to verify the corresponding rollup 
index, So move to the next index + verifiedIndex = &nextIndex + w.log.Info("Successfully verified the rollup index", "verifiedIndex", *verifiedIndex, "chainId", chainId) + // Clean up old signatures + if err := w.cleanOldSignatures(v.RollupContract(), *verifiedIndex); err != nil { + w.log.Warn("Failed to delete old signatures", "verifiedIndex", *verifiedIndex, "chainId", chainId, "err", err) + } + // Reset the ticker to the original interval + resetDuration(w.cfg.Interval) + continue + } else if errors.Is(err, ErrAlreadyVerified) { + // Skip if the nextIndex is already verified + w.log.Info("Already verified the rollup index", "nextIndex", nextIndex, "chainId", chainId) + continue + } else { + w.log.Error("Failed to verify the rollup index", "nextIndex", nextIndex, "chainId", chainId, "err", err) + } + } + } +} + +func (w *Submitter) cleanOldSignatures(contract common.Address, verifiedIndex uint64) error { + if verifiedIndex == 0 { + return nil + } + // Just keep the last verified index + deleteIndex := uint64(verifiedIndex - 1) + if _, err := w.db.OPSignature.DeleteOlds(contract, deleteIndex, database.DeleteOldsLimit); err != nil { + return fmt.Errorf("failed to delete old signatures. deleteIndex: %d, : %w", deleteIndex, err) + } + return nil +} + +// Deprecated: func (w *Submitter) workLoop(ctx context.Context) { wg := util.NewWorkerGroup(w.cfg.Concurrency) running := &sync.Map{} @@ -87,7 +171,7 @@ func (w *Submitter) workLoop(ctx context.Context) { if !wg.Has(workerID) { worker := func(ctx context.Context, rname string, data interface{}) { defer running.Delete(rname) - w.work(ctx, data.(verse.TransactableVerse)) + w.work(ctx, data.(verse.TransactableVerse), nil) } wg.AddWorker(ctx, workerID, worker) } @@ -104,29 +188,54 @@ func (w *Submitter) HasTask(contract common.Address) bool { return ok } -func (w *Submitter) AddTask(task verse.TransactableVerse) { - task.Logger(w.log).Info("Add submitter task") +func (w *Submitter) AddTask(ctx context.Context, task verse.TransactableVerse, chainId uint64) { + exists := w.HasTask(task.RollupContract()) w.tasks.Store(task.RollupContract(), task) + if !exists { + // Start submitting loop by each contract. + // 1. Request signatures every interval + // 2. 
Submit verify tx if enough signatures are collected + go w.startSubmitter(ctx, task.RollupContract(), chainId) + } +} + +func (w *Submitter) GetTask(contract common.Address) (task verse.TransactableVerse, found bool) { + var val any + val, found = w.tasks.Load(contract) + if !found { + return + } + task, found = val.(verse.TransactableVerse) + return } func (w *Submitter) RemoveTask(contract common.Address) { w.tasks.Delete(contract) } -func (w *Submitter) work(ctx context.Context, task verse.TransactableVerse) { +func (w *Submitter) work(ctx context.Context, task verse.TransactableVerse, verifiedIndex *uint64) (uint64, error) { log := task.Logger(w.log) ctx, cancel := context.WithTimeout(ctx, 3*time.Minute) defer cancel() - // fetch the next index from hub-layer - nextIndex, err := task.NextIndex(&bind.CallOpts{Context: ctx}) + // Assume the fetched nextIndex is not reorged, as we confirm `w.cfg.Confirmations` blocks + nextIndex, err := task.NextIndexWithConfirm(&bind.CallOpts{Context: ctx}, uint64(w.cfg.Confirmations), false) if err != nil { - log.Error("Failed to get next index", "err", err) - return + return 0, fmt.Errorf("failed to fetch next index: %w", err) } log = log.New("next-index", nextIndex) + if verifiedIndex != nil { + if *verifiedIndex == nextIndex.Uint64() { + // Skip if the nextIndex is already verified + return nextIndex.Uint64(), ErrAlreadyVerified + } else if *verifiedIndex > nextIndex.Uint64() { + // Continue as purhaps reorged + log.Warn("Possible reorged. next index is smaller than the verified index", "verified-index", *verifiedIndex, "next-index", nextIndex.Uint64()) + } + } + iter := &signatureIterator{ db: w.db, stakemanager: w.stakemanager, @@ -140,12 +249,16 @@ func (w *Submitter) work(ctx context.Context, task verse.TransactableVerse) { } else { tx, err = w.sendNormalTx(log, ctx, task, iter) } - if err != nil { - log.Error(err.Error()) - } else if tx != nil { - w.waitForCconfirmation(log.New("tx", tx.Hash()), ctx, task.L1Signer(), tx) + log.Debug(err.Error()) + return nextIndex.Uint64(), fmt.Errorf("failed to send transaction: %w", err) + } + + if err = w.waitForReceipt(ctx, task.L1Signer(), tx); err != nil { + return nextIndex.Uint64(), fmt.Errorf("failed to wait for receipt: %w", err) } + + return nextIndex.Uint64(), nil } func (w *Submitter) sendNormalTx( @@ -160,7 +273,7 @@ func (w *Submitter) sendNormalTx( return nil, err } else if len(rows) == 0 { log.Debug("No signatures") - return nil, nil + return nil, ErrNoSignatures } opts := task.L1Signer().TransactOpts(ctx) @@ -219,15 +332,15 @@ func (w *Submitter) sendMulticallTx( var ( calls []multicall2.Multicall2Call - shortageErr *StakeAmountShortage + errShortage error ) for i := 0; i < w.cfg.BatchSize; i++ { rows, err := iter.next() - if t, ok := err.(*StakeAmountShortage); ok { - shortageErr = t + if _, ok := err.(*StakeAmountShortage); ok { + errShortage = err break } else if err != nil { - log.Error("Failed to find signatures", "err", err) + log.Debug("Failed to find signatures", "err", err) return nil, err } else if len(rows) == 0 { break @@ -261,12 +374,12 @@ func (w *Submitter) sendMulticallTx( } } if len(calls) == 0 { - if shortageErr != nil { - log.Error("No calldata", "err", shortageErr) - } else { - log.Info("No calldata") + if errShortage != nil { + log.Debug("No calldata", "err", errShortage) + return nil, errShortage } - return nil, nil + log.Debug("No calldata") + return nil, ErrNoSignatures } // call estimateGas @@ -315,42 +428,20 @@ func (w *Submitter) sendMulticallTx( return tx, nil } 
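The interval logic in `startSubmitter` above depends on `errors.Is` matching both the plain sentinels (`ErrNoSignatures`, `ErrAlreadyVerified`) and the typed `*StakeAmountShortage`, which works because the diff adds an `Is` method to `StakeAmountShortage` in `submitter/signature_iter.go`. Below is a minimal, self-contained sketch of that matching pattern; the `work` stub and the field values are assumptions for illustration, not repository code.

```go
// Illustrative sketch of the error matching used by startSubmitter; the error
// types mirror the submitter package, the rest is assumed for the example.
package main

import (
	"errors"
	"fmt"
	"math/big"
)

var ErrNoSignatures = errors.New("no signatures")

type StakeAmountShortage struct {
	required, actual *big.Int // illustrative values
}

func (e *StakeAmountShortage) Error() string {
	return fmt.Sprintf("stake amount shortage, required=%s actual=%s", e.required, e.actual)
}

// Is makes errors.Is(err, &StakeAmountShortage{}) match any value of this type,
// even after it has been wrapped with fmt.Errorf("...: %w", err).
func (e *StakeAmountShortage) Is(target error) bool {
	_, ok := target.(*StakeAmountShortage)
	return ok
}

// work stands in for Submitter.work, which wraps errors from the signature iterator.
func work() error {
	shortage := &StakeAmountShortage{required: big.NewInt(10), actual: big.NewInt(3)}
	return fmt.Errorf("failed to send transaction: %w", shortage)
}

func main() {
	err := work()
	switch {
	case errors.Is(err, ErrNoSignatures):
		fmt.Println("no signatures: keep the normal ticker interval")
	case errors.Is(err, &StakeAmountShortage{}):
		fmt.Println("stake shortage: shorten the ticker interval and wait for more signatures")
	default:
		fmt.Println("unexpected error:", err)
	}
}
```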
-func (w *Submitter) waitForCconfirmation( - log log.Logger, +func (w *Submitter) waitForReceipt( ctx context.Context, l1Client ethutil.SignableClient, tx *types.Transaction, -) { +) error { // wait for block to be validated receipt, err := bind.WaitMined(ctx, l1Client, tx) if err != nil { - log.Error("Failed to receive receipt", "err", err) - return + return fmt.Errorf("failed to receive receipt. tx: %s, : %w", tx.Hash().Hex(), err) } if receipt.Status != 1 { - log.Error("Transaction reverted") - return - } - - // wait for confirmations - confirmed := map[common.Hash]bool{receipt.BlockHash: true} - for { - remaining := w.cfg.Confirmations - len(confirmed) - if remaining <= 0 { - log.Info("Transaction succeeded") - return - } - - log.Info("Wait for confirmation", "remaining", remaining) - time.Sleep(time.Second) - - h, err := l1Client.HeaderByNumber(ctx, nil) - if err != nil { - log.Error("Failed to fetch block header", "err", err) - continue - } - confirmed[h.Hash()] = true + return fmt.Errorf("transaction reverted. tx: %s", tx.Hash().Hex()) } + return nil } func fromWei(wei *big.Int) *big.Int { diff --git a/submitter/submitter_test.go b/submitter/submitter_test.go index af7ff10..553ecc1 100644 --- a/submitter/submitter_test.go +++ b/submitter/submitter_test.go @@ -44,9 +44,9 @@ func (s *SubmitterTestSuite) SetupTest() { // Setup submitter s.submitter = NewSubmitter(&config.Submitter{ - Interval: 0, + Interval: 100 * time.Millisecond, Concurrency: 0, - Confirmations: 0, + Confirmations: 2, GasMultiplier: 1.0, BatchSize: 20, MaxGas: 500_000_000, @@ -99,9 +99,14 @@ func (s *SubmitterTestSuite) TestSubmit() { s.TSCC.SetNextIndex(s.SignableHub.TransactOpts(ctx), big.NewInt(int64(nextIndex))) s.Hub.Commit() + // Confirm blocks + for i := 0; i < s.submitter.cfg.Confirmations; i++ { + s.Hub.Mining() + } + // submitter do the work. 
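+ // Note: work() is invoked directly here; the AddTask-driven submit loop is
+ // exercised separately in TestStartSubmitter below.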
s.submitter.stakemanager.Refresh(ctx) - go s.submitter.work(ctx, s.task) + go s.submitter.work(ctx, s.task, nil) time.Sleep(time.Second / 10) s.Hub.Commit() @@ -155,3 +160,135 @@ func (s *SubmitterTestSuite) TestSubmit() { } } } + +func (s *SubmitterTestSuite) TestStartSubmitter() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + batchIndexes := s.Range(0, 5) + nextIndex := 2 + signers := s.StakeManager.Operators + + // Start submitter by adding task + s.submitter.stakemanager.Refresh(ctx) + s.submitter.AddTask(ctx, s.task, 0) + // Dry run to cover no signature case + // Manually confirmed by checking the logs + time.Sleep(s.submitter.cfg.Interval) + + // Confirm the `stake amount shortage` case is covered + // Manually confirmed by checking the logs + _, event := s.EmitStateBatchAppended(0) + s.DB.OPSignature.Save( + nil, nil, + signers[0], + s.SCCAddr, + event.BatchIndex.Uint64(), + event.BatchRoot, + true, + database.RandSignature(), + ) + // wait for the submitter to work + time.Sleep(s.submitter.cfg.Interval * 2) + + // Confirm succcesfully tx submission case + // set the `SCC.nextIndex` + s.TSCC.SetNextIndex(s.SignableHub.TransactOpts(ctx), big.NewInt(int64(nextIndex))) + s.Hub.Commit() + // Confirm blocks + for i := 0; i < s.submitter.cfg.Confirmations; i++ { + s.Hub.Mining() + } + + // save dummy signatures + events := make([]*tscc.SccStateBatchAppended, len(batchIndexes)) + signatures := make([][]*database.OptimismSignature, len(batchIndexes)) + for i := range batchIndexes { + _, events[i] = s.EmitStateBatchAppended(i) + signatures[i] = make([]*database.OptimismSignature, len(signers)) + + for j := range s.Range(0, len(signers)) { + signatures[i][j], _ = s.DB.OPSignature.Save( + nil, nil, + signers[j], + s.SCCAddr, + events[i].BatchIndex.Uint64(), + events[i].BatchRoot, + i < len(batchIndexes)-1, + database.RandSignature(), + ) + } + + // no more signatures than the minimum stake should be sent + sort.Slice(signatures[i], func(j, h int) bool { + // sort by stake amount + a := s.submitter.stakemanager.StakeBySigner(signatures[i][j].Signer.Address) + b := s.submitter.stakemanager.StakeBySigner(signatures[i][h].Signer.Address) + return a.Cmp(b) == 1 // order by desc + }) + signatures[i] = signatures[i][:6] + sort.Sort(database.OptimismSignatures(signatures[i])) + } + + // submitter do the work. 
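+ // The loop started by AddTask above should pick up the saved signatures within
+ // a few intervals and submit them in a single multicall transaction.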
+ time.Sleep(s.submitter.cfg.Interval * 3) + s.Hub.Commit() + + // assert multicall transaction + currBlock, _ := s.Hub.Client().BlockByNumber(ctx, nil) + mcallTx := currBlock.Transactions()[0] + sender, _ := s.Hub.TxSender(mcallTx) + s.Equal(s.task.L1Signer().Signer(), sender) + s.Equal(s.MulticallAddr, *mcallTx.To()) + + mcallReceipt, err := s.Hub.TransactionReceipt(context.Background(), mcallTx.Hash()) + s.NoError(err) + s.Len(mcallReceipt.Logs, 6) + s.Equal(s.SCCAddr, mcallReceipt.Logs[0].Address) + s.Equal(s.SCCVAddr, mcallReceipt.Logs[1].Address) + s.Equal(s.SCCAddr, mcallReceipt.Logs[2].Address) + s.Equal(s.SCCVAddr, mcallReceipt.Logs[3].Address) + s.Equal(s.SCCAddr, mcallReceipt.Logs[4].Address) + s.Equal(s.SCCVAddr, mcallReceipt.Logs[5].Address) + + // assert call parameters + length, _ := s.TSCCV.SccAssertLogsLen(&bind.CallOpts{Context: ctx}) + s.Equal(uint64(3), length.Uint64()) + + for i := range batchIndexes { + if i < nextIndex { + got, err := s.TSCCV.AssertLogs( + &bind.CallOpts{Context: ctx}, + big.NewInt(int64(i+nextIndex+1))) + s.ErrorContains(err, "execution reverted") + s.Equal(common.Address{}, got.StateCommitmentChain) + } else { + got, err := s.TSCCV.AssertLogs( + &bind.CallOpts{Context: ctx}, + big.NewInt(int64(i-nextIndex))) + s.NoError(err) + s.Equal(s.SCCAddr, got.StateCommitmentChain) + s.Equal(events[i].BatchIndex.Uint64(), got.BatchHeader.BatchIndex.Uint64()) + s.Equal(events[i].BatchRoot, got.BatchHeader.BatchRoot) + s.Equal(events[i].BatchSize.Uint64(), got.BatchHeader.BatchSize.Uint64()) + s.Equal(events[i].PrevTotalElements.Uint64(), got.BatchHeader.PrevTotalElements.Uint64()) + s.Equal(events[i].ExtraData, got.BatchHeader.ExtraData) + s.Equal(i < len(batchIndexes)-1, got.Approve) + + // no more signatures than the minimum stake should be sent + s.Len(got.Signatures, len(signatures[i])*65) + for j, sig := range signatures[i] { + start := j * 65 + end := start + 65 + s.Equal(sig.Signature[:], got.Signatures[start:end]) + } + } + } + // Wait 1s as the receipt waiting loop 1s interval + time.Sleep(1 * time.Second) + + // Confirm old signatures are cleaned up + deleteIndex := uint64(1) + rows, err := s.DB.OPSignature.Find(nil, nil, &s.SCCAddr, &deleteIndex, 1000, 0) + s.NoError(err) + s.True(len(rows) == 0) +} diff --git a/testhelper/backend/backend.go b/testhelper/backend/backend.go index 68505fc..9a92b12 100644 --- a/testhelper/backend/backend.go +++ b/testhelper/backend/backend.go @@ -37,6 +37,10 @@ type Backend struct { *simulated.Backend } +func (b *Backend) FilterLogsWithRateThottling(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error) { + return b.FilterLogs(ctx, q) +} + func (b *Backend) NewBatchHeaderClient() (ethutil.BatchHeaderClient, error) { return &BatchHeaderClient{b}, nil } diff --git a/verifier/verifier.go b/verifier/verifier.go index 204095f..25bdae9 100644 --- a/verifier/verifier.go +++ b/verifier/verifier.go @@ -3,15 +3,17 @@ package verifier import ( "context" "errors" + "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/oasysgames/oasys-optimism-verifier/config" "github.com/oasysgames/oasys-optimism-verifier/database" "github.com/oasysgames/oasys-optimism-verifier/ethutil" - "github.com/oasysgames/oasys-optimism-verifier/util" "github.com/oasysgames/oasys-optimism-verifier/verse" ) @@ -20,26 +22,29 @@ type Verifier struct { cfg *config.Verifier db 
*database.Database l1Signer ethutil.SignableClient - topic *util.Topic + p2p P2P tasks sync.Map log log.Logger - wg *util.WorkerGroup running *sync.Map } +type P2P interface { + PublishSignatures(ctx context.Context, sigs []*database.OptimismSignature) +} + // Returns the new verifier. func NewVerifier( cfg *config.Verifier, db *database.Database, + p2p P2P, l1Signer ethutil.SignableClient, ) *Verifier { return &Verifier{ cfg: cfg, db: db, + p2p: p2p, l1Signer: l1Signer, - topic: util.NewTopic(), log: log.New("worker", "verifier"), - wg: util.NewWorkerGroup(cfg.Concurrency), running: &sync.Map{}, } } @@ -57,164 +62,281 @@ func (w *Verifier) HasTask(contract common.Address, l2RPC string) bool { return l2RPC == val.(verse.VerifiableVerse).L2Client().URL() } -func (w *Verifier) AddTask(task verse.VerifiableVerse) { - task.Logger(w.log).Info("Add verifier task") +func (w *Verifier) AddTask(ctx context.Context, task verse.VerifiableVerse, chainId uint64) { + _, exists := w.tasks.Load(task.RollupContract()) w.tasks.Store(task.RollupContract(), task) + if !exists { + // Start the verifier by each contract. + go w.startVerifier(ctx, task.RollupContract(), chainId) + } } -func (w *Verifier) RemoveTask(contract common.Address) { - w.tasks.Delete(contract) +func (w *Verifier) GetTask(contract common.Address) (task verse.VerifiableVerse, found bool) { + var val any + val, found = w.tasks.Load(contract) + if !found { + return + } + task, found = val.(verse.VerifiableVerse) + return } -func (w *Verifier) SubscribeNewSignature(ctx context.Context) *SignatureSubscription { - ch := make(chan *database.OptimismSignature) - cancel := w.topic.Subscribe(ctx, func(ctx context.Context, data interface{}) { - ch <- data.(*database.OptimismSignature) - }) - return &SignatureSubscription{Cancel: cancel, ch: ch} +func (w *Verifier) RemoveTask(contract common.Address) { + w.tasks.Delete(contract) } -func (w *Verifier) Work(ctx context.Context) { - w.tasks.Range(func(key, val interface{}) bool { - workerID := key.(common.Address).Hex() - task := val.(verse.VerifiableVerse) - - // deduplication - if _, ok := w.running.Load(workerID); ok { - return true +func (w *Verifier) startVerifier(ctx context.Context, contract common.Address, chainId uint64) { + var ( + tick = time.NewTicker(w.cfg.Interval) + nextEventFetchStartBlock uint64 + counter int + publishAllUnverifiedSigs = func() bool { + counter++ + // Publish all unverified signatures every 4 times. 
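+ // The counter is incremented on every ticker fire, so this closure
+ // evaluates to true on every 4th run of the verification loop.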
+ return counter%4 == 0 } - w.running.Store(workerID, 1) + ) + defer tick.Stop() - if !w.wg.Has(workerID) { - worker := func(ctx context.Context, rname string, data interface{}) { - defer w.running.Delete(rname) - w.work(ctx, data.(verse.VerifiableVerse)) + for { + select { + case <-ctx.Done(): + w.log.Info("Verifying work stopped", "chainId", chainId) + return + case <-tick.C: + task, found := w.GetTask(contract) + if !found { + w.log.Info("exit verifier as task is evicted", "chainId", chainId) + return + } + if err := w.work(ctx, task, chainId, &nextEventFetchStartBlock, publishAllUnverifiedSigs()); err != nil { + w.log.Error("Failed to run verification", "err", err) } - w.wg.AddWorker(ctx, workerID, worker) } - - w.wg.Enqueue(workerID, task) - return true - }) + } } -func (w *Verifier) work(ctx context.Context, task verse.VerifiableVerse) { - log := task.Logger(w.log) +func (w *Verifier) work(ctx context.Context, task verse.VerifiableVerse, chainId uint64, nextStart *uint64, publishAllUnverifiedSigs bool) error { + // run verification tasks until time out + var ( + cancel context.CancelFunc + ctxOrigin = ctx + setNextStart = func(endBlock uint64) { + // Next start block is the current end block + 1 + endBlock += 1 + *nextStart = endBlock + } + ) + ctx, cancel = context.WithTimeout(ctx, w.cfg.StateCollectTimeout) + defer cancel() - // fetch the next index from hub-layer - nextIndex, err := task.NextIndex(&bind.CallOpts{Context: ctx}) + // Assume the fetched nextIndex is not reorged, as we confirm `w.cfg.Confirmations` blocks + nextIndex, err := task.NextIndexWithConfirm(&bind.CallOpts{Context: ctx}, uint64(w.cfg.Confirmations), true) if err != nil { - log.Error("Failed to call the NextIndex method", "err", err) - return + return fmt.Errorf("failed to call the NextIndex method: %w", err) } - log = log.New("next-index", nextIndex) + log := log.New("chainId", chainId, "next-index", nextIndex) - // verify the signature that match the nextIndex - // and delete after signatures if there is a problem. - // Prevent getting stuck indefinitely in the Verify waiting - // event due to a bug in the signature creation process. - w.deleteInvalidNextIndexSignature(task, nextIndex.Uint64()) + // Clean up old signatures + if err := w.cleanOldSignatures(task.RollupContract(), nextIndex.Uint64()); err != nil { + log.Warn("Failed to delete old signatures", "err", err) + } - // run verification tasks until time out - ctx, cancel := context.WithTimeout(ctx, w.cfg.StateCollectTimeout) - defer cancel() + // determine the start block number + var ( + start uint64 + skipFetchlog bool + oneDayBlocks = uint64(5760) + ) + end, err := w.l1Signer.BlockNumber(ctx) + if err != nil { + return fmt.Errorf("failed to fetch the latest block number: %w", err) + } + if 0 < *nextStart { + start = *nextStart + if start > end { + // Block number is not updated yet. + skipFetchlog = true + } + } else { + offset := w.cfg.StartBlockOffset + if end < offset { + start = 0 + } else { + start = end - offset + } + } + if start < end && oneDayBlocks < end-start { + // If the range is too wide, divide it into one-day blocks. 
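+ // 5760 blocks is roughly one day assuming ~15s block times; the rest of
+ // the range is covered on later ticks because nextStart advances to end+1.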
+ end = start + oneDayBlocks + } + log = log.New("start", start, "end", end) - for rollupIndex := nextIndex.Uint64(); ; rollupIndex++ { - events, err := task.EventDB().FindForVerification( - w.l1Signer.Signer(), task.RollupContract(), rollupIndex, 1) - if err != nil { - log.Error("Failed to find rollup events", "err", err) - return - } else if len(events) == 0 { - log.Debug("Wait for new rollup event") - return + if skipFetchlog && !publishAllUnverifiedSigs { + log.Info("Skip fetching logs") + setNextStart(end) + return nil + } + + // fetch event logs + var logs []types.Log + if !skipFetchlog { + if logs, err = w.l1Signer.FilterLogsWithRateThottling(ctx, verse.NewEventLogFilter(start, end, []common.Address{task.RollupContract()})); err != nil { + if errors.Is(err, ethutil.ErrTooManyRequests) { + log.Warn("Rate limit exceeded", "err", err) + } + return fmt.Errorf("failed to fetch(start: %d, end: %d) event logs from hub-layer: %w", start, end, err) } + } + log = log.New("count-logs", len(logs)) - log := log.New("rollup-index", events[0].GetRollupIndex()) - log.Info("Start verification") + // Only if succeed to fetch logs, update the next start block. + setNextStart(end) - approved, err := task.Verify(log, ctx, events[0], w.cfg.StateCollectLimit) + // verify the fetched logs + var ( + opsigs = []*database.OptimismSignature{} + // flag at least one log verification failed + atLeastOneLogVerificationFailed bool + ) + for i := range logs { + row, err := w.verifyAndSaveLog(ctx, &logs[i], task, nextIndex.Uint64(), log) if err != nil { - log.Error("Failed to verification", "err", err) - return + if errors.Is(err, context.Canceled) { + // exit if context have been canceled + return err + } else if errors.Is(err, context.DeadlineExceeded) { + // retry if the deadline is exceeded + log.Warn("too much time spent on log iteration", "current-index", i) + cancel() // cancel previous context + ctx, cancel = context.WithTimeout(ctxOrigin, 30*time.Second) // expand the deadline + defer cancel() + row, err = w.verifyAndSaveLog(ctx, &logs[i], task, nextIndex.Uint64(), log) + if err != nil { + // give up if the retry fails + log.Error("Failed to verification", "err", err, "rollup-index", nextIndex.Uint64()) + atLeastOneLogVerificationFailed = true + continue + } + } else { + // continue if other errors + log.Error("Failed to verification", "err", err, "rollup-index", nextIndex.Uint64()) + atLeastOneLogVerificationFailed = true + continue + } } - msg := database.NewMessage(events[0], w.l1Signer.ChainID(), approved) - sig, err := msg.Signature(w.l1Signer.SignData) - if err != nil { - log.Error("Failed to calculate signature", "err", err) - return + if row == nil { + // skip if the row is nil + // - when the event is not a rollup event + // - when the event is already verified + continue } - row, err := w.db.OPSignature.Save( - nil, nil, - w.l1Signer.Signer(), - events[0].GetContract().Address, - events[0].GetRollupIndex(), - events[0].GetRollupHash(), - approved, - sig) + opsigs = append(opsigs, row) + log.Debug("Verification completed", "approved", row.Approved, "rollup-index", row.RollupIndex) + } + if len(opsigs) > 0 { + log.Info("Completed verification of all fetched logs", "count-newsigs", len(opsigs)) + } + + // Will publish all unverified signatures if the flag is set. 
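+ // In addition to the signatures created in this run, this signer's unverified
+ // signatures are loaded from the database and appended to the same publish batch.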
+ if publishAllUnverifiedSigs { + contract := task.RollupContract() + rows, err := w.db.OPSignature.FindUnverifiedBySigner(w.l1Signer.Signer(), nextIndex.Uint64(), &contract, database.FindUnverifiedBySignerLimit) if err != nil { - log.Error("Failed to save signature", "err", err) - return + log.Error("Failed to find unverified signatures", "err", err) } + opsigs = append(opsigs, rows...) + } - w.topic.Publish(row) - log.Info("Verification completed", "approved", approved) + if len(opsigs) > 0 { + // publish all signatures at once + w.p2p.PublishSignatures(ctx, opsigs) + log.Info("Published signatures", "count-sigs", len(opsigs), "first-rollup-index", opsigs[0].RollupIndex, "last-rollup-index", opsigs[len(opsigs)-1].RollupIndex) + } else { + log.Info("No signatures to publish") } -} -func (w *Verifier) deleteInvalidNextIndexSignature(task verse.VerifiableVerse, nextIndex uint64) { - log := task.Logger(w.log).New("next-index", nextIndex) + if atLeastOneLogVerificationFailed { + // Remove task if at least one log verification failed. + // The removed task will be added again in the next verse discovery. + // As the verse discovery interval is 1h, the faild log verification will be retried 1h later. + w.RemoveTask(task.RollupContract()) + } + + return nil +} - signer := w.l1Signer.Signer() - contract := task.RollupContract() - sigs, err := w.db.OPSignature.Find(nil, &signer, &contract, &nextIndex, 1, 0) +func (w *Verifier) verifyAndSaveLog(ctx context.Context, log *types.Log, task verse.VerifiableVerse, nextIndex uint64, logger log.Logger) (*database.OptimismSignature, error) { + event, err := verse.ParseEventLog(log) if err != nil { - log.Error("Unable to find signatures", "err", err) - return - } else if len(sigs) == 0 { - log.Debug("No invalid signature") - return + return nil, fmt.Errorf("failed to parse event log. block: %d contract: %s,: %w", log.BlockNumber, log.Address.Hex(), err) + } + + // parse event log + rollupEvent, ok := event.(*verse.RollupedEvent) + if !ok { + // skip `*verse.DeletedEvent` or `*verse.VerifiedEvent` + return nil, nil } - event, err := task.EventDB().FindByRollupIndex(contract, nextIndex) + // cast to database event + contract, err := w.db.OPContract.FindOrCreate(log.Address) if err != nil { - if errors.Is(err, database.ErrNotFound) { - log.Debug("No rollup event") - } else { - log.Error("Unable to find rollup event", "err", err) - } - return + return nil, fmt.Errorf("failed to find or create contract(%s): %w", log.Address.Hex(), err) + } + dbEvent, err := rollupEvent.CastToDatabaseOPEvent(contract) + if err != nil { + return nil, fmt.Errorf("failed to cast to database. rollup-index: %d, event: %w", dbEvent.GetRollupIndex(), err) } - err = database.NewMessage(event, w.l1Signer.ChainID(), true). - VerifySigner(sigs[0].Signature[:], signer) - if _, ok := err.(*ethutil.SignerMismatchError); ok { - // possible reject signature - err = database.NewMessage(event, w.l1Signer.ChainID(), false). - VerifySigner(sigs[0].Signature[:], signer) + if dbEvent.GetRollupIndex() < nextIndex { + // skip old events + return nil, nil } - if err == nil { - log.Debug("No invalid signature") - return + + approved, err := task.Verify(logger, ctx, dbEvent, w.cfg.StateCollectLimit) + if err != nil { + return nil, fmt.Errorf("failed to verification. 
rollup-index: %d, : %w", dbEvent.GetRollupIndex(), err) } - log.Warn("Found invalid signature", "signature", sigs[0].Signature.Hex()) + msg := database.NewMessage(dbEvent, w.l1Signer.ChainID(), approved) + sig, err := msg.Signature(w.l1Signer.SignData) + if err != nil { + return nil, fmt.Errorf("failed to calculate signature. rollup-index: %d, : %w", dbEvent.GetRollupIndex(), err) + } - rows, err := w.db.OPSignature.Deletes(signer, contract, nextIndex) + row, err := w.db.OPSignature.Save( + nil, nil, + w.l1Signer.Signer(), + dbEvent.GetContract().Address, + dbEvent.GetRollupIndex(), + dbEvent.GetRollupHash(), + approved, + sig) if err != nil { - log.Error("Failed to delete invalid signatures", "err", err) - } else { - log.Warn("Deleted invalid signatures", "rows", rows) + return nil, fmt.Errorf("failed to save signature. rollup-index: %d, : %w", dbEvent.GetRollupIndex(), err) } -} -type SignatureSubscription struct { - Cancel context.CancelFunc - ch chan *database.OptimismSignature + return row, nil } -func (s *SignatureSubscription) Next() <-chan *database.OptimismSignature { - return s.ch +func (w *Verifier) cleanOldSignatures(contract common.Address, nextIndex uint64) error { + var verifiedIndex uint64 + if nextIndex == 0 { + verifiedIndex = 0 + } else { + verifiedIndex = nextIndex - 1 + } + + // Keep the last 3 signatures. + if verifiedIndex < 3 { + return nil + } + deleteIndex := uint64(verifiedIndex - 3) + if _, err := w.db.OPSignature.DeleteOlds(contract, deleteIndex, database.DeleteOldsLimit); err != nil { + return fmt.Errorf("failed to delete old signatures. deleteIndex: %d, : %w", deleteIndex, err) + } + return nil } diff --git a/verifier/verifier_test.go b/verifier/verifier_test.go index 63c1508..adab73a 100644 --- a/verifier/verifier_test.go +++ b/verifier/verifier_test.go @@ -10,9 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/oasysgames/oasys-optimism-verifier/config" - "github.com/oasysgames/oasys-optimism-verifier/contract/scc" "github.com/oasysgames/oasys-optimism-verifier/database" - "github.com/oasysgames/oasys-optimism-verifier/util" "github.com/oasysgames/oasys-optimism-verifier/verse" "github.com/stretchr/testify/suite" @@ -24,108 +22,46 @@ type VerifierTestSuite struct { verifier *Verifier task verse.VerifiableVerse - stopWork context.CancelFunc + sigsCh chan []*database.OptimismSignature } func TestVerifier(t *testing.T) { suite.Run(t, new(VerifierTestSuite)) } +type MockP2P struct { + sigsCh chan []*database.OptimismSignature +} + +func (m *MockP2P) PublishSignatures(ctx context.Context, sigs []*database.OptimismSignature) { + m.sigsCh <- sigs +} + func (s *VerifierTestSuite) SetupTest() { s.BackendSuite.SetupTest() + s.sigsCh = make(chan []*database.OptimismSignature, 4) s.verifier = NewVerifier(&config.Verifier{ Interval: 50 * time.Millisecond, Concurrency: 10, StateCollectLimit: 3, StateCollectTimeout: time.Second, - }, s.DB, s.SignableHub) + Confirmations: 2, + StartBlockOffset: 100, + }, s.DB, &MockP2P{sigsCh: s.sigsCh}, s.SignableHub) s.task = verse.NewOPLegacy(s.DB, s.Hub, s.SCCAddr).WithVerifiable(s.Verse) - s.verifier.AddTask(s.task) - - ctx := context.Background() - ctx, s.stopWork = context.WithCancel(ctx) - - go func() { - tick := time.NewTicker(s.verifier.cfg.Interval) - defer tick.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - s.verifier.Work(ctx) - } - } - }() -} - -func (s *VerifierTestSuite) TestVerify() { - defer s.stopWork() - - cases := []struct { 
- batchRoot string - wantSignature string - wantApproved bool - }{ - { - "0x9ad778e5c9936769419b31119fb0bbc9d7b66c88ee10f0986ce46a6d302792b7", - "0xa01df213459635dcd05e84b1828ba26b9469d52bf2860698437ac466d0e9afba5bda3efa378d9c36ca9eb7f4ce87f2aad73deeea357d7dba141d3469d095bb8c1c", - true, - }, - { - "0x3b6af01f7666ff6990d8ccaa995f6efdae442ad24b5a354a70029ed8a2713357", - "0x21c90d613eb6a8fbb43d858de6c6aa8c569e0c04e0e26af73f0a1043e533f26631e76beda58d5084bd93a5159a25cd6c80d5396916d1247644e63422e7cef85c1c", - false, - }, - } - - batchSize := 10 - - // send transactions to verse-layer - s.sendVerseTransactions(10 * len(cases)) - - // emit and collect `StateBatchAppended` events - for batchIndex, tt := range cases { - _, err := s.task.EventDB().Save( - s.task.RollupContract(), - &scc.SccStateBatchAppended{ - BatchIndex: big.NewInt(int64(batchIndex)), - BatchRoot: util.BytesToBytes32(common.FromHex(tt.batchRoot)), - BatchSize: big.NewInt(int64(batchSize)), - PrevTotalElements: big.NewInt(int64(batchSize * batchIndex)), - ExtraData: []byte(fmt.Sprintf("test-%d", batchSize))}) - s.NoError(err) - } - - // subscribe new signature - subscribes := s.waitPublished(len(cases)) - - // assert - for batchIndex, tt := range cases { - bi64 := uint64(batchIndex) - got0, _ := s.DB.OPSignature.Find(nil, nil, nil, &bi64, 1, 0) - got1 := subscribes[batchIndex] - - s.Equal(tt.batchRoot, got0[0].RollupHash.Hex()) - s.Equal(tt.batchRoot, got1.RollupHash.Hex()) - - s.Equal(tt.wantApproved, got0[0].Approved) - s.Equal(tt.wantApproved, got1.Approved) - - s.Equal(tt.wantSignature, got0[0].Signature.Hex()) - s.Equal(tt.wantSignature, got1.Signature.Hex()) - } + s.verifier.AddTask(context.Background(), s.task, 0) } -func (s *VerifierTestSuite) TestDeleteInvalidSignature() { - defer s.stopWork() +func (s *VerifierTestSuite) TestStartVerifier() { + // start verifier by adding task + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.verifier.AddTask(ctx, s.task, 0) - batches := s.Range(0, 10) + batches := s.Range(0, 3) batchSize := 5 - invalidBatch := 6 // send transactions to verse-layer merkleRoots := make([][32]byte, len(batches)) @@ -138,76 +74,54 @@ func (s *VerifierTestSuite) TestDeleteInvalidSignature() { merkleRoots[batchIdx] = merkleRoot } } - - createds := make([]*database.OptimismSignature, len(batches)) for batchIdx, merkleRoot := range merkleRoots { - // save verify waiting state - _, err := s.task.EventDB().Save( - s.task.RollupContract(), - &scc.SccStateBatchAppended{ - BatchIndex: big.NewInt(int64(batchIdx)), - BatchRoot: merkleRoot, - BatchSize: big.NewInt(int64(batchSize)), - PrevTotalElements: big.NewInt(int64(batchIdx * batchSize)), - ExtraData: []byte(fmt.Sprintf("test-%d", batchIdx))}) - s.NoError(err) - - // run verification - sigs := s.waitPublished(1) - s.Len(sigs, 1) - s.Equal(merkleRoot[:], sigs[0].RollupHash[:]) - createds[batchIdx] = sigs[0] - } - - // increment `nextIndex` - for batchIdx := range s.Range(0, invalidBatch) { - s.TSCC.EmitStateBatchVerified( + _, err := s.TSCC.EmitStateBatchAppended( s.SignableHub.TransactOpts(context.Background()), big.NewInt(int64(batchIdx)), - merkleRoots[batchIdx], + merkleRoot, big.NewInt(int64(batchSize)), + big.NewInt(int64(batchIdx*batchSize)), []byte(fmt.Sprintf("test-%d", batchIdx)), ) - s.SignableHub.Commit() + s.NoError(err) + s.Mining() } - // run `deleteInvalidSignature`, but nothing happens - s.Len(s.waitPublished(1), 0) - - signer := s.SignableHub.Signer() - contract := s.task.RollupContract() - gots, _ := 
s.DB.OPSignature.Find(nil, &signer, &contract, nil, 100, 0) - s.Equal(len(batches), len(gots)) + // run verification + sigs := <-s.sigsCh + s.Len(sigs, len(merkleRoots)) - for batchIdx := range batches { - // should not be re-created - s.Equal(createds[batchIdx].ID, gots[batchIdx].ID) - s.Equal(createds[batchIdx].Signature, gots[batchIdx].Signature) - } + // assert + for batchIndex, sig := range sigs { + bi64 := uint64(batchIndex) + got0, _ := s.DB.OPSignature.Find(nil, nil, nil, &bi64, 1, 0) - // update to invalid signature - s.DB.OPSignature.Save( - &createds[invalidBatch].ID, - &createds[invalidBatch].PreviousID, - createds[invalidBatch].Signer.Address, - createds[invalidBatch].Contract.Address, - createds[invalidBatch].RollupIndex, - createds[invalidBatch].RollupHash, - createds[invalidBatch].Approved, - database.RandSignature()) + s.Equal(sig.RollupHash.Hex(), got0[0].RollupHash.Hex()) + s.Equal(merkleRoots[batchIndex][:], got0[0].RollupHash[:]) - // run `deleteInvalidSignature` - s.Len(s.waitPublished(len(batches)-invalidBatch), len(batches)-invalidBatch) + s.Equal(sig.Approved, got0[0].Approved) + s.Equal(true, got0[0].Approved) - gots, _ = s.DB.OPSignature.Find(nil, &signer, &contract, nil, 100, 0) - s.Equal(len(batches), len(gots)) + s.Equal(sig.Signature.Hex(), got0[0].Signature.Hex()) + } - for batchIdx := range batches { - if batchIdx < invalidBatch { - s.Equal(createds[batchIdx].ID, gots[batchIdx].ID) - } else { - // should be re-created - s.NotEqual(createds[batchIdx].ID, gots[batchIdx].ID) + // increment `nextIndex` + nextIndex := 1 + s.TSCC.SetNextIndex(s.SignableHub.TransactOpts(ctx), big.NewInt(int64(nextIndex))) + s.Hub.Commit() + + // confirm blocks + s.Hub.Commit() + s.Hub.Commit() + + // no prior signature than verified index should be sent + for i := 0; i < 3; i++ { + sigs = <-s.sigsCh + if len(sigs) == len(batches) { + // signatures before incrementing `nextIndex` are remained + continue + } + for _, sig := range sigs { + s.True(sig.RollupIndex >= uint64(nextIndex)) } - s.Equal(createds[batchIdx].Signature, gots[batchIdx].Signature) } } @@ -237,31 +151,3 @@ func (s *VerifierTestSuite) sendVerseTransactions(count int) (headers []*types.H } return headers } - -func (s *VerifierTestSuite) waitPublished(count int) []*database.OptimismSignature { - ctx, candel := context.WithTimeout(context.Background(), time.Second/3) - defer candel() - - sub := s.verifier.SubscribeNewSignature(ctx) - defer sub.Cancel() - - var published []*database.OptimismSignature - go func() { - defer candel() - - for { - select { - case <-ctx.Done(): - return - case sig := <-sub.Next(): - published = append(published, sig) - if len(published) == count { - return - } - } - } - }() - - <-ctx.Done() - return published -} diff --git a/verse/event_log.go b/verse/event_log.go index c0be875..a4e23bf 100644 --- a/verse/event_log.go +++ b/verse/event_log.go @@ -136,12 +136,15 @@ func init() { } } -func NewEventLogFilter(fromBlock, toBlock uint64) ethereum.FilterQuery { +func NewEventLogFilter(fromBlock, toBlock uint64, addresss []common.Address) ethereum.FilterQuery { query := ethereum.FilterQuery{ Topics: [][]common.Hash{make([]common.Hash, len(eventTopics))}, FromBlock: new(big.Int).SetUint64(fromBlock), ToBlock: new(big.Int).SetUint64(toBlock), } + if len(addresss) != 0 { + query.Addresses = addresss + } copy(query.Topics[0], eventTopics) return query } @@ -192,3 +195,22 @@ LOOP: Parsed: rawEvent, }), nil } + +func (e *RollupedEvent) CastToDatabaseOPEvent(contract *database.OptimismContract) (dbEvent 
database.OPEvent, err error) { + if e.Parsed == nil { + return nil, fmt.Errorf("parsed event is nil, event: %v", e) + } + switch t := e.Parsed.(type) { + case *scc.SccStateBatchAppended: + var model database.OptimismState + err = model.AssignEvent(contract, e.Parsed) + dbEvent = &model + case *l2oo.OasysL2OutputOracleOutputProposed: + var model database.OpstackProposal + err = model.AssignEvent(contract, e.Parsed) + dbEvent = &model + default: + err = fmt.Errorf("unsupported event type: %T", t) + } + return +} diff --git a/verse/event_log_test.go b/verse/event_log_test.go index 91df9c4..4e0e7b0 100644 --- a/verse/event_log_test.go +++ b/verse/event_log_test.go @@ -52,7 +52,7 @@ func (s *EventLogTestSuite) TestNewEventLogFilter() { wants[i] = receipt.Logs[0] } - gots, _ := s.Hub.FilterLogs(context.Background(), NewEventLogFilter(0, 100)) + gots, _ := s.Hub.FilterLogs(context.Background(), NewEventLogFilter(0, 100, nil)) s.Len(gots, len(wants)) for i, want := range wants { s.Equal(*want, gots[i]) diff --git a/verse/oplegacy.go b/verse/oplegacy.go index 49c534f..544a426 100644 --- a/verse/oplegacy.go +++ b/verse/oplegacy.go @@ -55,6 +55,24 @@ func (op *oplegacy) NextIndex(opts *bind.CallOpts) (*big.Int, error) { return sc.NextIndex(opts) } +func (op *oplegacy) NextIndexWithConfirm(opts *bind.CallOpts, confirmation uint64, waits bool) (*big.Int, error) { + var err error + if opts.BlockNumber, err = decideConfirmationBlockNumber(opts, confirmation, op.L1Client()); err != nil { + if errors.Is(err, ErrNotSufficientConfirmations) && waits { + // wait for the next block, then retry + time.Sleep(10 * time.Second) + return op.NextIndexWithConfirm(opts, confirmation, waits) + } + return nil, err + } + + sc, err := scc.NewScc(op.RollupContract(), op.L1Client()) + if err != nil { + return nil, err + } + return sc.NextIndex(opts) +} + func (op *oplegacy) WithVerifiable(l2Client ethutil.Client) VerifiableVerse { return &verifiableOPLegacy{&verifiableVerse{op, l2Client}} } @@ -114,7 +132,7 @@ func (op *verifiableOPLegacy) Verify( } } - log.Info("Collected L2 states", "elapsed", time.Since(st)) + log.Debug("Collected L2 states", "elapsed", time.Since(st)) // calc and compare state root merkleRoot, err := CalcMerkleRoot(elements) diff --git a/verse/opstack.go b/verse/opstack.go index 5a3e893..46e9f62 100644 --- a/verse/opstack.go +++ b/verse/opstack.go @@ -5,6 +5,7 @@ import ( "context" "errors" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -52,6 +53,24 @@ func (op *opstack) NextIndex(opts *bind.CallOpts) (*big.Int, error) { return lo.NextVerifyIndex(opts) } +func (op *opstack) NextIndexWithConfirm(opts *bind.CallOpts, confirmation uint64, waits bool) (*big.Int, error) { + var err error + if opts.BlockNumber, err = decideConfirmationBlockNumber(opts, confirmation, op.L1Client()); err != nil { + if errors.Is(err, ErrNotSufficientConfirmations) && waits { + // wait for the next block, then retry + time.Sleep(10 * time.Second) + return op.NextIndexWithConfirm(opts, confirmation, waits) + } + return nil, err + } + + lo, err := l2oo.NewOasysL2OutputOracle(op.RollupContract(), op.L1Client()) + if err != nil { + return nil, err + } + return lo.NextVerifyIndex(opts) +} + func (op *opstack) WithVerifiable(l2Client ethutil.Client) VerifiableVerse { return &verifiableOPStack{&verifiableVerse{op, l2Client}} } diff --git a/verse/verse.go b/verse/verse.go index e0c051a..0ab858a 100644 --- a/verse/verse.go +++ b/verse/verse.go @@ -2,6 +2,8 @@ package 
verse import ( "context" + "errors" + "fmt" "math/big" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -12,6 +14,10 @@ import ( "github.com/oasysgames/oasys-optimism-verifier/ethutil" ) +var ( + ErrNotSufficientConfirmations = errors.New("not sufficient confirmations") +) + type Verse interface { Logger(base log.Logger) log.Logger DB() *database.Database @@ -19,6 +25,7 @@ type Verse interface { RollupContract() common.Address EventDB() database.IOPEventDB NextIndex(opts *bind.CallOpts) (*big.Int, error) + NextIndexWithConfirm(opts *bind.CallOpts, confirmation uint64, waits bool) (*big.Int, error) WithVerifiable(l2Client ethutil.Client) VerifiableVerse WithTransactable(l1Signer ethutil.SignableClient, verifyContract common.Address) TransactableVerse } @@ -73,6 +80,9 @@ func (v *verse) L1Client() ethutil.Client { return v.l1Client func (v *verse) RollupContract() common.Address { return v.rollupContract } func (v *verse) EventDB() database.IOPEventDB { panic("not implemented") } func (v *verse) NextIndex(*bind.CallOpts) (*big.Int, error) { panic("not implemented") } +func (v *verse) NextIndexWithConfirm(opts *bind.CallOpts, confirmation uint64, waits bool) (*big.Int, error) { + panic("not implemented") +} func (v *verse) WithVerifiable(l2Client ethutil.Client) VerifiableVerse { return &verifiableVerse{v, l2Client} } @@ -123,3 +133,22 @@ func newVerseFactory(conv func(Verse) Verse) VerseFactory { }) } } + +func decideConfirmationBlockNumber(opts *bind.CallOpts, confirmation uint64, client ethutil.Client) (*big.Int, error) { + if opts.BlockNumber != nil { + return nil, errors.New("block number is overridden. should be nil") + } + if 16 < confirmation { + return nil, errors.New("confirmation is too large") + } + // get the latest block number + latest, err := client.BlockNumber(opts.Context) + if err != nil { + return nil, fmt.Errorf("failed to fetch latest block height: %w", err) + } + if latest < confirmation { + return nil, fmt.Errorf("not enough blocks to confirm: %d < %d, %w", latest, confirmation, ErrNotSufficientConfirmations) + } + confirmedNumber := latest - confirmation + return new(big.Int).SetUint64(confirmedNumber), nil +} diff --git a/version/version.go b/version/version.go index e53b04d..e06c0c3 100644 --- a/version/version.go +++ b/version/version.go @@ -4,7 +4,7 @@ import "fmt" const ( Major = 1 - Minor = 1 + Minor = 2 Patch = 0 Meta = "" )
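
For reference, a minimal standalone sketch of the confirmation arithmetic used by the new NextIndexWithConfirm implementations and decideConfirmationBlockNumber: state is read at `latest - confirmations` so a shallow reorg cannot change the result, and a wrapped ErrNotSufficientConfirmations is returned while the chain is still shorter than the confirmation depth. The snippet below is illustrative only; it omits the CallOpts validation and the upper bound on confirmations enforced by the real helper.

package main

import (
	"errors"
	"fmt"
	"math/big"
)

// errNotSufficientConfirmations mirrors verse.ErrNotSufficientConfirmations.
var errNotSufficientConfirmations = errors.New("not sufficient confirmations")

// confirmedBlockNumber returns the newest block number that already has
// `confirmations` blocks built on top of it.
func confirmedBlockNumber(latest, confirmations uint64) (*big.Int, error) {
	if latest < confirmations {
		return nil, fmt.Errorf("not enough blocks to confirm: %d < %d, %w",
			latest, confirmations, errNotSufficientConfirmations)
	}
	return new(big.Int).SetUint64(latest - confirmations), nil
}

func main() {
	// With the latest block at 100 and 2 confirmations, nextIndex is read at block 98.
	if n, err := confirmedBlockNumber(100, 2); err == nil {
		fmt.Println(n)
	}
	// While the chain is shorter than the confirmation depth, callers such as
	// NextIndexWithConfirm(waits=true) may sleep and retry.
	if _, err := confirmedBlockNumber(1, 2); errors.Is(err, errNotSufficientConfirmations) {
		fmt.Println("not enough confirmations yet")
	}
}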