Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-427: HybridClient (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh authored Dec 18, 2023
1 parent 30c1eea commit 287d281
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 52 deletions.
22 changes: 11 additions & 11 deletions fendermint/app/options/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use clap::{Args, Subcommand};
use tendermint_rpc::Url;
use tendermint_rpc::{Url, WebSocketClientUrl};

#[derive(Args, Debug)]
pub struct EthArgs {
Expand All @@ -15,22 +15,22 @@ pub enum EthCommands {
/// Run the Ethereum JSON-RPC facade.
Run {
/// The URL of the Tendermint node's RPC endpoint.
#[arg(
long,
short,
default_value = "http://127.0.0.1:26657",
env = "TENDERMINT_RPC_URL"
)]
http_url: Url,

/// The URL of the Tendermint node's WebSocket endpoint.
#[arg(
long,
short,
default_value = "ws://127.0.0.1:26657/websocket",
env = "TENDERMINT_WS_URL"
)]
url: Url,

/// An optional HTTP/S proxy through which to submit requests to the
/// Tendermint node's RPC endpoint.
#[arg(long)]
proxy_url: Option<Url>,

/// Maximum number of times to try to connect to the websocket.
#[arg(long, short = 'r', default_value = "5")]
connect_max_retries: usize,
ws_url: WebSocketClientUrl,

/// Seconds to wait between trying to connect to the websocket.
#[arg(long, short = 'd', default_value = "5")]
Expand Down
36 changes: 5 additions & 31 deletions fendermint/app/src/cmd/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use std::time::Duration;

use anyhow::Context;
use fendermint_rpc::client::ws_client;
use tendermint_rpc::{Url, WebSocketClient, WebSocketClientDriver};
use fendermint_eth_api::HybridClient;

use crate::{
cmd,
Expand All @@ -16,9 +15,9 @@ use crate::{
cmd! {
EthArgs(self, settings: EthSettings) {
match self.command.clone() {
EthCommands::Run { url, proxy_url:_, connect_max_retries, connect_retry_delay } => {
EthCommands::Run { ws_url, http_url, connect_retry_delay } => {

let (client, driver) = ws_connect(url, connect_max_retries, Duration::from_secs(connect_retry_delay)).await.context("failed to connect to Tendermint")?;
let (client, driver) = HybridClient::new(http_url, ws_url, Duration::from_secs(connect_retry_delay)).context("failed to create HybridClient")?;

let driver_handle = tokio::spawn(async move { driver.run().await });

Expand All @@ -32,8 +31,8 @@ cmd! {
}
}

/// Run the Ethereum
async fn run(settings: EthSettings, client: WebSocketClient) -> anyhow::Result<()> {
/// Run the Ethereum API facade.
async fn run(settings: EthSettings, client: HybridClient) -> anyhow::Result<()> {
let gas = fendermint_eth_api::GasOpt {
min_gas_premium: settings.gas.min_gas_premium,
num_blocks_max_prio_fee: settings.gas.num_blocks_max_prio_fee,
Expand All @@ -48,28 +47,3 @@ async fn run(settings: EthSettings, client: WebSocketClient) -> anyhow::Result<(
)
.await
}

/// Try connecting repeatedly until it succeeds.
async fn ws_connect(
url: Url,
max_retries: usize,
retry_delay: Duration,
) -> anyhow::Result<(WebSocketClient, WebSocketClientDriver)> {
let mut retry = 0;
loop {
match ws_client(url.clone()).await {
Ok(cd) => {
return Ok(cd);
}
Err(e) => {
if retry >= max_retries {
return Err(e);
} else {
tracing::warn!("failed to connect to Tendermint; retrying...");
retry += 1;
tokio::time::sleep(retry_delay).await;
}
}
}
}
}
1 change: 1 addition & 0 deletions fendermint/eth/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
ethers-core = { workspace = true }
erased-serde = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions fendermint/eth/api/src/apis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
// See https://ethereum.org/en/developers/docs/apis/json-rpc/#json-rpc-methods
// and https://ethereum.github.io/execution-apis/api-documentation/

use crate::HybridClient;
use jsonrpc_v2::{MapRouter, ServerBuilder};
use paste::paste;
use tendermint_rpc::WebSocketClient;

mod eth;
mod net;
Expand All @@ -18,7 +18,7 @@ macro_rules! with_methods {
$server
$(.with_method(
stringify!([< $module _ $method >]),
$module :: [< $method:snake >] ::<WebSocketClient>
$module :: [< $method:snake >] ::<HybridClient>
))*
}
};
Expand Down
193 changes: 193 additions & 0 deletions fendermint/eth/api/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{pin::Pin, time::Duration};

use anyhow::Context;
use async_trait::async_trait;
use fendermint_rpc::client::{http_client, ws_client};
use futures::Future;
use tendermint_rpc::{
error::ErrorDetail, query::Query, Client, Error, HttpClient, SimpleRequest, Subscription,
SubscriptionClient, Url, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl,
};

/// A mixed HTTP and WebSocket client. Uses HTTP to perform all
/// the JSON-RPC requests except the ones which require subscription,
/// which go through a WebSocket client.
///
/// The WebSocket client is expected to lose connection with CometBFT,
/// in which case it will be re-established in the background.
///
/// Existing subscriptions should receive an error and they can try
/// re-subscribing through the Ethereum API facade, which should create
/// new subscriptions through a fresh CometBFT client.
#[derive(Clone)]
pub struct HybridClient {
http_client: HttpClient,
cmd_tx: tokio::sync::mpsc::UnboundedSender<DriverCommand>,
}

pub struct HybridClientDriver {
ws_url: WebSocketClientUrl,
retry_delay: Duration,
cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DriverCommand>,
}

enum DriverCommand {
Subscribe(
Query,
tokio::sync::oneshot::Sender<Result<Subscription, Error>>,
),
Unsubscribe(Query, tokio::sync::oneshot::Sender<Result<(), Error>>),
Close,
}

impl HybridClient {
pub fn new(
http_url: Url,
ws_url: WebSocketClientUrl,
retry_delay: Duration,
) -> anyhow::Result<(Self, HybridClientDriver)> {
let http_client =
http_client(http_url, None).context("failed to create Tendermint client")?;

let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();

let client = Self {
http_client,
cmd_tx,
};

let driver = HybridClientDriver {
ws_url,
retry_delay,
cmd_rx,
};

Ok((client, driver))
}
}

#[async_trait]
impl Client for HybridClient {
async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest,
{
self.http_client.perform(request).await
}
}

#[async_trait]
impl SubscriptionClient for HybridClient {
async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.cmd_tx
.send(DriverCommand::Subscribe(query, tx))
.map_err(|_| Error::channel_send())?;

rx.await
.map_err(|e| Error::client_internal(e.to_string()))?
}

async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.cmd_tx
.send(DriverCommand::Unsubscribe(query, tx))
.map_err(|_| Error::channel_send())?;

rx.await
.map_err(|e| Error::client_internal(e.to_string()))?
}

fn close(self) -> Result<(), Error> {
self.cmd_tx
.send(DriverCommand::Close)
.map_err(|_| Error::channel_send())
}
}

impl HybridClientDriver {
pub async fn run(mut self) {
let mut client = self.ws_client().await;

while let Some(cmd) = self.cmd_rx.recv().await {
match cmd {
DriverCommand::Subscribe(query, tx) => {
client = self
.send_loop(client, tx, |client| {
let query = query.clone();
Box::pin(async move { client.subscribe(query.clone()).await })
})
.await;
}
DriverCommand::Unsubscribe(query, tx) => {
client = self
.send_loop(client, tx, |client| {
let query = query.clone();
Box::pin(async move { client.unsubscribe(query.clone()).await })
})
.await;
}
DriverCommand::Close => {
break;
}
}
}
let _ = client.close();
}

/// Try to send something to the socket. If it fails, reconnect and send again.
async fn send_loop<F, T>(
&self,
mut client: WebSocketClient,
tx: tokio::sync::oneshot::Sender<Result<T, Error>>,
f: F,
) -> WebSocketClient
where
F: Fn(WebSocketClient) -> Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>,
{
loop {
match f(client.clone()).await {
Err(e) if matches!(e.detail(), ErrorDetail::ChannelSend(_)) => {
client = self.ws_client().await;
}
res => {
let _ = tx.send(res);
return client;
}
}
}
}

/// Connect to the WebSocket and start the driver, returning the client.
async fn ws_client(&self) -> WebSocketClient {
let (client, driver) = self.ws_connect().await;
tokio::spawn(async move { driver.run().await });
client
}

/// Try connecting repeatedly until it succeeds.
async fn ws_connect(&self) -> (WebSocketClient, WebSocketClientDriver) {
let url: Url = self.ws_url.clone().into();
loop {
match ws_client(url.clone()).await {
Ok(cd) => {
return cd;
}
Err(e) => {
tracing::warn!(
error = e.to_string(),
url = url.to_string(),
"failed to connect to Tendermint WebSocket; retrying in {}s...",
self.retry_delay.as_secs()
);
tokio::time::sleep(self.retry_delay).await;
}
}
}
}
}
10 changes: 6 additions & 4 deletions fendermint/eth/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ use axum::routing::{get, post};
use fvm_shared::econ::TokenAmount;
use jsonrpc_v2::Data;
use std::{net::ToSocketAddrs, sync::Arc, time::Duration};
use tendermint_rpc::WebSocketClient;

mod apis;
mod cache;
mod client;
mod conv;
mod error;
mod filters;
mod gas;
mod handlers;
mod state;

pub use client::{HybridClient, HybridClientDriver};

use error::{error, JsonRpcError};
use state::JsonRpcState;

Expand All @@ -29,7 +31,7 @@ type JsonRpcResult<T> = Result<T, JsonRpcError>;
#[derive(Clone)]
pub struct AppState {
pub rpc_server: JsonRpcServer,
pub rpc_state: Arc<JsonRpcState<WebSocketClient>>,
pub rpc_state: Arc<JsonRpcState<HybridClient>>,
}

#[derive(Debug, Clone)]
Expand All @@ -42,7 +44,7 @@ pub struct GasOpt {
/// Start listening to JSON-RPC requests.
pub async fn listen<A: ToSocketAddrs>(
listen_addr: A,
client: WebSocketClient,
client: HybridClient,
filter_timeout: Duration,
cache_capacity: usize,
gas_opt: GasOpt,
Expand Down Expand Up @@ -71,7 +73,7 @@ pub async fn listen<A: ToSocketAddrs>(
}

/// Register method handlers with the JSON-RPC server construct.
fn make_server(state: Arc<JsonRpcState<WebSocketClient>>) -> JsonRpcServer {
fn make_server(state: Arc<JsonRpcState<HybridClient>>) -> JsonRpcServer {
let server = jsonrpc_v2::Server::new().with_data(Data(state));
let server = apis::register_methods(server);
server.finish()
Expand Down
Loading

0 comments on commit 287d281

Please sign in to comment.