Fix GCWorker ownership. #523

Merged 2 commits on Jan 28, 2022
1 change: 1 addition & 0 deletions Cargo.toml
@@ -34,6 +34,7 @@ atomic = "0.4.6"
spin = "0.5.2"
env_logger = "0.8.2"
pfm = {version = "0.1.0-beta.1", optional = true}
atomic_refcell = "0.1.7"

[dev-dependencies]
crossbeam = "0.7.3"
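The only dependency change: `atomic_refcell` provides `AtomicRefCell`, a `RefCell` whose borrows are checked at runtime and which is `Sync`, which is what lets the worker statistics live in the shared `GCWorkerShared` struct introduced later in this PR. A minimal sketch of the borrow pattern, assuming only the `atomic_refcell` crate; the `Stats` type here is a stand-in for `WorkerLocalStat`:

```rust
use atomic_refcell::AtomicRefCell;

struct Stats {
    packets: usize,
}

fn main() {
    // Shared, dynamically checked interior mutability (cf. GCWorkerShared::stat).
    let stat = AtomicRefCell::new(Stats { packets: 0 });

    {
        // Mutable borrow while recording a work packet; expect() panics if already borrowed.
        let mut s = stat.try_borrow_mut().expect("stat is already borrowed");
        s.packets += 1;
    }

    // Immutable borrow while reporting statistics.
    let s = stat.try_borrow().expect("stat is already mutably borrowed");
    println!("packets executed: {}", s.packets);
}
```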
4 changes: 2 additions & 2 deletions src/scheduler/gc_work.rs
@@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
@@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
// TODO: Process weak references properly
48 changes: 26 additions & 22 deletions src/scheduler/scheduler.rs
@@ -22,7 +22,7 @@ pub struct GCWorkScheduler<VM: VMBinding> {
/// Work for the coordinator thread
pub coordinator_work: WorkBucket<VM>,
/// workers
worker_group: Option<Arc<WorkerGroup<VM>>>,
worker_group: Option<WorkerGroup<VM>>,
/// Condition Variable for worker synchronization
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
mmtk: Option<&'static MMTK<VM>>,
@@ -104,16 +104,16 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
self_mut.mmtk = Some(mmtk);
self_mut.coordinator_worker = Some(RwLock::new(GCWorker::new(
0,
Arc::downgrade(self),
self.clone(),
true,
self.channel.0.clone(),
)));
self_mut.worker_group = Some(WorkerGroup::new(
num_workers,
Arc::downgrade(self),
self.channel.0.clone(),
));
self.worker_group.as_ref().unwrap().spawn_workers(tls, mmtk);
let (worker_group, spawn_workers) =
WorkerGroup::new(num_workers, self.clone(), self.channel.0.clone());
self_mut.worker_group = Some(worker_group);
// FIXME: because of the `Arc::get_mut_unchanged` above, we are now mutating the scheduler
// while the spawned workers already have access to the scheduler.
spawn_workers(tls);

{
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
@@ -223,8 +223,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
*self.closure_end.lock().unwrap() = Some(f);
}

pub fn worker_group(&self) -> Arc<WorkerGroup<VM>> {
self.worker_group.as_ref().unwrap().clone()
pub fn worker_group(&self) -> &WorkerGroup<VM> {
self.worker_group.as_ref().unwrap()
}

fn all_buckets_empty(&self) -> bool {
@@ -330,8 +330,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

#[inline]
fn pop_scheduable_work(&self, worker: &GCWorker<VM>) -> Option<(Box<dyn GCWork<VM>>, bool)> {
if let Some(work) = worker.local_work_bucket.poll() {
return Some((work, worker.local_work_bucket.is_empty()));
if let Some(work) = worker.shared.local_work_bucket.poll() {
return Some((work, worker.shared.local_work_bucket.is_empty()));
}
for work_bucket in self.work_buckets.values() {
if let Some(work) = work_bucket.poll() {
@@ -360,10 +360,10 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

#[cold]
fn poll_slow(&self, worker: &GCWorker<VM>) -> Box<dyn GCWork<VM>> {
debug_assert!(!worker.is_parked());
debug_assert!(!worker.shared.is_parked());
let mut guard = self.worker_monitor.0.lock().unwrap();
loop {
debug_assert!(!worker.is_parked());
debug_assert!(!worker.shared.is_parked());
if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) {
if bucket_is_empty {
worker
@@ -374,7 +374,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
return work;
}
// Park this worker
worker.parked.store(true, Ordering::SeqCst);
worker.shared.parked.store(true, Ordering::SeqCst);
if self.worker_group().all_parked() {
worker
.sender
@@ -384,25 +384,29 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
// Wait
guard = self.worker_monitor.1.wait(guard).unwrap();
// Unpark this worker
worker.parked.store(false, Ordering::SeqCst);
worker.shared.parked.store(false, Ordering::SeqCst);
}
}

pub fn enable_stat(&self) {
for worker in &self.worker_group().workers {
worker.stat.enable();
for worker in &self.worker_group().workers_shared {
let worker_stat = worker.borrow_stat();
worker_stat.enable();
}
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
coordinator_worker.stat.enable();
let coordinator_worker_stat = coordinator_worker.shared.borrow_stat();
coordinator_worker_stat.enable();
}

pub fn statistics(&self) -> HashMap<String, String> {
let mut summary = SchedulerStat::default();
for worker in &self.worker_group().workers {
summary.merge(&worker.stat);
for worker in &self.worker_group().workers_shared {
let worker_stat = worker.borrow_stat();
summary.merge(&worker_stat);
}
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
summary.merge(&coordinator_worker.stat);
let coordinator_worker_stat = coordinator_worker.shared.borrow_stat();
summary.merge(&coordinator_worker_stat);
summary.harness_stat()
}

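`WorkerGroup::new` now returns the group together with a deferred-spawn closure, so the scheduler can assign its `worker_group` field before any worker thread starts running. A minimal sketch of that two-phase pattern under simplified assumptions; `Worker`, `Scheduler`, and `new_group` below are illustrative stand-ins, not the real mmtk-core types:

```rust
use std::thread;
use std::time::Duration;

struct Worker {
    ordinal: usize,
}

impl Worker {
    fn run(self) {
        println!("worker {} entering its loop", self.ordinal);
    }
}

struct Scheduler {
    worker_ordinals: Option<Vec<usize>>,
}

// Build the workers first, and hand back a closure that spawns them later.
fn new_group(n: usize) -> (Vec<usize>, Box<dyn FnOnce() + Send>) {
    let workers: Vec<Worker> = (0..n).map(|ordinal| Worker { ordinal }).collect();
    let ordinals: Vec<usize> = workers.iter().map(|w| w.ordinal).collect();

    // Spawning is deferred so the caller controls when workers may start
    // observing shared state.
    let deferred_spawn = Box::new(move || {
        for w in workers {
            let _ = thread::spawn(move || w.run());
        }
    });

    (ordinals, deferred_spawn)
}

fn main() {
    let mut scheduler = Scheduler { worker_ordinals: None };
    let (ordinals, spawn_workers) = new_group(4);
    // Assign the field first, mirroring `self_mut.worker_group = Some(worker_group)`.
    scheduler.worker_ordinals = Some(ordinals);
    // Only now are the worker threads allowed to start.
    spawn_workers();
    println!("group: {:?}", scheduler.worker_ordinals);
    // Crude wait so the detached threads get to print in this toy example.
    thread::sleep(Duration::from_millis(50));
}
```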
12 changes: 8 additions & 4 deletions src/scheduler/work.rs
@@ -28,16 +28,20 @@ pub trait GCWork<VM: VMBinding>: 'static + Send {

#[cfg(feature = "work_packet_stats")]
// Start collecting statistics
let stat = worker
.stat
.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk);
let stat = {
let mut worker_stat = worker.shared.borrow_stat_mut();
worker_stat.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk)
};

// Do the actual work
self.do_work(worker, mmtk);

#[cfg(feature = "work_packet_stats")]
// Finish collecting statistics
stat.end_of_work(&mut worker.stat);
{
let mut worker_stat = worker.shared.borrow_stat_mut();
stat.end_of_work(&mut worker_stat);
}
}
}

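The braces are the point of this change: `borrow_stat_mut()` returns an `AtomicRefMut` guard that must be dropped before `self.do_work` runs, because a work packet may borrow the same stats again. A small sketch of that scoping rule, with a stand-in `Stat` type and an illustrative `do_work` function rather than the real trait method:

```rust
use atomic_refcell::AtomicRefCell;

struct Stat {
    count: usize,
}

// A work packet that itself borrows the stats, as real packets may.
fn do_work(stat: &AtomicRefCell<Stat>) {
    stat.borrow_mut().count += 1;
}

fn main() {
    let stat = AtomicRefCell::new(Stat { count: 0 });

    {
        // Borrow confined to this block, mirroring the braces in do_work_with_stat.
        let mut s = stat.borrow_mut();
        s.count += 1;
    } // AtomicRefMut dropped here

    // Safe: the earlier borrow is gone. Holding it across this call would panic.
    do_work(&stat);
    println!("count = {}", stat.borrow().count);
}
```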
106 changes: 72 additions & 34 deletions src/scheduler/worker.rs
@@ -5,50 +5,78 @@ use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::opaque_pointer::*;
use crate::vm::{Collection, VMBinding};
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Weak};
use std::sync::Arc;

const LOCALLY_CACHED_WORKS: usize = 1;

/// The part shared between a GCWorker and the scheduler.
/// This structure is used for communication, e.g. adding new work packets.
pub struct GCWorkerShared<VM: VMBinding> {
pub parked: AtomicBool,
stat: AtomicRefCell<WorkerLocalStat<VM>>,
pub local_work_bucket: WorkBucket<VM>,
}

/// A GC worker. This part is privately owned by a worker thread.
pub struct GCWorker<VM: VMBinding> {
pub tls: VMWorkerThread,
pub ordinal: usize,
pub parked: AtomicBool,
scheduler: Arc<GCWorkScheduler<VM>>,
copy: GCWorkerCopyContext<VM>,
pub local_work_bucket: WorkBucket<VM>,
pub sender: Sender<CoordinatorMessage<VM>>,
pub stat: WorkerLocalStat<VM>,
mmtk: Option<&'static MMTK<VM>>,
is_coordinator: bool,
local_work_buffer: Vec<(WorkBucketStage, Box<dyn GCWork<VM>>)>,
pub shared: Arc<GCWorkerShared<VM>>,
}

unsafe impl<VM: VMBinding> Sync for GCWorker<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorker<VM> {}
unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}

// Error message for borrowing `GCWorkerShared::stat`.
const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This may happen if \
the mutator calls harness_begin or harness_end while the GC is running.";

impl<VM: VMBinding> GCWorkerShared<VM> {
pub fn is_parked(&self) -> bool {
self.parked.load(Ordering::SeqCst)
}

pub fn borrow_stat(&self) -> AtomicRef<WorkerLocalStat<VM>> {
self.stat.try_borrow().expect(STAT_BORROWED_MSG)
}

pub fn borrow_stat_mut(&self) -> AtomicRefMut<WorkerLocalStat<VM>> {
self.stat.try_borrow_mut().expect(STAT_BORROWED_MSG)
}
}

impl<VM: VMBinding> GCWorker<VM> {
pub fn new(
ordinal: usize,
scheduler: Weak<GCWorkScheduler<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
is_coordinator: bool,
sender: Sender<CoordinatorMessage<VM>>,
) -> Self {
let scheduler = scheduler.upgrade().unwrap();
let worker_monitor = scheduler.worker_monitor.clone();
Self {
tls: VMWorkerThread(VMThread::UNINITIALIZED),
ordinal,
parked: AtomicBool::new(true),
// We will set this later
copy: GCWorkerCopyContext::new_non_copy(),
local_work_bucket: WorkBucket::new(true, scheduler.worker_monitor.clone()),
sender,
scheduler,
stat: Default::default(),
mmtk: None,
is_coordinator,
local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS),
shared: Arc::new(GCWorkerShared {
parked: AtomicBool::new(true),
stat: Default::default(),
local_work_bucket: WorkBucket::new(true, worker_monitor),
}),
}
}

@@ -73,10 +101,6 @@ impl<VM: VMBinding> GCWorker<VM> {
}
}

pub fn is_parked(&self) -> bool {
self.parked.load(Ordering::SeqCst)
}

pub fn is_coordinator(&self) -> bool {
self.is_coordinator
}
@@ -97,52 +121,66 @@ impl<VM: VMBinding> GCWorker<VM> {
self.tls = tls;
self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
self.mmtk = Some(mmtk);
self.parked.store(false, Ordering::SeqCst);
self.shared.parked.store(false, Ordering::SeqCst);
loop {
while let Some((bucket, mut work)) = self.local_work_buffer.pop() {
debug_assert!(self.scheduler.work_buckets[bucket].is_activated());
work.do_work_with_stat(self, mmtk);
}
let mut work = self.scheduler().poll(self);
debug_assert!(!self.is_parked());
debug_assert!(!self.shared.is_parked());
work.do_work_with_stat(self, mmtk);
}
}
}

pub struct WorkerGroup<VM: VMBinding> {
pub workers: Vec<GCWorker<VM>>,
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
}

impl<VM: VMBinding> WorkerGroup<VM> {
pub fn new(
workers: usize,
scheduler: Weak<GCWorkScheduler<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
sender: Sender<CoordinatorMessage<VM>>,
) -> Arc<Self> {
Arc::new(Self {
workers: (0..workers)
.map(|i| GCWorker::new(i, scheduler.clone(), false, sender.clone()))
.collect(),
})
) -> (Self, Box<dyn FnOnce(VMThread)>) {
let mut workers_shared = Vec::new();
let mut workers_to_spawn = Vec::new();

for ordinal in 0..workers {
let worker = Box::new(GCWorker::new(
ordinal,
scheduler.clone(),
false,
sender.clone(),
));
let worker_shared = worker.shared.clone();
workers_shared.push(worker_shared);
workers_to_spawn.push(worker);
}

// NOTE: We cannot call spawn_worker_thread here,
// because the worker will access `Scheduler::worker_group` as soon as it starts,
// but that field will not be assigned until after this function returns.
// Therefore we defer the spawning operation until later.
let deferred_spawn = Box::new(move |tls| {
for worker in workers_to_spawn.drain(..) {
VM::VMCollection::spawn_worker_thread(tls, Some(worker));
}
});

(Self { workers_shared }, deferred_spawn)
}

pub fn worker_count(&self) -> usize {
self.workers.len()
self.workers_shared.len()
}

pub fn parked_workers(&self) -> usize {
self.workers.iter().filter(|w| w.is_parked()).count()
self.workers_shared.iter().filter(|w| w.is_parked()).count()
}

pub fn all_parked(&self) -> bool {
self.parked_workers() == self.worker_count()
}

pub fn spawn_workers(&'static self, tls: VMThread, _mmtk: &'static MMTK<VM>) {
for i in 0..self.worker_count() {
let worker = &self.workers[i];
VM::VMCollection::spawn_worker_thread(tls, Some(worker));
}
}
}
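The heart of the ownership fix is in this file: each worker thread now privately owns its `GCWorker`, while the scheduler keeps only an `Arc<GCWorkerShared>` for communication (parked flag, stats, local work bucket). A minimal sketch of that split with simplified stand-in types, not the real `GCWorker` and `GCWorkerShared`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Shared half: the scheduler may inspect this from any thread.
struct WorkerShared {
    parked: AtomicBool,
}

// Private half: owned and mutated only by the worker's own thread.
struct Worker {
    ordinal: usize,
    local_buffer: Vec<String>,
    shared: Arc<WorkerShared>,
}

fn main() {
    let workers: Vec<Box<Worker>> = (0..2)
        .map(|ordinal| {
            Box::new(Worker {
                ordinal,
                local_buffer: Vec::new(),
                shared: Arc::new(WorkerShared { parked: AtomicBool::new(true) }),
            })
        })
        .collect();

    // The scheduler keeps only the shared halves (cf. `workers_shared`).
    let shared_handles: Vec<Arc<WorkerShared>> =
        workers.iter().map(|w| w.shared.clone()).collect();

    // Each thread takes full ownership of its private half (cf. Box<GCWorker<VM>>).
    let handles: Vec<_> = workers
        .into_iter()
        .map(|mut w| {
            thread::spawn(move || {
                w.shared.parked.store(false, Ordering::SeqCst);
                w.local_buffer.push(format!("worker {} started", w.ordinal));
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    let parked = shared_handles
        .iter()
        .filter(|s| s.parked.load(Ordering::SeqCst))
        .count();
    println!("parked workers after startup: {}", parked);
}
```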
4 changes: 2 additions & 2 deletions src/util/sanity/sanity_checker.rs
@@ -109,7 +109,7 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<P::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
@@ -133,7 +133,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<P::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
}
2 changes: 1 addition & 1 deletion src/vm/collection.rs
@@ -49,7 +49,7 @@ pub trait Collection<VM: VMBinding> {
/// calls `initialize_collection()` and passes as an argument.
/// * `ctx`: The GC worker context for the GC thread. If `None` is passed, it means spawning a GC thread for the GC controller,
/// which does not have a worker context.
fn spawn_worker_thread(tls: VMThread, ctx: Option<&GCWorker<VM>>);
fn spawn_worker_thread(tls: VMThread, ctx: Option<Box<GCWorker<VM>>>);

/// Allow VM-specific behaviors for a mutator after all the mutators are stopped and before any actual GC work starts.
///
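Because the trait now takes `Option<Box<GCWorker<VM>>>`, a binding owns the worker it is asked to spawn and can move it onto the new OS thread, which the previous `Option<&GCWorker<VM>>` signature did not allow. A hypothetical binding-side sketch; `WorkerStub` and this free-standing `spawn_worker_thread` are illustrative stand-ins, not the mmtk-core API:

```rust
use std::thread;
use std::time::Duration;

// Stand-in for Box<GCWorker<VM>>.
struct WorkerStub {
    ordinal: usize,
}

impl WorkerStub {
    fn run(self: Box<Self>) {
        // The real worker would enter its work-packet loop here.
        println!("GC worker {} entering its loop", self.ordinal);
    }
}

// Shape of a binding's spawn_worker_thread: the Box is moved into the new thread.
fn spawn_worker_thread(ctx: Option<Box<WorkerStub>>) {
    match ctx {
        Some(worker) => {
            let _ = thread::spawn(move || worker.run());
        }
        None => {
            // No worker context: this thread acts as the GC controller.
            let _ = thread::spawn(|| println!("GC controller thread"));
        }
    }
}

fn main() {
    spawn_worker_thread(Some(Box::new(WorkerStub { ordinal: 0 })));
    spawn_worker_thread(None);
    // Crude wait so the detached threads get to print in this toy example.
    thread::sleep(Duration::from_millis(50));
}
```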
4 changes: 2 additions & 2 deletions vmbindings/dummyvm/src/collection.rs
@@ -19,11 +19,11 @@ impl Collection<DummyVM> for VMCollection {
panic!("block_for_gc is not implemented")
}

fn spawn_worker_thread(_tls: VMThread, _ctx: Option<&GCWorker<DummyVM>>) {
fn spawn_worker_thread(_tls: VMThread, _ctx: Option<Box<GCWorker<DummyVM>>>) {

}

fn prepare_mutator<T: MutatorContext<DummyVM>>(_tls_w: VMWorkerThread, _tls_m: VMMutatorThread, _mutator: &T) {
unimplemented!()
}
}