Skip to content

Commit

Permalink
feat(rpc): remove ipc future and now using ServerHandle and StopHandl…
Browse files Browse the repository at this point in the history
…e from jsonrpsee (#9044)
  • Loading branch information
htiennv authored Jun 24, 2024
1 parent 8a53086 commit 31e2470
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 105 deletions.
60 changes: 0 additions & 60 deletions crates/rpc/ipc/src/server/future.rs

This file was deleted.

48 changes: 7 additions & 41 deletions crates/rpc/ipc/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! JSON-RPC IPC server implementation

use crate::server::{
connection::{IpcConn, JsonRpcStream},
future::StopHandle,
};
use crate::server::connection::{IpcConn, JsonRpcStream};
use futures::StreamExt;
use futures_util::future::Either;
use interprocess::local_socket::{
Expand All @@ -15,8 +12,8 @@ use jsonrpsee::{
core::TEN_MB_SIZE_BYTES,
server::{
middleware::rpc::{RpcLoggerLayer, RpcServiceT},
AlreadyStoppedError, ConnectionGuard, ConnectionPermit, IdProvider,
RandomIntegerIdProvider,
stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
ServerHandle, StopHandle,
},
BoundedSubscriptions, MethodSink, Methods,
};
Expand All @@ -29,7 +26,7 @@ use std::{
};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::{oneshot, watch},
sync::oneshot,
};
use tower::{layer::util::Identity, Layer, Service};
use tracing::{debug, instrument, trace, warn, Instrument};
Expand All @@ -46,7 +43,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tower::layer::{util::Stack, LayerFn};

mod connection;
mod future;
mod ipc;
mod rpc_service;

Expand Down Expand Up @@ -109,9 +105,8 @@ where
methods: impl Into<Methods>,
) -> Result<ServerHandle, IpcServerStartError> {
let methods = methods.into();
let (stop_tx, stop_rx) = watch::channel(());

let stop_handle = StopHandle::new(stop_rx);
let (stop_handle, server_handle) = stop_channel();

// use a signal channel to wait until we're ready to accept connections
let (tx, rx) = oneshot::channel();
Expand All @@ -122,7 +117,7 @@ where
};
rx.await.expect("channel is open")?;

Ok(ServerHandle::new(stop_tx))
Ok(server_handle)
}

async fn start_inner(
Expand Down Expand Up @@ -795,35 +790,6 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
}
}

/// Server handle.
///
/// When all [`jsonrpsee::server::StopHandle`]'s have been `dropped` or `stop` has been called
/// the server will be stopped.
#[derive(Debug, Clone)]
pub struct ServerHandle(Arc<watch::Sender<()>>);

impl ServerHandle {
/// Create a new server handle.
pub(crate) fn new(tx: watch::Sender<()>) -> Self {
Self(Arc::new(tx))
}

/// Tell the server to stop without waiting for the server to stop.
pub fn stop(&self) -> Result<(), AlreadyStoppedError> {
self.0.send(()).map_err(|_| AlreadyStoppedError)
}

/// Wait for the server to stop.
pub async fn stopped(self) {
self.0.closed().await
}

/// Check if the server has been stopped.
pub fn is_stopped(&self) -> bool {
self.0.is_closed()
}
}

#[cfg(test)]
pub fn dummy_name() -> String {
let num: u64 = rand::Rng::gen(&mut rand::thread_rng());
Expand Down Expand Up @@ -877,7 +843,7 @@ mod tests {
// and you might want to do something smarter if it's
// critical that "the most recent item" must be sent when it is produced.
if sink.send(notif).await.is_err() {
break Ok(())
break Ok(());
}

closed = c;
Expand Down
6 changes: 3 additions & 3 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl AuthServerConfig {
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;

let handle = server.start(module.inner.clone());
let mut ipc_handle: Option<reth_ipc::server::ServerHandle> = None;
let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;

if let Some(ipc_server_config) = ipc_server_config {
let ipc_endpoint_str = ipc_endpoint
Expand Down Expand Up @@ -241,7 +241,7 @@ pub struct AuthServerHandle {
handle: jsonrpsee::server::ServerHandle,
secret: JwtSecret,
ipc_endpoint: Option<String>,
ipc_handle: Option<reth_ipc::server::ServerHandle>,
ipc_handle: Option<jsonrpsee::server::ServerHandle>,
}

// === impl AuthServerHandle ===
Expand Down Expand Up @@ -310,7 +310,7 @@ impl AuthServerHandle {
}

/// Returns an ipc handle
pub fn ipc_handle(&self) -> Option<reth_ipc::server::ServerHandle> {
pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
self.ipc_handle.clone()
}

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ pub struct RpcServerHandle {
http: Option<ServerHandle>,
ws: Option<ServerHandle>,
ipc_endpoint: Option<String>,
ipc: Option<reth_ipc::server::ServerHandle>,
ipc: Option<jsonrpsee::server::ServerHandle>,
jwt_secret: Option<JwtSecret>,
}

Expand Down

0 comments on commit 31e2470

Please sign in to comment.