Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: change ws ping API #1248

Merged
merged 9 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion server/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@

//! Utilities for handling async code.

use jsonrpsee_core::Error;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::{Stream, StreamExt};
use jsonrpsee_core::Error;
use pin_project::pin_project;
use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tokio::time::Interval;

/// Create channel to determine whether
/// the server shall continue to run or not.
Expand Down Expand Up @@ -119,3 +125,32 @@ impl ConnectionGuard {

/// Connection permit.
pub type ConnectionPermit = OwnedSemaphorePermit;

#[pin_project]
pub(crate) struct IntervalStream(#[pin] Option<tokio_stream::wrappers::IntervalStream>);

impl IntervalStream {
/// Creates a stream which never returns any elements.
pub(crate) fn pending() -> Self {
Self(None)
}

/// Creates a stream which produces elements with interval of `period`.
pub(crate) fn new(interval: Interval) -> Self {
Self(Some(tokio_stream::wrappers::IntervalStream::new(interval)))
}
}

impl Stream for IntervalStream {
type Item = tokio::time::Instant;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(mut stream) = self.project().0.as_pin_mut() {
stream.poll_next_unpin(cx)
} else {
// NOTE: this will not be woken up again and it's by design
// to be a pending stream that never returns.
Poll::Pending
}
}
}
139 changes: 77 additions & 62 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use std::error::Error as StdError;
use std::future::Future;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
Expand Down Expand Up @@ -143,7 +144,6 @@ where
match try_accept_conn(&listener, stopped).await {
AcceptConnection::Established { socket, remote_addr, stop } => {


process_connection(ProcessConnection {
http_middleware: &self.http_middleware,
rpc_middleware: self.rpc_middleware.clone(),
Expand Down Expand Up @@ -200,7 +200,7 @@ pub struct ServerConfig {
/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
pub(crate) message_buffer_capacity: u32,
/// Ping settings.
pub(crate) ping_config: PingConfig,
pub(crate) ping_config: Option<PingConfig>,
/// ID provider.
pub(crate) id_provider: Arc<dyn IdProvider>,
}
Expand All @@ -224,7 +224,7 @@ pub struct ServerConfigBuilder {
/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
message_buffer_capacity: u32,
/// Ping settings.
ping_config: PingConfig,
ping_config: Option<PingConfig>,
/// ID provider.
id_provider: Arc<dyn IdProvider>,
}
Expand Down Expand Up @@ -274,51 +274,66 @@ impl ConnectionState {
}
}

/// Configuration for WebSocket ping's.
/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect inactive
/// clients.
///
/// If the server sends out a ping then remote peer must reply with a corresponding pong message.
/// It's possible to configure how often pings are sent out and how long the server will
/// wait until a client is determined as "inactive".
///
/// It's possible to just send out pings then don't care about response
/// or terminate the connection if the ping isn't replied to the configured `max_inactivity` limit.
/// jsonrpsee doesn't associate the ping/pong frames just that if
/// pong frame isn't received within `inactive_limit` it's regarded
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// as missed.
///
/// NOTE: It's possible that a `ping` may be backpressured and if you expect a connection
/// to be reassumed after interruption it's not recommended to enable the activity check.
/// Such that the `inactive_limit` should be configured to longer than a single
/// WebSocket ping takes or it might be missed and may end up
/// terminating the connection.
#[derive(Debug, Copy, Clone)]
pub enum PingConfig {
/// The server pings the connected clients continuously at the configured interval but
/// doesn't disconnect them if no pongs are received from the client.
WithoutInactivityCheck(Duration),
/// The server pings the connected clients continuously at the configured interval
/// and terminates the connection if no websocket messages received from client
/// after the max limit is exceeded.
WithInactivityCheck {
/// Time interval between consequent pings from server
ping_interval: Duration,
/// Max allowed time for connection to stay idle
inactive_limit: Duration,
},
pub struct PingConfig {
/// Period which the server pings the connected client.
pub(crate) ping_interval: Duration,
/// Max allowed time for a connection to stay idle.
pub(crate) inactive_limit: Duration,
/// Max failures.
pub(crate) max_failures: NonZeroUsize,
}

impl PingConfig {
pub(crate) fn ping_interval(&self) -> Duration {
match self {
Self::WithoutInactivityCheck(ping_interval) => *ping_interval,
Self::WithInactivityCheck { ping_interval, .. } => *ping_interval,
impl Default for PingConfig {
fn default() -> Self {
Self {
ping_interval: Duration::from_secs(30),
max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"),
inactive_limit: Duration::from_secs(40),
}
}
}

pub(crate) fn inactive_limit(&self) -> Option<Duration> {
if let Self::WithInactivityCheck { inactive_limit, .. } = self {
Some(*inactive_limit)
} else {
None
}
impl PingConfig {
/// Create a new PingConfig.
pub fn new() -> Self {
Self::default()
}
}

impl Default for PingConfig {
fn default() -> Self {
Self::WithoutInactivityCheck(Duration::from_secs(60))
/// Configure the interval when the WebSocket pings are sent out.
pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
self.ping_interval = ping_interval;
self
}

/// Configure how long to wait for the WebSocket pong.
/// When this limit is expired it's regarded as the client is unresponsive.
///
/// You may configure how many times the client is allowed to be "inactive" by
/// [`PingConfig::max_failures`].
pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self {
self.inactive_limit = inactivity_limit;
self
}

/// Configure how many times the remote peer is allowed be
/// inactive until the connection is closed.
pub fn max_failures(mut self, max: NonZeroUsize) -> Self {
self.max_failures = max;
self
}
}

Expand All @@ -334,7 +349,7 @@ impl Default for ServerConfig {
enable_http: true,
enable_ws: true,
message_buffer_capacity: 1024,
ping_config: PingConfig::WithoutInactivityCheck(Duration::from_secs(60)),
ping_config: None,
id_provider: Arc::new(RandomIntegerIdProvider),
}
}
Expand Down Expand Up @@ -422,18 +437,18 @@ impl ServerConfigBuilder {
self
}

/// See [`Builder::ping_interval`] for documentation.
pub fn ping_interval(mut self, config: PingConfig) -> Result<Self, Error> {
if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config {
if ping_interval >= inactive_limit {
return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into()));
}
}

self.ping_config = config;
/// See [`Builder::enable_ws_ping`] for documentation.
pub fn enable_ws_ping(mut self, config: PingConfig) -> Result<Self, Error> {
self.ping_config = Some(config);
Ok(self)
}

/// See [`Builder::disable_ws_ping`] for documentation.
pub fn disable_ws_ping(mut self) -> Self {
self.ping_config = None;
self
}

/// See [`Builder::set_id_provider`] for documentation.
pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
self.id_provider = Arc::new(id_provider);
Expand Down Expand Up @@ -633,31 +648,31 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
self
}

/// Configure the interval at which pings are submitted,
/// and optionally enable connection inactivity check
/// Enable WebSocket pings on the server.
///
/// This option is used to keep the connection alive, and can be configured to just submit `Ping` frames or with extra parameter, configuring max interval when a `Pong` frame should be received
///
/// Default: ping interval is set to 60 seconds and the inactivity check is disabled
/// Default: pings are disabled.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use std::{time::Duration, num::NonZeroUsize};
/// use jsonrpsee_server::{ServerBuilder, PingConfig};
///
/// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes
/// let builder = ServerBuilder::default().ping_interval(PingConfig::WithInactivityCheck { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap();
/// let ping_cfg = PingConfig::new().ping_interval(Duration::from_secs(10)).inactive_limit(Duration::from_secs(60 * 2));
/// let builder = ServerBuilder::default().enable_ws_ping(ping_cfg);
/// ```
pub fn ping_interval(mut self, config: PingConfig) -> Result<Self, Error> {
if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config {
if ping_interval >= inactive_limit {
return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into()));
}
}
pub fn enable_ws_ping(mut self, config: PingConfig) -> Self {
self.server_cfg.ping_config = Some(config);
self
}

self.server_cfg.ping_config = config;
Ok(self)
/// Disable WebSocket ping/pong on the server.
///
/// Default: pings are disabled.
pub fn disable_ws_ping(mut self) -> Self {
self.server_cfg.ping_config = None;
self
}

/// Configure custom `subscription ID` provider for the server to use
Expand Down
4 changes: 2 additions & 2 deletions server/src/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::num::NonZeroUsize;
use std::time::Duration;

use crate::server::BatchRequestConfig;
Expand Down Expand Up @@ -880,8 +881,7 @@ async fn server_with_infinite_call(
) -> (crate::ServerHandle, std::net::SocketAddr) {
let server = ServerBuilder::default()
// Make sure that the ping_interval doesn't force the connection to be closed
.ping_interval(crate::server::PingConfig::WithoutInactivityCheck(timeout))
.unwrap()
.enable_ws_ping(crate::server::PingConfig::new().max_failures(NonZeroUsize::MAX).ping_interval(timeout))
.build("127.0.0.1:0")
.with_default_timeout()
.await
Expand Down
Loading
Loading