diff --git a/src/runtime/processor.rs b/src/runtime/processor.rs index 4cbf78b..fbe3dc7 100644 --- a/src/runtime/processor.rs +++ b/src/runtime/processor.rs @@ -28,8 +28,10 @@ use std::cell::UnsafeCell; use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Weak}; -use std::sync::mpsc::{self, Receiver, Sender}; -use std::thread::{self, Builder}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver, Sender, SendError}; +use std::thread::{self, Builder, Thread}; +use std::time::Duration; use deque::{self, Worker, Stealer, Stolen}; use rand; @@ -40,6 +42,23 @@ use scheduler::Scheduler; thread_local!(static PROCESSOR: UnsafeCell> = UnsafeCell::new(None)); +#[derive(Clone)] +pub struct ProcMessageSender { + inner: Sender, + processor: Arc, +} + +impl ProcMessageSender { + pub fn send(&self, proc_msg: ProcMessage) -> Result<(), SendError> { + try!(self.inner.send(proc_msg)); + self.processor.try_wake_up(); + Ok(()) + } +} + +unsafe impl Send for ProcMessageSender {} +unsafe impl Sync for ProcMessageSender {} + #[derive(Clone)] pub struct Processor { inner: Arc, @@ -69,6 +88,18 @@ pub struct ProcessorInner { chan_receiver: Receiver, is_exiting: bool, + is_parked: bool, + thread_handle: Option, + should_wake_up: AtomicBool, +} + +impl ProcessorInner { + fn try_wake_up(&self) { + if self.is_parked { + self.should_wake_up.store(true, Ordering::SeqCst); + self.thread_handle.as_ref().map(|x| x.unpark()); + } + } } impl Processor { @@ -94,6 +125,9 @@ impl Processor { chan_receiver: rx, is_exiting: false, + is_parked: false, + thread_handle: None, + should_wake_up: AtomicBool::new(false), }), }; @@ -117,7 +151,7 @@ impl Processor { pub fn run_with_neighbors(processor_id: usize, sched: *mut Scheduler, neigh: Vec>) - -> (thread::JoinHandle<()>, Sender, Stealer) { + -> (thread::JoinHandle<()>, ProcMessageSender, Stealer) { let mut p = Processor::new_with_neighbors(sched, neigh); let msg = p.handle(); let st = p.stealer(); @@ -126,6 +160,7 @@ impl Processor { .name(format!("Processor #{}", processor_id)) .spawn(move || { Processor::set_tls(&mut p); + p.thread_handle = Some(thread::current()); p.schedule(); }) .unwrap(); @@ -137,7 +172,7 @@ impl Processor { sched: *mut Scheduler, f: M) -> (thread::JoinHandle<()>, - Sender, + ProcMessageSender, Stealer, ::std::sync::mpsc::Receiver>>) where M: FnOnce() -> T + Send + 'static, @@ -212,8 +247,11 @@ impl Processor { self.queue_stealer.clone() } - pub fn handle(&self) -> Sender { - self.chan_sender.clone() + pub fn handle(&self) -> ProcMessageSender { + ProcMessageSender { + inner: self.chan_sender.clone(), + processor: self.inner.clone(), + } } pub fn spawn_opts(&mut self, f: Box, opts: Options) { @@ -227,6 +265,8 @@ impl Processor { self.main_coro.set_state(State::Running); 'outerloop: loop { + self.is_parked = false; + // 1. Run all tasks in local queue while let Some(hdl) = self.queue_worker.pop() { if !self.is_exiting { @@ -264,46 +304,34 @@ impl Processor { // Prefer running own tasks before stealing --> "continue" from anew. if resume_all_tasks { - continue; + continue 'outerloop; } } - // 3. Randomly steal from neighbors as a last measure. - // TODO: To improve cache locality foreign lists should be split in half or so instead. - let rand_idx = self.rng.gen::(); - let total_stealers = self.neighbor_stealers.len(); + loop { + // 3. Randomly steal from neighbors as a last measure. + // TODO: To improve cache locality foreign lists should be split in half or so instead. + let rand_idx = self.rng.gen::(); + let total_stealers = self.neighbor_stealers.len(); - for idx in 0..total_stealers { - let idx = (rand_idx + idx) % total_stealers; + for idx in 0..total_stealers { + let idx = (rand_idx + idx) % total_stealers; - if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() { - self.resume(hdl); - continue 'outerloop; - } - } - - // Wait forever until we got notified - // TODO: - // Could this be improved somehow? - // Maybe by implementing a "processor-pool" akin to a thread-pool, - // which would move park()ed Processors to a shared idle-queue. - // Other Processors could then unpark() them as necessary in their own ready() method. - if let Ok(msg) = self.chan_receiver.recv() { - match msg { - ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei), - ProcMessage::Shutdown => { - self.is_exiting = true; + if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() { + self.resume(hdl); continue 'outerloop; } - ProcMessage::Ready(mut coro) => { - coro.set_preferred_processor(Some(self.weak_self.clone())); - self.ready(coro); - } - ProcMessage::Exit => { - break 'outerloop; - } } - }; + + self.is_parked = true; + thread::park_timeout(Duration::from_millis(100)); + + // If we are waken up, then break this loop + // otherwise, continue to steal jobs from the others + if self.should_wake_up.swap(false, Ordering::SeqCst) { + break; + } + } } self.main_coro.set_state(State::Finished); @@ -341,6 +369,9 @@ impl Processor { /// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue) pub fn ready(&mut self, coro: Handle) { self.queue_worker.push(coro); + + // Wake up the worker thread if it is parked + self.try_wake_up(); } /// Suspends the current running coroutine, equivalent to `Scheduler::sched`