Skip to content

Commit

Permalink
[#27] add parking strategy for worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Jan 30, 2016
1 parent 0b0e2bb commit c3a425d
Showing 1 changed file with 69 additions and 38 deletions.
107 changes: 69 additions & 38 deletions src/runtime/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ use std::cell::UnsafeCell;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, Builder};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SendError};
use std::thread::{self, Builder, Thread};
use std::time::Duration;

use deque::{self, Worker, Stealer, Stolen};
use rand;
Expand All @@ -40,6 +42,23 @@ use scheduler::Scheduler;

thread_local!(static PROCESSOR: UnsafeCell<Option<Processor>> = UnsafeCell::new(None));

#[derive(Clone)]
pub struct ProcMessageSender {
inner: Sender<ProcMessage>,
processor: Arc<ProcessorInner>,
}

impl ProcMessageSender {
pub fn send(&self, proc_msg: ProcMessage) -> Result<(), SendError<ProcMessage>> {
try!(self.inner.send(proc_msg));
self.processor.try_wake_up();
Ok(())
}
}

unsafe impl Send for ProcMessageSender {}
unsafe impl Sync for ProcMessageSender {}

#[derive(Clone)]
pub struct Processor {
inner: Arc<ProcessorInner>,
Expand Down Expand Up @@ -69,6 +88,18 @@ pub struct ProcessorInner {
chan_receiver: Receiver<ProcMessage>,

is_exiting: bool,
is_parked: bool,
thread_handle: Option<Thread>,
should_wake_up: AtomicBool,
}

impl ProcessorInner {
fn try_wake_up(&self) {
if self.is_parked {
self.should_wake_up.store(true, Ordering::SeqCst);
self.thread_handle.as_ref().map(|x| x.unpark());
}
}
}

impl Processor {
Expand All @@ -94,6 +125,9 @@ impl Processor {
chan_receiver: rx,

is_exiting: false,
is_parked: false,
thread_handle: None,
should_wake_up: AtomicBool::new(false),
}),
};

Expand All @@ -117,7 +151,7 @@ impl Processor {
pub fn run_with_neighbors(processor_id: usize,
sched: *mut Scheduler,
neigh: Vec<Stealer<Handle>>)
-> (thread::JoinHandle<()>, Sender<ProcMessage>, Stealer<Handle>) {
-> (thread::JoinHandle<()>, ProcMessageSender, Stealer<Handle>) {
let mut p = Processor::new_with_neighbors(sched, neigh);
let msg = p.handle();
let st = p.stealer();
Expand All @@ -126,6 +160,7 @@ impl Processor {
.name(format!("Processor #{}", processor_id))
.spawn(move || {
Processor::set_tls(&mut p);
p.thread_handle = Some(thread::current());
p.schedule();
})
.unwrap();
Expand All @@ -137,7 +172,7 @@ impl Processor {
sched: *mut Scheduler,
f: M)
-> (thread::JoinHandle<()>,
Sender<ProcMessage>,
ProcMessageSender,
Stealer<Handle>,
::std::sync::mpsc::Receiver<Result<T, Box<Any + Send + 'static>>>)
where M: FnOnce() -> T + Send + 'static,
Expand Down Expand Up @@ -212,8 +247,11 @@ impl Processor {
self.queue_stealer.clone()
}

pub fn handle(&self) -> Sender<ProcMessage> {
self.chan_sender.clone()
pub fn handle(&self) -> ProcMessageSender {
ProcMessageSender {
inner: self.chan_sender.clone(),
processor: self.inner.clone(),
}
}

pub fn spawn_opts(&mut self, f: Box<FnBox()>, opts: Options) {
Expand All @@ -227,6 +265,8 @@ impl Processor {
self.main_coro.set_state(State::Running);

'outerloop: loop {
self.is_parked = false;

// 1. Run all tasks in local queue
while let Some(hdl) = self.queue_worker.pop() {
if !self.is_exiting {
Expand Down Expand Up @@ -264,46 +304,34 @@ impl Processor {

// Prefer running own tasks before stealing --> "continue" from anew.
if resume_all_tasks {
continue;
continue 'outerloop;
}
}

// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();
loop {
// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();

for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;
for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;

if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
self.resume(hdl);
continue 'outerloop;
}
}

// Wait forever until we got notified
// TODO:
// Could this be improved somehow?
// Maybe by implementing a "processor-pool" akin to a thread-pool,
// which would move park()ed Processors to a shared idle-queue.
// Other Processors could then unpark() them as necessary in their own ready() method.
if let Ok(msg) = self.chan_receiver.recv() {
match msg {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
self.is_exiting = true;
if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
self.resume(hdl);
continue 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
}
ProcMessage::Exit => {
break 'outerloop;
}
}
};

self.is_parked = true;
thread::park_timeout(Duration::from_millis(100));

// If we are waken up, then break this loop
// otherwise, continue to steal jobs from the others
if self.should_wake_up.swap(false, Ordering::SeqCst) {
break;
}
}
}

self.main_coro.set_state(State::Finished);
Expand Down Expand Up @@ -341,6 +369,9 @@ impl Processor {
/// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue)
pub fn ready(&mut self, coro: Handle) {
self.queue_worker.push(coro);

// Wake up the worker thread if it is parked
self.try_wake_up();
}

/// Suspends the current running coroutine, equivalent to `Scheduler::sched`
Expand Down

0 comments on commit c3a425d

Please sign in to comment.