Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process messages concurrently #288

Merged
merged 62 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
51d4e6f
route incoming messages to appRelayer
cam-schultz Apr 29, 2024
608022c
Merge branch 'db-manager' into concurrent-messages
cam-schultz Apr 30, 2024
a1bdf87
todo comments
cam-schultz Apr 30, 2024
3b0c438
remove catch up results chan
cam-schultz Apr 30, 2024
4142101
Revert "remove catch up results chan"
cam-schultz Apr 30, 2024
01a07e3
write directly to headers chan
cam-schultz Apr 30, 2024
8cf64a6
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 1, 2024
678fa92
send to channel before closing
cam-schultz May 2, 2024
e8371e3
relay messages async - wip
cam-schultz May 2, 2024
252b027
listener composed of app relayers
cam-schultz May 2, 2024
f386d11
appRelayer maintains block status
cam-schultz May 2, 2024
b5fca25
combine prepare and process height
cam-schultz May 3, 2024
34eda3f
refactor message manager and warpblockinfo
cam-schultz May 3, 2024
efa8f3b
restore integ tests
cam-schultz May 3, 2024
de411b9
combine prepare and process height
cam-schultz May 3, 2024
9f79ca1
refactor app relayer interface
cam-schultz May 3, 2024
7edff1d
noop if committed height is unchanged
cam-schultz May 10, 2024
2ee56bb
remove unused var
cam-schultz May 10, 2024
72f6f0c
fix unit tests
cam-schultz May 10, 2024
08235b7
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 13, 2024
5ffa9dc
Merge branch 'concurrent-messages' into app-relayer-worker
cam-schultz May 13, 2024
90391a6
go 1.22.3
cam-schultz May 13, 2024
69dbc80
bump golangci-lint version
cam-schultz May 13, 2024
acbc62d
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 14, 2024
23dd427
Merge branch 'concurrent-messages' into app-relayer-worker
cam-schultz May 14, 2024
b57d05d
add batch Teleporter message test
cam-schultz May 14, 2024
1a051f3
do not assume ordered message delivery
cam-schultz May 14, 2024
2db03ec
migrate deprecated flag
cam-schultz May 14, 2024
248c852
checkout submodules in ci
cam-schultz May 14, 2024
b5cd801
re-checkout repo
cam-schultz May 14, 2024
0930225
Merge branch 'main' into app-relayer-worker
cam-schultz May 14, 2024
4d0cd01
remove unused param
cam-schultz May 14, 2024
1cd196e
relay between subnets, no c-chain
cam-schultz May 14, 2024
7b345c6
debug logs
cam-schultz May 14, 2024
b3c5b5e
batch test requires all messages delivered
cam-schultz May 14, 2024
8dce7b4
comments+cleanup
cam-schultz May 15, 2024
359032d
Merge branch 'main' into app-relayer-worker
cam-schultz May 20, 2024
f1887c9
cleanup external handler
cam-schultz May 21, 2024
65a9162
request id mutual exclusion
cam-schultz May 21, 2024
ef461da
comments and cleanup
cam-schultz May 21, 2024
cbddf3a
rename MessageManager to MessageHandlerFactory
cam-schultz May 21, 2024
8653be1
lint
cam-schultz May 21, 2024
610105b
revert to go1.21
cam-schultz May 21, 2024
e939eca
add handleResponse helper
cam-schultz May 21, 2024
0611a3b
Update relayer/application_relayer.go
cam-schultz May 22, 2024
b920a0f
rename shadowed var
cam-schultz May 22, 2024
757f988
cleanup
cam-schultz May 22, 2024
359dde4
process blocks async
cam-schultz May 22, 2024
00a1d4e
app relayer holds a dst client
cam-schultz May 22, 2024
8ed636b
fix unit test
cam-schultz May 22, 2024
ad391aa
Merge branch 'app-relayer-worker' of github.com:ava-labs/awm-relayer …
cam-schultz May 22, 2024
d1936ec
handle app relayer errors
cam-schultz May 23, 2024
59210e0
add upgrade comment
cam-schultz May 23, 2024
f4f44e6
only lock nonce usage
cam-schultz May 23, 2024
9e55c91
Merge branch 'main' into app-relayer-worker
geoff-vball May 28, 2024
077cea1
Merge branch 'main' into app-relayer-worker
geoff-vball May 31, 2024
1782f31
Refactor application relayers
geoff-vball May 31, 2024
ba2f686
Fix typo
geoff-vball May 31, 2024
024ffd2
Merge pull request #309 from ava-labs/gstuart/concurrent-message
geoff-vball May 31, 2024
65b8ba8
Fix key truncation
geoff-vball May 31, 2024
3404bf7
lint
geoff-vball May 31, 2024
7d2f8ee
Merge branch 'main' into app-relayer-worker
geoff-vball May 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ava-labs/awm-relayer

go 1.21.10
go 1.22.3
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved

require (
github.com/ava-labs/avalanche-network-runner v1.7.6
Expand Down
119 changes: 98 additions & 21 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/types"
relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
Expand Down Expand Up @@ -97,7 +100,7 @@ func main() {
if logLevel <= logging.Debug {
networkLogLevel = logLevel
}
network, responseChans, err := peers.NewNetwork(
network, err := peers.NewNetwork(
networkLogLevel,
registerer,
&cfg,
Expand Down Expand Up @@ -175,13 +178,20 @@ func main() {
ticker := utils.NewTicker(cfg.DBWriteIntervalSeconds)
go ticker.Run()

manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpLogInfo)
manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpMessageInfo)
for _, msg := range cfg.ManualWarpMessages {
sourceBlockchainID := msg.GetSourceBlockchainID()

warpLogInfo := relayerTypes.WarpLogInfo{
SourceAddress: msg.GetSourceAddress(),
UnsignedMsgBytes: msg.GetUnsignedMessageBytes(),
unsignedMsg, err := types.UnpackWarpMessage(msg.GetUnsignedMessageBytes())
if err != nil {
logger.Error(
"Failed to unpack manual Warp message",
zap.Error(err),
)
panic(err)
}
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
warpLogInfo := relayerTypes.WarpMessageInfo{
SourceAddress: msg.GetSourceAddress(),
UnsignedMessage: unsignedMsg,
}
manualWarpMessages[sourceBlockchainID] = append(manualWarpMessages[sourceBlockchainID], &warpLogInfo)
}
Expand Down Expand Up @@ -213,7 +223,6 @@ func main() {
ticker,
*subnetInfo,
network,
responseChans[blockchainID],
destinationClients,
messageCreator,
health,
Expand All @@ -236,62 +245,130 @@ func runRelayer(
metrics *relayer.ApplicationRelayerMetrics,
db database.RelayerDatabase,
ticker *utils.Ticker,
sourceSubnetInfo config.SourceBlockchain,
sourceBlockchain config.SourceBlockchain,
network *peers.AppRequestNetwork,
responseChan chan message.InboundMessage,
destinationClients map[ids.ID]vms.DestinationClient,
messageCreator message.Creator,
relayerHealth *atomic.Bool,
manualWarpMessages []*relayerTypes.WarpLogInfo,
manualWarpMessages []*relayerTypes.WarpMessageInfo,
cfg *config.Config,
) error {
// Create the application relayers
logger.Info(
"Creating relayer",
zap.String("originBlockchainID", sourceSubnetInfo.BlockchainID),
"Creating application relayers",
zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
)
ethClient, err := ethclient.Dial(sourceBlockchain.RPCEndpoint)
if err != nil {
logger.Error(
"Failed to connect to node via RPC",
zap.String("blockchainID", sourceBlockchain.BlockchainID),
zap.Error(err),
)
return err
}
currentHeight, err := ethClient.BlockNumber(context.Background())
if err != nil {
logger.Error(
"Failed to get current block height",
zap.Error(err),
)
return err
}
applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)
minHeight := uint64(0)

for _, relayerID := range database.GetSourceBlockchainRelayerIDs(&sourceBlockchain) {
height, err := database.CalculateStartingBlockHeight(
logger,
db,
relayerID,
sourceBlockchain.ProcessHistoricalBlocksFromHeight,
currentHeight,
)
if err != nil {
logger.Error(
"Failed to calculate starting block height",
zap.String("relayerID", relayerID.ID.String()),
zap.Error(err),
)
return err
}
if minHeight == 0 || height < minHeight {
minHeight = height
}
applicationRelayer, err := relayer.NewApplicationRelayer(
logger,
metrics,
network,
messageCreator,
relayerID,
db,
ticker,
sourceBlockchain,
height,
cfg,
)
if err != nil {
logger.Error(
"Failed to create application relayer",
zap.String("relayerID", relayerID.ID.String()),
zap.Error(err),
)
return err
}
applicationRelayers[relayerID.ID] = applicationRelayer
}

logger.Info(
"Creating listener",
zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
)
listener, err := relayer.NewListener(
logger,
metrics,
db,
ticker,
sourceSubnetInfo,
sourceBlockchain,
network,
responseChan,
destinationClients,
messageCreator,
relayerHealth,
cfg,
applicationRelayers,
minHeight,
ethClient,
)
if err != nil {
return fmt.Errorf("failed to create listener instance: %w", err)
}
logger.Info(
"Created listener",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
zap.String("blockchainID", sourceBlockchain.BlockchainID),
)

// Send any messages that were specified in the configuration
for _, warpMessage := range manualWarpMessages {
logger.Info(
"Relaying manual Warp message",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMsgBytes)),
zap.String("blockchainID", sourceBlockchain.BlockchainID),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMessage.Bytes())),
)
err := listener.RouteManualWarpMessage(warpMessage)
appRelayer, handler, err := listener.GetAppRelayerMessageHandler(0, warpMessage)
if err != nil {
logger.Error(
"Failed to relay manual Warp message. Continuing.",
"Failed to parse manual Warp message. Continuing.",
zap.Error(err),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMsgBytes)),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMessage.Bytes())),
)
continue
}
appRelayer.ProcessMessage(handler)
}

logger.Info(
"Listener initialized. Listening for messages to relay.",
zap.String("originBlockchainID", sourceSubnetInfo.BlockchainID),
zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
)

// Wait for logs from the subscribed node
Expand Down
44 changes: 8 additions & 36 deletions messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,35 @@
package messages

import (
"fmt"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/config"
offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
"github.com/ava-labs/awm-relayer/messages/teleporter"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ethereum/go-ethereum/common"
)

// MessageManager is specific to each message protocol. The interface handles choosing which messages to send
// for each message protocol, and performs the sending to the destination chain.
type MessageManager interface {
NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (MessageHandler, error)
}

type MessageHandler interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused what the need is to separate MessageManager and MessageHandler here, and worried it could easily be confused what each is intended to be used for in the future.

It seems like a MessageManager is just a wrapper for creating MessageHandlers. Would it be possible to remove it all together and just use NewMessageHandler directly with additional params as needed? Otherwise, I think we should consider renaming MessageManager to MessageHandlerFactory or something similar.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed MessageManager to MessageHandlerFactory, and put both it and MessageHandler in message_handler.go.

// ShouldSendMessage returns true if the message should be sent to the destination chain
// If an error is returned, the boolean should be ignored by the caller.
ShouldSendMessage(unsignedMessage *warp.UnsignedMessage, destinationBlockchainID ids.ID) (bool, error)
ShouldSendMessage(destinationBlockchainID ids.ID) (bool, error)

// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
SendMessage(signedMessage *warp.Message, destinationBlockchainID ids.ID) error

// GetMessageRoutingInfo returns the source chain ID, origin sender address, destination chain ID, and destination address
GetMessageRoutingInfo(unsignedMessage *warp.UnsignedMessage) (
GetMessageRoutingInfo() (
ids.ID,
common.Address,
ids.ID,
common.Address,
error,
)
}

// NewMessageManager constructs a MessageManager for a particular message protocol, defined by the message protocol address and config
// Note that DestinationClients may be invoked concurrently by many MessageManagers, so it is assumed that they are implemented in a thread-safe way
func NewMessageManager(
logger logging.Logger,
messageProtocolAddress common.Address,
messageProtocolConfig config.MessageProtocolConfig,
destinationClients map[ids.ID]vms.DestinationClient,
) (MessageManager, error) {
format := messageProtocolConfig.MessageFormat
switch config.ParseMessageProtocol(format) {
case config.TELEPORTER:
return teleporter.NewMessageManager(
logger,
messageProtocolAddress,
messageProtocolConfig,
destinationClients,
)
case config.OFF_CHAIN_REGISTRY:
return offchainregistry.NewMessageManager(
logger,
messageProtocolConfig,
destinationClients,
)
default:
return nil, fmt.Errorf("invalid message format %s", format)
}
// GetUnsignedMessage returns the unsigned message
GetUnsignedMessage() *warp.UnsignedMessage
}
Loading