From f12348825fdd629f6ae933752a48f041d8109424 Mon Sep 17 00:00:00 2001 From: ironbeer <7997273+ironbeer@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:31:38 +0900 Subject: [PATCH] Improved resilience to contract interface changes --- cmd/start.go | 9 +- contract/stakemanager/cache.go | 154 +++++++++------------------- contract/stakemanager/cache_test.go | 44 ++++---- p2p/node.go | 6 +- p2p/node_test.go | 3 +- submitter/signature_iter.go | 15 +-- submitter/signature_iter_test.go | 15 +-- submitter/submitter.go | 4 +- submitter/submitter_test.go | 12 +-- testhelper/stakemanager.go | 42 ++------ version/version.go | 2 +- 11 files changed, 113 insertions(+), 193 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index a318155..bcc161f 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -86,15 +86,10 @@ func runStartCmd(cmd *cobra.Command, args []string) { s.mustSetupBeacon() // Fetch the total stake and the stakes synchronously - if err := s.smcache.Refresh(ctx); err != nil { + if _, err := s.smcache.TotalStakeWithError(ctx); err != nil { // Exit if the first refresh faild, because the following refresh higly likely fail log.Crit("Failed to refresh stake cache", "err", err) } - // start cache updater - go func() { - // NOTE: Don't add wait group, as no need to guarantee the completion - s.smcache.RefreshLoop(ctx, time.Hour) - }() s.startVerseDiscovery(ctx) s.startBeacon(ctx) @@ -202,7 +197,7 @@ func mustNewServer(ctx context.Context) *server { if err != nil { log.Crit("Failed to construct StakeManager", "err", err) } - s.smcache = stakemanager.NewCache(sm) + s.smcache = stakemanager.NewCache(sm, time.Hour) return s } diff --git a/contract/stakemanager/cache.go b/contract/stakemanager/cache.go index ec7263b..b829b7e 100644 --- a/contract/stakemanager/cache.go +++ b/contract/stakemanager/cache.go @@ -11,130 +11,78 @@ import ( "github.com/ethereum/go-ethereum/log" ) +var ( + totalStakeKey = common.HexToAddress("0xffffffffffffffffffffffffffffffffffffffff") +) + type IStakeManager interface { GetTotalStake(callOpts *bind.CallOpts, epoch *big.Int) (*big.Int, error) - GetValidators(callOpts *bind.CallOpts, epoch, cursol, howMany *big.Int) (struct { - Owners []common.Address - Operators []common.Address - Stakes []*big.Int - BlsPublicKeys [][]byte - Candidates []bool - NewCursor *big.Int - }, error) + GetOperatorStakes(callOpts *bind.CallOpts, operator common.Address, epoch *big.Int) (*big.Int, error) } type Cache struct { - sm IStakeManager - mu sync.Mutex - total *big.Int - signerStakes map[common.Address]*big.Int - candidates map[common.Address]bool + sm IStakeManager + ttl time.Duration + mu sync.Mutex + entries map[common.Address]*cacheEntry } -func NewCache(sm IStakeManager) *Cache { - return &Cache{ - sm: sm, - total: big.NewInt(0), - signerStakes: make(map[common.Address]*big.Int), - } +type cacheEntry struct { + amount *big.Int + expireAt time.Time } -func (c *Cache) RefreshLoop(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - log.Info("Stake cache refresh loop stopped") - return - case <-ticker.C: - if err := c.Refresh(ctx); err != nil { - log.Error("Failed to refresh", "err", err) - } - } +func NewCache(sm IStakeManager, ttl time.Duration) *Cache { + return &Cache{ + sm: sm, + ttl: ttl, + entries: make(map[common.Address]*cacheEntry), } } -func (c *Cache) Refresh(parent context.Context) error { - ctx, cancel := context.WithTimeout(parent, time.Second*15) - defer cancel() - - total, err := c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0) - if err != nil { - return err - } - - cursor, howMany := big.NewInt(0), big.NewInt(50) - signerStakes := make(map[common.Address]*big.Int) - candidates := make(map[common.Address]bool) - for { - result, err := c.sm.GetValidators(&bind.CallOpts{Context: ctx}, common.Big0, cursor, howMany) - if err != nil { - return err - } else if len(result.Owners) == 0 { - break - } - - for i, operator := range result.Operators { - signerStakes[operator] = result.Stakes[i] - candidates[operator] = result.Candidates[i] - } - cursor = result.NewCursor - } - - c.mu.Lock() - defer c.mu.Unlock() - - c.total = total - c.signerStakes = signerStakes - c.candidates = candidates - return nil +func (c *Cache) TotalStake(ctx context.Context) *big.Int { + amount, _ := c.get(totalStakeKey, func() (*big.Int, error) { + return c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0) + }) + return amount } -func (c *Cache) TotalStake() *big.Int { - c.mu.Lock() - defer c.mu.Unlock() - - return new(big.Int).Set(c.total) +func (c *Cache) TotalStakeWithError(ctx context.Context) (*big.Int, error) { + return c.get(totalStakeKey, func() (*big.Int, error) { + return c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0) + }) } -func (c *Cache) SignerStakes() map[common.Address]*big.Int { - c.mu.Lock() - defer c.mu.Unlock() - - cpy := make(map[common.Address]*big.Int) - for k, v := range c.signerStakes { - cpy[k] = new(big.Int).Set(v) - } - return cpy +func (c *Cache) StakeBySigner(ctx context.Context, signer common.Address) *big.Int { + amount, _ := c.get(signer, func() (*big.Int, error) { + return c.sm.GetOperatorStakes(&bind.CallOpts{Context: ctx}, signer, common.Big0) + }) + return amount } -func (c *Cache) StakeBySigner(signer common.Address) *big.Int { +func (c *Cache) get(key common.Address, getAmount func() (*big.Int, error)) (*big.Int, error) { c.mu.Lock() - defer c.mu.Unlock() - - if b := c.signerStakes[signer]; b != nil { - return new(big.Int).Set(b) + if _, ok := c.entries[key]; !ok { + c.entries[key] = &cacheEntry{amount: big.NewInt(0)} } - return big.NewInt(0) -} - -func (c *Cache) Candidates() map[common.Address]bool { - c.mu.Lock() - defer c.mu.Unlock() - - cpy := make(map[common.Address]bool) - for k, v := range c.candidates { - cpy[k] = v + c.mu.Unlock() + + var ( + cache = c.entries[key] + err error + ) + if time.Now().After(cache.expireAt) { + ttl := c.ttl + if value, innerErr := getAmount(); innerErr == nil { + cache.amount = value + } else { + log.Error("Failed to refresh", "err", innerErr) + err = innerErr + ttl /= 10 // prevent requests when RPC is down + } + cache.expireAt = time.Now().Add(ttl) } - return cpy -} - -func (c *Cache) Candidate(signer common.Address) bool { - c.mu.Lock() - defer c.mu.Unlock() - return c.candidates[signer] + return new(big.Int).Set(cache.amount), err } diff --git a/contract/stakemanager/cache_test.go b/contract/stakemanager/cache_test.go index 22091d8..0392c69 100644 --- a/contract/stakemanager/cache_test.go +++ b/contract/stakemanager/cache_test.go @@ -4,7 +4,9 @@ import ( "context" "math/big" "testing" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/oasysgames/oasys-optimism-verifier/testhelper" "github.com/stretchr/testify/suite" ) @@ -22,7 +24,7 @@ func TestNewCache(t *testing.T) { func (s *CacheTestSuite) SetupTest() { s.sm = &testhelper.StakeManagerMock{} - s.vs = NewCache(s.sm) + s.vs = NewCache(s.sm, time.Millisecond*5) for i := range s.Range(0, 1000) { s.sm.Owners = append(s.sm.Owners, s.RandAddress()) @@ -32,33 +34,29 @@ func (s *CacheTestSuite) SetupTest() { } } -func (s *CacheTestSuite) TestRefresh() { - s.Nil(s.vs.Refresh(context.Background())) +func (s *CacheTestSuite) TestTotalStake() { + ctx := context.Background() - // assert `TotalStake()` - s.Equal(big.NewInt(499500), s.vs.TotalStake()) + s.Equal(big.NewInt(499500), s.vs.TotalStake(ctx)) - // assert `SignerStakes()` - signerStakes := s.vs.SignerStakes() - s.Len(signerStakes, len(s.sm.Operators)) - for i, signer := range s.sm.Operators { - s.Equal(s.sm.Stakes[i], signerStakes[signer]) - } + s.sm.Stakes[0] = new(big.Int).Add(s.sm.Stakes[0], common.Big1) + s.Equal(big.NewInt(499500), s.vs.TotalStake(ctx)) - // assert `StakeBySigner(common.Address)` - for i, signer := range s.sm.Operators { - s.Equal(s.sm.Stakes[i], s.vs.StakeBySigner(signer)) - } + time.Sleep(time.Millisecond * 10) + s.Equal(big.NewInt(499501), s.vs.TotalStake(ctx)) +} - // assert `Candidates()` - candidates := s.vs.Candidates() - s.Len(candidates, len(s.sm.Operators)) - for i, signer := range s.sm.Operators { - s.Equal(s.sm.Candidates[i], candidates[signer]) - } +func (s *CacheTestSuite) TestStakeBySigner() { + ctx := context.Background() - // assert `Candidate(common.Address)` for i, signer := range s.sm.Operators { - s.Equal(s.sm.Candidates[i], s.vs.Candidate(signer)) + s.Equal(s.sm.Stakes[i], s.vs.StakeBySigner(ctx, signer)) } + + old := new(big.Int).Set(s.sm.Stakes[0]) + s.sm.Stakes[0] = new(big.Int).Add(s.sm.Stakes[0], common.Big1) + s.Equal(old, s.vs.StakeBySigner(ctx, s.sm.Operators[0])) + + time.Sleep(time.Millisecond * 10) + s.Equal(s.sm.Stakes[0], s.vs.StakeBySigner(ctx, s.sm.Operators[0])) } diff --git a/p2p/node.go b/p2p/node.go index 4e4c23a..569c8e4 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -291,7 +291,7 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub( return false } // Receive signatures only from signers with stake >= validator candidate minimum. - if w.stakemanager.StakeBySigner(signer).Cmp(ethutil.TenMillionOAS) == -1 { + if w.stakemanager.StakeBySigner(ctx, signer).Cmp(ethutil.TenMillionOAS) == -1 { return false } @@ -370,7 +370,7 @@ func (w *Node) handleOptimismSignatureExchangeRequest( for _, req := range requests { signer := common.BytesToAddress(req.Signer) - if w.stakemanager.StakeBySigner(signer).Cmp(ethutil.TenMillionOAS) == -1 { + if w.stakemanager.StakeBySigner(ctx, signer).Cmp(ethutil.TenMillionOAS) == -1 { continue } @@ -652,7 +652,7 @@ func (w *Node) publishLatestSignatures(ctx context.Context) { } filterd := []*database.OptimismSignature{} for _, sig := range latests { - if w.stakemanager.StakeBySigner(sig.Signer.Address).Cmp(ethutil.TenMillionOAS) >= 0 { + if w.stakemanager.StakeBySigner(ctx, sig.Signer.Address).Cmp(ethutil.TenMillionOAS) >= 0 { filterd = append(filterd, sig) } } diff --git a/p2p/node_test.go b/p2p/node_test.go index 7cbcaf5..b45919f 100644 --- a/p2p/node_test.go +++ b/p2p/node_test.go @@ -68,14 +68,13 @@ func (s *NodeTestSuite) SetupTest() { // setup stakemanager mock sm := &testhelper.StakeManagerMock{} - s.stakemanager = stakemanager.NewCache(sm) + s.stakemanager = stakemanager.NewCache(sm, time.Hour) for _, signer := range signers { sm.Owners = append(sm.Owners, s.RandAddress()) sm.Operators = append(sm.Operators, signer) sm.Stakes = append(sm.Stakes, ethutil.TenMillionOAS) sm.Candidates = append(sm.Candidates, true) } - s.stakemanager.Refresh(context.Background()) // setup verse pool s.versepool = verse.NewVersePool(s.b0) diff --git a/submitter/signature_iter.go b/submitter/signature_iter.go index 8f82919..f7698b5 100644 --- a/submitter/signature_iter.go +++ b/submitter/signature_iter.go @@ -1,6 +1,7 @@ package submitter import ( + "context" "fmt" "math/big" "sort" @@ -18,14 +19,14 @@ type signatureIterator struct { rollupIndex uint64 } -func (si *signatureIterator) next() ([]*database.OptimismSignature, error) { +func (si *signatureIterator) next(ctx context.Context) ([]*database.OptimismSignature, error) { rows, err := si.db.OPSignature.Find(nil, nil, &si.contract, &si.rollupIndex, 1000, 0) if err != nil { return nil, err } - rows, err = filterSignatures(rows, ethutil.TenMillionOAS, - si.stakemanager.TotalStake(), si.stakemanager.SignerStakes()) + rows, err = filterSignatures(rows, ethutil.TenMillionOAS, si.stakemanager.TotalStake(ctx), + func(signer common.Address) *big.Int { return si.stakemanager.StakeBySigner(ctx, signer) }) if err != nil { return nil, err } @@ -37,7 +38,7 @@ func (si *signatureIterator) next() ([]*database.OptimismSignature, error) { func filterSignatures( rows []*database.OptimismSignature, minStake, totalStake *big.Int, - signerStakes map[common.Address]*big.Int, + stakeBySigner func(signer common.Address) *big.Int, ) (filterd []*database.OptimismSignature, err error) { // group by RollupHash and Approved type group struct { @@ -45,12 +46,14 @@ func filterSignatures( rows []*database.OptimismSignature } groups := map[string]*group{} + signerStakes := map[common.Address]*big.Int{} for _, row := range rows { - stake, ok := signerStakes[row.Signer.Address] - if !ok || stake.Cmp(minStake) == -1 { + stake := stakeBySigner(row.Signer.Address) + if stake.Cmp(minStake) == -1 { continue } + signerStakes[row.Signer.Address] = stake key := fmt.Sprintf("%s:%v", row.RollupHash, row.Approved) if _, ok := groups[key]; !ok { diff --git a/submitter/signature_iter_test.go b/submitter/signature_iter_test.go index 61c7653..30aae29 100644 --- a/submitter/signature_iter_test.go +++ b/submitter/signature_iter_test.go @@ -4,6 +4,7 @@ import ( "context" "math/big" "sort" + "time" "github.com/ethereum/go-ethereum/common" "github.com/oasysgames/oasys-optimism-verifier/contract/stakemanager" @@ -13,6 +14,8 @@ import ( ) func (s *SubmitterTestSuite) TestSignatureIterator() { + ctx := context.Background() + var signers [20]common.Address for i := range signers { signers[i] = s.RandAddress() @@ -60,7 +63,7 @@ func (s *SubmitterTestSuite) TestSignatureIterator() { // setup stakemanager sm := &testhelper.StakeManagerMock{} - smcache := stakemanager.NewCache(sm) + smcache := stakemanager.NewCache(sm, time.Millisecond) for _, group := range signerGroups { for i, signer := range group.signers { sm.Owners = append(sm.Owners, s.RandAddress()) @@ -70,7 +73,6 @@ func (s *SubmitterTestSuite) TestSignatureIterator() { sm.Candidates = append(sm.Candidates, true) } } - smcache.Refresh(context.Background()) // save signatures for rollupIndex, c := range sigGroups { @@ -119,8 +121,8 @@ func (s *SubmitterTestSuite) TestSignatureIterator() { } // assert - gots0, err0 := iter.next() - gots1, err1 := iter.next() + gots0, err0 := iter.next(ctx) + gots1, err1 := iter.next(ctx) s.Nil(err0) s.Nil(err1) @@ -141,7 +143,8 @@ func (s *SubmitterTestSuite) TestSignatureIterator() { for i := range sm.Operators { sm.Stakes[i] = ethutil.TenMillionOAS } - smcache.Refresh(context.Background()) - _, err := iter.next() + time.Sleep(time.Millisecond) // wait for cache to expire + + _, err := iter.next(ctx) s.ErrorContains(err, "stake amount shortage") } diff --git a/submitter/submitter.go b/submitter/submitter.go index 2b35628..d7157c5 100644 --- a/submitter/submitter.go +++ b/submitter/submitter.go @@ -262,7 +262,7 @@ func (w *Submitter) sendNormalTx( task verse.TransactableVerse, iter *signatureIterator, ) (*types.Transaction, error) { - rows, err := iter.next() + rows, err := iter.next(ctx) if err != nil { log.Error("Failed to find signatures", "err", err) return nil, err @@ -330,7 +330,7 @@ func (w *Submitter) sendMulticallTx( errShortage error ) for i := 0; i < w.cfg.BatchSize; i++ { - rows, err := iter.next() + rows, err := iter.next(ctx) if _, ok := err.(*StakeAmountShortage); ok { errShortage = err break diff --git a/submitter/submitter_test.go b/submitter/submitter_test.go index 2039475..b04687d 100644 --- a/submitter/submitter_test.go +++ b/submitter/submitter_test.go @@ -62,7 +62,7 @@ func (s *SubmitterTestSuite) SetupTest() { UseMulticall: true, // TODO: No single tx testing MulticallAddress: s.MulticallAddr.String(), } - s.submitter = NewSubmitter(s.cfg, s.DB, nil, stakemanager.NewCache(s.StakeManager), s.versepool) + s.submitter = NewSubmitter(s.cfg, s.DB, nil, stakemanager.NewCache(s.StakeManager, time.Hour), s.versepool) s.submitter.l1SignerFn = func(chainID uint64) ethutil.SignableClient { return s.SignableHub } @@ -96,8 +96,8 @@ func (s *SubmitterTestSuite) TestSubmit() { // no more signatures than the minimum stake should be sent sort.Slice(signatures[i], func(j, h int) bool { // sort by stake amount - a := s.submitter.stakemanager.StakeBySigner(signatures[i][j].Signer.Address) - b := s.submitter.stakemanager.StakeBySigner(signatures[i][h].Signer.Address) + a := s.submitter.stakemanager.StakeBySigner(ctx, signatures[i][j].Signer.Address) + b := s.submitter.stakemanager.StakeBySigner(ctx, signatures[i][h].Signer.Address) return a.Cmp(b) == 1 // order by desc }) signatures[i] = signatures[i][:6] @@ -114,7 +114,6 @@ func (s *SubmitterTestSuite) TestSubmit() { } // submitter do the work. - s.submitter.stakemanager.Refresh(ctx) go s.submitter.Start(ctx) time.Sleep(s.cfg.Interval * 2) s.Hub.Commit() @@ -178,7 +177,6 @@ func (s *SubmitterTestSuite) TestStartSubmitter() { signers := s.StakeManager.Operators // Start submitter by adding task - s.submitter.stakemanager.Refresh(ctx) go s.submitter.Start(ctx) // Dry run to cover no signature case // Manually confirmed by checking the logs @@ -230,8 +228,8 @@ func (s *SubmitterTestSuite) TestStartSubmitter() { // no more signatures than the minimum stake should be sent sort.Slice(signatures[i], func(j, h int) bool { // sort by stake amount - a := s.submitter.stakemanager.StakeBySigner(signatures[i][j].Signer.Address) - b := s.submitter.stakemanager.StakeBySigner(signatures[i][h].Signer.Address) + a := s.submitter.stakemanager.StakeBySigner(ctx, signatures[i][j].Signer.Address) + b := s.submitter.stakemanager.StakeBySigner(ctx, signatures[i][h].Signer.Address) return a.Cmp(b) == 1 // order by desc }) signatures[i] = signatures[i][:6] diff --git a/testhelper/stakemanager.go b/testhelper/stakemanager.go index 0bb0d1f..1d974e6 100644 --- a/testhelper/stakemanager.go +++ b/testhelper/stakemanager.go @@ -25,39 +25,15 @@ func (b *StakeManagerMock) GetTotalStake( return tot, nil } -func (b *StakeManagerMock) GetValidators( +func (b *StakeManagerMock) GetOperatorStakes( callOpts *bind.CallOpts, - epoch, cursol, howMany *big.Int, -) (struct { - Owners []common.Address - Operators []common.Address - Stakes []*big.Int - BlsPublicKeys [][]byte - Candidates []bool - NewCursor *big.Int -}, error) { - length := big.NewInt(int64(len(b.Owners))) - if new(big.Int).Add(cursol, howMany).Cmp(length) >= 0 { - howMany = new(big.Int).Sub(length, cursol) - } - - start := cursol.Uint64() - end := start + howMany.Uint64() - - ret := struct { - Owners []common.Address - Operators []common.Address - Stakes []*big.Int - BlsPublicKeys [][]byte - Candidates []bool - NewCursor *big.Int - }{ - Owners: b.Owners[start:end], - Operators: b.Operators[start:end], - Stakes: b.Stakes[start:end], - Candidates: b.Candidates[start:end], - NewCursor: new(big.Int).Add(cursol, howMany), + operator common.Address, + epoch *big.Int, +) (*big.Int, error) { + for i, addr := range b.Operators { + if addr == operator { + return b.Stakes[i], nil + } } - - return ret, nil + return big.NewInt(0), nil } diff --git a/version/version.go b/version/version.go index 25ef0fd..dec78a9 100644 --- a/version/version.go +++ b/version/version.go @@ -5,7 +5,7 @@ import "fmt" const ( Major = 1 Minor = 2 - Patch = 6 + Patch = 7 Meta = "" )