diff --git a/src/client/ws/tests.rs b/src/client/ws/tests.rs index 67005ab4bc..9b99cbdf77 100644 --- a/src/client/ws/tests.rs +++ b/src/client/ws/tests.rs @@ -1 +1,124 @@ #![cfg(test)] + +use crate::client::{WsClient, WsConfig, WsSubscription}; +use crate::types::error::Error; +use crate::types::jsonrpc; + +use jsonrpsee_test_utils::helpers::*; +use jsonrpsee_test_utils::types::{Id, WebSocketTestServer}; + +fn assert_error_response(response: Result, code: jsonrpc::ErrorCode, message: String) { + let expected = jsonrpc::Error { code, message, data: None }; + match response { + Err(Error::Request(err)) => { + assert_eq!(err, expected); + } + e @ _ => panic!("Expected error: \"{}\", got: {:?}", expected, e), + }; +} + +#[tokio::test] +async fn method_call_works() { + let server = WebSocketTestServer::with_hardcoded_response( + "127.0.0.1:0".parse().unwrap(), + ok_response("hello".into(), Id::Num(0_u64)), + ) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: jsonrpc::JsonValue = client.request("say_hello", jsonrpc::Params::None).await.unwrap(); + let exp = jsonrpc::JsonValue::String("hello".to_string()); + assert_eq!(response, exp); +} + +#[tokio::test] +async fn notif_works() { + // this empty string shouldn't be read because the server shouldn't respond to notifications. + let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), String::new()).await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + assert!(client.notification("notif", jsonrpc::Params::None).await.is_ok()); +} + +#[tokio::test] +async fn method_not_found_works() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), method_not_found(Id::Num(0_u64))) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert_error_response(response, jsonrpc::ErrorCode::MethodNotFound, METHOD_NOT_FOUND.into()); +} + +#[tokio::test] +async fn parse_error_works() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), parse_error(Id::Num(0_u64))).await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert_error_response(response, jsonrpc::ErrorCode::ParseError, PARSE_ERROR.into()); +} + +#[tokio::test] +async fn invalid_request_works() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_request(Id::Num(0_u64))) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert_error_response(response, jsonrpc::ErrorCode::InvalidRequest, INVALID_REQUEST.into()); +} + +#[tokio::test] +async fn invalid_params_works() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_params(Id::Num(0_u64))) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert_error_response(response, jsonrpc::ErrorCode::InvalidParams, INVALID_PARAMS.into()); +} + +#[tokio::test] +async fn internal_error_works() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(0_u64))) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert_error_response(response, jsonrpc::ErrorCode::InternalError, INTERNAL_ERROR.into()); +} + +#[tokio::test] +async fn subscription_works() { + let server = WebSocketTestServer::with_hardcoded_subscription( + "127.0.0.1:0".parse().unwrap(), + server_subscription_id_response(Id::Num(0)), + server_subscription_response(jsonrpc::JsonValue::String("hello my friend".to_owned())), + ) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + { + let mut sub: WsSubscription = + client.subscribe("subscribe_hello", jsonrpc::Params::None, "unsubscribe_hello").await.unwrap(); + let response: String = sub.next().await.unwrap().into(); + assert_eq!("hello my friend".to_owned(), response); + } +} + +#[tokio::test] +async fn response_with_wrong_id() { + let server = + WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(99_u64))) + .await; + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let err: Result = client.request("say_hello", jsonrpc::Params::None).await; + assert!(matches!(err, Err(Error::TransportError(e)) if e.to_string().contains("background task closed"))); +} diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 8315a60627..9e2ffde061 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -8,8 +8,10 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-std = "1.8.0" futures = "0.3.8" hyper = "0.13.9" +log = "0.4.11" serde = { version = "1.0.118", default-features = false, features = ["derive"] } serde_json = "1.0.60" soketto = "0.4.2" diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index 4e20807e1f..97f81dea72 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -64,6 +64,24 @@ pub fn internal_error(id: Id) -> String { ) } +/// Hardcoded server response when a client initiates a new subscription. +/// +/// NOTE: works only for one subscription because the subscription ID is hardcoded. +pub fn server_subscription_id_response(id: Id) -> String { + format!( + r#"{{"jsonrpc":"2.0","result":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","id":{}}}"#, + serde_json::to_string(&id).unwrap() + ) +} + +/// Server response to a hardcoded pending subscription +pub fn server_subscription_response(result: Value) -> String { + format!( + r#"{{"jsonrpc":"2.0","method":"bar","params":{{"subscription":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","result":{}}}}}"#, + serde_json::to_string(&result).unwrap() + ) +} + pub async fn http_request(body: Body, uri: Uri) -> Result { let client = hyper::Client::new(); let r = hyper::Request::post(uri) diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index d027631c48..28454314f3 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -1,4 +1,6 @@ //! Shared test helpers for JSONRPC v2. +#![recursion_limit = "256"] + pub mod helpers; pub mod types; diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index dcc182f5bf..036ccb98fc 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -1,7 +1,13 @@ +use futures::channel::mpsc::{self, Receiver, Sender}; +use futures::future::FutureExt; use futures::io::{BufReader, BufWriter}; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; use serde::{Deserialize, Serialize}; use soketto::handshake; +use soketto::handshake::{server::Response, Server}; use std::net::SocketAddr; +use std::time::Duration; use tokio::net::TcpStream; use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt}; @@ -68,3 +74,145 @@ impl WebSocketTestClient { self.tx.close().await.map_err(Into::into) } } + +#[derive(Debug, Clone)] +pub enum ServerMode { + // Send out a hardcoded response on every connection. + Response(String), + // Send out a subscription ID on a request and continuously send out data on the subscription. + Subscription { subscription_id: String, subscription_response: String }, +} + +/// JSONRPC v2 dummy WebSocket server that sends a hardcoded response. +pub struct WebSocketTestServer { + local_addr: SocketAddr, + exit: Sender<()>, +} + +impl WebSocketTestServer { + // Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured `hardcoded response` for every connection. + pub async fn with_hardcoded_response(sockaddr: SocketAddr, response: String) -> Self { + let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let (tx, rx) = mpsc::channel::<()>(4); + tokio::spawn(server_backend(listener, rx, ServerMode::Response(response))); + + Self { local_addr, exit: tx } + } + + // Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured subscription ID and subscription response. + // + // NOTE: ignores the actual subscription and unsubscription method. + pub async fn with_hardcoded_subscription( + sockaddr: SocketAddr, + subscription_id: String, + subscription_response: String, + ) -> Self { + let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let (tx, rx) = mpsc::channel::<()>(4); + tokio::spawn(server_backend(listener, rx, ServerMode::Subscription { subscription_id, subscription_response })); + + Self { local_addr, exit: tx } + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + pub async fn close(&mut self) { + self.exit.send(()).await.unwrap(); + } +} + +async fn server_backend(listener: async_std::net::TcpListener, mut exit: Receiver<()>, mode: ServerMode) { + let mut connections = Vec::new(); + + loop { + let conn_fut = listener.accept().fuse(); + let exit_fut = exit.next(); + futures::pin_mut!(exit_fut, conn_fut); + + futures::select! { + _ = exit_fut => break, + conn = conn_fut => { + if let Ok((stream, _)) = conn { + let (tx, rx) = mpsc::channel::<()>(4); + let handle = tokio::spawn(connection_task(stream, mode.clone(), rx)); + connections.push((handle, tx)); + } + } + } + } + + // close connections + for (handle, mut exit) in connections { + // If the actual connection was never established i.e., returned early + // It will most likely be caught on the client-side but just to be explicit. + exit.send(()).await.expect("WebSocket connection was never established"); + handle.await.unwrap(); + } +} + +async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mut exit: Receiver<()>) { + let mut server = Server::new(socket); + + let websocket_key = match server.receive_request().await { + Ok(req) => req.into_key(), + Err(_) => return, + }; + + let accept = server.send_response(&Response::Accept { key: &websocket_key, protocol: None }).await; + + if accept.is_err() { + return; + } + + let (mut sender, receiver) = server.into_builder().finish(); + + let ws_stream = stream::unfold(receiver, move |mut receiver| async { + let mut buf = Vec::new(); + let ret = match receiver.receive_data(&mut buf).await { + Ok(_) => Ok(buf), + Err(err) => Err(err), + }; + Some((ret, receiver)) + }); + futures::pin_mut!(ws_stream); + + loop { + let next_ws = ws_stream.next().fuse(); + let next_exit = exit.next().fuse(); + let time_out = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + futures::pin_mut!(time_out, next_exit, next_ws); + + futures::select! { + _ = time_out => { + if let ServerMode::Subscription { subscription_response, .. } = &mode { + if let Err(e) = sender.send_text(&subscription_response).await { + log::warn!("send response to subscription: {:?}", e); + } + } + } + ws = next_ws => { + // Got a request on the connection but don't care about the contents. + // Just send out the pre-configured hardcoded responses. + if let Some(Ok(_)) = ws { + match &mode { + ServerMode::Response(r) => { + if let Err(e) = sender.send_text(&r).await { + log::warn!("send response to request error: {:?}", e); + } + } + ServerMode::Subscription { subscription_id, .. } => { + if let Err(e) = sender.send_text(&subscription_id).await { + log::warn!("send subscription id error: {:?}", e); + } + } + } + } + } + _ = next_exit => break, + } + } +}