Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): update to interprocess 2 #687

Merged
merged 12 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/transport-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tokio-util = { workspace = true, features = ["io", "compat"] }
tracing.workspace = true

bytes = "1.5.0"
interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] }
interprocess = { version = "2", features = ["tokio"] }
serde = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }

Expand Down
18 changes: 12 additions & 6 deletions crates/transport-ipc/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use interprocess::local_socket::{GenericFilePath, ToFsName};
use std::{
ffi::{CString, OsString},
path::PathBuf,
Expand All @@ -21,7 +22,7 @@ impl<T> IpcConnect<T> {
}

macro_rules! impl_connect {
($target:ty) => {
($target:ty => $map:ident) => {
impl From<$target> for IpcConnect<$target> {
fn from(inner: $target) -> Self {
Self { inner }
Expand All @@ -42,15 +43,20 @@ macro_rules! impl_connect {
async fn connect(
&self,
) -> Result<alloy_pubsub::ConnectionHandle, alloy_transport::TransportError> {
crate::IpcBackend::connect(&self.inner)
let name = self
.inner
.$map()
.to_fs_name::<GenericFilePath>()
.map_err(alloy_transport::TransportErrorKind::custom)?;
crate::IpcBackend::connect(name)
.await
.map_err(alloy_transport::TransportErrorKind::custom)
}
}
};
}

impl_connect!(OsString);
impl_connect!(CString);
impl_connect!(PathBuf);
impl_connect!(String);
impl_connect!(OsString => as_os_str);
impl_connect!(CString => as_c_str);
impl_connect!(PathBuf => as_path);
impl_connect!(String => as_str);
54 changes: 19 additions & 35 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
extern crate tracing;

use bytes::{Buf, BytesMut};
use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName};
use futures::{ready, StreamExt};
use interprocess::local_socket::{tokio::prelude::*, Name};
use std::task::Poll::Ready;
use tokio::select;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::{
io::{AsyncRead, AsyncWriteExt},
select,
};
use tokio_util::io::poll_read_buf;

mod connect;
pub use connect::IpcConnect;
Expand All @@ -37,31 +40,24 @@ type Result<T> = std::result::Result<T, std::io::Error>;

/// An IPC backend task.
struct IpcBackend {
pub(crate) socket: LocalSocketStream,
pub(crate) stream: LocalSocketStream,

pub(crate) interface: alloy_pubsub::ConnectionInterface,
}

impl IpcBackend {
/// Connect to a local socket. Either a unix socket or a windows named pipe.
async fn connect<'a, I>(name: &I) -> Result<alloy_pubsub::ConnectionHandle>
where
// TODO: remove bound on next interprocess crate release
I: ToLocalSocketName<'a> + Clone,
{
let socket = LocalSocketStream::connect(name.clone()).await?;
async fn connect(name: Name<'_>) -> Result<alloy_pubsub::ConnectionHandle> {
let stream = LocalSocketStream::connect(name).await?;
let (handle, interface) = alloy_pubsub::ConnectionHandle::new();

let backend = IpcBackend { socket, interface };

let backend = IpcBackend { stream, interface };
backend.spawn();

Ok(handle)
}

fn spawn(mut self) {
let fut = async move {
let (read, mut writer) = self.socket.into_split();
let (read, mut writer) = self.stream.split();
let mut read = ReadJsonStream::new(read).fuse();

let err = loop {
Expand Down Expand Up @@ -118,43 +114,32 @@ const CAPACITY: usize = 4096;
pub struct ReadJsonStream<T> {
/// The underlying reader.
#[pin]
reader: tokio_util::compat::Compat<T>,
reader: T,
/// A buffer for reading data from the reader.
buf: BytesMut,
/// Whether the buffer has been drained.
drained: bool,
}

impl<T> ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> ReadJsonStream<T> {
fn new(reader: T) -> Self {
Self { reader: reader.compat(), buf: BytesMut::with_capacity(CAPACITY), drained: true }
Self { reader, buf: BytesMut::with_capacity(CAPACITY), drained: true }
}
}

impl<T> From<T> for ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> From<T> for ReadJsonStream<T> {
fn from(reader: T) -> Self {
Self::new(reader)
}
}

impl<T> futures::stream::Stream for ReadJsonStream<T>
where
T: AsyncRead,
{
impl<T: AsyncRead> futures::stream::Stream for ReadJsonStream<T> {
type Item = alloy_json_rpc::PubSubItem;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use tokio_util::io::poll_read_buf;

let mut this = self.project();

loop {
Expand Down Expand Up @@ -235,7 +220,6 @@ where
mod tests {
use super::*;
use std::future::poll_fn;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::test]
async fn test_partial_stream() {
Expand All @@ -248,7 +232,7 @@ mod tests {
.read(r#", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }"#.as_bytes())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Expand All @@ -269,7 +253,7 @@ mod tests {
.read(vec![b'a'; CAPACITY].as_ref())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
let mut reader = ReadJsonStream::new(mock);
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Expand Down
13 changes: 6 additions & 7 deletions crates/transport-ipc/src/mock.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Mock IPC server.

use alloy_json_rpc::Response;
use futures::{AsyncReadExt, AsyncWriteExt};
use interprocess::local_socket::{tokio::prelude::*, GenericFilePath};
use serde::Serialize;
use std::{collections::VecDeque, path::PathBuf};
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

/// Mock IPC server.
///
Expand Down Expand Up @@ -71,13 +72,11 @@ impl MockIpcServer {
/// Run the server.
pub async fn spawn(mut self) {
tokio::spawn(async move {
let socket = interprocess::local_socket::tokio::LocalSocketStream::connect(
self.path.into_temp_path().to_path_buf(),
)
.await
.unwrap();
let tmp = self.path.into_temp_path();
let name = tmp.to_fs_name::<GenericFilePath>().unwrap();
let socket = LocalSocketStream::connect(name).await.unwrap();

let (mut reader, mut writer) = socket.into_split();
let (mut reader, mut writer) = socket.split();

let mut buf = [0u8; 4096];
loop {
Expand Down
Loading