Skip to content

Commit

Permalink
No coordinator work (#794)
Browse files Browse the repository at this point in the history
Remove the concept of "coordinator work packets". Now ScheduleCollection
and StopMutators are both executed on ordinary workers. The only work
packet executed by the coordinator is EndOfGC.

Simplified the interaction between the coordinator and the workers. The
coordinator only responds to the event that "all workers have parked".
Removed the workers-to-coordinator channel. WorkerMonitor now has two
Condvars, one (the existing one) for notifying workers about more work
available, and another for notifying the coordinator that all workers
have parked.

Fixes: #792
  • Loading branch information
wks authored Apr 27, 2023
1 parent df146b7 commit 43e8a92
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 312 deletions.
108 changes: 32 additions & 76 deletions src/scheduler/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.

use std::sync::{Arc, Condvar, Mutex};
use std::sync::Arc;

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::{GCWork, WorkBucketStage};
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;

use self::channel::{Event, Receiver};

use super::{CoordinatorWork, GCWorkScheduler, GCWorker};

pub(crate) mod channel;
use super::{GCWorkScheduler, GCWorker};

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
Expand All @@ -25,8 +22,6 @@ pub struct GCController<VM: VMBinding> {
requester: Arc<GCRequester<VM>>,
/// The reference to the scheduler.
scheduler: Arc<GCWorkScheduler<VM>>,
/// Receive coordinator work packets and notifications from GC workers through this.
receiver: Receiver<VM>,
/// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
coordinator_worker: GCWorker<VM>,
}
Expand All @@ -36,14 +31,12 @@ impl<VM: VMBinding> GCController<VM> {
mmtk: &'static MMTK<VM>,
requester: Arc<GCRequester<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
receiver: Receiver<VM>,
coordinator_worker: GCWorker<VM>,
) -> Box<GCController<VM>> {
Box::new(Self {
mmtk,
requester,
scheduler,
receiver,
coordinator_worker,
})
}
Expand Down Expand Up @@ -79,76 +72,52 @@ impl<VM: VMBinding> GCController<VM> {
return true;
}

// If all fo the above failed, it means GC has finished.
// If all of the above failed, it means GC has finished.
false
}

/// Reset the "all workers parked" state and resume workers.
fn reset_and_resume_workers(&mut self) {
self.receiver.reset_all_workers_parked();
self.scheduler.worker_monitor.notify_work_available(true);
debug!("Workers resumed");
}
/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();

/// Handle the "all workers have parked" event. Return true if GC is finished.
fn on_all_workers_parked(&mut self) -> bool {
assert!(self.scheduler.all_activated_buckets_are_empty());
debug_assert!(
self.scheduler.worker_monitor.debug_is_sleeping(),
"Workers are still doing work when GC started."
);

let new_work_available = self.find_more_work_for_workers();
// Add a ScheduleCollection work packet. It is the seed of other work packets.
self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection);

if new_work_available {
self.reset_and_resume_workers();
// If there is more work to do, GC has not finished.
return false;
}
// Notify only one worker at this time because there is only one work packet,
// namely `ScheduleCollection`.
self.scheduler.worker_monitor.resume_and_wait(false);

assert!(self.scheduler.all_buckets_empty());
// Gradually open more buckets as workers stop each time they drain all open buckets.
loop {
// Workers should only transition to the `Sleeping` state when all open buckets have
// been drained.
self.scheduler.assert_all_activated_buckets_are_empty();

true
}
let new_work_available = self.find_more_work_for_workers();

/// Process an event. Return true if the GC is finished.
fn process_event(&mut self, message: Event<VM>) -> bool {
match message {
Event::Work(mut work) => {
self.execute_coordinator_work(work.as_mut(), true);
false
}
Event::AllParked => self.on_all_workers_parked(),
}
}

/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();
// Schedule collection.
self.execute_coordinator_work(&mut ScheduleCollection, true);

// Tell GC trigger that GC started - this happens after ScheduleCollection so we
// will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
self.mmtk
.plan
.base()
.gc_trigger
.policy
.on_gc_start(self.mmtk);

// React to worker-generated events until finished.
loop {
let event = self.receiver.poll_event();
let finished = self.process_event(event);
if finished {
// GC finishes if there is no new work to do.
if !new_work_available {
break;
}

// Notify all workers because there should be many work packets available in the newly
// opened bucket(s).
self.scheduler.worker_monitor.resume_and_wait(true);
}

// All GC workers must have parked by now.
debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping());
debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping());
debug_assert!(!self.scheduler.worker_group.has_designated_work());
debug_assert!(self.scheduler.all_buckets_empty());

// Deactivate all work buckets to prepare for the next GC.
// NOTE: There is no need to hold any lock.
// All GC workers are doing "group sleeping" now,
// Workers are in the `Sleeping` state.
// so they will not wake up while we deactivate buckets.
self.scheduler.deactivate_all();

Expand All @@ -163,21 +132,8 @@ impl<VM: VMBinding> GCController<VM> {
let mut end_of_gc = EndOfGC {
elapsed: gc_start.elapsed(),
};

self.execute_coordinator_work(&mut end_of_gc, false);
end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

self.scheduler.debug_assert_all_buckets_deactivated();
}

fn execute_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

if notify_workers {
self.reset_and_resume_workers();
};
}
}
114 changes: 0 additions & 114 deletions src/scheduler/controller/channel.rs

This file was deleted.

27 changes: 7 additions & 20 deletions src/scheduler/gc_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ pub struct ScheduleCollection;
/// `GCWork` implementation for `ScheduleCollection`.
///
/// The controller adds this packet to the `Unconstrained` bucket at the start of a GC as the
/// seed of other work packets. Executing it first asks the plan to schedule the collection, and
/// then notifies the GC trigger policy that the GC has started.
impl<VM: VMBinding> GCWork<VM> for ScheduleCollection {
/// Run the packet on `worker`: schedule the collection through the plan, then call
/// `on_gc_start` on the GC trigger policy.
fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
// Hand the worker's scheduler to the plan so that it can schedule this collection.
mmtk.plan.schedule_collection(worker.scheduler());

// Tell GC trigger that GC started.
// We now know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
// TODO: Depending on the OS scheduling, other workers can run so fast that they can finish
// everything in the `Unconstrained` and the `Prepare` buckets before we execute the next
// statement. Consider if there is a better place to call `on_gc_start`.
// NOTE(review): this call must stay after `schedule_collection`; per the comment above, the
// kind of GC is only known once the collection has been scheduled.
mmtk.plan.base().gc_trigger.policy.on_gc_start(mmtk);
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for ScheduleCollection {}

/// The global GC Preparation Work
/// This work packet invokes prepare() for the plan (which will invoke prepare() for each space), and
/// pushes work packets for preparing mutators and collectors.
Expand Down Expand Up @@ -174,15 +179,6 @@ impl<ScanEdges: ProcessEdgesWork> StopMutators<ScanEdges> {

impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, mmtk: &'static MMTK<E::VM>) {
// If the VM requires that only the coordinator thread can stop the world,
// we delegate the work to the coordinator.
if <E::VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() {
worker
.sender
.add_coordinator_work(Box::new(StopMutators::<E>::new()));
return;
}

trace!("stop_all_mutators start");
mmtk.plan.base().prepare_for_stack_scanning();
<E::VM as VMBinding>::VMCollection::stop_all_mutators(worker.tls, |mutator| {
Expand Down Expand Up @@ -217,8 +213,6 @@ impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
}
}

impl<E: ProcessEdgesWork> CoordinatorWork<E::VM> for StopMutators<E> {}

#[derive(Default)]
pub struct EndOfGC {
pub elapsed: std::time::Duration,
Expand All @@ -244,11 +238,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
mmtk.edge_logger.reset();
}

if <VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW {
assert!(worker.is_coordinator(),
"VM only allows coordinator to resume mutators, but the current worker is not the coordinator.");
}

mmtk.plan.base().set_gc_status(GcStatus::NotInGC);

// Reset the triggering information.
Expand All @@ -258,8 +247,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for EndOfGC {}

/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
/// `ProcessEdgesWork` instance.
struct ProcessEdgesWorkTracer<E: ProcessEdgesWork> {
Expand Down
1 change: 0 additions & 1 deletion src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod stat;
pub(self) mod work_counter;

mod work;
pub use work::CoordinatorWork;
pub use work::GCWork;
pub(crate) use work::GCWorkContext;

Expand Down
Loading

0 comments on commit 43e8a92

Please sign in to comment.