Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(websocket client): drop subscriptions that can't keep up with the internal buffer size #166

Merged
merged 18 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_std::task::block_on;
use criterion::*;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig};
use jsonrpsee::http::HttpServer;
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap());
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr), WsConfig::default())).unwrap());

c.bench_function("synchronous WebSocket round trip", |b| {
b.iter(|| {
Expand Down
4 changes: 2 additions & 2 deletions examples/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{WsClient, WsSubscription};
use jsonrpsee::client::{WsClient, WsConfig, WsSubscription};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;

Expand All @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let mut subscribe_hello: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::WsClient;
use jsonrpsee::client::{WsClient, WsConfig};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;

Expand All @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let response: JsonValue = client.request("say_hello", Params::None).await?;
println!("r: {:?}", response);

Expand Down
4 changes: 3 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ mod ws;
#[cfg(feature = "http")]
pub use http::{HttpClient, HttpConfig, HttpTransportClient};
#[cfg(feature = "ws")]
pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient};
pub use ws::{
Client as WsClient, Config as WsConfig, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient,
};
159 changes: 108 additions & 51 deletions src/client/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::client::ws::{RawClient, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::client::ws::transport::WsConnectError;
use crate::client::ws::{RawClient, RawClientError, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::types::error::Error;
use crate::types::jsonrpc::{self, JsonValue};
// NOTE: this is a sign of a leaky abstraction to expose transport related details
// Should be removed after https://github.com/paritytech/jsonrpsee/issues/154
use soketto::connection::Error as SokettoError;

use futures::{
channel::{mpsc, oneshot},
future::Either,
pin_mut,
prelude::*,
sink::SinkExt,
};
use std::{collections::HashMap, io, marker::PhantomData};

Expand All @@ -45,6 +50,29 @@ use std::{collections::HashMap, io, marker::PhantomData};
pub struct Client {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Config.
config: Config,
}

#[derive(Copy, Clone, Debug)]
/// Configuration.
pub struct Config {
/// Backend channel for serving requests and notifications.
pub request_channel_capacity: usize,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this becomes useless in practice because we clone the sender every time, see PR description for more info.

/// Backend channel for each unique subscription.
pub subscription_channel_capacity: usize,
/// Max request body size
pub max_request_body_size: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
request_channel_capacity: 100,
subscription_channel_capacity: 4,
max_request_body_size: 10 * 1024 * 1024,
}
}
}

/// Active subscription on a [`Client`].
Expand All @@ -54,7 +82,7 @@ pub struct Subscription<Notif> {
/// Channel from which we receive notifications from the server, as undecoded `JsonValue`s.
notifs_rx: mpsc::Receiver<JsonValue>,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<mpsc::Receiver<Notif>>,
marker: PhantomData<Notif>,
}

/// Message that the [`Client`] can send to the background task.
Expand Down Expand Up @@ -104,14 +132,16 @@ impl Client {
/// Initializes a new WebSocket client
///
/// Fails when the URL is invalid.
pub async fn new(target: &str) -> Result<Self, Error> {
pub async fn new(target: impl AsRef<str>, config: Config) -> Result<Self, Error> {
let transport = WsTransportClient::new(target).await.map_err(|e| Error::TransportError(Box::new(e)))?;
let client = RawClient::new(transport);
let (to_back, from_front) = mpsc::channel(16);

let (to_back, from_front) = mpsc::channel(config.request_channel_capacity);

async_std::task::spawn(async move {
background_task(client, from_front).await;
background_task(client, from_front, config).await;
});
Ok(Client { to_back })
Ok(Client { to_back, config })
}

/// Send a notification to the server.
Expand Down Expand Up @@ -145,8 +175,6 @@ impl Client {
.await
.map_err(Error::Internal)?;

// TODO: send a `ChannelClosed` message if we close the channel unexpectedly

let json_value = match send_back_rx.await {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Expand Down Expand Up @@ -196,7 +224,6 @@ impl Client {
return Err(Error::TransportError(Box::new(err)));
}
};

Ok(Subscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData })
}
}
Expand All @@ -205,18 +232,18 @@ impl<Notif> Subscription<Notif>
where
Notif: jsonrpc::DeserializeOwned,
{
/// Returns the next notification sent from the server.
/// Returns the next notification from the stream
/// This may return `None` if the subscription has been terminated, may happen if the channel becomes full or dropped.
///
/// Ignores any malformed packet.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
pub async fn next(&mut self) -> Notif {
pub async fn next(&mut self) -> Option<Notif> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major change in this PR.

loop {
match self.notifs_rx.next().await {
Some(n) => {
if let Ok(parsed) = jsonrpc::from_value(n) {
return parsed;
}
}
None => futures::pending!(),
Some(n) => match jsonrpc::from_value(n) {
Ok(parsed) => return Some(parsed),
Err(e) => log::error!("Subscription response error: {:?}", e),
},
None => return None,
}
}
}
Expand All @@ -228,17 +255,17 @@ impl<Notif> Drop for Subscription<Notif> {
// the channel's buffer will be full, and our unsubscription request will never make it.
// However, when a notification arrives, the background task will realize that the channel
// to the `Subscription` has been closed, and will perform the unsubscribe.
let _ = self.to_back.send(FrontToBack::ChannelClosed).now_or_never();
let _ = self.to_back.try_send(FrontToBack::ChannelClosed);
}
}

/// Function being run in the background that processes messages from the frontend.
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>) {
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>, config: Config) {
// List of subscription requests that have been sent to the server, with the method name to
// unsubscribe.
let mut pending_subscriptions: HashMap<RawClientRequestId, (oneshot::Sender<_>, _)> = HashMap::new();
// List of subscription that are active on the server, with the method name to unsubscribe.
let mut active_subscriptions: HashMap<RawClientRequestId, (mpsc::Sender<jsonrpc::JsonValue>, _)> = HashMap::new();
let mut active_subscriptions: HashMap<RawClientRequestId, (mpsc::Sender<JsonValue>, _)> = HashMap::new();
// List of requests that the server must answer.
let mut ongoing_requests: HashMap<RawClientRequestId, oneshot::Sender<Result<_, _>>> = HashMap::new();

Expand Down Expand Up @@ -300,68 +327,98 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
}
}
}
// A subscription has been closed (could be used for requests too.)
Either::Left(Some(FrontToBack::ChannelClosed)) => {
// TODO: there's no way to cancel pending subscriptions and requests, otherwise
// we should clean them up as well
while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) {
let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap();
client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap();
//TODO: there's no way to cancel pending subscriptions and requests
//TODO: https://github.com/paritytech/jsonrpsee/issues/169
while let Some(req_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k)
{
let (_, unsubscribe) =
active_subscriptions.remove(&req_id).expect("Subscription is active checked above; qed");
close_subscription(&mut client, req_id, unsubscribe).await;
}
}

// Received a response to a request from the server.
Either::Right(Ok(RawClientEvent::Response { request_id, result })) => {
log::trace!("[backend] client received response to req={:?}, result={:?}", request_id, result);
let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request));
match ongoing_requests.remove(&request_id) {
Some(r) => {
if let Err(e) = r.send(result.map_err(Error::Request)) {
log::error!("Could not dispatch pending request ID: {:?}, error: {:?}", request_id, e);
}
}
None => log::error!("No pending response found for request ID {:?}", request_id),
}
}

// Receive a response from the server about a subscription.
// Received a response from the server that a subscription is registered.
Either::Right(Ok(RawClientEvent::SubscriptionResponse { request_id, result })) => {
log::trace!("[backend]: client received response to subscription: {:?}", result);
let (send_back, unsubscribe) = pending_subscriptions.remove(&request_id).unwrap();
if let Err(err) = result {
let _ = send_back.send(Err(Error::Request(err)));
} else {
// TODO: what's a good limit here? way more tricky than it looks
let (notifs_tx, notifs_rx) = mpsc::channel(4);
let (notifs_tx, notifs_rx) = mpsc::channel(config.subscription_channel_capacity);

// Send receiving end of `subscription channel` to the frontend
if send_back.send(Ok(notifs_rx)).is_ok() {
active_subscriptions.insert(request_id, (notifs_tx, unsubscribe));
} else {
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
close_subscription(&mut client, request_id, unsubscribe).await;
}
}
}

// Received a response on a subscription.
Either::Right(Ok(RawClientEvent::SubscriptionNotif { request_id, result })) => {
// TODO: unsubscribe if channel is closed
let (notifs_tx, _) = active_subscriptions.get_mut(&request_id).unwrap();
if notifs_tx.send(result).await.is_err() {
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
let notifs_tx = match active_subscriptions.get_mut(&request_id) {
None => {
log::debug!("Invalid subscription response: {:?}", request_id);
continue;
}
Some((notifs_tx, _)) => notifs_tx,
};
Comment on lines +375 to +381
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice.


match notifs_tx.try_send(result) {
Ok(()) => (),
// Channel is either full or disconnected, close it.
Err(e) => {
log::error!("Subscription ID: {:?} failed: {:?}", request_id, e);
let (_, unsubscribe) =
active_subscriptions.remove(&request_id).expect("Request is active checked above; qed");
close_subscription(&mut client, request_id, unsubscribe).await;
}
}
}

// Request for the server to unsubscribe us has succeeded.
// Request for the server to unsubscribe to us has succeeded.
Either::Right(Ok(RawClientEvent::Unsubscribed { request_id: _ })) => {}

Either::Right(Err(RawClientError::Inner(WsConnectError::Ws(SokettoError::UnexpectedOpCode(e))))) => {
log::error!(
"Client Error: {:?}, <https://github.com/paritytech/jsonrpsee/issues/154>",
SokettoError::UnexpectedOpCode(e)
);
}
Either::Right(Err(e)) => {
// TODO: https://github.com/paritytech/jsonrpsee/issues/67
log::error!("Client Error: {:?}", e);
log::error!("Client Error: {:?} terminating connection", e);
break;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but it closes the event loop once an error is received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the sending ends of the channels going to shut down gracefully when we drop the receivers here?

Copy link
Member Author

@niklasad1 niklasad1 Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the senders are not gracefully terminated but once the receiver is dropped here all senders will receive None -> Error::Internal once any of the operations (method call, notification, subscription) is invoked.

Thus, not possible for the user to know the exact failure reason without another channel or message (but we have logs lol)

EDIT:
I can revert this change if you want and handle it in a separate PR, I think it would make sense to incorporate this with https://github.com/paritytech/jsonrpsee/pull/134/files and fix better error messages for it.

Then add some similar tests that we have for the HTTP such as an invalid request ID and so on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you; it's fine to keep here too.

}
}
}
}

/// Close subscription in RawClient helper.
/// Logs if the subscription couldn't be found.
async fn close_subscription(client: &mut RawClient, request_id: RawClientRequestId, unsubscribe_method: String) {
match client.subscription_by_id(request_id).and_then(|s| s.into_active()) {
Some(mut sub) => {
if let Err(e) = sub.close(&unsubscribe_method).await {
log::error!("RequestID : {:?}, unsubscribe to {} failed: {:?}", request_id, unsubscribe_method, e);
}
}
None => log::error!("Request ID: {:?}, not an active subscription", request_id),
}
}
7 changes: 5 additions & 2 deletions src/client/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ pub mod raw;
pub mod stream;
pub mod transport;

pub use client::{Client, Subscription};
pub use raw::{RawClient, RawClientEvent, RawClientRequestId};
#[cfg(test)]
mod tests;

pub use client::{Client, Config, Subscription};
pub use raw::{RawClient, RawClientError, RawClientEvent, RawClientRequestId};
pub use transport::{WsConnectError, WsTransportClient};
1 change: 1 addition & 0 deletions src/client/ws/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#![cfg(test)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there code missing here perhaps? There was a test removed in ws/tests.rs – was it meant to be moved here? Or is it all in integration_tests?

5 changes: 3 additions & 2 deletions src/client/ws/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ impl WsTransportClient {
}

/// Initializes a new WS client from a URL.
pub async fn new(target: &str) -> Result<Self, WsNewDnsError> {
let url = url::Url::parse(target).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?;
pub async fn new(target: impl AsRef<str>) -> Result<Self, WsNewDnsError> {
let url =
url::Url::parse(target.as_ref()).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?;
let mode = match url.scheme() {
"ws" => Mode::Plain,
"wss" => Mode::Tls,
Expand Down
1 change: 1 addition & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod transport;
#[cfg(test)]
mod tests;

pub use crate::types::http::HttpConfig;
pub use raw::RawServer as HttpRawServer;
pub use raw::RawServerEvent as HttpRawServerEvent;
pub use raw::TypedResponder as HttpTypedResponder;
Expand Down
Loading