Skip to content

Commit

Permalink
feat(monitor/xfeemngr): better gas buffer (#2476)
Browse files Browse the repository at this point in the history
Improve gas price price buffer, simplify tests.

- move all "buffering" to buffer (remove gas price shield)
- replace gas price threshold with offset / tolerance
- simplify xfeemngr tests, just test that onchain matches buffer

issue: none
  • Loading branch information
kevinhalliday authored Nov 14, 2024
1 parent d56de9c commit 283c20e
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 462 deletions.
8 changes: 4 additions & 4 deletions lib/contracts/feeoraclev1/feeparams.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func destFeeParams(ctx context.Context, srcChain evmchain.Metadata, destChain ev
ChainId: destChain.ChainID,
PostsTo: postsTo,
ToNativeRate: rateToNumerator(toNativeRate),
GasPrice: withGasPriceShield(gasPrice),
GasPrice: withGasPriceOffset(gasPrice),
}, nil
}

Expand Down Expand Up @@ -132,10 +132,10 @@ func rateToNumerator(r float64) *big.Int {
return norm
}

// withGasPriceShield returns the gas price with an added xfeemngr.GasPriceShield pct offset.
func withGasPriceShield(gasPrice *big.Int) *big.Int {
// withGasPriceOffset returns the gas price with an added xfeemngr.GasPriceShield pct offset.
func withGasPriceOffset(gasPrice *big.Int) *big.Int {
gasPriceF := float64(gasPrice.Uint64())
return new(big.Int).SetUint64(uint64(gasPriceF + (xfeemngr.GasPriceShield * gasPriceF)))
return new(big.Int).SetUint64(uint64(gasPriceF + (xfeemngr.GasPriceBufferOffset * gasPriceF)))
}

func contains(params []bindings.IFeeOracleV1ChainFeeParams, chainID uint64) bool {
Expand Down
143 changes: 91 additions & 52 deletions monitor/xfeemngr/gasprice/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,82 @@ import (
"fmt"
"sync"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/evmchain"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/monitor/xfeemngr/ticker"

"github.com/ethereum/go-ethereum"
)

type Buffer struct {
mu sync.RWMutex
once sync.Once
buffer map[uint64]uint64 // map chainID to buffered gas price (not changed if outside threshold)
pricers map[uint64]ethereum.GasPricer // map chainID to provider
opts *Opts
type buffer struct {
mu sync.RWMutex
once sync.Once
ticker ticker.Ticker

// map chainID to buffered gas price (not changed if outside threshold)
buffer map[uint64]uint64

// map chainID to provider
pricers map[uint64]ethereum.GasPricer

// pct to offset live -> buffer
// ex. with offset 0.5, live=100, buffered=150 (50% higher)
// if live increases above 150, offset buffer will update to 225 (50% higher)
offset float64

// pct below the buffer the live value be to trigger a buffer decrease
// ex. with tolerance 0.5, buffered=150, live must be below 75 to decrease buffer
tolerance float64
}

type Buffer interface {
GasPrice(chainID uint64) uint64
Stream(ctx context.Context)
}

var _ Buffer = (*buffer)(nil)

// NewBuffer creates a new gas price buffer.
//
// A gas price buffer maintains a buffered view of gas prices for multiple
// chains. Buffered gas prices are not updated unless they are outside the
// threshold percentage. Start steaming gas prices with Buffer.Stream(ctx).
func NewBuffer(pricers map[uint64]ethereum.GasPricer, opts ...func(*Opts)) *Buffer {
return &Buffer{
mu: sync.RWMutex{},
once: sync.Once{},
buffer: make(map[uint64]uint64),
pricers: pricers,
opts: makeOpts(opts),
// chains. Buffered gas prices exceed live prices by an offset. They decrease
// if live prices fall below tolerance.
func NewBuffer(pricers map[uint64]ethereum.GasPricer, offset, tolerance float64, ticker ticker.Ticker) (Buffer, error) {
if offset < 0 {
return nil, errors.New("offset must be >= 0")
}

if tolerance < 0 {
return nil, errors.New("tolerance must be >= 0")
}

if (1+offset)*(1-tolerance) >= 1 {
return nil, errors.New("applying offset would trigger tolerance")
}

return &buffer{
mu: sync.RWMutex{},
once: sync.Once{},
buffer: make(map[uint64]uint64),
pricers: pricers,
offset: offset,
tolerance: tolerance,
ticker: ticker,
}, nil
}

// GasPrice returns the buffered gas price for the given chainID.
// If the price is not known, returns 0.
func (b *Buffer) GasPrice(chainID uint64) uint64 {
p, _ := b.price(chainID)
return p
func (b *buffer) GasPrice(chainID uint64) uint64 {
b.mu.RLock()
defer b.mu.RUnlock()

return b.buffer[chainID]
}

// Stream starts streaming gas prices for all providers into the buffer.
func (b *Buffer) Stream(ctx context.Context) {
func (b *buffer) Stream(ctx context.Context) {
b.once.Do(func() {
ctx = log.WithCtx(ctx, "component", "gasprice.Buffer")
log.Info(ctx, "Streaming gas prices into buffer")
Expand All @@ -52,36 +90,57 @@ func (b *Buffer) Stream(ctx context.Context) {
}

// streamAll starts streaming gas prices for all providers into the buffer.
func (b *Buffer) streamAll(ctx context.Context) {
func (b *buffer) streamAll(ctx context.Context) {
for chainID := range b.pricers {
b.streamOne(ctx, chainID)
}
}

// streamOne starts streaming gas prices for the given chainID into the buffer.
func (b *Buffer) streamOne(ctx context.Context, chainID uint64) {
func (b *buffer) streamOne(ctx context.Context, chainID uint64) {
ctx = log.WithCtx(ctx, "chainID", chainID)
pricer := b.pricers[chainID]
tick := b.opts.ticker
tick := b.ticker

callback := func(ctx context.Context) {
gpriceBig, err := pricer.SuggestGasPrice(ctx)
liveBn, err := pricer.SuggestGasPrice(ctx)
if err != nil {
log.Warn(ctx, "Failed to get gas price (will retry)", err)
return
}

gprice := gpriceBig.Uint64()
guageLive(chainID, gprice)

// if price is buffed, and within threshold, return
buffed, ok := b.price(chainID)
if ok && inThreshold(gprice, buffed, b.opts.thresholdPct) {
live := liveBn.Uint64()
guageLive(chainID, live)

buffed := b.GasPrice(chainID)
tooLow := live > buffed
tooHigh := live < uint64(float64(buffed)*(1-b.tolerance))

log.Debug(ctx, "Checking buffer",
"live", live,
"buffered", buffed,
"too_low", tooLow,
"too_high", tooHigh,
"offset", b.offset,
"tolerance", b.tolerance,
)

// do nothing
if !tooLow && !tooHigh {
log.Debug(ctx, "No update needed")
return
}

b.setPrice(chainID, gprice)
guageBuffered(chainID, gprice)
log.Info(ctx, "Updating buffer",
"live", live,
"buffered", buffed,
"too_low", tooLow,
"too_high", tooHigh,
)

corrected := uint64(float64(live) * (1 + b.offset))
b.setPrice(chainID, corrected)
guageBuffered(chainID, corrected)
}

tick.Go(ctx, callback)
Expand All @@ -98,33 +157,13 @@ func guageBuffered(chainID uint64, price uint64) {
}

// setPrice sets the buffered gas price for the given chainID.
func (b *Buffer) setPrice(chainID, price uint64) {
func (b *buffer) setPrice(chainID, price uint64) {
b.mu.Lock()
defer b.mu.Unlock()

b.buffer[chainID] = price
}

// price returns the buffered gas price for the given chainID.
// If the price is not found, returns 0 and false.
func (b *Buffer) price(chainID uint64) (uint64, bool) {
b.mu.RLock()
defer b.mu.RUnlock()

price, ok := b.buffer[chainID]

return price, ok
}

// inThreshold returns true if a greater or less than b by pct.
func inThreshold(a, b uint64, pct float64) bool {
bf := float64(b)
gt := a > uint64(bf+(bf*pct))
lt := a < uint64(bf-(bf*pct))

return !gt && !lt
}

// chainName returns the name of the chain with the given chainID.
func chainName(chainID uint64) string {
meta, ok := evmchain.MetadataByID(chainID)
Expand Down
72 changes: 33 additions & 39 deletions monitor/xfeemngr/gasprice/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math/rand"
"testing"

"github.com/omni-network/omni/lib/umath"
"github.com/omni-network/omni/monitor/xfeemngr/gasprice"
"github.com/omni-network/omni/monitor/xfeemngr/ticker"

Expand All @@ -25,58 +24,53 @@ func TestBufferStream(t *testing.T) {
// mock gas pricers per chain
mocks := makeMockPricers(initials)

thresh := 0.1
offset := 0.3 // 30% offset
tolerance := 0.5 // 50% tolerance
tick := ticker.NewMock()
ctx := context.Background()

b := gasprice.NewBuffer(toEthGasPricers(mocks), gasprice.WithThresholdPct(thresh), gasprice.WithTicker(tick))

// start streaming gas prices
b.Stream(ctx)

// tick once - initial prices should get buffered
tick.Tick()

// buffered price should be initial
for chainID, price := range initials {
require.Equal(t, price, b.GasPrice(chainID), "initial")
withOffset := func(price uint64) uint64 {
return uint64(float64(price) * (1 + offset))
}

// just increase a little, but not above threshold
for chainID, price := range initials {
delta := umath.SubtractOrZero(uint64(float64(price)*thresh), 1)
mocks[chainID].SetPrice(price + delta)
atTolerance := func(price uint64) uint64 {
return uint64(float64(price) * (1 - tolerance))
}

tick.Tick()

// buffered price should still be initial
for chainID, price := range initials {
require.Equal(t, price, b.GasPrice(chainID), "within threshold")
}
b, err := gasprice.NewBuffer(toEthGasPricers(mocks), offset, tolerance, tick)
require.NoError(t, err)

// increase above threshold
for chainID, price := range initials {
mocks[chainID].SetPrice(price + uint64(float64(price)*thresh)*2)
}
b.Stream(ctx)

// tick once
tick.Tick()

// buffered price should be updated
for chainID, mock := range mocks {
require.Equal(t, mock.Price(), b.GasPrice(chainID), "outside threshold")
}

// reset back to initial
// buffered price should be initial live + offset
for chainID, price := range initials {
mocks[chainID].SetPrice(price)
require.Equal(t, withOffset(price), b.GasPrice(chainID), "initial")
}

tick.Tick()

// buffered price should be initial
for chainID, price := range initials {
require.Equal(t, price, b.GasPrice(chainID), "reset")
// 10 steps
buffed := make(map[uint64]uint64)
for i := 0; i < 10; i++ {
for chainID, mock := range mocks {
buffed[chainID] = b.GasPrice(chainID)
mock.SetPrice(randGasPrice())
}

tick.Tick()

// for each step, we check if buffer properly updates (or doesn't)
for chainID, mock := range mocks {
tooLow := mock.Price() > buffed[chainID]
tooHigh := mock.Price() < atTolerance(buffed[chainID])

if tooHigh || tooLow {
require.Equal(t, withOffset(mock.Price()), b.GasPrice(chainID), 0.01, "should change")
} else {
require.Equal(t, buffed[chainID], b.GasPrice(chainID), 0.01, "should not change")
}
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions monitor/xfeemngr/gasprice/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,35 @@ func (m *MockPricer) Price() uint64 {

return m.price
}

type MockBuffer struct {
mu sync.RWMutex
prices map[uint64]uint64
}

var _ Buffer = (*MockBuffer)(nil)

func NewMockBuffer() *MockBuffer {
return &MockBuffer{
mu: sync.RWMutex{},
prices: make(map[uint64]uint64),
}
}

func (b *MockBuffer) SetGasPrice(chainID uint64, price uint64) {
b.mu.Lock()
defer b.mu.Unlock()

b.prices[chainID] = price
}

func (b *MockBuffer) GasPrice(chainID uint64) uint64 {
b.mu.RLock()
defer b.mu.RUnlock()

return b.prices[chainID]
}

func (*MockBuffer) Stream(context.Context) {
// no-op
}
Loading

0 comments on commit 283c20e

Please sign in to comment.