Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/rpc: Change session identifier on reset, add unit tests #2872

Merged
merged 3 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changelog/2872.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
client/rpc: Change session identifier on reset

Previously the EnclaveRPC client did not change the session identifier on
reset, resulting in unnecessary round-trips during a transport error. The
EnclaveRPC client now changes the session identifier whenever resetting the
session.
262 changes: 168 additions & 94 deletions client/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ use oasis_core_runtime::{
session::{Builder, Session},
types,
},
types::Body,
};

#[cfg(not(target_env = "sgx"))]
use super::api::{CallEnclaveRequest, EnclaveRPCClient};
use super::api::EnclaveRPCClient;
#[cfg(not(target_env = "sgx"))]
use super::transport::GrpcTransport;
use super::transport::{RuntimeTransport, Transport};
use crate::BoxFuture;

/// Internal send queue backlog.
const SENDQ_BACKLOG: usize = 10;

/// RPC client error.
#[derive(Debug, Fail)]
enum RpcClientError {
pub enum RpcClientError {
#[fail(display = "call failed: {}", 0)]
CallFailed(String),
#[fail(display = "expected response message, received: {:?}", 0)]
Expand All @@ -50,93 +52,40 @@ enum RpcClientError {
Dropped,
}

trait Transport: Send + Sync {
fn write_message(
&self,
ctx: Context,
session_id: types::SessionID,
data: Vec<u8>,
untrusted_plaintext: String,
) -> BoxFuture<Vec<u8>> {
// Frame message.
let frame = types::Frame {
session: session_id,
untrusted_plaintext: untrusted_plaintext,
payload: data,
};

self.write_message_impl(ctx, cbor::to_vec(&frame))
}

fn write_message_impl(&self, ctx: Context, data: Vec<u8>) -> BoxFuture<Vec<u8>>;
}

struct RuntimeTransport {
protocol: Arc<Protocol>,
endpoint: String,
}

impl Transport for RuntimeTransport {
fn write_message_impl(&self, ctx: Context, data: Vec<u8>) -> BoxFuture<Vec<u8>> {
// NOTE: This is not actually async in SGX, but futures should be
// dispatched on the current thread anyway.
let rsp = self.protocol.make_request(
ctx,
Body::HostRPCCallRequest {
endpoint: self.endpoint.clone(),
request: data,
},
);

let rsp = match rsp {
Ok(rsp) => rsp,
Err(error) => return Box::new(future::err(error)),
};

match rsp {
Body::HostRPCCallResponse { response } => Box::new(future::ok(response)),
_ => Box::new(future::err(RpcClientError::Transport.into())),
}
}
}

#[cfg(not(target_env = "sgx"))]
struct GrpcTransport {
grpc_client: EnclaveRPCClient,
runtime_id: RuntimeId,
endpoint: String,
}

#[cfg(not(target_env = "sgx"))]
impl Transport for GrpcTransport {
fn write_message_impl(&self, _ctx: Context, data: Vec<u8>) -> BoxFuture<Vec<u8>> {
let req = CallEnclaveRequest {
runtime_id: self.runtime_id,
endpoint: self.endpoint.clone(),
payload: data,
};

match self.grpc_client.call_enclave(&req, Default::default()) {
Ok(rsp) => Box::new(rsp.map(|r| r.into()).map_err(|error| error.into())),
Err(error) => Box::new(future::err(error.into())),
}
}
}

type SendqRequest = (
Arc<Context>,
types::Request,
oneshot::Sender<Fallible<types::Response>>,
usize,
);

struct Inner {
struct MultiplexedSession {
/// Session builder for resetting sessions.
builder: Builder,
/// Underlying protocol session.
session: Mutex<Session>,
/// Unique session identifier.
session_id: types::SessionID,
id: types::SessionID,
/// Current underlying protocol session.
inner: Session,
}

impl MultiplexedSession {
fn new(builder: Builder) -> Self {
Self {
builder: builder.clone(),
id: types::SessionID::random(),
inner: builder.build_initiator(),
}
}

fn reset(&mut self) {
self.id = types::SessionID::random();
self.inner = self.builder.clone().build_initiator();
}
}

struct Inner {
/// Multiplexed session.
session: Mutex<MultiplexedSession>,
/// Used transport.
transport: Box<dyn Transport>,
/// Internal send queue receiver, only available until the controller
Expand All @@ -161,9 +110,7 @@ impl RpcClient {

Self {
inner: Arc::new(Inner {
builder: builder.clone(),
session: Mutex::new(builder.build_initiator()),
session_id: types::SessionID::random(),
session: Mutex::new(MultiplexedSession::new(builder)),
transport,
recvq: Mutex::new(Some(rx)),
sendq: tx,
Expand Down Expand Up @@ -301,15 +248,17 @@ impl RpcClient {
fn connect(inner: Arc<Inner>, ctx: Context) -> BoxFuture<()> {
Box::new(future::lazy(move || -> BoxFuture<()> {
let mut session = inner.session.lock().unwrap();
if session.is_connected() {
if session.inner.is_connected() {
return Box::new(future::ok(()));
}

let mut buffer = vec![];
// Handshake1 -> Handshake2
session
.inner
.process_data(vec![], &mut buffer)
.expect("initiation must always succeed");
let session_id = session.id;
drop(session);

let fctx = ctx.freeze();
Expand All @@ -319,28 +268,28 @@ impl RpcClient {
Box::new(
inner
.transport
.write_message(ctx, inner.session_id, buffer, String::new())
.write_message(ctx, session_id, buffer, String::new())
.and_then(move |data| -> BoxFuture<()> {
let mut session = inner.session.lock().unwrap();
let mut buffer = vec![];
// Handshake2 -> Transport
if let Err(error) = session.process_data(data, &mut buffer) {
if let Err(error) = session.inner.process_data(data, &mut buffer) {
return Box::new(future::err(error));
}

let ctx = Context::create_child(&fctx);
Box::new(
inner
.transport
.write_message(ctx, inner.session_id, buffer, String::new())
.write_message(ctx, session.id, buffer, String::new())
.map(|_| ()),
)
})
.or_else(move |err| {
// Failed to establish a session, we must reset it as otherwise
// it will always fail.
let mut session = inner2.session.lock().unwrap();
*session = inner2.builder.clone().build_initiator();
session.reset();

Err(err)
}),
Expand All @@ -351,7 +300,10 @@ impl RpcClient {
fn close(inner: Arc<Inner>) -> BoxFuture<()> {
let mut session = inner.session.lock().unwrap();
let mut buffer = vec![];
if let Err(error) = session.write_message(types::Message::Close, &mut buffer) {
if let Err(error) = session
.inner
.write_message(types::Message::Close, &mut buffer)
{
return Box::new(future::err(error));
}

Expand All @@ -360,17 +312,18 @@ impl RpcClient {
Box::new(
inner
.transport
.write_message(ctx, inner.session_id, buffer, String::new())
.write_message(ctx, session.id, buffer, String::new())
.and_then(move |data| {
// Verify that session is closed.
let mut session = inner.session.lock().unwrap();
let msg = session
.inner
.process_data(data, vec![])?
.expect("message must be decoded if there is no error");

match msg {
types::Message::Close => {
session.close();
session.inner.close();
Ok(())
}
msg => Err(RpcClientError::ExpectedCloseMessage(msg).into()),
Expand All @@ -388,7 +341,7 @@ impl RpcClient {
let msg = types::Message::Request(request);
let mut session = inner.session.lock().unwrap();
let mut buffer = vec![];
if let Err(error) = session.write_message(msg, &mut buffer) {
if let Err(error) = session.inner.write_message(msg, &mut buffer) {
return Box::new(future::err(error));
}

Expand All @@ -397,10 +350,11 @@ impl RpcClient {
Box::new(
inner
.transport
.write_message(ctx, inner.session_id, buffer, method)
.write_message(ctx, session.id, buffer, method)
.and_then(move |data| {
let mut session = inner.session.lock().unwrap();
let msg = session
.inner
.process_data(data, vec![])?
.expect("message must be decoded if there is no error");

Expand All @@ -412,10 +366,130 @@ impl RpcClient {
.or_else(move |err| {
// Failed to communicate, we must reset it as otherwise it will always fail.
let mut session = inner2.session.lock().unwrap();
*session = inner2.builder.clone().build_initiator();
session.reset();

Err(err)
}),
)
}
}

#[cfg(test)]
mod test {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};

use failure::format_err;
use futures::future;
use io_context::Context;
use tokio::runtime::Runtime;

use oasis_core_runtime::{
rak::RAK,
rpc::{demux::Demux, session, types},
};

use super::{super::transport::Transport, RpcClient};
use crate::BoxFuture;

#[derive(Clone)]
struct MockTransport {
rak: Arc<RAK>,
demux: Arc<Mutex<Demux>>,
next_error: Arc<AtomicBool>,
}

impl MockTransport {
fn new() -> Self {
let rak = Arc::new(RAK::new());

Self {
rak: rak.clone(),
demux: Arc::new(Mutex::new(Demux::new(rak))),
next_error: Arc::new(AtomicBool::new(false)),
}
}

fn reset(&self) {
let mut demux = self.demux.lock().unwrap();
*demux = Demux::new(self.rak.clone());
}

fn induce_transport_error(&self) {
self.next_error.store(true, Ordering::SeqCst);
}
}

impl Transport for MockTransport {
fn write_message_impl(&self, _ctx: Context, data: Vec<u8>) -> BoxFuture<Vec<u8>> {
if self
.next_error
.compare_and_swap(true, false, Ordering::SeqCst)
{
return Box::new(future::err(format_err!("transport error")));
}

let mut demux = self.demux.lock().unwrap();

// Deliver directly to the multiplexer.
let mut buffer = Vec::new();
match demux.process_frame(data, &mut buffer) {
Err(err) => Box::new(future::err(err)),
Ok(Some((session_id, _session_info, message, _untrusted_plaintext))) => {
// Message, process and write reply.
let body = match message {
types::Message::Request(rq) => {
// Just echo back what was given.
types::Body::Success(rq.args)
}
_ => panic!("unhandled message type"),
};
let response = types::Message::Response(types::Response { body });

let mut buffer = Vec::new();
match demux.write_message(session_id, response, &mut buffer) {
Ok(_) => Box::new(future::ok(buffer)),
Err(error) => Box::new(future::err(error)),
}
}
Ok(None) => {
// Handshake.
Box::new(future::ok(buffer))
}
}
}
}

#[test]
fn test_rpc_client() {
let mut rt = Runtime::new().unwrap();
let transport = MockTransport::new();
let builder = session::Builder::new();
let client = RpcClient::new(Box::new(transport.clone()), builder);

// Basic call.
let result: u64 = rt
.block_on(client.call(Context::background(), "test", 42))
.unwrap();
assert_eq!(result, 42, "call should work");

// Reset all sessions on the server and make sure that we can still get a response.
transport.reset();

let result: u64 = rt
.block_on(client.call(Context::background(), "test", 43))
.unwrap();
assert_eq!(result, 43, "call should work");

// Induce a single transport error without resetting the server sessions and make sure we
// can still get a response.
transport.induce_transport_error();

let result: u64 = rt
.block_on(client.call(Context::background(), "test", 44))
.unwrap();
assert_eq!(result, 44, "call should work");
}
}
Loading