Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(websocket client): drop subscriptions that can't keep up with the internal buffer size #166

Merged
merged 18 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 39 additions & 43 deletions src/client/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ pub struct Config {
pub request_channel_capacity: usize,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this becomes useless in practice because we clone the sender every time, see PR description for more info.

/// Backend channel for each unique subscription.
pub subscription_channel_capacity: usize,
/// Allow losses when the channel gets full
pub allow_subscription_losses: bool,
/// Allow losses when the request/notifications channel gets full
pub allow_request_losses: bool,
/// Max request body size
pub max_request_body_size: usize,
}
Expand All @@ -74,8 +70,6 @@ impl Default for Config {
Self {
request_channel_capacity: 100,
subscription_channel_capacity: 4,
allow_subscription_losses: false,
allow_request_losses: false,
max_request_body_size: 10 * 1024 * 1024,
}
}
Expand Down Expand Up @@ -159,11 +153,7 @@ impl Client {
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params);
self.to_back
.clone()
.send(FrontToBack::Notification { method, params })
.await
.map_err(|e| Error::Internal(e.into()))
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal)
}

/// Perform a request towards the server.
Expand All @@ -183,7 +173,7 @@ impl Client {
.clone()
.send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })
.await
.map_err(|e| Error::Internal(e.into()))?;
.map_err(Error::Internal)?;

let json_value = match send_back_rx.await {
Ok(Ok(v)) => v,
Expand Down Expand Up @@ -249,11 +239,10 @@ where
pub async fn next(&mut self) -> Option<Notif> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major change in this PR.

loop {
match self.notifs_rx.next().await {
Some(n) => {
if let Ok(parsed) = jsonrpc::from_value(n) {
return Some(parsed);
}
}
Some(n) => match jsonrpc::from_value(n) {
Ok(parsed) => return Some(parsed),
Err(e) => log::error!("Subscription response error: {:?}", e),
},
None => return None,
}
}
Expand Down Expand Up @@ -338,20 +327,29 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
}
}
}
// A subscription has been closed (could improved to be used for requests too.)
// A subscription has been closed (could be used for requests too.)
Either::Left(Some(FrontToBack::ChannelClosed)) => {
//TODO: there's no way to cancel pending subscriptions and requests
//TODO(niklasad1): using `iter().find()` is wrong, it's guessing (could close down the wrong channel) and inefficient
while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) {
let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap();
client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap();
//TODO: https://github.com/paritytech/jsonrpsee/issues/169
while let Some(req_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k)
{
let (_, unsubscribe) =
active_subscriptions.remove(&req_id).expect("Subscription is active checked above; qed");
close_subscription(&mut client, req_id, unsubscribe).await;
}
}

// Received a response to a request from the server.
Either::Right(Ok(RawClientEvent::Response { request_id, result })) => {
log::trace!("[backend] client received response to req={:?}, result={:?}", request_id, result);
let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request));
match ongoing_requests.remove(&request_id) {
Some(r) => {
if let Err(e) = r.send(result.map_err(Error::Request)) {
log::error!("Could not dispatch pending request: {:?}", e);
}
}
None => log::error!("Invalid response ID to request received"),
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Received a response from the server that a subscription is registered.
Expand All @@ -367,14 +365,7 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
if send_back.send(Ok(notifs_rx)).is_ok() {
active_subscriptions.insert(request_id, (notifs_tx, unsubscribe));
} else {
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
close_subscription(&mut client, request_id, unsubscribe).await;
}
}
}
Expand All @@ -389,27 +380,19 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
Some((notifs_tx, _)) => notifs_tx,
};

// NOTE: This is non_blocking but it doesn't depend on any external handling to finish
// such as a response from a remote party.
match notifs_tx.try_send(result) {
Ok(()) => (),
// Channel is either full or disconnected, close it.
Err(e) => {
log::error!("Subscription ID: {:?} failed: {:?}", request_id, e);
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();
let (_, unsubscribe) =
active_subscriptions.remove(&request_id).expect("Request is active checked above; qed");
close_subscription(&mut client, request_id, unsubscribe).await;
}
}
}

// Request for the server to unsubscribe us has succeeded.
// Request for the server to unsubscribe to us has succeeded.
Either::Right(Ok(RawClientEvent::Unsubscribed { request_id: _ })) => {}

Either::Right(Err(RawClientError::Inner(WsConnectError::Ws(SokettoError::UnexpectedOpCode(e))))) => {
Expand All @@ -426,3 +409,16 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
}
}
}

/// Close subscription in RawClient helper.
/// Logs if the subscription couldn't be found.
async fn close_subscription(client: &mut RawClient, request_id: RawClientRequestId, unsubscribe_method: String) {
match client.subscription_by_id(request_id).and_then(|s| s.into_active()) {
Some(mut sub) => {
if let Err(e) = sub.close(&unsubscribe_method).await {
log::error!("RequestID : {:?}, unsubscribe to {} failed: {:?}", request_id, unsubscribe_method, e);
}
}
None => log::error!("Request ID: {:?}, not an active subscription", request_id),
}
}
1 change: 0 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {

#[tokio::test]
async fn ws_more_request_than_buffer_should_not_deadlock() {
env_logger::init();
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
let (concurrent_tx, concurrent_rx) = oneshot::channel::<()>();
websocket_server_with_wait_period(server_started_tx, concurrent_rx);
Expand Down