Skip to content

Commit

Permalink
chore(deps): update to interprocess 2 (#687)
Browse files Browse the repository at this point in the history
* chore(deps): update to interprocess 2

* cfg

* try

* try2

* fixmock

* deny

* win

* re

* a

* b

* c

* simpler
  • Loading branch information
DaniPopes authored May 5, 2024
1 parent f24af53 commit 4726331
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 65 deletions.
18 changes: 9 additions & 9 deletions crates/node-bindings/src/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,14 @@ impl Geth {
/// This will put the geth instance into non-dev mode, discarding any previously set dev-mode
/// options.
pub fn p2p_port(mut self, port: u16) -> Self {
match self.mode {
match &mut self.mode {
GethMode::Dev(_) => {
self.mode = GethMode::NonDev(PrivateNetOptions {
p2p_port: Some(port),
..Default::default()
})
}
GethMode::NonDev(ref mut opts) => opts.p2p_port = Some(port),
GethMode::NonDev(opts) => opts.p2p_port = Some(port),
}
self
}
Expand Down Expand Up @@ -376,16 +376,16 @@ impl Geth {
}

fn inner_disable_discovery(&mut self) {
match self.mode {
match &mut self.mode {
GethMode::Dev(_) => {
self.mode =
GethMode::NonDev(PrivateNetOptions { discovery: false, ..Default::default() })
}
GethMode::NonDev(ref mut opts) => opts.discovery = false,
GethMode::NonDev(opts) => opts.discovery = false,
}
}

/// Manually sets the IPC path for the socket manually.
/// Sets the IPC path for the socket.
pub fn ipc_path<T: Into<PathBuf>>(mut self, path: T) -> Self {
self.ipc_path = Some(path.into());
self
Expand Down Expand Up @@ -506,7 +506,7 @@ impl Geth {
cmd.arg("--miner.etherbase").arg(format!("{clique_addr:?}"));
}

if let Some(ref genesis) = self.genesis {
if let Some(genesis) = &self.genesis {
// create a temp dir to store the genesis file
let temp_genesis_dir_path = tempdir().map_err(GethError::CreateDirError)?.into_path();

Expand All @@ -524,7 +524,7 @@ impl Geth {
})?;

let mut init_cmd = Command::new(bin_path);
if let Some(ref data_dir) = self.data_dir {
if let Some(data_dir) = &self.data_dir {
init_cmd.arg("--datadir").arg(data_dir);
}

Expand All @@ -548,7 +548,7 @@ impl Geth {
})?;
}

if let Some(ref data_dir) = self.data_dir {
if let Some(data_dir) = &self.data_dir {
cmd.arg("--datadir").arg(data_dir);

// create the directory if it doesn't exist
Expand Down Expand Up @@ -586,7 +586,7 @@ impl Geth {
// debug verbosity is needed to check when peers are added
cmd.arg("--verbosity").arg("4");

if let Some(ref ipc) = self.ipc_path {
if let Some(ipc) = &self.ipc_path {
cmd.arg("--ipcpath").arg(ipc);
}

Expand Down
5 changes: 2 additions & 3 deletions crates/rpc-client/tests/it/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use alloy_node_bindings::Geth;
use alloy_primitives::U64;
use alloy_rpc_client::{ClientBuilder, RpcCall};
use alloy_transport_ipc::IpcConnect;
use std::path::PathBuf;

#[tokio::test]
async fn it_makes_a_request() {
Expand All @@ -15,8 +14,8 @@ async fn it_makes_a_request() {
.data_dir(temp_dir.path())
.spawn();

let connector: IpcConnect<_> = PathBuf::from(geth.ipc_endpoint()).into();
let client = ClientBuilder::default().pubsub(connector).await.unwrap();
let connect = IpcConnect::new(geth.ipc_endpoint());
let client = ClientBuilder::default().pubsub(connect).await.unwrap();

let req: RpcCall<_, (), U64> = client.request("eth_blockNumber", ());
let timeout = tokio::time::timeout(std::time::Duration::from_secs(2), req);
Expand Down
2 changes: 1 addition & 1 deletion crates/transport-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tokio-util = { workspace = true, features = ["io", "compat"] }
tracing.workspace = true

bytes = "1.5.0"
interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] }
interprocess = { version = "2", features = ["tokio"] }
serde = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }

Expand Down
34 changes: 24 additions & 10 deletions crates/transport-ipc/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::{
ffi::{CString, OsString},
path::PathBuf,
};
use interprocess::local_socket as ls;
use std::io;

pub(crate) fn to_name(path: &std::ffi::OsStr) -> io::Result<ls::Name<'_>> {
if cfg!(windows) && !path.as_encoded_bytes().starts_with(br"\\.\pipe\") {
ls::ToNsName::to_ns_name::<ls::GenericNamespaced>(path)
} else {
ls::ToFsName::to_fs_name::<ls::GenericFilePath>(path)
}
}

/// An IPC Connection object.
#[derive(Clone, Debug)]
Expand All @@ -21,7 +27,7 @@ impl<T> IpcConnect<T> {
}

macro_rules! impl_connect {
($target:ty) => {
($target:ty => | $inner:ident | $map:expr) => {
impl From<$target> for IpcConnect<$target> {
fn from(inner: $target) -> Self {
Self { inner }
Expand All @@ -42,15 +48,23 @@ macro_rules! impl_connect {
async fn connect(
&self,
) -> Result<alloy_pubsub::ConnectionHandle, alloy_transport::TransportError> {
crate::IpcBackend::connect(&self.inner)
let $inner = &self.inner;
let inner = $map;
let name = to_name(inner).map_err(alloy_transport::TransportErrorKind::custom)?;
crate::IpcBackend::connect(name)
.await
.map_err(alloy_transport::TransportErrorKind::custom)
}
}
};
}

impl_connect!(OsString);
impl_connect!(CString);
impl_connect!(PathBuf);
impl_connect!(String);
impl_connect!(std::ffi::OsString => |s| s.as_os_str());
impl_connect!(std::path::PathBuf => |s| s.as_os_str());
impl_connect!(String => |s| s.as_ref());

#[cfg(unix)]
impl_connect!(std::ffi::CString => |s| {
use std::os::unix::ffi::OsStrExt;
std::ffi::OsStr::from_bytes(s.to_bytes())
});
54 changes: 19 additions & 35 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
extern crate tracing;

use bytes::{Buf, BytesMut};
use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName};
use futures::{ready, StreamExt};
use interprocess::local_socket::{tokio::prelude::*, Name};
use std::task::Poll::Ready;
use tokio::select;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::{
io::{AsyncRead, AsyncWriteExt},
select,
};
use tokio_util::io::poll_read_buf;

mod connect;
pub use connect::IpcConnect;
Expand All @@ -37,31 +40,24 @@ type Result<T> = std::result::Result<T, std::io::Error>;

/// An IPC backend task.
struct IpcBackend {
pub(crate) socket: LocalSocketStream,
pub(crate) stream: LocalSocketStream,

pub(crate) interface: alloy_pubsub::ConnectionInterface,
}

impl IpcBackend {
/// Connect to a local socket. Either a unix socket or a windows named pipe.
async fn connect<'a, I>(name: &I) -> Result<alloy_pubsub::ConnectionHandle>
where
// TODO: remove bound on next interprocess crate release
I: ToLocalSocketName<'a> + Clone,
{
let socket = LocalSocketStream::connect(name.clone()).await?;
async fn connect(name: Name<'_>) -> Result<alloy_pubsub::ConnectionHandle> {
let stream = LocalSocketStream::connect(name).await?;
let (handle, interface) = alloy_pubsub::ConnectionHandle::new();

let backend = IpcBackend { socket, interface };

let backend = IpcBackend { stream, interface };
backend.spawn();

Ok(handle)
}

fn spawn(mut self) {
let fut = async move {
let (read, mut writer) = self.socket.into_split();
let (read, mut writer) = self.stream.split();
let mut read = ReadJsonStream::new(read).fuse();

let err = loop {
Expand Down Expand Up @@ -118,43 +114,32 @@ const CAPACITY: usize = 4096;
pub struct ReadJsonStream<T> {
/// The underlying reader.
#[pin]
reader: tokio_util::compat::Compat<T>,
reader: T,
/// A buffer for reading data from the reader.
buf: BytesMut,
/// Whether the buffer has been drained.
drained: bool,
}

impl<T> ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> ReadJsonStream<T> {
fn new(reader: T) -> Self {
Self { reader: reader.compat(), buf: BytesMut::with_capacity(CAPACITY), drained: true }
Self { reader, buf: BytesMut::with_capacity(CAPACITY), drained: true }
}
}

impl<T> From<T> for ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> From<T> for ReadJsonStream<T> {
fn from(reader: T) -> Self {
Self::new(reader)
}
}

impl<T> futures::stream::Stream for ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> futures::stream::Stream for ReadJsonStream<T> {
type Item = alloy_json_rpc::PubSubItem;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use tokio_util::io::poll_read_buf;

let mut this = self.project();

loop {
Expand Down Expand Up @@ -235,7 +220,6 @@ where
mod tests {
use super::*;
use std::future::poll_fn;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::test]
async fn test_partial_stream() {
Expand All @@ -248,7 +232,7 @@ mod tests {
.read(r#", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }"#.as_bytes())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Expand All @@ -269,7 +253,7 @@ mod tests {
.read(vec![b'a'; CAPACITY].as_ref())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Expand Down
13 changes: 6 additions & 7 deletions crates/transport-ipc/src/mock.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Mock IPC server.
use alloy_json_rpc::Response;
use futures::{AsyncReadExt, AsyncWriteExt};
use interprocess::local_socket::tokio::prelude::*;
use serde::Serialize;
use std::{collections::VecDeque, path::PathBuf};
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

/// Mock IPC server.
///
Expand Down Expand Up @@ -71,13 +72,11 @@ impl MockIpcServer {
/// Run the server.
pub async fn spawn(mut self) {
tokio::spawn(async move {
let socket = interprocess::local_socket::tokio::LocalSocketStream::connect(
self.path.into_temp_path().to_path_buf(),
)
.await
.unwrap();
let tmp = self.path.into_temp_path();
let name = crate::connect::to_name(tmp.as_os_str()).unwrap();
let socket = LocalSocketStream::connect(name).await.unwrap();

let (mut reader, mut writer) = socket.into_split();
let (mut reader, mut writer) = socket.split();

let mut buf = [0u8; 4096];
loop {
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ allow = [
"Apache-2.0",
"Apache-2.0 WITH LLVM-exception",
"BSD-3-Clause",
"0BSD",
"ISC",
"Unicode-DFS-2016",
"Unlicense",
Expand Down

0 comments on commit 4726331

Please sign in to comment.