From 8932f85512e0fcd2634969042472845b2b308da2 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 7 Mar 2023 12:08:52 -0800 Subject: [PATCH 01/10] wip filter/sub for messagestatuschangedevents --- packages/relayer/.golangci.yml | 4 +- packages/relayer/bridge.go | 9 +++ packages/relayer/event.go | 3 +- .../relayer/indexer/filter_then_subscribe.go | 25 ++++++-- .../save_message_status_changed_events.go | 60 +++++++++++++++++++ packages/relayer/indexer/subscribe.go | 50 +++++++++++++++- packages/relayer/indexer/subscribe_test.go | 5 +- packages/relayer/mock/bridge.go | 40 ++++++++++++- 8 files changed, 184 insertions(+), 12 deletions(-) create mode 100644 packages/relayer/indexer/save_message_status_changed_events.go diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index 25aa24eb396..e62f74c8761 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -28,10 +28,10 @@ linters: linters-settings: funlen: - lines: 130 + lines: 132 statements: 52 gocognit: - min-complexity: 40 + min-complexity: 41 issues: exclude-rules: diff --git a/packages/relayer/bridge.go b/packages/relayer/bridge.go index 0ab984735c3..506a6eb282c 100644 --- a/packages/relayer/bridge.go +++ b/packages/relayer/bridge.go @@ -19,4 +19,13 @@ type Bridge interface { GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error) ProcessMessage(opts *bind.TransactOpts, message bridge.IBridgeMessage, proof []byte) (*types.Transaction, error) IsMessageReceived(opts *bind.CallOpts, msgHash [32]byte, srcChainId *big.Int, proof []byte) (bool, error) // nolint + FilterMessageStatusChanged( + opts *bind.FilterOpts, + msgHash [][32]byte, + ) (*bridge.BridgeMessageStatusChangedIterator, error) + WatchMessageStatusChanged( + opts *bind.WatchOpts, + sink chan<- *bridge.BridgeMessageStatusChanged, + msgHash [][32]byte, + ) (event.Subscription, error) } diff --git a/packages/relayer/event.go b/packages/relayer/event.go index 8b2737aef66..7a5ec8c08c1 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -9,7 +9,8 @@ import ( ) var ( - EventNameMessageSent = "MessageSent" + EventNameMessageSent = "MessageSent" + EventNameMessageStatusChanged = "MessageStatusChanged" ) // EventStatus is used to indicate whether processing has been attempted diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 0d949fb087b..f7355ad0c8a 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -61,7 +61,24 @@ func (svc *Service) FilterThenSubscribe( end = header.Number.Uint64() } - events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{ + messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(&bind.FilterOpts{ + Start: svc.processingBlockHeight, + End: &end, + Context: ctx, + }, nil) + if err != nil { + return errors.Wrap(err, "bridge.FilterMessageStatusChanged") + } + + err = svc.saveMessageStatusChangedEvents(ctx, chainID, messageStatusChangedEvents) + if err != nil { + return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents") + } + + // we dont need to do anything with msgStatus events except save them to the DB. + // we dont need to process them. they are for exposing via the API. + + messageSentEvents, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{ Start: svc.processingBlockHeight, End: &end, Context: ctx, @@ -70,7 +87,7 @@ func (svc *Service) FilterThenSubscribe( return errors.Wrap(err, "bridge.FilterMessageSent") } - if !events.Next() || events.Event == nil { + if !messageSentEvents.Next() || messageSentEvents.Event == nil { if err := svc.handleNoEventsInBatch(ctx, chainID, int64(end)); err != nil { return errors.Wrap(err, "svc.handleNoEventsInBatch") } @@ -83,7 +100,7 @@ func (svc *Service) FilterThenSubscribe( group.SetLimit(svc.numGoroutines) for { - event := events.Event + event := messageSentEvents.Event group.Go(func() error { err := svc.handleEvent(groupCtx, chainID, event) @@ -97,7 +114,7 @@ func (svc *Service) FilterThenSubscribe( }) // if there are no more events - if !events.Next() { + if !messageSentEvents.Next() { // wait for the last of the goroutines to finish if err := group.Wait(); err != nil { return errors.Wrap(err, "group.Wait") diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go new file mode 100644 index 00000000000..dcfe11c2b12 --- /dev/null +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -0,0 +1,60 @@ +package indexer + +import ( + "context" + "encoding/json" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikoxyz/taiko-mono/packages/relayer" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts/bridge" +) + +func (svc *Service) saveMessageStatusChangedEvents( + ctx context.Context, + chainID *big.Int, + events *bridge.BridgeMessageStatusChangedIterator, +) error { + if !events.Next() || events.Event == nil { + log.Infof("no messageStatusChanged events") + return nil + } + + for { + event := events.Event + log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex()) + + if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { + return errors.Wrap(err, "svc.saveMessageStatusChangedEvent") + } + + if !events.Next() { + return nil + } + } +} + +func (svc *Service) saveMessageStatusChangedEvent( + ctx context.Context, + chainID *big.Int, + event *bridge.BridgeMessageStatusChanged, +) error { + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + _, err = svc.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: string(marshaled), + ChainID: chainID, + Status: relayer.EventStatus(event.Status), + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") + } + + return nil +} diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 50cab3ad180..e4db224e367 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -16,6 +17,22 @@ import ( func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { log.Info("subscribing to new events") + errChan := make(chan error) + + go svc.subscribeMessageSent(ctx, chainID, errChan) + + go svc.subscribeMessageStatusChanged(ctx, chainID, errChan) + + // nolint: gosimple + for { + select { + case err := <-errChan: + return errors.Wrap(err, "errChan") + } + } +} + +func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, errChan chan error) { sink := make(chan *bridge.BridgeMessageSent) sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { @@ -33,10 +50,12 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { for { select { case err := <-sub.Err(): - return errors.Wrap(err, "sub.Err()") + errChan <- errors.Wrap(err, "sub.Err()") case event := <-sink: go func() { + log.Infof("new message sent event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String()) err := svc.handleEvent(ctx, chainID, event) + if err != nil { log.Errorf("svc.subscribe, svc.handleEvent: %v", err) } @@ -63,3 +82,32 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { } } } + +func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID *big.Int, errChan chan error) { + sink := make(chan *bridge.BridgeMessageStatusChanged) + + sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err) + } + + return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{ + Context: ctx, + }, sink, nil) + }) + + defer sub.Unsubscribe() + + for { + select { + case err := <-sub.Err(): + errChan <- errors.Wrap(err, "sub.Err()") + case event := <-sink: + log.Infof("new message status changed event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String()) + + if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { + log.Errorf("svc.subscribe, svc.saveMessageStatusChangedEvent: %v", err) + } + } + } +} diff --git a/packages/relayer/indexer/subscribe_test.go b/packages/relayer/indexer/subscribe_test.go index f899386d352..a606fe53cda 100644 --- a/packages/relayer/indexer/subscribe_test.go +++ b/packages/relayer/indexer/subscribe_test.go @@ -20,6 +20,7 @@ func Test_subscribe(t *testing.T) { b := bridge.(*mock.Bridge) - assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, 1, b.MessagesSent) + assert.Equal(t, 1, b.MessageStatusesChanged) + assert.Equal(t, 2, b.ErrorsSent) } diff --git a/packages/relayer/mock/bridge.go b/packages/relayer/mock/bridge.go index 10b54fc13c0..80404a437e3 100644 --- a/packages/relayer/mock/bridge.go +++ b/packages/relayer/mock/bridge.go @@ -30,8 +30,9 @@ var ProcessMessageTx = types.NewTransaction( ) type Bridge struct { - MessagesSent int - ErrorsSent int + MessagesSent int + MessageStatusesChanged int + ErrorsSent int } type Subscription struct { @@ -80,6 +81,41 @@ func (b *Bridge) FilterMessageSent( return &bridge.BridgeMessageSentIterator{}, nil } +func (b *Bridge) WatchMessageStatusChanged( + opts *bind.WatchOpts, + sink chan<- *bridge.BridgeMessageStatusChanged, + msgHash [][32]byte, +) (event.Subscription, error) { + s := &Subscription{ + errChan: make(chan error), + } + + go func(sink chan<- *bridge.BridgeMessageStatusChanged) { + <-time.After(2 * time.Second) + + sink <- &bridge.BridgeMessageStatusChanged{} + b.MessageStatusesChanged++ + }(sink) + + go func(errChan chan error) { + <-time.After(5 * time.Second) + + errChan <- errors.New("fail") + + s.done = true + b.ErrorsSent++ + }(s.errChan) + + return s, nil +} + +func (b *Bridge) FilterMessageStatusChanged( + opts *bind.FilterOpts, + signal [][32]byte, +) (*bridge.BridgeMessageStatusChangedIterator, error) { + return &bridge.BridgeMessageStatusChangedIterator{}, nil +} + func (b *Bridge) GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error) { if msgHash == SuccessMsgHash { return uint8(relayer.EventStatusNew), nil From a9f0431ca295dafbf5df029566333cbbb359884a Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 7 Mar 2023 12:16:34 -0800 Subject: [PATCH 02/10] filtera nd sub --- packages/relayer/repo/event.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index c0dd7d7bce2..651257c678d 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -65,12 +65,15 @@ func (r *EventRepository) FindAllByAddressAndChainID( address common.Address, ) ([]*relayer.Event, error) { e := make([]*relayer.Event, 0) + // find all message sent events if err := r.db.GormDB().Where("chain_id = ?", chainID.Int64()). Find(&e, datatypes.JSONQuery("data"). Equals(strings.ToLower(address.Hex()), "Message", "Owner")).Error; err != nil { return nil, errors.Wrap(err, "r.db.Find") } + // find all message status changed events + return e, nil } From 4ec6592b5b879519e1ae64a0b932ef4caf2ab282 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 7 Mar 2023 17:31:58 -0800 Subject: [PATCH 03/10] add relayer event when procesiing own message --- packages/relayer/.golangci.yml | 2 +- packages/relayer/event.go | 8 ++ .../indexer/filter_then_subscribe_test.go | 9 +- packages/relayer/indexer/handle_event.go | 2 + .../save_message_status_changed_events.go | 21 ++++- packages/relayer/message/process_message.go | 54 +++++++++++- .../relayer/message/process_message_test.go | 1 + .../1666650599_create_events_table.sql | 4 +- packages/relayer/mock/bridge.go | 6 +- packages/relayer/mock/event_repository.go | 27 ++++-- packages/relayer/repo/event.go | 22 ++++- packages/relayer/repo/event_test.go | 84 +++++++++++++++++++ 12 files changed, 222 insertions(+), 18 deletions(-) diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index e62f74c8761..71d65badac2 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -29,7 +29,7 @@ linters: linters-settings: funlen: lines: 132 - statements: 52 + statements: 54 gocognit: min-complexity: 41 diff --git a/packages/relayer/event.go b/packages/relayer/event.go index 7a5ec8c08c1..a598c0215d6 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -55,6 +55,8 @@ type Event struct { CanonicalTokenName string `json:"canonicalTokenName"` CanonicalTokenDecimals uint8 `json:"canonicalTokenDecimals"` Amount string `json:"amount"` + MsgHash string `json:"msgHash"` + MessageOwner string `json:"messageOwner"` } // SaveEventOpts @@ -69,6 +71,8 @@ type SaveEventOpts struct { CanonicalTokenName string CanonicalTokenDecimals uint8 Amount string + MsgHash string + MessageOwner string } // EventRepository is used to interact with events in the store @@ -84,4 +88,8 @@ type EventRepository interface { ctx context.Context, address common.Address, ) ([]*Event, error) + FindAllByMsgHash( + ctx context.Context, + msgHash string, + ) ([]*Event, error) } diff --git a/packages/relayer/indexer/filter_then_subscribe_test.go b/packages/relayer/indexer/filter_then_subscribe_test.go index 2bede242ca1..3d6841d2723 100644 --- a/packages/relayer/indexer/filter_then_subscribe_test.go +++ b/packages/relayer/indexer/filter_then_subscribe_test.go @@ -27,7 +27,8 @@ func Test_FilterThenSubscribe(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) { @@ -45,7 +46,8 @@ func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) { @@ -65,5 +67,6 @@ func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index a3714ae87fd..2ec59bc9497 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -62,6 +62,8 @@ func (svc *Service) handleEvent( CanonicalTokenName: canonicalToken.Name, CanonicalTokenDecimals: canonicalToken.Decimals, Amount: amount.String(), + MsgHash: common.Hash(event.MsgHash).Hex(), + MessageOwner: event.Message.Owner.Hex(), }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go index dcfe11c2b12..6a2f81b883d 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -46,11 +46,24 @@ func (svc *Service) saveMessageStatusChangedEvent( return errors.Wrap(err, "json.Marshal(event)") } + // get the previous MessageSent event or other message status changed events, + // so we can find out the previous owner of this msg hash, + // to save to the db + previousEvents, err := svc.eventRepo.FindAllByMsgHash(ctx, common.Hash(event.MsgHash).Hex()) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") + } + + if len(previousEvents) == 0 { + return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") + } + _, err = svc.eventRepo.Save(ctx, relayer.SaveEventOpts{ - Name: relayer.EventNameMessageStatusChanged, - Data: string(marshaled), - ChainID: chainID, - Status: relayer.EventStatus(event.Status), + Name: relayer.EventNameMessageStatusChanged, + Data: string(marshaled), + ChainID: chainID, + Status: relayer.EventStatus(event.Status), + MessageOwner: previousEvents[0].MessageOwner, }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index c32f2b7be29..3daaefcfc0f 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -3,7 +3,10 @@ package message import ( "context" "encoding/hex" + "encoding/json" + "strings" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -88,11 +91,15 @@ func (p *Processor) ProcessMessage( log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes())) - _, err = relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash()) + receipt, err := relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash()) if err != nil { return errors.Wrap(err, "relayer.WaitReceipt") } + if err := p.saveMessageStatusChangedEvent(ctx, receipt, e, event); err != nil { + return errors.Wrap(err, "p.saveMEssageStatusChangedEvent") + } + log.Infof("Mined tx %s", hex.EncodeToString(tx.Hash().Bytes())) messageStatus, err := p.destBridge.GetMessageStatus(&bind.CallOpts{}, event.MsgHash) @@ -163,3 +170,48 @@ func (p *Processor) sendProcessMessageCall( func (p *Processor) setLatestNonce(nonce uint64) { p.destNonce = nonce } + +func (p *Processor) saveMessageStatusChangedEvent( + ctx context.Context, + receipt *types.Receipt, + e *relayer.Event, + event *bridge.BridgeMessageSent, +) error { + bridgeAbi, err := abi.JSON(strings.NewReader(bridge.BridgeABI)) + if err != nil { + return errors.Wrap(err, "abi.JSON") + } + + messageStatusChangedEvent := &bridge.BridgeMessageStatusChanged{} + + for _, log := range receipt.Logs { + // topic := log.Topics[0] + // if topic == crypto.Keccak256Hash("MessageStatusChanged(bytes32,enum,address)") { + err = bridgeAbi.UnpackIntoInterface(messageStatusChangedEvent, "MessageStatusChanged", log.Data) + if err != nil { + continue + } else { + break + } + } + + if e != nil { + marshaled, err := json.Marshal(messageStatusChangedEvent) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: string(marshaled), + ChainID: event.Message.SrcChainId, + Status: relayer.EventStatus(messageStatusChangedEvent.Status), + MessageOwner: e.MessageOwner, + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") + } + } + + return nil +} diff --git a/packages/relayer/message/process_message_test.go b/packages/relayer/message/process_message_test.go index 116fc17e2f5..71ed8629ba3 100644 --- a/packages/relayer/message/process_message_test.go +++ b/packages/relayer/message/process_message_test.go @@ -67,6 +67,7 @@ func Test_ProcessMessage(t *testing.T) { GasLimit: big.NewInt(1), DestChainId: mock.MockChainID, ProcessingFee: big.NewInt(1000000000), + SrcChainId: mock.MockChainID, }, MsgHash: mock.SuccessMsgHash, }, &relayer.Event{}) diff --git a/packages/relayer/migrations/1666650599_create_events_table.sql b/packages/relayer/migrations/1666650599_create_events_table.sql index 827d4595739..2cc73556c33 100644 --- a/packages/relayer/migrations/1666650599_create_events_table.sql +++ b/packages/relayer/migrations/1666650599_create_events_table.sql @@ -11,7 +11,9 @@ CREATE TABLE IF NOT EXISTS events ( canonical_token_symbol VARCHAR(10) DEFAULT "", canonical_token_name VARCHAR(255) DEFAULT "", canonical_token_decimals int DEFAULT 0, - amount VARCHAR(255) NOT NULL, + amount VARCHAR(255) NOT NULL DEFAULT 0, + msg_hash VARCHAR(255) NOT NULL, + message_owner VARCHAR(255) NOT NULL DEFAULT "", created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP , updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); diff --git a/packages/relayer/mock/bridge.go b/packages/relayer/mock/bridge.go index 80404a437e3..c8e87c0abfc 100644 --- a/packages/relayer/mock/bridge.go +++ b/packages/relayer/mock/bridge.go @@ -58,7 +58,11 @@ func (b *Bridge) WatchMessageSent( go func(sink chan<- *bridge.BridgeMessageSent) { <-time.After(2 * time.Second) - sink <- &bridge.BridgeMessageSent{} + sink <- &bridge.BridgeMessageSent{ + Message: bridge.IBridgeMessage{ + SrcChainId: big.NewInt(1), + }, + } b.MessagesSent++ }(sink) diff --git a/packages/relayer/mock/event_repository.go b/packages/relayer/mock/event_repository.go index b0dd7aa4162..d9f8f921ec1 100644 --- a/packages/relayer/mock/event_repository.go +++ b/packages/relayer/mock/event_repository.go @@ -22,11 +22,13 @@ func NewEventRepository() *EventRepository { } func (r *EventRepository) Save(ctx context.Context, opts relayer.SaveEventOpts) (*relayer.Event, error) { r.events = append(r.events, &relayer.Event{ - ID: rand.Int(), // nolint: gosec - Data: datatypes.JSON(opts.Data), - Status: opts.Status, - ChainID: opts.ChainID.Int64(), - Name: opts.Name, + ID: rand.Int(), // nolint: gosec + Data: datatypes.JSON(opts.Data), + Status: opts.Status, + ChainID: opts.ChainID.Int64(), + Name: opts.Name, + MessageOwner: opts.MessageOwner, + MsgHash: opts.MsgHash, }) return nil, nil @@ -121,3 +123,18 @@ func (r *EventRepository) FindAllByAddress( return events, nil } + +func (r *EventRepository) FindAllByMsgHash( + ctx context.Context, + msgHash string, +) ([]*relayer.Event, error) { + events := make([]*relayer.Event, 0) + + for _, e := range r.events { + if e.MsgHash == msgHash { + events = append(events, e) + } + } + + return events, nil +} diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index 651257c678d..5220692d318 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -37,6 +37,8 @@ func (r *EventRepository) Save(ctx context.Context, opts relayer.SaveEventOpts) CanonicalTokenName: opts.CanonicalTokenName, CanonicalTokenDecimals: opts.CanonicalTokenDecimals, Amount: opts.Amount, + MsgHash: opts.MsgHash, + MessageOwner: opts.MessageOwner, } if err := r.db.GormDB().Create(e).Error; err != nil { return nil, errors.Wrap(err, "r.db.Create") @@ -59,6 +61,22 @@ func (r *EventRepository) UpdateStatus(ctx context.Context, id int, status relay return nil } +func (r *EventRepository) FindAllByMsgHash( + ctx context.Context, + msgHash string, +) ([]*relayer.Event, error) { + e := make([]*relayer.Event, 0) + // find all message sent events + if err := r.db.GormDB().Where("msg_hash = ?", msgHash). + Find(&e).Error; err != nil { + return nil, errors.Wrap(err, "r.db.Find") + } + + // find all message status changed events + + return e, nil +} + func (r *EventRepository) FindAllByAddressAndChainID( ctx context.Context, chainID *big.Int, @@ -67,8 +85,8 @@ func (r *EventRepository) FindAllByAddressAndChainID( e := make([]*relayer.Event, 0) // find all message sent events if err := r.db.GormDB().Where("chain_id = ?", chainID.Int64()). - Find(&e, datatypes.JSONQuery("data"). - Equals(strings.ToLower(address.Hex()), "Message", "Owner")).Error; err != nil { + Where("message_owner = ?", strings.ToLower(address.Hex())). + Find(&e).Error; err != nil { return nil, errors.Wrap(err, "r.db.Find") } diff --git a/packages/relayer/repo/event_test.go b/packages/relayer/repo/event_test.go index 172efa12904..2a8e6f67d9c 100644 --- a/packages/relayer/repo/event_test.go +++ b/packages/relayer/repo/event_test.go @@ -65,6 +65,8 @@ func TestIntegration_Event_Save(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: "0x1", }, nil, }, @@ -121,6 +123,8 @@ func TestIntegration_Event_UpdateStatus(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: "0x1", }, ) assert.Equal(t, nil, err) @@ -152,6 +156,8 @@ func TestIntegration_Event_FindAllByAddressAndChainID(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }) assert.Equal(t, nil, err) tests := []struct { @@ -179,6 +185,8 @@ func TestIntegration_Event_FindAllByAddressAndChainID(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }, }, nil, @@ -230,6 +238,8 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }) assert.Equal(t, nil, err) tests := []struct { @@ -255,6 +265,8 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }, }, nil, @@ -275,3 +287,75 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { }) } } + +func TestIntegration_Event_FindAllByMsgHash(t *testing.T) { + db, close, err := testMysql(t) + assert.Equal(t, nil, err) + + defer close() + + eventRepo, err := NewEventRepository(db) + assert.Equal(t, nil, err) + + addr := common.HexToAddress("0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + + _, err = eventRepo.Save(context.Background(), relayer.SaveEventOpts{ + Name: "name", + Data: fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())), + ChainID: big.NewInt(1), + Status: relayer.EventStatusDone, + EventType: relayer.EventTypeSendETH, + CanonicalTokenAddress: "0x1", + CanonicalTokenSymbol: "ETH", + CanonicalTokenName: "Ethereum", + CanonicalTokenDecimals: 18, + Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), + }) + assert.Equal(t, nil, err) + tests := []struct { + name string + msgHash string + wantResp []*relayer.Event + wantErr error + }{ + { + "success", + "0x1", + []*relayer.Event{ + { + ID: 1, + Name: "name", + // nolint lll + Data: datatypes.JSON([]byte(fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())))), + ChainID: 1, + Status: relayer.EventStatusDone, + EventType: relayer.EventTypeSendETH, + CanonicalTokenAddress: "0x1", + CanonicalTokenSymbol: "ETH", + CanonicalTokenName: "Ethereum", + CanonicalTokenDecimals: 18, + Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), + }, + }, + nil, + }, + { + "noneByMgHash", + "0xfake", + []*relayer.Event{}, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp, err := eventRepo.FindAllByMsgHash(context.Background(), tt.msgHash) + assert.Equal(t, tt.wantResp, resp) + assert.Equal(t, tt.wantErr, err) + }) + } +} From a3fe97468e0a95d03a3eb75ccb797bd8d1801f0a Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 7 Mar 2023 17:36:35 -0800 Subject: [PATCH 04/10] use dest chain for event --- .../relayer/indexer/save_message_status_changed_events.go | 4 +++- packages/relayer/message/process_message.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go index 6a2f81b883d..a0b4f86506c 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -3,6 +3,7 @@ package indexer import ( "context" "encoding/json" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -23,6 +24,7 @@ func (svc *Service) saveMessageStatusChangedEvents( } for { + fmt.Println("event!") event := events.Event log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex()) @@ -48,7 +50,7 @@ func (svc *Service) saveMessageStatusChangedEvent( // get the previous MessageSent event or other message status changed events, // so we can find out the previous owner of this msg hash, - // to save to the db + // to save to the db. previousEvents, err := svc.eventRepo.FindAllByMsgHash(ctx, common.Hash(event.MsgHash).Hex()) if err != nil { return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 3daaefcfc0f..d7e29f6de39 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -204,7 +204,7 @@ func (p *Processor) saveMessageStatusChangedEvent( _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ Name: relayer.EventNameMessageStatusChanged, Data: string(marshaled), - ChainID: event.Message.SrcChainId, + ChainID: event.Message.DestChainId, Status: relayer.EventStatus(messageStatusChangedEvent.Status), MessageOwner: e.MessageOwner, }) From 70346d801c94ef9594fef8233240f59cd6e64143 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 7 Mar 2023 18:16:01 -0800 Subject: [PATCH 05/10] wip --- .../relayer/indexer/filter_then_subscribe.go | 18 ++++++++---------- .../save_message_status_changed_events.go | 3 +-- packages/relayer/message/process_message.go | 15 +++++++-------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index f7355ad0c8a..875496b379e 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -61,28 +61,26 @@ func (svc *Service) FilterThenSubscribe( end = header.Number.Uint64() } - messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(&bind.FilterOpts{ + filterOpts := &bind.FilterOpts{ Start: svc.processingBlockHeight, End: &end, Context: ctx, - }, nil) + } + + messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(filterOpts, nil) if err != nil { return errors.Wrap(err, "bridge.FilterMessageStatusChanged") } + // we dont need to do anything with msgStatus events except save them to the DB. + // we dont need to process them. they are for exposing via the API. + err = svc.saveMessageStatusChangedEvents(ctx, chainID, messageStatusChangedEvents) if err != nil { return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents") } - // we dont need to do anything with msgStatus events except save them to the DB. - // we dont need to process them. they are for exposing via the API. - - messageSentEvents, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{ - Start: svc.processingBlockHeight, - End: &end, - Context: ctx, - }, nil) + messageSentEvents, err := svc.bridge.FilterMessageSent(filterOpts, nil) if err != nil { return errors.Wrap(err, "bridge.FilterMessageSent") } diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go index a0b4f86506c..8fb24967035 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -3,7 +3,6 @@ package indexer import ( "context" "encoding/json" - "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -24,7 +23,6 @@ func (svc *Service) saveMessageStatusChangedEvents( } for { - fmt.Println("event!") event := events.Event log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex()) @@ -66,6 +64,7 @@ func (svc *Service) saveMessageStatusChangedEvent( ChainID: chainID, Status: relayer.EventStatus(event.Status), MessageOwner: previousEvents[0].MessageOwner, + MsgHash: common.Hash(event.MsgHash).Hex(), }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index d7e29f6de39..243a105544d 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -3,7 +3,7 @@ package message import ( "context" "encoding/hex" - "encoding/json" + "fmt" "strings" "github.com/ethereum/go-ethereum/accounts/abi" @@ -189,23 +189,22 @@ func (p *Processor) saveMessageStatusChangedEvent( // if topic == crypto.Keccak256Hash("MessageStatusChanged(bytes32,enum,address)") { err = bridgeAbi.UnpackIntoInterface(messageStatusChangedEvent, "MessageStatusChanged", log.Data) if err != nil { - continue + return errors.Wrap(err, "abi.UnpackIntoInterface") } else { break } } - if e != nil { - marshaled, err := json.Marshal(messageStatusChangedEvent) - if err != nil { - return errors.Wrap(err, "json.Marshal(event)") - } + if messageStatusChangedEvent != nil { + // keep same format as other raw events + data := fmt.Sprintf(`{"Raw":"transactionHash": "%v"}`, receipt.TxHash.Hex()) _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ Name: relayer.EventNameMessageStatusChanged, - Data: string(marshaled), + Data: data, ChainID: event.Message.DestChainId, Status: relayer.EventStatus(messageStatusChangedEvent.Status), + MsgHash: e.MsgHash, MessageOwner: e.MessageOwner, }) if err != nil { From 92091f7996850c1d736a356daa316bf50f726653 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Wed, 8 Mar 2023 10:14:02 -0800 Subject: [PATCH 06/10] use map --- packages/relayer/indexer/handle_event.go | 2 +- .../save_message_status_changed_events.go | 2 +- packages/relayer/message/process_message.go | 43 +++++++++---------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index 2ec59bc9497..08aaac249dc 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -52,7 +52,7 @@ func (svc *Service) handleEvent( } e, err := svc.eventRepo.Save(ctx, relayer.SaveEventOpts{ - Name: eventName, + Name: relayer.EventNameMessageSent, Data: string(marshaled), ChainID: chainID, Status: eventStatus, diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go index 8fb24967035..ce44f1a483a 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -17,7 +17,7 @@ func (svc *Service) saveMessageStatusChangedEvents( chainID *big.Int, events *bridge.BridgeMessageStatusChangedIterator, ) error { - if !events.Next() || events.Event == nil { + if !events.Next() { log.Infof("no messageStatusChanged events") return nil } diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index ec9eab50e74..2fb8989d5af 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -184,34 +184,33 @@ func (p *Processor) saveMessageStatusChangedEvent( return errors.Wrap(err, "abi.JSON") } - messageStatusChangedEvent := &bridge.BridgeMessageStatusChanged{} + m := make(map[string]interface{}) for _, log := range receipt.Logs { - // topic := log.Topics[0] - // if topic == crypto.Keccak256Hash("MessageStatusChanged(bytes32,enum,address)") { - err = bridgeAbi.UnpackIntoInterface(messageStatusChangedEvent, "MessageStatusChanged", log.Data) - if err != nil { - return errors.Wrap(err, "abi.UnpackIntoInterface") - } else { + topic := log.Topics[0] + if topic == crypto.Keccak256Hash([]byte("MessageStatusChanged(bytes32,uint8,address)")) { + err = bridgeAbi.UnpackIntoMap(m, "MessageStatusChanged", log.Data) + if err != nil { + return errors.Wrap(err, "abi.UnpackIntoInterface") + } + break } } - if messageStatusChangedEvent != nil { - // keep same format as other raw events - data := fmt.Sprintf(`{"Raw":"transactionHash": "%v"}`, receipt.TxHash.Hex()) - - _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ - Name: relayer.EventNameMessageStatusChanged, - Data: data, - ChainID: event.Message.DestChainId, - Status: relayer.EventStatus(messageStatusChangedEvent.Status), - MsgHash: e.MsgHash, - MessageOwner: e.MessageOwner, - }) - if err != nil { - return errors.Wrap(err, "svc.eventRepo.Save") - } + // keep same format as other raw events + data := fmt.Sprintf(`{"Raw":{"transactionHash": "%v"}}`, receipt.TxHash.Hex()) + + _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: data, + ChainID: event.Message.DestChainId, + Status: relayer.EventStatus(m["status"].(uint8)), + MsgHash: e.MsgHash, + MessageOwner: e.MessageOwner, + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") } return nil From a25855e17869773bb41600eeb140a4d2dfc00c8b Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Wed, 8 Mar 2023 10:57:35 -0800 Subject: [PATCH 07/10] only use map if value exists --- packages/relayer/message/process_message.go | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 2fb8989d5af..a2a04a9401a 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -197,20 +197,21 @@ func (p *Processor) saveMessageStatusChangedEvent( break } } - - // keep same format as other raw events - data := fmt.Sprintf(`{"Raw":{"transactionHash": "%v"}}`, receipt.TxHash.Hex()) - - _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ - Name: relayer.EventNameMessageStatusChanged, - Data: data, - ChainID: event.Message.DestChainId, - Status: relayer.EventStatus(m["status"].(uint8)), - MsgHash: e.MsgHash, - MessageOwner: e.MessageOwner, - }) - if err != nil { - return errors.Wrap(err, "svc.eventRepo.Save") + if m["status"] != nil { + // keep same format as other raw events + data := fmt.Sprintf(`{"Raw":{"transactionHash": "%v"}}`, receipt.TxHash.Hex()) + + _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: data, + ChainID: event.Message.DestChainId, + Status: relayer.EventStatus(m["status"].(uint8)), + MsgHash: e.MsgHash, + MessageOwner: e.MessageOwner, + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") + } } return nil From aef90df80c3e069aaaeb1d958481e8271f6be4e8 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Thu, 9 Mar 2023 10:56:37 -0800 Subject: [PATCH 08/10] lint --- packages/relayer/message/process_message.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index a2a04a9401a..40b66f70626 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -197,6 +197,7 @@ func (p *Processor) saveMessageStatusChangedEvent( break } } + if m["status"] != nil { // keep same format as other raw events data := fmt.Sprintf(`{"Raw":{"transactionHash": "%v"}}`, receipt.TxHash.Hex()) From 43175536ac9179fd9cea594deaa1d9dab17260b1 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 10 Mar 2023 11:38:39 -0800 Subject: [PATCH 09/10] context for subscribe --- packages/relayer/indexer/subscribe.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index e4db224e367..46b8f82002c 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -26,6 +26,9 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { // nolint: gosimple for { select { + case <-ctx.Done(): + log.Info("context finished") + return nil case err := <-errChan: return errors.Wrap(err, "errChan") } @@ -49,6 +52,9 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, for { select { + case <-ctx.Done(): + log.Info("context finished") + return case err := <-sub.Err(): errChan <- errors.Wrap(err, "sub.Err()") case event := <-sink: @@ -100,6 +106,9 @@ func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID * for { select { + case <-ctx.Done(): + log.Info("context finished") + return case err := <-sub.Err(): errChan <- errors.Wrap(err, "sub.Err()") case event := <-sink: From 52eb14aa8636d8b812d52c1e6a95b0c73d06688e Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 10 Mar 2023 11:39:30 -0800 Subject: [PATCH 10/10] use events abi --- packages/relayer/message/process_message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 40b66f70626..cc479b5eddf 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -188,7 +188,7 @@ func (p *Processor) saveMessageStatusChangedEvent( for _, log := range receipt.Logs { topic := log.Topics[0] - if topic == crypto.Keccak256Hash([]byte("MessageStatusChanged(bytes32,uint8,address)")) { + if topic == bridgeAbi.Events["MessageStatusChanged"].ID { err = bridgeAbi.UnpackIntoMap(m, "MessageStatusChanged", log.Data) if err != nil { return errors.Wrap(err, "abi.UnpackIntoInterface")