Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relayer): MessageStatusChanged events #13272

Merged
merged 19 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ linters:

linters-settings:
funlen:
lines: 130
statements: 52
lines: 132
statements: 54
gocognit:
min-complexity: 40
min-complexity: 41

issues:
exclude-rules:
Expand Down
9 changes: 9 additions & 0 deletions packages/relayer/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,13 @@ type Bridge interface {
GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error)
ProcessMessage(opts *bind.TransactOpts, message bridge.IBridgeMessage, proof []byte) (*types.Transaction, error)
IsMessageReceived(opts *bind.CallOpts, msgHash [32]byte, srcChainId *big.Int, proof []byte) (bool, error) // nolint
FilterMessageStatusChanged(
opts *bind.FilterOpts,
msgHash [][32]byte,
) (*bridge.BridgeMessageStatusChangedIterator, error)
WatchMessageStatusChanged(
opts *bind.WatchOpts,
sink chan<- *bridge.BridgeMessageStatusChanged,
msgHash [][32]byte,
) (event.Subscription, error)
}
11 changes: 10 additions & 1 deletion packages/relayer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
)

var (
EventNameMessageSent = "MessageSent"
EventNameMessageSent = "MessageSent"
EventNameMessageStatusChanged = "MessageStatusChanged"
)

// EventStatus is used to indicate whether processing has been attempted
Expand Down Expand Up @@ -54,6 +55,8 @@ type Event struct {
CanonicalTokenName string `json:"canonicalTokenName"`
CanonicalTokenDecimals uint8 `json:"canonicalTokenDecimals"`
Amount string `json:"amount"`
MsgHash string `json:"msgHash"`
MessageOwner string `json:"messageOwner"`
}

// SaveEventOpts
Expand All @@ -68,6 +71,8 @@ type SaveEventOpts struct {
CanonicalTokenName string
CanonicalTokenDecimals uint8
Amount string
MsgHash string
MessageOwner string
}

// EventRepository is used to interact with events in the store
Expand All @@ -83,4 +88,8 @@ type EventRepository interface {
ctx context.Context,
address common.Address,
) ([]*Event, error)
FindAllByMsgHash(
ctx context.Context,
msgHash string,
) ([]*Event, error)
}
25 changes: 20 additions & 5 deletions packages/relayer/indexer/filter_then_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,31 @@ func (svc *Service) FilterThenSubscribe(
end = header.Number.Uint64()
}

events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{
filterOpts := &bind.FilterOpts{
Start: svc.processingBlockHeight,
End: &end,
Context: ctx,
}, nil)
}

messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageStatusChanged")
}

// we dont need to do anything with msgStatus events except save them to the DB.
// we dont need to process them. they are for exposing via the API.

err = svc.saveMessageStatusChangedEvents(ctx, chainID, messageStatusChangedEvents)
if err != nil {
return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents")
}

messageSentEvents, err := svc.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

if !events.Next() || events.Event == nil {
if !messageSentEvents.Next() || messageSentEvents.Event == nil {
if err := svc.handleNoEventsInBatch(ctx, chainID, int64(end)); err != nil {
return errors.Wrap(err, "svc.handleNoEventsInBatch")
}
Expand All @@ -83,7 +98,7 @@ func (svc *Service) FilterThenSubscribe(
group.SetLimit(svc.numGoroutines)

for {
event := events.Event
event := messageSentEvents.Event

group.Go(func() error {
err := svc.handleEvent(groupCtx, chainID, event)
Expand All @@ -97,7 +112,7 @@ func (svc *Service) FilterThenSubscribe(
})

// if there are no more events
if !events.Next() {
if !messageSentEvents.Next() {
// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
Expand Down
9 changes: 6 additions & 3 deletions packages/relayer/indexer/filter_then_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func Test_FilterThenSubscribe(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}

func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) {
Expand All @@ -45,7 +46,8 @@ func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}

func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) {
Expand All @@ -65,5 +67,6 @@ func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}
4 changes: 3 additions & 1 deletion packages/relayer/indexer/handle_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (svc *Service) handleEvent(
}

e, err := svc.eventRepo.Save(ctx, relayer.SaveEventOpts{
Name: eventName,
Name: relayer.EventNameMessageSent,
Data: string(marshaled),
ChainID: chainID,
Status: eventStatus,
Expand All @@ -62,6 +62,8 @@ func (svc *Service) handleEvent(
CanonicalTokenName: canonicalToken.Name,
CanonicalTokenDecimals: canonicalToken.Decimals,
Amount: amount.String(),
MsgHash: common.Hash(event.MsgHash).Hex(),
MessageOwner: event.Message.Owner.Hex(),
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
Expand Down
74 changes: 74 additions & 0 deletions packages/relayer/indexer/save_message_status_changed_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package indexer

import (
"context"
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts/bridge"
)

func (svc *Service) saveMessageStatusChangedEvents(
ctx context.Context,
chainID *big.Int,
events *bridge.BridgeMessageStatusChangedIterator,
) error {
if !events.Next() {
log.Infof("no messageStatusChanged events")
return nil
}

for {
event := events.Event
log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex())

if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil {
return errors.Wrap(err, "svc.saveMessageStatusChangedEvent")
}

if !events.Next() {
return nil
}
}
}

func (svc *Service) saveMessageStatusChangedEvent(
ctx context.Context,
chainID *big.Int,
event *bridge.BridgeMessageStatusChanged,
) error {
marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

// get the previous MessageSent event or other message status changed events,
// so we can find out the previous owner of this msg hash,
// to save to the db.
previousEvents, err := svc.eventRepo.FindAllByMsgHash(ctx, common.Hash(event.MsgHash).Hex())
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash")
}

if len(previousEvents) == 0 {
return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash")
}

_, err = svc.eventRepo.Save(ctx, relayer.SaveEventOpts{
Name: relayer.EventNameMessageStatusChanged,
Data: string(marshaled),
ChainID: chainID,
Status: relayer.EventStatus(event.Status),
MessageOwner: previousEvents[0].MessageOwner,
MsgHash: common.Hash(event.MsgHash).Hex(),
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
}

return nil
}
59 changes: 58 additions & 1 deletion packages/relayer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -16,6 +17,25 @@ import (
func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
log.Info("subscribing to new events")

errChan := make(chan error)

go svc.subscribeMessageSent(ctx, chainID, errChan)

go svc.subscribeMessageStatusChanged(ctx, chainID, errChan)

// nolint: gosimple
for {
select {
case <-ctx.Done():
log.Info("context finished")
return nil
case err := <-errChan:
return errors.Wrap(err, "errChan")
}
}
}

func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, errChan chan error) {
sink := make(chan *bridge.BridgeMessageSent)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
Expand All @@ -32,11 +52,16 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {

for {
select {
case <-ctx.Done():
log.Info("context finished")
return
case err := <-sub.Err():
return errors.Wrap(err, "sub.Err()")
errChan <- errors.Wrap(err, "sub.Err()")
case event := <-sink:
go func() {
log.Infof("new message sent event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String())
err := svc.handleEvent(ctx, chainID, event)

if err != nil {
log.Errorf("svc.subscribe, svc.handleEvent: %v", err)
}
Expand All @@ -63,3 +88,35 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
}
}
}

func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID *big.Int, errChan chan error) {
sink := make(chan *bridge.BridgeMessageStatusChanged)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err)
}

return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{
Context: ctx,
}, sink, nil)
})

defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
log.Info("context finished")
return
case err := <-sub.Err():
davidtaikocha marked this conversation as resolved.
Show resolved Hide resolved
errChan <- errors.Wrap(err, "sub.Err()")
case event := <-sink:
log.Infof("new message status changed event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String())

if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil {
log.Errorf("svc.subscribe, svc.saveMessageStatusChangedEvent: %v", err)
}
}
}
}
5 changes: 3 additions & 2 deletions packages/relayer/indexer/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func Test_subscribe(t *testing.T) {

b := bridge.(*mock.Bridge)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, 1, b.MessagesSent)
assert.Equal(t, 1, b.MessageStatusesChanged)
assert.Equal(t, 2, b.ErrorsSent)
}
Loading