From 7cae6fe2caa74962149946c0c1a6920ecaecad50 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 25 Jun 2024 17:50:15 -0500 Subject: [PATCH 01/14] warp api cfg option --- config/source_blockchain.go | 14 ++++++++++++++ relayer/application_relayer.go | 8 ++------ tests/e2e_test.go | 3 +++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/config/source_blockchain.go b/config/source_blockchain.go index 4d069dd1..d3e6f035 100644 --- a/config/source_blockchain.go +++ b/config/source_blockchain.go @@ -24,11 +24,13 @@ type SourceBlockchain struct { SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"` ProcessHistoricalBlocksFromHeight uint64 `mapstructure:"process-historical-blocks-from-height" json:"process-historical-blocks-from-height"` AllowedOriginSenderAddresses []string `mapstructure:"allowed-origin-sender-addresses" json:"allowed-origin-sender-addresses"` + WarpAPIEndpoint APIConfig `mapstructure:"warp-api-endpoint" json:"warp-api-endpoint"` // convenience fields to access parsed data after initialization subnetID ids.ID blockchainID ids.ID allowedOriginSenderAddresses []common.Address + useAppRequestNetwork bool } // Validates the source subnet configuration, including verifying that the supported destinations are present in destinationBlockchainIDs @@ -47,6 +49,14 @@ func (s *SourceBlockchain) Validate(destinationBlockchainIDs *set.Set[string]) e if err := s.WSEndpoint.Validate(); err != nil { return fmt.Errorf("invalid ws-endpoint in source subnet configuration: %w", err) } + // The Warp API endpoint is optional. If omitted, signatures are fetched from validators via app request. + if s.WarpAPIEndpoint.BaseURL != "" { + if err := s.WarpAPIEndpoint.Validate(); err != nil { + return fmt.Errorf("invalid warp-api-endpoint in source subnet configuration: %w", err) + } + } else { + s.useAppRequestNetwork = true + } // Validate the VM specific settings switch ParseVM(s.VM) { @@ -140,6 +150,10 @@ func (s *SourceBlockchain) GetAllowedOriginSenderAddresses() []common.Address { return s.allowedOriginSenderAddresses } +func (s *SourceBlockchain) UseAppRequestNetwork() bool { + return s.useAppRequestNetwork +} + // Specifies a supported destination blockchain and addresses for a source blockchain. type SupportedDestination struct { BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 7097a80b..d803aa17 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -170,7 +170,6 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) err err := r.relayMessage( reqID, handler, - true, ) return err @@ -183,7 +182,6 @@ func (r *ApplicationRelayer) RelayerID() database.RelayerID { func (r *ApplicationRelayer) relayMessage( requestID uint32, handler messages.MessageHandler, - useAppRequestNetwork bool, ) error { r.logger.Debug( "Relaying message", @@ -209,7 +207,7 @@ func (r *ApplicationRelayer) relayMessage( startCreateSignedMessageTime := time.Now() // Query nodes on the origin chain for signatures, and construct the signed warp message. var signedMessage *avalancheWarp.Message - if useAppRequestNetwork { + if r.sourceBlockchain.UseAppRequestNetwork() { signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID) if err != nil { r.logger.Error( @@ -257,9 +255,7 @@ func (r *ApplicationRelayer) relayMessage( // will need to be accounted for here. func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) { r.logger.Info("Fetching aggregate signature from the source chain validators via API") - // TODO: To properly support this, we should provide a dedicated Warp API endpoint in the config - uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint.BaseURL, "/ext") - warpClient, err := warpBackend.NewClient(uri, r.sourceBlockchain.GetBlockchainID().String()) + warpClient, err := warpBackend.NewClient(r.sourceBlockchain.WarpAPIEndpoint.BaseURL, r.sourceBlockchain.GetBlockchainID().String()) if err != nil { r.logger.Error( "Failed to create Warp API client", diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 1e1b8df5..a587c874 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -80,4 +80,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Batch Message", func() { BatchRelay(localNetworkInstance) }) + ginkgo.It("Warp API", func() { + WarpAPIRelay(localNetworkInstance) + }) }) From 35b63b7acbc65e7e534391f30b6bb2a8d1d9a11b Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 25 Jun 2024 18:01:31 -0500 Subject: [PATCH 02/14] dial warp api on construction --- relayer/application_relayer.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index d803aa17..9d330f08 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -70,6 +70,7 @@ type ApplicationRelayer struct { checkpointManager *checkpoint.CheckpointManager currentRequestID uint32 lock *sync.RWMutex + warpClient warpBackend.Client } func NewApplicationRelayer( @@ -108,6 +109,18 @@ func NewApplicationRelayer( checkpointManager := checkpoint.NewCheckpointManager(logger, db, sub, relayerID, startingHeight) checkpointManager.Run() + var warpClient warpBackend.Client + if !sourceBlockchain.UseAppRequestNetwork() { + warpClient, err = warpBackend.NewClient(sourceBlockchain.WarpAPIEndpoint.BaseURL, sourceBlockchain.GetBlockchainID().String()) + if err != nil { + logger.Error( + "Failed to create Warp API client", + zap.Error(err), + ) + return nil, err + } + } + ar := ApplicationRelayer{ logger: logger, metrics: metrics, @@ -121,6 +134,7 @@ func NewApplicationRelayer( checkpointManager: checkpointManager, currentRequestID: rand.Uint32(), // TODONOW: pass via ctor lock: &sync.RWMutex{}, + warpClient: warpClient, } return &ar, nil @@ -255,16 +269,11 @@ func (r *ApplicationRelayer) relayMessage( // will need to be accounted for here. func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) { r.logger.Info("Fetching aggregate signature from the source chain validators via API") - warpClient, err := warpBackend.NewClient(r.sourceBlockchain.WarpAPIEndpoint.BaseURL, r.sourceBlockchain.GetBlockchainID().String()) - if err != nil { - r.logger.Error( - "Failed to create Warp API client", - zap.Error(err), - ) - return nil, err - } - var signedWarpMessageBytes []byte + var ( + signedWarpMessageBytes []byte + err error + ) for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ { r.logger.Debug( "Relayer collecting signatures from peers.", @@ -273,7 +282,7 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp. zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), zap.String("signingSubnetID", r.signingSubnetID.String()), ) - signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature( + signedWarpMessageBytes, err = r.warpClient.GetMessageAggregateSignature( context.Background(), unsignedMessage.ID(), r.warpQuorum.QuorumNumerator, From 48755d94c2df6da45b3c0535097880dc39b78a86 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 25 Jun 2024 18:15:11 -0500 Subject: [PATCH 03/14] warp api e2e test --- tests/warp_api.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/warp_api.go diff --git a/tests/warp_api.go b/tests/warp_api.go new file mode 100644 index 00000000..1e31d743 --- /dev/null +++ b/tests/warp_api.go @@ -0,0 +1,95 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tests + +import ( + "context" + "time" + + testUtils "github.com/ava-labs/awm-relayer/tests/utils" + "github.com/ava-labs/teleporter/tests/interfaces" + "github.com/ava-labs/teleporter/tests/utils" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + . "github.com/onsi/gomega" +) + +// This tests the basic functionality of the relayer using the Warp API/, rather than app requests. Includes: +// - Relaying from Subnet A to Subnet B +// - Relaying from Subnet B to Subnet A +func WarpAPIRelay(network interfaces.LocalNetwork) { + subnetAInfo := network.GetPrimaryNetworkInfo() + subnetBInfo, _ := utils.GetTwoSubnets(network) + fundedAddress, fundedKey := network.GetFundedAccountInfo() + teleporterContractAddress := network.GetTeleporterContractAddress() + err := testUtils.ClearRelayerStorage() + Expect(err).Should(BeNil()) + + // + // Fund the relayer address on all subnets + // + ctx := context.Background() + + log.Info("Funding relayer address on all subnets") + relayerKey, err := crypto.GenerateKey() + Expect(err).Should(BeNil()) + testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey) + + // + // Set up relayer config + // + relayerConfig := testUtils.CreateDefaultRelayerConfig( + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + teleporterContractAddress, + fundedAddress, + relayerKey, + ) + // Enable the Warp API for all source blockchains + for _, subnet := range relayerConfig.SourceBlockchains { + subnet.WarpAPIEndpoint = subnet.RPCEndpoint + } + + relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) + + // + // Test Relaying from Subnet A to Subnet B + // + log.Info("Test Relaying from Subnet A to Subnet B") + + log.Info("Starting the relayer") + relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath) + defer relayerCleanup() + + // Sleep for some time to make sure relayer has started up and subscribed. + log.Info("Waiting for the relayer to start up") + time.Sleep(15 * time.Second) + + log.Info("Sending transaction from Subnet A to Subnet B") + testUtils.RelayBasicMessage( + ctx, + subnetAInfo, + subnetBInfo, + teleporterContractAddress, + fundedKey, + fundedAddress, + ) + + // + // Test Relaying from Subnet B to Subnet A + // + log.Info("Test Relaying from Subnet B to Subnet A") + testUtils.RelayBasicMessage( + ctx, + subnetBInfo, + subnetAInfo, + teleporterContractAddress, + fundedKey, + fundedAddress, + ) + + log.Info("Finished sending warp message, closing down output channel") + // Cancel the command and stop the relayer + relayerCleanup() +} From 28c941ebd5651079a6727f1435898b0f006c3023 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 25 Jun 2024 18:18:02 -0500 Subject: [PATCH 04/14] invoke warp api directly --- config/destination_blockchain.go | 2 +- main/main.go | 2 +- relayer/application_relayer.go | 23 +++++++++++++++++------ relayer/listener.go | 2 +- utils/client_utils.go | 15 ++++++++++++--- vms/evm/destination_client.go | 2 +- 6 files changed, 33 insertions(+), 13 deletions(-) diff --git a/config/destination_blockchain.go b/config/destination_blockchain.go index 709c87aa..80cd9ed0 100644 --- a/config/destination_blockchain.go +++ b/config/destination_blockchain.go @@ -91,7 +91,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := utils.DialWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams) + client, err := utils.NewEthClientWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } diff --git a/main/main.go b/main/main.go index 6ec3340f..248eaeda 100644 --- a/main/main.go +++ b/main/main.go @@ -243,7 +243,7 @@ func main() { // errgroup will cancel the context when the first goroutine returns an error errGroup.Go(func() error { // Dial the eth client - ethClient, err := utils.DialWithConfig( + ethClient, err := utils.NewEthClientWithConfig( context.Background(), sourceBlockchain.RPCEndpoint.BaseURL, sourceBlockchain.RPCEndpoint.HTTPHeaders, diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 9d330f08..0495075b 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -30,7 +30,8 @@ import ( "github.com/ava-labs/awm-relayer/vms" coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" - warpBackend "github.com/ava-labs/subnet-evm/warp" + "github.com/ava-labs/subnet-evm/rpc" + "github.com/ethereum/go-ethereum/common/hexutil" "golang.org/x/sync/errgroup" "go.uber.org/zap" @@ -70,7 +71,7 @@ type ApplicationRelayer struct { checkpointManager *checkpoint.CheckpointManager currentRequestID uint32 lock *sync.RWMutex - warpClient warpBackend.Client + warpClient *rpc.Client } func NewApplicationRelayer( @@ -109,9 +110,16 @@ func NewApplicationRelayer( checkpointManager := checkpoint.NewCheckpointManager(logger, db, sub, relayerID, startingHeight) checkpointManager.Run() - var warpClient warpBackend.Client + var warpClient *rpc.Client if !sourceBlockchain.UseAppRequestNetwork() { - warpClient, err = warpBackend.NewClient(sourceBlockchain.WarpAPIEndpoint.BaseURL, sourceBlockchain.GetBlockchainID().String()) + // The subnet-evm Warp API client does not support query parameters or HTTP headers, and expects the URI to be in a specific form. + // Instead, we invoke the Warp API directly via the RPC client. + warpClient, err = utils.DialWithConfig( + context.Background(), + sourceBlockchain.WarpAPIEndpoint.BaseURL, + sourceBlockchain.WarpAPIEndpoint.HTTPHeaders, + sourceBlockchain.WarpAPIEndpoint.QueryParams, + ) if err != nil { logger.Error( "Failed to create Warp API client", @@ -271,7 +279,7 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp. r.logger.Info("Fetching aggregate signature from the source chain validators via API") var ( - signedWarpMessageBytes []byte + signedWarpMessageBytes hexutil.Bytes err error ) for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ { @@ -282,8 +290,11 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp. zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), zap.String("signingSubnetID", r.signingSubnetID.String()), ) - signedWarpMessageBytes, err = r.warpClient.GetMessageAggregateSignature( + + err = r.warpClient.CallContext( context.Background(), + &signedWarpMessageBytes, + "warp_getMessageAggregateSignature", unsignedMessage.ID(), r.warpQuorum.QuorumNumerator, r.signingSubnetID.String(), diff --git a/relayer/listener.go b/relayer/listener.go index 6bc72bfa..345e79b9 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -66,7 +66,7 @@ func NewListener( ) return nil, err } - ethWSClient, err := utils.DialWithConfig( + ethWSClient, err := utils.NewEthClientWithConfig( context.Background(), sourceBlockchain.WSEndpoint.BaseURL, sourceBlockchain.WSEndpoint.HTTPHeaders, diff --git a/utils/client_utils.go b/utils/client_utils.go index 05d4562a..051a5705 100644 --- a/utils/client_utils.go +++ b/utils/client_utils.go @@ -15,8 +15,17 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") -// DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. -func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { +// NewEthClientWithConfig returns an ethclient.Client with the internal RPC client configured with the provided options. +func NewEthClientWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { + client, err := DialWithConfig(ctx, baseURL, httpHeaders, queryParams) + if err != nil { + return nil, err + } + return ethclient.NewClient(client), nil +} + +// DialWithConfig dials the provided baseURL with the provided httpHeaders and queryParams +func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (*rpc.Client, error) { url, err := addQueryParams(baseURL, queryParams) if err != nil { return nil, err @@ -25,7 +34,7 @@ func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParam if err != nil { return nil, err } - return ethclient.NewClient(client), nil + return client, nil } // addQueryParams adds the query parameters to the url diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index 6be100dc..aea6ab90 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -52,7 +52,7 @@ func NewDestinationClient( destinationBlockchain *config.DestinationBlockchain, ) (*destinationClient, error) { // Dial the destination RPC endpoint - client, err := utils.DialWithConfig( + client, err := utils.NewEthClientWithConfig( context.Background(), destinationBlockchain.RPCEndpoint.BaseURL, destinationBlockchain.RPCEndpoint.HTTPHeaders, From eaef7661a5ff6525428bd9302224f2b32d8d1a31 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 25 Jun 2024 18:25:35 -0500 Subject: [PATCH 05/14] update readme --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c64403ea..64a0380a 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,8 @@ The Fuji and Mainnet [public API nodes](https://docs.avax.network/tooling/rpc-pr ### Peer-to-Peer Connections -- The AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip). +- By default, the AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip). +- If configured to use the Warp API (see `warp-api-endpoint` in [Configuration](#configuration)) then aggregate signatures are fetched via a single RPC request, rather than `AppRequests` to individual validators. Note that the Warp API is disabled on the public API. ### Private Key Management @@ -252,6 +253,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th - List of addresses on this source blockchain to relay Warp messages from. The sending address is defined by the message protocol. For example, it could be defined as the EOA that initiates the transaction, or the address that calls the message protocol contract. If empty, then all addresses are allowed. + `"warp-api-endpoint": APIConfig` + + - The RPC endpoint configuration for the Warp API, which is used to fetch Warp aggregate signatures. If omitted, then signatures are fetched via AppRequest instead. + `"destination-blockchains": []DestinationBlockchains` - The list of destination blockchains to support. Each `DestinationBlockchain` has the following configuration: From 9b0e9746084b60108177195f4875912f39ede87c Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 09:14:30 -0500 Subject: [PATCH 06/14] rename warp client field --- relayer/application_relayer.go | 58 ++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 0495075b..f7ab7b1e 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -59,19 +59,19 @@ var ( // to a specific destination address on a specific destination blockchain. This routing information is // encapsulated in [relayerID], which also represents the database key for an ApplicationRelayer. type ApplicationRelayer struct { - logger logging.Logger - metrics *ApplicationRelayerMetrics - network *peers.AppRequestNetwork - messageCreator message.Creator - sourceBlockchain config.SourceBlockchain - signingSubnetID ids.ID - destinationClient vms.DestinationClient - relayerID database.RelayerID - warpQuorum config.WarpQuorum - checkpointManager *checkpoint.CheckpointManager - currentRequestID uint32 - lock *sync.RWMutex - warpClient *rpc.Client + logger logging.Logger + metrics *ApplicationRelayerMetrics + network *peers.AppRequestNetwork + messageCreator message.Creator + sourceBlockchain config.SourceBlockchain + signingSubnetID ids.ID + destinationClient vms.DestinationClient + relayerID database.RelayerID + warpQuorum config.WarpQuorum + checkpointManager *checkpoint.CheckpointManager + currentRequestID uint32 + lock *sync.RWMutex + sourceWarpSignatureClient *rpc.Client } func NewApplicationRelayer( @@ -130,19 +130,19 @@ func NewApplicationRelayer( } ar := ApplicationRelayer{ - logger: logger, - metrics: metrics, - network: network, - messageCreator: messageCreator, - sourceBlockchain: sourceBlockchain, - destinationClient: destinationClient, - relayerID: relayerID, - signingSubnetID: signingSubnet, - warpQuorum: quorum, - checkpointManager: checkpointManager, - currentRequestID: rand.Uint32(), // TODONOW: pass via ctor - lock: &sync.RWMutex{}, - warpClient: warpClient, + logger: logger, + metrics: metrics, + network: network, + messageCreator: messageCreator, + sourceBlockchain: sourceBlockchain, + destinationClient: destinationClient, + relayerID: relayerID, + signingSubnetID: signingSubnet, + warpQuorum: quorum, + checkpointManager: checkpointManager, + currentRequestID: rand.Uint32(), // TODONOW: pass via ctor + lock: &sync.RWMutex{}, + sourceWarpSignatureClient: warpClient, } return &ar, nil @@ -229,7 +229,9 @@ func (r *ApplicationRelayer) relayMessage( startCreateSignedMessageTime := time.Now() // Query nodes on the origin chain for signatures, and construct the signed warp message. var signedMessage *avalancheWarp.Message - if r.sourceBlockchain.UseAppRequestNetwork() { + + // sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest + if r.sourceWarpSignatureClient == nil { signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID) if err != nil { r.logger.Error( @@ -291,7 +293,7 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp. zap.String("signingSubnetID", r.signingSubnetID.String()), ) - err = r.warpClient.CallContext( + err = r.sourceWarpSignatureClient.CallContext( context.Background(), &signedWarpMessageBytes, "warp_getMessageAggregateSignature", From dea8f81ab2ccd82be5d9bc12004da572ea80126c Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 09:54:34 -0500 Subject: [PATCH 07/14] add rpc and apprequest metrics --- relayer/application_relayer.go | 18 ++++++++++++ relayer/application_relayer_metrics.go | 40 ++++++++++++++++++++++---- tests/e2e_test.go | 36 +++++++++++------------ tests/utils/utils.go | 1 + tests/warp_api.go | 36 +++++++++++++++++++++++ 5 files changed, 107 insertions(+), 24 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index f7ab7b1e..86650061 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -232,6 +232,7 @@ func (r *ApplicationRelayer) relayMessage( // sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest if r.sourceWarpSignatureClient == nil { + r.incFetchSignatureAppRequestCount() signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID) if err != nil { r.logger.Error( @@ -242,6 +243,7 @@ func (r *ApplicationRelayer) relayMessage( return err } } else { + r.incFetchSignatureRPCCount() signedMessage, err = r.createSignedMessage(unsignedMessage) if err != nil { r.logger.Error( @@ -750,3 +752,19 @@ func (r *ApplicationRelayer) setCreateSignedMessageLatencyMS(latency float64) { r.sourceBlockchain.GetBlockchainID().String(), r.sourceBlockchain.GetSubnetID().String()).Set(latency) } + +func (r *ApplicationRelayer) incFetchSignatureRPCCount() { + r.metrics.fetchSignatureRPCCount. + WithLabelValues( + r.relayerID.DestinationBlockchainID.String(), + r.sourceBlockchain.GetBlockchainID().String(), + r.sourceBlockchain.GetSubnetID().String()).Inc() +} + +func (r *ApplicationRelayer) incFetchSignatureAppRequestCount() { + r.metrics.fetchSignatureAppRequestCount. + WithLabelValues( + r.relayerID.DestinationBlockchainID.String(), + r.sourceBlockchain.GetBlockchainID().String(), + r.sourceBlockchain.GetSubnetID().String()).Inc() +} diff --git a/relayer/application_relayer_metrics.go b/relayer/application_relayer_metrics.go index f36d8f2e..7a6b321e 100644 --- a/relayer/application_relayer_metrics.go +++ b/relayer/application_relayer_metrics.go @@ -14,9 +14,11 @@ var ( ) type ApplicationRelayerMetrics struct { - successfulRelayMessageCount *prometheus.CounterVec - createSignedMessageLatencyMS *prometheus.GaugeVec - failedRelayMessageCount *prometheus.CounterVec + successfulRelayMessageCount *prometheus.CounterVec + createSignedMessageLatencyMS *prometheus.GaugeVec + failedRelayMessageCount *prometheus.CounterVec + fetchSignatureAppRequestCount *prometheus.CounterVec + fetchSignatureRPCCount *prometheus.CounterVec } func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*ApplicationRelayerMetrics, error) { @@ -53,9 +55,35 @@ func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*Applicatio } registerer.MustRegister(failedRelayMessageCount) + fetchSignatureAppRequestCount := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "fetch_signature_app_request_count", + Help: "Number messages signed via AppRequest", + }, + []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + ) + if fetchSignatureAppRequestCount == nil { + return nil, ErrFailedToCreateApplicationRelayerMetrics + } + registerer.MustRegister(fetchSignatureAppRequestCount) + + fetchSignatureRPCCount := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "fetch_signature_rpc_count", + Help: "Number messages signed via Warp API", + }, + []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + ) + if fetchSignatureRPCCount == nil { + return nil, ErrFailedToCreateApplicationRelayerMetrics + } + registerer.MustRegister(fetchSignatureRPCCount) + return &ApplicationRelayerMetrics{ - successfulRelayMessageCount: successfulRelayMessageCount, - createSignedMessageLatencyMS: createSignedMessageLatencyMS, - failedRelayMessageCount: failedRelayMessageCount, + successfulRelayMessageCount: successfulRelayMessageCount, + createSignedMessageLatencyMS: createSignedMessageLatencyMS, + failedRelayMessageCount: failedRelayMessageCount, + fetchSignatureAppRequestCount: fetchSignatureAppRequestCount, + fetchSignatureRPCCount: fetchSignatureRPCCount, }, nil } diff --git a/tests/e2e_test.go b/tests/e2e_test.go index a587c874..fe634343 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -62,24 +62,24 @@ var _ = ginkgo.AfterSuite(func() { }) var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { - ginkgo.It("Manually Provided Message", func() { - ManualMessage(localNetworkInstance) - }) - ginkgo.It("Basic Relay", func() { - BasicRelay(localNetworkInstance) - }) - ginkgo.It("Teleporter Registry", func() { - TeleporterRegistry(localNetworkInstance) - }) - ginkgo.It("Shared Database", func() { - SharedDatabaseAccess(localNetworkInstance) - }) - ginkgo.It("Allowed Addresses", func() { - AllowedAddresses(localNetworkInstance) - }) - ginkgo.It("Batch Message", func() { - BatchRelay(localNetworkInstance) - }) + // ginkgo.It("Manually Provided Message", func() { + // ManualMessage(localNetworkInstance) + // }) + // ginkgo.It("Basic Relay", func() { + // BasicRelay(localNetworkInstance) + // }) + // ginkgo.It("Teleporter Registry", func() { + // TeleporterRegistry(localNetworkInstance) + // }) + // ginkgo.It("Shared Database", func() { + // SharedDatabaseAccess(localNetworkInstance) + // }) + // ginkgo.It("Allowed Addresses", func() { + // AllowedAddresses(localNetworkInstance) + // }) + // ginkgo.It("Batch Message", func() { + // BatchRelay(localNetworkInstance) + // }) ginkgo.It("Warp API", func() { WarpAPIRelay(localNetworkInstance) }) diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 54b4bc6d..3d8fd8d1 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -201,6 +201,7 @@ func CreateDefaultRelayerConfig( StorageLocation: StorageLocation, DBWriteIntervalSeconds: DBUpdateSeconds, ProcessMissedBlocks: false, + MetricsPort: 9090, SourceBlockchains: sources, DestinationBlockchains: destinations, } diff --git a/tests/warp_api.go b/tests/warp_api.go index 1e31d743..249b5b17 100644 --- a/tests/warp_api.go +++ b/tests/warp_api.go @@ -4,7 +4,13 @@ package tests import ( + "bufio" "context" + "fmt" + "io" + "net/http" + "strconv" + "strings" "time" testUtils "github.com/ava-labs/awm-relayer/tests/utils" @@ -89,6 +95,36 @@ func WarpAPIRelay(network interfaces.LocalNetwork) { fundedAddress, ) + // + // Verify the messages were signed using the Warp API + // + log.Info("Verifying the messages were signed using the Warp API") + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", relayerConfig.MetricsPort)) + Expect(err).Should(BeNil()) + + body, err := io.ReadAll(resp.Body) + Expect(err).Should(BeNil()) + defer resp.Body.Close() + + metricName := "app_fetch_signature_rpc_count" + var totalCount uint64 + scanner := bufio.NewScanner(strings.NewReader(string(body))) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, metricName) { + log.Info("Found metric line", "metric", line) + parts := strings.Fields(line) + + // Fetch the metric count from the last field of the line + value, err := strconv.ParseUint(parts[len(parts)-1], 10, 64) + if err != nil { + continue + } + totalCount += value + } + } + Expect(totalCount).Should(Equal(uint64(2))) + log.Info("Finished sending warp message, closing down output channel") // Cancel the command and stop the relayer relayerCleanup() From 85007d5a0cb48b718b39352b2fba67839ec0e503 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 09:55:51 -0500 Subject: [PATCH 08/14] add comment regarding nil pointer condition --- relayer/application_relayer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 86650061..1d76e19c 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -71,7 +71,7 @@ type ApplicationRelayer struct { checkpointManager *checkpoint.CheckpointManager currentRequestID uint32 lock *sync.RWMutex - sourceWarpSignatureClient *rpc.Client + sourceWarpSignatureClient *rpc.Client // nil if the source blockchain is configured to sign messages via AppRequest } func NewApplicationRelayer( From 6ea3bff07b72b77bc728c32c91ab6f20dbfbc215 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 09:56:15 -0500 Subject: [PATCH 09/14] comment back in all tests --- tests/e2e_test.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/e2e_test.go b/tests/e2e_test.go index fe634343..a587c874 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -62,24 +62,24 @@ var _ = ginkgo.AfterSuite(func() { }) var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { - // ginkgo.It("Manually Provided Message", func() { - // ManualMessage(localNetworkInstance) - // }) - // ginkgo.It("Basic Relay", func() { - // BasicRelay(localNetworkInstance) - // }) - // ginkgo.It("Teleporter Registry", func() { - // TeleporterRegistry(localNetworkInstance) - // }) - // ginkgo.It("Shared Database", func() { - // SharedDatabaseAccess(localNetworkInstance) - // }) - // ginkgo.It("Allowed Addresses", func() { - // AllowedAddresses(localNetworkInstance) - // }) - // ginkgo.It("Batch Message", func() { - // BatchRelay(localNetworkInstance) - // }) + ginkgo.It("Manually Provided Message", func() { + ManualMessage(localNetworkInstance) + }) + ginkgo.It("Basic Relay", func() { + BasicRelay(localNetworkInstance) + }) + ginkgo.It("Teleporter Registry", func() { + TeleporterRegistry(localNetworkInstance) + }) + ginkgo.It("Shared Database", func() { + SharedDatabaseAccess(localNetworkInstance) + }) + ginkgo.It("Allowed Addresses", func() { + AllowedAddresses(localNetworkInstance) + }) + ginkgo.It("Batch Message", func() { + BatchRelay(localNetworkInstance) + }) ginkgo.It("Warp API", func() { WarpAPIRelay(localNetworkInstance) }) From d096ee45e0e9704e4a26a406688f405e7ff63bfd Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 10:06:24 -0500 Subject: [PATCH 10/14] do not collect network or app request metrics --- main/main.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/main/main.go b/main/main.go index 248eaeda..e33c5d8e 100644 --- a/main/main.go +++ b/main/main.go @@ -122,13 +122,14 @@ func main() { // The app request network generates P2P networking logs that are verbose at the info level. // Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs. + // We do not collect metrics for the network. networkLogLevel := logging.Error if logLevel <= logging.Debug { networkLogLevel = logLevel } network, err := peers.NewNetwork( networkLogLevel, - registerer, + prometheus.DefaultRegisterer, &cfg, ) if err != nil { @@ -181,7 +182,14 @@ func main() { } // Initialize message creator passed down to relayers for creating app requests. - messageCreator, err := message.NewCreator(logger, registerer, "message_creator", constants.DefaultNetworkCompressionType, constants.DefaultNetworkMaximumInboundTimeout) + // We do not collect metrics for the message creator. + messageCreator, err := message.NewCreator( + logger, + prometheus.DefaultRegisterer, + "message_creator", + constants.DefaultNetworkCompressionType, + constants.DefaultNetworkMaximumInboundTimeout, + ) if err != nil { logger.Error( "Failed to create message creator", From b93ecd0fe9df749829f215714cb84539223a3b2c Mon Sep 17 00:00:00 2001 From: cam-schultz <78878559+cam-schultz@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:08:24 -0500 Subject: [PATCH 11/14] Update relayer/application_relayer_metrics.go Co-authored-by: Michael Kaplan <55204436+michaelkaplan13@users.noreply.github.com> Signed-off-by: cam-schultz <78878559+cam-schultz@users.noreply.github.com> --- relayer/application_relayer_metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/application_relayer_metrics.go b/relayer/application_relayer_metrics.go index 7a6b321e..ca8d5f61 100644 --- a/relayer/application_relayer_metrics.go +++ b/relayer/application_relayer_metrics.go @@ -58,7 +58,7 @@ func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*Applicatio fetchSignatureAppRequestCount := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "fetch_signature_app_request_count", - Help: "Number messages signed via AppRequest", + Help: "Number of aggregate signatures constructed via AppRequest", }, []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, ) From 8fd9954cee574418e063a33112fe83d65f0dcd4a Mon Sep 17 00:00:00 2001 From: cam-schultz <78878559+cam-schultz@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:08:33 -0500 Subject: [PATCH 12/14] Update relayer/application_relayer_metrics.go Co-authored-by: Michael Kaplan <55204436+michaelkaplan13@users.noreply.github.com> Signed-off-by: cam-schultz <78878559+cam-schultz@users.noreply.github.com> --- relayer/application_relayer_metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/application_relayer_metrics.go b/relayer/application_relayer_metrics.go index ca8d5f61..e4e74e3e 100644 --- a/relayer/application_relayer_metrics.go +++ b/relayer/application_relayer_metrics.go @@ -70,7 +70,7 @@ func NewApplicationRelayerMetrics(registerer prometheus.Registerer) (*Applicatio fetchSignatureRPCCount := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "fetch_signature_rpc_count", - Help: "Number messages signed via Warp API", + Help: "Number of aggregate signatures fetched via Warp API", }, []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, ) From 22bbf5c015f8028106f6de22999572b15a0f5d39 Mon Sep 17 00:00:00 2001 From: cam-schultz <78878559+cam-schultz@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:08:48 -0500 Subject: [PATCH 13/14] Update relayer/application_relayer.go Co-authored-by: Michael Kaplan <55204436+michaelkaplan13@users.noreply.github.com> Signed-off-by: cam-schultz <78878559+cam-schultz@users.noreply.github.com> --- relayer/application_relayer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 1d76e19c..c1f59600 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -71,7 +71,7 @@ type ApplicationRelayer struct { checkpointManager *checkpoint.CheckpointManager currentRequestID uint32 lock *sync.RWMutex - sourceWarpSignatureClient *rpc.Client // nil if the source blockchain is configured to sign messages via AppRequest + sourceWarpSignatureClient *rpc.Client // nil if configured to fetch signatures via AppRequest for the source blockchain } func NewApplicationRelayer( From 062cfd9615b220741cddbd93c7ec57307daed363 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Wed, 26 Jun 2024 10:11:56 -0500 Subject: [PATCH 14/14] add const metric name --- tests/warp_api.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/warp_api.go b/tests/warp_api.go index 249b5b17..4a793c7d 100644 --- a/tests/warp_api.go +++ b/tests/warp_api.go @@ -21,9 +21,13 @@ import ( . "github.com/onsi/gomega" ) +// Fully formed name of the metric that tracks the number aggregate signatures fetched from the Warp API +const rpcSignatureMetricName = "app_fetch_signature_rpc_count" + // This tests the basic functionality of the relayer using the Warp API/, rather than app requests. Includes: // - Relaying from Subnet A to Subnet B // - Relaying from Subnet B to Subnet A +// - Verifying the messages were signed using the Warp API func WarpAPIRelay(network interfaces.LocalNetwork) { subnetAInfo := network.GetPrimaryNetworkInfo() subnetBInfo, _ := utils.GetTwoSubnets(network) @@ -106,12 +110,11 @@ func WarpAPIRelay(network interfaces.LocalNetwork) { Expect(err).Should(BeNil()) defer resp.Body.Close() - metricName := "app_fetch_signature_rpc_count" var totalCount uint64 scanner := bufio.NewScanner(strings.NewReader(string(body))) for scanner.Scan() { line := scanner.Text() - if strings.HasPrefix(line, metricName) { + if strings.HasPrefix(line, rpcSignatureMetricName) { log.Info("Found metric line", "metric", line) parts := strings.Fields(line)