Refactor scheduler and worker creation #539

Merged: 1 commit, Feb 11, 2022
2 changes: 1 addition & 1 deletion src/memory_manager.rs
@@ -196,7 +196,7 @@ pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThread)
!mmtk.plan.is_initialized(),
"MMTk collection has been initialized (was initialize_collection() already called before?)"
);
mmtk.scheduler.initialize(*mmtk.options.threads, mmtk, tls);
mmtk.scheduler.spawn_gc_threads(mmtk, tls);
mmtk.plan.base().initialized.store(true, Ordering::SeqCst);
}
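Note that `initialize_collection` keeps its initialize-once guard around the new `spawn_gc_threads` call. The following is a minimal standalone analogue of that guard (plain Rust with stand-in names such as `Runtime`, not the mmtk-core types), just to illustrate the assert-spawn-flag pattern used in the hunk above:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for the plan's `initialized` flag that guards one-time GC-thread creation.
struct Runtime {
    initialized: AtomicBool,
}

impl Runtime {
    fn initialize_collection(&self) {
        // Same shape as the hunk above: assert not yet initialized,
        // spawn the GC threads, then flip the flag.
        assert!(
            !self.initialized.load(Ordering::SeqCst),
            "collection has already been initialized"
        );
        // ... spawn the GC threads here ...
        self.initialized.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let rt = Runtime { initialized: AtomicBool::new(false) };
    rt.initialize_collection();
    assert!(rt.initialized.load(Ordering::SeqCst));
}
```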

9 changes: 8 additions & 1 deletion src/mmtk.rs
@@ -57,8 +57,15 @@ impl<VM: VMBinding> MMTK<VM> {
// The first call will initialize SFT map. Other calls will be blocked until SFT map is initialized.
SFT_MAP.initialize_once();

let scheduler = GCWorkScheduler::new();
let options = Arc::new(UnsafeOptionsWrapper::new(Options::default()));

let num_workers = if cfg!(feature = "single_worker") {
1
} else {
*options.threads
};

let scheduler = GCWorkScheduler::new(num_workers);
let plan = crate::plan::create_plan(
*options.plan,
&VM_MAP,
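With this change the worker count is fixed when the `MMTK` instance is built: the options are created first, and the `single_worker` feature forces a single GC worker. The sketch below is a self-contained illustration of that selection (the `Options` and `Scheduler` types are stand-ins, not mmtk-core's); `cfg!` folds to a boolean constant at compile time, so only the fallback branch reads the runtime option. In a real crate, `single_worker` would have to be declared under `[features]` in Cargo.toml.

```rust
use std::sync::Arc;

// Stand-in for mmtk-core's Options; only the field used here.
struct Options {
    threads: usize,
}

// Stand-in for GCWorkScheduler::new(num_workers).
struct Scheduler {
    num_workers: usize,
}

fn main() {
    // Options must now be created *before* the scheduler, because the
    // scheduler's worker count is fixed at construction time.
    let options = Arc::new(Options { threads: 8 });

    // `cfg!` is evaluated at compile time; with the `single_worker`
    // feature enabled the whole expression folds to `1`.
    let num_workers = if cfg!(feature = "single_worker") {
        1
    } else {
        options.threads
    };

    let scheduler = Scheduler { num_workers };
    assert!(scheduler.num_workers >= 1);
}
```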
2 changes: 1 addition & 1 deletion src/scheduler/controller.rs
@@ -86,7 +86,7 @@ impl<VM: VMBinding> GCController<VM> {
}
}
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
if self.scheduler.worker_group().all_parked() && self.scheduler.all_buckets_empty() {
if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
break;
}
}
4 changes: 2 additions & 2 deletions src/scheduler/gc_work.rs
@@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers_shared {
for w in &mmtk.scheduler.workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
@@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers_shared {
for w in &mmtk.scheduler.workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
// TODO: Process weak references properly
177 changes: 90 additions & 87 deletions src/scheduler/scheduler.rs
@@ -1,6 +1,7 @@
use super::stat::SchedulerStat;
use super::work_bucket::WorkBucketStage::*;
use super::work_bucket::*;
use super::worker::{GCWorker, GCWorkerShared, WorkerGroup};
use super::worker::{GCWorker, GCWorkerShared};
use super::*;
use crate::mmtk::MMTK;
use crate::util::opaque_pointer::*;
@@ -10,23 +11,24 @@ use enum_map::{enum_map, EnumMap};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{Arc, Condvar, Mutex};

pub enum CoordinatorMessage<VM: VMBinding> {
Work(Box<dyn CoordinatorWork<VM>>),
AllWorkerParked,
BucketDrained,
}

/// The shared data structure for distributing work packets between worker threads and the coordinator thread.
pub struct GCWorkScheduler<VM: VMBinding> {
/// Work buckets
/// Work buckets for worker threads
pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
/// Work for the coordinator thread
pub coordinator_work: WorkBucket<VM>,
/// The shared parts of GC workers
worker_group: Option<WorkerGroup<VM>>,
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
/// The shared part of the GC worker object of the controller thread
coordinator_worker_shared: Option<RwLock<Arc<GCWorkerShared<VM>>>>,
coordinator_worker_shared: Arc<GCWorkerShared<VM>>,
/// Condition Variable for worker synchronization
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
/// A callback to be fired after the `Closure` bucket is drained.
@@ -41,79 +43,42 @@ pub struct GCWorkScheduler<VM: VMBinding> {
closure_end: Mutex<Option<Box<dyn Send + Fn() -> bool>>>,
}

// The 'channel' inside Scheduler disallows Sync for Scheduler. We have to make sure we use channel properly:
// 1. We should never directly use Sender. We clone the sender and let each worker have their own copy.
// 2. Only the coordinator can use Receiver.
// TODO: We should remove channel from Scheduler, and directly send Sender/Receiver when creating the coordinator and
// the workers.
// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
// makes the compiler think WorkBucket is not Sync.
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}

impl<VM: VMBinding> GCWorkScheduler<VM> {
pub fn new() -> Arc<Self> {
pub fn new(num_workers: usize) -> Arc<Self> {
let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
Arc::new(Self {
work_buckets: enum_map! {
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
},
coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
worker_group: None,
coordinator_worker_shared: None,
worker_monitor,
closure_end: Mutex::new(None),
})
}

#[inline]
pub fn num_workers(&self) -> usize {
self.worker_group.as_ref().unwrap().worker_count()
}

pub fn initialize(
self: &'static Arc<Self>,
num_workers: usize,
mmtk: &'static MMTK<VM>,
tls: VMThread,
) {
use crate::scheduler::work_bucket::WorkBucketStage::*;
let num_workers = if cfg!(feature = "single_worker") {
1
} else {
num_workers
// Create work buckets for workers.
let mut work_buckets = enum_map! {
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
};

let (sender, receiver) = channel::<CoordinatorMessage<VM>>();

let mut self_mut = self.clone();
let self_mut = unsafe { Arc::get_mut_unchecked(&mut self_mut) };

let coordinator_worker = GCWorker::new(mmtk, 0, self.clone(), true, sender.clone());
self_mut.coordinator_worker_shared = Some(RwLock::new(coordinator_worker.shared.clone()));

let (worker_group, spawn_workers) =
WorkerGroup::new(mmtk, num_workers, self.clone(), sender);
self_mut.worker_group = Some(worker_group);

// Set the open condition of each bucket.
{
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
// This vec will grow for each stage we call with open_next()
let mut open_stages: Vec<WorkBucketStage> = vec![Unconstrained, Prepare];
// The rest will open after the previous stage is done.
let mut open_next = |s: WorkBucketStage| {
let cur_stages = open_stages.clone();
self_mut.work_buckets[s].set_open_condition(move || {
let should_open =
self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler<VM>| {
let should_open = scheduler.are_buckets_drained(&cur_stages)
&& scheduler.all_workers_parked();
// Additional check before the `RefClosure` bucket opens.
if should_open && s == WorkBucketStage::RefClosure {
if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() {
if closure_end() {
// Don't open `RefClosure` if `closure_end` added more work to `Closure`.
return false;
@@ -134,18 +99,72 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
open_next(Final);
}

// Now that the scheduler is initialized, we spawn the worker threads and the controller thread.
spawn_workers(tls);
// Create the work bucket for the controller.
let coordinator_work = WorkBucket::new(true, worker_monitor.clone());

// We prepare the shared part of workers, but do not create the actual workers now.
// The shared parts of workers are communication hubs between controller and workers.
let workers_shared = (0..num_workers)
.map(|_| Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone())))
.collect::<Vec<_>>();

// Similarly, we create the shared part of the controller's GC worker, but not the controller itself.
let coordinator_worker_shared = Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone()));

Arc::new(Self {
work_buckets,
coordinator_work,
workers_shared,
coordinator_worker_shared,
worker_monitor,
closure_end: Mutex::new(None),
})
}

#[inline]
pub fn num_workers(&self) -> usize {
self.workers_shared.len()
}

pub fn all_workers_parked(&self) -> bool {
self.workers_shared.iter().all(|w| w.is_parked())
}

/// Create GC threads, including the controller thread and all workers.
pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
// Create the communication channel.
let (sender, receiver) = channel::<CoordinatorMessage<VM>>();

// Spawn the controller thread.
let coordinator_worker = GCWorker::new(
mmtk,
0,
self.clone(),
true,
sender.clone(),
self.coordinator_worker_shared.clone(),
);
let gc_controller = GCController::new(
mmtk,
mmtk.plan.base().gc_requester.clone(),
self.clone(),
receiver,
coordinator_worker,
);

VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Controller(gc_controller));

// Spawn each worker thread.
for (ordinal, shared) in self.workers_shared.iter().enumerate() {
let worker = Box::new(GCWorker::new(
mmtk,
ordinal,
self.clone(),
false,
sender.clone(),
shared.clone(),
));
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
}
}

/// Schedule all the common work packets
@@ -206,10 +225,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
*self.closure_end.lock().unwrap() = Some(f);
}

pub fn worker_group(&self) -> &WorkerGroup<VM> {
self.worker_group.as_ref().unwrap()
}

pub fn all_buckets_empty(&self) -> bool {
self.work_buckets.values().all(|bucket| bucket.is_empty())
}
@@ -221,7 +236,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
if id == WorkBucketStage::Unconstrained {
continue;
}
buckets_updated |= bucket.update();
buckets_updated |= bucket.update(self);
}
if buckets_updated {
// Notify the workers for new work
@@ -317,7 +332,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}
// Park this worker
worker.shared.parked.store(true, Ordering::SeqCst);
if self.worker_group().all_parked() {
if self.all_workers_parked() {
worker
.sender
.send(CoordinatorMessage::AllWorkerParked)
@@ -331,33 +346,21 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}

pub fn enable_stat(&self) {
for worker in &self.worker_group().workers_shared {
for worker in &self.workers_shared {
let worker_stat = worker.borrow_stat();
worker_stat.enable();
}
let coordinator_worker_shared = self
.coordinator_worker_shared
.as_ref()
.unwrap()
.read()
.unwrap();
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
coordinator_worker_stat.enable();
}

pub fn statistics(&self) -> HashMap<String, String> {
let mut summary = SchedulerStat::default();
for worker in &self.worker_group().workers_shared {
for worker in &self.workers_shared {
let worker_stat = worker.borrow_stat();
summary.merge(&worker_stat);
}
let coordinator_worker_shared = self
.coordinator_worker_shared
.as_ref()
.unwrap()
.read()
.unwrap();
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
summary.merge(&coordinator_worker_stat);
summary.harness_stat()
}
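The scheduler diff above is the heart of the refactor: `GCWorkScheduler::new` now builds every shared piece (the work buckets and one `GCWorkerShared` per worker) up front, and the new `spawn_gc_threads` only creates the controller and worker threads against that pre-built state. The following self-contained sketch mirrors that two-phase pattern with plain `std::thread`; the names echo the PR, but it is a simplified analogue, not the actual mmtk-core code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Simplified stand-in for GCWorkerShared: state shared between a worker
// thread and anyone who needs to observe it (e.g. the controller).
struct WorkerShared {
    parked: AtomicBool,
}

// Simplified stand-in for GCWorkScheduler.
struct Scheduler {
    workers_shared: Vec<Arc<WorkerShared>>,
}

impl Scheduler {
    // Phase 1: build the shared parts eagerly, so the scheduler is complete
    // (and never mutated again) from the moment it is constructed.
    fn new(num_workers: usize) -> Arc<Self> {
        let workers_shared = (0..num_workers)
            .map(|_| Arc::new(WorkerShared { parked: AtomicBool::new(false) }))
            .collect();
        Arc::new(Self { workers_shared })
    }

    fn all_workers_parked(&self) -> bool {
        self.workers_shared
            .iter()
            .all(|w| w.parked.load(Ordering::SeqCst))
    }

    // Phase 2: spawn the threads later, handing each one its own shared part.
    fn spawn_workers(&self) -> Vec<thread::JoinHandle<()>> {
        self.workers_shared
            .iter()
            .cloned()
            .map(|shared| {
                thread::spawn(move || {
                    // ... the worker would poll for work here, then park ...
                    shared.parked.store(true, Ordering::SeqCst);
                })
            })
            .collect()
    }
}

fn main() {
    let scheduler = Scheduler::new(4);
    for handle in scheduler.spawn_workers() {
        handle.join().unwrap();
    }
    assert!(scheduler.all_workers_parked());
}
```

Because nothing has to mutate the scheduler after construction, the old `Arc::get_mut_unchecked` trick and the `Option`/`RwLock` wrappers around `worker_group` and `coordinator_worker_shared` can all be dropped, as the diff shows.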
11 changes: 7 additions & 4 deletions src/scheduler/work_bucket.rs
@@ -59,7 +59,7 @@ pub struct WorkBucket<VM: VMBinding> {
/// A priority queue
queue: RwLock<BinaryHeap<PrioritizedWork<VM>>>,
monitor: Arc<(Mutex<()>, Condvar)>,
can_open: Option<Box<dyn (Fn() -> bool) + Send>>,
can_open: Option<Box<dyn (Fn(&GCWorkScheduler<VM>) -> bool) + Send>>,
}

impl<VM: VMBinding> WorkBucket<VM> {
@@ -132,12 +132,15 @@ impl<VM: VMBinding> WorkBucket<VM> {
}
self.queue.write().pop().map(|v| v.work)
}
pub fn set_open_condition(&mut self, pred: impl Fn() -> bool + Send + 'static) {
pub fn set_open_condition(
&mut self,
pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
) {
self.can_open = Some(box pred);
}
pub fn update(&self) -> bool {
pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
if let Some(can_open) = self.can_open.as_ref() {
if !self.is_activated() && can_open() {
if !self.is_activated() && can_open(scheduler) {
self.activate();
return true;
}
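The `work_bucket.rs` change is what makes the eager construction possible: a bucket's open condition now receives the scheduler as an argument when `update` is called, instead of capturing a reference to it when the condition is installed. Below is a minimal sketch of that pattern (illustrative types, not mmtk-core's; the real `update` takes `&self` and uses interior mutability rather than `&mut self`):

```rust
// Minimal sketch of an "open condition" that is handed its context when
// evaluated, rather than capturing it when constructed.
struct Scheduler {
    workers_parked: usize,
    num_workers: usize,
}

struct Bucket {
    active: bool,
    // The predicate borrows the scheduler only for the duration of the call.
    can_open: Option<Box<dyn Fn(&Scheduler) -> bool + Send>>,
}

impl Bucket {
    fn set_open_condition(&mut self, pred: impl Fn(&Scheduler) -> bool + Send + 'static) {
        self.can_open = Some(Box::new(pred));
    }

    fn update(&mut self, scheduler: &Scheduler) -> bool {
        if let Some(can_open) = self.can_open.as_ref() {
            if !self.active && can_open(scheduler) {
                self.active = true;
                return true;
            }
        }
        false
    }
}

fn main() {
    let mut bucket = Bucket { active: false, can_open: None };
    // The closure captures nothing scheduler-related; it reads whichever
    // scheduler is passed to `update`.
    bucket.set_open_condition(|s: &Scheduler| s.workers_parked == s.num_workers);

    let scheduler = Scheduler { workers_parked: 4, num_workers: 4 };
    assert!(bucket.update(&scheduler));
}
```

Since the stored closure no longer needs a `'static` handle to the scheduler, the open conditions can be installed on the local `work_buckets` map inside `new()` before the `Arc` even exists, which is exactly what the scheduler diff does.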