diff --git a/client/src/rpc/client.rs b/client/src/rpc/client.rs index 862e650d5cd..7190c8ee579 100644 --- a/client/src/rpc/client.rs +++ b/client/src/rpc/client.rs @@ -373,3 +373,123 @@ impl RpcClient { ) } } + +#[cfg(test)] +mod test { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }; + + use failure::format_err; + use futures::future; + use io_context::Context; + use tokio::runtime::Runtime; + + use oasis_core_runtime::{ + rak::RAK, + rpc::{demux::Demux, session, types}, + }; + + use super::{super::transport::Transport, RpcClient}; + use crate::BoxFuture; + + #[derive(Clone)] + struct MockTransport { + rak: Arc, + demux: Arc>, + next_error: Arc, + } + + impl MockTransport { + fn new() -> Self { + let rak = Arc::new(RAK::new()); + + Self { + rak: rak.clone(), + demux: Arc::new(Mutex::new(Demux::new(rak))), + next_error: Arc::new(AtomicBool::new(false)), + } + } + + fn reset(&self) { + let mut demux = self.demux.lock().unwrap(); + *demux = Demux::new(self.rak.clone()); + } + + fn induce_transport_error(&self) { + self.next_error.store(true, Ordering::SeqCst); + } + } + + impl Transport for MockTransport { + fn write_message_impl(&self, _ctx: Context, data: Vec) -> BoxFuture> { + if self + .next_error + .compare_and_swap(true, false, Ordering::SeqCst) + { + return Box::new(future::err(format_err!("transport error"))); + } + + let mut demux = self.demux.lock().unwrap(); + + // Deliver directly to the multiplexer. + let mut buffer = Vec::new(); + match demux.process_frame(data, &mut buffer) { + Err(err) => Box::new(future::err(err)), + Ok(Some((session_id, _session_info, message, _untrusted_plaintext))) => { + // Message, process and write reply. + let body = match message { + types::Message::Request(rq) => { + // Just echo back what was given. + types::Body::Success(rq.args) + } + _ => panic!("unhandled message type"), + }; + let response = types::Message::Response(types::Response { body }); + + let mut buffer = Vec::new(); + match demux.write_message(session_id, response, &mut buffer) { + Ok(_) => Box::new(future::ok(buffer)), + Err(error) => Box::new(future::err(error)), + } + } + Ok(None) => { + // Handshake. + Box::new(future::ok(buffer)) + } + } + } + } + + #[test] + fn test_rpc_client() { + let mut rt = Runtime::new().unwrap(); + let transport = MockTransport::new(); + let builder = session::Builder::new(); + let client = RpcClient::new(Box::new(transport.clone()), builder); + + // Basic call. + let result: u64 = rt + .block_on(client.call(Context::background(), "test", 42)) + .unwrap(); + assert_eq!(result, 42, "call should work"); + + // Reset all sessions on the server and make sure that we can still get a response. + transport.reset(); + + let result: u64 = rt + .block_on(client.call(Context::background(), "test", 43)) + .unwrap(); + assert_eq!(result, 43, "call should work"); + + // Induce a single transport error without resetting the server sessions and make sure we + // can still get a response. + transport.induce_transport_error(); + + let result: u64 = rt + .block_on(client.call(Context::background(), "test", 44)) + .unwrap(); + assert_eq!(result, 44, "call should work"); + } +} diff --git a/runtime/src/rak.rs b/runtime/src/rak.rs index ac9fca555c0..185205a3604 100644 --- a/runtime/src/rak.rs +++ b/runtime/src/rak.rs @@ -73,7 +73,7 @@ pub struct RAK { impl RAK { /// Create an uninitialized runtime attestation key instance. - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { inner: RwLock::new(Inner { private_key: None,