-
Notifications
You must be signed in to change notification settings - Fork 114
/
mod.rs
341 lines (293 loc) · 12.5 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
//
// Copyright 2020 The Project Oak Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use std::prelude::v1::*;
use std::collections::HashMap;
use std::string::String;
use std::sync::{Arc, Weak};
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::SeqCst;
use oak_abi::{ChannelReadStatus, OakStatus};
use oak_platform::{JoinHandle, Mutex};
use log::debug;
use crate::message::Message;
use crate::node;
mod channel;
pub use channel::{ChannelEither, ChannelReader, ChannelWriter, ReadStatus};
type Channels = Vec<Weak<channel::Channel>>;
pub struct Configuration {
pub nodes: HashMap<String, node::Configuration>,
pub entry_module: String,
pub entrypoint: String,
}
/// Runtime structure for configuring and running a set of Oak nodes.
pub struct Runtime {
configurations: HashMap<String, node::Configuration>,
terminating: AtomicBool,
channels: Mutex<Channels>,
node_threads: Mutex<Vec<JoinHandle>>,
}
impl Runtime {
/// Configure and run the protobuf specified `ApplicationConfiguration`. After creating a
/// `Runtime` calling `stop` will send termination signals to nodes and wait for them to
/// terminate.
pub fn configure_and_run(
config: Configuration,
) -> Result<(RuntimeRef, ChannelWriter), OakStatus> {
let runtime = Runtime {
configurations: config.nodes,
terminating: AtomicBool::new(false),
channels: Mutex::new(Vec::new()),
node_threads: Mutex::new(Vec::new()),
};
let runtime = RuntimeRef(Arc::new(runtime));
let (chan_writer, chan_reader) = runtime.new_channel();
runtime.node_create(&config.entry_module, &config.entrypoint, chan_reader)?;
Ok((runtime, chan_writer))
}
/// Thread safe method for determining if the `Runtime` is terminating.
pub fn is_terminating(&self) -> bool {
self.terminating.load(SeqCst)
}
/// Thread safe method for signaling termination to a `Runtime` and waiting for its node
/// threads to terminate.
pub fn stop(&self) {
let handles = {
let mut node_threads = self.node_threads.lock().unwrap();
self.terminating.store(true, SeqCst);
std::mem::replace(&mut *node_threads, vec![])
};
// Unpark any threads that are blocked waiting on channels.
for handle in handles.iter() {
handle.thread().unpark();
}
for handle in handles {
handle.join().expect("Failed to join handle");
}
}
/// Creates a new channel.
pub fn new_channel(&self) -> (ChannelWriter, ChannelReader) {
let (c, w, r) = channel::new();
let mut channels = self.channels.lock().unwrap();
channels.push(Arc::downgrade(&c));
(w, r)
}
/// Reads the statuses from a slice of `Option<&ChannelReader>`s.
/// [`ChannelReadStatus::InvalidChannel`] is set for `None` readers in the slice. For `Some(_)`
/// readers, the result is set from a call to `has_message`.
fn readers_statuses(&self, readers: &[Option<&ChannelReader>]) -> Vec<ChannelReadStatus> {
readers
.iter()
.map(|chan| {
chan.map_or(ChannelReadStatus::InvalidChannel, |chan| {
self.channel_status(chan)
})
})
.collect()
}
/// Waits on a slice of `Option<&ChannelReader>`s, blocking until one of the following
/// conditions:
/// - If the `Runtime` is terminating this will return immediately with an `ErrTerminated`
/// status for each channel.
/// - If all readers are in an erroneous status, e.g. when all `ChannelReaders` are orphaned,
/// this will immediately return the channels statuses.
/// - If any of the channels is able to read a message, the corresponding element in the
/// returned vector will be set to `Ok(ReadReady)`, with `Ok(NotReady)` signaling the channel
/// has no message available
///
/// In particular, if there is at least one channel in good status and no messages on said
/// channel available, `wait_on_channels` will continue to block until a message is
/// available.
pub fn wait_on_channels(
&self,
readers: &[Option<&ChannelReader>],
) -> Result<Vec<ChannelReadStatus>, OakStatus> {
let thread = oak_platform::current_thread();
while !self.is_terminating() {
// Create a new Arc each iteration to be dropped after `thread::park` e.g. when the
// thread is resumed. When the Arc is deallocated, any remaining `Weak`
// references in `Channel`s will be orphaned. This means thread::unpark will
// not be called multiple times. Even if thread unpark is called spuriously
// and we wake up early, no channel statuses will be ready and so we can
// just continue.
//
// Note we read statuses directly after adding waiters, before blocking to ensure that
// there are no messages, after we have been added as a waiter.
let thread_id = oak_platform::current_thread().id();
let thread_ref = Arc::new(thread.clone());
for reader in readers {
if let Some(reader) = reader {
reader.add_waiter(thread_id, &thread_ref);
}
}
let statuses = self.readers_statuses(readers);
let all_unreadable = statuses.iter().all(|&s| {
s == ChannelReadStatus::InvalidChannel || s == ChannelReadStatus::Orphaned
});
let any_ready = statuses.iter().any(|&s| s == ChannelReadStatus::ReadReady);
if all_unreadable || any_ready {
return Ok(statuses);
}
debug!(
"wait_on_channels: channels not ready, parking thread {:?}",
oak_platform::current_thread().id()
);
oak_platform::park_thread();
debug!(
"wait_on_channels: thread {:?} re-woken",
oak_platform::current_thread().id()
);
}
Err(OakStatus::ErrTerminated)
}
/// Write a message to a channel. Fails with [`OakStatus::ErrChannelClosed`] if the underlying
/// channel has been orphaned.
pub fn channel_write(&self, channel: &ChannelWriter, msg: Message) -> Result<(), OakStatus> {
if channel.is_orphan() {
return Err(OakStatus::ErrChannelClosed);
}
{
let mut messages = channel.messages.write().unwrap();
messages.push_back(msg);
}
let mut waiting_threads = channel.waiting_threads.lock().unwrap();
// Unpark (wake up) all waiting threads that still have live references. The first thread
// woken can immediately read the message, and others might find `messages` is empty before
// they are even woken. This should not be an issue (being woken does not guarantee a
// message is available), but it could potentially result in some particular thread always
// getting first chance to read the message.
//
// If a thread is woken and finds no message it will take the `waiting_threads` lock and
// add itself again. Note that since that lock is currently held, the woken thread will add
// itself to waiting_threads *after* we call clear below as we release the lock implicilty
// on leaving this function.
for thread in waiting_threads.values() {
if let Some(thread) = thread.upgrade() {
thread.unpark();
}
}
waiting_threads.clear();
Ok(())
}
/// Thread safe. Read a message from a channel. Fails with [`OakStatus::ErrChannelClosed`] if
/// the underlying channel is empty and has been orphaned.
pub fn channel_read(&self, channel: &ChannelReader) -> Result<Option<Message>, OakStatus> {
let mut messages = channel.messages.write().unwrap();
match messages.pop_front() {
Some(m) => Ok(Some(m)),
None => {
if channel.is_orphan() {
Err(OakStatus::ErrChannelClosed)
} else {
Ok(None)
}
}
}
}
/// Thread safe. This function returns:
/// - [`ChannelReadStatus::ReadReady`] if there is at least one message in the channel.
/// - [`ChannelReadStatus::Orphaned`] if there are no messages and there are no writers
/// - [`ChannelReadStatus::NotReady`] if there are no messages but there are some writers
pub fn channel_status(&self, channel: &ChannelReader) -> ChannelReadStatus {
let messages = channel.messages.read().unwrap();
if messages.front().is_some() {
ChannelReadStatus::ReadReady
} else if channel.is_orphan() {
ChannelReadStatus::Orphaned
} else {
ChannelReadStatus::NotReady
}
}
/// Thread safe. Reads a message from the channel if `bytes_capacity` and `handles_capacity` are
/// large enough to accept the message. Fails with `OakStatus::ErrChannelClosed` if the
/// underlying channel has been orphaned _and_ is empty. If there was not enough
/// `bytes_capacity` or `handles_capacity`, `try_read_message` returns
/// `Some(ReadStatus::NeedsCapacity(needed_bytes_capacity,needed_handles_capacity))`. Does not
/// guarantee that the next call will succeed after capacity adjustments as another thread may
/// have read the original message.
pub fn channel_try_read_message(
&self,
channel: &ChannelReader,
bytes_capacity: usize,
handles_capacity: usize,
) -> Result<Option<ReadStatus>, OakStatus> {
let mut messages = channel.messages.write().unwrap();
match messages.front() {
Some(front) => {
let req_bytes_capacity = front.data.len();
let req_handles_capacity = front.channels.len();
Ok(Some(
if req_bytes_capacity > bytes_capacity
|| req_handles_capacity > handles_capacity
{
ReadStatus::NeedsCapacity(req_bytes_capacity, req_handles_capacity)
} else {
ReadStatus::Success(messages.pop_front().expect(
"Front element disappeared while we were holding the write lock!",
))
},
))
}
None => {
if channel.is_orphan() {
Err(OakStatus::ErrChannelClosed)
} else {
Ok(None)
}
}
}
}
}
/// A reference to a `Runtime`
#[derive(Clone)]
pub struct RuntimeRef(Arc<Runtime>);
impl RuntimeRef {
/// Thread safe method that attempts to create a node within the `Runtime` corresponding to a
/// given module name and entrypoint. The `reader: ChannelReader` is passed to the newly
/// created node.
///
/// `node_create` is a method of `RuntimeRef` and not `Runtime`, so that the underlying
/// `Arc<Runtime>` can be passed to `conf.new_instance` and given to a new node thread.
pub fn node_create(
&self,
module_name: &str,
entrypoint: &str,
reader: ChannelReader,
) -> Result<(), OakStatus> {
if self.is_terminating() {
return Err(OakStatus::ErrTerminated);
}
let mut node_threads = self.node_threads.lock().unwrap();
if self.is_terminating() {
return Err(OakStatus::ErrTerminated);
}
let join_handle = self
.configurations
.get(module_name)
.ok_or(OakStatus::ErrInvalidArgs)
.and_then(|conf| {
conf.new_instance(module_name, self.clone(), entrypoint.to_owned(), reader)
})?;
node_threads.push(join_handle);
Ok(())
}
}
impl std::ops::Deref for RuntimeRef {
type Target = Runtime;
#[inline]
fn deref(&self) -> &Runtime {
&self.0
}
}