Skip to content

Commit

Permalink
Merge pull request #46 from ironbeer/imp-stakemanager-cache
Browse files Browse the repository at this point in the history
Improved resilience to contract interface changes
  • Loading branch information
ironbeer authored Nov 12, 2024
2 parents 79c67b3 + f123488 commit 6e230bb
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 193 deletions.
9 changes: 2 additions & 7 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,10 @@ func runStartCmd(cmd *cobra.Command, args []string) {
s.mustSetupBeacon()

// Fetch the total stake and the stakes synchronously
if err := s.smcache.Refresh(ctx); err != nil {
if _, err := s.smcache.TotalStakeWithError(ctx); err != nil {
// Exit if the first refresh faild, because the following refresh higly likely fail
log.Crit("Failed to refresh stake cache", "err", err)
}
// start cache updater
go func() {
// NOTE: Don't add wait group, as no need to guarantee the completion
s.smcache.RefreshLoop(ctx, time.Hour)
}()

s.startVerseDiscovery(ctx)
s.startBeacon(ctx)
Expand Down Expand Up @@ -202,7 +197,7 @@ func mustNewServer(ctx context.Context) *server {
if err != nil {
log.Crit("Failed to construct StakeManager", "err", err)
}
s.smcache = stakemanager.NewCache(sm)
s.smcache = stakemanager.NewCache(sm, time.Hour)

return s
}
Expand Down
154 changes: 51 additions & 103 deletions contract/stakemanager/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,130 +11,78 @@ import (
"github.com/ethereum/go-ethereum/log"
)

var (
totalStakeKey = common.HexToAddress("0xffffffffffffffffffffffffffffffffffffffff")
)

type IStakeManager interface {
GetTotalStake(callOpts *bind.CallOpts, epoch *big.Int) (*big.Int, error)

GetValidators(callOpts *bind.CallOpts, epoch, cursol, howMany *big.Int) (struct {
Owners []common.Address
Operators []common.Address
Stakes []*big.Int
BlsPublicKeys [][]byte
Candidates []bool
NewCursor *big.Int
}, error)
GetOperatorStakes(callOpts *bind.CallOpts, operator common.Address, epoch *big.Int) (*big.Int, error)
}

type Cache struct {
sm IStakeManager
mu sync.Mutex
total *big.Int
signerStakes map[common.Address]*big.Int
candidates map[common.Address]bool
sm IStakeManager
ttl time.Duration
mu sync.Mutex
entries map[common.Address]*cacheEntry
}

func NewCache(sm IStakeManager) *Cache {
return &Cache{
sm: sm,
total: big.NewInt(0),
signerStakes: make(map[common.Address]*big.Int),
}
type cacheEntry struct {
amount *big.Int
expireAt time.Time
}

func (c *Cache) RefreshLoop(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Info("Stake cache refresh loop stopped")
return
case <-ticker.C:
if err := c.Refresh(ctx); err != nil {
log.Error("Failed to refresh", "err", err)
}
}
func NewCache(sm IStakeManager, ttl time.Duration) *Cache {
return &Cache{
sm: sm,
ttl: ttl,
entries: make(map[common.Address]*cacheEntry),
}
}

func (c *Cache) Refresh(parent context.Context) error {
ctx, cancel := context.WithTimeout(parent, time.Second*15)
defer cancel()

total, err := c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0)
if err != nil {
return err
}

cursor, howMany := big.NewInt(0), big.NewInt(50)
signerStakes := make(map[common.Address]*big.Int)
candidates := make(map[common.Address]bool)
for {
result, err := c.sm.GetValidators(&bind.CallOpts{Context: ctx}, common.Big0, cursor, howMany)
if err != nil {
return err
} else if len(result.Owners) == 0 {
break
}

for i, operator := range result.Operators {
signerStakes[operator] = result.Stakes[i]
candidates[operator] = result.Candidates[i]
}
cursor = result.NewCursor
}

c.mu.Lock()
defer c.mu.Unlock()

c.total = total
c.signerStakes = signerStakes
c.candidates = candidates
return nil
func (c *Cache) TotalStake(ctx context.Context) *big.Int {
amount, _ := c.get(totalStakeKey, func() (*big.Int, error) {
return c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0)
})
return amount
}

func (c *Cache) TotalStake() *big.Int {
c.mu.Lock()
defer c.mu.Unlock()

return new(big.Int).Set(c.total)
func (c *Cache) TotalStakeWithError(ctx context.Context) (*big.Int, error) {
return c.get(totalStakeKey, func() (*big.Int, error) {
return c.sm.GetTotalStake(&bind.CallOpts{Context: ctx}, common.Big0)
})
}

func (c *Cache) SignerStakes() map[common.Address]*big.Int {
c.mu.Lock()
defer c.mu.Unlock()

cpy := make(map[common.Address]*big.Int)
for k, v := range c.signerStakes {
cpy[k] = new(big.Int).Set(v)
}
return cpy
func (c *Cache) StakeBySigner(ctx context.Context, signer common.Address) *big.Int {
amount, _ := c.get(signer, func() (*big.Int, error) {
return c.sm.GetOperatorStakes(&bind.CallOpts{Context: ctx}, signer, common.Big0)
})
return amount
}

func (c *Cache) StakeBySigner(signer common.Address) *big.Int {
func (c *Cache) get(key common.Address, getAmount func() (*big.Int, error)) (*big.Int, error) {
c.mu.Lock()
defer c.mu.Unlock()

if b := c.signerStakes[signer]; b != nil {
return new(big.Int).Set(b)
if _, ok := c.entries[key]; !ok {
c.entries[key] = &cacheEntry{amount: big.NewInt(0)}
}
return big.NewInt(0)
}

func (c *Cache) Candidates() map[common.Address]bool {
c.mu.Lock()
defer c.mu.Unlock()

cpy := make(map[common.Address]bool)
for k, v := range c.candidates {
cpy[k] = v
c.mu.Unlock()

var (
cache = c.entries[key]
err error
)
if time.Now().After(cache.expireAt) {
ttl := c.ttl
if value, innerErr := getAmount(); innerErr == nil {
cache.amount = value
} else {
log.Error("Failed to refresh", "err", innerErr)
err = innerErr
ttl /= 10 // prevent requests when RPC is down
}
cache.expireAt = time.Now().Add(ttl)
}
return cpy
}

func (c *Cache) Candidate(signer common.Address) bool {
c.mu.Lock()
defer c.mu.Unlock()

return c.candidates[signer]
return new(big.Int).Set(cache.amount), err
}
44 changes: 21 additions & 23 deletions contract/stakemanager/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/oasysgames/oasys-optimism-verifier/testhelper"
"github.com/stretchr/testify/suite"
)
Expand All @@ -22,7 +24,7 @@ func TestNewCache(t *testing.T) {

func (s *CacheTestSuite) SetupTest() {
s.sm = &testhelper.StakeManagerMock{}
s.vs = NewCache(s.sm)
s.vs = NewCache(s.sm, time.Millisecond*5)

for i := range s.Range(0, 1000) {
s.sm.Owners = append(s.sm.Owners, s.RandAddress())
Expand All @@ -32,33 +34,29 @@ func (s *CacheTestSuite) SetupTest() {
}
}

func (s *CacheTestSuite) TestRefresh() {
s.Nil(s.vs.Refresh(context.Background()))
func (s *CacheTestSuite) TestTotalStake() {
ctx := context.Background()

// assert `TotalStake()`
s.Equal(big.NewInt(499500), s.vs.TotalStake())
s.Equal(big.NewInt(499500), s.vs.TotalStake(ctx))

// assert `SignerStakes()`
signerStakes := s.vs.SignerStakes()
s.Len(signerStakes, len(s.sm.Operators))
for i, signer := range s.sm.Operators {
s.Equal(s.sm.Stakes[i], signerStakes[signer])
}
s.sm.Stakes[0] = new(big.Int).Add(s.sm.Stakes[0], common.Big1)
s.Equal(big.NewInt(499500), s.vs.TotalStake(ctx))

// assert `StakeBySigner(common.Address)`
for i, signer := range s.sm.Operators {
s.Equal(s.sm.Stakes[i], s.vs.StakeBySigner(signer))
}
time.Sleep(time.Millisecond * 10)
s.Equal(big.NewInt(499501), s.vs.TotalStake(ctx))
}

// assert `Candidates()`
candidates := s.vs.Candidates()
s.Len(candidates, len(s.sm.Operators))
for i, signer := range s.sm.Operators {
s.Equal(s.sm.Candidates[i], candidates[signer])
}
func (s *CacheTestSuite) TestStakeBySigner() {
ctx := context.Background()

// assert `Candidate(common.Address)`
for i, signer := range s.sm.Operators {
s.Equal(s.sm.Candidates[i], s.vs.Candidate(signer))
s.Equal(s.sm.Stakes[i], s.vs.StakeBySigner(ctx, signer))
}

old := new(big.Int).Set(s.sm.Stakes[0])
s.sm.Stakes[0] = new(big.Int).Add(s.sm.Stakes[0], common.Big1)
s.Equal(old, s.vs.StakeBySigner(ctx, s.sm.Operators[0]))

time.Sleep(time.Millisecond * 10)
s.Equal(s.sm.Stakes[0], s.vs.StakeBySigner(ctx, s.sm.Operators[0]))
}
6 changes: 3 additions & 3 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
return false
}
// Receive signatures only from signers with stake >= validator candidate minimum.
if w.stakemanager.StakeBySigner(signer).Cmp(ethutil.TenMillionOAS) == -1 {
if w.stakemanager.StakeBySigner(ctx, signer).Cmp(ethutil.TenMillionOAS) == -1 {
return false
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func (w *Node) handleOptimismSignatureExchangeRequest(

for _, req := range requests {
signer := common.BytesToAddress(req.Signer)
if w.stakemanager.StakeBySigner(signer).Cmp(ethutil.TenMillionOAS) == -1 {
if w.stakemanager.StakeBySigner(ctx, signer).Cmp(ethutil.TenMillionOAS) == -1 {
continue
}

Expand Down Expand Up @@ -652,7 +652,7 @@ func (w *Node) publishLatestSignatures(ctx context.Context) {
}
filterd := []*database.OptimismSignature{}
for _, sig := range latests {
if w.stakemanager.StakeBySigner(sig.Signer.Address).Cmp(ethutil.TenMillionOAS) >= 0 {
if w.stakemanager.StakeBySigner(ctx, sig.Signer.Address).Cmp(ethutil.TenMillionOAS) >= 0 {
filterd = append(filterd, sig)
}
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ func (s *NodeTestSuite) SetupTest() {

// setup stakemanager mock
sm := &testhelper.StakeManagerMock{}
s.stakemanager = stakemanager.NewCache(sm)
s.stakemanager = stakemanager.NewCache(sm, time.Hour)
for _, signer := range signers {
sm.Owners = append(sm.Owners, s.RandAddress())
sm.Operators = append(sm.Operators, signer)
sm.Stakes = append(sm.Stakes, ethutil.TenMillionOAS)
sm.Candidates = append(sm.Candidates, true)
}
s.stakemanager.Refresh(context.Background())

// setup verse pool
s.versepool = verse.NewVersePool(s.b0)
Expand Down
15 changes: 9 additions & 6 deletions submitter/signature_iter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package submitter

import (
"context"
"fmt"
"math/big"
"sort"
Expand All @@ -18,14 +19,14 @@ type signatureIterator struct {
rollupIndex uint64
}

func (si *signatureIterator) next() ([]*database.OptimismSignature, error) {
func (si *signatureIterator) next(ctx context.Context) ([]*database.OptimismSignature, error) {
rows, err := si.db.OPSignature.Find(nil, nil, &si.contract, &si.rollupIndex, 1000, 0)
if err != nil {
return nil, err
}

rows, err = filterSignatures(rows, ethutil.TenMillionOAS,
si.stakemanager.TotalStake(), si.stakemanager.SignerStakes())
rows, err = filterSignatures(rows, ethutil.TenMillionOAS, si.stakemanager.TotalStake(ctx),
func(signer common.Address) *big.Int { return si.stakemanager.StakeBySigner(ctx, signer) })
if err != nil {
return nil, err
}
Expand All @@ -37,20 +38,22 @@ func (si *signatureIterator) next() ([]*database.OptimismSignature, error) {
func filterSignatures(
rows []*database.OptimismSignature,
minStake, totalStake *big.Int,
signerStakes map[common.Address]*big.Int,
stakeBySigner func(signer common.Address) *big.Int,
) (filterd []*database.OptimismSignature, err error) {
// group by RollupHash and Approved
type group struct {
stake *big.Int
rows []*database.OptimismSignature
}
groups := map[string]*group{}
signerStakes := map[common.Address]*big.Int{}

for _, row := range rows {
stake, ok := signerStakes[row.Signer.Address]
if !ok || stake.Cmp(minStake) == -1 {
stake := stakeBySigner(row.Signer.Address)
if stake.Cmp(minStake) == -1 {
continue
}
signerStakes[row.Signer.Address] = stake

key := fmt.Sprintf("%s:%v", row.RollupHash, row.Approved)
if _, ok := groups[key]; !ok {
Expand Down
Loading

0 comments on commit 6e230bb

Please sign in to comment.