From 72143b45c351781cf998989f58e9ea6e1c0a9bd5 Mon Sep 17 00:00:00 2001
From: tak
Date: Fri, 19 Jul 2024 22:11:11 +0900
Subject: [PATCH] verifier: expand retry limit (#34)

* verifier: expand retry limit

* add comment

* increment version

* include remaining time in log

* Improved retry backoff (#35)

* Deleted unused code

* Improved retry backoff

* comment case of overflow

---------

Co-authored-by: ironbeer <7997273+ironbeer@users.noreply.github.com>
---
 cmd/config_loader.go              |  23 +++-
 cmd/config_loader_test.go         |  12 +-
 collector/block_collector.go      | 191 ---------------------------
 collector/block_collector_test.go | 182 --------------------------
 collector/event_collector.go      | 201 ----------------------------
 collector/event_collector_test.go | 210 ------------------------------
 config/config.go                  |  24 ++--
 config/config_test.go             |  18 +--
 verifier/verifier.go              | 107 +++++++++------
 verifier/verifier_test.go         |  38 +++++-
 version/version.go                |   2 +-
 11 files changed, 144 insertions(+), 864 deletions(-)
 delete mode 100644 collector/block_collector.go
 delete mode 100644 collector/block_collector_test.go
 delete mode 100644 collector/event_collector.go
 delete mode 100644 collector/event_collector_test.go

diff --git a/cmd/config_loader.go b/cmd/config_loader.go
index 5982ceb..6c344f9 100644
--- a/cmd/config_loader.go
+++ b/cmd/config_loader.go
@@ -3,6 +3,7 @@ package cmd
 import (
 	"os"
 	"strings"
+	"time"
 
 	"github.com/oasysgames/oasys-optimism-verifier/config"
 	"github.com/spf13/cobra"
@@ -27,8 +28,10 @@ type configLoader struct {
 		scc, l2oo string
 	}
 	verifier struct {
-		use    bool
-		wallet config.Wallet
+		use             bool
+		wallet          config.Wallet
+		MaxRetryBackoff time.Duration
+		RetryTimeout    time.Duration
 	}
 	submitter struct {
 		use bool
@@ -101,10 +104,12 @@ func mustNewConfigLoader(cmd *cobra.Command) *configLoader {
 		{
 			name: "verifier",
 			flags: map[string]func(name string){
-				"":                argConfigFlag(&opts.verifier.use, f.BoolVar, "Enable the verifier feature"),
-				"wallet.address":  argConfigFlag(&opts.verifier.wallet.Address, f.StringVar, "Address of the verifier wallet"),
-				"wallet.password": argConfigFlag(&opts.verifier.wallet.Password, f.StringVar, "Password file of the verifier wallet"),
-				"wallet.plain":    argConfigFlag(&opts.verifier.wallet.Plain, f.StringVar, "Plaintext private key of the verifier wallet"),
+				"":                  argConfigFlag(&opts.verifier.use, f.BoolVar, "Enable the verifier feature"),
+				"wallet.address":    argConfigFlag(&opts.verifier.wallet.Address, f.StringVar, "Address of the verifier wallet"),
+				"wallet.password":   argConfigFlag(&opts.verifier.wallet.Password, f.StringVar, "Password file of the verifier wallet"),
+				"wallet.plain":      argConfigFlag(&opts.verifier.wallet.Plain, f.StringVar, "Plaintext private key of the verifier wallet"),
+				"max-retry-backoff": argConfigFlag(&opts.verifier.MaxRetryBackoff, f.DurationVar, "Maximum exponential backoff time for retries"),
+				"retry-timeout":     argConfigFlag(&opts.verifier.RetryTimeout, f.DurationVar, "Maximum duration to attempt retries"),
 			},
 		},
 		{
@@ -177,6 +182,12 @@ func (opts *configLoader) load(enableStrictValidation bool) (*config.Config, err
 		opts.cfg.Wallets["verifier"] = &opts.verifier.wallet
 		opts.cfg.Verifier.Enable = true
 		opts.cfg.Verifier.Wallet = "verifier"
+		if opts.verifier.MaxRetryBackoff > 0 {
+			opts.cfg.Verifier.MaxRetryBackoff = opts.verifier.MaxRetryBackoff
+		}
+		if opts.verifier.RetryTimeout > 0 {
+			opts.cfg.Verifier.RetryTimeout = opts.verifier.RetryTimeout
+		}
 	}
 
 	if opts.submitter.use {
diff --git a/cmd/config_loader_test.go b/cmd/config_loader_test.go
index 06b5c3d..d2a00db 100644
--- a/cmd/config_loader_test.go
+++ b/cmd/config_loader_test.go
@@ -92,6 +92,8 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() {
 verifier:
   enable: true
   wallet: verifier
+  max_retry_backoff: 1m
+  retry_timeout: 2m
 
 submitter:
   enable: true
@@ -186,6 +188,8 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigWithVerifierArgs() {
 		"--config.verifier.wallet.address", "0x08E9441C28c9f34dcB1fa06f773a0450f15B6F43",
 		"--config.verifier.wallet.password", s.passwdFile1.Name(),
 		"--config.verifier.wallet.plain", "0x5ea366a14e0bd46e7da7e894c8cc896ebecd1f6452b674aaa41688878f45ff73",
+		"--config.verifier.max-retry-backoff", "1m",
+		"--config.verifier.retry-timeout", "2m",
 	})
 
 	s.Equal(want, got)
@@ -331,14 +335,12 @@ func (s *ConfigLoaderTestSuite) configWithMinCliArgs() *config.Config {
 			Enable:              false,
 			Wallet:              "",
 			Interval:            defaults["verifier.interval"].(time.Duration),
-			Concurrency:         defaults["verifier.concurrency"].(int),
-			BlockLimit:          defaults["verifier.block_limit"].(int),
-			EventFilterLimit:    defaults["verifier.event_filter_limit"].(int),
 			StateCollectLimit:   defaults["verifier.state_collect_limit"].(int),
 			StateCollectTimeout: defaults["verifier.state_collect_timeout"].(time.Duration),
-			OptimizeInterval:    defaults["verifier.db_optimize_interval"].(time.Duration),
 			Confirmations:       defaults["verifier.confirmations"].(int),
 			StartBlockOffset:    defaults["verifier.start_block_offset"].(uint64),
+			MaxRetryBackoff:     defaults["verifier.max_retry_backoff"].(time.Duration),
+			RetryTimeout:        defaults["verifier.retry_timeout"].(time.Duration),
 		},
 		Submitter: config.Submitter{
 			Enable: false,
@@ -417,6 +419,8 @@ func (s *ConfigLoaderTestSuite) applyVerifierCliArgs(c *config.Config) {
 	}
 	c.Verifier.Enable = true
 	c.Verifier.Wallet = "verifier"
+	c.Verifier.MaxRetryBackoff = time.Minute
+	c.Verifier.RetryTimeout = time.Minute * 2
 }
 
 func (s *ConfigLoaderTestSuite) applySubmitterCliArgs(c *config.Config) {
diff --git a/collector/block_collector.go b/collector/block_collector.go
deleted file mode 100644
index 44a0025..0000000
--- a/collector/block_collector.go
+++ /dev/null
@@ -1,191 +0,0 @@
-package collector
-
-import (
-	"context"
-	"errors"
-	"math/big"
-	"time"
-
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/log"
-	"github.com/oasysgames/oasys-optimism-verifier/config"
-	"github.com/oasysgames/oasys-optimism-verifier/database"
-	"github.com/oasysgames/oasys-optimism-verifier/ethutil"
-)
-
-// Worker to collect new blocks.
-type BlockCollector struct {
-	cfg *config.Verifier
-	db  *database.Database
-	hub ethutil.Client
-
-	log log.Logger
-}
-
-// Deprecated:
-func NewBlockCollector(
-	cfg *config.Verifier,
-	db *database.Database,
-	hub ethutil.Client,
-) *BlockCollector {
-	return &BlockCollector{
-		cfg: cfg,
-		db:  db,
-		hub: hub,
-		log: log.New("worker", "block-collector"),
-	}
-}
-
-func (w *BlockCollector) Start(
-	ctx context.Context,
-) {
-	w.log.Info("Block collector started", "interval", w.cfg.Interval, "block-limit", w.cfg.BlockLimit)
-
-	ticker := time.NewTicker(w.cfg.Interval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			w.log.Info("Block collector stopped")
-			return
-		case <-ticker.C:
-			w.Work(ctx)
-		}
-	}
-}
-
-func (w *BlockCollector) Work(ctx context.Context) {
-	// get local highest block
-	start := uint64(1)
-	if highest, err := w.db.Block.FindHighest(); err == nil {
-		start = highest.Number + 1
-	} else if !errors.Is(err, database.ErrNotFound) {
-		w.log.Error("Failed to find highest block", "err", err)
-		return
-	}
-
-	// get on-chain highest block
-	latestHeader, err := w.hub.HeaderByNumber(ctx, nil)
-	if err != nil {
-		w.log.Error("Failed to fetch the latest block header", "err", err)
-		return
-	}
-
-	end := latestHeader.Number.Uint64()
-	if end < start {
-		w.log.Debug("Wait for new block", "number", start)
-		return
-	}
-
-	if end == start {
-		w.log.Info("New block header is corrected", "number", start, "hash", latestHeader.Hash())
-		w.saveHeaders(ctx, []*types.Header{latestHeader})
-	} else {
-		w.log.Info("Will collect new block headers", "start", start, "end", end)
-		w.batchCollect(ctx, start, end)
-	}
-}
-
-func (w *BlockCollector) saveHeaders(ctx context.Context, headers []*types.Header) error {
-	if len(headers) == 0 {
-		return nil
-	}
-
-	if deleted, err := w.deleteReorganizedBlocks(ctx, headers[0]); err != nil {
-		w.log.Error("Failed to delete reorganized blocks", "err", err)
-		return err
-	} else if deleted {
-		return errors.New("reorganized")
-	}
-
-	var prev *types.Header
-	for _, h := range headers {
-		if prev != nil && prev.Hash() != h.ParentHash {
-			return errors.New("block order is wrong")
-		}
-		if err := w.db.Block.Save(h.Number.Uint64(), h.Hash()); err != nil {
-			w.log.Error("Failed to save new block", "err", err)
-			return err
-		}
-		prev = h
-	}
-
-	return nil
-}
-
-func (w *BlockCollector) batchCollect(ctx context.Context, start, end uint64) {
-	bc, err := w.hub.NewBatchHeaderClient()
-	if err != nil {
-		w.log.Error("Failed to construct batch client", "err", err)
-		return
-	}
-
-	bi := ethutil.NewBatchHeaderIterator(bc, start, end, w.cfg.BlockLimit)
-	defer bi.Close()
-
-	for {
-		st := time.Now()
-		headers, err := bi.Next(ctx)
-		if err != nil {
-			w.log.Error("Failed to collect block headers from hub-layer", "err", err)
-			return
-		} else if len(headers) == 0 {
-			return
-		}
-
-		if err = w.saveHeaders(ctx, headers); err != nil {
-			return
-		}
-
-		size := len(headers)
-		w.log.Debug(
-			"New blocks",
-			"len", size, "elapsed", time.Since(st),
-			"start", headers[0].Number, "end", headers[size-1].Number)
-	}
-}
-
-func (w *BlockCollector) deleteReorganizedBlocks(
-	ctx context.Context,
-	comp *types.Header,
-) (bool, error) {
-	// check if reorganization has occurred
-	highest, err := w.db.Block.FindHighest()
-	if (err == nil && highest.Hash == comp.ParentHash) || errors.Is(err, database.ErrNotFound) {
-		return false, nil
-	} else if err != nil {
-		return false, err
-	}
-
-	w.log.Info("Reorganization detected", "number", comp.Number, "hash", comp.Hash())
-
-	var deletesAfter uint64
-	for number := highest.Number; number > 0; number-- {
-		local, err := w.db.Block.Find(number)
-		if err != nil && !errors.Is(err, database.ErrNotFound) {
-			return false, err
-		}
-
-		remote, err := w.hub.HeaderByNumber(ctx, new(big.Int).SetUint64(number))
-		if err != nil {
-			return false, err
-		}
-		if local.Hash == remote.Hash() {
-			w.log.Info("Reached reorganization starting block",
-				"number", number, "hash", remote.Hash().String())
-			break
-		}
-
-		w.log.Info("Found reorganized block",
-			"number", number, "local-hash", local.Hash, "remote-hash", remote.Hash())
-		deletesAfter = number
-	}
-
-	if err := w.db.Block.Deletes(deletesAfter); err != nil {
-		return false, err
-	}
-
-	w.log.Info("Deleted reorganized block", "from", deletesAfter, "to", highest.Number)
-	return true, nil
-}
diff --git a/collector/block_collector_test.go b/collector/block_collector_test.go
deleted file mode 100644
index b60000f..0000000
--- a/collector/block_collector_test.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package collector
-
-import (
-	"context"
-	"math/big"
-	"sync"
-	"testing"
-
-	"github.com/ethereum/go-ethereum"
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/oasysgames/oasys-optimism-verifier/config"
-	"github.com/oasysgames/oasys-optimism-verifier/database"
-	"github.com/oasysgames/oasys-optimism-verifier/ethutil"
-	"github.com/oasysgames/oasys-optimism-verifier/testhelper"
-	"github.com/oasysgames/oasys-optimism-verifier/testhelper/backend"
-	"github.com/stretchr/testify/suite"
-)
-
-type BlockCollectorTestSuite struct {
-	testhelper.Suite
-
-	db      *database.Database
-	backend *backend.Backend
-}
-
-func TestBlockCollector(t *testing.T) {
-	suite.Run(t, new(BlockCollectorTestSuite))
-}
-
-func (s *BlockCollectorTestSuite) SetupTest() {
-	s.db, _ = database.NewDatabase(&config.Database{Path: ":memory:"})
-	s.backend = backend.NewBackend(nil, 0)
-}
-
-func (s *BlockCollectorTestSuite) TestCollectNewBlocks() {
-	worker := NewBlockCollector(&config.Verifier{Interval: 0, BlockLimit: 2}, s.db, s.backend)
-
-	// mining 5 blocks
-	var wants []*types.Header
-	for range s.Range(0, 5) {
-		wants = append(wants, s.backend.Mining())
-	}
-
-	// collect blocks
-	worker.Work(context.Background())
-
-	// assert
-	gots, _ := s.db.Block.FindUncollecteds(20)
-	s.Len(gots, len(wants))
-	for i, want := range wants {
-		s.Equal(want.Number.Uint64(), gots[i].Number)
-		s.Equal(want.Hash(), gots[i].Hash)
-	}
-}
-
-func (s *BlockCollectorTestSuite) TestHandleReorganization() {
-	// mining 10 blocks
-	var mined []*types.Header
-	for range s.Range(0, 10) {
-		mined = append(mined, s.backend.Mining())
-	}
-
-	// simulate chain reorganization
-	reorgedBlock := uint64(5)
-	rb := newReorgBackend(s.backend, mined, reorgedBlock)
-	worker := NewBlockCollector(&config.Verifier{Interval: 0, BlockLimit: 2}, s.db, rb)
-
-	// collect blocks
-	worker.Work(context.Background())
-
-	// assert
-	gots, _ := s.db.Block.FindUncollecteds(20)
-	s.Len(gots, len(mined))
-	for i, want := range mined {
-		s.Equal(want.Number.Uint64(), gots[i].Number)
-		s.Equal(want.Hash(), gots[i].Hash)
-	}
-
-	// reorg occurred
-	rb.reorg()
-	worker.Work(context.Background())
-
-	// assert
-	gots, _ = s.db.Block.FindUncollecteds(20)
-	s.Equal(reorgedBlock-1, gots[len(gots)-1].Number)
-	for i, want := range mined[:reorgedBlock-1] {
-		s.Equal(want.Number.Uint64(), gots[i].Number)
-		s.Equal(want.Hash(), gots[i].Hash)
-	}
-
-	// collect reorganized blocks
-	worker.Work(context.Background())
-
-	// assert
-	gots, _ = s.db.Block.FindUncollecteds(20)
-	s.Len(gots, len(rb.reorged))
-	for i, want := range mined[:reorgedBlock-1] {
-		s.Equal(want.Number.Uint64(), gots[i].Number)
-		s.Equal(want.Hash(), gots[i].Hash)
-	}
-	for i, want := range rb.reorged {
-		s.Equal(want.Number.Uint64(), gots[i].Number)
-		s.Equal(want.Hash(), gots[i].Hash)
-	}
-}
-
-type reorgBackend struct {
-	*backend.Backend
-
-	mu *sync.Mutex
-	do bool
-
-	mined, reorged []*types.Header
-}
-
-func newReorgBackend(
-	tb *backend.Backend,
-	mined []*types.Header,
-	reorgedBlock uint64,
-) *reorgBackend {
-	b := &reorgBackend{
-		Backend: tb,
-		mu:      &sync.Mutex{},
-		mined:   mined,
-		reorged: make([]*types.Header, len(mined)),
-	}
-
-	for i, src := range mined {
-		cpy := types.CopyHeader(src)
-		num := cpy.Number.Uint64()
-
-		if num == reorgedBlock {
-			// reorganization start block.
-			cpy.Difficulty.Add(cpy.Difficulty, common.Big1)
-		} else if num > reorgedBlock {
-			cpy.ParentHash = b.reorged[i-1].Hash()
-		}
-
-		b.reorged[i] = cpy
-	}
-
-	return b
-}
-
-func (r *reorgBackend) reorg() {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	h := r.Backend.Mining()
-	h.ParentHash = r.reorged[len(r.reorged)-1].Hash()
-
-	r.reorged = append(r.reorged, h)
-	r.do = true
-}
-
-func (r *reorgBackend) NewBatchHeaderClient() (ethutil.BatchHeaderClient, error) {
-	return &backend.BatchHeaderClient{Client: r}, nil
-}
-
-func (r *reorgBackend) HeaderByNumber(_ context.Context, b *big.Int) (*types.Header, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	if b == nil {
-		if r.do {
-			return r.reorged[len(r.reorged)-1], nil
-		}
-		return r.mined[len(r.mined)-1], nil
-	}
-
-	n := int(b.Uint64())
-	if r.do {
-		if n <= len(r.reorged) {
-			return r.reorged[n-1], nil
-		}
-	} else if n <= len(r.mined) {
-		return r.mined[n-1], nil
-	}
-
-	return nil, ethereum.NotFound
-}
diff --git a/collector/event_collector.go b/collector/event_collector.go
deleted file mode 100644
index 22c4339..0000000
--- a/collector/event_collector.go
+++ /dev/null
@@ -1,201 +0,0 @@
-package collector
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/log"
-	"github.com/oasysgames/oasys-optimism-verifier/config"
-	"github.com/oasysgames/oasys-optimism-verifier/database"
-	"github.com/oasysgames/oasys-optimism-verifier/ethutil"
-	"github.com/oasysgames/oasys-optimism-verifier/verse"
-)
-
-// Worker to collect events for OasysStateCommitmentChain.
-type EventCollector struct {
-	cfg    *config.Verifier
-	db     *database.Database
-	hub    ethutil.Client
-	signer common.Address
-	log    log.Logger
-}
-
-// Deprecated:
-func NewEventCollector(
-	cfg *config.Verifier,
-	db *database.Database,
-	hub ethutil.Client,
-	signer common.Address,
-) *EventCollector {
-	return &EventCollector{
-		cfg:    cfg,
-		db:     db,
-		hub:    hub,
-		signer: signer,
-		log:    log.New("worker", "event-collector"),
-	}
-}
-
-func (w *EventCollector) Start(ctx context.Context) {
-	w.log.Info("Event collector started",
-		"interval", w.cfg.Interval, "event-filter-limit", w.cfg.EventFilterLimit)
-
-	ticker := time.NewTicker(w.cfg.Interval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			w.log.Info("Event collector stopped")
-			return
-		case <-ticker.C:
-			w.Work(ctx)
-		}
-	}
-}
-
-func (w *EventCollector) Work(ctx context.Context) {
-	for {
-		// get new blocks from database
-		blocks, err := w.db.Block.FindUncollecteds(w.cfg.EventFilterLimit)
-		if err != nil && !errors.Is(err, database.ErrNotFound) {
-			w.log.Error("Failed to find uncollected blocks", "err", err)
-			return
-		} else if len(blocks) == 0 {
-			w.log.Debug("Wait for new block")
-			return
-		}
-
-		// collect event logs from hub-layer
-		start, end := blocks[0], blocks[len(blocks)-1]
-		logs, err := w.hub.FilterLogs(ctx, verse.NewEventLogFilter(start.Number, end.Number, nil))
-		if err != nil {
-			w.log.Error("Failed to fetch event logs from hub-layer",
-				"start", start, "end", end, "err", err)
-			return
-		}
-		if len(logs) == 0 {
-			w.log.Debug("No event log", "start", start, "end", end)
-		}
-
-		w.log.Info("Collected event logs", "start", start.Number, "end", end.Number)
-
-		if err = w.db.Transaction(func(tx *database.Database) error {
-			for _, log := range logs {
-				if err := w.processLog(tx, &log); err != nil {
-					return err
-				}
-			}
-			if err := tx.Block.SaveCollected(end.Number, end.Hash); err != nil {
-				w.log.Error("Failed to save collected block", "number", end, "err", err)
-				return err
-			}
-			return nil
-		}); err != nil {
-			return
-		}
-	}
-}
-
-// Handler for event log from rollup contracts.
-func (w *EventCollector) processLog(tx *database.Database, log *types.Log) error {
-	event, err := verse.ParseEventLog(log)
-	if err != nil {
-		w.log.Error("Failed to parse event log",
-			"block", log.BlockNumber, "contract", log.Address.Hex(), "err", err)
-		return err
-	}
-
-	err = nil
-	switch t := event.(type) {
-	case *verse.RollupedEvent:
-		err = w.handleRollupedEvent(tx, t)
-	case *verse.DeletedEvent:
-		err = w.handleDeletedEvent(tx, t)
-	case *verse.VerifiedEvent:
-		err = w.handleVerifiedEvent(tx, t)
-	default:
-		err = fmt.Errorf("unknown event")
-	}
-
-	return err
-}
-
-// Handler for new rollup event.
-func (w *EventCollector) handleRollupedEvent(txdb *database.Database, e *verse.RollupedEvent) error {
-	eventDB := e.EventDB(txdb)
-
-	log := e.Logger(w.log)
-	log.Debug("New rollup event")
-
-	// delete the `OptimismState` records in consideration of chain reorganization
-	rows, err := eventDB.Deletes(e.Log.Address, e.RollupIndex)
-	if err != nil {
-		log.Error("Failed to delete reorganized events", "err", err)
-		return err
-	} else if rows > 0 {
-		log.Info("Deleted reorganized events", "rows", rows)
-	}
-
-	// delete the `OptimismSignature` records in consideration of chain reorganization
-	rows, err = txdb.OPSignature.Deletes(w.signer, e.Log.Address, e.RollupIndex)
-	if err != nil {
-		log.Error("Failed to delete reorganized signatures", "err", err)
-		return err
-	} else if rows > 0 {
-		log.Info("Deleted reorganized signatures", "rows", rows)
-	}
-
-	if _, err := eventDB.Save(e.Log.Address, e.Parsed); err != nil {
-		log.Error("Failed to save rollup event", "err", err)
-		return err
-	}
-
-	return nil
-}
-
-// Handler for rollup delete event.
-func (w *EventCollector) handleDeletedEvent(txdb *database.Database, e *verse.DeletedEvent) error {
-	eventDB := e.EventDB(txdb)
-
-	log := e.Logger(w.log)
-	log.Info("New rollup delete event")
-
-	// delete `OptimismState` records after target batchIndex
-	rows, err := eventDB.Deletes(e.Log.Address, e.RollupIndex)
-	if err != nil {
-		log.Error("Failed to delete events", "err", err)
-		return err
-	} else if rows > 0 {
-		log.Info("Deleted events", "rows", rows)
-	}
-
-	// delete the `OptimismSignature` records in consideration of chain reorganization
-	rows, err = txdb.OPSignature.Deletes(w.signer, e.Log.Address, e.RollupIndex)
-	if err != nil {
-		log.Error("Failed to delete signatures", "err", err)
-		return err
-	} else if rows > 0 {
-		log.Info("Deleted signatures", "rows", rows)
-	}
-
-	return nil
-}
-
-// Handler for rollup verified event.
-func (w *EventCollector) handleVerifiedEvent(txdb *database.Database, e *verse.VerifiedEvent) error {
-	log := e.Logger(w.log)
-	log.Debug("New rollup verified event")
-
-	err := txdb.OPContract.SaveNextIndex(e.Log.Address, e.RollupIndex+1)
-	if err != nil {
-		log.Error("Failed to save next index", "err", err)
-		return err
-	}
-
-	return nil
-}
diff --git a/collector/event_collector_test.go b/collector/event_collector_test.go
deleted file mode 100644
index f2cf482..0000000
--- a/collector/event_collector_test.go
+++ /dev/null
@@ -1,210 +0,0 @@
-package collector
-
-import (
-	"context"
-	"math/big"
-	"testing"
-	"time"
-
-	"github.com/oasysgames/oasys-optimism-verifier/config"
-	"github.com/oasysgames/oasys-optimism-verifier/database"
-	"github.com/oasysgames/oasys-optimism-verifier/testhelper/backend"
-	tscc "github.com/oasysgames/oasys-optimism-verifier/testhelper/contract/scc"
-	"github.com/stretchr/testify/suite"
-)
-
-type EventCollectorTestSuite struct {
-	backend.BackendSuite
-
-	collector *EventCollector
-	eventDB   database.IOPEventDB
-}
-
-func TestEventCollector(t *testing.T) {
-	suite.Run(t, new(EventCollectorTestSuite))
-}
-
-func (s *EventCollectorTestSuite) SetupTest() {
-	s.BackendSuite.SetupTest()
-
-	s.collector = NewEventCollector(&config.Verifier{
-		Interval:         time.Millisecond,
-		EventFilterLimit: 1000,
-	}, s.DB, s.Hub, s.SignableHub.Signer())
-	s.eventDB = database.NewOPEventDB[database.OptimismState](s.DB)
-}
-
-func (s *EventCollectorTestSuite) TestHandleRollupedEvent() {
-	// emit `StateBatchAppended` events
-	emits := make([]*tscc.SccStateBatchAppended, 10)
-	for i := range s.Range(0, len(emits)) {
-		_, emits[i] = s.EmitStateBatchAppended(i)
-	}
-
-	// collect `StateBatchAppended` events
-	s.collector.Work(context.Background())
-
-	// assert
-	for i := range s.Range(0, len(emits)) {
-		got, err := s.eventDB.FindByRollupIndex(s.SCCAddr, uint64(i))
-		s.NoError(err)
-
-		gott := got.(*database.OptimismState)
-		s.Equal(s.SCCAddr, gott.Contract.Address)
-		s.Equal(emits[i].BatchIndex.Uint64(), gott.BatchIndex)
-		s.Equal(emits[i].BatchRoot[:], gott.BatchRoot[:])
-		s.Equal(emits[i].BatchSize.Uint64(), gott.BatchSize)
-		s.Equal(emits[i].PrevTotalElements.Uint64(), gott.PrevTotalElements)
-		s.Equal(emits[i].ExtraData, gott.ExtraData)
-	}
-}
-
-func (s *EventCollectorTestSuite) TestHandleDeletedEvent() {
-	ctx := context.Background()
-
-	// emit `StateBatchAppended` events
-	emits := make([]*tscc.SccStateBatchAppended, 10)
-	for i := range s.Range(0, len(emits)) {
-		_, emits[i] = s.EmitStateBatchAppended(i)
-	}
-
-	// collect `StateBatchAppended` events
-	s.collector.Work(ctx)
-
-	// create signature records
-	var creates []*database.OptimismSignature
-	for i := range s.Range(0, len(emits)) {
-		sig, err := s.DB.OPSignature.Save(
-			nil, nil,
-			s.SignableHub.Signer(),
-			s.SCCAddr,
-			emits[i].BatchIndex.Uint64(),
-			emits[i].BatchRoot,
-			true,
-			database.RandSignature(),
-		)
-		s.NoError(err)
-		creates = append(creates, sig)
-	}
-
-	// emit `StateBatchDeleted` event
-	_, err := s.TSCC.EmitStateBatchDeleted(
-		s.SignableHub.TransactOpts(ctx),
-		emits[5].BatchIndex,
-		emits[5].BatchRoot,
-	)
-	s.NoError(err)
-	s.Mining()
-
-	// collect `StateBatchDeleted` events
-	s.collector.Work(ctx)
-
-	// assert
-	for i := range s.Range(0, 10) {
-		var want error
-		if i >= 5 {
-			want = database.ErrNotFound
-		}
-		_, err0 := s.eventDB.FindByRollupIndex(s.SCCAddr, uint64(i))
-		_, err1 := s.DB.OPSignature.FindByID(creates[i].ID)
-		s.Equal(want, err0)
-		s.Equal(want, err1)
-	}
-}
-
-func (s *EventCollectorTestSuite) TestHandleVerifiedEvent() {
-	// emit `EmitStateBatchVerified` events
-	for index := range s.Range(0, 5) {
-		_, err := s.TSCC.EmitStateBatchVerified(
-			s.SignableHub.TransactOpts(context.Background()),
-			big.NewInt(int64(index)),
-			s.RandHash(),
-		)
-		s.NoError(err)
-		s.Mining()
-	}
-
-	// collect `EmitStateBatchVerified` events
-	s.collector.Work(context.Background())
-
-	// assert
-	scc, _ := s.DB.OPContract.FindOrCreate(s.SCCAddr)
-	s.Equal(uint64(5), scc.NextIndex)
-}
-
-func (s *EventCollectorTestSuite) TestIgnoreOtherEvent() {
-	ctx := context.Background()
-
-	// emit `StateBatchAppended` and `Other` events
-	for i := range s.Range(0, 10) {
-		s.EmitStateBatchAppended(i)
-		s.Mining()
-
-		_, err := s.TSCC.EmitOtherEvent(s.SignableHub.TransactOpts(ctx), big.NewInt(11))
-		s.NoError(err)
-		s.Mining()
-	}
-
-	// collect `StateBatchAppended` events
-	s.collector.Work(ctx)
-
-	// assert
-	for i := range s.Range(0, 20) {
-		var want error
-		if i >= 10 {
-			want = database.ErrNotFound
-		}
-		_, err := s.eventDB.FindByRollupIndex(s.SCCAddr, uint64(i))
-		s.ErrorIs(err, want)
-	}
-}
-
-func (s *EventCollectorTestSuite) TestHandleReorganization() {
-	ctx := context.Background()
-
-	// emit `StateBatchAppended` events
-	emits := make([]*tscc.SccStateBatchAppended, 10)
-	for i := range s.Range(0, len(emits)) {
-		_, emits[i] = s.EmitStateBatchAppended(i)
-	}
-
-	// collect `StateBatchAppended` events
-	s.collector.Work(ctx)
-
-	// create signature records
-	var creates []*database.OptimismSignature
-	for i := range s.Range(0, len(emits)) {
-		sig, err := s.DB.OPSignature.Save(
-			nil, nil,
-			s.SignableHub.Signer(),
-			s.SCCAddr,
-			emits[i].BatchIndex.Uint64(),
-			emits[i].BatchRoot,
-			true,
-			database.RandSignature(),
-		)
-		s.NoError(err)
-		creates = append(creates, sig)
-	}
-
-	// simulate chain reorganization
-	s.EmitStateBatchAppended(4)
-	s.collector.Work(ctx)
-
-	// assert
-	for i := range s.Range(0, len(emits)) {
-		_, err := s.eventDB.FindByRollupIndex(s.SCCAddr, uint64(i))
-		if i < 5 {
-			s.NoError(err)
-		} else {
-			s.Error(err, database.ErrNotFound)
-		}
-
-		_, err = s.DB.OPSignature.FindByID(creates[i].ID)
-		if i < 4 {
-			s.NoError(err)
-		} else {
-			s.Error(err, database.ErrNotFound)
-		}
-	}
-}
diff --git a/config/config.go b/config/config.go
index fffac62..73876d4 100644
--- a/config/config.go
+++ b/config/config.go
@@ -67,14 +67,12 @@ func Defaults() map[string]interface{} {
 		"ipc.sockname": "oasvlfy",
 
 		"verifier.interval":              15 * time.Second,
-		"verifier.concurrency":           50,
-		"verifier.block_limit":           1000,
-		"verifier.event_filter_limit":    1000,
 		"verifier.state_collect_limit":   1000,
 		"verifier.state_collect_timeout": 15 * time.Second,
-		"verifier.db_optimize_interval":  time.Hour,
 		"verifier.confirmations":         3, // 3 confirmations are enough for later than v1.3.0 L1.
 		"verifier.start_block_offset":    uint64(5760 * 2), // 2 days
+		"verifier.max_retry_backoff":     time.Hour,
+		"verifier.retry_timeout":         time.Hour * 24,
 
 		// The minimum interval for Verse v0 is 15 seconds.
 		// On the other hand, the minimum interval for Verse v1 is 80 seconds.
@@ -378,30 +376,24 @@ type Verifier struct {
 	// Interval for get block data.
 	Interval time.Duration
 
-	// Number of concurrent executions.
-	Concurrency int
-
-	// Number of block headers to collect at a time.
-	BlockLimit int `koanf:"block_limit"`
-
-	// Number of blocks to event filter.
-	EventFilterLimit int `koanf:"event_filter_limit"`
-
 	// Number of state root to collect at a time.
 	StateCollectLimit int `koanf:"state_collect_limit"`
 
 	// Timeout for state root collection.
 	StateCollectTimeout time.Duration `koanf:"state_collect_timeout"`
 
-	// Interval to optimize database.
-	OptimizeInterval time.Duration `koanf:"db_optimize_interval"`
-
 	// Number of confirmation blocks for transaction receipt.
 	Confirmations int
 
 	// The number of start fetching events is offset from the current block.
 	// This offset is used at the first time to fetch events.
 	StartBlockOffset uint64 `koanf:"start_block_offset"`
+
+	// The maximum exponential backoff time for retries.
+	MaxRetryBackoff time.Duration `koanf:"max_retry_backoff"`
+
+	// The maximum duration to attempt retries.
+	RetryTimeout time.Duration `koanf:"retry_timeout"`
 }
 
 type Submitter struct {
diff --git a/config/config_test.go b/config/config_test.go
index b717cd6..fb61de7 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -74,14 +74,12 @@ func (s *ConfigTestSuite) TestNewConfig() {
   enable: true
   wallet: wallet1
   interval: 5s
-  concurrency: 10
-  block_limit: 500
-  event_filter_limit: 50
   state_collect_limit: 5
   state_collect_timeout: 1s
-  db_optimize_interval: 2s
   confirmations: 4
   start_block_offset: 5760
+  max_retry_backoff: 1m
+  retry_timeout: 2m
 
 submitter:
   enable: true
@@ -236,14 +234,12 @@ func (s *ConfigTestSuite) TestNewConfig() {
 			Enable:              true,
 			Wallet:              "wallet1",
 			Interval:            5 * time.Second,
-			Concurrency:         10,
-			BlockLimit:          500,
-			EventFilterLimit:    50,
 			StateCollectLimit:   5,
 			StateCollectTimeout: time.Second,
-			OptimizeInterval:    time.Second * 2,
 			Confirmations:       4,
 			StartBlockOffset:    5760,
+			MaxRetryBackoff:     time.Minute,
+			RetryTimeout:        time.Minute * 2,
 		},
 		Submitter: Submitter{
 			Enable: true,
@@ -408,13 +404,11 @@ func (s *ConfigTestSuite) TestDefaultValues() {
 	s.Equal("oasvlfy", got.IPC.Sockname)
 
 	s.Equal(15*time.Second, got.Verifier.Interval)
-	s.Equal(50, got.Verifier.Concurrency)
-	s.Equal(1000, got.Verifier.BlockLimit)
-	s.Equal(1000, got.Verifier.EventFilterLimit)
 	s.Equal(1000, got.Verifier.StateCollectLimit)
 	s.Equal(15*time.Second, got.Verifier.StateCollectTimeout)
-	s.Equal(time.Hour, got.Verifier.OptimizeInterval)
 	s.Equal(3, got.Verifier.Confirmations)
+	s.Equal(time.Hour, got.Verifier.MaxRetryBackoff)
+	s.Equal(time.Hour*24, got.Verifier.RetryTimeout)
 
 	s.Equal(30*time.Second, got.Submitter.Interval)
 	s.Equal(50, got.Submitter.Concurrency)
diff --git a/verifier/verifier.go b/verifier/verifier.go
index 2eac398..952ea63 100644
--- a/verifier/verifier.go
+++ b/verifier/verifier.go
@@ -17,10 +17,6 @@ import (
 	"github.com/oasysgames/oasys-optimism-verifier/verse"
 )
 
-const (
-	VerifyRetryLimit = 3
-)
-
 // Worker to verify rollups.
 type Verifier struct {
 	cfg *config.Verifier
@@ -120,18 +116,15 @@ func (w *Verifier) startVerifier(ctx context.Context, contract common.Address, c
 	}
 }
 
-func (w *Verifier) work(ctx context.Context, task verse.VerifiableVerse, chainId uint64, nextStart *uint64, publishAllUnverifiedSigs bool) error {
+func (w *Verifier) work(parent context.Context, task verse.VerifiableVerse, chainId uint64, nextStart *uint64, publishAllUnverifiedSigs bool) error {
 	// run verification tasks until time out
-	var (
-		cancel       context.CancelFunc
-		ctxOrigin    = ctx
-		setNextStart = func(endBlock uint64) {
-			// Next start block is the current end block + 1
-			endBlock += 1
-			*nextStart = endBlock
-		}
-	)
-	ctx, cancel = context.WithTimeout(ctx, w.cfg.StateCollectTimeout)
+	setNextStart := func(endBlock uint64) {
+		// Next start block is the current end block + 1
+		endBlock += 1
+		*nextStart = endBlock
+	}
+
+	ctx, cancel := context.WithTimeout(parent, w.cfg.StateCollectTimeout)
 	defer cancel()
 
 	// Assume the fetched nextIndex is not reorged, as we confirm `w.cfg.Confirmations` blocks
@@ -199,39 +192,52 @@ func (w *Verifier) work(ctx context.Context, task verse.VerifiableVerse, chainId
 
 	// verify the fetched logs
 	var (
-		opsigs = []*database.OptimismSignature{}
-		// flag at least one log verification failed
+		opsigs []*database.OptimismSignature
+		// flag at least one log verification failed.
 		atLeastOneLogVerificationFailed bool
+		// As replica syncing is not real-time, a retry mechanism is required.
+		retryBackoff = w.retryBackoff()
 	)
 	for i := range logs {
-		// verify with retry
-		var row *database.OptimismSignature
-		for counter := 0; counter < VerifyRetryLimit; counter++ {
-			if row, err = w.verifyAndSaveLog(ctx, &logs[i], task, nextIndex.Uint64(), log); err != nil {
-				log.Warn("retry verification", "retry-counter", counter, "err", err)
-				if errors.Is(err, context.Canceled) {
-					// exit if context have been canceled
-					return err
-				} else if errors.Is(err, context.DeadlineExceeded) {
-					// expand the deadline if the deadline is exceeded
-					log.Info("expand the deadline", "log-index", i)
-					cancel() // cancel previous context
-					ctx, cancel = context.WithTimeout(ctxOrigin, 30*time.Second) // expand the deadline
-					defer cancel()
-					continue // retry immediately
-				}
-				// retry after backoff delay: 100ms, 1600ms, 25600ms
-				time.Sleep(100 << (4 * counter) * time.Millisecond)
+		var (
+			row *database.OptimismSignature
+			err error
+		)
+		for {
+			row, err = w.verifyAndSaveLog(ctx, &logs[i], task, nextIndex.Uint64(), log)
+			// break if the verification is successful or skipped
+			if err == nil {
+				break
+			}
+			// exit if the context has been canceled
+			if errors.Is(err, context.Canceled) {
+				return err
+			}
+
+			delay, remain, attempts := retryBackoff()
+			// give up if the retry time limit is exceeded
+			if remain <= 0 {
+				break
+			}
+			// expand the deadline if the deadline is exceeded and retry immediately
+			if errors.Is(err, context.DeadlineExceeded) {
+				log.Info("expand the deadline", "log-index", i)
+				cancel() // cancel previous context
+				ctx, cancel = context.WithTimeout(parent, w.cfg.StateCollectTimeout*2)
+				defer cancel()
 				continue
 			}
-			// break if the verification is successful
-			break
+
+			// exponential backoff until the max delay
+			log.Warn("retry verification",
+				"delay", delay, "remain", remain, "attempts", attempts, "err", err)
+			time.Sleep(delay)
 		}
 
 		// verification failed
 		if err != nil {
 			// skip the log if the verification failed
 			log.Error("Failed to verify a log",
 				"log-index", i, "err", err)
 			atLeastOneLogVerificationFailed = true
 		}
@@ -269,8 +275,8 @@ func (w *Verifier) work(ctx context.Context, task verse.VerifiableVerse, chainId
 
 	if atLeastOneLogVerificationFailed {
 		// Remove task if at least one log verification failed.
-		// The removed task will be added again in the next verse discovery.
-		// As the verse discovery interval is 1h, the faild log verification will be retried 1h later.
+		// dynamic discovery on:  The removed task will be added again in the next verse discovery
+		// dynamic discovery off: restarting is required to add the removed task again
 		w.RemoveTask(task.RollupContract())
 	}
 
@@ -349,3 +355,24 @@ func (w *Verifier) cleanOldSignatures(contract common.Address, nextIndex uint64)
 	}
 	return nil
 }
+
+func (w *Verifier) retryBackoff() func() (delay, remain time.Duration, attempts int) {
+	started := time.Now()
+	attempts := 0
+
+	return func() (time.Duration, time.Duration, int) {
+		// backoff delay: 0.1s, 0.8s, 6.4s, 51.2s, 409.6s(7m), 3276.8s(54m), ...
+		delay := 100 << (3 * attempts) * time.Millisecond
+		if delay <= 0 || delay > w.cfg.MaxRetryBackoff { // delay <= 0 means the shift overflowed
+			delay = w.cfg.MaxRetryBackoff
+		}
+
+		remain := w.cfg.RetryTimeout - time.Since(started)
+		if remain < 0 {
+			remain = 0
+		}
+
+		attempts++
+		return delay, remain, attempts
+	}
+}
diff --git a/verifier/verifier_test.go b/verifier/verifier_test.go
index adab73a..23b9c2f 100644
--- a/verifier/verifier_test.go
+++ b/verifier/verifier_test.go
@@ -43,7 +43,6 @@ func (s *VerifierTestSuite) SetupTest() {
 	s.sigsCh = make(chan []*database.OptimismSignature, 4)
 	s.verifier = NewVerifier(&config.Verifier{
 		Interval:            50 * time.Millisecond,
-		Concurrency:         10,
 		StateCollectLimit:   3,
 		StateCollectTimeout: time.Second,
 		Confirmations:       2,
@@ -125,6 +124,43 @@ func (s *VerifierTestSuite) TestStartVerifier() {
 	}
 }
 
+func (s *VerifierTestSuite) TestRetryBackoff() {
+	verifier := &Verifier{
+		cfg: &config.Verifier{
+			MaxRetryBackoff: time.Minute,
+			RetryTimeout:    time.Millisecond * 100,
+		},
+	}
+
+	backoff := verifier.retryBackoff()
+
+	wait := time.Millisecond * 10
+	for i := 0; i < 10; i++ {
+		delay, remain, attempts := backoff()
+
+		s.Equal(i+1, attempts)
+		s.Less(remain, verifier.cfg.RetryTimeout-wait*time.Duration(i))
+
+		switch i {
+		case 0:
+			s.Equal(delay, time.Millisecond*100)
+		case 1:
+			s.Equal(delay, time.Millisecond*800)
+		case 2:
+			s.Equal(delay, time.Millisecond*6400)
+		case 3:
+			s.Equal(delay, time.Millisecond*51200)
+		default: // i >= 4
+			s.Equal(delay, time.Minute)
+		}
+
+		time.Sleep(wait)
+	}
+
+	_, remain, _ := backoff()
+	s.Equal(remain, time.Duration(0))
+}
+
 func (s *VerifierTestSuite) sendVerseTransactions(count int) (headers []*types.Header) {
 	ctx := context.Background()
 	to := common.HexToAddress("0x09ad74977844F513E61AdE2B50b0C06268A4f6d7")
diff --git a/version/version.go b/version/version.go
index f01189e..71f7946 100644
--- a/version/version.go
+++ b/version/version.go
@@ -5,7 +5,7 @@ import "fmt"
 const (
 	Major = 1
 	Minor = 2
-	Patch = 1
+	Patch = 2
 
 	Meta = ""
 )