Skip to content

Commit

Permalink
chore(relayer): upgrade event db operation (#17699)
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp authored Jul 2, 2024
1 parent 9e928ad commit bf593ee
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 32 deletions.
1 change: 1 addition & 0 deletions packages/relayer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type EventRepository interface {
) (uint64, error)
DeleteAllAfterBlockID(blockID uint64, srcChainID uint64, destChainID uint64) error
FindLatestBlockID(
ctx context.Context,
event string,
srcChainID uint64,
destChainID uint64,
Expand Down
2 changes: 1 addition & 1 deletion packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (i *Indexer) indexMessageSentEvents(ctx context.Context,
}

func (i *Indexer) checkReorg(ctx context.Context, emittedInBlockNumber uint64) error {
n, err := i.eventRepo.FindLatestBlockID(i.eventName, i.srcChainId.Uint64(), i.destChainId.Uint64())
n, err := i.eventRepo.FindLatestBlockID(ctx, i.eventName, i.srcChainId.Uint64(), i.destChainId.Uint64())
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (i *Indexer) setInitialIndexingBlockByMode(
case Sync:
// get most recently processed block height from the DB
latest, err := i.eventRepo.FindLatestBlockID(
i.ctx,
i.eventName,
chainID.Uint64(),
i.destChainId.Uint64(),
Expand Down
2 changes: 2 additions & 0 deletions packages/relayer/pkg/http/get_block_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (srv *Server) GetBlockInfo(c echo.Context) error {
}

latestProcessedSrcBlock, err := srv.eventRepo.FindLatestBlockID(
c.Request().Context(),
relayer.EventNameMessageSent,
srcChainID.Uint64(),
destChainID.Uint64(),
Expand All @@ -89,6 +90,7 @@ func (srv *Server) GetBlockInfo(c echo.Context) error {
}

latestProcessedDestBlock, err := srv.eventRepo.FindLatestBlockID(
c.Request().Context(),
relayer.EventNameMessageSent,
destChainID.Uint64(),
srcChainID.Uint64(),
Expand Down
1 change: 1 addition & 0 deletions packages/relayer/pkg/mock/event_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (r *EventRepository) DeleteAllAfterBlockID(blockID uint64, srcChainID uint6

// GetLatestBlockID get latest block id
func (r *EventRepository) FindLatestBlockID(
ctx context.Context,
event string,
srcChainID uint64,
destChainID uint64,
Expand Down
80 changes: 49 additions & 31 deletions packages/relayer/pkg/repo/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package repo

import (
"context"
"strings"
"time"

"net/http"
"strings"

"github.com/morkid/paginate"
"github.com/pkg/errors"
Expand Down Expand Up @@ -64,7 +62,7 @@ func (r *EventRepository) Save(ctx context.Context, opts *relayer.SaveEventOpts)
EmittedBlockID: opts.EmittedBlockID,
}

if err := r.db.GormDB().Create(e).Error; err != nil {
if err := r.db.GormDB().WithContext(ctx).Create(e).Error; err != nil {
return nil, errors.Wrap(err, "r.db.Create")
}

Expand All @@ -76,36 +74,54 @@ func (r *EventRepository) UpdateFeesAndProfitability(
id int,
opts *relayer.UpdateFeesAndProfitabilityOpts,
) error {
e := &relayer.Event{}
if err := r.db.GormDB().Where("id = ?", id).First(e).Error; err != nil {
return errors.Wrap(err, "r.db.First")
tx := r.db.GormDB().WithContext(ctx)
tx = tx.Model(&relayer.Event{})
tx = tx.Where("id = ?", id)

// check if existed.
var count int64
if err := tx.Count(&count).Error; err != nil {
return errors.Wrap(err, "r.db.Count")
}

if count == 0 {
return gorm.ErrRecordNotFound
}

e.Fee = &opts.Fee
e.DestChainBaseFee = &opts.DestChainBaseFee
e.GasTipCap = &opts.GasTipCap
e.GasLimit = &opts.GasLimit
e.IsProfitable = &opts.IsProfitable
e.EstimatedOnchainFee = &opts.EstimatedOnchainFee
currentTime := time.Now().UTC()
e.IsProfitableEvaluatedAt = &currentTime

if err := r.db.GormDB().Save(e).Error; err != nil {
return errors.Wrap(err, "r.db.Save")
err := tx.Updates(map[string]interface{}{
"fee": opts.Fee,
"dest_chain_base_fee": opts.DestChainBaseFee,
"gas_tip_cap": opts.GasTipCap,
"gas_limit": opts.GasLimit,
"is_profitable": opts.IsProfitable,
"estimated_onchain_fee": opts.EstimatedOnchainFee,
"is_profitable_evaluated_at": opts.IsProfitableEvaluatedAt,
}).Error

if err != nil {
return errors.Wrap(err, "r.db.Commit")
}

return nil
}

func (r *EventRepository) UpdateStatus(ctx context.Context, id int, status relayer.EventStatus) error {
e := &relayer.Event{}
if err := r.db.GormDB().Where("id = ?", id).First(e).Error; err != nil {
return errors.Wrap(err, "r.db.First")
tx := r.db.GormDB().WithContext(ctx)
tx = tx.Model(&relayer.Event{})
tx = tx.Where("id = ?", id)

// check if existed.
var count int64
if err := tx.Count(&count).Error; err != nil {
return errors.Wrap(err, "r.db.Count")
}

e.Status = status
if err := r.db.GormDB().Save(e).Error; err != nil {
return errors.Wrap(err, "r.db.Save")
if count == 0 {
return gorm.ErrRecordNotFound
}

if err := tx.Update("status", status).Error; err != nil {
return errors.Wrap(err, "tx.Commit")
}

return nil
Expand All @@ -117,7 +133,7 @@ func (r *EventRepository) FirstByMsgHash(
) (*relayer.Event, error) {
e := &relayer.Event{}
// find all message sent events
if err := r.db.GormDB().Where("msg_hash = ?", msgHash).
if err := r.db.GormDB().WithContext(ctx).Where("msg_hash = ?", msgHash).
First(&e).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
Expand All @@ -136,7 +152,7 @@ func (r *EventRepository) FirstByEventAndMsgHash(
) (*relayer.Event, error) {
e := &relayer.Event{}
// find all message sent events
if err := r.db.GormDB().Where("msg_hash = ?", msgHash).
if err := r.db.GormDB().WithContext(ctx).Where("msg_hash = ?", msgHash).
Where("event = ?", event).
First(&e).Error; err != nil {
if err == gorm.ErrRecordNotFound {
Expand All @@ -158,7 +174,7 @@ func (r *EventRepository) FindAllByAddress(
DefaultSize: 100,
})

q := r.db.GormDB().
q := r.db.GormDB().WithContext(ctx).
Model(&relayer.Event{}).
Where(
"dest_owner_json = ? OR message_owner = ?",
Expand Down Expand Up @@ -196,7 +212,7 @@ func (r *EventRepository) Delete(
ctx context.Context,
id int,
) error {
return r.db.GormDB().Delete(relayer.Event{}, id).Error
return r.db.GormDB().WithContext(ctx).Delete(relayer.Event{}, id).Error
}

func (r *EventRepository) ChainDataSyncedEventByBlockNumberOrGreater(
Expand All @@ -207,7 +223,7 @@ func (r *EventRepository) ChainDataSyncedEventByBlockNumberOrGreater(
) (*relayer.Event, error) {
e := &relayer.Event{}
// find all message sent events
if err := r.db.GormDB().Where("name = ?", relayer.EventNameChainDataSynced).
if err := r.db.GormDB().WithContext(ctx).Where("name = ?", relayer.EventNameChainDataSynced).
Where("chain_id = ?", srcChainId).
Where("synced_chain_id = ?", syncedChainId).
Where("block_id >= ?", blockNumber).
Expand All @@ -231,7 +247,7 @@ func (r *EventRepository) LatestChainDataSyncedEvent(
) (uint64, error) {
blockID := 0
// find all message sent events
if err := r.db.GormDB().Table("events").
if err := r.db.GormDB().WithContext(ctx).Table("events").
Where("chain_id = ?", srcChainId).
Where("synced_chain_id = ?", syncedChainId).
Select("COALESCE(MAX(block_id), 0)").
Expand All @@ -257,6 +273,7 @@ WHERE block_id >= ? AND chain_id = ? AND dest_chain_id = ?`

// GetLatestBlockID get latest block id
func (r *EventRepository) FindLatestBlockID(
ctx context.Context,
event string,
srcChainID uint64,
destChainID uint64,
Expand All @@ -266,7 +283,8 @@ func (r *EventRepository) FindLatestBlockID(

var b uint64

if err := r.db.GormDB().Table("events").Raw(q, srcChainID, destChainID, event).Scan(&b).Error; err != nil {
if err := r.db.GormDB().WithContext(ctx).Table("events").
Raw(q, srcChainID, destChainID, event).Scan(&b).Error; err != nil {
return 0, err
}

Expand Down

0 comments on commit bf593ee

Please sign in to comment.