Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process messages concurrently #288

Merged
merged 62 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
51d4e6f
route incoming messages to appRelayer
cam-schultz Apr 29, 2024
608022c
Merge branch 'db-manager' into concurrent-messages
cam-schultz Apr 30, 2024
a1bdf87
todo comments
cam-schultz Apr 30, 2024
3b0c438
remove catch up results chan
cam-schultz Apr 30, 2024
4142101
Revert "remove catch up results chan"
cam-schultz Apr 30, 2024
01a07e3
write directly to headers chan
cam-schultz Apr 30, 2024
8cf64a6
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 1, 2024
678fa92
send to channel before closing
cam-schultz May 2, 2024
e8371e3
relay messages async - wip
cam-schultz May 2, 2024
252b027
listener composed of app relayers
cam-schultz May 2, 2024
f386d11
appRelayer maintains block status
cam-schultz May 2, 2024
b5fca25
combine prepare and process height
cam-schultz May 3, 2024
34eda3f
refactor message manager and warpblockinfo
cam-schultz May 3, 2024
efa8f3b
restore integ tests
cam-schultz May 3, 2024
de411b9
combine prepare and process height
cam-schultz May 3, 2024
9f79ca1
refactor app relayer interface
cam-schultz May 3, 2024
7edff1d
noop if committed height is unchanged
cam-schultz May 10, 2024
2ee56bb
remove unused var
cam-schultz May 10, 2024
72f6f0c
fix unit tests
cam-schultz May 10, 2024
08235b7
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 13, 2024
5ffa9dc
Merge branch 'concurrent-messages' into app-relayer-worker
cam-schultz May 13, 2024
90391a6
go 1.22.3
cam-schultz May 13, 2024
69dbc80
bump golangci-lint version
cam-schultz May 13, 2024
acbc62d
Merge branch 'db-manager' into concurrent-messages
cam-schultz May 14, 2024
23dd427
Merge branch 'concurrent-messages' into app-relayer-worker
cam-schultz May 14, 2024
b57d05d
add batch Teleporter message test
cam-schultz May 14, 2024
1a051f3
do not assume ordered message delivery
cam-schultz May 14, 2024
2db03ec
migrate deprecated flag
cam-schultz May 14, 2024
248c852
checkout submodules in ci
cam-schultz May 14, 2024
b5cd801
re-checkout repo
cam-schultz May 14, 2024
0930225
Merge branch 'main' into app-relayer-worker
cam-schultz May 14, 2024
4d0cd01
remove unused param
cam-schultz May 14, 2024
1cd196e
relay between subnets, no c-chain
cam-schultz May 14, 2024
7b345c6
debug logs
cam-schultz May 14, 2024
b3c5b5e
batch test requires all messages delivered
cam-schultz May 14, 2024
8dce7b4
comments+cleanup
cam-schultz May 15, 2024
359032d
Merge branch 'main' into app-relayer-worker
cam-schultz May 20, 2024
f1887c9
cleanup external handler
cam-schultz May 21, 2024
65a9162
request id mutual exclusion
cam-schultz May 21, 2024
ef461da
comments and cleanup
cam-schultz May 21, 2024
cbddf3a
rename MessageManager to MessageHandlerFactory
cam-schultz May 21, 2024
8653be1
lint
cam-schultz May 21, 2024
610105b
revert to go1.21
cam-schultz May 21, 2024
e939eca
add handleResponse helper
cam-schultz May 21, 2024
0611a3b
Update relayer/application_relayer.go
cam-schultz May 22, 2024
b920a0f
rename shadowed var
cam-schultz May 22, 2024
757f988
cleanup
cam-schultz May 22, 2024
359dde4
process blocks async
cam-schultz May 22, 2024
00a1d4e
app relayer holds a dst client
cam-schultz May 22, 2024
8ed636b
fix unit test
cam-schultz May 22, 2024
ad391aa
Merge branch 'app-relayer-worker' of github.com:ava-labs/awm-relayer …
cam-schultz May 22, 2024
d1936ec
handle app relayer errors
cam-schultz May 23, 2024
59210e0
add upgrade comment
cam-schultz May 23, 2024
f4f44e6
only lock nonce usage
cam-schultz May 23, 2024
9e55c91
Merge branch 'main' into app-relayer-worker
geoff-vball May 28, 2024
077cea1
Merge branch 'main' into app-relayer-worker
geoff-vball May 31, 2024
1782f31
Refactor application relayers
geoff-vball May 31, 2024
ba2f686
Fix typo
geoff-vball May 31, 2024
024ffd2
Merge pull request #309 from ava-labs/gstuart/concurrent-message
geoff-vball May 31, 2024
65b8ba8
Fix key truncation
geoff-vball May 31, 2024
3404bf7
lint
geoff-vball May 31, 2024
7d2f8ee
Merge branch 'main' into app-relayer-worker
geoff-vball May 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
steps:
- name: Checkout awm-relayer repository
uses: actions/checkout@v4
with:
submodules: recursive

- name: Set Go version
run: |
Expand All @@ -45,6 +47,8 @@ jobs:

- name: Checkout awm-relayer repository
uses: actions/checkout@v4
with:
submodules: recursive

- name: Run E2E Tests
run: AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh
2 changes: 2 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive

- name: Set Go version
run: |
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
with:
fetch-depth: 0
path: awm-relayer
submodules: recursive

- name: Set Go version
run: |
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/snyk.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
uses: actions/checkout@v4
with:
path: awm-relayer
submodules: recursive

- name: Run Snyk
uses: snyk/actions/golang@master
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
steps:
- name: Checkout awm-relayer repository
uses: actions/checkout@v4
with:
submodules: recursive

- name: Set Go version
run: |
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ relayer-config.json
main.log
server.log
*.test

# Foundry outputs
cache/
out/
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[submodule "tests/contracts/lib/teleporter"]
path = tests/contracts/lib/teleporter
url = https://github.com/ava-labs/teleporter
[submodule "tests/contracts/lib/forge-std"]
path = tests/contracts/lib/forge-std
url = https://github.com/foundry-rs/forge-std
6 changes: 3 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
run:
timeout: 3m
tests: true
# skip auto-generated files.
skip-files:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latest golangci-lint deprecates this option in favor of exclude-files

- ".*mock.*"

issues:
# Maximum count of issues with the same text. Set to 0 to disable. Default is 3.
max-same-issues: 0
# skip auto-generated files.
exclude-files:
- ".*mock.*"

linters:
disable-all: true
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ awm-relayer --version Display awm-relayer vers
awm-relayer --help Display awm-relayer usage and exit.
```

### Initialize the repository

- Get all submodules: `git submodule update --init --recursive`

### Building

Before building, be sure to install Go, which is required even if you're just building the Docker image.
Expand Down
2 changes: 1 addition & 1 deletion database/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func CalculateStartingBlockHeight(
} else if err != nil {
// Otherwise, we've encountered an unknown database error
logger.Error(
"failed to get latest block from database",
"Failed to get latest block from database",
zap.String("relayerID", relayerID.ID.String()),
zap.Error(err),
)
Expand Down
193 changes: 153 additions & 40 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/ethclient"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/types"
relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
Expand Down Expand Up @@ -122,7 +125,7 @@ func main() {
if logLevel <= logging.Debug {
networkLogLevel = logLevel
}
network, responseChans, err := peers.NewNetwork(
network, err := peers.NewNetwork(
networkLogLevel,
registerer,
&cfg,
Expand Down Expand Up @@ -200,18 +203,27 @@ func main() {
ticker := utils.NewTicker(cfg.DBWriteIntervalSeconds)
go ticker.Run()

manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpLogInfo)
// Gather manual Warp messages specified in the configuration
manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpMessageInfo)
for _, msg := range cfg.ManualWarpMessages {
sourceBlockchainID := msg.GetSourceBlockchainID()

warpLogInfo := relayerTypes.WarpLogInfo{
SourceAddress: msg.GetSourceAddress(),
UnsignedMsgBytes: msg.GetUnsignedMessageBytes(),
unsignedMsg, err := types.UnpackWarpMessage(msg.GetUnsignedMessageBytes())
if err != nil {
logger.Error(
"Failed to unpack manual Warp message",
zap.String("warpMessageBytes", hex.EncodeToString(msg.GetUnsignedMessageBytes())),
zap.Error(err),
)
panic(err)
}
warpLogInfo := relayerTypes.WarpMessageInfo{
SourceAddress: msg.GetSourceAddress(),
UnsignedMessage: unsignedMsg,
}
manualWarpMessages[sourceBlockchainID] = append(manualWarpMessages[sourceBlockchainID], &warpLogInfo)
}

// Create relayers for each of the subnets configured as a source
// Create listeners for each of the subnets configured as a source
errGroup, ctx := errgroup.WithContext(context.Background())
for _, s := range cfg.SourceBlockchains {
blockchainID, err := ids.FromString(s.BlockchainID)
Expand All @@ -229,16 +241,15 @@ func main() {

// errgroup will cancel the context when the first goroutine returns an error
errGroup.Go(func() error {
// runRelayer runs until it errors or the context is cancelled by another goroutine
return runRelayer(
// runListener runs until it errors or the context is cancelled by another goroutine
return runListener(
ctx,
logger,
metrics,
db,
ticker,
*subnetInfo,
network,
responseChans[blockchainID],
destinationClients,
messageCreator,
health,
Expand All @@ -254,76 +265,178 @@ func main() {
)
}

// runRelayer creates a relayer instance for a subnet. It listens for warp messages on that subnet, and handles delivery to the destination
func runRelayer(
// runListener creates a Listener instance and the ApplicationRelayers for a subnet.
// The Listener listens for warp messages on that subnet, and the ApplicationRelayers handle delivery to the destination
func runListener(
ctx context.Context,
logger logging.Logger,
metrics *relayer.ApplicationRelayerMetrics,
db database.RelayerDatabase,
ticker *utils.Ticker,
sourceSubnetInfo config.SourceBlockchain,
sourceBlockchain config.SourceBlockchain,
network *peers.AppRequestNetwork,
responseChan chan message.InboundMessage,
destinationClients map[ids.ID]vms.DestinationClient,
messageCreator message.Creator,
relayerHealth *atomic.Bool,
manualWarpMessages []*relayerTypes.WarpLogInfo,
manualWarpMessages []*relayerTypes.WarpMessageInfo,
cfg *config.Config,
) error {
logger.Info(
"Creating relayer",
zap.String("originBlockchainID", sourceSubnetInfo.BlockchainID),
// Dial the eth client
ethClient, err := ethclient.DialWithConfig(
context.Background(),
sourceBlockchain.RPCEndpoint.BaseURL,
sourceBlockchain.RPCEndpoint.HTTPHeaders,
sourceBlockchain.RPCEndpoint.QueryParams,
)
if err != nil {
logger.Error(
"Failed to connect to node via RPC",
zap.String("blockchainID", sourceBlockchain.BlockchainID),
zap.Error(err),
)
return err
}

listener, err := relayer.NewListener(
// Create the ApplicationRelayers
applicationRelayers, minHeight, err := createApplicationRelayers(
geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
ctx,
logger,
metrics,
db,
ticker,
sourceSubnetInfo,
sourceBlockchain,
network,
responseChan,
destinationClients,
messageCreator,
cfg,
ethClient,
destinationClients,
)
if err != nil {
logger.Error(
"Failed to create application relayers",
zap.String("blockchainID", sourceBlockchain.BlockchainID),
zap.Error(err),
)
return err
}
logger.Info(
"Created application relayers",
zap.String("blockchainID", sourceBlockchain.BlockchainID),
)

// Create the Listener
listener, err := relayer.NewListener(
logger,
sourceBlockchain,
relayerHealth,
cfg,
applicationRelayers,
minHeight,
ethClient,
)
if err != nil {
return fmt.Errorf("failed to create listener instance: %w", err)
}
logger.Info(
"Created listener",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
zap.String("blockchainID", sourceBlockchain.BlockchainID),
)

// Send any messages that were specified in the configuration
for _, warpMessage := range manualWarpMessages {
logger.Info(
"Relaying manual Warp message",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMsgBytes)),
err = listener.ProcessManualWarpMessages(logger, manualWarpMessages, sourceBlockchain)
geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Error(
"Failed to process manual Warp messages",
zap.String("blockchainID", sourceBlockchain.BlockchainID),
zap.Error(err),
)
err := listener.RouteManualWarpMessage(warpMessage)
if err != nil {
logger.Error(
"Failed to relay manual Warp message. Continuing.",
zap.Error(err),
zap.String("warpMessageBytes", hex.EncodeToString(warpMessage.UnsignedMsgBytes)),
)
continue
}
}

logger.Info(
"Listener initialized. Listening for messages to relay.",
zap.String("originBlockchainID", sourceSubnetInfo.BlockchainID),
zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
)

// Wait for logs from the subscribed node
// Will only return on error or context cancellation
return listener.ProcessLogs(ctx)
}

func createApplicationRelayers(
geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
logger logging.Logger,
metrics *relayer.ApplicationRelayerMetrics,
db database.RelayerDatabase,
ticker *utils.Ticker,
sourceBlockchain config.SourceBlockchain,
network *peers.AppRequestNetwork,
messageCreator message.Creator,
cfg *config.Config,
srcEthClient ethclient.Client,
destinationClients map[ids.ID]vms.DestinationClient,
) (map[common.Hash]*relayer.ApplicationRelayer, uint64, error) {
// Create the ApplicationRelayers
logger.Info(
"Creating application relayers",
zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
)
applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)

currentHeight, err := srcEthClient.BlockNumber(context.Background())
if err != nil {
logger.Error(
"Failed to get current block height",
zap.Error(err),
)
return nil, 0, err
}

// Each ApplicationRelayer determines its starting height based on the database state.
// The Listener begins processing messages starting from the minimum height across all of the ApplicationRelayers
minHeight := uint64(0)
for _, relayerID := range database.GetSourceBlockchainRelayerIDs(&sourceBlockchain) {
height, err := database.CalculateStartingBlockHeight(
logger,
db,
relayerID,
sourceBlockchain.ProcessHistoricalBlocksFromHeight,
currentHeight,
)
if err != nil {
logger.Error(
"Failed to calculate starting block height",
zap.String("relayerID", relayerID.ID.String()),
zap.Error(err),
)
return nil, 0, err
}
if minHeight == 0 || height < minHeight {
minHeight = height
}
applicationRelayer, err := relayer.NewApplicationRelayer(
logger,
metrics,
network,
messageCreator,
relayerID,
db,
ticker,
destinationClients[relayerID.DestinationBlockchainID],
sourceBlockchain,
height,
cfg,
)
if err != nil {
logger.Error(
"Failed to create application relayer",
zap.String("relayerID", relayerID.ID.String()),
zap.Error(err),
)
return nil, 0, err
}
applicationRelayers[relayerID.ID] = applicationRelayer
}
return applicationRelayers, minHeight, nil
}

func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint16) {
http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))

Expand Down
Loading