From c68d9bfb59f0ebec480371c946eead592eb92e83 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Nov 2023 18:11:47 +0100 Subject: [PATCH 1/8] feat(server): make it possible to disable WS ping --- server/src/future.rs | 45 ++++++++++++++++++++++++++++++++++++-- server/src/server.rs | 27 +++++++++++++---------- server/src/tests/ws.rs | 2 +- server/src/transport/ws.rs | 38 ++++++++++++++------------------ 4 files changed, 76 insertions(+), 36 deletions(-) diff --git a/server/src/future.rs b/server/src/future.rs index affdf44744..a229a1430b 100644 --- a/server/src/future.rs +++ b/server/src/future.rs @@ -26,9 +26,18 @@ //! Utilities for handling async code. +use futures_util::{Stream, StreamExt}; use jsonrpsee_core::Error; -use std::sync::Arc; -use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; +use pin_project::pin_project; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{ + sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}, + time::interval, +}; /// Create channel to determine whether /// the server shall continue to run or not. @@ -119,3 +128,35 @@ impl ConnectionGuard { /// Connection permit. pub type ConnectionPermit = OwnedSemaphorePermit; + +#[pin_project] +pub(crate) struct IntervalStream(#[pin] Option); + +impl IntervalStream { + /// Creates a stream which never returns any elements. + pub(crate) fn pending() -> Self { + Self(None) + } + + /// Creates a stream which produces elements with `period`. + pub(crate) async fn new(period: std::time::Duration) -> Self { + let mut interval = interval(period); + interval.tick().await; + + Self(Some(tokio_stream::wrappers::IntervalStream::new(interval))) + } +} + +impl Stream for IntervalStream { + type Item = tokio::time::Instant; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(mut stream) = self.project().0.as_pin_mut() { + stream.poll_next_unpin(cx) + } else { + // NOTE: this will not be woken up again and it's by design + // to be a pending stream that never returns. + Poll::Pending + } + } +} diff --git a/server/src/server.rs b/server/src/server.rs index 0590c9a89c..9212308f1b 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -33,7 +33,7 @@ use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use crate::future::{ConnectionGuard, ServerHandle, StopHandle}; +use crate::future::{ConnectionGuard, IntervalStream, ServerHandle, StopHandle}; use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; use crate::transport::ws::BackgroundTaskParams; use crate::transport::{http, ws}; @@ -286,28 +286,31 @@ impl ConnectionState { pub enum PingConfig { /// The server pings the connected clients continuously at the configured interval but /// doesn't disconnect them if no pongs are received from the client. - WithoutInactivityCheck(Duration), + OnlyPing(Duration), /// The server pings the connected clients continuously at the configured interval /// and terminates the connection if no websocket messages received from client /// after the max limit is exceeded. - WithInactivityCheck { + Ping { /// Time interval between consequent pings from server ping_interval: Duration, /// Max allowed time for connection to stay idle inactive_limit: Duration, }, + /// Pings are disabled. + Disabled, } impl PingConfig { - pub(crate) fn ping_interval(&self) -> Duration { + pub(crate) async fn ping_interval(&self) -> IntervalStream { match self { - Self::WithoutInactivityCheck(ping_interval) => *ping_interval, - Self::WithInactivityCheck { ping_interval, .. } => *ping_interval, + Self::OnlyPing(ping_interval) => IntervalStream::new(*ping_interval).await, + Self::Ping { ping_interval, .. } => IntervalStream::new(*ping_interval).await, + Self::Disabled => IntervalStream::pending(), } } pub(crate) fn inactive_limit(&self) -> Option { - if let Self::WithInactivityCheck { inactive_limit, .. } = self { + if let Self::Ping { inactive_limit, .. } = self { Some(*inactive_limit) } else { None @@ -317,7 +320,7 @@ impl PingConfig { impl Default for PingConfig { fn default() -> Self { - Self::WithoutInactivityCheck(Duration::from_secs(60)) + Self::OnlyPing(Duration::from_secs(60)) } } @@ -333,7 +336,7 @@ impl Default for ServerConfig { enable_http: true, enable_ws: true, message_buffer_capacity: 1024, - ping_config: PingConfig::WithoutInactivityCheck(Duration::from_secs(60)), + ping_config: PingConfig::OnlyPing(Duration::from_secs(60)), id_provider: Arc::new(RandomIntegerIdProvider), } } @@ -423,7 +426,7 @@ impl ServerConfigBuilder { /// See [`Builder::ping_interval`] for documentation. pub fn ping_interval(mut self, config: PingConfig) -> Result { - if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config { + if let PingConfig::Ping { ping_interval, inactive_limit } = config { if ping_interval >= inactive_limit { return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into())); } @@ -646,10 +649,10 @@ impl Builder { /// use jsonrpsee_server::{ServerBuilder, PingConfig}; /// /// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes - /// let builder = ServerBuilder::default().ping_interval(PingConfig::WithInactivityCheck { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap(); + /// let builder = ServerBuilder::default().ping_interval(PingConfig::Ping { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap(); /// ``` pub fn ping_interval(mut self, config: PingConfig) -> Result { - if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config { + if let PingConfig::Ping { ping_interval, inactive_limit } = config { if ping_interval >= inactive_limit { return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into())); } diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 32a9f0a829..14d3f85ed5 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -880,7 +880,7 @@ async fn server_with_infinite_call( ) -> (crate::ServerHandle, std::net::SocketAddr) { let server = ServerBuilder::default() // Make sure that the ping_interval doesn't force the connection to be closed - .ping_interval(crate::server::PingConfig::WithoutInactivityCheck(timeout)) + .ping_interval(crate::server::PingConfig::OnlyPing(timeout)) .unwrap() .build("127.0.0.1:0") .with_default_timeout() diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 8da0cbccf5..e71182daf1 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -1,13 +1,14 @@ use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; +use crate::future::IntervalStream; use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; use crate::server::{handle_rpc_call, ConnectionState, ServerConfig}; use crate::PingConfig; -use futures_util::future::{self, Either, Fuse}; +use futures_util::future::{self, Either}; use futures_util::io::{BufReader, BufWriter}; -use futures_util::{Future, FutureExt, StreamExt, TryStreamExt}; +use futures_util::{Future, StreamExt, TryStreamExt}; use hyper::upgrade::Upgraded; use jsonrpsee_core::server::helpers::MethodSink; use jsonrpsee_core::server::{BoundedSubscriptions, Methods}; @@ -18,7 +19,7 @@ use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; pub(crate) type Sender = soketto::Sender>>>; @@ -77,7 +78,7 @@ where let (conn_tx, conn_rx) = oneshot::channel(); // Spawn another task that sends out the responses on the Websocket. - let send_task_handle = tokio::spawn(send_task(rx, ws_sender, ping_config.ping_interval(), conn_rx)); + let send_task_handle = tokio::spawn(send_task(rx, ws_sender, ping_config, conn_rx)); let stopped = conn.stop_handle.clone().shutdown(); let rpc_service = Arc::new(rpc_service); @@ -174,15 +175,10 @@ where async fn send_task( rx: mpsc::Receiver, mut ws_sender: Sender, - ping_interval: Duration, + ping_config: PingConfig, stop: oneshot::Receiver<()>, ) { - // Interval to send out continuously `pings`. - let mut ping_interval = tokio::time::interval(ping_interval); - // This returns immediately so make sure it doesn't resolve before the ping_interval has been elapsed. - ping_interval.tick().await; - - let ping_interval = IntervalStream::new(ping_interval); + let ping_interval = ping_config.ping_interval().await; let rx = ReceiverStream::new(rx); tokio::pin!(ping_interval, rx, stop); @@ -250,10 +246,14 @@ where T: StreamExt> + Unpin, { let mut last_active = Instant::now(); + let inactivity_check = match ping_config.inactive_limit() { + Some(period) => IntervalStream::new(period).await, + None => IntervalStream::pending(), + }; - let inactivity_check = - Box::pin(ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated)); - let mut futs = futures_util::future::select(ws_stream.next(), inactivity_check); + tokio::pin!(inactivity_check); + + let mut futs = futures_util::future::select(ws_stream.next(), inactivity_check.next()); loop { match futures_util::future::select(futs, stopped).await { @@ -279,12 +279,8 @@ where } stopped = s; - // use really large duration instead of Duration::MAX to - // solve the panic issue with interval initialization - let inactivity_check = Box::pin( - ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated), - ); - futs = futures_util::future::select(rcv, inactivity_check); + + futs = futures_util::future::select(rcv, inactivity_check.next()); } // Server has been stopped. Either::Right(_) => break Receive::Stopped, From a7e2db5e931c3ed23154379b482130b60dda522e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Nov 2023 09:31:42 +0100 Subject: [PATCH 2/8] cleanup --- server/src/future.rs | 24 ++++++++++-------------- server/src/server.rs | 6 +++--- server/src/transport/ws.rs | 5 ++--- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/server/src/future.rs b/server/src/future.rs index a229a1430b..4a837fbbae 100644 --- a/server/src/future.rs +++ b/server/src/future.rs @@ -26,18 +26,15 @@ //! Utilities for handling async code. +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + use futures_util::{Stream, StreamExt}; use jsonrpsee_core::Error; use pin_project::pin_project; -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{ - sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}, - time::interval, -}; +use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; +use tokio::time::{interval_at, Instant}; /// Create channel to determine whether /// the server shall continue to run or not. @@ -138,11 +135,10 @@ impl IntervalStream { Self(None) } - /// Creates a stream which produces elements with `period`. - pub(crate) async fn new(period: std::time::Duration) -> Self { - let mut interval = interval(period); - interval.tick().await; - + /// Creates a stream which produces elements with interval of `period`. + pub(crate) fn new(period: std::time::Duration) -> Self { + let first_tick = Instant::now() + period; + let interval = interval_at(first_tick, period); Self(Some(tokio_stream::wrappers::IntervalStream::new(interval))) } } diff --git a/server/src/server.rs b/server/src/server.rs index 9212308f1b..f5f7269798 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -301,10 +301,10 @@ pub enum PingConfig { } impl PingConfig { - pub(crate) async fn ping_interval(&self) -> IntervalStream { + pub(crate) fn ping_interval(&self) -> IntervalStream { match self { - Self::OnlyPing(ping_interval) => IntervalStream::new(*ping_interval).await, - Self::Ping { ping_interval, .. } => IntervalStream::new(*ping_interval).await, + Self::OnlyPing(ping_interval) => IntervalStream::new(*ping_interval), + Self::Ping { ping_interval, .. } => IntervalStream::new(*ping_interval), Self::Disabled => IntervalStream::pending(), } } diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index e71182daf1..ad8df7d743 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -178,7 +178,7 @@ async fn send_task( ping_config: PingConfig, stop: oneshot::Receiver<()>, ) { - let ping_interval = ping_config.ping_interval().await; + let ping_interval = ping_config.ping_interval(); let rx = ReceiverStream::new(rx); tokio::pin!(ping_interval, rx, stop); @@ -247,7 +247,7 @@ where { let mut last_active = Instant::now(); let inactivity_check = match ping_config.inactive_limit() { - Some(period) => IntervalStream::new(period).await, + Some(period) => IntervalStream::new(period), None => IntervalStream::pending(), }; @@ -279,7 +279,6 @@ where } stopped = s; - futs = futures_util::future::select(rcv, inactivity_check.next()); } // Server has been stopped. From ef32b4a553c8319ad5f118040d083959d81afbd3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Nov 2023 13:13:53 +0100 Subject: [PATCH 3/8] refactor PingConfig --- server/src/future.rs | 6 +- server/src/server.rs | 135 +++++++++++++++++++------------------ server/src/tests/ws.rs | 4 +- server/src/transport/ws.rs | 29 +++++--- 4 files changed, 94 insertions(+), 80 deletions(-) diff --git a/server/src/future.rs b/server/src/future.rs index 4a837fbbae..f4f529653e 100644 --- a/server/src/future.rs +++ b/server/src/future.rs @@ -34,7 +34,7 @@ use futures_util::{Stream, StreamExt}; use jsonrpsee_core::Error; use pin_project::pin_project; use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; -use tokio::time::{interval_at, Instant}; +use tokio::time::Interval; /// Create channel to determine whether /// the server shall continue to run or not. @@ -136,9 +136,7 @@ impl IntervalStream { } /// Creates a stream which produces elements with interval of `period`. - pub(crate) fn new(period: std::time::Duration) -> Self { - let first_tick = Instant::now() + period; - let interval = interval_at(first_tick, period); + pub(crate) fn new(interval: Interval) -> Self { Self(Some(tokio_stream::wrappers::IntervalStream::new(interval))) } } diff --git a/server/src/server.rs b/server/src/server.rs index f5f7269798..fa7bc0d8f6 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -27,13 +27,14 @@ use std::error::Error as StdError; use std::future::Future; use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use crate::future::{ConnectionGuard, IntervalStream, ServerHandle, StopHandle}; +use crate::future::{ConnectionGuard, ServerHandle, StopHandle}; use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; use crate::transport::ws::BackgroundTaskParams; use crate::transport::{http, ws}; @@ -142,7 +143,6 @@ where match try_accept_conn(&listener, stopped).await { AcceptConnection::Established { socket, remote_addr, stop } => { - process_connection(ProcessConnection { http_middleware: &self.http_middleware, rpc_middleware: self.rpc_middleware.clone(), @@ -199,7 +199,7 @@ pub struct ServerConfig { /// Number of messages that server is allowed to `buffer` until backpressure kicks in. pub(crate) message_buffer_capacity: u32, /// Ping settings. - pub(crate) ping_config: PingConfig, + pub(crate) ping_config: Option, /// ID provider. pub(crate) id_provider: Arc, } @@ -223,7 +223,7 @@ pub struct ServerConfigBuilder { /// Number of messages that server is allowed to `buffer` until backpressure kicks in. message_buffer_capacity: u32, /// Ping settings. - ping_config: PingConfig, + ping_config: Option, /// ID provider. id_provider: Arc, } @@ -273,54 +273,60 @@ impl ConnectionState { } } -/// Configuration for WebSocket ping's. +/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect inactive +/// clients. /// -/// If the server sends out a ping then remote peer must reply with a corresponding pong message. +/// It's possible to configure how often pings are sent out and how long the server will +/// wait until a client is determined as "inactive". /// -/// It's possible to just send out pings then don't care about response -/// or terminate the connection if the ping isn't replied to the configured `max_inactivity` limit. +/// jsonrpsee doesn't associate the ping/pong frames just that if +/// pong frame isn't received within `inactive_limit` it's regarded +/// as missed. /// -/// NOTE: It's possible that a `ping` may be backpressured and if you expect a connection -/// to be reassumed after interruption it's not recommended to enable the activity check. +/// Such that the `inactive_limit` should be configured to longer than a single +/// WebSocket ping takes or it might be missed and may end up +/// terminating the connection. #[derive(Debug, Copy, Clone)] -pub enum PingConfig { - /// The server pings the connected clients continuously at the configured interval but - /// doesn't disconnect them if no pongs are received from the client. - OnlyPing(Duration), - /// The server pings the connected clients continuously at the configured interval - /// and terminates the connection if no websocket messages received from client - /// after the max limit is exceeded. - Ping { - /// Time interval between consequent pings from server - ping_interval: Duration, - /// Max allowed time for connection to stay idle - inactive_limit: Duration, - }, - /// Pings are disabled. - Disabled, +pub struct PingConfig { + /// Period which the server pings the connected client. + pub(crate) ping_interval: Duration, + /// Max allowed time for a connection to stay idle. + pub(crate) inactive_limit: Duration, + /// Max failures. + pub(crate) max_failures: NonZeroUsize, } impl PingConfig { - pub(crate) fn ping_interval(&self) -> IntervalStream { - match self { - Self::OnlyPing(ping_interval) => IntervalStream::new(*ping_interval), - Self::Ping { ping_interval, .. } => IntervalStream::new(*ping_interval), - Self::Disabled => IntervalStream::pending(), + /// Create a new PingConfig. + pub fn new() -> Self { + Self { + ping_interval: Duration::from_secs(30), + max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"), + inactive_limit: Duration::from_secs(40), } } - pub(crate) fn inactive_limit(&self) -> Option { - if let Self::Ping { inactive_limit, .. } = self { - Some(*inactive_limit) - } else { - None - } + /// Configure the interval when the WebSocket pings are sent out. + pub fn ping_interval(mut self, ping_interval: Duration) -> Self { + self.ping_interval = ping_interval; + self } -} -impl Default for PingConfig { - fn default() -> Self { - Self::OnlyPing(Duration::from_secs(60)) + /// Configure how long to wait until for WebSocket pong. + /// When this limit is expired it's regarded as the client is unresponsive. + /// + /// You may configure how many times the client is allowed to be "inactive" by + /// [`PingConfig::max_failures`]. + pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self { + self.inactive_limit = inactivity_limit; + self + } + + /// Configure how many times the remote peer is allowed be + /// inactive until the connection is closed. + pub fn max_failures(mut self, max: NonZeroUsize) -> Self { + self.max_failures = max; + self } } @@ -336,7 +342,7 @@ impl Default for ServerConfig { enable_http: true, enable_ws: true, message_buffer_capacity: 1024, - ping_config: PingConfig::OnlyPing(Duration::from_secs(60)), + ping_config: None, id_provider: Arc::new(RandomIntegerIdProvider), } } @@ -424,18 +430,18 @@ impl ServerConfigBuilder { self } - /// See [`Builder::ping_interval`] for documentation. - pub fn ping_interval(mut self, config: PingConfig) -> Result { - if let PingConfig::Ping { ping_interval, inactive_limit } = config { - if ping_interval >= inactive_limit { - return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into())); - } - } - - self.ping_config = config; + /// See [`Builder::enable_ws_ping`] for documentation. + pub fn enable_ws_ping(mut self, config: PingConfig) -> Result { + self.ping_config = Some(config); Ok(self) } + /// See [`Builder::disable_ws_ping`] for documentation. + pub fn disable_ws_ping(mut self) -> Self { + self.ping_config = None; + self + } + /// See [`Builder::set_id_provider`] for documentation. pub fn set_id_provider(mut self, id_provider: I) -> Self { self.id_provider = Arc::new(id_provider); @@ -635,31 +641,30 @@ impl Builder { self } - /// Configure the interval at which pings are submitted, - /// and optionally enable connection inactivity check + /// Enable WebSocket pings on the server. /// - /// This option is used to keep the connection alive, and can be configured to just submit `Ping` frames or with extra parameter, configuring max interval when a `Pong` frame should be received - /// - /// Default: ping interval is set to 60 seconds and the inactivity check is disabled + /// Default: pings are disabled. /// /// # Examples /// /// ```rust - /// use std::time::Duration; + /// use std::{time::Duration, num::NonZeroUsize}; /// use jsonrpsee_server::{ServerBuilder, PingConfig}; /// /// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes - /// let builder = ServerBuilder::default().ping_interval(PingConfig::Ping { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap(); + /// let builder = ServerBuilder::default().enable_ws_ping(PingConfig::new(Duration::from_secs(10), NonZeroUsize::new(1).unwrap(), Duration::from_secs(10))).unwrap(); /// ``` - pub fn ping_interval(mut self, config: PingConfig) -> Result { - if let PingConfig::Ping { ping_interval, inactive_limit } = config { - if ping_interval >= inactive_limit { - return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into())); - } - } + pub fn enable_ws_ping(mut self, config: PingConfig) -> Self { + self.server_cfg.ping_config = Some(config); + self + } - self.server_cfg.ping_config = config; - Ok(self) + /// Disable WebSocket ping/pong on the server. + /// + /// Default: pings are disabled. + pub fn disable_ws_ping(mut self) -> Self { + self.server_cfg.ping_config = None; + self } /// Configure custom `subscription ID` provider for the server to use diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 14d3f85ed5..14d7324acb 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -24,6 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::num::NonZeroUsize; use std::time::Duration; use crate::server::BatchRequestConfig; @@ -880,8 +881,7 @@ async fn server_with_infinite_call( ) -> (crate::ServerHandle, std::net::SocketAddr) { let server = ServerBuilder::default() // Make sure that the ping_interval doesn't force the connection to be closed - .ping_interval(crate::server::PingConfig::OnlyPing(timeout)) - .unwrap() + .enable_ws_ping(crate::server::PingConfig::new().max_failures(NonZeroUsize::MAX).ping_interval(timeout)) .build("127.0.0.1:0") .with_default_timeout() .await diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index ad8df7d743..7b77be8240 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -19,6 +19,7 @@ use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; use tokio::sync::{mpsc, oneshot}; +use tokio::time::{interval, interval_at}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; @@ -175,10 +176,16 @@ where async fn send_task( rx: mpsc::Receiver, mut ws_sender: Sender, - ping_config: PingConfig, + ping_config: Option, stop: oneshot::Receiver<()>, ) { - let ping_interval = ping_config.ping_interval(); + let ping_interval = match ping_config { + None => IntervalStream::pending(), + // NOTE: we are emitted a tick here immidiately to sync + // with how the receive task work because it starts measuring the pong + // when it starts up. + Some(p) => IntervalStream::new(interval(p.ping_interval)), + }; let rx = ReceiverStream::new(rx); tokio::pin!(ping_interval, rx, stop); @@ -240,16 +247,17 @@ enum Receive { } /// Attempts to read data from WebSocket fails if the server was stopped. -async fn try_recv(ws_stream: &mut T, mut stopped: S, ping_config: PingConfig) -> Receive +async fn try_recv(ws_stream: &mut T, mut stopped: S, ping_config: Option) -> Receive where S: Future + Unpin, T: StreamExt> + Unpin, { let mut last_active = Instant::now(); - let inactivity_check = match ping_config.inactive_limit() { - Some(period) => IntervalStream::new(period), + let inactivity_check = match ping_config { + Some(p) => IntervalStream::new(interval_at(tokio::time::Instant::now() + p.ping_interval, p.ping_interval)), None => IntervalStream::pending(), }; + let mut missed = 0; tokio::pin!(inactivity_check); @@ -271,11 +279,14 @@ where Either::Left((Either::Left((Some(Err(e)), _)), s)) => break Receive::Err(e, s), // Max inactivity timeout fired, check if the connection has been idle too long. Either::Left((Either::Right((_instant, rcv)), s)) => { - let inactive_limit_exceeded = - ping_config.inactive_limit().map_or(false, |duration| last_active.elapsed() > duration); + if let Some(p) = ping_config { + if last_active.elapsed() > p.inactive_limit { + missed += 1; - if inactive_limit_exceeded { - break Receive::Err(SokettoError::Closed, s); + if missed >= p.max_failures.get() { + break Receive::ConnectionClosed; + } + } } stopped = s; From e3a2c61786da04453a6c4d594c2cf61b1d8bbf42 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Nov 2023 13:16:44 +0100 Subject: [PATCH 4/8] refactor PingConfig --- server/src/server.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/server.rs b/server/src/server.rs index fa7bc0d8f6..367913cdf2 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -296,15 +296,21 @@ pub struct PingConfig { pub(crate) max_failures: NonZeroUsize, } -impl PingConfig { - /// Create a new PingConfig. - pub fn new() -> Self { +impl Default for PingConfig { + fn default() -> Self { Self { ping_interval: Duration::from_secs(30), max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"), inactive_limit: Duration::from_secs(40), } } +} + +impl PingConfig { + /// Create a new PingConfig. + pub fn new() -> Self { + Self::default() + } /// Configure the interval when the WebSocket pings are sent out. pub fn ping_interval(mut self, ping_interval: Duration) -> Self { From baa24f99de6d173de6af5b123804fa7c35b1dece Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Nov 2023 14:44:20 +0100 Subject: [PATCH 5/8] fix build --- server/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/server.rs b/server/src/server.rs index 367913cdf2..1762e10f3c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -658,7 +658,8 @@ impl Builder { /// use jsonrpsee_server::{ServerBuilder, PingConfig}; /// /// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes - /// let builder = ServerBuilder::default().enable_ws_ping(PingConfig::new(Duration::from_secs(10), NonZeroUsize::new(1).unwrap(), Duration::from_secs(10))).unwrap(); + /// let ping_cfg = PingConfig::new().ping_interval(Duration::from_secs(10)).inactive_limit(Duration::from_secs(60 * 2)); + /// let builder = ServerBuilder::default().enable_ws_ping(ping_cfg); /// ``` pub fn enable_ws_ping(mut self, config: PingConfig) -> Self { self.server_cfg.ping_config = Some(config); From d13131c185e2b8395b0d97bdcf39129e5b2916a9 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Nov 2023 15:22:50 +0100 Subject: [PATCH 6/8] Update server/src/server.rs --- server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/server.rs b/server/src/server.rs index d949acf470..91e3880c2c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -319,7 +319,7 @@ impl PingConfig { self } - /// Configure how long to wait until for WebSocket pong. + /// Configure how long to wait for the WebSocket pong. /// When this limit is expired it's regarded as the client is unresponsive. /// /// You may configure how many times the client is allowed to be "inactive" by From 4350c6a7e8613e3525fc16d9be4f80f10712d080 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 1 Dec 2023 10:44:50 +0100 Subject: [PATCH 7/8] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 65edc5e680..e48488192f 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -181,7 +181,7 @@ async fn send_task( ) { let ping_interval = match ping_config { None => IntervalStream::pending(), - // NOTE: we are emitted a tick here immidiately to sync + // NOTE: we are emitted a tick here immediately to sync // with how the receive task work because it starts measuring the pong // when it starts up. Some(p) => IntervalStream::new(interval(p.ping_interval)), From 6d562fc26c9e3dc06e39f0d485bf46089cdc2033 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 1 Dec 2023 11:10:44 +0100 Subject: [PATCH 8/8] Update server/src/server.rs --- server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/server.rs b/server/src/server.rs index 91e3880c2c..ed425cb87a 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -281,7 +281,7 @@ impl ConnectionState { /// wait until a client is determined as "inactive". /// /// jsonrpsee doesn't associate the ping/pong frames just that if -/// pong frame isn't received within `inactive_limit` it's regarded +/// pong frame isn't received within `inactive_limit` then it's regarded /// as missed. /// /// Such that the `inactive_limit` should be configured to longer than a single