Skip to content

Commit

Permalink
improve log fetch block range decision (#38)
Browse files Browse the repository at this point in the history
* improve log fetch block range dicision

* change fixed number to const variable

* fix: prevent share blockRangeManager across all verse

* delete blockRangeManager from the struct

* respond feed back from @ironbeer

* Add NextIndexEventEmittedBlock method

* Modify NextIndexEventEmittedBlock method to GetEventEmittedBlock

* Add utility funcs

* Modify return type

* Improved determining start and end block of verification

* Improved determining start and end block of verification

* Add generics SyncMap

* Background publishing of unverified signatures

* Adjustment of intervals

* Add comment

* Move backoffDecr

* Catch publish error

* Exit log of background tasks

* Improved log

* Unwrap shortage error

* Fixed an issue where receiving unverified signatures was missed due to `peer == proc.peer`

* Improved error name

* Add util.WorkerPool

* Add header get method with caching

* Add global verse pool

* Improved woker pool

* Support for verse pool

* Add task cache cleanup

* Rename ticker

* Reducing RPC load for event retrieval request

* Fixed overflow bug

* Fixed workers were blocked due to an infinite loop

---------

Co-authored-by: ironbeer <[email protected]>
  • Loading branch information
tak1827 and ironbeer authored Oct 3, 2024
1 parent c4b1ca3 commit 243a43e
Show file tree
Hide file tree
Showing 29 changed files with 2,473 additions and 638 deletions.
12 changes: 7 additions & 5 deletions cmd/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ func (s *ConfigLoaderTestSuite) configWithMinCliArgs() *config.Config {
Keystore: s.keystoreDir,
Wallets: map[string]*config.Wallet{},
HubLayer: config.HubLayer{
ChainID: 1,
RPC: "https://rpc.hub.example.com/",
ChainID: 1,
RPC: "https://rpc.hub.example.com/",
BlockTime: time.Second * 6,
},
VerseLayer: config.VerseLayer{
Discovery: struct {
Expand Down Expand Up @@ -334,19 +335,20 @@ func (s *ConfigLoaderTestSuite) configWithMinCliArgs() *config.Config {
Verifier: config.Verifier{
Enable: false,
Wallet: "",
MaxWorkers: defaults["verifier.max_workers"].(int),
Interval: defaults["verifier.interval"].(time.Duration),
StateCollectLimit: defaults["verifier.state_collect_limit"].(int),
StateCollectTimeout: defaults["verifier.state_collect_timeout"].(time.Duration),
Confirmations: defaults["verifier.confirmations"].(int),
StartBlockOffset: defaults["verifier.start_block_offset"].(uint64),
MaxLogFetchBlockRange: defaults["verifier.max_log_fetch_block_range"].(uint64),
MaxLogFetchBlockRange: defaults["verifier.max_log_fetch_block_range"].(int),
MaxIndexDiff: defaults["verifier.max_index_diff"].(int),
MaxRetryBackoff: defaults["verifier.max_retry_backoff"].(time.Duration),
RetryTimeout: defaults["verifier.retry_timeout"].(time.Duration),
},
Submitter: config.Submitter{
Enable: false,
Confirmations: defaults["submitter.confirmations"].(int),
Concurrency: defaults["submitter.concurrency"].(int),
MaxWorkers: defaults["submitter.max_workers"].(int),
Interval: defaults["submitter.interval"].(time.Duration),
GasMultiplier: defaults["submitter.gas_multiplier"].(float64),
BatchSize: defaults["submitter.batch_size"].(int),
Expand Down
123 changes: 74 additions & 49 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func runStartCmd(cmd *cobra.Command, args []string) {

s.startVerseDiscovery(ctx)
s.startBeacon(ctx)
s.startVerifier(ctx)
s.startSubmitter(ctx)
log.Info("All workers started")

// wait for signal
Expand Down Expand Up @@ -149,6 +151,7 @@ type server struct {
hub ethutil.Client
smcache *stakemanager.Cache
p2p *p2p.Node
versepool verse.VersePool
verifier *verifier.Verifier
submitter *submitter.Submitter
bw *beacon.BeaconWorker
Expand Down Expand Up @@ -179,17 +182,20 @@ func mustNewServer(ctx context.Context) *server {
}

// construct hub-layer client
if s.hub, err = ethutil.NewClient(s.conf.HubLayer.RPC); err != nil {
if s.hub, err = ethutil.NewClient(s.conf.HubLayer.RPC, s.conf.HubLayer.BlockTime); err != nil {
log.Crit("Failed to construct hub-layer client", "err", err)
}

// Make sue the s.hub can connect to the chain
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if _, err := s.hub.BlockNumber(ctx); err != nil {
if _, err := s.hub.HeaderWithCache(ctx); err != nil {
log.Crit("Failed to connect to the hub-layer chain", "err", err)
}

// construct global verse pool
s.versepool = verse.NewVersePool(s.hub)

// construct stakemanager cache
sm, err := stakemanager.NewStakemanagerCaller(
common.HexToAddress(StakeManagerAddress), s.hub)
Expand Down Expand Up @@ -310,15 +316,27 @@ func (s *server) mustSetupVerifier() {
}

l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer)
s.verifier = verifier.NewVerifier(&s.conf.Verifier, s.db, s.p2p, l1Signer)
s.verifier = verifier.NewVerifier(
&s.conf.Verifier, s.db, s.p2p, l1Signer, ethutil.NewClient, s.versepool)
}

func (s *server) setupSubmitter() {
if !s.conf.Submitter.Enable {
return
}

s.submitter = submitter.NewSubmitter(&s.conf.Submitter, s.db, s.smcache)
var newSignerFn submitter.L1SignerFn = func(chainID uint64) ethutil.SignableClient {
for _, cfg := range s.conf.Submitter.Targets {
if cfg.ChainID == chainID {
if signer, ok := s.signers[cfg.Wallet]; ok {
return ethutil.NewSignableClient(
new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer)
}
}
}
return nil
}
s.submitter = submitter.NewSubmitter(&s.conf.Submitter, s.db, newSignerFn, s.smcache, s.versepool)
}

func (s *server) startVerseDiscovery(ctx context.Context) {
Expand Down Expand Up @@ -396,61 +414,46 @@ func (s *server) verseDiscoveryHandler(ctx context.Context, discovers []*config.
L2OOName: common.HexToAddress(s.conf.Submitter.L2OOVerifierAddress),
}

type verse_ struct {
cfg *config.Verse
verse verse.Verse
verify common.Address
// Delete erased Verse-Layer from the discovery JSON from the pool.
erased := make(map[common.Address]bool)
s.versepool.Range(func(item *verse.VersePoolItem) bool {
erased[item.Verse().RollupContract()] = true
return true
})

// Marking the Verse-Layer to be processed by the Submitter.
canSubmits := make(map[uint64]bool)
for _, cfg := range s.conf.Submitter.Targets {
canSubmits[cfg.ChainID] = true
}
var (
verses []*verse_
verseChainIDs []uint64
)

// Create a new Verse instance and add it to the pool.
var chainIDs []uint64
for _, cfg := range discovers {
for name, addr := range cfg.L1Contracts {
if factory, ok := verseFactories[name]; ok {
verses = append(verses, &verse_{
cfg: cfg,
verse: factory(s.db, s.hub, common.HexToAddress(addr)),
verify: verifyContracts[name],
})
verseChainIDs = append(verseChainIDs, cfg.ChainID)
}
}
}

log.Info("Discovered verses", "count", len(verses), "chain-ids", verseChainIDs)
verse := factory(s.db, s.hub, cfg.ChainID,
cfg.RPC, common.HexToAddress(addr), verifyContracts[name])
if s.versepool.Add(verse, canSubmits[cfg.ChainID]) {
log.Info("Add verse to verse pool",
"chain-id", cfg.ChainID, "rpc", cfg.RPC)
}

for _, x := range verses {
// add verse to Verifier
if s.verifier != nil && !s.verifier.HasTask(x.verse.RollupContract(), x.cfg.RPC) {
l2Client, err := ethutil.NewClient(x.cfg.RPC)
if err != nil {
log.Error("Failed to construct verse-layer client", "err", err)
} else {
log.Info("Add verse to Verifier", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract())
s.verifier.AddTask(ctx, x.verse.WithVerifiable(l2Client), x.cfg.ChainID)
delete(erased, verse.RollupContract())
chainIDs = append(chainIDs, cfg.ChainID)
}
}
}

// add verse to Submitter
if s.submitter != nil {
for _, tg := range s.conf.Submitter.Targets {
if tg.ChainID != x.cfg.ChainID || s.submitter.HasTask(x.verse.RollupContract()) {
continue
}

signer, ok := s.signers[tg.Wallet]
if !ok {
log.Error("Wallet for the Submitter not found", "wallet", tg.Wallet)
continue
}

log.Info("Add verse to Submitter", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract())
l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer)
s.submitter.AddTask(ctx, x.verse.WithTransactable(l1Signer, x.verify), x.cfg.ChainID)
}
// Delete erased verses from the pool.
for contract := range erased {
if verse, ok := s.versepool.Get(contract); ok {
log.Info("Delete verse from verse pool", "chain-id", verse.Verse().ChainID())
s.versepool.Delete(contract)
}
}

log.Info("Discovered verses", "count", len(chainIDs), "chain-ids", chainIDs)
}

func (s *server) mustSetupBeacon() {
Expand Down Expand Up @@ -485,6 +488,28 @@ func (s *server) startBeacon(ctx context.Context) {
}()
}

func (s *server) startVerifier(ctx context.Context) {
if s.verifier == nil {
return
}
s.wg.Add(1)
go func() {
s.verifier.Start(ctx)
s.wg.Done()
}()
}

func (s *server) startSubmitter(ctx context.Context) {
if s.submitter == nil {
return
}
s.wg.Add(1)
go func() {
s.submitter.Start(ctx)
s.wg.Done()
}()
}

func getOrCreateP2PKey(filename string) (crypto.PrivKey, error) {
data, err := os.ReadFile(filename)

Expand Down
67 changes: 52 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"fmt"
"path/filepath"
"reflect"
"strings"
Expand All @@ -14,6 +15,10 @@ import (
"github.com/knadh/koanf/v2"
)

const (
l1BlockTime = time.Second * 6
)

var (
validate = validator.New()
)
Expand All @@ -36,6 +41,8 @@ func init() {

func Defaults() map[string]interface{} {
return map[string]interface{}{
"hub_layer.block_time": l1BlockTime,

"verse_layer.discovery.refresh_interval": time.Hour,

"p2p.no_announce": []string{
Expand Down Expand Up @@ -66,20 +73,21 @@ func Defaults() map[string]interface{} {

"ipc.sockname": "oasvlfy",

"verifier.interval": 6 * time.Second,
"verifier.max_workers": 10,
"verifier.interval": l1BlockTime,
"verifier.state_collect_limit": 1000,
"verifier.state_collect_timeout": 15 * time.Second,
"verifier.confirmations": 3, // 3 confirmations are enough for later than v1.3.0 L1.
"verifier.start_block_offset": uint64(14400 * 2), // 2 days in case of 6s block time
"verifier.max_log_fetch_block_range": uint64(14400), // 1 day in case of 6s block time
"verifier.max_retry_backoff": time.Hour,
"verifier.retry_timeout": time.Hour * 24,
"verifier.confirmations": 3, // 3 confirmations are enough for later than v1.3.0 L1.
"verifier.max_log_fetch_block_range": 14400, // 1 day in case of 6s block time
"verifier.max_index_diff": 86400 * 2 / 120, // Number of rollups for 2days(L2BlockTime=1s,RollupInterval=120s)
"verifier.max_retry_backoff": time.Minute * 5,
"verifier.retry_timeout": time.Hour,

// The minimum interval for Verse v0 is 15 seconds.
// On the other hand, the minimum interval for Verse v1 is 80 seconds.
// Balance the two by setting the default to 30 seconds.
"submitter.max_workers": 5,
"submitter.interval": 30 * time.Second,
"submitter.concurrency": 50,
"submitter.confirmations": 3, // 3 confirmations are enough for later than v1.3.0 L1.
"submitter.gas_multiplier": 1.1,
"submitter.batch_size": 20,
Expand Down Expand Up @@ -240,6 +248,9 @@ type HubLayer struct {

// RPC of the Hub-Layer(HTTP or WebSocket).
RPC string `validate:"url"`

// Block interval of the Hub-Layer.
BlockTime time.Duration `koanf:"block_time"`
}

type Verse struct {
Expand Down Expand Up @@ -374,6 +385,9 @@ type Verifier struct {
// Name of the wallet to create signature.
Wallet string `validate:"required_if=Enable true"`

// Maximum number of concurrent workers.
MaxWorkers int `koanf:"max_workers"`

// Interval for get block data.
Interval time.Duration

Expand All @@ -386,12 +400,11 @@ type Verifier struct {
// Number of confirmation blocks for transaction receipt.
Confirmations int

// The number of start fetching events is offset from the current block.
// This offset is used at the first time to fetch events.
StartBlockOffset uint64 `koanf:"start_block_offset"`

// The max block range to fetch events.
MaxLogFetchBlockRange uint64 `koanf:"max_log_fetch_block_range"`
MaxLogFetchBlockRange int `koanf:"max_log_fetch_block_range"`

// Do not verify if `rollup index - next index` is greater than this value.
MaxIndexDiff int `koanf:"max_index_diff"`

// The maximum exponential backoff time for retries.
MaxRetryBackoff time.Duration `koanf:"max_retry_backoff"`
Expand All @@ -400,16 +413,26 @@ type Verifier struct {
RetryTimeout time.Duration `koanf:"retry_timeout"`
}

func (c *Verifier) String() string {
return fmt.Sprintf(
"wallet:%s max_workers:%d interval:%s state_collect_limit:%d state_collect_timeout:%s"+
" confirmations:%d max_log_fetch_block_range:%d max_index_diff:%d max_retry_backoff:%s"+
" retry_timeout:%s",
c.Wallet, c.MaxWorkers, c.Interval, c.StateCollectLimit, c.StateCollectTimeout,
c.Confirmations, c.MaxLogFetchBlockRange, c.MaxIndexDiff, c.MaxRetryBackoff,
c.RetryTimeout)
}

type Submitter struct {
// Whether to enable worker.
Enable bool `koanf:"enable"`

// Maximum number of concurrent workers.
MaxWorkers int `koanf:"max_workers"`

// Interval for send transaction.
Interval time.Duration

// Number of concurrent executions.
Concurrency int

// Number of confirmation blocks for transaction receipt.
Confirmations int

Expand All @@ -436,6 +459,20 @@ type Submitter struct {
Targets []*SubmitterTarget `validate:"dive"`
}

func (c *Submitter) String() string {
var targets []string
for _, tg := range c.Targets {
targets = append(targets, fmt.Sprintf("{chain_id=%d wallet=%s}", tg.ChainID, tg.Wallet))
}

return fmt.Sprintf("max_workers:%d interval:%s confirmations:%d gas_multiplier:%f"+
" max_gas:%d batch_size:%d scc_verifier_address:%s l2oo_verifier_address:%s"+
" use_multicall:%v multicall_address:%s targets:[%s]",
c.MaxWorkers, c.Interval, c.Confirmations, c.GasMultiplier,
c.MaxGas, c.BatchSize, c.SCCVerifierAddress, c.L2OOVerifierAddress,
c.UseMulticall, c.MulticallAddress, strings.Join(targets, ","))
}

type SubmitterTarget struct {
// Chain ID of the Verse-Layer.
ChainID uint64 `koanf:"chain_id" validate:"required"`
Expand Down
Loading

0 comments on commit 243a43e

Please sign in to comment.