From 5ab62f96c006475285b00b6d20a8b1bf83b74a4d Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Tue, 9 Apr 2024 10:45:46 +0800 Subject: [PATCH] Remove coordinator and support forking (#1067) This PR removes the coordinator (a.k.a. controller) thread, and allows temporarily terminating and restarting all GC worker threads in order to support forking. Major changes include: - `GCController` is removed. All synchronization mechanisms involving the controller are removed, too. Important synchronization operations, such as opening buckets and declaring GC finished, are done by the last parked worker. The work packet `EndOfGC` is removed, and its job is now done by the last parked worker. - The `WorkerMonitor`, which previously synchronizes between GC workers, now also synchronizes between mutators and GC workers. This allows mutators to trigger GC by directly communicating with GC workers. - Introduced a new mechanism: "goals". Mutators can now request "goals", and GC workers will work on one goal at a time. Currently, a "goal" can be either GC or StopForFork. Triggering GC is now implemented as requesting the GC goal. - Added a pair of new APIs, namely `MMTK::prepare_to_fork()` and `MMTK::after_fork()`. VM bindings call them before and after forking to let the MMTK instance do proper preparation for forking. Fixes: https://github.com/mmtk/mmtk-core/issues/1053 Fixes: https://github.com/mmtk/mmtk-core/issues/1054 --- docs/header/mmtk.h | 3 - src/global_state.rs | 6 + src/memory_manager.rs | 49 +-- src/mmtk.rs | 91 +++- src/plan/gc_requester.rs | 58 +-- src/scheduler/controller.rs | 148 ------- src/scheduler/gc_work.rs | 51 --- src/scheduler/mod.rs | 5 +- src/scheduler/scheduler.rs | 303 ++++++++++--- src/scheduler/work_bucket.rs | 24 +- src/scheduler/worker.rs | 408 ++++++++---------- src/scheduler/worker_goals.rs | 116 +++++ src/scheduler/worker_monitor.rs | 343 +++++++++++++++ src/util/options.rs | 5 +- src/util/rust_util/mod.rs | 17 + src/util/test_util/mock_vm.rs | 2 +- src/vm/collection.rs | 19 +- src/vm/scanning.rs | 2 +- .../tests/mock_tests/mock_test_init_fork.rs | 197 +++++++++ src/vm/tests/mock_tests/mod.rs | 1 + tools/tracing/README.md | 8 +- 21 files changed, 1277 insertions(+), 579 deletions(-) delete mode 100644 src/scheduler/controller.rs create mode 100644 src/scheduler/worker_goals.rs create mode 100644 src/scheduler/worker_monitor.rs create mode 100644 src/vm/tests/mock_tests/mock_test_init_fork.rs diff --git a/docs/header/mmtk.h b/docs/header/mmtk.h index e6fb6e2546..cf9995cd65 100644 --- a/docs/header/mmtk.h +++ b/docs/header/mmtk.h @@ -78,9 +78,6 @@ extern void mmtk_scan_region(); // Request MMTk to trigger a GC. Note that this may not actually trigger a GC extern void mmtk_handle_user_collection_request(void* tls); -// Run the main loop for the GC controller thread. Does not return -extern void mmtk_start_control_collector(void* tls, void* worker); - // Run the main loop for a GC worker. Does not return extern void mmtk_start_worker(void* tls, void* worker); diff --git a/src/global_state.rs b/src/global_state.rs index 4823119851..f355e697f2 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -1,5 +1,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Mutex; +use std::time::Instant; + +use atomic_refcell::AtomicRefCell; /// This stores some global states for an MMTK instance. /// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it. 
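// Illustration (not part of the patch): the headline addition is the
// `prepare_to_fork()` / `after_fork()` pair described in the commit message
// above. A binding-side sketch of the intended call sequence follows.
// `mmtk()`, `join_all_gc_threads()` and `current_vm_thread()` are
// hypothetical, binding-specific helpers; the real bookkeeping is VM-specific.
fn fork_runtime() -> libc::pid_t {
    // Ask GC workers to save their contexts and return from their
    // entry-point function (`memory_manager::start_worker`).
    mmtk().prepare_to_fork();
    // `prepare_to_fork()` only sends an asynchronous request; the VM must
    // wait until the underlying native threads have actually exited.
    join_all_gc_threads();
    let pid = unsafe { libc::fork() };
    // Both the parent and the child need their GC workers back before any
    // further MMTk heap allocation can happen.
    mmtk().after_fork(current_vm_thread());
    pid
}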
@@ -15,6 +18,8 @@ pub struct GlobalState { pub(crate) initialized: AtomicBool, /// The current GC status. pub(crate) gc_status: Mutex, + /// When did the last GC start? Only accessed by the last parked worker. + pub(crate) gc_start_time: AtomicRefCell>, /// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should /// attempt to collect as much as we can. pub(crate) emergency_collection: AtomicBool, @@ -195,6 +200,7 @@ impl Default for GlobalState { Self { initialized: AtomicBool::new(false), gc_status: Mutex::new(GcStatus::NotInGC), + gc_start_time: AtomicRefCell::new(None), stacks_prepared: AtomicBool::new(false), emergency_collection: AtomicBool::new(false), user_triggered_collection: AtomicBool::new(false), diff --git a/src/memory_manager.rs b/src/memory_manager.rs index 77d1b29d30..cabaa93ead 100644 --- a/src/memory_manager.rs +++ b/src/memory_manager.rs @@ -16,7 +16,7 @@ use crate::mmtk::MMTK; use crate::plan::AllocationSemantics; use crate::plan::{Mutator, MutatorContext}; use crate::scheduler::WorkBucketStage; -use crate::scheduler::{GCController, GCWork, GCWorker}; +use crate::scheduler::{GCWork, GCWorker}; use crate::util::alloc::allocators::AllocatorSelector; use crate::util::constants::{LOG_BYTES_IN_PAGE, MIN_OBJECT_SIZE}; use crate::util::heap::layout::vm_layout::vm_layout; @@ -25,7 +25,7 @@ use crate::util::{Address, ObjectReference}; use crate::vm::edge_shape::MemorySlice; use crate::vm::ReferenceGlue; use crate::vm::VMBinding; -use std::sync::atomic::Ordering; + /// Initialize an MMTk instance. A VM should call this method after creating an [`crate::MMTK`] /// instance but before using any of the methods provided in MMTk (except `process()` and `process_bulk()`). /// @@ -438,6 +438,7 @@ pub fn free_with_size(mmtk: &MMTK, addr: Address, old_size: u /// Get the current active malloc'd bytes. Here MMTk only accounts for bytes that are done through those 'counted malloc' functions. #[cfg(feature = "malloc_counted_size")] pub fn get_malloc_bytes(mmtk: &MMTK) -> usize { + use std::sync::atomic::Ordering; mmtk.state.malloc_bytes.load(Ordering::SeqCst) } @@ -460,53 +461,18 @@ pub fn gc_poll(mmtk: &MMTK, tls: VMMutatorThread) { } } -/// Run the main loop for the GC controller thread. This method does not return. -/// -/// Arguments: -/// * `tls`: The thread that will be used as the GC controller. -/// * `gc_controller`: The execution context of the GC controller threa. -/// It is the `GCController` passed to `Collection::spawn_gc_thread`. -/// * `mmtk`: A reference to an MMTk instance. -pub fn start_control_collector( - _mmtk: &'static MMTK, - tls: VMWorkerThread, - gc_controller: &mut GCController, -) { - gc_controller.run(tls); -} - -/// Run the main loop of a GC worker. This method does not return. -/// -/// Arguments: -/// * `tls`: The thread that will be used as the GC worker. -/// * `worker`: The execution context of the GC worker thread. -/// It is the `GCWorker` passed to `Collection::spawn_gc_thread`. -/// * `mmtk`: A reference to an MMTk instance. +/// Wrapper for [`crate::scheduler::GCWorker::run`]. pub fn start_worker( mmtk: &'static MMTK, tls: VMWorkerThread, - worker: &mut GCWorker, + worker: Box>, ) { worker.run(tls, mmtk); } -/// Initialize the scheduler and GC workers that are required for doing garbage collections. -/// This is a mandatory call for a VM during its boot process once its thread system -/// is ready. This should only be called once. 
This call will invoke Collection::spawn_gc_thread()
-/// to create GC threads.
-///
-/// Arguments:
-/// * `mmtk`: A reference to an MMTk instance.
-/// * `tls`: The thread that wants to enable the collection. This value will be passed back to the VM in
-/// Collection::spawn_gc_thread() so that the VM knows the context.
+/// Wrapper for [`crate::mmtk::MMTK::initialize_collection`].
 pub fn initialize_collection(mmtk: &'static MMTK, tls: VMThread) {
- assert!(
- !mmtk.state.is_initialized(),
- "MMTk collection has been initialized (was initialize_collection() already called before?)"
- );
- mmtk.scheduler.spawn_gc_threads(mmtk, tls);
- mmtk.state.initialized.store(true, Ordering::SeqCst);
- probe!(mmtk, collection_initialized);
+ mmtk.initialize_collection(tls);
 }

 /// Process MMTk run-time options. Returns true if the option is processed successfully.
@@ -554,6 +520,7 @@ pub fn free_bytes(mmtk: &MMTK) -> usize {
 /// to call this method is at the end of a GC (e.g. when the runtime is about to resume threads).
 #[cfg(feature = "count_live_bytes_in_gc")]
 pub fn live_bytes_in_last_gc(mmtk: &MMTK) -> usize {
+ use std::sync::atomic::Ordering;
 mmtk.state.live_bytes_in_last_gc.load(Ordering::SeqCst)
 }

diff --git a/src/mmtk.rs b/src/mmtk.rs
index 3ec1855ec0..86c9973954 100644
--- a/src/mmtk.rs
+++ b/src/mmtk.rs
@@ -148,7 +148,7 @@ impl MMTK {

 let state = Arc::new(GlobalState::default());

- let gc_requester = Arc::new(GCRequester::new());
+ let gc_requester = Arc::new(GCRequester::new(scheduler.clone()));

 let gc_trigger = Arc::new(GCTrigger::new(
 options.clone(),
@@ -220,6 +220,93 @@ impl MMTK {
 }
 }

+ /// Initialize the GC worker threads that are required for doing garbage collections.
+ /// This is a mandatory call for a VM during its boot process once its thread system
+ /// is ready.
+ ///
+ /// Internally, this function will invoke [`Collection::spawn_gc_thread()`] to spawn GC worker
+ /// threads.
+ ///
+ /// # Arguments
+ ///
+ /// * `tls`: The thread that wants to enable the collection. This value will be passed back
+ /// to the VM in [`Collection::spawn_gc_thread()`] so that the VM knows the context.
+ ///
+ /// [`Collection::spawn_gc_thread()`]: crate::vm::Collection::spawn_gc_thread()
+ pub fn initialize_collection(&'static self, tls: VMThread) {
+ assert!(
+ !self.state.is_initialized(),
+ "MMTk collection has been initialized (was initialize_collection() already called before?)"
+ );
+ self.scheduler.spawn_gc_threads(self, tls);
+ self.state.initialized.store(true, Ordering::SeqCst);
+ probe!(mmtk, collection_initialized);
+ }
+
+ /// Prepare an MMTk instance for calling the `fork()` system call.
+ ///
+ /// The `fork()` system call is available on Linux and some UNIX variants, and may be emulated
+ /// on other platforms by libraries such as Cygwin. The properties of the `fork()` system call
+ /// require users to do some preparation before calling it.
+ ///
+ /// - **Multi-threading**: If `fork()` is called when the process has multiple threads, it
+ /// will only duplicate the current thread into the child process, and the child process can
+ /// only call async-signal-safe functions, notably `exec()`. For VMs that use
+ /// multi-process concurrency, it is imperative that when calling `fork()`, only one thread may
+ /// exist in the process.
+ ///
+ /// - **File descriptors**: The child process inherits copies of the parent's set of open
+ /// file descriptors. This may or may not be desired depending on use cases.
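// Illustration (not part of the patch): because `prepare_to_fork()` only
// requests that workers exit, a binding must be able to wait for the native
// threads to die before calling `fork()`. One way is to retain `JoinHandle`s
// when implementing `Collection::spawn_gc_thread`; other trait methods are
// omitted, and `MyVM`, `mmtk()` and `GC_THREAD_HANDLES` are hypothetical.
use std::sync::Mutex;
use std::thread::JoinHandle;

static GC_THREAD_HANDLES: Mutex<Vec<JoinHandle<()>>> = Mutex::new(Vec::new());

// Body of a binding's `Collection::spawn_gc_thread`:
fn spawn_gc_thread(_tls: VMThread, ctx: GCThreadContext<MyVM>) {
    let handle = std::thread::spawn(move || match ctx {
        // After this patch, workers are the only kind of GC thread.
        GCThreadContext::Worker(worker) => {
            let tls = VMWorkerThread(VMThread::UNINITIALIZED);
            memory_manager::start_worker(mmtk(), tls, worker)
        }
    });
    GC_THREAD_HANDLES.lock().unwrap().push(handle);
}

// Called between `prepare_to_fork()` and `fork()`:
fn join_all_gc_threads() {
    for handle in GC_THREAD_HANDLES.lock().unwrap().drain(..) {
        handle.join().unwrap();
    }
}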
+ /// + /// This function helps VMs that use `fork()` for multi-process concurrency. It instructs all + /// GC threads to save their contexts and return from their entry-point functions. Currently, + /// such threads only include GC workers, and the entry point is + /// [`crate::memory_manager::start_worker`]. A subsequent call to `MMTK::after_fork()` will + /// re-spawn the threads using their saved contexts. The VM must not allocate objects in the + /// MMTk heap before calling `MMTK::after_fork()`. + /// + /// TODO: Currently, the MMTk core does not keep any files open for a long time. In the + /// future, this function and the `after_fork` function may be used for handling open file + /// descriptors across invocations of `fork()`. One possible use case is logging GC activities + /// and statistics to files, such as performing heap dumps across multiple GCs. + /// + /// If a VM intends to execute another program by calling `fork()` and immediately calling + /// `exec`, it may skip this function because the state of the MMTk instance will be irrelevant + /// in that case. + /// + /// # Caution! + /// + /// This function sends an asynchronous message to GC threads and returns immediately, but it + /// is only safe for the VM to call `fork()` after the underlying **native threads** of the GC + /// threads have exited. After calling this function, the VM should wait for their underlying + /// native threads to exit in VM-specific manner before calling `fork()`. + pub fn prepare_to_fork(&'static self) { + assert!( + self.state.is_initialized(), + "MMTk collection has not been initialized, yet (was initialize_collection() called before?)" + ); + probe!(mmtk, prepare_to_fork); + self.scheduler.stop_gc_threads_for_forking(); + } + + /// Call this function after the VM called the `fork()` system call. + /// + /// This function will re-spawn MMTk threads from saved contexts. + /// + /// # Arguments + /// + /// * `tls`: The thread that wants to respawn MMTk threads after forking. This value will be + /// passed back to the VM in `Collection::spawn_gc_thread()` so that the VM knows the + /// context. + pub fn after_fork(&'static self, tls: VMThread) { + assert!( + self.state.is_initialized(), + "MMTk collection has not been initialized, yet (was initialize_collection() called before?)" + ); + probe!(mmtk, after_fork); + self.scheduler.respawn_gc_threads_after_forking(tls); + } + /// Generic hook to allow benchmarks to be harnessed. MMTk will trigger a GC /// to clear any residual garbage and start collecting statistics for the benchmark. /// This is usually called by the benchmark harness as its last step before the actual benchmark. @@ -349,6 +436,8 @@ impl MMTK { self.state .internal_triggered_collection .store(true, Ordering::Relaxed); + // TODO: The current `GCRequester::request()` is probably incorrect for internally triggered GC. + // Consider removing functions related to "internal triggered collection". self.gc_requester.request(); } diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 2e5f518cb2..e3a8462f96 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -1,66 +1,42 @@ +use crate::scheduler::GCWorkScheduler; use crate::vm::VMBinding; -use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Condvar, Mutex}; +use std::sync::Arc; -struct RequestSync { - request_count: isize, - last_request_count: isize, -} - -/// GC requester. 
This object allows other threads to request (trigger) GC, -/// and the GC coordinator thread waits for GC requests using this object. +/// This data structure lets mutators trigger GC. pub struct GCRequester { - request_sync: Mutex, - request_condvar: Condvar, + /// Set by mutators to trigger GC. It is atomic so that mutators can check if GC has already + /// been requested efficiently in `poll` without acquiring any mutex. request_flag: AtomicBool, - phantom: PhantomData, -} - -// Clippy says we need this... -impl Default for GCRequester { - fn default() -> Self { - Self::new() - } + scheduler: Arc>, } impl GCRequester { - pub fn new() -> Self { + pub fn new(scheduler: Arc>) -> Self { GCRequester { - request_sync: Mutex::new(RequestSync { - request_count: 0, - last_request_count: -1, - }), - request_condvar: Condvar::new(), request_flag: AtomicBool::new(false), - phantom: PhantomData, + scheduler, } } + /// Request a GC. Called by mutators when polling (during allocation) and when handling user + /// GC requests (e.g. `System.gc();` in Java). pub fn request(&self) { if self.request_flag.load(Ordering::Relaxed) { return; } - let mut guard = self.request_sync.lock().unwrap(); - if !self.request_flag.load(Ordering::Relaxed) { - self.request_flag.store(true, Ordering::Relaxed); - guard.request_count += 1; - self.request_condvar.notify_all(); + if !self.request_flag.swap(true, Ordering::Relaxed) { + // `GCWorkScheduler::request_schedule_collection` needs to hold a mutex to communicate + // with GC workers, which is expensive for functions like `poll`. We use the atomic + // flag `request_flag` to elide the need to acquire the mutex in subsequent calls. + self.scheduler.request_schedule_collection(); } } + /// Clear the "GC requested" flag so that mutators can trigger the next GC. + /// Called by a GC worker when all mutators have come to a stop. pub fn clear_request(&self) { - let guard = self.request_sync.lock().unwrap(); self.request_flag.store(false, Ordering::Relaxed); - drop(guard); - } - - pub fn wait_for_request(&self) { - let mut guard = self.request_sync.lock().unwrap(); - guard.last_request_count += 1; - while guard.last_request_count == guard.request_count { - guard = self.request_condvar.wait(guard).unwrap(); - } } } diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs deleted file mode 100644 index 3608f1ebfb..0000000000 --- a/src/scheduler/controller.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! The GC controller thread. -//! -//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread. -//! The GC controller thread responds to GC requests and coordinates the workers to perform GC. - -use std::sync::Arc; - -use crate::plan::gc_requester::GCRequester; -use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection}; -use crate::scheduler::{GCWork, WorkBucketStage}; -use crate::util::VMWorkerThread; -use crate::vm::VMBinding; -use crate::MMTK; - -use super::{GCWorkScheduler, GCWorker}; - -/// The thread local struct for the GC controller, the counterpart of `GCWorker`. -pub struct GCController { - /// The reference to the MMTk instance. - mmtk: &'static MMTK, - /// The reference to the GC requester. - requester: Arc>, - /// The reference to the scheduler. - scheduler: Arc>, - /// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`. 
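// Illustration (not part of the patch): the new `GCRequester::request` above
// uses a double-checked atomic flag so that, among racing mutators, exactly
// one pays for waking the workers. The same pattern in isolation (names are
// illustrative, not from the patch):
use std::sync::atomic::{AtomicBool, Ordering};

struct OneShotTrigger {
    flag: AtomicBool,
}

impl OneShotTrigger {
    fn request(&self, expensive_notify: impl FnOnce()) {
        // Fast path: a relaxed load suffices; if the flag is already set,
        // someone else has requested (or is requesting) already.
        if self.flag.load(Ordering::Relaxed) {
            return;
        }
        // `swap` returns the previous value, so only the one thread that
        // flips false -> true performs the expensive notification.
        if !self.flag.swap(true, Ordering::Relaxed) {
            expensive_notify();
        }
    }

    fn clear(&self) {
        self.flag.store(false, Ordering::Relaxed);
    }
}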
- coordinator_worker: GCWorker, -} - -impl GCController { - pub(crate) fn new( - mmtk: &'static MMTK, - requester: Arc>, - scheduler: Arc>, - coordinator_worker: GCWorker, - ) -> Box> { - Box::new(Self { - mmtk, - requester, - scheduler, - coordinator_worker, - }) - } - - /// The main loop for the GC controller. - pub fn run(&mut self, tls: VMWorkerThread) -> ! { - probe!(mmtk, gccontroller_run); - // Initialize the GC worker for coordinator. We are not using the run() method from - // GCWorker so we manually initialize the worker here. - self.coordinator_worker.tls = tls; - - loop { - debug!("[STWController: Waiting for request...]"); - self.requester.wait_for_request(); - debug!("[STWController: Request recieved.]"); - - self.do_gc_until_completion_traced(); - debug!("[STWController: Worker threads complete!]"); - } - } - - /// Find more work for workers to do. Return true if more work is available. - fn find_more_work_for_workers(&mut self) -> bool { - if self.scheduler.worker_group.has_designated_work() { - return true; - } - - // See if any bucket has a sentinel. - if self.scheduler.schedule_sentinels() { - return true; - } - - // Try to open new buckets. - if self.scheduler.update_buckets() { - return true; - } - - // If all of the above failed, it means GC has finished. - false - } - - /// A wrapper method for [`do_gc_until_completion`](GCController::do_gc_until_completion) to insert USDT tracepoints. - fn do_gc_until_completion_traced(&mut self) { - probe!(mmtk, gc_start); - self.do_gc_until_completion(); - probe!(mmtk, gc_end); - } - - /// Coordinate workers to perform GC in response to a GC request. - fn do_gc_until_completion(&mut self) { - let gc_start = std::time::Instant::now(); - - debug_assert!( - self.scheduler.worker_monitor.debug_is_sleeping(), - "Workers are still doing work when GC started." - ); - - // Add a ScheduleCollection work packet. It is the seed of other work packets. - self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); - - // Notify only one worker at this time because there is only one work packet, - // namely `ScheduleCollection`. - self.scheduler.worker_monitor.resume_and_wait(false); - - // Gradually open more buckets as workers stop each time they drain all open bucket. - loop { - // Workers should only transition to the `Sleeping` state when all open buckets have - // been drained. - self.scheduler.assert_all_activated_buckets_are_empty(); - - let new_work_available = self.find_more_work_for_workers(); - - // GC finishes if there is no new work to do. - if !new_work_available { - break; - } - - // Notify all workers because there should be many work packets available in the newly - // opened bucket(s). - self.scheduler.worker_monitor.resume_and_wait(true); - } - - // All GC workers must have parked by now. - debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping()); - debug_assert!(!self.scheduler.worker_group.has_designated_work()); - debug_assert!(self.scheduler.all_buckets_empty()); - - // Deactivate all work buckets to prepare for the next GC. - // NOTE: There is no need to hold any lock. - // Workers are in the `Sleeping` state. - // so they will not wake up while we deactivate buckets. - self.scheduler.deactivate_all(); - - // Tell GC trigger that GC ended - this happens before EndOfGC where we resume mutators. - self.mmtk.gc_trigger.policy.on_gc_end(self.mmtk); - - // Finalization: Resume mutators, reset gc states - // Note: Resume-mutators must happen after all work buckets are closed. 
- // Otherwise, for generational GCs, workers will receive and process - // newly generated remembered-sets from those open buckets. - // But these remsets should be preserved until next GC. - let mut end_of_gc = EndOfGC { - elapsed: gc_start.elapsed(), - }; - end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); - - self.scheduler.debug_assert_all_buckets_deactivated(); - } -} diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 8cf4c74bee..e12910a5fc 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -212,57 +212,6 @@ impl GCWork for StopMutators { } } -#[derive(Default)] -pub struct EndOfGC { - pub elapsed: std::time::Duration, -} - -impl GCWork for EndOfGC { - fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { - info!( - "End of GC ({}/{} pages, took {} ms)", - mmtk.get_plan().get_reserved_pages(), - mmtk.get_plan().get_total_pages(), - self.elapsed.as_millis() - ); - - #[cfg(feature = "count_live_bytes_in_gc")] - { - let live_bytes = mmtk.state.get_live_bytes_in_last_gc(); - let used_bytes = - mmtk.get_plan().get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE; - debug_assert!( - live_bytes <= used_bytes, - "Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.", - live_bytes, used_bytes - ); - info!( - "Live objects = {} bytes ({:04.1}% of {} used pages)", - live_bytes, - live_bytes as f64 * 100.0 / used_bytes as f64, - mmtk.get_plan().get_used_pages() - ); - } - - // We assume this is the only running work packet that accesses plan at the point of execution - let plan_mut: &mut dyn Plan = unsafe { mmtk.get_plan_mut() }; - plan_mut.end_of_gc(worker.tls); - - #[cfg(feature = "extreme_assertions")] - if crate::util::edge_logger::should_check_duplicate_edges(mmtk.get_plan()) { - // reset the logging info at the end of each GC - mmtk.edge_logger.reset(); - } - - // Reset the triggering information. - mmtk.state.reset_collection_trigger(); - - // Set to NotInGC after everything, and right before resuming mutators. - mmtk.set_gc_status(GcStatus::NotInGC); - ::VMCollection::resume_mutators(worker.tls); - } -} - /// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped /// `ProcessEdgesWork` instance. 
pub(crate) struct ProcessEdgesWorkTracer { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 03b7d822bc..6cc3b4da94 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -17,11 +17,10 @@ mod work_bucket; pub use work_bucket::WorkBucketStage; mod worker; +mod worker_goals; +mod worker_monitor; pub(crate) use worker::current_worker_ordinal; pub use worker::GCWorker; -mod controller; -pub use controller::GCController; - pub(crate) mod gc_work; pub use gc_work::ProcessEdgesWork; diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 5c71296eca..bc310bf482 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,26 +1,32 @@ +use self::worker::PollResult; + +use super::gc_work::ScheduleCollection; use super::stat::SchedulerStat; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared, ThreadId, WorkerGroup, WorkerMonitor}; +use super::worker::{GCWorker, ThreadId, WorkerGroup}; +use super::worker_goals::{WorkerGoal, WorkerGoals}; +use super::worker_monitor::{LastParkedResult, WorkerMonitor}; use super::*; +use crate::global_state::GcStatus; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::util::options::AffinityKind; use crate::util::rust_util::array_from_fn; use crate::vm::Collection; -use crate::vm::{GCThreadContext, VMBinding}; -use crossbeam::deque::{self, Steal}; +use crate::vm::VMBinding; +use crate::Plan; +use crossbeam::deque::Steal; use enum_map::{Enum, EnumMap}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; pub struct GCWorkScheduler { /// Work buckets pub work_buckets: EnumMap>, /// Workers pub(crate) worker_group: Arc>, - /// The shared part of the GC worker object of the controller thread - coordinator_worker_shared: Arc>, - /// Condition Variable for worker synchronization + /// For synchronized communication between workers and with mutators. pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. affinity: AffinityKind, @@ -46,14 +52,16 @@ impl GCWorkScheduler { // Set the open condition of each bucket. { - // Unconstrained is always open. Prepare will be opened at the beginning of a GC. - // This vec will grow for each stage we call with open_next() let first_stw_stage = WorkBucketStage::first_stw_stage(); let mut open_stages: Vec = vec![first_stw_stage]; - // The rest will open after the previous stage is done. let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); for stage in stages { + // Unconstrained is always open. + // The first STW stage (Prepare) will be opened when the world stopped + // (i.e. when all mutators are suspended). if stage != WorkBucketStage::Unconstrained && stage != first_stw_stage { + // Other work packets will be opened after previous stages are done + // (i.e their buckets are drained and all workers parked). let cur_stages = open_stages.clone(); work_buckets[stage].set_open_condition( move |scheduler: &GCWorkScheduler| { @@ -65,12 +73,9 @@ impl GCWorkScheduler { } } - let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(None)); - Arc::new(Self { work_buckets, worker_group, - coordinator_worker_shared, worker_monitor, affinity, }) @@ -80,26 +85,40 @@ impl GCWorkScheduler { self.worker_group.as_ref().worker_count() } - /// Create GC threads, including the controller thread and all workers. + /// Create GC threads for the first time. It will also create the `GCWorker` instances. 
+ ///
+ /// Currently GC threads only include worker threads, and we currently have only one worker
+ /// group. We may add more worker groups in the future.
 pub fn spawn_gc_threads(self: &Arc, mmtk: &'static MMTK, tls: VMThread) {
- // Spawn the controller thread.
- let coordinator_worker = GCWorker::new(
- mmtk,
- usize::MAX,
- self.clone(),
- true,
- self.coordinator_worker_shared.clone(),
- deque::Worker::new_fifo(),
- );
- let gc_controller = GCController::new(
- mmtk,
- mmtk.gc_requester.clone(),
- self.clone(),
- coordinator_worker,
- );
- VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Controller(gc_controller));
+ self.worker_group.initial_spawn(tls, mmtk);
+ }
+
+ /// Ask all GC workers to exit for forking.
+ pub fn stop_gc_threads_for_forking(self: &Arc) {
+ self.worker_group.prepare_surrender_buffer();
+
+ debug!("A mutator is requesting GC threads to stop for forking...");
+ self.worker_monitor.make_request(WorkerGoal::StopForFork);
+ }
+
+ /// Surrender the `GCWorker` struct of a GC worker when it exits.
+ pub fn surrender_gc_worker(&self, worker: Box>) {
+ let all_surrendered = self.worker_group.surrender_gc_worker(worker);
+
+ if all_surrendered {
+ debug!(
+ "All {} workers surrendered.",
+ self.worker_group.worker_count()
+ );
+ self.worker_monitor.on_all_workers_exited();
+ }
+ }

- self.worker_group.spawn(mmtk, tls)
+ /// Respawn GC threads after forking. This will reuse the `GCWorker` instances of stopped
+ /// workers. `tls` is the VM thread that requests GC threads to be re-spawned, and will be
+ /// passed down to [`crate::vm::Collection::spawn_gc_thread`].
+ pub fn respawn_gc_threads_after_forking(self: &Arc, tls: VMThread) {
+ self.worker_group.respawn(tls)
 }

 /// Resolve the affinity of a thread.
@@ -107,9 +126,20 @@ impl GCWorkScheduler {
 self.affinity.resolve_affinity(thread);
 }

+ /// Request a GC to be scheduled. Called by mutators via `GCRequester`.
+ pub(crate) fn request_schedule_collection(&self) {
+ debug!("A mutator is sending a GC-scheduling request to workers...");
+ self.worker_monitor.make_request(WorkerGoal::Gc);
+ }
+
+ /// Add the `ScheduleCollection` packet. Called by the last parked worker.
+ fn add_schedule_collection_packet(&self) {
+ // We are still holding the mutex `WorkerMonitor::sync`. Do not notify now.
+ self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection);
+ }
+
 /// Schedule all the common work packets
 pub fn schedule_common_work>(&self, plan: &'static C::PlanType) {
- use crate::plan::Plan;
 use crate::scheduler::gc_work::*;
 // Stop & scan mutators (mutator scanning can happen before STW)
 self.work_buckets[WorkBucketStage::Unconstrained].add(StopMutators::::new());
@@ -358,29 +388,207 @@ impl GCWorkScheduler {
 /// Called by workers to get a schedulable work packet.
 /// Park the worker if there're no available packets.
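// Illustration (not part of the patch): the rewritten `poll` path below
// hinges on a "last parked worker" protocol. Stripped of goals, buckets and
// exit handling, the core mechanism is roughly the following standalone
// sketch; the real `WorkerMonitor` appears later in this patch.
use std::sync::{Condvar, Mutex};

struct MiniMonitor {
    sync: Mutex<MiniSync>,
    have_work: Condvar,
}

struct MiniSync {
    parked: usize,
    worker_count: usize,
    generation: u64, // bumped whenever new work becomes available
}

impl MiniMonitor {
    /// Park. If this is the last worker to park, run `on_last_parked`, which
    /// returns true if it produced work for everyone.
    fn park_and_wait(&self, on_last_parked: impl FnOnce() -> bool) {
        let mut s = self.sync.lock().unwrap();
        s.parked += 1;
        if s.parked == s.worker_count {
            // Decide what happens next while still holding the lock, so no
            // other worker can observe a half-transitioned state.
            if on_last_parked() {
                s.generation += 1;
                self.have_work.notify_all();
            } else {
                // Nothing to do: the last parked worker waits as well, until
                // a mutator calls `notify_new_work`.
                let gen = s.generation;
                while s.generation == gen {
                    s = self.have_work.wait(s).unwrap();
                }
            }
        } else {
            let gen = s.generation;
            // The predicate guards against spurious wakeups.
            while s.generation == gen {
                s = self.have_work.wait(s).unwrap();
            }
        }
        s.parked -= 1;
    }

    /// Called by mutators to hand the workers something to do.
    fn notify_new_work(&self) {
        let mut s = self.sync.lock().unwrap();
        s.generation += 1;
        self.have_work.notify_all();
    }
}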
- pub fn poll(&self, worker: &GCWorker) -> Box> {
- self.poll_schedulable_work(worker)
- .unwrap_or_else(|| self.poll_slow(worker))
+ pub(crate) fn poll(&self, worker: &GCWorker) -> PollResult {
+ if let Some(work) = self.poll_schedulable_work(worker) {
+ return Ok(work);
+ }
+ self.poll_slow(worker)
 }

- fn poll_slow(&self, worker: &GCWorker) -> Box> {
+ fn poll_slow(&self, worker: &GCWorker) -> PollResult {
 loop {
 // Retry polling
 if let Some(work) = self.poll_schedulable_work(worker) {
- return work;
+ return Ok(work);
 }
- self.worker_monitor.park_and_wait(worker);
+ let ordinal = worker.ordinal;
+ self.worker_monitor
+ .park_and_wait(ordinal, |goals| self.on_last_parked(worker, goals))?;
 }
 }

+ /// Called when the last worker parked. `goals` allows this function to inspect and change the
+ /// current goal.
+ fn on_last_parked(&self, worker: &GCWorker, goals: &mut WorkerGoals) -> LastParkedResult {
+ let Some(ref current_goal) = goals.current() else {
+ // There is no goal. Find a request to respond to.
+ return self.respond_to_requests(worker, goals);
+ };
+
+ match current_goal {
+ WorkerGoal::Gc => {
+ // A GC is in progress.
+
+ // In stop-the-world GC, mutators cannot request GC while GC is in progress.
+ // When we support concurrent GC, we should remove this assertion.
+ assert!(
+ !goals.debug_is_requested(WorkerGoal::Gc),
+ "GC request sent to WorkerMonitor while GC is still in progress."
+ );
+
+ // We are in the middle of GC, and the last GC worker parked.
+ trace!("The last worker parked during GC. Try to find more work to do...");
+
+ // During GC, if all workers parked, all open buckets must have been drained.
+ self.assert_all_activated_buckets_are_empty();
+
+ // Find more work for workers to do.
+ let found_more_work = self.find_more_work_for_workers();
+
+ if found_more_work {
+ LastParkedResult::WakeAll
+ } else {
+ // GC finished.
+ self.on_gc_finished(worker);
+
+ // Clear the current goal.
+ goals.on_current_goal_completed();
+ self.respond_to_requests(worker, goals)
+ }
+ }
+ WorkerGoal::StopForFork => {
+ panic!(
+ "Worker {} parked again when it is asked to exit.",
+ worker.ordinal
+ )
+ }
+ }
+ }
+
+ /// Respond to a pending request.
+ fn respond_to_requests(
+ &self,
+ worker: &GCWorker,
+ goals: &mut WorkerGoals,
+ ) -> LastParkedResult {
+ assert!(goals.current().is_none());
+
+ let Some(goal) = goals.poll_next_goal() else {
+ // No requests. Park this worker, too.
+ return LastParkedResult::ParkSelf;
+ };
+
+ match goal {
+ WorkerGoal::Gc => {
+ trace!("A mutator requested a GC to be scheduled.");
+
+ // We set the eBPF trace point here so that bpftrace scripts can start recording
+ // work packet events before the `ScheduleCollection` work packet starts.
+ probe!(mmtk, gc_start);
+
+ {
+ let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
+ assert!(gc_start_time.is_none(), "GC already started?");
+ *gc_start_time = Some(Instant::now());
+ }
+
+ self.add_schedule_collection_packet();
+ LastParkedResult::WakeSelf
+ }
+ WorkerGoal::StopForFork => {
+ trace!("A mutator wanted to fork.");
+ LastParkedResult::WakeAll
+ }
+ }
+ }
+
+ /// Find more work for workers to do. Return true if more work is available.
+ fn find_more_work_for_workers(&self) -> bool {
+ if self.worker_group.has_designated_work() {
+ trace!("Some workers have designated work.");
+ return true;
+ }
+
+ // See if any bucket has a sentinel.
+ if self.schedule_sentinels() {
+ trace!("Some sentinels are scheduled.");
+ return true;
+ }
+
+ // Try to open new buckets.
+ if self.update_buckets() { + trace!("Some buckets are opened."); + return true; + } + + // If all of the above failed, it means GC has finished. + false + } + + /// Called when GC has finished, i.e. when all work packets have been executed. + fn on_gc_finished(&self, worker: &GCWorker) { + // All GC workers must have parked by now. + debug_assert!(!self.worker_group.has_designated_work()); + debug_assert!(self.all_buckets_empty()); + + // Deactivate all work buckets to prepare for the next GC. + self.deactivate_all(); + self.debug_assert_all_buckets_deactivated(); + + let mmtk = worker.mmtk; + + // Tell GC trigger that GC ended - this happens before we resume mutators. + mmtk.gc_trigger.policy.on_gc_end(mmtk); + + // Compute the elapsed time of the GC. + let start_time = { + let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut(); + gc_start_time.take().expect("GC not started yet?") + }; + let elapsed = start_time.elapsed(); + + info!( + "End of GC ({}/{} pages, took {} ms)", + mmtk.get_plan().get_reserved_pages(), + mmtk.get_plan().get_total_pages(), + elapsed.as_millis() + ); + + // USDT tracepoint for the end of GC. + probe!(mmtk, gc_end); + + #[cfg(feature = "count_live_bytes_in_gc")] + { + let live_bytes = mmtk.state.get_live_bytes_in_last_gc(); + let used_bytes = + mmtk.get_plan().get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE; + debug_assert!( + live_bytes <= used_bytes, + "Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.", + live_bytes, used_bytes + ); + info!( + "Live objects = {} bytes ({:04.1}% of {} used pages)", + live_bytes, + live_bytes as f64 * 100.0 / used_bytes as f64, + mmtk.get_plan().get_used_pages() + ); + } + + // All other workers are parked, so it is safe to access the Plan instance mutably. + let plan_mut: &mut dyn Plan = unsafe { mmtk.get_plan_mut() }; + plan_mut.end_of_gc(worker.tls); + + #[cfg(feature = "extreme_assertions")] + if crate::util::edge_logger::should_check_duplicate_edges(mmtk.get_plan()) { + // reset the logging info at the end of each GC + mmtk.edge_logger.reset(); + } + + // Reset the triggering information. + mmtk.state.reset_collection_trigger(); + + // Set to NotInGC after everything, and right before resuming mutators. + mmtk.set_gc_status(GcStatus::NotInGC); + ::VMCollection::resume_mutators(worker.tls); + } + pub fn enable_stat(&self) { for worker in &self.worker_group.workers_shared { let worker_stat = worker.borrow_stat(); worker_stat.enable(); } - let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat(); - coordinator_worker_stat.enable(); } pub fn statistics(&self) -> HashMap { @@ -389,8 +597,6 @@ impl GCWorkScheduler { let worker_stat = worker.borrow_stat(); summary.merge(&worker_stat); } - let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat(); - summary.merge(&coordinator_worker_stat); summary.harness_stat() } @@ -398,14 +604,13 @@ impl GCWorkScheduler { mmtk.gc_requester.clear_request(); let first_stw_bucket = &self.work_buckets[WorkBucketStage::first_stw_stage()]; debug_assert!(!first_stw_bucket.is_activated()); - // Note: This is the only place where a non-coordinator thread opens a bucket. - // If the `StopMutators` is executed by the coordinator thread, it will open - // the `Prepare` bucket and let workers start executing packets while the coordinator - // can still add more work packets to `Prepare`. 
However, since `Prepare` is the first STW - // bucket and only the coordinator can open any subsequent buckets, workers cannot execute - // work packets out of order. This is not generally true if we are not opening the first - // STW bucket. In the future, we should redesign the opening condition of work buckets to - // make the synchronization more robust, + // Note: This is the only place where a bucket is opened without having all workers parked. + // We usually require all workers to park before opening new buckets because otherwise + // packets will be executed out of order. However, since `Prepare` is the first STW + // bucket, and all subsequent buckets require all workers to park before opening, workers + // cannot execute work packets out of order. This is not generally true if we are not + // opening the first STW bucket. In the future, we should redesign the opening condition + // of work buckets to make the synchronization more robust, first_stw_bucket.activate(); self.worker_monitor.notify_work_available(true); } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index d6c6cb6545..ab55093bad 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -1,4 +1,4 @@ -use super::worker::WorkerMonitor; +use super::worker_monitor::WorkerMonitor; use super::*; use crate::vm::VMBinding; use crossbeam::deque::{Injector, Steal, Worker}; @@ -139,6 +139,19 @@ impl WorkBucket { self.notify_one_worker(); } + /// Add a work packet to this bucket, but do not notify any workers. + /// This is useful when the current thread is holding the mutex of `WorkerMonitor` which is + /// used for notifying workers. This usually happens if the current thread is the last worker + /// parked. + pub(crate) fn add_no_notify>(&self, work: W) { + self.queue.push(Box::new(work)); + } + + /// Like [`WorkBucket::add_no_notify`], but the work is boxed. + pub(crate) fn add_boxed_no_notify(&self, work: Box>) { + self.queue.push(work); + } + /// Add multiple packets with a higher priority. /// Panic if this bucket cannot receive prioritized packets. pub fn bulk_add_prioritized(&self, work_vec: Vec>>) { @@ -210,11 +223,10 @@ impl WorkBucket { sentinel.take() }; if let Some(work) = maybe_sentinel { - // We don't need to call `self.add` because this function is called by the coordinator - // when workers are stopped. We don't need to notify the workers because the - // coordinator will do that later. - // We can just "sneak" the sentinel work packet into the current bucket. - self.queue.push(work); + // We don't need to notify other workers because this function is called by the last + // parked worker. After this function returns, the caller will notify workers because + // more work packets become available. + self.add_boxed_no_notify(work); true } else { false diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 504b2f6b48..babfd32f29 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -12,7 +12,7 @@ use crossbeam::queue::ArrayQueue; #[cfg(feature = "count_live_bytes_in_gc")] use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; /// Represents the ID of a GC worker thread. pub type ThreadId = usize; @@ -33,8 +33,9 @@ pub fn current_worker_ordinal() -> ThreadId { ordinal } -/// The part shared between a GCWorker and the scheduler. -/// This structure is used for communication, e.g. adding new work packets. 
+/// The struct has one instance per worker, but is shared between workers via the scheduler +/// instance. This structure is used for communication between workers, e.g. adding designated +/// work packets, stealing work packets from other workers, and collecting per-worker statistics. pub struct GCWorkerShared { /// Worker-local statistics data. stat: AtomicRefCell>, @@ -44,10 +45,6 @@ pub struct GCWorkerShared { #[cfg(feature = "count_live_bytes_in_gc")] live_bytes: AtomicUsize, /// A queue of GCWork that can only be processed by the owned thread. - /// - /// Note: Currently, designated work cannot be added from the GC controller thread, or - /// there will be synchronization problems. If it is necessary to do so, we need to - /// update the code in `GCWorkScheduler::poll_slow` for proper synchornization. pub designated_work: ArrayQueue>>, /// Handle for stealing packets from the current worker pub stealer: Option>>>, @@ -75,181 +72,11 @@ impl GCWorkerShared { } } -/// Used to synchronize mutually exclusive operations between workers and controller, -/// and also waking up workers when more work packets are available. -pub(crate) struct WorkerMonitor { - /// The synchronized part. - sync: Mutex, - /// This is notified when new work is made available for the workers. - /// Particularly, it is notified when - /// - `sync.worker_group_state` is transitioned to `Working` because - /// - some workers still have designated work, or - /// - some sentinel work packets are added to their drained buckets, or - /// - some work buckets are opened, or - /// - any work packet is added to any open bucket. - /// Workers wait on this condvar. - work_available: Condvar, - /// This is notified when all workers parked. - /// The coordinator waits on this condvar. - all_workers_parked: Condvar, -} - -/// The state of the worker group. -/// -/// The worker group alternates between the `Sleeping` and the `Working` state. Workers are -/// allowed to execute work packets in the `Working` state. However, once workers entered the -/// `Sleeping` state, they will not be allowed to packets from buckets until the coordinator -/// explicitly transitions the state back to `Working` after it found more work for workers to do. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum WorkerGroupState { - /// In this state, the coordinator can open new buckets and close buckets, - /// but workers cannot execute any packets or get any work packets from any buckets. - /// Workers cannot unpark in this state. - Sleeping, - /// In this state, workers can get work packets from open buckets, - /// but no buckets can be opened or closed. - Working, -} - -/// The synchronized part of `WorkerMonitor`. -pub(crate) struct WorkerMonitorSync { - /// The total number of workers. - worker_count: usize, - /// Number of parked workers. - parked_workers: usize, - /// The worker group state. - worker_group_state: WorkerGroupState, -} - -impl WorkerMonitor { - pub fn new(worker_count: usize) -> Self { - Self { - sync: Mutex::new(WorkerMonitorSync { - worker_count, - parked_workers: 0, - worker_group_state: WorkerGroupState::Sleeping, - }), - work_available: Default::default(), - all_workers_parked: Default::default(), - } - } - - /// Wake up workers when more work packets are made available for workers. - /// This function is called when adding work packets to buckets. - /// This function doesn't change the `work_group_state` variable. - /// If workers are in the `Sleeping` state, use `resume_and_wait` to resume workers. 
- pub fn notify_work_available(&self, all: bool) { - let sync = self.sync.lock().unwrap(); - - // Don't notify workers if we are adding packets when workers are sleeping. - // This could happen when we add `ScheduleCollection` or schedule sentinels. - if sync.worker_group_state == WorkerGroupState::Sleeping { - return; - } - - if all { - self.work_available.notify_all(); - } else { - self.work_available.notify_one(); - } - } - - /// Wake up workers and wait until they transition to `Sleeping` state again. - /// This is called by the coordinator. - /// If `all` is true, notify all workers; otherwise only notify one worker. - pub fn resume_and_wait(&self, all: bool) { - let mut sync = self.sync.lock().unwrap(); - sync.worker_group_state = WorkerGroupState::Working; - if all { - self.work_available.notify_all(); - } else { - self.work_available.notify_one(); - } - let _sync = self - .all_workers_parked - .wait_while(sync, |sync| { - sync.worker_group_state == WorkerGroupState::Working - }) - .unwrap(); - } - - /// Test if the worker group is in the `Sleeping` state. - pub fn debug_is_sleeping(&self) -> bool { - let sync = self.sync.lock().unwrap(); - sync.worker_group_state == WorkerGroupState::Sleeping - } - - /// Park until more work is available. - /// The argument `worker` indicates this function can only be called by workers. - pub fn park_and_wait(&self, worker: &GCWorker) { - let mut sync = self.sync.lock().unwrap(); - - // Park this worker - let all_parked = sync.inc_parked_workers(); - trace!("Worker {} parked.", worker.ordinal); - - if all_parked { - // If all workers are parked, enter "Sleeping" state and notify controller. - sync.worker_group_state = WorkerGroupState::Sleeping; - debug!( - "Worker {} notifies the coordinator that all workerer parked.", - worker.ordinal - ); - self.all_workers_parked.notify_one(); - } else { - // Otherwise wait until notified. - // Note: The condition for this `cond.wait` is "more work is available". - // If this worker spuriously wakes up, then in the next loop iteration, the - // `poll_schedulable_work` invocation above will fail, and the worker will reach - // here and wait again. - sync = self.work_available.wait(sync).unwrap(); - } - - // If we are in the `Sleeping` state, wait until leaving that state. - sync = self - .work_available - .wait_while(sync, |sync| { - sync.worker_group_state == WorkerGroupState::Sleeping - }) - .unwrap(); - - // Unpark this worker. - sync.dec_parked_workers(); - trace!("Worker {} unparked.", worker.ordinal); - } -} - -impl WorkerMonitorSync { - /// Increase the packed-workers counter. - /// Called before a worker is parked. - /// - /// Return true if all the workers are parked. - fn inc_parked_workers(&mut self) -> bool { - let old = self.parked_workers; - debug_assert!(old < self.worker_count); - let new = old + 1; - self.parked_workers = new; - new == self.worker_count - } - - /// Decrease the packed-workers counter. - /// Called after a worker is resumed from the parked state. - fn dec_parked_workers(&mut self) { - let old = self.parked_workers; - debug_assert!(old <= self.worker_count); - debug_assert!(old > 0); - let new = old - 1; - self.parked_workers = new; - } -} - /// A GC worker. This part is privately owned by a worker thread. -/// The GC controller also has an embedded `GCWorker` because it may also execute work packets. pub struct GCWorker { /// The VM-specific thread-local state of the GC thread. 
pub tls: VMWorkerThread,
- /// The ordinal of the worker, numbered from 0 to the number of workers minus one. The ordinal
- /// is usize::MAX if it is the embedded worker of the GC controller thread.
+ /// The ordinal of the worker, numbered from 0 to the number of workers minus one.
 pub ordinal: ThreadId,
 /// The reference to the scheduler.
 scheduler: Arc>,
@@ -257,9 +84,6 @@ pub struct GCWorker {
 copy: GCWorkerCopyContext,
 /// The reference to the MMTk instance.
 pub mmtk: &'static MMTK,
- /// True if this struct is the embedded GCWorker of the controller thread.
- /// False if this struct belongs to a standalone GCWorker thread.
- is_coordinator: bool,
 /// Reference to the shared part of the GC worker. It is used for synchronization.
 pub shared: Arc>,
 /// Local work packet queue.
@@ -283,12 +107,22 @@ impl GCWorkerShared {
 }
 }

+/// A special error type that indicates a worker should exit.
+/// This may happen if the VM needs to fork and asks workers to exit.
+#[derive(Debug)]
+pub(crate) struct WorkerShouldExit;
+
+/// The result type of `GCWorker::poll`.
+/// Too many functions return `Option>>`. In most cases, when `None` is
+/// returned, the caller should try getting work packets from another place. To avoid confusion,
+/// we use `Err(WorkerShouldExit)` to clearly indicate that the worker should exit immediately.
+pub(crate) type PollResult = Result>, WorkerShouldExit>;
+
 impl GCWorker {
 pub(crate) fn new(
 mmtk: &'static MMTK,
 ordinal: ThreadId,
 scheduler: Arc>,
- is_coordinator: bool,
 shared: Arc>,
 local_work_buffer: deque::Worker>>,
 ) -> Self {
@@ -299,7 +133,6 @@ impl GCWorker {
 copy: GCWorkerCopyContext::new_non_copy(),
 scheduler,
 mmtk,
- is_coordinator,
 shared,
 local_work_buffer,
 }
@@ -333,11 +166,6 @@ impl GCWorker {
 self.local_work_buffer.push(Box::new(work));
 }

- /// Is this worker a coordinator or a normal GC worker?
- pub fn is_coordinator(&self) -> bool {
- self.is_coordinator
- }
-
 /// Get the scheduler. There is only one scheduler per MMTk instance.
 pub fn scheduler(&self) -> &GCWorkScheduler {
 &self.scheduler
@@ -354,18 +182,36 @@ impl GCWorker {
 /// 2. Poll from the local work queue.
 /// 3. Poll from activated global work-buckets
 /// 4. Steal from other workers
- fn poll(&self) -> Box> {
- self.shared
- .designated_work
- .pop()
- .or_else(|| self.local_work_buffer.pop())
- .unwrap_or_else(|| self.scheduler().poll(self))
+ fn poll(&mut self) -> PollResult {
+ if let Some(work) = self.shared.designated_work.pop() {
+ return Ok(work);
+ }
+
+ if let Some(work) = self.local_work_buffer.pop() {
+ return Ok(work);
+ }
+
+ self.scheduler().poll(self)
 }

- /// Entry of the worker thread. Resolve thread affinity, if it has been specified by the user.
- /// Each worker will keep polling and executing work packets in a loop.
- pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) {
+ /// Entry point of the worker thread.
+ ///
+ /// This function will resolve thread affinity, if it has been specified by the user.
+ ///
+ /// Each worker will keep polling and executing work packets in a loop. It runs until the
+ /// worker is requested to exit. Currently a worker may exit after
+ /// [`crate::mmtk::MMTK::prepare_to_fork`] is called.
+ ///
+ /// Arguments:
+ /// * `tls`: The VM-specific thread-local storage for this GC worker thread.
+ /// * `mmtk`: A reference to an MMTk instance.
+ pub fn run(mut self: Box, tls: VMWorkerThread, mmtk: &'static MMTK) {
 probe!(mmtk, gcworker_run);
+ debug!(
+ "Worker started.
ordinal: {}, {}", + self.ordinal, + crate::util::rust_util::debug_process_thread_id(), + ); WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst)); self.scheduler.resolve_affinity(self.ordinal); self.tls = tls; @@ -380,7 +226,10 @@ impl GCWorker { // If we have work_start and work_end, we cannot measure the first // poll. probe!(mmtk, work_poll); - let mut work = self.poll(); + let Ok(mut work) = self.poll() else { + // The worker is asked to exit. Break from the loop. + break; + }; // probe! expands to an empty block on unsupported platforms #[allow(unused_variables)] let typename = work.get_type_name(); @@ -392,55 +241,180 @@ impl GCWorker { std::hint::black_box(unsafe { *(typename.as_ptr()) }); probe!(mmtk, work, typename.as_ptr(), typename.len()); - work.do_work_with_stat(self, mmtk); + work.do_work_with_stat(&mut self, mmtk); } + debug!( + "Worker exiting. ordinal: {}, {}", + self.ordinal, + crate::util::rust_util::debug_process_thread_id(), + ); + probe!(mmtk, gcworker_exit); + + mmtk.scheduler.surrender_gc_worker(self); } } -/// A worker group to manage all the GC workers (except the coordinator worker). +/// Stateful part of [`WorkerGroup`]. +enum WorkerCreationState { + /// The initial state. `GCWorker` structs have not been created and GC worker threads have not + /// been spawn. + Initial { + /// The local work queues for to-be-created workers. + local_work_queues: Vec>>>, + }, + /// All worker threads are spawn and running. `GCWorker` structs have been transferred to + /// worker threads. + Spawned, + /// Worker threads are stopping, or have already stopped, for forking. Instances of `GCWorker` + /// structs are collected here to be reused when GC workers are respawn. + Surrendered { + /// `GCWorker` instances not currently owned by active GC worker threads. Once GC workers + /// are respawn, they will take ownership of these `GCWorker` instances. + // Note: Clippy warns about `Vec>` because `Vec` is already in the heap. + // However, the purpose of this `Vec` is allowing GC worker threads to give their + // `Box>` instances back to this pool. Therefore, the `Box` is necessary. + #[allow(clippy::vec_box)] + workers: Vec>>, + }, +} + +/// A worker group to manage all the GC workers. pub(crate) struct WorkerGroup { /// Shared worker data pub workers_shared: Vec>>, - unspawned_local_work_queues: Mutex>>>>, + /// The stateful part. `None` means state transition is underway. + state: Mutex>>, } +/// We have to persuade Rust that `WorkerGroup` is safe to share because the compiler thinks one +/// worker can refer to another worker via the path "worker -> scheduler -> worker_group -> +/// `Surrendered::workers` -> worker" which is cyclic reference and unsafe. +unsafe impl Sync for WorkerGroup {} + impl WorkerGroup { /// Create a WorkerGroup pub fn new(num_workers: usize) -> Arc { - let unspawned_local_work_queues = (0..num_workers) + let local_work_queues = (0..num_workers) .map(|_| deque::Worker::new_fifo()) .collect::>(); let workers_shared = (0..num_workers) .map(|i| { Arc::new(GCWorkerShared::::new(Some( - unspawned_local_work_queues[i].stealer(), + local_work_queues[i].stealer(), ))) }) .collect::>(); Arc::new(Self { workers_shared, - unspawned_local_work_queues: Mutex::new(unspawned_local_work_queues), + state: Mutex::new(Some(WorkerCreationState::Initial { local_work_queues })), }) } + /// Spawn GC worker threads for the first time. 
+ pub fn initial_spawn(&self, tls: VMThread, mmtk: &'static MMTK) { + let mut state = self.state.lock().unwrap(); + + let WorkerCreationState::Initial { local_work_queues } = state.take().unwrap() else { + panic!("GCWorker structs have already been created"); + }; + + let workers = self.create_workers(local_work_queues, mmtk); + self.spawn(workers, tls); + + *state = Some(WorkerCreationState::Spawned); + } + + /// Respawn GC threads after stopping for forking. + pub fn respawn(&self, tls: VMThread) { + let mut state = self.state.lock().unwrap(); + + let WorkerCreationState::Surrendered { workers } = state.take().unwrap() else { + panic!("GCWorker structs have not been created, yet."); + }; + + self.spawn(workers, tls); + + *state = Some(WorkerCreationState::Spawned) + } + + /// Create `GCWorker` instances. + #[allow(clippy::vec_box)] // See `WorkerCreationState::Surrendered`. + fn create_workers( + &self, + local_work_queues: Vec>>>, + mmtk: &'static MMTK, + ) -> Vec>> { + debug!("Creating GCWorker instances..."); + + assert_eq!(self.workers_shared.len(), local_work_queues.len()); + + // Each `GCWorker` instance corresponds to a `GCWorkerShared` at the same index. + let workers = (local_work_queues.into_iter()) + .zip(self.workers_shared.iter()) + .enumerate() + .map(|(ordinal, (queue, shared))| { + Box::new(GCWorker::new( + mmtk, + ordinal, + mmtk.scheduler.clone(), + shared.clone(), + queue, + )) + }) + .collect::>(); + + debug!("Created {} GCWorker instances.", workers.len()); + workers + } + /// Spawn all the worker threads - pub fn spawn(&self, mmtk: &'static MMTK, tls: VMThread) { - let mut unspawned_local_work_queues = self.unspawned_local_work_queues.lock().unwrap(); - // Spawn each worker thread. - for (ordinal, shared) in self.workers_shared.iter().enumerate() { - let worker = Box::new(GCWorker::new( - mmtk, - ordinal, - mmtk.scheduler.clone(), - false, - shared.clone(), - unspawned_local_work_queues.pop().unwrap(), - )); + #[allow(clippy::vec_box)] // See `WorkerCreationState::Surrendered`. + fn spawn(&self, workers: Vec>>, tls: VMThread) { + debug!( + "Spawning GC workers. {}", + crate::util::rust_util::debug_process_thread_id(), + ); + + // We transfer the ownership of each `GCWorker` instance to a GC thread. + for worker in workers { VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); } - debug_assert!(unspawned_local_work_queues.is_empty()); + + debug!( + "Spawned {} worker threads. {}", + self.worker_count(), + crate::util::rust_util::debug_process_thread_id(), + ); + } + + /// Prepare the buffer for workers to surrender their `GCWorker` structs. + pub fn prepare_surrender_buffer(&self) { + let mut state = self.state.lock().unwrap(); + assert!(matches!(*state, Some(WorkerCreationState::Spawned))); + + *state = Some(WorkerCreationState::Surrendered { + workers: Vec::with_capacity(self.worker_count()), + }) + } + + /// Return the `GCWorker` struct to the worker group. + /// This function returns `true` if all workers returned their `GCWorker` structs. + pub fn surrender_gc_worker(&self, worker: Box>) -> bool { + let mut state = self.state.lock().unwrap(); + let WorkerCreationState::Surrendered { ref mut workers } = state.as_mut().unwrap() else { + panic!("GCWorker structs have not been created, yet."); + }; + let ordinal = worker.ordinal; + workers.push(worker); + trace!( + "Worker {} surrendered. 
({}/{})", + ordinal, + workers.len(), + self.worker_count() + ); + workers.len() == self.worker_count() } /// Get the number of workers in the group diff --git a/src/scheduler/worker_goals.rs b/src/scheduler/worker_goals.rs new file mode 100644 index 0000000000..695471f7b2 --- /dev/null +++ b/src/scheduler/worker_goals.rs @@ -0,0 +1,116 @@ +//! This module contain "goals" which are larger than work packets, and describes what workers are +//! working towards on a high level. +//! +//! A "goal" is represented by a `WorkerGoal`. All workers work towards a single goal at a time. +//! The current goal influences the behavior of GC workers, especially the last parked worker. +//! For example, +//! +//! - When in the progress of GC, the last parker will try to open buckets or announce the GC +//! has finished. +//! - When stopping for fork, every waken worker should save its thread state (giving in the +//! `GCWorker` struct) and exit. +//! +//! The struct `WorkerGoals` keeps the set of goals requested by mutators, but GC workers will only +//! respond to one request at a time, and will favor higher-priority goals. + +use enum_map::{Enum, EnumMap}; + +/// This current and reqeusted goals. +#[derive(Default, Debug)] +pub(crate) struct WorkerGoals { + /// The current goal. + current: Option, + /// Requests received from mutators. `requests[goal]` is true if the `goal` is requested. + requests: EnumMap, +} + +/// A goal, i.e. something that workers should work together to achieve. +/// +/// Members of this `enum` should be listed from the highest priority to the lowest priority. +#[derive(Debug, Enum, Clone, Copy)] +pub(crate) enum WorkerGoal { + /// Do a garbage collection. + Gc, + /// Stop all GC threads so that the VM can call `fork()`. + StopForFork, +} + +impl WorkerGoals { + /// Set the `goal` as requested. Return `true` if the requested state of the `goal` changed + /// from `false` to `true`. + pub fn set_request(&mut self, goal: WorkerGoal) -> bool { + if !self.requests[goal] { + self.requests[goal] = true; + true + } else { + false + } + } + + /// Move the highest priority goal from the pending requests to the current request. Return + /// that goal, or `None` if no goal has been requested. + pub fn poll_next_goal(&mut self) -> Option { + for (goal, requested) in self.requests.iter_mut() { + if *requested { + *requested = false; + self.current = Some(goal); + probe!(mmtk, goal_set, goal); + return Some(goal); + } + } + None + } + + /// Get the current goal if exists. + pub fn current(&self) -> Option { + self.current + } + + /// Called when the current goal is completed. This will clear the current goal. + pub fn on_current_goal_completed(&mut self) { + probe!(mmtk, goal_complete); + self.current = None + } + + /// Test if the given `goal` is requested. Used for debug purpose, only. The workers always + /// respond to the request of the highest priority first. 
+    pub fn debug_is_requested(&self, goal: WorkerGoal) -> bool {
+        self.requests[goal]
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{WorkerGoal, WorkerGoals};
+
+    #[test]
+    fn test_poll_none() {
+        let mut goals = WorkerGoals::default();
+        let next_goal = goals.poll_next_goal();
+
+        assert!(next_goal.is_none());
+        assert!(goals.current().is_none());
+    }
+
+    #[test]
+    fn test_poll_one() {
+        let mut goals = WorkerGoals::default();
+        goals.set_request(WorkerGoal::StopForFork);
+        let next_goal = goals.poll_next_goal();
+
+        assert!(matches!(next_goal, Some(WorkerGoal::StopForFork)));
+        assert!(matches!(goals.current(), Some(WorkerGoal::StopForFork)));
+    }
+
+    #[test]
+    fn test_goals_priority() {
+        let mut goals = WorkerGoals::default();
+        goals.set_request(WorkerGoal::StopForFork);
+        goals.set_request(WorkerGoal::Gc);
+
+        let next_goal = goals.poll_next_goal();
+
+        assert!(matches!(next_goal, Some(WorkerGoal::Gc)));
+        assert!(matches!(goals.current(), Some(WorkerGoal::Gc)));
+    }
+}
diff --git a/src/scheduler/worker_monitor.rs b/src/scheduler/worker_monitor.rs
new file mode 100644
index 0000000000..c84dedfcd3
--- /dev/null
+++ b/src/scheduler/worker_monitor.rs
@@ -0,0 +1,343 @@
+//! This module contains `WorkerMonitor` and related types. Its purposes include:
+//!
+//! -   allowing workers to park,
+//! -   letting the last parked worker take action, and
+//! -   letting workers and mutators notify workers when there are things for workers to do.
+
+use std::sync::{Condvar, Mutex};
+
+use super::{
+    worker::WorkerShouldExit,
+    worker_goals::{WorkerGoal, WorkerGoals},
+};
+
+/// The result type of the `on_last_parked` callback in `WorkerMonitor::park_and_wait`.
+/// It decides how many workers should wake up after `on_last_parked` returns.
+pub(crate) enum LastParkedResult {
+    /// The last parked worker should wait, too, until more work packets are added.
+    ParkSelf,
+    /// The last parked worker should unpark and find a work packet to do.
+    WakeSelf,
+    /// Wake up all parked GC workers.
+    WakeAll,
+}
+
+/// A data structure for synchronizing workers with each other and with mutators.
+///
+/// Unlike `GCWorkerShared`, there is only one instance of `WorkerMonitor`.
+///
+/// -   It allows workers to park and unpark.
+/// -   It allows mutators to notify workers to schedule a GC.
+pub(crate) struct WorkerMonitor {
+    /// The synchronized part.
+    sync: Mutex<WorkerMonitorSync>,
+    /// Workers wait on this when idle. Notified if workers have things to do. That includes:
+    /// -   any work packets available, and
+    /// -   any field in `sync.goals.requests` set to true.
+    workers_have_anything_to_do: Condvar,
+}
+
+/// The synchronized part of `WorkerMonitor`.
+struct WorkerMonitorSync {
+    /// Count parked workers.
+    parker: WorkerParker,
+    /// Current and requested goals.
+    goals: WorkerGoals,
+}
+
+/// This struct counts the number of workers parked and identifies the last parked worker.
+struct WorkerParker {
+    /// The total number of workers.
+    worker_count: usize,
+    /// Number of parked workers.
+    parked_workers: usize,
+}
+
+impl WorkerParker {
+    fn new(worker_count: usize) -> Self {
+        Self {
+            worker_count,
+            parked_workers: 0,
+        }
+    }
+
+    /// Increase the parked-workers counter.
+    /// Called before a worker is parked.
+    ///
+    /// Return true if all the workers are parked.
+    fn inc_parked_workers(&mut self) -> bool {
+        let old = self.parked_workers;
+        debug_assert!(old < self.worker_count);
+        let new = old + 1;
+        self.parked_workers = new;
+        new == self.worker_count
+    }
+
+    /// Decrease the parked-workers counter.
+    /// Called after a worker is resumed from the parked state.
+    fn dec_parked_workers(&mut self) {
+        let old = self.parked_workers;
+        debug_assert!(old <= self.worker_count);
+        debug_assert!(old > 0);
+        let new = old - 1;
+        self.parked_workers = new;
+    }
+}
+
+impl WorkerMonitor {
+    pub fn new(worker_count: usize) -> Self {
+        Self {
+            sync: Mutex::new(WorkerMonitorSync {
+                parker: WorkerParker::new(worker_count),
+                goals: Default::default(),
+            }),
+            workers_have_anything_to_do: Default::default(),
+        }
+    }
+
+    /// Make a request. Can be called by a mutator to request the workers to work towards the
+    /// given `goal`.
+    pub fn make_request(&self, goal: WorkerGoal) {
+        let mut guard = self.sync.lock().unwrap();
+        let newly_requested = guard.goals.set_request(goal);
+        if newly_requested {
+            self.notify_work_available(false);
+        }
+    }
+
+    /// Wake up workers when more work packets are made available for workers,
+    /// or a mutator has requested the GC workers to schedule a GC.
+    pub fn notify_work_available(&self, all: bool) {
+        if all {
+            self.workers_have_anything_to_do.notify_all();
+        } else {
+            self.workers_have_anything_to_do.notify_one();
+        }
+    }
+
+    /// Park a worker and wait on the CondVar `workers_have_anything_to_do`.
+    ///
+    /// If it is the last worker parked, `on_last_parked` will be called.
+    /// The argument of `on_last_parked` is a mutable reference to `sync.goals`.
+    /// The return value of `on_last_parked` determines whether this worker and other workers
+    /// will wake up or block waiting.
+    ///
+    /// This function returns `Ok(())` if the current worker should continue working,
+    /// or `Err(WorkerShouldExit)` if the current worker should exit now.
+    pub fn park_and_wait<F>(
+        &self,
+        ordinal: usize,
+        on_last_parked: F,
+    ) -> Result<(), WorkerShouldExit>
+    where
+        F: FnOnce(&mut WorkerGoals) -> LastParkedResult,
+    {
+        let mut sync = self.sync.lock().unwrap();
+
+        // Park this worker
+        let all_parked = sync.parker.inc_parked_workers();
+        trace!(
+            "Worker {} parked. parked/total: {}/{}. All parked: {}",
+            ordinal,
+            sync.parker.parked_workers,
+            sync.parker.worker_count,
+            all_parked
+        );
+
+        let mut should_wait = false;
+
+        if all_parked {
+            trace!("Worker {} is the last worker parked.", ordinal);
+            let result = on_last_parked(&mut sync.goals);
+            match result {
+                LastParkedResult::ParkSelf => {
+                    should_wait = true;
+                }
+                LastParkedResult::WakeSelf => {
+                    // Continue without waiting.
+                }
+                LastParkedResult::WakeAll => {
+                    self.notify_work_available(true);
+                }
+            }
+        } else {
+            should_wait = true;
+        }
+
+        if should_wait {
+            // Notes on CondVar usage:
+            //
+            // Condition variables are usually tested in a loop while holding a mutex:
+            //
+            //      lock();
+            //      while condition() {
+            //          condvar.wait();
+            //      }
+            //      unlock();
+            //
+            // The actual condition for this `self.workers_have_anything_to_do.wait(sync)` is:
+            //
+            // 1. any work packet is available, or
+            // 2. a goal (such as doing GC) is requested
+            //
+            // But it is not used like the typical pattern shown above, mainly because work
+            // packets can be added without holding the mutex `self.sync`. This means one worker
+            // can add a new work packet (no mutex needed) right after another worker finds no
+            // work packets are available and then parks. In other words, condition (1) can
+            // suddenly become true after a worker sees it is false but before the worker blocks
+            // waiting on the CondVar. If this happens, the last parked worker will block
+            // forever and never get notified.
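+            // This is the classic "lost wake-up" problem.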
+            // This could happen when mutators or the previously existing "coordinator thread"
+            // were able to add work packets.
+            //
+            // However, after the "coordinator thread" was removed, only GC worker threads can
+            // add work packets during GC. Parked workers (except the last parked worker) cannot
+            // make more work packets available (by adding new packets or opening buckets). For
+            // this reason, the **last** parked worker can be sure that after it finds no
+            // packets available, no other workers can add another work packet (because they are
+            // all parked). So the **last** parked worker can open more buckets or declare GC
+            // finished.
+            //
+            // Condition (2), i.e. goals added to `sync.goals`, is guarded by the monitor
+            // `sync`. When a mutator adds a goal via `WorkerMonitor::make_request`, it will
+            // notify a worker; and the last parked worker always checks it before waiting. So
+            // this condition will not be set without any worker noticing.
+            //
+            // Note that generational barriers may add `ProcessModBuf` work packets when not in
+            // GC. This is benign because those work packets are not executed immediately, and
+            // are guaranteed to be executed in the next GC.
+
+            // Notes on spurious wake-up:
+            //
+            // 1. The condition variable `workers_have_anything_to_do` is guarded by
+            //    `self.sync`. Because the last parked worker is holding the mutex `self.sync`
+            //    when executing `on_last_parked`, no workers can unpark (even if they
+            //    spuriously wake up) during `on_last_parked` because they cannot re-acquire the
+            //    mutex `self.sync`.
+            //
+            // 2. Workers may spuriously wake up and unpark when `on_last_parked` is not being
+            //    executed (including the case when the last parked worker is waiting here,
+            //    too). If one or more GC workers spuriously wake up, they will check for work
+            //    packets, and park again if none are available. The last parked worker will
+            //    ensure the two conditions listed above are both false before blocking. If
+            //    either condition is true, the last parked worker will take action.
+            sync = self.workers_have_anything_to_do.wait(sync).unwrap();
+        }
+
+        // Unpark this worker.
+        sync.parker.dec_parked_workers();
+        trace!(
+            "Worker {} unparked. parked/total: {}/{}.",
+            ordinal,
+            sync.parker.parked_workers,
+            sync.parker.worker_count,
+        );
+
+        // If the current goal is `StopForFork`, the worker thread should exit.
+        if matches!(sync.goals.current(), Some(WorkerGoal::StopForFork)) {
+            return Err(WorkerShouldExit);
+        }
+
+        Ok(())
+    }
+
+    /// Called when all workers have exited.
+    pub fn on_all_workers_exited(&self) {
+        let mut sync = self.sync.try_lock().unwrap();
+        sync.goals.on_current_goal_completed();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc,
+    };
+
+    use super::WorkerMonitor;
+
+    /// Test if the `WorkerMonitor::park_and_wait` method calls the `on_last_parked` callback
+    /// properly.
+    #[test]
+    fn test_last_worker_park_wake_all() {
+        let number_threads = 4;
+        let worker_monitor = Arc::new(WorkerMonitor::new(number_threads));
+        let on_last_parked_called = AtomicUsize::new(0);
+        let should_unpark = AtomicBool::new(false);
+
+        std::thread::scope(|scope| {
+            for ordinal in 0..number_threads {
+                let worker_monitor = worker_monitor.clone();
+                let on_last_parked_called = &on_last_parked_called;
+                let should_unpark = &should_unpark;
+                scope.spawn(move || {
+                    // This emulates the use pattern in the scheduler, i.e. checking the
+                    // condition ("Are there any work packets available?") without holding a
+                    // mutex.
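+                    // Here `should_unpark` plays the role of "work is available": it is only
+                    // set inside `on_last_parked`, while the monitor's mutex is held.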
+                    while !should_unpark.load(Ordering::SeqCst) {
+                        println!("Thread {} parking...", ordinal);
+                        worker_monitor
+                            .park_and_wait(ordinal, |_goals| {
+                                println!("Thread {} is the last thread parked.", ordinal);
+                                on_last_parked_called.fetch_add(1, Ordering::SeqCst);
+                                should_unpark.store(true, Ordering::SeqCst);
+                                super::LastParkedResult::WakeAll
+                            })
+                            .unwrap();
+                        println!("Thread {} unparked.", ordinal);
+                    }
+                });
+            }
+        });
+
+        // `on_last_parked` should only be called once.
+        assert_eq!(on_last_parked_called.load(Ordering::SeqCst), 1);
+    }
+
+    /// Like `test_last_worker_park_wake_all`, but only the last parked worker wakes up after
+    /// it parks.
+    #[test]
+    fn test_last_worker_park_wake_self() {
+        let number_threads = 4;
+        let worker_monitor = Arc::new(WorkerMonitor::new(number_threads));
+        let on_last_parked_called = AtomicUsize::new(0);
+        let threads_running = AtomicUsize::new(0);
+        let should_unpark = AtomicBool::new(false);
+
+        std::thread::scope(|scope| {
+            for ordinal in 0..number_threads {
+                let worker_monitor = worker_monitor.clone();
+                let on_last_parked_called = &on_last_parked_called;
+                let threads_running = &threads_running;
+                let should_unpark = &should_unpark;
+                scope.spawn(move || {
+                    let mut i_am_the_last_parked_worker = false;
+
+                    // Record the number of threads entering the following `while` loop.
+                    threads_running.fetch_add(1, Ordering::SeqCst);
+
+                    while !should_unpark.load(Ordering::SeqCst) {
+                        println!("Thread {} parking...", ordinal);
+                        worker_monitor
+                            .park_and_wait(ordinal, |_goals| {
+                                println!("Thread {} is the last thread parked.", ordinal);
+                                on_last_parked_called.fetch_add(1, Ordering::SeqCst);
+                                should_unpark.store(true, Ordering::SeqCst);
+                                i_am_the_last_parked_worker = true;
+                                super::LastParkedResult::WakeSelf
+                            })
+                            .unwrap();
+                        println!("Thread {} unparked.", ordinal);
+                    }
+
+                    threads_running.fetch_sub(1, Ordering::SeqCst);
+
+                    if i_am_the_last_parked_worker {
+                        println!("The last parked worker woke up");
+                        // Only the current worker should wake up and leave the `while` loop
+                        // above.
+                        assert_eq!(threads_running.load(Ordering::SeqCst), number_threads - 1);
+                        should_unpark.store(true, Ordering::SeqCst);
+                        worker_monitor.notify_work_available(true);
+                    }
+                });
+            }
+        });
+
+        // `on_last_parked` should only be called once.
+        assert_eq!(on_last_parked_called.load(Ordering::SeqCst), 1);
+    }
+}
diff --git a/src/util/options.rs b/src/util/options.rs
index 11a1b329ae..fb51d0fc65 100644
--- a/src/util/options.rs
+++ b/src/util/options.rs
@@ -705,10 +705,7 @@ mod gc_trigger_tests {
 options! {
     /// The GC plan to use.
     plan: PlanSelector [env_var: true, command_line: true] [always_valid] = PlanSelector::GenImmix,
-    /// Number of GC worker threads. (There is always one GC controller thread besides the GC workers)
-    // FIXME: Currently we create GCWorkScheduler when MMTK is created, which is usually static.
-    // To allow this as a command-line option, we need to refactor the creation fo the `MMTK` instance.
-    // See: https://github.com/mmtk/mmtk-core/issues/532
+    /// Number of GC worker threads.
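+    /// Defaults to the number of logical CPUs. The value must be at least 1.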
     threads: usize [env_var: true, command_line: true] [|v: &usize| *v > 0] = num_cpus::get(),
     /// Enable an optimization that only scans the part of the stack that has changed since the last GC (not supported)
     use_short_stack_scans: bool [env_var: true, command_line: true] [always_valid] = false,
diff --git a/src/util/rust_util/mod.rs b/src/util/rust_util/mod.rs
index efd7224881..f8a4b3c7c2 100644
--- a/src/util/rust_util/mod.rs
+++ b/src/util/rust_util/mod.rs
@@ -125,6 +125,23 @@ where
     unsafe { result_array.assume_init() }
 }
 
+/// Create a formatted string that makes a best effort at identifying the current process and
+/// thread.
+pub fn debug_process_thread_id() -> String {
+    let pid = unsafe { libc::getpid() };
+    #[cfg(target_os = "linux")]
+    {
+        // `gettid()` is Linux-specific.
+        let tid = unsafe { libc::gettid() };
+        format!("PID: {}, TID: {}", pid, tid)
+    }
+    #[cfg(not(target_os = "linux"))]
+    {
+        // TODO: When we support other platforms, use platform-specific methods to get thread
+        // identifiers.
+        format!("PID: {}", pid)
+    }
+}
+
 #[cfg(test)]
 mod initialize_once_tests {
     use super::*;
diff --git a/src/util/test_util/mock_vm.rs b/src/util/test_util/mock_vm.rs
index 4ac620ac80..9f5d0c7122 100644
--- a/src/util/test_util/mock_vm.rs
+++ b/src/util/test_util/mock_vm.rs
@@ -82,7 +82,7 @@ where
     func(&mut lock)
 }
 
-/// A test that uses `MockVM`` should use this method to wrap the entire test
+/// A test that uses `MockVM` should use this method to wrap the entire test
 /// that may use `MockVM`.
 ///
 /// # Arguents
diff --git a/src/vm/collection.rs b/src/vm/collection.rs
index a5e0a63acd..16e87eebe0 100644
--- a/src/vm/collection.rs
+++ b/src/vm/collection.rs
@@ -4,10 +4,9 @@ use crate::util::opaque_pointer::*;
 use crate::vm::VMBinding;
 use crate::{scheduler::*, Mutator};
 
-/// Thread context for the spawned GC thread. It is used by spawn_gc_thread.
+/// Thread context for the spawned GC thread. It is used by `spawn_gc_thread`.
+/// Currently, `GCWorker` is the only kind of thread that mmtk-core will create.
 pub enum GCThreadContext<VM: VMBinding> {
-    /// The GC thread to spawn is a controller thread. There is only one controller thread.
-    Controller(Box<GCController<VM>>),
     /// The GC thread to spawn is a worker thread. There can be multiple worker threads.
     Worker(Box<GCWorker<VM>>),
 }
@@ -32,8 +31,7 @@ pub trait Collection<VM: VMBinding> {
     /// This method may not be called by the same GC thread that called `stop_all_mutators`.
     ///
     /// Arguments:
-    /// * `tls`: The thread pointer for the GC worker. Currently it is the tls of the embedded `GCWorker` instance
-    ///   of the coordinator thread, but it is subject to change, and should not be depended on.
+    /// * `tls`: The thread pointer for the GC worker.
     fn resume_mutators(tls: VMWorkerThread);
 
     /// Block the current thread for GC. This is called when an allocation request cannot be fulfilled and a GC
@@ -48,17 +46,16 @@ pub trait Collection<VM: VMBinding> {
     /// Ask the VM to spawn a GC thread for MMTk. A GC thread may later call into the VM through these VM traits. Some VMs
     /// have assumptions that those calls needs to be within VM internal threads.
     /// As a result, MMTk does not spawn GC threads itself to avoid breaking this kind of assumptions.
-    /// MMTk calls this method to spawn GC threads during [`initialize_collection()`](../memory_manager/fn.initialize_collection.html).
+    /// MMTk calls this method to spawn GC threads during [`crate::mmtk::MMTK::initialize_collection`]
+    /// and [`crate::mmtk::MMTK::after_fork`].
     ///
     /// Arguments:
     /// * `tls`: The thread pointer for the parent thread that we spawn new threads from. This is the same `tls` when the VM
     ///   calls `initialize_collection()` and passes as an argument.
     /// * `ctx`: The context for the GC thread.
-    ///   * If `Controller` is passed, it means spawning a thread to run as the GC controller.
-    ///     The spawned thread shall call `memory_manager::start_control_collector`.
-    ///   * If `Worker` is passed, it means spawning a thread to run as a GC worker.
-    ///     The spawned thread shall call `memory_manager::start_worker`.
-    ///   In either case, the `Box` inside should be passed back to the called function.
+    ///   * If [`GCThreadContext::Worker`] is passed, it means spawning a thread to run as a GC worker.
+    ///     The spawned thread shall call the entry point function `GCWorker::run`.
+    ///     Currently `Worker` is the only kind of thread that mmtk-core will create.
     fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext<VM>);
 
     /// Inform the VM of an out-of-memory error. The binding should hook into the VM's error
diff --git a/src/vm/scanning.rs b/src/vm/scanning.rs
index 21768f4337..df6a0c6607 100644
--- a/src/vm/scanning.rs
+++ b/src/vm/scanning.rs
@@ -124,7 +124,7 @@ pub trait RootsWorkFactory<ES: Edge>: Clone + Send + 'static {
     /// Create work packets to handle transitively pinning (TP) roots.
     ///
     /// Similar to `create_process_pinning_roots_work`, this work packet will not move objects in `nodes`.
-    /// Unlike ``create_process_pinning_roots_work`, no objects in the transitive closure of `nodes` will be moved, either.
+    /// Unlike `create_process_pinning_roots_work`, no objects in the transitive closure of `nodes` will be moved, either.
     ///
     /// Arguments:
     /// * `nodes`: A vector of references to objects pointed by root edges.
diff --git a/src/vm/tests/mock_tests/mock_test_init_fork.rs b/src/vm/tests/mock_tests/mock_test_init_fork.rs
new file mode 100644
index 0000000000..596ce6fe77
--- /dev/null
+++ b/src/vm/tests/mock_tests/mock_test_init_fork.rs
@@ -0,0 +1,197 @@
+use std::{
+    sync::{Condvar, Mutex, MutexGuard},
+    thread::JoinHandle,
+    time::Duration,
+};
+
+use super::mock_test_prelude::*;
+use crate::{
+    util::{options::GCTriggerSelector, Address, OpaquePointer, VMThread, VMWorkerThread},
+    MMTKBuilder, MMTK,
+};
+
+#[derive(Default)]
+struct ForkTestShared {
+    sync: Mutex<ForkTestSync>,
+    all_threads_spawn: Condvar,
+    all_threads_exited: Condvar,
+    all_threads_running: Condvar,
+}
+
+#[derive(Default)]
+struct ForkTestSync {
+    join_handles: Vec<JoinHandle<()>>,
+    /// Number of threads spawned.
+    spawned_threads: usize,
+    /// Number of threads that have actually entered our entry-point function.
+    running_threads: usize,
+    /// Number of threads that have returned from `memory_manager::start_worker`.
+    exited_threads: usize,
+}
+
+lazy_static! {
+    static ref SHARED: ForkTestShared = ForkTestShared::default();
+}
+
+// We fix the number of threads so that we can assert the number of GC threads spawned.
+const NUM_WORKER_THREADS: usize = 4;
+
+// Don't block the CI.
+const TIMEOUT: Duration = Duration::from_secs(5);
+
+/// A convenient wrapper that panics on timeout.
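+/// It wraps `Condvar::wait_timeout_while` with the global `TIMEOUT` and asserts that the wait
+/// did not time out.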
+fn wait_timeout_while<'a, T, F>(
+    guard: MutexGuard<'a, T>,
+    condvar: &Condvar,
+    condition: F,
+) -> MutexGuard<'a, T>
+where
+    F: FnMut(&mut T) -> bool,
+{
+    let (guard, timeout_result) = condvar
+        .wait_timeout_while(guard, TIMEOUT, condition)
+        .unwrap();
+    assert!(!timeout_result.timed_out());
+    guard
+}
+
+fn simple_spawn_gc_thread(
+    _vm_thread: VMThread,
+    context: GCThreadContext<MockVM>,
+    mmtk: &'static MMTK<MockVM>,
+) {
+    let GCThreadContext::Worker(worker) = context;
+    let join_handle = std::thread::spawn(move || {
+        let ordinal = worker.ordinal;
+        println!("GC thread starting. Ordinal: {ordinal}");
+
+        {
+            let mut sync = SHARED.sync.lock().unwrap();
+            sync.running_threads += 1;
+            if sync.running_threads == NUM_WORKER_THREADS {
+                SHARED.all_threads_running.notify_all();
+            }
+        }
+
+        let gc_thread_tls = VMWorkerThread(VMThread(OpaquePointer::from_address(Address::ZERO)));
+        memory_manager::start_worker(mmtk, gc_thread_tls, worker);
+
+        {
+            let mut sync = SHARED.sync.lock().unwrap();
+            sync.running_threads -= 1;
+            sync.exited_threads += 1;
+            if sync.exited_threads == NUM_WORKER_THREADS {
+                SHARED.all_threads_exited.notify_all();
+            }
+        }
+
+        println!("GC thread stopped. Ordinal: {ordinal}");
+    });
+
+    {
+        let mut sync = SHARED.sync.lock().unwrap();
+        sync.join_handles.push(join_handle);
+        sync.spawned_threads += 1;
+        if sync.spawned_threads == NUM_WORKER_THREADS {
+            SHARED.all_threads_spawn.notify_all();
+        }
+    }
+}
+
+/// Test the `initialize_collection` function with actual running GC threads, and the functions
+/// for supporting forking.
+#[test]
+pub fn test_initialize_collection_and_fork() {
+    // We don't use fixtures or `with_mockvm` because we want to precisely control the
+    // initialization process.
+    let mut builder = MMTKBuilder::new();
+
+    // The exact heap size doesn't matter because we don't even allocate.
+    let trigger = GCTriggerSelector::FixedHeapSize(1024 * 1024);
+    builder.options.gc_trigger.set(trigger);
+    builder.options.threads.set(NUM_WORKER_THREADS);
+
+    let mmtk: &'static mut MMTK<MockVM> = Box::leak(Box::new(builder.build::<MockVM>()));
+
+    let mock_vm = MockVM {
+        spawn_gc_thread: MockMethod::new_fixed(Box::new(|(vm_thread, context)| {
+            simple_spawn_gc_thread(vm_thread, context, mmtk)
+        })),
+        ..Default::default()
+    };
+    write_mockvm(move |mock_vm_ref| *mock_vm_ref = mock_vm);
+
+    let test_thread_tls = VMThread(OpaquePointer::from_address(Address::ZERO));
+
+    // Initialize collection. This will spawn GC worker threads.
+    mmtk.initialize_collection(test_thread_tls);
+
+    // Wait for GC workers to be spawned, and get their join handles.
+    let join_handles = {
+        println!("Waiting for GC worker threads to be spawned");
+        let sync = SHARED.sync.lock().unwrap();
+
+        // We wait for `all_threads_spawn` instead of `all_threads_running`. It is not necessary
+        // to wait for all GC worker threads to enter `memory_manager::start_worker` before
+        // calling `prepare_to_fork`. The implementation of `initialize_collection` and
+        // `prepare_to_fork` should be robust against GC worker threads that start slower than
+        // usual.
+        let mut sync = wait_timeout_while(sync, &SHARED.all_threads_spawn, |sync| {
+            sync.spawned_threads < NUM_WORKER_THREADS
+        });
+
+        // Take join handles out of `SHARED.sync` so that the main thread can join them without
+        // holding the Mutex, and GC workers can acquire the mutex and mutate `SHARED.sync`.
+        std::mem::take(&mut sync.join_handles)
+    };
+
+    assert_eq!(join_handles.len(), NUM_WORKER_THREADS);
+
+    // Now we prepare to fork. GC worker threads should go down.
+    mmtk.prepare_to_fork();
+
+    println!("Waiting for GC worker threads to stop");
+
+    {
+        // In theory, we can just join the join handles, and it is unnecessary to wait for
+        // `SHARED.all_threads_exited`. This is a workaround for the fact that
+        // `JoinHandle::join()` does not have a variant that supports timeout. We use
+        // `wait_timeout_while` so that it won't hang the CI. When the condvar
+        // `all_threads_exited` is notified, all GC workers will have returned from
+        // `memory_manager::start_worker`, and it is unlikely that the `join_handle.join()`
+        // below will block for too long.
+        let sync = SHARED.sync.lock().unwrap();
+        let _sync = wait_timeout_while(sync, &SHARED.all_threads_exited, |sync| {
+            sync.exited_threads < NUM_WORKER_THREADS
+        });
+    }
+
+    for join_handle in join_handles {
+        // TODO: PThread has `pthread_timedjoin_np`, but the `JoinHandle` in the Rust standard
+        // library doesn't have a variant that supports timeout. Let's wait for the Rust
+        // library to update.
+        join_handle.join().unwrap();
+    }
+
+    println!("All GC worker threads stopped");
+
+    {
+        let mut sync = SHARED.sync.lock().unwrap();
+        assert_eq!(sync.running_threads, 0);
+
+        // Reset counters.
+        sync.spawned_threads = 0;
+        sync.exited_threads = 0;
+    }
+
+    // We don't actually call `fork()` in this test, but we pretend we have called `fork()`.
+    // We now try to resume GC workers.
+    mmtk.after_fork(test_thread_tls);
+
+    {
+        println!("Waiting for GC worker threads to be running after calling `after_fork`");
+        let sync = SHARED.sync.lock().unwrap();
+        let _sync = wait_timeout_while(sync, &SHARED.all_threads_running, |sync| {
+            sync.running_threads < NUM_WORKER_THREADS
+        });
+    }
+
+    println!("All GC worker threads are up and running.");
+}
diff --git a/src/vm/tests/mock_tests/mod.rs b/src/vm/tests/mock_tests/mod.rs
index 51a085cec8..5bf880fbb6 100644
--- a/src/vm/tests/mock_tests/mod.rs
+++ b/src/vm/tests/mock_tests/mod.rs
@@ -33,6 +33,7 @@ mod mock_test_edges;
 #[cfg(target_os = "linux")]
 mod mock_test_handle_mmap_conflict;
 mod mock_test_handle_mmap_oom;
+mod mock_test_init_fork;
 mod mock_test_is_in_mmtk_spaces;
 mod mock_test_issue139_allocate_non_multiple_of_min_alignment;
 mod mock_test_issue867_allocate_unrealistically_large_object;
diff --git a/tools/tracing/README.md b/tools/tracing/README.md
index 7c5584d082..0dc05b3210 100644
--- a/tools/tracing/README.md
+++ b/tools/tracing/README.md
@@ -16,11 +16,15 @@ shipped with the MMTk release you use.
 
 Currently, the core provides the following tracepoints.
 
-- `mmtk:collection_initialized()`: GC is enabled
+- `mmtk:collection_initialized()`: all GC worker threads are spawned
+- `mmtk:prepare_fork()`: the VM requests MMTk core to prepare for calling `fork()`
+- `mmtk:after_fork()`: the VM notifies MMTk core that it has finished calling `fork()`
+- `mmtk:goal_set(goal: int)`: GC workers have started working on a goal
+- `mmtk:goal_complete(goal: int)`: GC workers have finished working on a goal
 - `mmtk:harness_begin()`: the timing iteration of a benchmark begins
 - `mmtk:harness_end()`: the timing iteration of a benchmark ends
-- `mmtk:gccontroller_run()`: the GC controller thread enters its work loop
 - `mmtk:gcworker_run()`: a GC worker thread enters its work loop
+- `mmtk:gcworker_exit()`: a GC worker thread exits its work loop
 - `mmtk:gc_start()`: a collection epoch starts
 - `mmtk:gc_end()`: a collection epoch ends
 - `mmtk:process_edges(num_edges: int, is_roots: bool)`: a invocation of the `process_edges`
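
For VM bindings, the intended calling sequence of the new fork-support APIs is: stop the GC workers, call `fork()`, then respawn the workers in each process that needs them. The following is a minimal sketch, not part of this patch; `MyVM` stands for the binding's `VMBinding` implementation, and `fork_with_mmtk` is a hypothetical helper:

```rust
use mmtk::util::VMThread;
use mmtk::MMTK;

fn fork_with_mmtk(mmtk: &'static MMTK<MyVM>, tls: VMThread) -> libc::pid_t {
    // Stop all GC worker threads. They surrender their `GCWorker` structs to
    // the worker group so the structs can be reused after respawning.
    mmtk.prepare_to_fork();

    let pid = unsafe { libc::fork() };

    // Respawn GC worker threads. Both the parent and the child call this,
    // because `prepare_to_fork()` stopped the workers before the fork.
    mmtk.after_fork(tls);

    pid
}
```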