Skip to content

Commit

Permalink
feat(relayer): flags for processing and indexing minimum amounts (#17685
Browse files Browse the repository at this point in the history
)
  • Loading branch information
cyberhorsey authored Jun 27, 2024
1 parent db6ccdf commit 04a3370
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 21 deletions.
8 changes: 8 additions & 0 deletions packages/relayer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ var (
Category: indexerCategory,
EnvVars: []string{"TARGET_BLOCK_NUMBER"},
}
MinFeeToIndex = &cli.Uint64Flag{
Name: "minFeeToIndex",
Usage: "Minimum fee to index and add to the queue (will still be saved to database)",
Category: indexerCategory,
Value: 0,
EnvVars: []string{"MIN_FEE_TO_INDEX"},
}
)

var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
Expand All @@ -115,5 +122,6 @@ var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
NumLatestBlocksEndWhenCrawling,
NumLatestBlocksStartWhenCrawling,
EventName,
MinFeeToIndex,
TargetBlockNumber,
})
8 changes: 8 additions & 0 deletions packages/relayer/cmd/flags/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ var (
Required: false,
EnvVars: []string{"DEST_QUOTA_MANAGER_ADDRESS"},
}
MinFeeToProcess = &cli.Uint64Flag{
Name: "minFeeToProcess",
Usage: "Minimum fee to process",
Category: processorCategory,
Value: 0,
EnvVars: []string{"MIN_FEE_TO_PROCESS"},
}
)

var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{
Expand All @@ -166,5 +173,6 @@ var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{
CacheOption,
UnprofitableMessageQueueExpiration,
MaxMessageRetries,
MinFeeToProcess,
DestQuotaManagerAddress,
})
2 changes: 2 additions & 0 deletions packages/relayer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Config struct {
TargetBlockNumber *uint64
BackOffRetryInterval time.Duration
BackOffMaxRetries uint64
MinFeeToIndex uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
}
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
EventName: c.String(flags.EventName.Name),
BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name),
MinFeeToIndex: c.Uint64(flags.MinFeeToIndex.Name),
TargetBlockNumber: func() *uint64 {
if c.IsSet(flags.TargetBlockNumber.Name) {
value := c.Uint64(flags.TargetBlockNumber.Name)
Expand Down
7 changes: 7 additions & 0 deletions packages/relayer/indexer/handle_message_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func (i *Indexer) handleMessageSentEvent(
return nil
}

if i.minFeeToIndex != 0 && event.Message.Fee < i.minFeeToIndex {
slog.Warn("Fee is less than minFeeToIndex, not adding to queue",
"fee", event.Message.Fee,
"minFeeToIndex", i.minFeeToIndex,
)
}

msg := queue.QueueMessageSentBody{
ID: id,
Event: event,
Expand Down
6 changes: 6 additions & 0 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ type Indexer struct {

eventName string

minFeeToIndex uint64

cfg *Config
}

Expand Down Expand Up @@ -241,6 +243,10 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {

i.ctx = ctx

i.minFeeToIndex = i.cfg.MinFeeToIndex

slog.Info("minFeeToIndex", "minFeeToIndex", i.minFeeToIndex)

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions packages/relayer/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Config struct {
TxmgrConfigs *txmgr.CLIConfig

MaxMessageRetries uint64
MinFeeToProcess uint64
}

// NewConfigFromCliContext creates a new config instance from command line flags.
Expand Down Expand Up @@ -176,6 +177,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
c,
),
MaxMessageRetries: c.Uint64(flags.MaxMessageRetries.Name),
MinFeeToProcess: c.Uint64(flags.MinFeeToProcess.Name),
OpenDBFunc: func() (DB, error) {
return db.OpenDBConnection(db.DBConnectionOpts{
Name: c.String(flags.DatabaseUsername.Name),
Expand Down
17 changes: 15 additions & 2 deletions packages/relayer/processor/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,19 @@ func (p *Processor) processMessage(
return false, msgBody.TimesRetried, nil
}

if err := p.waitForConfirmations(ctx, msgBody.Event.Raw.TxHash, msgBody.Event.Raw.BlockNumber); err != nil {
return false, msgBody.TimesRetried, err
// we never want to process messages below a certain fee, if set.
// return a nil error, and we will successfully acknowledge this.
if p.minFeeToProcess != 0 && msgBody.Event.Message.Fee < p.minFeeToProcess {
slog.Warn("minFeeToProcess not met",
"minFeeToProcess", p.minFeeToProcess,
"fee", msgBody.Event.Message.Fee,
"srcTxHash", msgBody.Event.Raw.TxHash.Hex(),
)

return false, msgBody.TimesRetried, nil
}

// check message process eligibility before waiting for confirmations to process
eventStatus, err := p.eventStatusFromMsgHash(ctx, msgBody.Event.MsgHash)
if err != nil {
return false, msgBody.TimesRetried, errors.Wrap(err, "p.eventStatusFromMsgHash")
Expand All @@ -130,6 +139,10 @@ func (p *Processor) processMessage(
return false, msgBody.TimesRetried, nil
}

if err := p.waitForConfirmations(ctx, msgBody.Event.Raw.TxHash, msgBody.Event.Raw.BlockNumber); err != nil {
return false, msgBody.TimesRetried, err
}

// check paused status
paused, err := p.destBridge.Paused(&bind.CallOpts{
Context: ctx,
Expand Down
37 changes: 18 additions & 19 deletions packages/relayer/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Processor struct {

processingTxHashes map[common.Hash]bool
processingTxHashMu sync.Mutex

minFeeToProcess uint64
}

// InitFromCli creates a new processor from a cli context
Expand Down Expand Up @@ -373,6 +375,10 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {

p.processingTxHashes = make(map[common.Hash]bool, 0)

p.minFeeToProcess = p.cfg.MinFeeToProcess

slog.Info("minFeeToProcess", "minFeeToProcess", p.minFeeToProcess)

return nil
}

Expand Down Expand Up @@ -488,24 +494,9 @@ func (p *Processor) eventLoop(ctx context.Context) {
strings.Contains(err.Error(), "i/o") ||
strings.Contains(err.Error(), "connect") ||
strings.Contains(err.Error(), "failed to get tx into the mempool"):
// we want to do nothing, just log, and the message will be re-picked up
// by another consumer. no need to nack or ack.
slog.Error("process message failed", "err", err.Error())

// we want to negatively acknowledge the message and make sure
// we requeue it
if err := p.queue.Nack(ctx, m, false); err != nil {
slog.Error("Err nacking message", "err", err.Error())
break
}

marshalledMsg, err := json.Marshal(msg)
if err != nil {
slog.Error("err marshaling queue message", "err", err.Error())
break
}

if err := p.queue.Publish(ctx, p.queueName(), marshalledMsg, nil, nil); err != nil {
slog.Error("err publishing to queue", "err", err.Error())
}
default:
slog.Error("process message failed", "err", err.Error())

Expand All @@ -520,11 +511,19 @@ func (p *Processor) eventLoop(ctx context.Context) {
}

if shouldRequeue {
// we want to negatively acknowledge the message and make sure
// we requeue it
// we want to negatively acknowledge the message
if err := p.queue.Nack(ctx, m, true); err != nil {
slog.Error("Err nacking message", "err", err.Error())
}

marshalledMsg, err := json.Marshal(msg)
if err != nil {
slog.Error("err marshaling queue message", "err", err.Error())
} else {
if err := p.queue.Publish(ctx, p.queueName(), marshalledMsg, nil, nil); err != nil {
slog.Error("err publishing to queue", "err", err.Error())
}
}
} else {
// otherwise if no error, we can acknowledge it successfully.
if err := p.queue.Ack(ctx, m); err != nil {
Expand Down

0 comments on commit 04a3370

Please sign in to comment.