diff --git a/Cargo.lock b/Cargo.lock index c3be43600a1..ece39ac9b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1173,14 +1173,19 @@ name = "oak_runtime" version = "0.1.0" dependencies = [ "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "oak 0.1.0", "oak_abi 0.1.0", "oak_utils 0.1.0", "prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "prost-build 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "protobuf 2.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "wasmi 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "wat 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/oak/proto/application.proto b/oak/proto/application.proto index 53e3ee8b1c0..f63fd9c3446 100644 --- a/oak/proto/application.proto +++ b/oak/proto/application.proto @@ -47,7 +47,8 @@ message NodeConfiguration { WebAssemblyConfiguration wasm_config = 2; LogConfiguration log_config = 3; StorageProxyConfiguration storage_config = 4; - GrpcClientConfiguration grpc_client_config = 5; + GrpcServerConfiguration grpc_server_config = 5; + GrpcClientConfiguration grpc_client_config = 6; } } @@ -73,10 +74,20 @@ message StorageProxyConfiguration { string address = 1; } +// GrpcServerConfiguration describes the configuration of a gRPC server +// pseudo-Node (which is provided by the Oak Runtime), that processes gRPC +// requests from external (non-Oak) clients. +message GrpcServerConfiguration { + // The endpoint address for the gRPC server to listen on. + // `address` is represented as an "ip_address:tcp_port" string. + string address = 1; +} + // GrpcClientConfiguration describes the configuration of a gRPC client // pseudo-Node (which is provided by the Oak Runtime), connected to a specific // external (non-Oak) gRPC service. message GrpcClientConfiguration { // The endpoint address of the external gRPC service. + // `address` is represented as an "ip_address:tcp_port" string. string address = 1; } diff --git a/oak/server/rust/oak_runtime/Cargo.toml b/oak/server/rust/oak_runtime/Cargo.toml index 6d8e66c8b0f..b20c5257b2e 100644 --- a/oak/server/rust/oak_runtime/Cargo.toml +++ b/oak/server/rust/oak_runtime/Cargo.toml @@ -14,11 +14,16 @@ default = [] [dependencies] byteorder = { version = "*", default-features = false } +http = "*" +hyper = "*" itertools = "*" log = { version = "*" } +oak = "0.1.0" oak_abi = "=0.1.0" prost = "*" +protobuf = "*" rand = { version = "*", default-features = false, features = ["alloc"] } +tokio = { version = "*", features = ["rt-core"] } wasmi = { version = "*", default-features = false, features = ["core"] } [dev-dependencies] diff --git a/oak/server/rust/oak_runtime/src/config.rs b/oak/server/rust/oak_runtime/src/config.rs index 403dd10bae4..32e35dbb890 100644 --- a/oak/server/rust/oak_runtime/src/config.rs +++ b/oak/server/rust/oak_runtime/src/config.rs @@ -15,11 +15,12 @@ // use crate::proto::{ - node_configuration::ConfigType, ApplicationConfiguration, LogConfiguration, NodeConfiguration, - WebAssemblyConfiguration, + node_configuration::ConfigType, ApplicationConfiguration, GrpcServerConfiguration, + LogConfiguration, NodeConfiguration, WebAssemblyConfiguration, }; use itertools::Itertools; use std::collections::HashMap; +use std::net::AddrParseError; use std::sync::Arc; use log::error; @@ -27,7 +28,7 @@ use log::error; use oak_abi::OakStatus; use crate::node; -use crate::node::load_wasm; +use crate::node::{check_port, load_wasm}; use crate::runtime; use crate::runtime::{Handle, Runtime}; @@ -90,6 +91,15 @@ pub fn from_protobuf( return Err(OakStatus::ErrInvalidArgs); } Some(ConfigType::LogConfig(_)) => node::Configuration::LogNode, + Some(ConfigType::GrpcServerConfig(GrpcServerConfiguration { address })) => address + .parse() + .map_err(|error: AddrParseError| error.into()) + .and_then(|address| check_port(&address).map(|_| address)) + .map(|address| node::Configuration::GrpcServerNode { address }) + .map_err(|error| { + error!("Incorrect gRPC server address: {:?}", error); + OakStatus::ErrInvalidArgs + })?, Some(ConfigType::WasmConfig(WebAssemblyConfiguration { module_bytes, .. })) => { load_wasm(&module_bytes).map_err(|e| { error!("Error loading Wasm module: {}", e); diff --git a/oak/server/rust/oak_runtime/src/node/grpc_server.rs b/oak/server/rust/oak_runtime/src/node/grpc_server.rs new file mode 100644 index 00000000000..52aafdca06b --- /dev/null +++ b/oak/server/rust/oak_runtime/src/node/grpc_server.rs @@ -0,0 +1,349 @@ +// +// Copyright 2020 The Project Oak Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use log::{error, info, warn}; +use protobuf::{well_known_types::Any, Message}; +use std::{ + fmt::{self, Display, Formatter}, + net::SocketAddr, + thread::{self, JoinHandle}, +}; + +use oak::grpc::{encap_request, GrpcRequest}; +use oak_abi::{label::Label, ChannelReadStatus, OakStatus}; + +use crate::{pretty_name_for_thread, runtime::RuntimeProxy, Handle}; + +/// Struct that represents a gRPC server pseudo-node. +/// +/// For each gRPC request from a client, gRPC server pseudo-node creates a pair of temporary +/// channels (to write a request to and to read a response from) and passes corresponding handles to +/// the [`GrpcServerNode::channel_writer`]. +pub struct GrpcServerNode { + /// Pseudo-node name that corresponds to an entry from the + /// [`oak_runtime::proto::ApplicationConfiguration`]. + config_name: String, + /// Reference to a Runtime that corresponds to a node that created a gRPC server pseudo-node. + runtime: RuntimeProxy, + /// Server address to listen client requests on. + address: SocketAddr, + /// Channel handle used for reading a [`GrpcServerNode::channel_writer`] once the gRPC server + /// pseudo-node has started. + initial_reader: Handle, + /// Channel handle used for writing invocations. + channel_writer: Option, + /// Thread handle that corresponds to a thread running a gRPC server pseudo-node. + thread_handle: Option>, +} + +#[derive(Debug)] +enum GrpcServerError { + BadProtobufMessage, + RequestProcessingError, + ResponseProcessingError, +} + +impl Into for GrpcServerError { + fn into(self) -> http::StatusCode { + match self { + Self::BadProtobufMessage => http::StatusCode::BAD_REQUEST, + Self::RequestProcessingError => http::StatusCode::INTERNAL_SERVER_ERROR, + Self::ResponseProcessingError => http::StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +/// Clone implementation without `thread_handle` copying to pass the node to other threads. +impl Clone for GrpcServerNode { + fn clone(&self) -> Self { + Self { + config_name: self.config_name.to_string(), + runtime: self.runtime.clone(), + address: self.address, + initial_reader: self.initial_reader, + channel_writer: self.channel_writer, + thread_handle: None, + } + } +} + +impl Display for GrpcServerNode { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + write!(f, "GrpcServerNode({})", self.config_name) + } +} + +impl GrpcServerNode { + /// Creates a new [`GrpcServerNode`] instance, but does not start it. + /// + /// `channel_writer` and `thread_handle` are initialized with `None`, because they will receive + /// their values after the gRPC server pseudo-node has started and a separate thread was + /// initialized. + pub fn new( + config_name: &str, + runtime: RuntimeProxy, + address: SocketAddr, + initial_reader: Handle, + ) -> Self { + Self { + config_name: config_name.to_string(), + runtime, + address, + initial_reader, + channel_writer: None, + thread_handle: None, + } + } + + /// Reads a [`Handle`] from a channel specified by [`GrpcServerNode::initial_reader`]. + /// Returns an error if couldn't read from the channel or if received a wrong number of handles + /// (not equal to 1). + fn init_channel_writer(&self) -> Result { + let read_status = self + .runtime + .wait_on_channels(&[Some(self.initial_reader)]) + .map_err(|error| { + error!("Couldn't wait on the initial reader handle: {:?}", error); + OakStatus::ErrInternal + })?; + + if read_status[0] == ChannelReadStatus::ReadReady { + self.runtime + .channel_read(self.initial_reader) + .map_err(|error| { + error!("Couldn't read from the initial reader handle {:?}", error); + OakStatus::ErrInternal + }) + .and_then(|message| { + message + .ok_or_else(|| { + error!("Empty message"); + OakStatus::ErrInternal + }) + .and_then(|m| { + if m.channels.len() == 1 { + Ok(m.channels[0]) + } else { + error!( + "gRPC server pseudo-node should receive a single `writer` handle, found {}", + m.channels.len() + ); + Err(OakStatus::ErrInternal) + } + }) + }) + } else { + error!("Couldn't read channel: {:?}", read_status[0]); + Err(OakStatus::ErrInternal) + } + } + + /// Processes an HTTP request from a client and sends an HTTP response back. + async fn serve( + &self, + http_request: hyper::Request, + ) -> Result, hyper::Error> { + // Parse HTTP header. + let http_request_path = http_request.uri().path().to_string(); + + // Aggregate the data buffers from an HTTP body asynchronously. + let http_request_body = hyper::body::aggregate(http_request) + .await + .map_err(|error| { + warn!("Couldn't aggregate request body: {}", error); + error + })?; + + // Create a gRPC request from an HTTP body. + Self::decode_grpc_request(&http_request_path, &http_request_body) + // Process a gRPC request and send it into the Runtime. + .and_then(|request| self.process_request(request)) + // Read a gRPC response from the Runtime. + .and_then(|response_reader| self.process_response(response_reader)) + // Send gRPC response back to the HTTP client. + .map(|body| Self::http_response(http::StatusCode::OK, body)) + // Convert an error to an HTTP response with a corresponding error status. + .or_else(|error| Ok(Self::http_response(error.into(), vec![]))) + } + + /// Creates a [`GrpcRequest`] instance from a `http_request_path` and an `http_request_body`. + fn decode_grpc_request( + http_request_path: &str, + http_request_body: &dyn hyper::body::Buf, + ) -> Result { + // Parse an HTTP request body as a [`protobuf::well_known_types::Any`] message. + let grpc_request_body = protobuf::parse_from_bytes::(http_request_body.bytes()) + .map_err(|error| { + error!("Failed to parse Protobuf message {}", error); + GrpcServerError::BadProtobufMessage + })?; + + // Create a gRPC request. + encap_request(&grpc_request_body, None, http_request_path).ok_or_else(|| { + error!("Failed to create a GrpcRequest"); + GrpcServerError::BadProtobufMessage + }) + } + + /// Creates an HTTP response message. + fn http_response(status: http::StatusCode, body: Vec) -> hyper::Response { + let mut response = hyper::Response::new(hyper::Body::from(body)); + *response.status_mut() = status; + response + } + + /// Processes a gRPC request, forwards it to a temporary channel and sends handles for this + /// channel to the [`GrpcServerNode::channel_writer`]. + /// Returns a [`Handle`] for reading a gRPC response from. + fn process_request(&self, request: GrpcRequest) -> Result { + // Create a pair of temporary channels to pass the gRPC request and to receive the response. + let (request_writer, request_reader) = + self.runtime.channel_create(&Label::public_trusted()); + let (response_writer, response_reader) = + self.runtime.channel_create(&Label::public_trusted()); + + // Create an invocation message and attach the method-invocation specific channels to it. + // + // This message should be in sync with the [`oak::grpc::Invocation`] from the Oak SDK: + // the order of the `request_reader` and `response_writer` must be consistent. + let invocation = crate::Message { + data: vec![], + channels: vec![request_reader, response_writer], + }; + + // Serialize gRPC request into a message. + let mut message = crate::Message { + data: vec![], + channels: vec![], + }; + request + .write_to_writer(&mut message.data) + .map_err(|error| { + error!("Couldn't serialize a GrpcRequest message: {}", error); + GrpcServerError::RequestProcessingError + })?; + + // Send a message to the temporary channel. + self.runtime + .channel_write(request_writer, message) + .map_err(|error| { + error!( + "Couldn't write a message to the temporary channel: {:?}", + error + ); + GrpcServerError::RequestProcessingError + })?; + + // Send an invocation message (with attached handles) to the Oak node. + self.runtime + .channel_write( + self.channel_writer.expect("Node writer wasn't initialized"), + invocation, + ) + .map_err(|error| { + error!("Couldn't write a gRPC invocation message: {:?}", error); + GrpcServerError::RequestProcessingError + })?; + + Ok(response_reader) + } + + /// Processes a gRPC response from a channel represented by `response_reader` and returns an + /// HTTP response body. + fn process_response(&self, response_reader: Handle) -> Result, GrpcServerError> { + let read_status = self + .runtime + .wait_on_channels(&[Some(response_reader)]) + .map_err(|error| { + error!("Couldn't wait on the temporary gRPC channel: {:?}", error); + GrpcServerError::ResponseProcessingError + })?; + + if read_status[0] == ChannelReadStatus::ReadReady { + self.runtime + .channel_read(response_reader) + .map_err(|error| { + error!("Couldn't read from a temporary gRPC channel: {:?}", error); + GrpcServerError::ResponseProcessingError + }) + .map(|message| { + // Return an empty HTTP body if the `message` is None. + message.map_or(vec![], |m| m.data) + }) + } else { + error!( + "Couldn't read from a temporary gRPC channel: {:?}", + read_status[0] + ); + Err(GrpcServerError::ResponseProcessingError) + } + } +} + +/// Oak Node implementation for the gRPC server. +impl super::Node for GrpcServerNode { + fn start(&mut self) -> Result<(), OakStatus> { + let server = self.clone(); + let thread_handle = thread::Builder::new() + .name(self.to_string()) + .spawn(move || { + let pretty_name = pretty_name_for_thread(&thread::current()); + + // Receive a `writer` handle used to pass handles for temporary channels. + server + .init_channel_writer() + .expect("Couldn't initialialize node writer"); + + // Initialize a function that creates a separate instance on the `server` for each + // HTTP request. + // + // TODO(#813): Remove multiple `clone` calls by either introducing `Arc>` + // or not using Hyper (move to more simple single-threaded server). + let generator_server = server.clone(); + let service = hyper::service::make_service_fn(move |_| { + let connection_server = generator_server.clone(); + async move { + Ok::<_, hyper::Error>(hyper::service::service_fn(move |req| { + let request_server = connection_server.clone(); + async move { request_server.serve(req).await } + })) + } + }); + + // Start the HTTP server. + let mut tokio_runtime = + tokio::runtime::Runtime::new().expect("Couldn't create Tokio runtime"); + let result = + tokio_runtime.block_on(hyper::Server::bind(&server.address).serve(service)); + info!( + "{} LOG: exiting gRPC server node thread {:?}", + pretty_name, result + ); + server.runtime.exit_node(); + }) + .expect("Failed to spawn thread"); + self.thread_handle = Some(thread_handle); + Ok(()) + } + + fn stop(&mut self) { + if let Some(join_handle) = self.thread_handle.take() { + if let Err(err) = join_handle.join() { + error!("Error while stopping gRPC server node: {:?}", err); + } + } + } +} diff --git a/oak/server/rust/oak_runtime/src/node/mod.rs b/oak/server/rust/oak_runtime/src/node/mod.rs index 14c16017956..7f80a2b2111 100644 --- a/oak/server/rust/oak_runtime/src/node/mod.rs +++ b/oak/server/rust/oak_runtime/src/node/mod.rs @@ -14,6 +14,7 @@ // limitations under the License. // +use std::net::{AddrParseError, SocketAddr}; use std::string::String; use std::sync::Arc; @@ -22,6 +23,7 @@ use oak_abi::OakStatus; use crate::runtime::RuntimeProxy; use crate::Handle; +mod grpc_server; mod logger; mod wasm; @@ -52,6 +54,9 @@ pub enum Configuration { /// The configuration for a logging pseudo node. LogNode, + /// The configuration for a gRPC server pseudo node that contains an `address` to listen on. + GrpcServerNode { address: SocketAddr }, + /// The configuration for a Wasm node. // It would be better to store a list of exported methods and copyable Wasm interpreter // instance, but wasmi doesn't allow this. We make do with having a copyable @@ -61,13 +66,26 @@ pub enum Configuration { } /// A enumeration for errors occuring when building `Configuration` from protobuf types. +#[derive(Debug)] pub enum ConfigurationError { + AddressParsingError(AddrParseError), + IncorrectPort, WasmiModuleInializationError(wasmi::Error), } +impl From for ConfigurationError { + fn from(error: AddrParseError) -> Self { + ConfigurationError::AddressParsingError(error) + } +} + impl std::fmt::Display for ConfigurationError { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { + ConfigurationError::AddressParsingError(e) => { + write!(f, "Failed to parse an address: {}", e) + } + ConfigurationError::IncorrectPort => write!(f, "Incorrect port (must be > 1023)"), ConfigurationError::WasmiModuleInializationError(e) => { write!(f, "Failed to initialize wasmi::Module: {}", e) } @@ -85,6 +103,15 @@ pub fn load_wasm(wasm_bytes: &[u8]) -> Result }) } +/// Checks if port is greater than 1023. +pub fn check_port(address: &SocketAddr) -> Result<(), ConfigurationError> { + if address.port() > 1023 { + Ok(()) + } else { + Err(ConfigurationError::IncorrectPort) + } +} + impl Configuration { /// Creates a new node instance corresponding to the [`Configuration`] `self`. /// @@ -101,6 +128,9 @@ impl Configuration { Configuration::LogNode => { Box::new(logger::LogNode::new(config_name, runtime, initial_reader)) } + Configuration::GrpcServerNode { address } => Box::new( + grpc_server::GrpcServerNode::new(config_name, runtime, *address, initial_reader), + ), Configuration::WasmNode { module } => Box::new(wasm::WasmNode::new( config_name, runtime, diff --git a/sdk/rust/oak/src/grpc/mod.rs b/sdk/rust/oak/src/grpc/mod.rs index 00a43726469..4d2d59d4134 100644 --- a/sdk/rust/oak/src/grpc/mod.rs +++ b/sdk/rust/oak/src/grpc/mod.rs @@ -17,7 +17,7 @@ //! Functionality to help Oak Nodes interact with gRPC. pub use crate::proto::code::Code; -use crate::{proto, OakError}; +use crate::{proto, OakError, OakStatus}; use log::{error, warn}; pub use proto::grpc_encap::{GrpcRequest, GrpcResponse}; use protobuf::ProtobufEnum; @@ -350,3 +350,29 @@ where vec![protobuf::parse_from_bytes(&req).expect("Failed to parse request protobuf message")]; node_fn(rr, writer) } + +/// Default name for predefined node configuration that corresponds to a gRPC pseudo-Node. +pub const DEFAULT_CONFIG_NAME: &str = "grpc_server"; + +/// Initialize a gRPC pseudo-node with the default configuration. +pub fn init_default() { + init(DEFAULT_CONFIG_NAME).unwrap(); +} + +/// Initializes a gRPC server pseudo-node and passes it a handle to write invocations to. +/// +/// Returns a [`Handle`] to read invocations from. +pub fn init(config: &str) -> std::result::Result { + // Create a channel and pass the read half to a new gRPC pseudo-node. + let (write_handle, read_handle) = crate::channel_create().expect("Couldn't create a channel"); + crate::node_create(config, "oak_main", read_handle)?; + crate::channel_close(read_handle.handle).expect("Couldn't close a channel"); + + // Create a separate channel for receiving invocations and pass it to a gRPC pseudo-node. + let (invocation_write_handle, invocation_read_handle) = + crate::channel_create().expect("Couldn't create a channel"); + crate::channel_write(write_handle, &[], &[invocation_write_handle.handle]) + .expect("Couldn't write to a channel"); + + Ok(invocation_read_handle) +}