Skip to content

Commit

Permalink
gRPC request processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ipetr0v committed Mar 31, 2020
1 parent cad0d33 commit 588d326
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 74 deletions.
227 changes: 154 additions & 73 deletions oak/server/rust/oak_runtime/src/node/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,27 @@
// limitations under the License.
//

use log::{error, info};
use log::{error, info, warn};

use bytes::Buf;
use std::{
thread,
thread::{spawn, JoinHandle},
};
use futures_executor::block_on;
use protobuf::Message;
use protobuf::well_known_types::Any;

use oak_abi::{ChannelReadStatus, OakStatus};
use oak::grpc::{GrpcResponse, GrpcRequest};
use oak::io::error_from_nonok_status;

use crate::Handle;
use crate::{NodeId, RuntimeRef};

trait HttpStatusCode {
fn status_code(&self) -> http::StatusCode;
}

impl HttpStatusCode for ChannelReadStatus {
fn status_code(&self) -> http::StatusCode {
match self {
// TODO: Consider multiple different status codes.
ChannelReadStatus::NotReady => http::StatusCode::SERVICE_UNAVAILABLE,
ChannelReadStatus::ReadReady => http::StatusCode::OK,
ChannelReadStatus::InvalidChannel => http::StatusCode::INTERNAL_SERVER_ERROR,
ChannelReadStatus::Orphaned => http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

impl HttpStatusCode for OakStatus {
fn status_code(&self) -> http::StatusCode {
match self {
// TODO: Consider multiple different status codes.
OakStatus::Unspecified => http::StatusCode::INTERNAL_SERVER_ERROR,
OakStatus::Ok => http::StatusCode::OK,
_ => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrBadHandle => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrInvalidArgs => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrChannelClosed => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrBufferTooSmall => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrHandleSpaceTooSmall => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrOutOfRange => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrInternal => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrTerminated => http::StatusCode::INTERNAL_SERVER_ERROR,
// OakStatus::ErrChannelEmpty => http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

pub struct GrpcServerNode {
config_name: String,
runtime: RuntimeRef,
node_id: NodeId,
reader: Handle,
writer: Handle,
thread_handle: Option<JoinHandle<()>>,
}
Expand All @@ -79,14 +45,12 @@ impl GrpcServerNode {
config_name: &str,
runtime: RuntimeRef,
node_id: NodeId,
reader: Handle,
writer: Handle,
) -> Self {
Self {
config_name: config_name.to_string(),
runtime,
node_id,
reader,
writer,
thread_handle: None,
}
Expand All @@ -99,7 +63,6 @@ impl super::Node for GrpcServerNode {
config_name: self.config_name.clone(),
runtime: self.runtime.clone(),
node_id: self.node_id,
reader: self.reader,
writer: self.writer,
};
// TODO(#770): Use `std::thread::Builder` and give a name to this thread.
Expand Down Expand Up @@ -128,6 +91,7 @@ impl super::Node for GrpcServerNode {
self.thread_handle = Some(thread_handle);
Ok(())
}

fn stop(&mut self) {
if let Some(join_handle) = self.thread_handle.take() {
if let Err(err) = join_handle.join() {
Expand All @@ -142,7 +106,6 @@ struct GrpcService {
config_name: String,
runtime: RuntimeRef,
node_id: NodeId,
reader: Handle,
writer: Handle,
}

Expand All @@ -151,45 +114,163 @@ impl GrpcService {
&self,
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, hyper::Error> {
let body = hyper::body::aggregate(req)
.await
.expect("Couldn't get request body");

let client_message = crate::message::Message {
// TODO: Is it copying the data?
data: body.bytes().to_vec(),
channels: vec![],
// Parse HTTP header.
let grpc_method = req.uri().path();

// Aggregate the data buffers from an HTTP body asynchronously.
let http_body = match hyper::body::aggregate(req).await {
Ok(http_body) => http_body,
Err(error) => {
warn!("Couldn't aggregate request body: {}", error);
return Err(error);
}
};

// Create gRPC request from HTTP body.
let request = match Self::decode_grpc_request(grpc_method, &http_body) {
Ok(request) => request,
Err(error) => {
error!("Couldn't decode gRPC request: {}", error);
return Ok(Self::http_response(
http::StatusCode::BAD_REQUEST,
vec![], // TODO: Consider explaining this error to the user in a response.
));
}
};

// Send request to the Oak node
let response_reader = match self.process_request(request) {
Ok(response_reader) => response_reader,
Err(error) => {
error!("Couldn't process gRPC request: {}", error);
return Ok(Self::http_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
vec![],
));
}
};

// Read the response fomr the Oak node.
let response_body = match self.process_response(response_reader) {
Ok(body) => body,
Err(error) => {
// TODO: error? can module return an Err based on the client request?
error!("Couldn't process gRPC response: {}", error);
return Ok(Self::http_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
vec![],
));
}
};

// Send response back to the gRPC client.
Ok(Self::http_response(http::StatusCode::OK, response_body))
}

fn decode_grpc_request(grpc_method: &str, http_body: &dyn hyper::body::Buf) -> Result<Box<GrpcRequest>, String> {
// Parse HTTP body as into a Protobuf message.
match protobuf::parse_from_bytes::<Any>(http_body.bytes()) {
Ok(protobuf_body) => {
// Create a gRPC request.
match oak::grpc::encap_request(&protobuf_body, None, grpc_method) {
Some(grpc_request) => Ok(Box::new(grpc_request)),
None => Err("Failed to parse Protobuf message".to_string()),
}
},
Err(protobuf_error) => {
let error = format!("Failed to build GrpcRequest {}", protobuf_error);
Err(error)
},
}
}

// Creates an HTTP response message.
fn http_response(status: http::StatusCode, body: Vec<u8>) -> hyper::Response<hyper::Body> {
let mut response = hyper::Response::new(hyper::Body::from(body));
*response.status_mut() = status;
response
}

/// Processes a gRPC request from the client and forwards it to the Oak node.
/// Returns a channel handle for reading the response from the Oak node or an error.
fn process_request(&self, request: Box<GrpcRequest>) -> Result<Handle, String> {
// Create a pair of temporary channels to pass the request to the Oak node and to receive the response.
let (request_writer, request_reader) = self.runtime.new_channel(
self.node_id,
&oak_abi::label::Label::public_trusted()
);
let (response_writer, response_reader) = self.runtime.new_channel(
self.node_id,
&oak_abi::label::Label::public_trusted()
);

// Create a notification message and attach the method-invocation specific channels to it.
let notification = crate::Message {
data: vec![],
channels: vec![request_reader, response_writer],
};

// Serialize gRPC request into a message.
let message = crate::Message { data: vec![], channels: vec![] };
if let Err(protobuf_error) = request.write_to_writer(&mut message.data) {
let error = format!("Failed to serialize GrpcRequest message: {}", protobuf_error);
return Err(error);
}

// Send a message to the temporary channel that will be read by the Oak node.
if let Err(oak_status) = self.runtime.channel_write(self.node_id, request_writer, message) {
let error = format!(
"Could not write message to the terporary gRPC server channel: {}",
error_from_nonok_status(oak_status)
);
return Err(error);
}

// Send data to the Runtime.
self.runtime
.channel_write(self.node_id, self.writer, client_message)
.expect("could not write message");
// Send a notification message (with attached handles) to the Oak node.
if let Err(oak_status) = self.runtime.channel_write(self.node_id, self.writer, notification) {
let error = format!(
"Could not write gRPC server notification message: {}",
error_from_nonok_status(oak_status)
);
return Err(error);
}

// Read the response.
let (status, body) = match self.runtime.wait_on_channels(self.node_id, &[Some(self.reader)]) {
Ok(response_reader)
}

/// Processes a gRPC response from the Oak node.
/// Returns a HTTP status and the response body or an error.
fn process_response(&self, response_reader: Handle) -> Result<Vec<u8>, String> {
match self.runtime.wait_on_channels(self.node_id, &[Some(response_reader)]) {
Ok(status) => match status[0] {
ChannelReadStatus::ReadReady => match self.runtime.channel_read(self.node_id, self.reader) {
ChannelReadStatus::ReadReady => match self.runtime.channel_read(self.node_id, response_reader) {
Ok(message) => match message {
Some(message) => (http::StatusCode::OK, message.data),
None => (http::StatusCode::OK, vec![]),
Some(message) => Ok(message.data),
None => Ok(vec![]), // Empty response body.
},
Err(oak_status) => {
error!("{}", oak::io::error_from_nonok_status(oak_status));
(oak_status.status_code(), vec![])
}
let error = format!(
"Could not read temporary gRPC channel: {}",
oak::io::error_from_nonok_status(oak_status)
);
Err(error)
},
},
ref read_status => {
let error = format!(
"Could not read channel: {:?}",
read_status
);
Err(error)
},
ref read_status => (read_status.status_code(), vec![]),
},
Err(oak_status) => {
error!("{}", oak::io::error_from_nonok_status(oak_status));
(oak_status.status_code(), vec![])
}
};

let mut response = hyper::Response::new(hyper::Body::from(body));
*response.status_mut() = status;
// TODO: consider returning an Error.
Ok(response)
let error = format!(
"Could not wait on the temporary gRPC channels: {}",
oak::io::error_from_nonok_status(oak_status)
);
Err(error)
},
}
}
}
1 change: 0 additions & 1 deletion oak/server/rust/oak_runtime/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl Configuration {
config_name,
runtime,
node_id,
reader,
writer,
)),
Configuration::WasmNode { module } => Box::new(wasm::WasmNode::new(
Expand Down

0 comments on commit 588d326

Please sign in to comment.