Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(websocket client): drop subscriptions that can't keep up with the internal buffer size #166

Merged
merged 18 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_std::task::block_on;
use criterion::*;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig};
use jsonrpsee::http::HttpServer;
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap());
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr), WsConfig::default())).unwrap());

c.bench_function("synchronous WebSocket round trip", |b| {
b.iter(|| {
Expand Down
4 changes: 2 additions & 2 deletions examples/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{WsClient, WsSubscription};
use jsonrpsee::client::{WsClient, WsConfig, WsSubscription};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;

Expand All @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let mut subscribe_hello: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::WsClient;
use jsonrpsee::client::{WsClient, WsConfig};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;

Expand All @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let response: JsonValue = client.request("say_hello", Params::None).await?;
println!("r: {:?}", response);

Expand Down
4 changes: 3 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ mod ws;
#[cfg(feature = "http")]
pub use http::{HttpClient, HttpConfig, HttpTransportClient};
#[cfg(feature = "ws")]
pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient};
pub use ws::{
Client as WsClient, Config as WsConfig, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient,
};
135 changes: 98 additions & 37 deletions src/client/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::client::ws::{RawClient, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::client::ws::transport::WsConnectError;
use crate::client::ws::{RawClient, RawClientError, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::types::error::Error;
use crate::types::jsonrpc::{self, JsonValue};
// NOTE: this is a sign of a leaky abstraction to expose transport related details
// Should be removed after https://github.com/paritytech/jsonrpsee/issues/154
use soketto::connection::Error as SokettoError;

use futures::{
channel::{mpsc, oneshot},
future::Either,
pin_mut,
prelude::*,
sink::SinkExt,
};
use std::{collections::HashMap, io, marker::PhantomData};

Expand All @@ -45,6 +50,35 @@ use std::{collections::HashMap, io, marker::PhantomData};
pub struct Client {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Config.
config: Config,
}

#[derive(Copy, Clone, Debug)]
/// Configuration.
pub struct Config {
/// Backend channel for serving requests and notifications.
pub request_channel_capacity: usize,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this becomes useless in practice because we clone the sender every time, see PR description for more info.

/// Backend channel for each unique subscription.
pub subscription_channel_capacity: usize,
/// Allow losses when the channel gets full
pub allow_subscription_losses: bool,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// Allow losses when the request/notifications channel gets full
pub allow_request_losses: bool,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// Max request body size
pub max_request_body_size: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
request_channel_capacity: 100,
subscription_channel_capacity: 4,
allow_subscription_losses: false,
allow_request_losses: false,
max_request_body_size: 10 * 1024 * 1024,
}
}
}

/// Active subscription on a [`Client`].
Expand All @@ -54,7 +88,7 @@ pub struct Subscription<Notif> {
/// Channel from which we receive notifications from the server, as undecoded `JsonValue`s.
notifs_rx: mpsc::Receiver<JsonValue>,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<mpsc::Receiver<Notif>>,
marker: PhantomData<Notif>,
}

/// Message that the [`Client`] can send to the background task.
Expand Down Expand Up @@ -104,14 +138,16 @@ impl Client {
/// Initializes a new WebSocket client
///
/// Fails when the URL is invalid.
pub async fn new(target: &str) -> Result<Self, Error> {
pub async fn new(target: impl AsRef<str>, config: Config) -> Result<Self, Error> {
let transport = WsTransportClient::new(target).await.map_err(|e| Error::TransportError(Box::new(e)))?;
let client = RawClient::new(transport);
let (to_back, from_front) = mpsc::channel(16);

let (to_back, from_front) = mpsc::channel(config.request_channel_capacity);

async_std::task::spawn(async move {
background_task(client, from_front).await;
background_task(client, from_front, config).await;
});
Ok(Client { to_back })
Ok(Client { to_back, config })
}

/// Send a notification to the server.
Expand All @@ -123,7 +159,11 @@ impl Client {
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params);
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal)
self.to_back
.clone()
.send(FrontToBack::Notification { method, params })
.await
.map_err(|e| Error::Internal(e.into()))
}

/// Perform a request towards the server.
Expand All @@ -143,9 +183,7 @@ impl Client {
.clone()
.send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })
.await
.map_err(Error::Internal)?;

// TODO: send a `ChannelClosed` message if we close the channel unexpectedly
.map_err(|e| Error::Internal(e.into()))?;

let json_value = match send_back_rx.await {
Ok(Ok(v)) => v,
Expand Down Expand Up @@ -196,7 +234,6 @@ impl Client {
return Err(Error::TransportError(Box::new(err)));
}
};

Ok(Subscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData })
}
}
Expand All @@ -205,18 +242,19 @@ impl<Notif> Subscription<Notif>
where
Notif: jsonrpc::DeserializeOwned,
{
/// Returns the next notification sent from the server.
/// Returns the next notification from the stream
/// This may return `None` if finished subscription has been terminated
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
///
/// Ignores any malformed packet.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
pub async fn next(&mut self) -> Notif {
pub async fn next(&mut self) -> Option<Notif> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major change in this PR.

loop {
match self.notifs_rx.next().await {
Some(n) => {
if let Ok(parsed) = jsonrpc::from_value(n) {
return parsed;
return Some(parsed);
}
}
None => futures::pending!(),
None => return None,
}
}
}
Expand All @@ -225,20 +263,20 @@ where
impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
// the channel's buffer will be full, and our unsubscription request will never make it.
// the channel's buffer will be full, and our un-subscription request will never make it.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// However, when a notification arrives, the background task will realize that the channel
// to the `Subscription` has been closed, and will perform the unsubscribe.
let _ = self.to_back.send(FrontToBack::ChannelClosed).now_or_never();
let _ = self.to_back.try_send(FrontToBack::ChannelClosed);
}
}

/// Function being run in the background that processes messages from the frontend.
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>) {
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>, config: Config) {
// List of subscription requests that have been sent to the server, with the method name to
// unsubscribe.
let mut pending_subscriptions: HashMap<RawClientRequestId, (oneshot::Sender<_>, _)> = HashMap::new();
// List of subscription that are active on the server, with the method name to unsubscribe.
let mut active_subscriptions: HashMap<RawClientRequestId, (mpsc::Sender<jsonrpc::JsonValue>, _)> = HashMap::new();
let mut active_subscriptions: HashMap<RawClientRequestId, (mpsc::Sender<JsonValue>, _)> = HashMap::new();
// List of requests that the server must answer.
let mut ongoing_requests: HashMap<RawClientRequestId, oneshot::Sender<Result<_, _>>> = HashMap::new();

Expand Down Expand Up @@ -300,9 +338,10 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
}
}
}
// A subscription has been closed (could improved to be used for requests too.)
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Either::Left(Some(FrontToBack::ChannelClosed)) => {
// TODO: there's no way to cancel pending subscriptions and requests, otherwise
// we should clean them up as well
//TODO: there's no way to cancel pending subscriptions and requests
//TODO(niklasad1): using `iter().find()` is wrong, it's guessing (could close down the wrong channel) and inefficient
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) {
let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap();
client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap();
Expand All @@ -315,15 +354,16 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request));
}

// Receive a response from the server about a subscription.
// Received a response from the server that a subscription is registered.
Either::Right(Ok(RawClientEvent::SubscriptionResponse { request_id, result })) => {
log::trace!("[backend]: client received response to subscription: {:?}", result);
let (send_back, unsubscribe) = pending_subscriptions.remove(&request_id).unwrap();
if let Err(err) = result {
let _ = send_back.send(Err(Error::Request(err)));
} else {
// TODO: what's a good limit here? way more tricky than it looks
let (notifs_tx, notifs_rx) = mpsc::channel(4);
let (notifs_tx, notifs_rx) = mpsc::channel(config.subscription_channel_capacity);

// Send receiving end of `subscription channel` to the frontend
if send_back.send(Ok(notifs_rx)).is_ok() {
active_subscriptions.insert(request_id, (notifs_tx, unsubscribe));
} else {
Expand All @@ -339,28 +379,49 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
}
}

// Received a response on a subscription.
Either::Right(Ok(RawClientEvent::SubscriptionNotif { request_id, result })) => {
// TODO: unsubscribe if channel is closed
let (notifs_tx, _) = active_subscriptions.get_mut(&request_id).unwrap();
if notifs_tx.send(result).await.is_err() {
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
let notifs_tx = match active_subscriptions.get_mut(&request_id) {
None => {
log::debug!("Invalid subscription response: {:?}", request_id);
continue;
}
Some((notifs_tx, _)) => notifs_tx,
};
Comment on lines +375 to +381
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice.


// NOTE: This is non_blocking but it doesn't depend on any external handling to finish
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// such as a response from a remote party.
match notifs_tx.try_send(result) {
Ok(()) => (),
// Channel is either full or disconnected, close it.
Err(e) => {
log::error!("Subscription ID: {:?} failed: {:?}", request_id, e);
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
client
.subscription_by_id(request_id)
.unwrap()
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
}
}
}

// Request for the server to unsubscribe us has succeeded.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Either::Right(Ok(RawClientEvent::Unsubscribed { request_id: _ })) => {}

Either::Right(Err(RawClientError::Inner(WsConnectError::Ws(SokettoError::UnexpectedOpCode(e))))) => {
log::error!(
"Client Error: {:?}, <https://github.com/paritytech/jsonrpsee/issues/154>",
SokettoError::UnexpectedOpCode(e)
);
}
Either::Right(Err(e)) => {
// TODO: https://github.com/paritytech/jsonrpsee/issues/67
log::error!("Client Error: {:?}", e);
log::error!("Client Error: {:?} terminating connection", e);
break;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but it closes the event loop once an error is received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the sending ends of the channels going to shut down gracefully when we drop the receivers here?

Copy link
Member Author

@niklasad1 niklasad1 Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the senders are not gracefully terminated but once the receiver is dropped here all senders will receive None -> Error::Internal once any of the operations (method call, notification, subscription) is invoked.

Thus, not possible for the user to know the exact failure reason without another channel or message (but we have logs lol)

EDIT:
I can revert this change if you want and handle it in a separate PR, I think it would make sense to incorporate this with https://github.com/paritytech/jsonrpsee/pull/134/files and fix better error messages for it.

Then add some similar tests that we have for the HTTP such as an invalid request ID and so on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you; it's fine to keep here too.

}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/client/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ pub mod raw;
pub mod stream;
pub mod transport;

pub use client::{Client, Subscription};
pub use raw::{RawClient, RawClientEvent, RawClientRequestId};
#[cfg(test)]
mod tests;

pub use client::{Client, Config, Subscription};
pub use raw::{RawClient, RawClientError, RawClientEvent, RawClientRequestId};
pub use transport::{WsConnectError, WsTransportClient};
1 change: 1 addition & 0 deletions src/client/ws/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#![cfg(test)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there code missing here perhaps? There was a test removed in ws/tests.rs – was it meant to be moved here? Or is it all in integration_tests?

5 changes: 3 additions & 2 deletions src/client/ws/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ impl WsTransportClient {
}

/// Initializes a new WS client from a URL.
pub async fn new(target: &str) -> Result<Self, WsNewDnsError> {
let url = url::Url::parse(target).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?;
pub async fn new(target: impl AsRef<str>) -> Result<Self, WsNewDnsError> {
let url =
url::Url::parse(target.as_ref()).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?;
let mode = match url.scheme() {
"ws" => Mode::Plain,
"wss" => Mode::Tls,
Expand Down
1 change: 1 addition & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod transport;
#[cfg(test)]
mod tests;

pub use crate::types::http::HttpConfig;
pub use raw::RawServer as HttpRawServer;
pub use raw::RawServerEvent as HttpRawServerEvent;
pub use raw::TypedResponder as HttpTypedResponder;
Expand Down
Loading