Skip to content

Commit

Permalink
Return immediately from wait if at least one channel is invalid (proj…
Browse files Browse the repository at this point in the history
…ect-oak#960)

- `wait_on_channels` in `oak_runtime` returns early if any of the channels
  is invalid, orphaned, or inaccessible. A vector containing all statuses
  is returned even in this case. The method only waits if all the channels
  have a `ChannelReadStatus::NotReady` status.
- A new status `PERMISSION_DENIED` is added to `ChannelReadStatus`.
  • Loading branch information
rbehjati authored May 12, 2020
1 parent 32337e5 commit b813d40
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 58 deletions.
18 changes: 8 additions & 10 deletions docs/abi.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,14 @@ functions** as
### wait_on_channels

`wait_on_channels(usize, u32) -> u32` blocks until data is available for reading
from one of the specified channel handles. The channel handles are encoded in a
buffer that holds N contiguous 9-byte chunks, each of which is made up of an
8-byte channel handle value (little-endian `u64`) followed by a single channel
status byte. Invalid handles will have an `INVALID_CHANNEL` status, but
`wait_on_channels` return value will only fail for internal errors or if _all_
channels are invalid.

If reading from any of the specified (valid) channels would violate
[information flow control](/docs/concepts.md#labels), returns
`ERR_PERMISSION_DENIED`.
from one of the specified channel handles, unless any of the channels is
invalid, orphaned, or violates the
[information flow control](/docs/concepts.md#labels). The channel handles are
encoded in a buffer that holds N contiguous 9-byte chunks, each of which is made
up of an 8-byte channel handle value (little-endian u64) followed by a single
channel status byte. Invalid handles will have an `INVALID_CHANNEL`, `ORPHANED`,
or `PERMISSION_DENIED` status, but `wait_on_channels` return value will only
fail for internal errors or if _all_ channels are invalid.

- arg 0: Address of handle status buffer
- arg 1: Count N of handles provided
Expand Down
2 changes: 2 additions & 0 deletions oak/proto/oak_abi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,6 @@ enum ChannelReadStatus {
INVALID_CHANNEL = 2;
// Channel has no extant write halves (and is empty).
ORPHANED = 3;
// A node trying to access the channel does not have the permission to do so.
PERMISSION_DENIED = 4;
}
2 changes: 2 additions & 0 deletions oak/server/rust/oak_abi/src/proto/oak_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ pub enum ChannelReadStatus {
InvalidChannel = 2,
/// Channel has no extant write halves (and is empty).
Orphaned = 3,
/// A node trying to access the channel does not have the permission to do so.
PermissionDenied = 4,
}
11 changes: 7 additions & 4 deletions oak/server/rust/oak_runtime/src/node/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,13 @@ impl WasmInterface {
})?;
}

if statuses
.iter()
.all(|&s| s == ChannelReadStatus::InvalidChannel || s == ChannelReadStatus::Orphaned)
{
let is_invalid = |&s| {
s == ChannelReadStatus::InvalidChannel
|| s == ChannelReadStatus::Orphaned
|| s == ChannelReadStatus::PermissionDenied
};

if statuses.iter().all(is_invalid) {
Err(OakStatus::ErrBadHandle)
} else {
Ok(())
Expand Down
80 changes: 37 additions & 43 deletions oak/server/rust/oak_runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,23 +723,6 @@ impl Runtime {
}
}

/// Returns whether the given Node is allowed to read from all the provided channels,
/// according to their respective [`Label`]s.
fn validate_can_read_from_channels(
&self,
node_id: NodeId,
channel_halves: &[ChannelHalf],
) -> Result<(), OakStatus> {
let all_channel_halves_ok = channel_halves
.iter()
.all(|half| self.validate_can_read_from_channel(node_id, half).is_ok());
if all_channel_halves_ok {
Ok(())
} else {
Err(OakStatus::ErrPermissionDenied)
}
}

/// Returns whether the given Node is allowed to write to the provided channel, according to
/// their respective [`Label`]s.
fn validate_can_write_to_channel(
Expand Down Expand Up @@ -809,17 +792,25 @@ impl Runtime {
.collect()
}

/// Waits on a slice of `ChannelHalf`s, blocking until one of the following conditions occurs:
/// - If the [`Runtime`] is terminating, return immediately with an `ErrTerminated` overall
/// status.
/// - If all provided read channels are in an erroneous status, e.g. when all channels are
/// orphaned, fill in the returned status vector and return an overall `Ok` status.
/// - If any of the channels is able to read a message, set the corresponding element in the
/// returned status vector to `ReadReady`, and return an overall `Ok` status.
// TODO(#970): Improve the documentation.
/// Waits on a slice of `ChannelHalf`s, blocking until one of the following conditions:
/// - If the [`Runtime`] is terminating this will return immediately with an `ErrTerminated`
/// status for each channel.
/// - If any of the readers is in an erroneous status, e.g. when a channel is orphaned, this
/// will immediately return with all the channels statuses set in the returned vector.
/// - If any of the channels is able to read a message, the corresponding element in the
/// returned vector will be set to `Ok(ReadReady)`, with `Ok(NotReady)` signaling the channel
/// has no message available.
///
/// In particular, if all channels are in a good status but no messages are available on any of
/// the channels (i.e., all channels have status [`ChannelReadStatus::NotReady`]),
/// [`Runtime::wait_on_channels`] will continue to block until a message is available on one of
/// the channels, or one of the channels is orphaned.
///
/// In particular, if there is at least one channel in good status and no messages on said
/// channel available, [`Runtime::wait_on_channels`] will continue to block until a message is
/// available.
/// Invariant: The returned vector of [`ChannelReadStatus`] values will be in 1-1
/// correspondence with the passed-in vector of [`oak_abi::Handle`]s.
///
/// [`Runtime`]: crate::runtime::Runtime
fn wait_on_channels(
&self,
node_id: NodeId,
Expand All @@ -836,8 +827,6 @@ impl Runtime {
}
}

self.validate_can_read_from_channels(node_id, &readers)?;

let thread = thread::current();
while !self.is_terminating() {
// Create a new Arc each iteration to be dropped after `thread::park` e.g. when the
Expand All @@ -859,18 +848,15 @@ impl Runtime {
})?;
}
let statuses = self.readers_statuses(node_id, &readers);
// Transcribe the status for valid channels back to the original position
// in the list of all statuses.
for i in 0..readers.len() {
all_statuses[reader_pos[i]] = statuses[i];
}

let all_unreadable = statuses.iter().all(|&s| {
s == ChannelReadStatus::InvalidChannel || s == ChannelReadStatus::Orphaned
});
let any_ready = statuses.iter().any(|&s| s == ChannelReadStatus::ReadReady);
let all_not_ready = statuses.iter().all(|&s| s == ChannelReadStatus::NotReady);

if all_unreadable || any_ready {
// Transcribe the status for valid channels back to the original position
// in the list of all statuses.
for i in 0..readers.len() {
all_statuses[reader_pos[i]] = statuses[i];
}
if !all_not_ready || read_handles.is_empty() || readers.len() != read_handles.len() {
return Ok(all_statuses);
}

Expand Down Expand Up @@ -955,15 +941,23 @@ impl Runtime {
}

/// Determine the readable status of a channel, returning:
/// - [`ChannelReadStatus::ReadReady`] if there is at least one message in the channel.
/// - [`ChannelReadStatus::Orphaned`] if there are no messages and there are no writers
/// - [`ChannelReadStatus::NotReady`] if there are no messages but there are some writers
/// - `Ok`([`ChannelReadStatus::ReadReady`]) if there is at least one message in the channel.
/// - `Ok`([`ChannelReadStatus::Orphaned`]) if there are no messages and there are no writers.
/// - `Ok`([`ChannelReadStatus::NotReady`]) if there are no messages but there are some writers.
/// - `Ok`([`ChannelReadStatus::PermissionDenied`]) if the node does not have permission to read
/// from the channel.
/// - `Err`([`OakStatus::ErrBadHandle`]) if the input handle does not indicate the read half of
/// a channel.
fn channel_status(
&self,
node_id: NodeId,
half: &ChannelHalf,
) -> Result<ChannelReadStatus, OakStatus> {
self.validate_can_read_from_channel(node_id, half)?;
if let Err(OakStatus::ErrPermissionDenied) =
self.validate_can_read_from_channel(node_id, half)
{
return Ok(ChannelReadStatus::PermissionDenied);
};
with_reader_channel(half, |channel| {
Ok(if channel.messages.read().unwrap().front().is_some() {
ChannelReadStatus::ReadReady
Expand Down
85 changes: 85 additions & 0 deletions oak/server/rust/oak_runtime/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,88 @@ fn create_node_more_public_label() {
}),
);
}

#[test]
fn wait_on_channels_immediately_returns_if_any_channel_is_orphaned() {
run_node_body(
Label::public_trusted(),
Box::new(|runtime| {
let (write_handle_0, read_handle_0) = runtime.channel_create(&Label::public_trusted());
let (_write_handle_1, read_handle_1) = runtime.channel_create(&Label::public_trusted());

// Close the write_handle; this should make the channel Orphaned
let result = runtime.channel_close(write_handle_0);
assert_eq!(Ok(()), result);

let result = runtime.wait_on_channels(&[read_handle_0, read_handle_1]);
assert_eq!(
Ok(vec![
ChannelReadStatus::Orphaned,
ChannelReadStatus::NotReady
]),
result
);
Ok(())
}),
);
}

#[test]
fn wait_on_channels_blocks_if_all_channels_have_status_not_ready() {
run_node_body(
Label::public_trusted(),
Box::new(|runtime| {
let (write_handle, read_handle) = runtime.channel_create(&Label::public_trusted());

// Change the status of the channel concurrently, to unpark the waiting thread.
let runtime_copy = runtime.clone();
let start = std::time::Instant::now();
std::thread::spawn(move || {
let ten_millis = std::time::Duration::from_millis(10);
thread::sleep(ten_millis);

// Close the write_handle; this should make the channel Orphaned
let result = runtime_copy.channel_close(write_handle);
assert_eq!(Ok(()), result);
});

let result = runtime.wait_on_channels(&[read_handle]);
assert!(start.elapsed() >= std::time::Duration::from_millis(10));
assert_eq!(Ok(vec![ChannelReadStatus::Orphaned]), result);
Ok(())
}),
);
}

#[test]
fn wait_on_channels_immediately_returns_if_any_channel_is_invalid() {
run_node_body(
Label::public_trusted(),
Box::new(|runtime| {
let (write_handle, _read_handle) = runtime.channel_create(&Label::public_trusted());
let (_write_handle, read_handle) = runtime.channel_create(&Label::public_trusted());

let result = runtime.wait_on_channels(&[write_handle, read_handle]);
assert_eq!(
Ok(vec![
ChannelReadStatus::InvalidChannel,
ChannelReadStatus::NotReady
]),
result
);
Ok(())
}),
);
}

#[test]
fn wait_on_channels_immediately_returns_if_the_input_list_is_empty() {
run_node_body(
Label::public_trusted(),
Box::new(|runtime| {
let result = runtime.wait_on_channels(&[]);
assert_eq!(Ok(Vec::<ChannelReadStatus>::new()), result);
Ok(())
}),
);
}
2 changes: 1 addition & 1 deletion sdk/rust/oak/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub fn wait_on_channels(handles: &[ReadHandle]) -> Result<Vec<ChannelReadStatus>
/// It also returns an error if the underlying channel is empty (i.e. not ready to read).
///
/// The provided vectors for received data and associated handles will be
/// resized to accomodate the information in the message; any data already
/// resized to accommodate the information in the message; any data already
/// held in the vectors will be overwritten.
pub fn channel_read(
half: ReadHandle,
Expand Down

0 comments on commit b813d40

Please sign in to comment.