diff --git a/.github/scripts/install_test_binaries.sh b/.github/scripts/install_test_binaries.sh new file mode 100755 index 00000000000..39a1f869171 --- /dev/null +++ b/.github/scripts/install_test_binaries.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Installs Solc and Geth binaries +# Note: intended for use only with CI (x86_64 Ubuntu, MacOS or Windows) +set -e + +GETH_BUILD=${GETH_BUILD:-"1.11.2-73b01f40"} + +BIN_DIR=${BIN_DIR:-"$HOME/bin"} + +PLATFORM="$(uname -s | awk '{print tolower($0)}')" +if [ "$PLATFORM" != "linux" ] && [ "$PLATFORM" != "darwin" ]; then + EXT=".exe" +fi + +main() { + mkdir -p "$BIN_DIR" + cd "$BIN_DIR" + export PATH="$BIN_DIR:$PATH" + if [ "$GITHUB_PATH" ]; then + echo "$BIN_DIR" >> "$GITHUB_PATH" + fi + + install_geth + + echo "" + echo "Installed Geth:" + geth version +} + +# Installs geth from https://geth.ethereum.org/downloads +install_geth() { + case "$PLATFORM" in + linux|darwin) + name="geth-$PLATFORM-amd64-$GETH_BUILD" + curl -s "https://gethstore.blob.core.windows.net/builds/$name.tar.gz" | tar -xzf - + mv -f "$name/geth" ./ + rm -rf "$name" + chmod +x geth + ;; + *) + name="geth-windows-amd64-$GETH_BUILD" + zip="$name.zip" + curl -so "$zip" "https://gethstore.blob.core.windows.net/builds/$zip" + unzip "$zip" + mv -f "$name/geth.exe" ./ + rm -rf "$name" "$zip" + ;; + esac +} + +main diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3934561434f..726a1aa84c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,9 @@ jobs: uses: foundry-rs/foundry-toolchain@v1 with: version: nightly + - name: Install test binaries + shell: bash + run: ./.github/scripts/install_test_binaries.sh - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true @@ -54,7 +57,8 @@ jobs: with: cache-on-failure: true - name: check - run: cargo check --workspace --target wasm32-unknown-unknown + # Do not run WASM checks on IPC as it doesn't make sense + run: cargo check --workspace --target wasm32-unknown-unknown --exclude "alloy-transport-ipc" feature-checks: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index a332db33199..9e809fbd9ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ alloy-json-rpc = { version = "0.1.0", path = "crates/json-rpc" } alloy-transport = { version = "0.1.0", path = "crates/transport" } alloy-pubsub = { version = "0.1.0", path = "crates/pubsub" } alloy-transport-http = { version = "0.1.0", path = "crates/transport-http" } +alloy-transport-ipc = { version = "0.1.0", path = "crates/transport-ipc" } alloy-transport-ws = { version = "0.1.0", path = "crates/transport-ws" } alloy-networks = { version = "0.1.0", path = "crates/networks" } alloy-rpc-types = { version = "0.1.0", path = "crates/rpc-types" } diff --git a/crates/json-rpc/src/response/error.rs b/crates/json-rpc/src/response/error.rs index 11632f4df6e..6b709239347 100644 --- a/crates/json-rpc/src/response/error.rs +++ b/crates/json-rpc/src/response/error.rs @@ -1,6 +1,6 @@ use serde::{ de::{DeserializeOwned, MapAccess, Visitor}, - Deserialize, Deserializer, + Deserialize, Deserializer, Serialize, }; use serde_json::value::RawValue; use std::{borrow::Borrow, fmt, marker::PhantomData}; @@ -10,7 +10,7 @@ use std::{borrow::Borrow, fmt, marker::PhantomData}; /// This response indicates that the server received and handled the request, /// but that there was an error in the processing of it. The error should be /// included in the `message` field of the response payload. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct ErrorPayload> { /// The error code. pub code: i64, diff --git a/crates/json-rpc/src/response/mod.rs b/crates/json-rpc/src/response/mod.rs index ed84dc689ad..97cbb261be7 100644 --- a/crates/json-rpc/src/response/mod.rs +++ b/crates/json-rpc/src/response/mod.rs @@ -1,7 +1,8 @@ use crate::common::Id; use serde::{ de::{DeserializeOwned, MapAccess, Visitor}, - Deserialize, Deserializer, + ser::SerializeMap, + Deserialize, Deserializer, Serialize, }; use serde_json::value::RawValue; use std::{borrow::Borrow, fmt, marker::PhantomData}; @@ -224,6 +225,30 @@ where } } +impl Serialize for Response +where + Payload: Serialize, + ErrData: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(3))?; + map.serialize_entry("jsonrpc", "2.0")?; + map.serialize_entry("id", &self.id)?; + match &self.payload { + ResponsePayload::Success(result) => { + map.serialize_entry("result", result)?; + } + ResponsePayload::Failure(error) => { + map.serialize_entry("error", error)?; + } + } + map.end() + } +} + #[cfg(test)] mod test { #[test] diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index cbac62f8111..1b822c6afe6 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -30,11 +30,17 @@ reqwest = { workspace = true, optional = true } tokio = { workspace = true, optional = true } url = { workspace = true, optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +alloy-transport-ipc = { workspace = true, optional = true } + [dev-dependencies] alloy-primitives.workspace = true alloy-transport-ws.workspace = true test-log = { version = "0.2.13", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.17", features = ["std", "env-filter"] } +ethers-core = "2.0.10" +alloy-transport-ipc = { workspace = true, features = ["mock"] } +tempfile = "3" [features] default = ["reqwest"] @@ -42,3 +48,4 @@ reqwest = ["dep:url", "dep:reqwest", "alloy-transport-http/reqwest"] hyper = ["dep:url", "dep:hyper", "alloy-transport-http/hyper"] pubsub = ["dep:tokio", "dep:alloy-pubsub", "dep:alloy-primitives"] ws = ["pubsub", "dep:alloy-transport-ws"] +ipc = ["pubsub", "dep:alloy-transport-ipc"] \ No newline at end of file diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index d2ecf786bfc..b603c14f83a 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -131,7 +131,7 @@ where #[cfg(feature = "pubsub")] mod pubsub_impl { use super::*; - use alloy_pubsub::PubSubFrontend; + use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use tokio::sync::broadcast; impl RpcClient { @@ -142,6 +142,16 @@ mod pubsub_impl { ) -> broadcast::Receiver> { self.transport.get_subscription(id).await.unwrap() } + + /// Connect to a transport via a [`PubSubConnect`] implementor. + pub async fn connect_pubsub( + connect: C, + ) -> Result, TransportError> + where + C: PubSubConnect, + { + ClientBuilder::default().pubsub(connect).await + } } } diff --git a/crates/rpc-client/src/lib.rs b/crates/rpc-client/src/lib.rs index 0e479e95ff1..c0a148c4e8c 100644 --- a/crates/rpc-client/src/lib.rs +++ b/crates/rpc-client/src/lib.rs @@ -29,3 +29,9 @@ pub use call::RpcCall; mod client; pub use client::RpcClient; + +#[cfg(feature = "ws")] +pub use alloy_transport_ws::WsConnect; + +#[cfg(all(feature = "ipc", not(target_arch = "wasm32")))] +pub use alloy_transport_ipc::IpcConnect; diff --git a/crates/rpc-client/tests/it/ipc.rs b/crates/rpc-client/tests/it/ipc.rs new file mode 100644 index 00000000000..2a8f15c5e58 --- /dev/null +++ b/crates/rpc-client/tests/it/ipc.rs @@ -0,0 +1,38 @@ +use std::borrow::Cow; + +use alloy_primitives::U64; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::{ClientBuilder, RpcCall, RpcClient}; +use alloy_transport_ipc::IpcConnect; +use ethers_core::utils::{Geth, GethInstance}; +use tempfile::NamedTempFile; + +async fn connect() -> (RpcClient, GethInstance) { + let temp_file = NamedTempFile::new().unwrap(); + let path = temp_file.into_temp_path().to_path_buf(); + let geth = Geth::new().block_time(1u64).ipc_path(&path).spawn(); + + // [Windows named pipes](https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes) + // are located at `\\\pipe\`. + #[cfg(windows)] + let path = format!(r"\\.\pipe\{}", path.display()); + + let connector: IpcConnect<_> = path.into(); + + let client = ClientBuilder::default().pubsub(connector).await.unwrap(); + + (client, geth) +} + +#[test_log::test(tokio::test)] +async fn it_makes_a_request() { + let (client, _geth) = connect().await; + + let params: Cow<'static, _> = Cow::Owned(vec![]); + + let req: RpcCall<_, Cow<'static, Vec>, U64> = client.prepare("eth_blockNumber", params); + + let timeout = tokio::time::timeout(std::time::Duration::from_secs(2), req); + + timeout.await.unwrap().unwrap(); +} diff --git a/crates/rpc-client/tests/it/main.rs b/crates/rpc-client/tests/it/main.rs index c5c35e7bd39..932cf898ee0 100644 --- a/crates/rpc-client/tests/it/main.rs +++ b/crates/rpc-client/tests/it/main.rs @@ -5,3 +5,6 @@ mod http; #[cfg(feature = "pubsub")] mod ws; + +#[cfg(feature = "pubsub")] +mod ipc; diff --git a/crates/rpc-types/src/eth/filter.rs b/crates/rpc-types/src/eth/filter.rs index 6c714e05d0e..fc239c1ba91 100644 --- a/crates/rpc-types/src/eth/filter.rs +++ b/crates/rpc-types/src/eth/filter.rs @@ -736,7 +736,7 @@ impl FilteredParams { } /// Returns `true` if the bloom matches the topics - pub fn matches_topics(bloom: Bloom, topic_filters: &Vec) -> bool { + pub fn matches_topics(bloom: Bloom, topic_filters: &[BloomFilter]) -> bool { if topic_filters.is_empty() { return true; } diff --git a/crates/transport-ipc/Cargo.toml b/crates/transport-ipc/Cargo.toml new file mode 100644 index 00000000000..8a98337e51e --- /dev/null +++ b/crates/transport-ipc/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "alloy-transport-ipc" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +alloy-json-rpc.workspace = true +alloy-transport.workspace = true +alloy-pubsub.workspace = true + +futures.workspace = true +pin-project.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true + +bytes = "1.5.0" +interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] } +serde = { workspace = true, optional = true } +tempfile = { version = "3.8.1", optional = true } + +[features] +default = [] +mock = ["dep:serde", "dep:tempfile"] diff --git a/crates/transport-ipc/README.md b/crates/transport-ipc/README.md new file mode 100644 index 00000000000..5305aaa6414 --- /dev/null +++ b/crates/transport-ipc/README.md @@ -0,0 +1,3 @@ +# alloy-transport-ipc + +IPC transport implementation. diff --git a/crates/transport-ipc/src/connect.rs b/crates/transport-ipc/src/connect.rs new file mode 100644 index 00000000000..f905b76056e --- /dev/null +++ b/crates/transport-ipc/src/connect.rs @@ -0,0 +1,52 @@ +use std::{ + ffi::{CString, OsString}, + path::PathBuf, +}; + +#[derive(Debug, Clone)] +/// An IPC Connection object. +pub struct IpcConnect { + /// + inner: T, +} + +macro_rules! impl_connect { + ($target:ty) => { + impl From<$target> for IpcConnect<$target> { + fn from(inner: $target) -> Self { + Self { inner } + } + } + + impl From> for $target { + fn from(this: IpcConnect<$target>) -> $target { + this.inner + } + } + + impl alloy_pubsub::PubSubConnect for IpcConnect<$target> { + fn is_local(&self) -> bool { + true + } + + fn connect<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf< + 'b, + alloy_pubsub::ConnectionHandle, + alloy_transport::TransportError, + > { + Box::pin(async move { + crate::IpcBackend::connect(&self.inner) + .await + .map_err(alloy_transport::TransportErrorKind::custom) + }) + } + } + }; +} + +impl_connect!(OsString); +impl_connect!(CString); +impl_connect!(PathBuf); +impl_connect!(String); diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs new file mode 100644 index 00000000000..fe550ca1938 --- /dev/null +++ b/crates/transport-ipc/src/lib.rs @@ -0,0 +1,205 @@ +#![doc = include_str!("../README.md")] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/alloy.jpg", + html_favicon_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/favicon.ico" +)] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod connect; +pub use connect::IpcConnect; + +#[cfg(feature = "mock")] +pub mod mock; +#[cfg(feature = "mock")] +pub use mock::MockIpcServer; + +use std::task::Poll::{Pending, Ready}; + +use alloy_json_rpc::PubSubItem; +use bytes::{Buf, BytesMut}; +use futures::{io::BufReader, ready, AsyncBufRead, AsyncRead, AsyncWriteExt, StreamExt}; +use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName}; +use tokio::select; + +type Result = std::result::Result; + +/// An IPC backend task. +struct IpcBackend { + pub(crate) socket: LocalSocketStream, + + pub(crate) interface: alloy_pubsub::ConnectionInterface, +} + +impl IpcBackend { + /// Connect to a local socket. Either a unix socket or a windows named pipe. + async fn connect<'a, I>(name: &I) -> Result + where + // TODO: remove bound on next interprocess crate release + I: ToLocalSocketName<'a> + Clone, + { + let socket = LocalSocketStream::connect(name.clone()).await?; + let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); + + let backend = IpcBackend { socket, interface }; + + backend.spawn(); + + Ok(handle) + } + + fn spawn(mut self) { + let fut = async move { + let (read, mut writer) = self.socket.into_split(); + let mut read = ReadJsonStream::new(read).fuse(); + + let err = loop { + select! { + biased; + item = self.interface.recv_from_frontend() => { + match item { + Some(msg) => { + let bytes = msg.get(); + if let Err(e) = writer.write_all(bytes.as_bytes()).await { + tracing::error!(%e, "Failed to write to IPC socket"); + break true; + } + }, + // dispatcher has gone away, or shutdown was received + None => { + tracing::debug!("Frontend has gone away"); + break false; + }, + } + } + // Read from the socket. + item = read.next() => { + match item { + Some(item) => { + if self.interface.send_to_frontend(item).is_err() { + tracing::debug!("Frontend has gone away"); + break false; + } + } + None => { + tracing::error!("Read stream has failed."); + break true; + } + } + } + } + }; + if err { + self.interface.close_with_error(); + } + }; + + tokio::spawn(fut); + } +} + +/// A stream of JSON-RPC items, read from an [`AsyncRead`] stream. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct ReadJsonStream { + /// The underlying reader. + #[pin] + reader: BufReader, + /// A buffer of bytes read from the reader. + buf: BytesMut, + /// A buffer of items deserialized from the reader. + items: Vec, +} + +impl ReadJsonStream +where + T: AsyncRead, +{ + fn new(reader: T) -> Self { + Self { reader: BufReader::new(reader), buf: BytesMut::with_capacity(4096), items: vec![] } + } +} + +impl From for ReadJsonStream +where + T: AsyncRead, +{ + fn from(reader: T) -> Self { + Self::new(reader) + } +} + +impl futures::stream::Stream for ReadJsonStream +where + T: AsyncRead, +{ + type Item = alloy_json_rpc::PubSubItem; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + + // Deserialize any buffered items. + if !this.buf.is_empty() { + this.reader.consume(this.buf.len()); + + tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data"); + let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter(); + + let item = de.next(); + match item { + Some(Ok(response)) => { + this.items.push(response); + } + Some(Err(e)) => { + tracing::error!(%e, "IPC response contained invalid JSON. Buffer contents will be logged at trace level"); + tracing::trace!( + buffer = %String::from_utf8_lossy(this.buf.as_ref()), + "IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.", + ); + + return Ready(None); + } + None => {} + } + this.buf.advance(de.byte_offset()); + cx.waker().wake_by_ref(); + return Pending; + } + + // Return any buffered items, rewaking. + if !this.items.is_empty() { + // may have more work! + cx.waker().wake_by_ref(); + return Ready(this.items.pop()); + } + + tracing::debug!(buf_len = this.buf.len(), "Polling IPC socket for data"); + + let data = ready!(this.reader.poll_fill_buf(cx)); + match data { + Err(e) => { + tracing::error!(%e, "Failed to read from IPC socket, shutting down"); + Ready(None) + } + Ok(data) => { + tracing::debug!(data_len = data.len(), "Read data from IPC socket"); + this.buf.extend_from_slice(data); + // wake task to run deserialization + cx.waker().wake_by_ref(); + Pending + } + } + } +} diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs new file mode 100644 index 00000000000..9c3a8947c33 --- /dev/null +++ b/crates/transport-ipc/src/mock.rs @@ -0,0 +1,90 @@ +//! Mock IPC server. + +use alloy_json_rpc::Response; +use futures::{AsyncReadExt, AsyncWriteExt}; +use serde::Serialize; +use std::{collections::VecDeque, path::PathBuf}; +use tempfile::NamedTempFile; + +/// Mock IPC server. +/// +/// Currently unix socket only, due to use of namedtempfile. +/// +/// ## Example: +/// +/// ``` +/// use alloy_transport_ipc::MockIpcServer; +/// # fn main() -> Result<(), Box> { +/// // Instantiate a new mock server. +/// let mut server = MockIpcServer::new(); +/// // Get the path to the socket. +/// let path = server.path(); +/// // Add a reply to the server. Can also use `add_raw_reply` to add a raw +/// // byte vector, or `add_response` to add a json-rpc response. +/// server.add_reply("hello"); +/// // Run the server. The first request will get "hello" as a response. +/// MockIpcServer::new().spawn(); +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug)] +pub struct MockIpcServer { + /// Replies to send, in order + replies: VecDeque>, + /// Path to the socket + path: NamedTempFile, +} + +impl Default for MockIpcServer { + fn default() -> Self { + Self::new() + } +} + +impl MockIpcServer { + /// Create a new mock IPC server. + pub fn new() -> Self { + Self { replies: VecDeque::new(), path: NamedTempFile::new().unwrap() } + } + + /// Add a raw reply to the server. + pub fn add_raw_reply(&mut self, reply: Vec) { + self.replies.push_back(reply); + } + + /// Add a reply to the server. + pub fn add_reply(&mut self, s: S) { + let reply = serde_json::to_vec(&s).unwrap(); + self.add_raw_reply(reply); + } + + /// Add a json-rpc response to the server. + pub fn add_response(&mut self, response: Response) { + self.add_reply(response); + } + + /// Get the path to the socket. + pub fn path(&self) -> PathBuf { + self.path.path().to_owned() + } + + /// Run the server. + pub async fn spawn(mut self) { + tokio::spawn(async move { + let socket = interprocess::local_socket::tokio::LocalSocketStream::connect( + self.path.into_temp_path().to_path_buf(), + ) + .await + .unwrap(); + + let (mut reader, mut writer) = socket.into_split(); + + let mut buf = [0u8; 4096]; + loop { + let _ = reader.read(&mut buf).await.unwrap(); + let reply = self.replies.pop_front().unwrap_or_default(); + writer.write_all(&reply).await.unwrap(); + } + }); + } +} diff --git a/deny.toml b/deny.toml index 8d7bf3ea155..a9fa6886dee 100644 --- a/deny.toml +++ b/deny.toml @@ -37,6 +37,7 @@ exceptions = [ # so we prefer to not have dependencies using it # https://tldrlegal.com/license/creative-commons-cc0-1.0-universal { allow = ["CC0-1.0"], name = "tiny-keccak" }, + { allow = ["CC0-1.0"], name = "to_method" } ] [[licenses.clarify]]