From a6e344d26c9fe29edadfd6d3e0829f23bb49189d Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 28 Apr 2020 09:38:55 +0200 Subject: [PATCH] client/rpc: Move Transport to its own module --- client/src/rpc/client.rs | 81 ++------------------------------ client/src/rpc/mod.rs | 1 + client/src/rpc/transport.rs | 92 +++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 76 deletions(-) create mode 100644 client/src/rpc/transport.rs diff --git a/client/src/rpc/client.rs b/client/src/rpc/client.rs index 4acb7809e3c..862e650d5cd 100644 --- a/client/src/rpc/client.rs +++ b/client/src/rpc/client.rs @@ -25,11 +25,13 @@ use oasis_core_runtime::{ session::{Builder, Session}, types, }, - types::Body, }; #[cfg(not(target_env = "sgx"))] -use super::api::{CallEnclaveRequest, EnclaveRPCClient}; +use super::api::EnclaveRPCClient; +#[cfg(not(target_env = "sgx"))] +use super::transport::GrpcTransport; +use super::transport::{RuntimeTransport, Transport}; use crate::BoxFuture; /// Internal send queue backlog. @@ -37,7 +39,7 @@ const SENDQ_BACKLOG: usize = 10; /// RPC client error. #[derive(Debug, Fail)] -enum RpcClientError { +pub enum RpcClientError { #[fail(display = "call failed: {}", 0)] CallFailed(String), #[fail(display = "expected response message, received: {:?}", 0)] @@ -50,79 +52,6 @@ enum RpcClientError { Dropped, } -trait Transport: Send + Sync { - fn write_message( - &self, - ctx: Context, - session_id: types::SessionID, - data: Vec, - untrusted_plaintext: String, - ) -> BoxFuture> { - // Frame message. - let frame = types::Frame { - session: session_id, - untrusted_plaintext: untrusted_plaintext, - payload: data, - }; - - self.write_message_impl(ctx, cbor::to_vec(&frame)) - } - - fn write_message_impl(&self, ctx: Context, data: Vec) -> BoxFuture>; -} - -struct RuntimeTransport { - protocol: Arc, - endpoint: String, -} - -impl Transport for RuntimeTransport { - fn write_message_impl(&self, ctx: Context, data: Vec) -> BoxFuture> { - // NOTE: This is not actually async in SGX, but futures should be - // dispatched on the current thread anyway. - let rsp = self.protocol.make_request( - ctx, - Body::HostRPCCallRequest { - endpoint: self.endpoint.clone(), - request: data, - }, - ); - - let rsp = match rsp { - Ok(rsp) => rsp, - Err(error) => return Box::new(future::err(error)), - }; - - match rsp { - Body::HostRPCCallResponse { response } => Box::new(future::ok(response)), - _ => Box::new(future::err(RpcClientError::Transport.into())), - } - } -} - -#[cfg(not(target_env = "sgx"))] -struct GrpcTransport { - grpc_client: EnclaveRPCClient, - runtime_id: RuntimeId, - endpoint: String, -} - -#[cfg(not(target_env = "sgx"))] -impl Transport for GrpcTransport { - fn write_message_impl(&self, _ctx: Context, data: Vec) -> BoxFuture> { - let req = CallEnclaveRequest { - runtime_id: self.runtime_id, - endpoint: self.endpoint.clone(), - payload: data, - }; - - match self.grpc_client.call_enclave(&req, Default::default()) { - Ok(rsp) => Box::new(rsp.map(|r| r.into()).map_err(|error| error.into())), - Err(error) => Box::new(future::err(error.into())), - } - } -} - type SendqRequest = ( Arc, types::Request, diff --git a/client/src/rpc/mod.rs b/client/src/rpc/mod.rs index 14032053ad1..8ac72c7cde0 100644 --- a/client/src/rpc/mod.rs +++ b/client/src/rpc/mod.rs @@ -4,6 +4,7 @@ mod api; pub mod client; pub mod macros; +mod transport; // Re-exports. pub use self::client::RpcClient; diff --git a/client/src/rpc/transport.rs b/client/src/rpc/transport.rs new file mode 100644 index 00000000000..021be90a8e4 --- /dev/null +++ b/client/src/rpc/transport.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use futures::future; +#[cfg(not(target_env = "sgx"))] +use futures::Future; +use io_context::Context; + +#[cfg(not(target_env = "sgx"))] +use oasis_core_runtime::common::runtime::RuntimeId; +use oasis_core_runtime::{common::cbor, protocol::Protocol, rpc::types, types::Body}; + +#[cfg(not(target_env = "sgx"))] +use super::api::{CallEnclaveRequest, EnclaveRPCClient}; +use super::client::RpcClientError; +use crate::BoxFuture; + +/// An EnclaveRPC transport. +pub trait Transport: Send + Sync { + fn write_message( + &self, + ctx: Context, + session_id: types::SessionID, + data: Vec, + untrusted_plaintext: String, + ) -> BoxFuture> { + // Frame message. + let frame = types::Frame { + session: session_id, + untrusted_plaintext: untrusted_plaintext, + payload: data, + }; + + self.write_message_impl(ctx, cbor::to_vec(&frame)) + } + + fn write_message_impl(&self, ctx: Context, data: Vec) -> BoxFuture>; +} + +/// A transport implementation which can be used from inside the runtime and uses the Runtime Host +/// Protocol to transport EnclaveRPC frames. +pub struct RuntimeTransport { + pub protocol: Arc, + pub endpoint: String, +} + +impl Transport for RuntimeTransport { + fn write_message_impl(&self, ctx: Context, data: Vec) -> BoxFuture> { + // NOTE: This is not actually async in SGX, but futures should be + // dispatched on the current thread anyway. + let rsp = self.protocol.make_request( + ctx, + Body::HostRPCCallRequest { + endpoint: self.endpoint.clone(), + request: data, + }, + ); + + let rsp = match rsp { + Ok(rsp) => rsp, + Err(error) => return Box::new(future::err(error)), + }; + + match rsp { + Body::HostRPCCallResponse { response } => Box::new(future::ok(response)), + _ => Box::new(future::err(RpcClientError::Transport.into())), + } + } +} + +/// A transport implementation which uses gRPC to transport EnclaveRPC frames. +#[cfg(not(target_env = "sgx"))] +pub struct GrpcTransport { + pub grpc_client: EnclaveRPCClient, + pub runtime_id: RuntimeId, + pub endpoint: String, +} + +#[cfg(not(target_env = "sgx"))] +impl Transport for GrpcTransport { + fn write_message_impl(&self, _ctx: Context, data: Vec) -> BoxFuture> { + let req = CallEnclaveRequest { + runtime_id: self.runtime_id, + endpoint: self.endpoint.clone(), + payload: data, + }; + + match self.grpc_client.call_enclave(&req, Default::default()) { + Ok(rsp) => Box::new(rsp.map(|r| r.into()).map_err(|error| error.into())), + Err(error) => Box::new(future::err(error.into())), + } + } +}