Skip to content

Commit

Permalink
add gas price dynamic adjust in follower mode (#34)
Browse files Browse the repository at this point in the history
* add dynamic adjust for follower

* modify the config

* fix lint and schema

* modify the comment

* modify the comment

* make doc
  • Loading branch information
lyh169 authored and scf0220 committed Dec 12, 2023
1 parent 283a53f commit 88bb2d2
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 50 deletions.
18 changes: 18 additions & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ MaxGasPriceWei = 0
#GasPriceUsdt = 0.0001
#L2CoinId = 7184

## the follower strategy config demo that can dynamic adjust the factor through get the L1 and L2 coin price from kafka
#Type = "follower"
#UpdatePeriod = "10s"
#Factor = 1
#DefaultGasPriceWei = 1000000000
#KafkaURL = "127.0.0.1:9092"
#Topic = "middle_coinPrice_push"
#GroupID = "web3_okbc_explorerchainprice"
## just in SASL_SSL mode
#Username = ""
#Password = ""
#RootCAPath = "only-4096-ca-cert"
#L1CoinId = 15756
#L2CoinId = 7184
#DefaultL2CoinPrice = 40
#DefaultL1CoinPrice = 1600
#EnableFollowerAdjustByL2L1Price = true # dynamic adjust the factor through the L1 and L2 coins price in follower strategy

[MTClient]
URI = "zkevm-prover:50061"

Expand Down
6 changes: 6 additions & 0 deletions gasprice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ type Config struct {
Username string `mapstructure:"Username"`
Password string `mapstructure:"Password"`
RootCAPath string `mapstructure:"RootCAPath"`
L1CoinId int `mapstructure:"L1CoinId"`
L2CoinId int `mapstructure:"L2CoinId"`
// DefaultL1CoinPrice is the L1 token's coin price
DefaultL1CoinPrice float64 `mapstructure:"DefaultL1CoinPrice"`
// DefaultL2CoinPrice is the native token's coin price
DefaultL2CoinPrice float64 `mapstructure:"DefaultL2CoinPrice"`
GasPriceUsdt float64 `mapstructure:"GasPriceUsdt"`

// EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy
EnableFollowerAdjustByL2L1Price bool `mapstructure:"EnableFollowerAdjustByL2L1Price"`

Factor float64 `mapstructure:"Factor"`
}
6 changes: 3 additions & 3 deletions gasprice/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

const (
// OKBWei OKB wei
OKBWei = 1e18
minOKBWei = 1e-18
OKBWei = 1e18
minCoinPrice = 1e-18
)

// FixedGasPrice struct
Expand Down Expand Up @@ -49,7 +49,7 @@ func (f *FixedGasPrice) UpdateGasPriceAvg() {
}

l2CoinPrice := f.ratePrc.GetL2CoinPrice()
if l2CoinPrice < minOKBWei {
if l2CoinPrice < minCoinPrice {
log.Warn("the L2 native coin price too small...")
return
}
Expand Down
23 changes: 19 additions & 4 deletions gasprice/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (

// FollowerGasPrice struct.
type FollowerGasPrice struct {
cfg Config
pool poolInterface
ctx context.Context
eth ethermanInterface
cfg Config
pool poolInterface
ctx context.Context
eth ethermanInterface
kafkaPrc *KafkaProcessor
}

// newFollowerGasPriceSuggester inits l2 follower gas price suggester which is based on the l1 gas price.
Expand All @@ -26,6 +27,9 @@ func newFollowerGasPriceSuggester(ctx context.Context, cfg Config, pool poolInte
ctx: ctx,
eth: ethMan,
}
if cfg.EnableFollowerAdjustByL2L1Price {
gps.kafkaPrc = newKafkaProcessor(cfg, ctx)
}
gps.UpdateGasPriceAvg()
return gps
}
Expand All @@ -44,6 +48,17 @@ func (f *FollowerGasPrice) UpdateGasPriceAvg() {
factor := big.NewFloat(0).SetFloat64(f.cfg.Factor)
res := new(big.Float).Mul(factor, big.NewFloat(0).SetInt(l1GasPrice))

// convert the eth gas price to okb gas price
if f.cfg.EnableFollowerAdjustByL2L1Price {
l1CoinPrice, l2CoinPrice := f.kafkaPrc.GetL1L2CoinPrice()
if l1CoinPrice < minCoinPrice || l2CoinPrice < minCoinPrice {
log.Warn("the L1 or L2 native coin price too small...")
return
}
res = new(big.Float).Mul(big.NewFloat(0).SetFloat64(l1CoinPrice/l2CoinPrice), res)
log.Debug("L2 pre gas price value: ", res.String(), ". L1 coin price: ", l1CoinPrice, ". L2 coin price: ", l2CoinPrice)
}

// Store l2 gasPrice calculated
result := new(big.Int)
res.Int(result)
Expand Down
128 changes: 107 additions & 21 deletions gasprice/kafka_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"github.com/0xPolygonHermez/zkevm-node/log"
kafka "github.com/segmentio/kafka-go"
Expand All @@ -17,10 +18,16 @@ import (

const (
okbcoinId = 7184
ethcoinId = 15756
defaultTime = 10
defaultMaxData = 10e6 // 10M
)

var (
// ErrNotFindCoinPrice not find a correct coin price
ErrNotFindCoinPrice = errors.New("not find a correct coin price")
)

// MsgInfo msg info
type MsgInfo struct {
Topic string `json:"topic"`
Expand Down Expand Up @@ -63,25 +70,43 @@ type Price struct {
Id string `json:"id"`
}

// L1L2PriceRecord l1 l2 coin price record
type L1L2PriceRecord struct {
l1Price float64
l2Price float64
l1Update bool
l2Update bool
}

// KafkaProcessor kafka processor
type KafkaProcessor struct {
kreader *kafka.Reader
L2Price float64
ctx context.Context
rwLock sync.RWMutex
l2CoinId int
cfg Config
kreader *kafka.Reader
ctx context.Context
rwLock sync.RWMutex
l1CoinId int
l2CoinId int
l1Price float64
l2Price float64
tmpPrices L1L2PriceRecord
}

func newKafkaProcessor(cfg Config, ctx context.Context) *KafkaProcessor {
rp := &KafkaProcessor{
cfg: cfg,
kreader: getKafkaReader(cfg),
L2Price: cfg.DefaultL2CoinPrice,
l1Price: cfg.DefaultL1CoinPrice,
l2Price: cfg.DefaultL2CoinPrice,
ctx: ctx,
l2CoinId: okbcoinId,
l1CoinId: ethcoinId,
}
if cfg.L2CoinId != 0 {
rp.l2CoinId = cfg.L2CoinId
}
if cfg.L1CoinId != 0 {
rp.l1CoinId = cfg.L1CoinId
}

go rp.processor()
return rp
Expand Down Expand Up @@ -129,52 +154,113 @@ func (rp *KafkaProcessor) processor() {
case <-rp.ctx.Done():
return
default:
value, err := rp.ReadAndCalc(rp.ctx)
if err != nil {
err := rp.ReadAndUpdate(rp.ctx)
if err != nil && err != ErrNotFindCoinPrice {
log.Warn("get the destion data fail ", err)
time.Sleep(time.Second * defaultTime)
continue
}
rp.updateL2CoinPrice(value)
}
}
}

// ReadAndCalc read and calc
func (rp *KafkaProcessor) ReadAndCalc(ctx context.Context) (float64, error) {
// ReadAndUpdate read and update
func (rp *KafkaProcessor) ReadAndUpdate(ctx context.Context) error {
m, err := rp.kreader.ReadMessage(ctx)
if err != nil {
return 0, err
return err
}
return rp.parseL2CoinPrice(m.Value)
return rp.Update(m.Value)
}

// Update update the coin price
func (rp *KafkaProcessor) Update(data []byte) error {
if rp.cfg.Type == FixedType {
price, err := rp.parseCoinPrice(data, []int{rp.l2CoinId})
if err == nil {
rp.updateL2CoinPrice(price[rp.l2CoinId])
}
return err
} else if rp.cfg.Type == FollowerType {
prices, err := rp.parseCoinPrice(data, []int{rp.l1CoinId, rp.l2CoinId})
if err == nil {
rp.updateL1L2CoinPrice(prices)
}
return err
}
return nil
}

func (rp *KafkaProcessor) updateL2CoinPrice(price float64) {
rp.rwLock.Lock()
defer rp.rwLock.Unlock()
rp.L2Price = price
rp.l2Price = price
}

// GetL2CoinPrice get L2 coin price
func (rp *KafkaProcessor) GetL2CoinPrice() float64 {
rp.rwLock.RLock()
defer rp.rwLock.RUnlock()
return rp.L2Price
return rp.l2Price
}

func (rp *KafkaProcessor) updateL1L2CoinPrice(prices map[int]float64) {
if len(prices) == 0 {
return
}
rp.rwLock.Lock()
defer rp.rwLock.Unlock()
if v, ok := prices[rp.l1CoinId]; ok {
rp.tmpPrices.l1Price = v
rp.tmpPrices.l1Update = true
}
if v, ok := prices[rp.l2CoinId]; ok {
rp.tmpPrices.l2Price = v
rp.tmpPrices.l2Update = true
}
if rp.tmpPrices.l1Update && rp.tmpPrices.l2Update {
rp.l1Price = rp.tmpPrices.l1Price
rp.l2Price = rp.tmpPrices.l2Price
rp.tmpPrices.l1Update = false
rp.tmpPrices.l2Update = false
return
}
}

// GetL1L2CoinPrice get l1, L2 coin price
func (rp *KafkaProcessor) GetL1L2CoinPrice() (float64, float64) {
rp.rwLock.RLock()
defer rp.rwLock.RUnlock()
return rp.l1Price, rp.l2Price
}

func (rp *KafkaProcessor) parseL2CoinPrice(value []byte) (float64, error) {
func (rp *KafkaProcessor) parseCoinPrice(value []byte, coinIds []int) (map[int]float64, error) {
if len(coinIds) == 0 {
return nil, fmt.Errorf("the params coinIds is empty")
}
msgI := &MsgInfo{}
err := json.Unmarshal(value, &msgI)
if err != nil {
return 0, err
return nil, err
}
if msgI.Data == nil || len(msgI.Data.PriceList) == 0 {
return 0, fmt.Errorf("the data PriceList is empty")
return nil, fmt.Errorf("the data PriceList is empty")
}
mp := make(map[int]*Price)
for _, price := range msgI.Data.PriceList {
if price.CoinId == rp.l2CoinId {
return price.Price, nil
mp[price.CoinId] = price
}

results := make(map[int]float64)
for _, coinId := range coinIds {
if coin, ok := mp[coinId]; ok {
results[coinId] = coin.Price
} else {
log.Debugf("not find a correct coin price coin id is =%v", coinId)
}
}
return 0, fmt.Errorf("not find a correct coin price coinId=%v", rp.l2CoinId)
if len(results) == 0 {
return results, ErrNotFindCoinPrice
}
return results, nil
}
Loading

0 comments on commit 88bb2d2

Please sign in to comment.