Skip to content

Commit

Permalink
oak_tests: support logging pseudo-Node
Browse files Browse the repository at this point in the history
Add a Rust implementation of a logging pseudo-Node, which just loops
round and logs any messages it receives.
  • Loading branch information
daviddrysdale committed Nov 18, 2019
1 parent 0ea930a commit 980aaf5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 17 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/oak_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ byteorder = "1"
lazy_static = "1.4"
log = { version = "^0.4.0", features = ["std"] }
oak_derive = { path = "../oak_derive"}
oak_log = { path = "../oak_log"}
oak = { path = "../oak"}
protobuf = "*"
rand = { version = "0.7", features = ["std"] }
Expand Down
81 changes: 64 additions & 17 deletions rust/oak_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,20 @@ impl OakRuntime {
}
}
// Build a runtime that has Node and Channels set up according to the given
// Application configuration. Returns a vector of (node-name, contents-name)
// Application configuration. Returns a vector of (node-name, entrypoint)
// pairs indicating which Nodes need to be started, running which code.
fn configure(
fn configure<S: ::std::hash::BuildHasher>(
&mut self,
config: proto::manager::ApplicationConfiguration,
) -> Vec<(String, String)> {
entrypoints: HashMap<String, NodeMain, S>,
) -> Vec<(String, NodeMain)> {
// TODO: validate the config
let mut required_nodes = Vec::new();
self.termination_pending = false;
self.nodes.clear();
let mut grpc_node_name: Option<&str> = None;
let mut log_node_name: Option<&str> = None;
let mut log_channel: Option<ChannelRef> = None;
for node_cfg in config.get_nodes() {
if let Some(Node_oneof_node_type::web_assembly_node(wasm_cfg)) = &node_cfg.node_type {
let mut node = OakNode::new();
Expand All @@ -225,23 +228,43 @@ impl OakRuntime {
node_cfg.node_name, wasm_cfg.wasm_contents_name
);
self.nodes.insert(node_cfg.node_name.to_string(), node);
required_nodes.push((
node_cfg.node_name.clone(),
wasm_cfg.wasm_contents_name.clone(),
));
let entrypoint = *entrypoints
.get(&wasm_cfg.wasm_contents_name)
.unwrap_or_else(|| {
panic!("failed to find {} entrypoint", wasm_cfg.wasm_contents_name)
});
required_nodes.push((node_cfg.node_name.clone(), entrypoint));
} else if let Some(Node_oneof_node_type::grpc_server_node(_)) = node_cfg.node_type {
debug!("{{{}}}: add gRPC pseudo-Node", node_cfg.node_name);
grpc_node_name = Some(&node_cfg.node_name);
} else if let Some(Node_oneof_node_type::log_node(_)) = node_cfg.node_type {
let node = OakNode::new();
log_node_name = Some(&node_cfg.node_name);
debug!("{{{}}}: add log pseudo-Node", node_cfg.node_name);
self.nodes.insert(node_cfg.node_name.to_string(), node);
required_nodes.push((node_cfg.node_name.to_string(), log_node_main));
}
// TODO: add storage support
}
for channel_cfg in config.get_channels() {
let channel = self.new_channel();
let src = channel_cfg.get_source_endpoint();
let dest = channel_cfg.get_destination_endpoint();
debug!(
"add channel {{{}}}:{} -> {{{}}}:{}",
src.node_name, src.port_name, dest.node_name, dest.port_name
);
let channel = if log_node_name.is_some()
&& dest.node_name == log_node_name.unwrap()
&& dest.port_name == oak_log::IN_PORT_NAME
{
if log_channel.is_none() {
log_channel = Some(self.new_channel());
}
log_channel.clone().unwrap()
} else {
self.new_channel()
};

if grpc_node_name.is_some()
&& src.node_name == grpc_node_name.unwrap()
&& src.port_name == oak::grpc::OUT_PORT_NAME
Expand Down Expand Up @@ -750,20 +773,20 @@ pub type NodeMain = fn() -> i32;
/// oak_main(), as this would lead to duplicate symbols. Instead, the
/// entrypoints map should provide a function pointer for each WasmContents
/// entry in the configuration.
///
/// Also, Nodes under test should ensure that the oak_log crate does not end
/// being used for logging during tests, either by not calling oak_log::init()
/// at start-of-day (e.g. based on a build configuration), or by calling
/// oak_tests::init_logging() first, and ignoring the subsequent failure from
/// oak_log::init().
pub fn start<S: ::std::hash::BuildHasher>(
config: proto::manager::ApplicationConfiguration,
entrypoints: HashMap<String, NodeMain, S>,
) {
let required_nodes = RUNTIME.write().unwrap().configure(config);
for (name, contents_name) in required_nodes {
debug!(
"{{{}}}: start per-Node thread with contents '{}'",
name, contents_name
);
let required_nodes = RUNTIME.write().unwrap().configure(config, entrypoints);
for (name, entrypoint) in required_nodes {
debug!("{{{}}}: start per-Node thread", name);
let node_name = name.clone();
let entrypoint = *entrypoints
.get(&contents_name)
.unwrap_or_else(|| panic!("failed to find {} entrypoint", contents_name));
let thread_handle = spawn(move || {
set_node_name(node_name);
entrypoint()
Expand Down Expand Up @@ -811,6 +834,30 @@ pub fn stop() -> OakStatus {
overall_result
}

// Main loop function for a log pseudo-Node.
fn log_node_main() -> i32 {
let half = oak::ReadHandle {
handle: oak::channel_find(oak_log::IN_PORT_NAME),
};
if half.handle == oak::wasm::INVALID_HANDLE {
return OakStatus::ERR_BAD_HANDLE.value();
}
loop {
if let Err(status) = oak::wait_on_channels(&[half]) {
return status.value();
}
let mut buf = Vec::<u8>::with_capacity(1024);
let mut handles = Vec::with_capacity(8);
oak::channel_read(half, &mut buf, &mut handles);
if buf.is_empty() {
debug!("no pending message; poll again");
continue;
}
let message = String::from_utf8_lossy(&buf);
info!("LOG: {}", message);
}
}

/// Test helper to set up a channel into the Node under for injected gRPC
/// requests, using default parameters (node "app" port "grpc_in").
pub fn grpc_channel_setup_default() {
Expand Down

0 comments on commit 980aaf5

Please sign in to comment.