diff --git a/src/memory_manager.rs b/src/memory_manager.rs
index 490ce6b899..e77a2ab002 100644
--- a/src/memory_manager.rs
+++ b/src/memory_manager.rs
@@ -196,7 +196,7 @@ pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThread) {
         !mmtk.plan.is_initialized(),
         "MMTk collection has been initialized (was initialize_collection() already called before?)"
     );
-    mmtk.scheduler.initialize(*mmtk.options.threads, mmtk, tls);
+    mmtk.scheduler.spawn_gc_threads(mmtk, tls);
     mmtk.plan.base().initialized.store(true, Ordering::SeqCst);
 }
diff --git a/src/mmtk.rs b/src/mmtk.rs
index cc3bd2d31f..989fd85baf 100644
--- a/src/mmtk.rs
+++ b/src/mmtk.rs
@@ -57,8 +57,15 @@ impl<VM: VMBinding> MMTK<VM> {
         // The first call will initialize SFT map. Other calls will be blocked until SFT map is initialized.
         SFT_MAP.initialize_once();
 
-        let scheduler = GCWorkScheduler::new();
         let options = Arc::new(UnsafeOptionsWrapper::new(Options::default()));
+
+        let num_workers = if cfg!(feature = "single_worker") {
+            1
+        } else {
+            *options.threads
+        };
+
+        let scheduler = GCWorkScheduler::new(num_workers);
         let plan = crate::plan::create_plan(
             *options.plan,
             &VM_MAP,
diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs
index 50c111b868..ceaae741d7 100644
--- a/src/scheduler/controller.rs
+++ b/src/scheduler/controller.rs
@@ -86,7 +86,7 @@ impl<VM: VMBinding> GCController<VM> {
             }
         }
         let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
-        if self.scheduler.worker_group().all_parked() && self.scheduler.all_buckets_empty() {
+        if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
             break;
         }
     }
diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs
index 13c060e89d..0def65f90a 100644
--- a/src/scheduler/gc_work.rs
+++ b/src/scheduler/gc_work.rs
@@ -49,7 +49,7 @@ impl<C: GCWorkContext> GCWork<C::VM> for Prepare<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(PrepareCollector);
         }
     }
@@ -117,7 +117,7 @@ impl<C: GCWorkContext> GCWork<C::VM> for Release<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(ReleaseCollector);
         }
         // TODO: Process weak references properly
diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs
index 44b1b98458..1ddefbe6cd 100644
--- a/src/scheduler/scheduler.rs
+++ b/src/scheduler/scheduler.rs
@@ -1,6 +1,7 @@
 use super::stat::SchedulerStat;
+use super::work_bucket::WorkBucketStage::*;
 use super::work_bucket::*;
-use super::worker::{GCWorker, GCWorkerShared, WorkerGroup};
+use super::worker::{GCWorker, GCWorkerShared};
 use super::*;
 use crate::mmtk::MMTK;
 use crate::util::opaque_pointer::*;
@@ -10,7 +11,7 @@ use enum_map::{enum_map, EnumMap};
 use std::collections::HashMap;
 use std::sync::atomic::Ordering;
 use std::sync::mpsc::channel;
-use std::sync::{Arc, Condvar, Mutex, RwLock};
+use std::sync::{Arc, Condvar, Mutex};
 
 pub enum CoordinatorMessage<VM: VMBinding> {
     Work(Box<dyn GCWork<VM>>),
     AllWorkerParked,
     BucketDrained,
 }
 
+/// The shared data structure for distributing work packets between worker threads and the coordinator thread.
 pub struct GCWorkScheduler<VM: VMBinding> {
-    /// Work buckets
+    /// Work buckets for worker threads
     pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
     /// Work for the coordinator thread
     pub coordinator_work: WorkBucket<VM>,
     /// The shared parts of GC workers
-    worker_group: Option<Arc<WorkerGroup<VM>>>,
+    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
     /// The shared part of the GC worker object of the controller thread
-    coordinator_worker_shared: Option<RwLock<Arc<GCWorkerShared<VM>>>>,
+    coordinator_worker_shared: Arc<GCWorkerShared<VM>>,
     /// Condition Variable for worker synchronization
     pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
     /// A callback to be fired after the `Closure` bucket is drained.
@@ -41,66 +43,29 @@ pub struct GCWorkScheduler<VM: VMBinding> {
     closure_end: Mutex<Option<Box<dyn Send + Fn() -> bool>>>,
 }
 
-// The 'channel' inside Scheduler disallows Sync for Scheduler. We have to make sure we use channel properly:
-// 1. We should never directly use Sender. We clone the sender and let each worker have their own copy.
-// 2. Only the coordinator can use Receiver.
-// TODO: We should remove channel from Scheduler, and directly send Sender/Receiver when creating the coordinator and
-// the workers.
+// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
+// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
+// makes the compiler think WorkBucket is not Sync.
 unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}
 
 impl<VM: VMBinding> GCWorkScheduler<VM> {
-    pub fn new() -> Arc<Self> {
+    pub fn new(num_workers: usize) -> Arc<Self> {
         let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
-        Arc::new(Self {
-            work_buckets: enum_map! {
-                WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
-                WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
-            },
-            coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
-            worker_group: None,
-            coordinator_worker_shared: None,
-            worker_monitor,
-            closure_end: Mutex::new(None),
-        })
-    }
-
-    #[inline]
-    pub fn num_workers(&self) -> usize {
-        self.worker_group.as_ref().unwrap().worker_count()
-    }
-
-    pub fn initialize(
-        self: &'static Arc<Self>,
-        num_workers: usize,
-        mmtk: &'static MMTK<VM>,
-        tls: VMThread,
-    ) {
-        use crate::scheduler::work_bucket::WorkBucketStage::*;
-        let num_workers = if cfg!(feature = "single_worker") {
-            1
-        } else {
-            num_workers
-        };
+        // Create work buckets for workers.
+        let mut work_buckets = enum_map! {
+            WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
+            WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
+        };
 
-        let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
-
-        let mut self_mut = self.clone();
-        let self_mut = unsafe { Arc::get_mut_unchecked(&mut self_mut) };
-
-        let coordinator_worker = GCWorker::new(mmtk, 0, self.clone(), true, sender.clone());
-        self_mut.coordinator_worker_shared = Some(RwLock::new(coordinator_worker.shared.clone()));
-
-        let (worker_group, spawn_workers) =
-            WorkerGroup::new(mmtk, num_workers, self.clone(), sender);
-        self_mut.worker_group = Some(worker_group);
-
+        // Set the open condition of each bucket.
         {
             // Unconstrained is always open. Prepare will be opened at the beginning of a GC.
             // This vec will grow for each stage we call with open_next()
@@ -108,12 +73,12 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             // The rest will open after the previous stage is done.
             let mut open_next = |s: WorkBucketStage| {
                 let cur_stages = open_stages.clone();
-                self_mut.work_buckets[s].set_open_condition(move || {
-                    let should_open =
-                        self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
+                work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler<VM>| {
+                    let should_open = scheduler.are_buckets_drained(&cur_stages)
+                        && scheduler.all_workers_parked();
                     // Additional check before the `RefClosure` bucket opens.
                     if should_open && s == WorkBucketStage::RefClosure {
-                        if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
+                        if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() {
                             if closure_end() {
                                 // Don't open `RefClosure` if `closure_end` added more work to `Closure`.
                                 return false;
@@ -134,9 +99,51 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             open_next(Final);
         }
 
-        // Now that the scheduler is initialized, we spawn the worker threads and the controller thread.
-        spawn_workers(tls);
+        // Create the work bucket for the controller.
+        let coordinator_work = WorkBucket::new(true, worker_monitor.clone());
+
+        // We create the shared parts of the workers here, but not the actual workers.
+        // Each shared part is the communication hub between the controller and one worker.
+        let workers_shared = (0..num_workers)
+            .map(|_| Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone())))
+            .collect::<Vec<_>>();
+
+        // Similarly, we create the shared part of the controller's own worker, but not the controller itself.
+        let coordinator_worker_shared = Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone()));
+
+        Arc::new(Self {
+            work_buckets,
+            coordinator_work,
+            workers_shared,
+            coordinator_worker_shared,
+            worker_monitor,
+            closure_end: Mutex::new(None),
+        })
+    }
+
+    #[inline]
+    pub fn num_workers(&self) -> usize {
+        self.workers_shared.len()
+    }
+
+    pub fn all_workers_parked(&self) -> bool {
+        self.workers_shared.iter().all(|w| w.is_parked())
+    }
+
+    /// Create GC threads, including the controller thread and all workers.
+    pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
+        // Create the communication channel.
+        let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
+
+        // Spawn the controller thread.
+        let coordinator_worker = GCWorker::new(
+            mmtk,
+            0,
+            self.clone(),
+            true,
+            sender.clone(),
+            self.coordinator_worker_shared.clone(),
+        );
         let gc_controller = GCController::new(
             mmtk,
             mmtk.plan.base().gc_requester.clone(),
@@ -144,8 +151,20 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             receiver,
             coordinator_worker,
         );
         VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Controller(gc_controller));
+
+        // Spawn each worker thread.
+        for (ordinal, shared) in self.workers_shared.iter().enumerate() {
+            let worker = Box::new(GCWorker::new(
+                mmtk,
+                ordinal,
+                self.clone(),
+                false,
+                sender.clone(),
+                shared.clone(),
+            ));
+            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
+        }
     }
 
     /// Schedule all the common work packets
@@ -206,10 +225,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         *self.closure_end.lock().unwrap() = Some(f);
     }
 
-    pub fn worker_group(&self) -> &WorkerGroup<VM> {
-        self.worker_group.as_ref().unwrap()
-    }
-
     pub fn all_buckets_empty(&self) -> bool {
         self.work_buckets.values().all(|bucket| bucket.is_empty())
     }
@@ -221,7 +236,7 @@
             if id == WorkBucketStage::Unconstrained {
                 continue;
             }
-            buckets_updated |= bucket.update();
+            buckets_updated |= bucket.update(self);
         }
         if buckets_updated {
             // Notify the workers for new work
@@ -317,7 +332,7 @@
         }
         // Park this worker
         worker.shared.parked.store(true, Ordering::SeqCst);
-        if self.worker_group().all_parked() {
+        if self.all_workers_parked() {
             worker
                 .sender
                 .send(CoordinatorMessage::AllWorkerParked)
@@ -331,33 +346,21 @@
     }
 
     pub fn enable_stat(&self) {
-        for worker in &self.worker_group().workers_shared {
+        for worker in &self.workers_shared {
             let worker_stat = worker.borrow_stat();
             worker_stat.enable();
         }
-        let coordinator_worker_shared = self
-            .coordinator_worker_shared
-            .as_ref()
-            .unwrap()
-            .read()
-            .unwrap();
-        let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
+        let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
         coordinator_worker_stat.enable();
     }
 
     pub fn statistics(&self) -> HashMap<String, String> {
         let mut summary = SchedulerStat::default();
-        for worker in &self.worker_group().workers_shared {
+        for worker in &self.workers_shared {
             let worker_stat = worker.borrow_stat();
             summary.merge(&worker_stat);
         }
-        let coordinator_worker_shared = self
-            .coordinator_worker_shared
-            .as_ref()
-            .unwrap()
-            .read()
-            .unwrap();
-        let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
+        let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
         summary.merge(&coordinator_worker_stat);
         summary.harness_stat()
     }
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index f5b9ad3150..cc6badd08b 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -59,7 +59,7 @@ pub struct WorkBucket<VM: VMBinding> {
     /// A priority queue
     queue: RwLock<BinaryHeap<PrioritizedWork<VM>>>,
     monitor: Arc<(Mutex<()>, Condvar)>,
-    can_open: Option<Box<(dyn Fn() -> bool) + Send>>,
+    can_open: Option<Box<(dyn Fn(&GCWorkScheduler<VM>) -> bool) + Send>>,
 }
 
 impl<VM: VMBinding> WorkBucket<VM> {
@@ -132,12 +132,15 @@
         }
         self.queue.write().pop().map(|v| v.work)
     }
-    pub fn set_open_condition(&mut self, pred: impl Fn() -> bool + Send + 'static) {
+    pub fn set_open_condition(
+        &mut self,
+        pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
+    ) {
         self.can_open = Some(box pred);
     }
-    pub fn update(&self) -> bool {
+    pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
         if let Some(can_open) = self.can_open.as_ref() {
-            if !self.is_activated() && can_open() {
+            if !self.is_activated() && can_open(scheduler) {
                 self.activate();
                 return true;
             }
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index 50a1701bd8..809c78eb27 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -4,11 +4,11 @@
 use super::*;
 use crate::mmtk::MMTK;
 use crate::util::copy::GCWorkerCopyContext;
 use crate::util::opaque_pointer::*;
-use crate::vm::{Collection, GCThreadContext, VMBinding};
+use crate::vm::VMBinding;
 use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::Sender;
-use std::sync::Arc;
+use std::sync::{Arc, Condvar, Mutex};
 
 const LOCALLY_CACHED_WORKS: usize = 1;
 
@@ -23,6 +23,16 @@ pub struct GCWorkerShared<VM: VMBinding> {
     pub local_work_bucket: WorkBucket<VM>,
 }
 
+impl<VM: VMBinding> GCWorkerShared<VM> {
+    pub fn new(worker_monitor: Arc<(Mutex<()>, Condvar)>) -> Self {
+        Self {
+            parked: AtomicBool::new(true),
+            stat: Default::default(),
+            local_work_bucket: WorkBucket::new(true, worker_monitor),
+        }
+    }
+}
+
 /// A GC worker. This part is privately owned by a worker thread.
 /// The GC controller also has an embedded `GCWorker` because it may also execute work packets.
 pub struct GCWorker<VM: VMBinding> {
@@ -77,8 +87,8 @@ impl<VM: VMBinding> GCWorker<VM> {
         scheduler: Arc<GCWorkScheduler<VM>>,
         is_coordinator: bool,
         sender: Sender<CoordinatorMessage<VM>>,
+        shared: Arc<GCWorkerShared<VM>>,
     ) -> Self {
-        let worker_monitor = scheduler.worker_monitor.clone();
         Self {
             tls: VMWorkerThread(VMThread::UNINITIALIZED),
             ordinal,
@@ -89,11 +99,7 @@
             mmtk,
             is_coordinator,
             local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS),
-            shared: Arc::new(GCWorkerShared {
-                parked: AtomicBool::new(true),
-                stat: Default::default(),
-                local_work_bucket: WorkBucket::new(true, worker_monitor),
-            }),
+            shared,
         }
     }
 
@@ -149,56 +155,3 @@ impl<VM: VMBinding> GCWorker<VM> {
         }
     }
 }
-
-pub struct WorkerGroup<VM: VMBinding> {
-    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
-}
-
-impl<VM: VMBinding> WorkerGroup<VM> {
-    pub fn new(
-        mmtk: &'static MMTK<VM>,
-        workers: usize,
-        scheduler: Arc<GCWorkScheduler<VM>>,
-        sender: Sender<CoordinatorMessage<VM>>,
-    ) -> (Self, Box<dyn FnOnce(VMThread)>) {
-        let mut workers_shared = Vec::new();
-        let mut workers_to_spawn = Vec::new();
-
-        for ordinal in 0..workers {
-            let worker = Box::new(GCWorker::new(
-                mmtk,
-                ordinal,
-                scheduler.clone(),
-                false,
-                sender.clone(),
-            ));
-            let worker_shared = worker.shared.clone();
-            workers_shared.push(worker_shared);
-            workers_to_spawn.push(worker);
-        }
-
-        // NOTE: We cannot call spawn_gc_thread here,
-        // because the worker will access `Scheduler::worker_group` immediately after it starts,
-        // but that field will not be assigned to before this function returns.
-        // Therefore we defer the spawning operation until later.
-        let deferred_spawn = Box::new(move |tls| {
-            for worker in workers_to_spawn.drain(..) {
-                VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
-            }
-        });
-
-        (Self { workers_shared }, deferred_spawn)
-    }
-
-    pub fn worker_count(&self) -> usize {
-        self.workers_shared.len()
-    }
-
-    pub fn parked_workers(&self) -> usize {
-        self.workers_shared.iter().filter(|w| w.is_parked()).count()
-    }
-
-    pub fn all_parked(&self) -> bool {
-        self.parked_workers() == self.worker_count()
-    }
-}
diff --git a/src/util/options.rs b/src/util/options.rs
index ae0ed7926b..b49c4cd654 100644
--- a/src/util/options.rs
+++ b/src/util/options.rs
@@ -148,9 +148,9 @@ mod process_tests {
     fn test_process_valid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process("threads", "1") };
+            let success = unsafe { options.process("no_finalizer", "true") };
             assert!(success);
-            assert_eq!(*options.threads, 1);
+            assert!(*options.no_finalizer);
         })
     }
 
@@ -158,10 +158,10 @@ mod process_tests {
     fn test_process_invalid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let default_threads = *options.threads;
-            let success = unsafe { options.process("threads", "a") };
+            let default_no_finalizer = *options.no_finalizer;
+            let success = unsafe { options.process("no_finalizer", "100") };
             assert!(!success);
-            assert_eq!(*options.threads, default_threads);
+            assert_eq!(*options.no_finalizer, default_no_finalizer);
         })
     }
 
@@ -178,9 +178,9 @@ mod process_tests {
     fn test_process_bulk_valid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process_bulk("threads=1 stress_factor=42") };
+            let success = unsafe { options.process_bulk("no_finalizer=true stress_factor=42") };
             assert!(success);
-            assert_eq!(*options.threads, 1);
+            assert!(*options.no_finalizer);
             assert_eq!(*options.stress_factor, 42);
         })
     }
 
@@ -189,7 +189,7 @@ mod process_tests {
     fn test_process_bulk_invalid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process_bulk("threads=a stress_factor=42") };
+            let success = unsafe { options.process_bulk("no_finalizer=true stress_factor=a") };
             assert!(!success);
         })
     }
 
@@ -307,8 +307,11 @@ macro_rules! options {
 options! {
     // The plan to use. This needs to be initialized before creating an MMTk instance (currently by setting env vars)
     plan: PlanSelector [env_var: true, command_line: false] [always_valid] = PlanSelector::NoGC,
-    // Number of GC threads.
-    threads: usize [env_var: true, command_line: true] [|v: &usize| *v > 0] = num_cpus::get(),
+    // Number of GC worker threads. (There is always one GC controller thread.)
+    // FIXME: Currently we create GCWorkScheduler when MMTK is created, which is usually static.
+    // To allow this as a command-line option, we need to refactor the creation of the `MMTK` instance.
+    // See: https://github.com/mmtk/mmtk-core/issues/532
+    threads: usize [env_var: true, command_line: false] [|v: &usize| *v > 0] = num_cpus::get(),
     // Enable an optimization that only scans the part of the stack that has changed since the last GC (not supported)
     use_short_stack_scans: bool [env_var: true, command_line: true] [always_valid] = false,
     // Enable a return barrier (not supported)
diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs
index c7b7621150..e02e67c4d3 100644
--- a/src/util/sanity/sanity_checker.rs
+++ b/src/util/sanity/sanity_checker.rs
@@ -109,7 +109,7 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<P::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(PrepareCollector);
         }
     }
@@ -133,7 +133,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<P::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
            w.local_work_bucket.add(ReleaseCollector);
         }
     }
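
The heart of this change is in `work_bucket.rs`: a bucket's open condition is now a `Fn(&GCWorkScheduler<VM>) -> bool` that receives the scheduler as an argument, instead of a `Fn() -> bool` that had to capture a `&'static Arc` of a half-built scheduler. That is what allows `GCWorkScheduler::new` to return a fully initialized object, with no `Option` fields and no `Arc::get_mut_unchecked`. Below is a minimal, self-contained sketch of the pattern, using hypothetical `Scheduler`/`Bucket` types rather than the real mmtk-core API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Bucket {
    active: AtomicBool,
    // The predicate receives the scheduler as a parameter, so it needs no
    // self-reference at construction time.
    can_open: Option<Box<dyn Fn(&Scheduler) -> bool + Send>>,
}

struct Scheduler {
    buckets: Vec<Bucket>,
}

impl Bucket {
    fn update(&self, scheduler: &Scheduler) -> bool {
        if let Some(can_open) = self.can_open.as_ref() {
            if !self.active.load(Ordering::SeqCst) && can_open(scheduler) {
                self.active.store(true, Ordering::SeqCst);
                return true;
            }
        }
        false
    }
}

impl Scheduler {
    fn new() -> Scheduler {
        let mut buckets: Vec<Bucket> = (0..3)
            .map(|_| Bucket {
                active: AtomicBool::new(false),
                can_open: None,
            })
            .collect();
        // Bucket i may open once bucket i-1 is active, loosely mirroring how
        // each WorkBucketStage opens after the previous stage is drained.
        for i in 1..buckets.len() {
            buckets[i].can_open = Some(Box::new(move |s: &Scheduler| {
                s.buckets[i - 1].active.load(Ordering::SeqCst)
            }));
        }
        Scheduler { buckets } // fully built; no late initialization step
    }

    fn update_buckets(&self) -> bool {
        let mut updated = false;
        for bucket in &self.buckets {
            updated |= bucket.update(self);
        }
        updated
    }
}

fn main() {
    let s = Scheduler::new();
    s.buckets[0].active.store(true, Ordering::SeqCst);
    assert!(s.update_buckets()); // bucket 1 opens because bucket 0 is active
}
```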
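The other structural change is splitting worker creation in two: `GCWorkScheduler::new` allocates the shared halves (`GCWorkerShared`) eagerly, and `spawn_gc_threads` later builds the thread-private halves and starts the threads. This is what makes `WorkerGroup` and its deferred-spawn closure unnecessary, since a freshly started worker can no longer observe a partially assigned scheduler. A sketch of that two-phase shape, again with made-up types and a plain `std::thread` standing in for `Collection::spawn_gc_thread`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// The shared half of a worker: a communication hub that exists before,
// and independently of, the worker thread itself.
struct WorkerShared {
    parked: AtomicBool,
}

struct Pool {
    workers_shared: Vec<Arc<WorkerShared>>,
}

impl Pool {
    // Phase 1: allocate every shared part up front. Observers such as
    // all_workers_parked() are safe to call from this point on.
    fn new(num_workers: usize) -> Arc<Pool> {
        let workers_shared = (0..num_workers)
            .map(|_| Arc::new(WorkerShared { parked: AtomicBool::new(true) }))
            .collect();
        Arc::new(Pool { workers_shared })
    }

    // Phase 2: spawn the threads, handing each one a clone of its hub.
    fn spawn_threads(&self) {
        for shared in &self.workers_shared {
            let shared = Arc::clone(shared);
            thread::spawn(move || {
                // A real worker would loop polling for work; this stub
                // just flips its parked flag.
                shared.parked.store(false, Ordering::SeqCst);
            });
        }
    }

    fn all_workers_parked(&self) -> bool {
        self.workers_shared
            .iter()
            .all(|w| w.parked.load(Ordering::SeqCst))
    }
}

fn main() {
    let pool = Pool::new(4);
    assert!(pool.all_workers_parked()); // workers start out parked, pre-spawn
    pool.spawn_threads();
}
```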
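Finally, `all_workers_parked()` is only meaningful because every `GCWorkerShared` is constructed from the same `worker_monitor` pair, and the controller takes that lock before checking the flags (see the `controller.rs` hunk above). A standalone sketch of that monitor handshake, using only the standard library and invented names:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Monitor {
    lock: Mutex<()>,
    cond: Condvar,
}

fn main() {
    let monitor = Arc::new(Monitor {
        lock: Mutex::new(()),
        cond: Condvar::new(),
    });
    let parked: Vec<Arc<AtomicBool>> =
        (0..4).map(|_| Arc::new(AtomicBool::new(false))).collect();

    for flag in &parked {
        let flag = Arc::clone(flag);
        let monitor = Arc::clone(&monitor);
        thread::spawn(move || {
            // ... do some work, then park: set the flag while holding the
            // lock so the waiter cannot miss the notification.
            let _guard = monitor.lock.lock().unwrap();
            flag.store(true, Ordering::SeqCst);
            monitor.cond.notify_all();
        });
    }

    // Controller side: wait until every worker has parked.
    let mut guard = monitor.lock.lock().unwrap();
    while !parked.iter().all(|f| f.load(Ordering::SeqCst)) {
        guard = monitor.cond.wait(guard).unwrap();
    }
    println!("all workers parked");
}
```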