From 3166e28aa53eecfa1a1c7be86bd0917b7e474bd6 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Thu, 15 Feb 2024 02:07:45 +0100 Subject: [PATCH 1/3] chore: use `impl Future` in `PubSubConnect` --- crates/pubsub/src/connect.rs | 11 ++++++----- crates/pubsub/src/frontend.rs | 8 ++++---- crates/pubsub/src/managers/in_flight.rs | 6 +++--- crates/pubsub/src/service.rs | 6 +++--- crates/rpc-client/src/batch.rs | 6 +++--- crates/rpc-client/src/builder.rs | 14 +++++++------- crates/rpc-client/src/client.rs | 8 +++----- crates/transport-ipc/src/connect.rs | 20 +++++++------------- crates/transport-ws/src/native.rs | 21 +++++++++------------ crates/transport-ws/src/wasm.rs | 19 +++++++------------ crates/transport/Cargo.toml | 1 + crates/transport/src/connect.rs | 15 ++++----------- 12 files changed, 57 insertions(+), 78 deletions(-) diff --git a/crates/pubsub/src/connect.rs b/crates/pubsub/src/connect.rs index 5e74f08d9e0..ddb061b07dd 100644 --- a/crates/pubsub/src/connect.rs +++ b/crates/pubsub/src/connect.rs @@ -1,5 +1,6 @@ use crate::{handle::ConnectionHandle, service::PubSubService, PubSubFrontend}; -use alloy_transport::{Pbf, TransportError}; +use alloy_transport::TransportResult; +use std::future::Future; /// Configuration objects that contain connection details for a backend. /// @@ -15,19 +16,19 @@ pub trait PubSubConnect: Sized + Send + Sync + 'static { /// [`ConnectionInterface`], and return the corresponding handle. /// /// [`ConnectionInterface`]: crate::ConnectionInterface - fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, ConnectionHandle, TransportError>; + fn connect(&self) -> impl Future> + Send; /// Attempt to reconnect the transport. /// /// Override this to add custom reconnection logic to your connector. This /// will be used by the internal pubsub connection managers in the event the /// connection fails. - fn try_reconnect<'a: 'b, 'b>(&'a self) -> Pbf<'b, ConnectionHandle, TransportError> { + fn try_reconnect(&self) -> impl Future> + Send { self.connect() } /// Convert the configuration object into a service with a running backend. - fn into_service(self) -> Pbf<'static, PubSubFrontend, TransportError> { - Box::pin(PubSubService::connect(self)) + fn into_service(self) -> impl Future> + Send { + PubSubService::connect(self) } } diff --git a/crates/pubsub/src/frontend.rs b/crates/pubsub/src/frontend.rs index 06477611404..870cd1c7a1a 100644 --- a/crates/pubsub/src/frontend.rs +++ b/crates/pubsub/src/frontend.rs @@ -1,7 +1,7 @@ use crate::{ix::PubSubInstruction, managers::InFlight, RawSubscription}; use alloy_json_rpc::{RequestPacket, Response, ResponsePacket, SerializedRequest}; use alloy_primitives::U256; -use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut, TransportResult}; use futures::{future::try_join_all, FutureExt, TryFutureExt}; use std::{ future::Future, @@ -31,7 +31,7 @@ impl PubSubFrontend { pub fn get_subscription( &self, id: U256, - ) -> impl Future> + Send + 'static { + ) -> impl Future> + Send + 'static { let backend_tx = self.tx.clone(); async move { let (tx, rx) = oneshot::channel(); @@ -43,7 +43,7 @@ impl PubSubFrontend { } /// Unsubscribe from a subscription. - pub fn unsubscribe(&self, id: U256) -> Result<(), TransportError> { + pub fn unsubscribe(&self, id: U256) -> TransportResult<()> { self.tx .send(PubSubInstruction::Unsubscribe(id)) .map_err(|_| TransportErrorKind::backend_gone()) @@ -53,7 +53,7 @@ impl PubSubFrontend { pub fn send( &self, req: SerializedRequest, - ) -> impl Future> + Send + 'static { + ) -> impl Future> + Send + 'static { let tx = self.tx.clone(); let channel_size = self.channel_size; diff --git a/crates/pubsub/src/managers/in_flight.rs b/crates/pubsub/src/managers/in_flight.rs index 6b405b35bc9..fe466f91ba2 100644 --- a/crates/pubsub/src/managers/in_flight.rs +++ b/crates/pubsub/src/managers/in_flight.rs @@ -1,6 +1,6 @@ use alloy_json_rpc::{Response, ResponsePayload, SerializedRequest}; use alloy_primitives::U256; -use alloy_transport::TransportError; +use alloy_transport::{TransportError, TransportResult}; use std::fmt; use tokio::sync::oneshot; @@ -16,7 +16,7 @@ pub(crate) struct InFlight { pub(crate) channel_size: usize, /// The channel to send the response on. - pub(crate) tx: oneshot::Sender>, + pub(crate) tx: oneshot::Sender>, } impl fmt::Debug for InFlight { @@ -34,7 +34,7 @@ impl InFlight { pub(crate) fn new( request: SerializedRequest, channel_size: usize, - ) -> (Self, oneshot::Receiver>) { + ) -> (Self, oneshot::Receiver>) { let (tx, rx) = oneshot::channel(); (Self { request, channel_size, tx }, rx) diff --git a/crates/pubsub/src/service.rs b/crates/pubsub/src/service.rs index c4dcf50a218..a57cbd67791 100644 --- a/crates/pubsub/src/service.rs +++ b/crates/pubsub/src/service.rs @@ -8,7 +8,7 @@ use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload}; use alloy_primitives::U256; use alloy_transport::{ utils::{to_json_raw_value, Spawnable}, - TransportError, TransportErrorKind, TransportResult, + TransportErrorKind, TransportResult, }; use serde_json::value::RawValue; use tokio::sync::{mpsc, oneshot}; @@ -35,7 +35,7 @@ pub(crate) struct PubSubService { impl PubSubService { /// Create a new service from a connector. - pub(crate) async fn connect(connector: T) -> Result { + pub(crate) async fn connect(connector: T) -> TransportResult { let handle = connector.connect().await?; let (tx, reqs) = mpsc::unbounded_channel(); @@ -51,7 +51,7 @@ impl PubSubService { } /// Reconnect by dropping the backend and creating a new one. - async fn get_new_backend(&mut self) -> Result { + async fn get_new_backend(&mut self) -> TransportResult { let mut handle = self.connector.try_reconnect().await?; std::mem::swap(&mut self.handle, &mut handle); Ok(handle) diff --git a/crates/rpc-client/src/batch.rs b/crates/rpc-client/src/batch.rs index c1512efa63d..0e715b5070d 100644 --- a/crates/rpc-client/src/batch.rs +++ b/crates/rpc-client/src/batch.rs @@ -107,7 +107,7 @@ impl<'a, T> BatchRequest<'a, T> { fn push( &mut self, request: Request, - ) -> Result, TransportError> { + ) -> TransportResult> { let ser = request.serialize().map_err(TransportError::ser_err)?; Ok(self.push_raw(ser).into()) } @@ -127,7 +127,7 @@ where &mut self, method: &'static str, params: &Params, - ) -> Result, TransportError> { + ) -> TransportResult> { let request = self.transport.make_request(method, Cow::Borrowed(params)); self.push(request) } @@ -245,7 +245,7 @@ impl Future for BatchFuture where T: Transport + Clone, { - type Output = Result<(), TransportError>; + type Output = TransportResult<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { if matches!(*self.as_mut(), BatchFuture::Prepared { .. }) { diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 1bbcd2baecc..212915c7b18 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -1,6 +1,6 @@ use crate::RpcClient; use alloy_transport::{ - BoxTransport, BoxTransportConnect, Transport, TransportConnect, TransportError, + BoxTransport, BoxTransportConnect, Transport, TransportConnect, TransportResult, }; use tower::{ layer::util::{Identity, Stack}, @@ -74,10 +74,10 @@ impl ClientBuilder { self.transport(transport, is_local) } - #[cfg(feature = "pubsub")] /// Connect a pubsub transport, producing an [`RpcClient`] with the provided /// connection. - pub async fn pubsub(self, pubsub_connect: C) -> Result, TransportError> + #[cfg(feature = "pubsub")] + pub async fn pubsub(self, pubsub_connect: C) -> TransportResult> where C: alloy_pubsub::PubSubConnect, L: Layer, @@ -88,13 +88,13 @@ impl ClientBuilder { Ok(self.transport(transport, is_local)) } - #[cfg(feature = "ws")] /// Connect a WS transport, producing an [`RpcClient`] with the provided /// connection + #[cfg(feature = "ws")] pub async fn ws( self, ws_connect: alloy_transport_ws::WsConnect, - ) -> Result, TransportError> + ) -> TransportResult> where L: Layer, L::Service: Transport, @@ -104,7 +104,7 @@ impl ClientBuilder { /// Connect a transport, producing an [`RpcClient`] with the provided /// connection. - pub async fn connect(self, connect: C) -> Result, TransportError> + pub async fn connect(self, connect: C) -> TransportResult> where C: TransportConnect, L: Layer, @@ -116,7 +116,7 @@ impl ClientBuilder { /// Connect a transport, producing an [`RpcClient`] with a [`BoxTransport`] /// connection. - pub async fn connect_boxed(self, connect: C) -> Result, TransportError> + pub async fn connect_boxed(self, connect: C) -> TransportResult> where C: BoxTransportConnect, L: Layer, diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index 0cddf0251a8..b1126235569 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -1,6 +1,6 @@ use crate::{BatchRequest, ClientBuilder, RpcCall}; use alloy_json_rpc::{Id, Request, RpcParam, RpcReturn}; -use alloy_transport::{BoxTransport, Transport, TransportConnect, TransportError}; +use alloy_transport::{BoxTransport, Transport, TransportConnect, TransportResult}; use alloy_transport_http::Http; use std::sync::atomic::{AtomicU64, Ordering}; use tower::{layer::util::Identity, ServiceBuilder}; @@ -41,7 +41,7 @@ impl RpcClient { } /// Connect to a transport via a [`TransportConnect`] implementor. - pub async fn connect(connect: C) -> Result + pub async fn connect(connect: C) -> TransportResult where T: Transport, C: TransportConnect, @@ -148,9 +148,7 @@ mod pubsub_impl { } /// Connect to a transport via a [`PubSubConnect`] implementor. - pub async fn connect_pubsub( - connect: C, - ) -> Result, TransportError> + pub async fn connect_pubsub(connect: C) -> TransportResult> where C: PubSubConnect, { diff --git a/crates/transport-ipc/src/connect.rs b/crates/transport-ipc/src/connect.rs index 7b5bfc5290e..1371aa64dcf 100644 --- a/crates/transport-ipc/src/connect.rs +++ b/crates/transport-ipc/src/connect.rs @@ -3,8 +3,8 @@ use std::{ path::PathBuf, }; -#[derive(Debug, Clone)] /// An IPC Connection object. +#[derive(Debug, Clone)] pub struct IpcConnect { inner: T, } @@ -28,18 +28,12 @@ macro_rules! impl_connect { true } - fn connect<'a: 'b, 'b>( - &'a self, - ) -> alloy_transport::Pbf< - 'b, - alloy_pubsub::ConnectionHandle, - alloy_transport::TransportError, - > { - Box::pin(async move { - crate::IpcBackend::connect(&self.inner) - .await - .map_err(alloy_transport::TransportErrorKind::custom) - }) + async fn connect( + &self, + ) -> Result { + crate::IpcBackend::connect(&self.inner) + .await + .map_err(alloy_transport::TransportErrorKind::custom) } } }; diff --git a/crates/transport-ws/src/native.rs b/crates/transport-ws/src/native.rs index 6ec9fd16952..3196c869b87 100644 --- a/crates/transport-ws/src/native.rs +++ b/crates/transport-ws/src/native.rs @@ -1,6 +1,6 @@ use crate::WsBackend; use alloy_pubsub::PubSubConnect; -use alloy_transport::{utils::Spawnable, Authorization, Pbf, TransportError, TransportErrorKind}; +use alloy_transport::{utils::Spawnable, Authorization, TransportErrorKind, TransportResult}; use futures::{SinkExt, StreamExt}; use serde_json::value::RawValue; use std::time::Duration; @@ -42,21 +42,18 @@ impl PubSubConnect for WsConnect { alloy_transport::utils::guess_local_url(&self.url) } - fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, alloy_pubsub::ConnectionHandle, TransportError> { + async fn connect(&self) -> TransportResult { let request = self.clone().into_client_request(); + let req = request.map_err(TransportErrorKind::custom)?; + let (socket, _) = + tokio_tungstenite::connect_async(req).await.map_err(TransportErrorKind::custom)?; - Box::pin(async move { - let req = request.map_err(TransportErrorKind::custom)?; - let (socket, _) = - tokio_tungstenite::connect_async(req).await.map_err(TransportErrorKind::custom)?; + let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); + let backend = WsBackend { socket, interface }; - let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); - let backend = WsBackend { socket, interface }; + backend.spawn(); - backend.spawn(); - - Ok(handle) - }) + Ok(handle) } } diff --git a/crates/transport-ws/src/wasm.rs b/crates/transport-ws/src/wasm.rs index b7ca79ad1ed..f822dcda773 100644 --- a/crates/transport-ws/src/wasm.rs +++ b/crates/transport-ws/src/wasm.rs @@ -20,21 +20,16 @@ impl PubSubConnect for WsConnect { alloy_transport::utils::guess_local_url(&self.url) } - fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, alloy_pubsub::ConnectionHandle, TransportError> { - Box::pin(async move { - let socket = WsMeta::connect(&self.url, None) - .await - .map_err(TransportErrorKind::custom)? - .1 - .fuse(); + async fn connect(&self) -> TransportResult { + let socket = + WsMeta::connect(&self.url, None).await.map_err(TransportErrorKind::custom)?.1.fuse(); - let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); - let backend = WsBackend { socket, interface }; + let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); + let backend = WsBackend { socket, interface }; - backend.spawn(); + backend.spawn(); - Ok(handle) - }) + Ok(handle) } } diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 85a06b3e992..2ace36ba033 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -15,6 +15,7 @@ exclude.workspace = true alloy-json-rpc.workspace = true base64.workspace = true +futures-util.workspace = true serde_json = { workspace = true, features = ["raw_value"] } serde.workspace = true thiserror.workspace = true diff --git a/crates/transport/src/connect.rs b/crates/transport/src/connect.rs index fc4f8fdc281..48f84ee585f 100644 --- a/crates/transport/src/connect.rs +++ b/crates/transport/src/connect.rs @@ -1,4 +1,5 @@ use crate::{BoxTransport, Pbf, Transport, TransportError}; +use futures_util::{FutureExt, TryFutureExt}; /// Connection details for a transport. /// @@ -41,23 +42,15 @@ pub trait BoxTransportConnect { fn get_boxed_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, BoxTransport, TransportError>; } -impl BoxTransportConnect for T -where - T: TransportConnect, -{ +impl BoxTransportConnect for T { fn is_local(&self) -> bool { TransportConnect::is_local(self) } fn get_boxed_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, BoxTransport, TransportError> { - Box::pin(async move { self.get_transport().await.map(Transport::boxed) }) + self.get_transport().map_ok(Transport::boxed).boxed() } } #[cfg(test)] -mod test { - use super::*; - fn __compile_check(_: Box) { - todo!() - } -} +fn _object_safe(_: Box) {} From 40eb8192c6c919683f2a4e6d620e6c2ea777089a Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Thu, 15 Feb 2024 02:14:05 +0100 Subject: [PATCH 2/3] fix: wasm --- crates/transport/src/connect.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/transport/src/connect.rs b/crates/transport/src/connect.rs index 48f84ee585f..cbf761c0f53 100644 --- a/crates/transport/src/connect.rs +++ b/crates/transport/src/connect.rs @@ -1,5 +1,5 @@ use crate::{BoxTransport, Pbf, Transport, TransportError}; -use futures_util::{FutureExt, TryFutureExt}; +use futures_util::TryFutureExt; /// Connection details for a transport. /// @@ -48,7 +48,7 @@ impl BoxTransportConnect for T { } fn get_boxed_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, BoxTransport, TransportError> { - self.get_transport().map_ok(Transport::boxed).boxed() + Box::pin(self.get_transport().map_ok(Transport::boxed)) } } From 30fa776a3819144c078a340231728f528273b842 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Thu, 15 Feb 2024 03:02:26 +0100 Subject: [PATCH 3/3] fix: wasm 2 --- crates/pubsub/src/connect.rs | 9 ++++----- crates/transport-ws/src/wasm.rs | 2 +- crates/transport/src/lib.rs | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/crates/pubsub/src/connect.rs b/crates/pubsub/src/connect.rs index ddb061b07dd..7c26355d604 100644 --- a/crates/pubsub/src/connect.rs +++ b/crates/pubsub/src/connect.rs @@ -1,6 +1,5 @@ use crate::{handle::ConnectionHandle, service::PubSubService, PubSubFrontend}; -use alloy_transport::TransportResult; -use std::future::Future; +use alloy_transport::{impl_future, TransportResult}; /// Configuration objects that contain connection details for a backend. /// @@ -16,19 +15,19 @@ pub trait PubSubConnect: Sized + Send + Sync + 'static { /// [`ConnectionInterface`], and return the corresponding handle. /// /// [`ConnectionInterface`]: crate::ConnectionInterface - fn connect(&self) -> impl Future> + Send; + fn connect(&self) -> impl_future!(>); /// Attempt to reconnect the transport. /// /// Override this to add custom reconnection logic to your connector. This /// will be used by the internal pubsub connection managers in the event the /// connection fails. - fn try_reconnect(&self) -> impl Future> + Send { + fn try_reconnect(&self) -> impl_future!(>) { self.connect() } /// Convert the configuration object into a service with a running backend. - fn into_service(self) -> impl Future> + Send { + fn into_service(self) -> impl_future!(>) { PubSubService::connect(self) } } diff --git a/crates/transport-ws/src/wasm.rs b/crates/transport-ws/src/wasm.rs index f822dcda773..6fc965187f7 100644 --- a/crates/transport-ws/src/wasm.rs +++ b/crates/transport-ws/src/wasm.rs @@ -1,6 +1,6 @@ use super::WsBackend; use alloy_pubsub::PubSubConnect; -use alloy_transport::{utils::Spawnable, Pbf, TransportError, TransportErrorKind}; +use alloy_transport::{utils::Spawnable, TransportErrorKind, TransportResult}; use futures::{ sink::SinkExt, stream::{Fuse, StreamExt}, diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 1cca6314114..7a7627f7f15 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -55,6 +55,14 @@ mod type_aliases { /// Future for RPC-level requests. pub type RpcFut<'a, T> = std::pin::Pin> + Send + 'a>>; + + /// `impl Future` with a `Send` bound. + #[macro_export] + macro_rules! impl_future { + (<$($t:tt)+) => { + impl (::core::future::Future<$($t)+) + Send + }; + } } #[cfg(target_arch = "wasm32")] @@ -73,4 +81,12 @@ mod type_aliases { /// Future for RPC-level requests. pub type RpcFut<'a, T> = std::pin::Pin> + 'a>>; + + /// `impl Future` without a `Send` bound. + #[macro_export] + macro_rules! impl_future { + (<$($t:tt)+) => { + impl ::core::future::Future<$($t)+ + }; + } }