From 287d281038dbd4a3daa4435ad529fba911259d07 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 18 Dec 2023 13:43:23 +0000 Subject: [PATCH] FM-427: HybridClient (#462) --- fendermint/app/options/src/eth.rs | 22 +-- fendermint/app/src/cmd/eth.rs | 36 +--- fendermint/eth/api/Cargo.toml | 1 + fendermint/eth/api/src/apis/mod.rs | 4 +- fendermint/eth/api/src/client.rs | 193 ++++++++++++++++++++ fendermint/eth/api/src/lib.rs | 10 +- fendermint/rpc/src/client.rs | 8 +- fendermint/testing/smoke-test/Makefile.toml | 4 +- infra/docker-compose.yml | 1 + infra/scripts/ethapi.toml | 1 + 10 files changed, 228 insertions(+), 52 deletions(-) create mode 100644 fendermint/eth/api/src/client.rs diff --git a/fendermint/app/options/src/eth.rs b/fendermint/app/options/src/eth.rs index 75e459e1..1e9dd469 100644 --- a/fendermint/app/options/src/eth.rs +++ b/fendermint/app/options/src/eth.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use clap::{Args, Subcommand}; -use tendermint_rpc::Url; +use tendermint_rpc::{Url, WebSocketClientUrl}; #[derive(Args, Debug)] pub struct EthArgs { @@ -15,22 +15,22 @@ pub enum EthCommands { /// Run the Ethereum JSON-RPC facade. Run { /// The URL of the Tendermint node's RPC endpoint. + #[arg( + long, + short, + default_value = "http://127.0.0.1:26657", + env = "TENDERMINT_RPC_URL" + )] + http_url: Url, + + /// The URL of the Tendermint node's WebSocket endpoint. #[arg( long, short, default_value = "ws://127.0.0.1:26657/websocket", env = "TENDERMINT_WS_URL" )] - url: Url, - - /// An optional HTTP/S proxy through which to submit requests to the - /// Tendermint node's RPC endpoint. - #[arg(long)] - proxy_url: Option, - - /// Maximum number of times to try to connect to the websocket. - #[arg(long, short = 'r', default_value = "5")] - connect_max_retries: usize, + ws_url: WebSocketClientUrl, /// Seconds to wait between trying to connect to the websocket. #[arg(long, short = 'd', default_value = "5")] diff --git a/fendermint/app/src/cmd/eth.rs b/fendermint/app/src/cmd/eth.rs index 54744dd3..02d60755 100644 --- a/fendermint/app/src/cmd/eth.rs +++ b/fendermint/app/src/cmd/eth.rs @@ -4,8 +4,7 @@ use std::time::Duration; use anyhow::Context; -use fendermint_rpc::client::ws_client; -use tendermint_rpc::{Url, WebSocketClient, WebSocketClientDriver}; +use fendermint_eth_api::HybridClient; use crate::{ cmd, @@ -16,9 +15,9 @@ use crate::{ cmd! { EthArgs(self, settings: EthSettings) { match self.command.clone() { - EthCommands::Run { url, proxy_url:_, connect_max_retries, connect_retry_delay } => { + EthCommands::Run { ws_url, http_url, connect_retry_delay } => { - let (client, driver) = ws_connect(url, connect_max_retries, Duration::from_secs(connect_retry_delay)).await.context("failed to connect to Tendermint")?; + let (client, driver) = HybridClient::new(http_url, ws_url, Duration::from_secs(connect_retry_delay)).context("failed to create HybridClient")?; let driver_handle = tokio::spawn(async move { driver.run().await }); @@ -32,8 +31,8 @@ cmd! { } } -/// Run the Ethereum -async fn run(settings: EthSettings, client: WebSocketClient) -> anyhow::Result<()> { +/// Run the Ethereum API facade. +async fn run(settings: EthSettings, client: HybridClient) -> anyhow::Result<()> { let gas = fendermint_eth_api::GasOpt { min_gas_premium: settings.gas.min_gas_premium, num_blocks_max_prio_fee: settings.gas.num_blocks_max_prio_fee, @@ -48,28 +47,3 @@ async fn run(settings: EthSettings, client: WebSocketClient) -> anyhow::Result<( ) .await } - -/// Try connecting repeatedly until it succeeds. -async fn ws_connect( - url: Url, - max_retries: usize, - retry_delay: Duration, -) -> anyhow::Result<(WebSocketClient, WebSocketClientDriver)> { - let mut retry = 0; - loop { - match ws_client(url.clone()).await { - Ok(cd) => { - return Ok(cd); - } - Err(e) => { - if retry >= max_retries { - return Err(e); - } else { - tracing::warn!("failed to connect to Tendermint; retrying..."); - retry += 1; - tokio::time::sleep(retry_delay).await; - } - } - } - } -} diff --git a/fendermint/eth/api/Cargo.toml b/fendermint/eth/api/Cargo.toml index 07ae5b2c..bc631938 100644 --- a/fendermint/eth/api/Cargo.toml +++ b/fendermint/eth/api/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } axum = { workspace = true } ethers-core = { workspace = true } erased-serde = { workspace = true } diff --git a/fendermint/eth/api/src/apis/mod.rs b/fendermint/eth/api/src/apis/mod.rs index c5094fad..a5ade784 100644 --- a/fendermint/eth/api/src/apis/mod.rs +++ b/fendermint/eth/api/src/apis/mod.rs @@ -4,9 +4,9 @@ // See https://ethereum.org/en/developers/docs/apis/json-rpc/#json-rpc-methods // and https://ethereum.github.io/execution-apis/api-documentation/ +use crate::HybridClient; use jsonrpc_v2::{MapRouter, ServerBuilder}; use paste::paste; -use tendermint_rpc::WebSocketClient; mod eth; mod net; @@ -18,7 +18,7 @@ macro_rules! with_methods { $server $(.with_method( stringify!([< $module _ $method >]), - $module :: [< $method:snake >] :: + $module :: [< $method:snake >] :: ))* } }; diff --git a/fendermint/eth/api/src/client.rs b/fendermint/eth/api/src/client.rs new file mode 100644 index 00000000..6ef1fa20 --- /dev/null +++ b/fendermint/eth/api/src/client.rs @@ -0,0 +1,193 @@ +// Copyright 2022-2023 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::{pin::Pin, time::Duration}; + +use anyhow::Context; +use async_trait::async_trait; +use fendermint_rpc::client::{http_client, ws_client}; +use futures::Future; +use tendermint_rpc::{ + error::ErrorDetail, query::Query, Client, Error, HttpClient, SimpleRequest, Subscription, + SubscriptionClient, Url, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, +}; + +/// A mixed HTTP and WebSocket client. Uses HTTP to perform all +/// the JSON-RPC requests except the ones which require subscription, +/// which go through a WebSocket client. +/// +/// The WebSocket client is expected to lose connection with CometBFT, +/// in which case it will be re-established in the background. +/// +/// Existing subscriptions should receive an error and they can try +/// re-subscribing through the Ethereum API facade, which should create +/// new subscriptions through a fresh CometBFT client. +#[derive(Clone)] +pub struct HybridClient { + http_client: HttpClient, + cmd_tx: tokio::sync::mpsc::UnboundedSender, +} + +pub struct HybridClientDriver { + ws_url: WebSocketClientUrl, + retry_delay: Duration, + cmd_rx: tokio::sync::mpsc::UnboundedReceiver, +} + +enum DriverCommand { + Subscribe( + Query, + tokio::sync::oneshot::Sender>, + ), + Unsubscribe(Query, tokio::sync::oneshot::Sender>), + Close, +} + +impl HybridClient { + pub fn new( + http_url: Url, + ws_url: WebSocketClientUrl, + retry_delay: Duration, + ) -> anyhow::Result<(Self, HybridClientDriver)> { + let http_client = + http_client(http_url, None).context("failed to create Tendermint client")?; + + let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); + + let client = Self { + http_client, + cmd_tx, + }; + + let driver = HybridClientDriver { + ws_url, + retry_delay, + cmd_rx, + }; + + Ok((client, driver)) + } +} + +#[async_trait] +impl Client for HybridClient { + async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + self.http_client.perform(request).await + } +} + +#[async_trait] +impl SubscriptionClient for HybridClient { + async fn subscribe(&self, query: Query) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.cmd_tx + .send(DriverCommand::Subscribe(query, tx)) + .map_err(|_| Error::channel_send())?; + + rx.await + .map_err(|e| Error::client_internal(e.to_string()))? + } + + async fn unsubscribe(&self, query: Query) -> Result<(), Error> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.cmd_tx + .send(DriverCommand::Unsubscribe(query, tx)) + .map_err(|_| Error::channel_send())?; + + rx.await + .map_err(|e| Error::client_internal(e.to_string()))? + } + + fn close(self) -> Result<(), Error> { + self.cmd_tx + .send(DriverCommand::Close) + .map_err(|_| Error::channel_send()) + } +} + +impl HybridClientDriver { + pub async fn run(mut self) { + let mut client = self.ws_client().await; + + while let Some(cmd) = self.cmd_rx.recv().await { + match cmd { + DriverCommand::Subscribe(query, tx) => { + client = self + .send_loop(client, tx, |client| { + let query = query.clone(); + Box::pin(async move { client.subscribe(query.clone()).await }) + }) + .await; + } + DriverCommand::Unsubscribe(query, tx) => { + client = self + .send_loop(client, tx, |client| { + let query = query.clone(); + Box::pin(async move { client.unsubscribe(query.clone()).await }) + }) + .await; + } + DriverCommand::Close => { + break; + } + } + } + let _ = client.close(); + } + + /// Try to send something to the socket. If it fails, reconnect and send again. + async fn send_loop( + &self, + mut client: WebSocketClient, + tx: tokio::sync::oneshot::Sender>, + f: F, + ) -> WebSocketClient + where + F: Fn(WebSocketClient) -> Pin> + Send>>, + { + loop { + match f(client.clone()).await { + Err(e) if matches!(e.detail(), ErrorDetail::ChannelSend(_)) => { + client = self.ws_client().await; + } + res => { + let _ = tx.send(res); + return client; + } + } + } + } + + /// Connect to the WebSocket and start the driver, returning the client. + async fn ws_client(&self) -> WebSocketClient { + let (client, driver) = self.ws_connect().await; + tokio::spawn(async move { driver.run().await }); + client + } + + /// Try connecting repeatedly until it succeeds. + async fn ws_connect(&self) -> (WebSocketClient, WebSocketClientDriver) { + let url: Url = self.ws_url.clone().into(); + loop { + match ws_client(url.clone()).await { + Ok(cd) => { + return cd; + } + Err(e) => { + tracing::warn!( + error = e.to_string(), + url = url.to_string(), + "failed to connect to Tendermint WebSocket; retrying in {}s...", + self.retry_delay.as_secs() + ); + tokio::time::sleep(self.retry_delay).await; + } + } + } + } +} diff --git a/fendermint/eth/api/src/lib.rs b/fendermint/eth/api/src/lib.rs index 3df7d0ff..84a6bc87 100644 --- a/fendermint/eth/api/src/lib.rs +++ b/fendermint/eth/api/src/lib.rs @@ -6,10 +6,10 @@ use axum::routing::{get, post}; use fvm_shared::econ::TokenAmount; use jsonrpc_v2::Data; use std::{net::ToSocketAddrs, sync::Arc, time::Duration}; -use tendermint_rpc::WebSocketClient; mod apis; mod cache; +mod client; mod conv; mod error; mod filters; @@ -17,6 +17,8 @@ mod gas; mod handlers; mod state; +pub use client::{HybridClient, HybridClientDriver}; + use error::{error, JsonRpcError}; use state::JsonRpcState; @@ -29,7 +31,7 @@ type JsonRpcResult = Result; #[derive(Clone)] pub struct AppState { pub rpc_server: JsonRpcServer, - pub rpc_state: Arc>, + pub rpc_state: Arc>, } #[derive(Debug, Clone)] @@ -42,7 +44,7 @@ pub struct GasOpt { /// Start listening to JSON-RPC requests. pub async fn listen( listen_addr: A, - client: WebSocketClient, + client: HybridClient, filter_timeout: Duration, cache_capacity: usize, gas_opt: GasOpt, @@ -71,7 +73,7 @@ pub async fn listen( } /// Register method handlers with the JSON-RPC server construct. -fn make_server(state: Arc>) -> JsonRpcServer { +fn make_server(state: Arc>) -> JsonRpcServer { let server = jsonrpc_v2::Server::new().with_data(Data(state)); let server = apis::register_methods(server); server.finish() diff --git a/fendermint/rpc/src/client.rs b/fendermint/rpc/src/client.rs index 8933b6de..0fa44f9d 100644 --- a/fendermint/rpc/src/client.rs +++ b/fendermint/rpc/src/client.rs @@ -1,6 +1,7 @@ // Copyright 2022-2023 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT +use std::fmt::Display; use std::marker::PhantomData; use anyhow::{anyhow, Context}; @@ -9,7 +10,7 @@ use fendermint_vm_message::chain::ChainMessage; use tendermint::abci::response::DeliverTx; use tendermint::block::Height; use tendermint_rpc::{endpoint::abci_query::AbciQuery, Client, HttpClient, Scheme, Url}; -use tendermint_rpc::{WebSocketClient, WebSocketClientDriver}; +use tendermint_rpc::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl}; use fendermint_vm_message::query::{FvmQuery, FvmQueryHeight}; @@ -70,7 +71,10 @@ pub fn http_client(url: Url, proxy_url: Option) -> anyhow::Result anyhow::Result<(WebSocketClient, WebSocketClientDriver)> { +pub async fn ws_client(url: U) -> anyhow::Result<(WebSocketClient, WebSocketClientDriver)> +where + U: TryInto + Display + Clone, +{ // TODO: Doesn't handle proxy. tracing::debug!("Using WS client to submit request to: {}", url); diff --git a/fendermint/testing/smoke-test/Makefile.toml b/fendermint/testing/smoke-test/Makefile.toml index d86435f9..7dbf7a20 100644 --- a/fendermint/testing/smoke-test/Makefile.toml +++ b/fendermint/testing/smoke-test/Makefile.toml @@ -30,7 +30,7 @@ EOF [tasks.test] clear = true -dependencies = ["simplecoin-example", "ethapi-example"] +dependencies = ["simplecoin-example", "ethers-example"] [tasks.simplecoin-example] @@ -43,7 +43,7 @@ cargo run -p fendermint_rpc --release --example simplecoin -- \ """ -[tasks.ethapi-example] +[tasks.ethers-example] script = """ cd ${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY} cargo run -p fendermint_eth_api --release --example ethers -- \ diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml index 8a1489f4..5a0bd93a 100644 --- a/infra/docker-compose.yml +++ b/infra/docker-compose.yml @@ -44,6 +44,7 @@ services: image: "fendermint:latest" command: "eth run" environment: + - TENDERMINT_RPC_URL=http://cometbft-node${NODE_ID}:26657 - TENDERMINT_WS_URL=ws://cometbft-node${NODE_ID}:26657/websocket - LOG_LEVEL=debug - RUST_BACKTRACE=1 diff --git a/infra/scripts/ethapi.toml b/infra/scripts/ethapi.toml index 756b904f..6803e470 100644 --- a/infra/scripts/ethapi.toml +++ b/infra/scripts/ethapi.toml @@ -7,6 +7,7 @@ docker run \ --user $(id -u) \ --network ${NETWORK_NAME} \ --publish ${ETHAPI_HOST_PORT}:8545 \ + --env TENDERMINT_RPC_URL=http://${CMT_CONTAINER_NAME}:26657 \ --env TENDERMINT_WS_URL=ws://${CMT_CONTAINER_NAME}:26657/websocket \ --env LOG_LEVEL=${ETHAPI_LOG_LEVEL} \ --env RUST_BACKTRACE=1 \