No coordinator work #794

Merged
merged 4 commits into from Apr 27, 2023
Changes from 3 commits
93 changes: 23 additions & 70 deletions src/scheduler/controller.rs
@@ -3,19 +3,16 @@
//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.

use std::sync::{Arc, Condvar, Mutex};
use std::sync::Arc;

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::{GCWork, WorkBucketStage};
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;

use self::channel::{Event, Receiver};

use super::{CoordinatorWork, GCWorkScheduler, GCWorker};

pub(crate) mod channel;
use super::{GCWorkScheduler, GCWorker};

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
@@ -25,8 +22,6 @@ pub struct GCController<VM: VMBinding> {
requester: Arc<GCRequester<VM>>,
/// The reference to the scheduler.
scheduler: Arc<GCWorkScheduler<VM>>,
/// Receive coordinator work packets and notifications from GC workers through this.
receiver: Receiver<VM>,
/// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
coordinator_worker: GCWorker<VM>,
}
@@ -36,14 +31,12 @@ impl<VM: VMBinding> GCController<VM> {
mmtk: &'static MMTK<VM>,
requester: Arc<GCRequester<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
receiver: Receiver<VM>,
coordinator_worker: GCWorker<VM>,
) -> Box<GCController<VM>> {
Box::new(Self {
mmtk,
requester,
scheduler,
receiver,
coordinator_worker,
})
}
@@ -79,76 +72,49 @@ impl<VM: VMBinding> GCController<VM> {
return true;
}

// If all fo the above failed, it means GC has finished.
// If all of the above failed, it means GC has finished.
false
}

/// Reset the "all workers parked" state and resume workers.
fn reset_and_resume_workers(&mut self) {
self.receiver.reset_all_workers_parked();
self.scheduler.worker_monitor.notify_work_available(true);
debug!("Workers resumed");
}

/// Handle the "all workers have parked" event. Return true if GC is finished.
fn on_all_workers_parked(&mut self) -> bool {
assert!(self.scheduler.all_activated_buckets_are_empty());
self.scheduler.assert_all_activated_buckets_are_empty();

let new_work_available = self.find_more_work_for_workers();

if new_work_available {
self.reset_and_resume_workers();
// If there is more work to do, GC has not finished.
return false;
}

assert!(self.scheduler.all_buckets_empty());

true
}

/// Process an event. Return true if the GC is finished.
fn process_event(&mut self, message: Event<VM>) -> bool {
match message {
Event::Work(mut work) => {
self.execute_coordinator_work(work.as_mut(), true);
false
}
Event::AllParked => self.on_all_workers_parked(),
}
// GC finishes if there is no new work to do.
!new_work_available
}

/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();
// Schedule collection.
self.execute_coordinator_work(&mut ScheduleCollection, true);

// Tell GC trigger that GC started - this happens after ScheduleCollection so we
// will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
self.mmtk
.plan
.base()
.gc_trigger
.policy
.on_gc_start(self.mmtk);

// React to worker-generated events until finished.

debug_assert!(
self.scheduler.worker_monitor.debug_is_sleeping(),
"Workers are still doing work when GC started."
);

// Add a ScheduleCollection work packet. It is the seed of other work packets.
self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection);

// Resume the workers and gradually open more buckets when they stop together.
loop {
let event = self.receiver.poll_event();
let finished = self.process_event(event);
self.scheduler.worker_monitor.resume_and_wait();
let finished = self.on_all_workers_parked();
Member

You can just remove on_all_workers_parked() -- it is simple enough to be included in the main loop.

Collaborator Author

Good idea.

if finished {
break;
}
}

// All GC workers must have parked by now.
debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping());
debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping());
debug_assert!(!self.scheduler.worker_group.has_designated_work());
debug_assert!(self.scheduler.all_buckets_empty());

// Deactivate all work buckets to prepare for the next GC.
// NOTE: There is no need to hold any lock.
// All GC workers are doing "group sleeping" now,
// Workers are in the `Sleeping` state.
// so they will not wake up while we deactivate buckets.
self.scheduler.deactivate_all();

@@ -163,21 +129,8 @@ impl<VM: VMBinding> GCController<VM> {
let mut end_of_gc = EndOfGC {
elapsed: gc_start.elapsed(),
};

self.execute_coordinator_work(&mut end_of_gc, false);
end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

self.scheduler.debug_assert_all_buckets_deactivated();
}

fn execute_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

if notify_workers {
self.reset_and_resume_workers();
};
}
}
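
The review discussion above suggests that `on_all_workers_parked()` could simply be inlined into the main loop. As a rough illustration of that suggestion (a sketch only, built from the methods that appear in this diff, not necessarily the code that landed in the final commit), the loop could read:

```rust
// Sketch: the controller's GC loop with on_all_workers_parked() inlined.
loop {
    // Resume the workers, then sleep until they have all parked again.
    self.scheduler.worker_monitor.resume_and_wait();

    // With every worker parked, all activated buckets must be drained.
    self.scheduler.assert_all_activated_buckets_are_empty();

    // Open more buckets / find more work; if none is found, GC is finished.
    if !self.find_more_work_for_workers() {
        break;
    }
}
```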
114 changes: 0 additions & 114 deletions src/scheduler/controller/channel.rs

This file was deleted.

27 changes: 7 additions & 20 deletions src/scheduler/gc_work.rs
@@ -15,11 +15,16 @@ pub struct ScheduleCollection;
impl<VM: VMBinding> GCWork<VM> for ScheduleCollection {
fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
mmtk.plan.schedule_collection(worker.scheduler());

// Tell GC trigger that GC started.
// We now know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
// TODO: Depending on the OS scheduling, other workers can run so fast that they can finish
// everything in the `Unconstrained` and the `Prepare` buckets before we execute the next
// statement. Consider if there is a better place to call `on_gc_start`.
mmtk.plan.base().gc_trigger.policy.on_gc_start(mmtk);
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for ScheduleCollection {}

/// The global GC Preparation Work
/// This work packet invokes prepare() for the plan (which will invoke prepare() for each space), and
/// pushes work packets for preparing mutators and collectors.
@@ -174,15 +179,6 @@ impl<ScanEdges: ProcessEdgesWork> StopMutators<ScanEdges> {

impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, mmtk: &'static MMTK<E::VM>) {
// If the VM requires that only the coordinator thread can stop the world,
// we delegate the work to the coordinator.
if <E::VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() {
worker
.sender
.add_coordinator_work(Box::new(StopMutators::<E>::new()));
return;
}

trace!("stop_all_mutators start");
mmtk.plan.base().prepare_for_stack_scanning();
<E::VM as VMBinding>::VMCollection::stop_all_mutators(worker.tls, |mutator| {
@@ -217,8 +213,6 @@ impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
}
}

impl<E: ProcessEdgesWork> CoordinatorWork<E::VM> for StopMutators<E> {}

#[derive(Default)]
pub struct EndOfGC {
pub elapsed: std::time::Duration,
@@ -244,11 +238,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
mmtk.edge_logger.reset();
}

if <VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW {
assert!(worker.is_coordinator(),
"VM only allows coordinator to resume mutators, but the current worker is not the coordinator.");
}

mmtk.plan.base().set_gc_status(GcStatus::NotInGC);

// Reset the triggering information.
@@ -258,8 +247,6 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
}
}

impl<VM: VMBinding> CoordinatorWork<VM> for EndOfGC {}

/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
/// `ProcessEdgesWork` instance.
struct ProcessEdgesWorkTracer<E: ProcessEdgesWork> {
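
Taken together with the deletion of `src/scheduler/controller/channel.rs`, these changes remove the special `CoordinatorWork` category: `ScheduleCollection`, `StopMutators` and `EndOfGC` are now ordinary `GCWork` packets, whether workers pull them from a bucket or the controller runs them directly on its `coordinator_worker` (as it does with `EndOfGC` above). A minimal sketch of what defining such a packet looks like after this change (the packet name and log message are illustrative only, not part of the PR):

```rust
// Sketch: after this PR, a work packet only needs to implement GCWork.
// There is no CoordinatorWork marker trait and no coordinator-only path.
struct ExamplePacket;

impl<VM: VMBinding> GCWork<VM> for ExamplePacket {
    fn do_work(&mut self, _worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
        // Any GC worker may run this packet.
        trace!("ExamplePacket executed by a GC worker");
    }
}

// The controller seeds the first packet directly into a bucket, as
// controller.rs above does with ScheduleCollection:
//     scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ExamplePacket);
```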
1 change: 0 additions & 1 deletion src/scheduler/mod.rs
@@ -10,7 +10,6 @@ mod stat;
pub(self) mod work_counter;

mod work;
pub use work::CoordinatorWork;
pub use work::GCWork;
pub(crate) use work::GCWorkContext;
