Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No coordinator work #794

Merged
merged 4 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 32 additions & 76 deletions src/scheduler/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.

use std::sync::{Arc, Condvar, Mutex};
use std::sync::Arc;

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::{GCWork, WorkBucketStage};
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;

use self::channel::{Event, Receiver};

use super::{CoordinatorWork, GCWorkScheduler, GCWorker};

pub(crate) mod channel;
use super::{GCWorkScheduler, GCWorker};

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
Expand All @@ -25,8 +22,6 @@ pub struct GCController<VM: VMBinding> {
requester: Arc<GCRequester<VM>>,
/// The reference to the scheduler.
scheduler: Arc<GCWorkScheduler<VM>>,
/// Receive coordinator work packets and notifications from GC workers through this.
receiver: Receiver<VM>,
/// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
coordinator_worker: GCWorker<VM>,
}
Expand All @@ -36,14 +31,12 @@ impl<VM: VMBinding> GCController<VM> {
mmtk: &'static MMTK<VM>,
requester: Arc<GCRequester<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
receiver: Receiver<VM>,
coordinator_worker: GCWorker<VM>,
) -> Box<GCController<VM>> {
Box::new(Self {
mmtk,
requester,
scheduler,
receiver,
coordinator_worker,
})
}
Expand Down Expand Up @@ -79,76 +72,52 @@ impl<VM: VMBinding> GCController<VM> {
return true;
}

// If all fo the above failed, it means GC has finished.
// If all of the above failed, it means GC has finished.
false
}

/// Reset the "all workers parked" state and resume workers.
fn reset_and_resume_workers(&mut self) {
self.receiver.reset_all_workers_parked();
self.scheduler.worker_monitor.notify_work_available(true);
debug!("Workers resumed");
}
/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();

/// Handle the "all workers have parked" event. Return true if GC is finished.
fn on_all_workers_parked(&mut self) -> bool {
assert!(self.scheduler.all_activated_buckets_are_empty());
debug_assert!(
self.scheduler.worker_monitor.debug_is_sleeping(),
"Workers are still doing work when GC started."
);

let new_work_available = self.find_more_work_for_workers();
// Add a ScheduleCollection work packet. It is the seed of other work packets.
self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection);
qinsoon marked this conversation as resolved.
Show resolved Hide resolved

if new_work_available {
self.reset_and_resume_workers();
// If there is more work to do, GC has not finished.
return false;
}
// Notify only one worker at this time because there is only one work packet,
// namely `ScheduleCollection`.
self.scheduler.worker_monitor.resume_and_wait(false);

assert!(self.scheduler.all_buckets_empty());
// Gradually open more buckets as workers stop each time they drain all open bucket.
loop {
// Workers should only transition to the `Sleeping` state when all open buckets have
// been drained.
self.scheduler.assert_all_activated_buckets_are_empty();

true
}
let new_work_available = self.find_more_work_for_workers();

/// Process an event. Return true if the GC is finished.
fn process_event(&mut self, message: Event<VM>) -> bool {
match message {
Event::Work(mut work) => {
self.execute_coordinator_work(work.as_mut(), true);
false
}
Event::AllParked => self.on_all_workers_parked(),
}
}

/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();
// Schedule collection.
self.execute_coordinator_work(&mut ScheduleCollection, true);

// Tell GC trigger that GC started - this happens after ScheduleCollection so we
// will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
self.mmtk
.plan
.base()
.gc_trigger
.policy
.on_gc_start(self.mmtk);

// React to worker-generated events until finished.
loop {
let event = self.receiver.poll_event();
let finished = self.process_event(event);
if finished {
// GC finishes if there is no new work to do.
if !new_work_available {
break;
}

// Notify all workers because there should be many work packets available in the newly
// opened bucket(s).
self.scheduler.worker_monitor.resume_and_wait(true);
}

// All GC workers must have parked by now.
debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping());
debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping());
debug_assert!(!self.scheduler.worker_group.has_designated_work());
debug_assert!(self.scheduler.all_buckets_empty());

// Deactivate all work buckets to prepare for the next GC.
// NOTE: There is no need to hold any lock.
// All GC workers are doing "group sleeping" now,
// Workers are in the `Sleeping` state.
// so they will not wake up while we deactivate buckets.
self.scheduler.deactivate_all();

Expand All @@ -163,21 +132,8 @@ impl<VM: VMBinding> GCController<VM> {
let mut end_of_gc = EndOfGC {
elapsed: gc_start.elapsed(),
};

self.execute_coordinator_work(&mut end_of_gc, false);
end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

self.scheduler.debug_assert_all_buckets_deactivated();
}

fn execute_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

if notify_workers {
self.reset_and_resume_workers();
};
}
}
114 changes: 0 additions & 114 deletions src/scheduler/controller/channel.rs

This file was deleted.

27 changes: 7 additions & 20 deletions src/scheduler/gc_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ pub struct ScheduleCollection;
impl<VM: VMBinding> GCWork<VM> for ScheduleCollection {
fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
mmtk.plan.schedule_collection(worker.scheduler());

// Tell GC trigger that GC started.
// We now know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
// TODO: Depending on the OS scheduling, other workers can run so fast that they can finish
// everything in the `Unconstrained` and the `Prepare` buckets before we execute the next
// statement. Consider if there is a better place to call `on_gc_start`.
mmtk.plan.base().gc_trigger.policy.on_gc_start(mmtk);
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for ScheduleCollection {}

/// The global GC Preparation Work
/// This work packet invokes prepare() for the plan (which will invoke prepare() for each space), and
/// pushes work packets for preparing mutators and collectors.
Expand Down Expand Up @@ -174,15 +179,6 @@ impl<ScanEdges: ProcessEdgesWork> StopMutators<ScanEdges> {

impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, mmtk: &'static MMTK<E::VM>) {
// If the VM requires that only the coordinator thread can stop the world,
// we delegate the work to the coordinator.
if <E::VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() {
worker
.sender
.add_coordinator_work(Box::new(StopMutators::<E>::new()));
return;
}

trace!("stop_all_mutators start");
mmtk.plan.base().prepare_for_stack_scanning();
<E::VM as VMBinding>::VMCollection::stop_all_mutators(worker.tls, |mutator| {
Expand Down Expand Up @@ -217,8 +213,6 @@ impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
}
}

impl<E: ProcessEdgesWork> CoordinatorWork<E::VM> for StopMutators<E> {}

#[derive(Default)]
pub struct EndOfGC {
pub elapsed: std::time::Duration,
Expand All @@ -244,11 +238,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
mmtk.edge_logger.reset();
}

if <VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW {
assert!(worker.is_coordinator(),
"VM only allows coordinator to resume mutators, but the current worker is not the coordinator.");
}

mmtk.plan.base().set_gc_status(GcStatus::NotInGC);

// Reset the triggering information.
Expand All @@ -258,8 +247,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for EndOfGC {}

/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
/// `ProcessEdgesWork` instance.
struct ProcessEdgesWorkTracer<E: ProcessEdgesWork> {
Expand Down
1 change: 0 additions & 1 deletion src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod stat;
pub(self) mod work_counter;

mod work;
pub use work::CoordinatorWork;
pub use work::GCWork;
pub(crate) use work::GCWorkContext;

Expand Down
Loading