Skip to content

Commit

Permalink
[#27] Temporary solution for processor idle
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Mar 19, 2016
1 parent fb3d941 commit 43cab0b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ log = "0.3"
mio = "0.5"
rand = "0.3"
slab = { git = "https://github.com/carllerche/slab.git", rev = "36c635b313619cec3d3595e53289b70f11aadff2" }
linked-hash-map = "0.0.9"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern crate deque;
extern crate mio;
extern crate rand;
extern crate slab;
extern crate linked_hash_map;

pub mod join_handle;
pub mod net;
Expand Down
114 changes: 51 additions & 63 deletions src/runtime/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ use std::cell::UnsafeCell;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SendError};
use std::thread::{self, Builder, Thread};
use std::time::Duration;
use std::thread::{self, Builder};
use std::fmt;

use deque::{self, Worker, Stealer, Stolen};
Expand All @@ -52,7 +50,6 @@ pub struct ProcMessageSender {
impl ProcMessageSender {
pub fn send(&self, proc_msg: ProcMessage) -> Result<(), SendError<ProcMessage>> {
try!(self.inner.send(proc_msg));
self.processor.try_wake_up();
Ok(())
}
}
Expand Down Expand Up @@ -177,17 +174,6 @@ pub struct ProcessorInner {

chan_sender: Sender<ProcMessage>,
chan_receiver: Receiver<ProcMessage>,

thread_handle: Option<Thread>,
should_wake_up: AtomicBool,
}

impl ProcessorInner {
fn try_wake_up(&self) {
// This flag should always set to true when we have job to do
self.should_wake_up.store(true, Ordering::SeqCst);
self.thread_handle.as_ref().map(|x| x.unpark());
}
}

impl Processor {
Expand All @@ -211,9 +197,6 @@ impl Processor {

chan_sender: tx,
chan_receiver: rx,

thread_handle: None,
should_wake_up: AtomicBool::new(false),
}),
};

Expand Down Expand Up @@ -297,60 +280,68 @@ impl Processor {
}

// 2. Check the mainbox
loop {
{
let mut resume_all_tasks = false;

while let Ok(msg) = self.chan_receiver.try_recv() {
match msg {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
resume_all_tasks = true;
}
{
let mut resume_all_tasks = false;

while let Ok(msg) = self.chan_receiver.try_recv() {
match msg {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
resume_all_tasks = true;
}
}

// Prefer running own tasks before stealing --> "continue" from anew.
if resume_all_tasks {
continue 'outerloop;
}
}

// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists
// should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();
// Prefer running own tasks before stealing --> "continue" from anew.
if resume_all_tasks {
continue 'outerloop;
}
}

for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;
// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists
// should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();

if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
trace!("{:?}: stole Coroutine `{}`", self, hdl.debug_name());
self.resume(hdl);
continue 'outerloop;
}
}
for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;

// Check once before park
if self.should_wake_up.swap(false, Ordering::SeqCst) {
break;
if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
trace!("{:?}: stole Coroutine `{}`", self, hdl.debug_name());
self.resume(hdl);
continue 'outerloop;
}
}

thread::park_timeout(Duration::from_millis(1));
// Park the processor
{
let sched = self.scheduler();
sched.park_processor(self.id(), self.handle());
}

// If we are waken up, then break this loop
// otherwise, continue to steal jobs from the others
if self.should_wake_up.swap(false, Ordering::SeqCst) {
break;
match self.chan_receiver.recv().unwrap() {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
}
}

{
let sched = self.scheduler();
sched.unpark_processor(self.id());
}
}

trace!("{:?}: dropping coroutines in channel", self);
Expand Down Expand Up @@ -434,9 +425,6 @@ impl Processor {
/// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue)
pub fn ready(&mut self, coro: Handle) {
self.queue_worker.push(coro);

// Wake up the worker thread if it is parked
self.try_wake_up();
}

/// Suspends the current running coroutine, equivalent to `Scheduler::sched`
Expand Down
33 changes: 30 additions & 3 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ use std::fmt::Debug;
use std::io::{self, Write};
use std::mem;
use std::panic;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

use mio::{Evented, EventLoop, EventSet, Handler, NotifyError, PollOpt, Sender, TimerError, Token};
use slab::Slab;
use linked_hash_map::LinkedHashMap;

use coroutine::{Handle, Coroutine};
use join_handle::{self, JoinHandleReceiver};
use options::Options;
use runtime::processor::{Processor, ProcMessage};
use runtime::processor::{Processor, ProcMessage, ProcMessageSender};
use sync::mono_barrier::CoroMonoBarrier;


Expand Down Expand Up @@ -157,7 +158,6 @@ impl ReadyStates {
}
}


/// Coroutine scheduler
pub struct Scheduler {
default_spawn_options: Options,
Expand All @@ -167,6 +167,8 @@ pub struct Scheduler {
event_loop_sender: Option<Sender<Message>>,
slab: Slab<ReadyStates, usize>,
work_count: Arc<AtomicUsize>,

parked_processors: Mutex<LinkedHashMap<usize, ProcMessageSender>>,
}

unsafe impl Send for Scheduler {}
Expand All @@ -181,6 +183,8 @@ impl Scheduler {
event_loop_sender: None,
slab: Slab::new(1024),
work_count: Arc::new(AtomicUsize::new(0)),

parked_processors: Mutex::new(LinkedHashMap::new()),
}
}

Expand Down Expand Up @@ -354,6 +358,17 @@ impl Scheduler {

let current = Processor::current();

// Try to wake up a parked processor
if let Some(ref processor) = current {
let mut parked = processor.scheduler().parked_processors.lock().unwrap();
if let Some((_, prochdl)) = parked.pop_front() {
trace!("Coroutine `{}`: pushing into a parked processor",
coro.debug_name());
let _ = prochdl.send(ProcMessage::Ready(coro));
return;
}
}

if let Some(mut preferred) = coro.preferred_processor() {
if let Some(ref curr) = current {
if preferred.id() == curr.id() {
Expand Down Expand Up @@ -490,6 +505,18 @@ impl Scheduler {
pub fn sleep(&self, delay: Duration) -> Result<(), TimerError> {
self.sleep_ms(delay.as_secs() * 1_000 + delay.subsec_nanos() as u64 / 1_000_000)
}

// Record a processor as parked (idle, blocked on its message channel) so that
// `Scheduler::ready` can pop one entry and send it a `ProcMessage::Ready` to
// wake it with new work. Keyed by the processor's `id`; the value is the
// channel handle used to deliver the wake-up message.
// NOTE(review): called by the processor itself just before it blocks on
// `chan_receiver.recv()` — TODO confirm no work can be enqueued between the
// insert and the blocking recv, or a coroutine could sit in a parked
// processor's mailbox while other processors spin.
#[doc(hidden)]
pub fn park_processor(&self, id: usize, prochdl: ProcMessageSender) {
let mut parked = self.parked_processors.lock().unwrap();
parked.insert(id, prochdl);
}

// Remove a processor from the parked set once it has resumed running
// (after its blocking `recv()` returned). Safe to call even if the entry
// was already removed — e.g. if `Scheduler::ready` popped it while handing
// over a coroutine — since `remove` on a missing key is a no-op.
#[doc(hidden)]
pub fn unpark_processor(&self, id: usize) {
let mut parked = self.parked_processors.lock().unwrap();
parked.remove(&id);
}
}

impl Handler for Scheduler {
Expand Down

0 comments on commit 43cab0b

Please sign in to comment.