Skip to content

Commit

Permalink
Fix GCWorker ownership.
Browse files Browse the repository at this point in the history
This commit fixes an inconsistency in the VMCollection interface where a
GCWorker is exposed to the binding via `Collection::spawn_worker_thread`
as a `&GCWorker`, but later received from the binding via
`memory_manager::start_worker` as a `&mut GCWorker`.  The root cause is
because GCWorker is wrongly owned by the scheduler.

We make each GC worker thread the owner of its `GCWorker` struct, and we
pass the `GCWorker` struct across API boundary as owning `Box`, fixing
the interface.

We isolate the part of `GCWorker` shared with the GC scheduler into a
`GCWorkerShared` struct, and ensure the fields are properly
synchronized.

Particularly, we now use `AtomicRefCell` for `WorkerLocalStat`.
`AtomicRefCell` allows it to be borrowed mutably by the GC worker and
the mutator (via `harness_begin/end` methods) at different time.
However, it is a known issue that in concurrent GC, it is possible for
GC to happen when `harness_begin/end` is called.  In that case, it will
panic.

Fixes: mmtk#522
  • Loading branch information
wks committed Jan 17, 2022
1 parent 7ce3259 commit b735c4b
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ atomic = "0.4.6"
spin = "0.5.2"
env_logger = "0.8.2"
pfm = {version = "0.1.0-beta.1", optional = true}
atomic_refcell = "0.1.7"

[dev-dependencies]
crossbeam = "0.7.3"
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/gc_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
// TODO: Process weak references properly
Expand Down
46 changes: 28 additions & 18 deletions src/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct GCWorkScheduler<VM: VMBinding> {
/// Work for the coordinator thread
pub coordinator_work: WorkBucket<VM>,
/// workers
worker_group: Option<Arc<WorkerGroup<VM>>>,
worker_group: Option<WorkerGroup<VM>>,
/// Condition Variable for worker synchronization
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
mmtk: Option<&'static MMTK<VM>>,
Expand Down Expand Up @@ -53,6 +53,10 @@ pub struct GCWorkScheduler<VM: VMBinding> {
// the workers.
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}

// Error message for borrowing `GCWorkerShared::stat`.
const STAT_BORROWED_MSG: &'static str = "`stat` is already borrowed. This may happen when GC is running concurrently. \
The current statistics algorithm does not support concurrent GC.";

impl<VM: VMBinding> GCWorkScheduler<VM> {
pub fn new() -> Arc<Self> {
let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
Expand Down Expand Up @@ -104,16 +108,18 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
self_mut.mmtk = Some(mmtk);
self_mut.coordinator_worker = Some(RwLock::new(GCWorker::new(
0,
Arc::downgrade(self),
self.clone(),
true,
self.channel.0.clone(),
)));
self_mut.worker_group = Some(WorkerGroup::new(
num_workers,
Arc::downgrade(self),
self.clone(),
self.channel.0.clone(),
));
self.worker_group.as_ref().unwrap().spawn_workers(tls, mmtk);
// FIXME: because of the `Arc::get_mut_unchanged` above, we may mutate the scheduler
// while the spawned workers already have access to the scheduler.
self_mut.worker_group.as_mut().unwrap().spawn_workers(tls, mmtk);

{
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
Expand Down Expand Up @@ -223,8 +229,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
*self.closure_end.lock().unwrap() = Some(f);
}

pub fn worker_group(&self) -> Arc<WorkerGroup<VM>> {
self.worker_group.as_ref().unwrap().clone()
pub fn worker_group(&self) -> &WorkerGroup<VM> {
self.worker_group.as_ref().unwrap()
}

fn all_buckets_empty(&self) -> bool {
Expand Down Expand Up @@ -330,8 +336,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

#[inline]
fn pop_scheduable_work(&self, worker: &GCWorker<VM>) -> Option<(Box<dyn GCWork<VM>>, bool)> {
if let Some(work) = worker.local_work_bucket.poll() {
return Some((work, worker.local_work_bucket.is_empty()));
if let Some(work) = worker.shared.local_work_bucket.poll() {
return Some((work, worker.shared.local_work_bucket.is_empty()));
}
for work_bucket in self.work_buckets.values() {
if let Some(work) = work_bucket.poll() {
Expand Down Expand Up @@ -360,10 +366,10 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

#[cold]
fn poll_slow(&self, worker: &GCWorker<VM>) -> Box<dyn GCWork<VM>> {
debug_assert!(!worker.is_parked());
debug_assert!(!worker.shared.is_parked());
let mut guard = self.worker_monitor.0.lock().unwrap();
loop {
debug_assert!(!worker.is_parked());
debug_assert!(!worker.shared.is_parked());
if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) {
if bucket_is_empty {
worker
Expand All @@ -374,7 +380,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
return work;
}
// Park this worker
worker.parked.store(true, Ordering::SeqCst);
worker.shared.parked.store(true, Ordering::SeqCst);
if self.worker_group().all_parked() {
worker
.sender
Expand All @@ -384,25 +390,29 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
// Wait
guard = self.worker_monitor.1.wait(guard).unwrap();
// Unpark this worker
worker.parked.store(false, Ordering::SeqCst);
worker.shared.parked.store(false, Ordering::SeqCst);
}
}

pub fn enable_stat(&self) {
for worker in &self.worker_group().workers {
worker.stat.enable();
for worker in &self.worker_group().workers_shared {
let worker_stat = worker.stat.try_borrow().expect(STAT_BORROWED_MSG);
worker_stat.enable();
}
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
coordinator_worker.stat.enable();
let coordinator_worker_stat = coordinator_worker.shared.stat.try_borrow().expect(STAT_BORROWED_MSG);
coordinator_worker_stat.enable();
}

pub fn statistics(&self) -> HashMap<String, String> {
let mut summary = SchedulerStat::default();
for worker in &self.worker_group().workers {
summary.merge(&worker.stat);
for worker in &self.worker_group().workers_shared {
let worker_stat = worker.stat.try_borrow().expect(STAT_BORROWED_MSG);
summary.merge(&worker_stat);
}
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
summary.merge(&coordinator_worker.stat);
let coordinator_worker_stat = coordinator_worker.shared.stat.try_borrow().expect(STAT_BORROWED_MSG);
summary.merge(&coordinator_worker_stat);
summary.harness_stat()
}

Expand Down
16 changes: 12 additions & 4 deletions src/scheduler/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@ use std::any::{type_name, TypeId};
/// For this case, use `WorkBucket::add_with_priority_unsync` instead.
pub trait CoordinatorWork<VM: VMBinding>: 'static + Send + GCWork<VM> {}

// Error message for borrowing `GCWorkerShared::stat`.
const STAT_BORROWED_MSG: &'static str = "`stat` is already borrowed. This may happen when GC is running concurrently. \
The current statistics algorithm does not support concurrent GC.";

pub trait GCWork<VM: VMBinding>: 'static + Send {
fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>);
#[inline]
fn do_work_with_stat(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
debug!("{}", std::any::type_name::<Self>());
debug_assert!(!worker.tls.0.0.is_null(), "TLS must be set correctly for a GC worker before the worker does any work. GC Worker {} has no valid tls.", worker.ordinal);
let stat = worker
.stat
.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk);
let stat = {
let mut worker_stat = worker.shared.stat.try_borrow_mut().expect(STAT_BORROWED_MSG);
worker_stat.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk)
};
self.do_work(worker, mmtk);
stat.end_of_work(&mut worker.stat);
{
let mut worker_stat = worker.shared.stat.try_borrow_mut().expect(STAT_BORROWED_MSG);
stat.end_of_work(&mut worker_stat);
}
}
}

Expand Down
84 changes: 55 additions & 29 deletions src/scheduler/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,67 @@ use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::opaque_pointer::*;
use crate::vm::{Collection, VMBinding};
use atomic_refcell::AtomicRefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Weak};
use std::sync::Arc;

const LOCALLY_CACHED_WORKS: usize = 1;


/// The part shared between a GCWorker and the scheduler.
/// This structure is used for communication, e.g. adding new work packets.
pub struct GCWorkerShared<VM: VMBinding> {
pub parked: AtomicBool,
pub stat: AtomicRefCell<WorkerLocalStat<VM>>,
pub local_work_bucket: WorkBucket<VM>,
}

/// A GC worker. This part is privately owned by a worker thread.
pub struct GCWorker<VM: VMBinding> {
pub tls: VMWorkerThread,
pub ordinal: usize,
pub parked: AtomicBool,
scheduler: Arc<GCWorkScheduler<VM>>,
copy: GCWorkerCopyContext<VM>,
pub local_work_bucket: WorkBucket<VM>,
pub sender: Sender<CoordinatorMessage<VM>>,
pub stat: WorkerLocalStat<VM>,
mmtk: Option<&'static MMTK<VM>>,
is_coordinator: bool,
local_work_buffer: Vec<(WorkBucketStage, Box<dyn GCWork<VM>>)>,
pub shared: Arc<GCWorkerShared<VM>>,
}

unsafe impl<VM: VMBinding> Sync for GCWorker<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorker<VM> {}
unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}

impl<VM: VMBinding> GCWorkerShared<VM> {
pub fn is_parked(&self) -> bool {
self.parked.load(Ordering::SeqCst)
}
}

impl<VM: VMBinding> GCWorker<VM> {
pub fn new(
ordinal: usize,
scheduler: Weak<GCWorkScheduler<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
is_coordinator: bool,
sender: Sender<CoordinatorMessage<VM>>,
) -> Self {
let scheduler = scheduler.upgrade().unwrap();
let worker_monitor = scheduler.worker_monitor.clone();
Self {
tls: VMWorkerThread(VMThread::UNINITIALIZED),
ordinal,
parked: AtomicBool::new(true),
// We will set this later
copy: GCWorkerCopyContext::new_non_copy(),
local_work_bucket: WorkBucket::new(true, scheduler.worker_monitor.clone()),
sender,
scheduler,
stat: Default::default(),
mmtk: None,
is_coordinator,
local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS),
shared: Arc::new(GCWorkerShared {
parked: AtomicBool::new(true),
stat: Default::default(),
local_work_bucket: WorkBucket::new(true, worker_monitor),
}),
}
}

Expand All @@ -73,9 +90,6 @@ impl<VM: VMBinding> GCWorker<VM> {
}
}

pub fn is_parked(&self) -> bool {
self.parked.load(Ordering::SeqCst)
}

pub fn is_coordinator(&self) -> bool {
self.is_coordinator
Expand All @@ -97,52 +111,64 @@ impl<VM: VMBinding> GCWorker<VM> {
self.tls = tls;
self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
self.mmtk = Some(mmtk);
self.parked.store(false, Ordering::SeqCst);
self.shared.parked.store(false, Ordering::SeqCst);
loop {
while let Some((bucket, mut work)) = self.local_work_buffer.pop() {
debug_assert!(self.scheduler.work_buckets[bucket].is_activated());
work.do_work_with_stat(self, mmtk);
}
let mut work = self.scheduler().poll(self);
debug_assert!(!self.is_parked());
debug_assert!(!self.shared.is_parked());
work.do_work_with_stat(self, mmtk);
}
}
}

pub struct WorkerGroup<VM: VMBinding> {
pub workers: Vec<GCWorker<VM>>,
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
workers_to_spawn: Vec<Box<GCWorker<VM>>>,
}

impl<VM: VMBinding> WorkerGroup<VM> {
pub fn new(
workers: usize,
scheduler: Weak<GCWorkScheduler<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
sender: Sender<CoordinatorMessage<VM>>,
) -> Arc<Self> {
Arc::new(Self {
workers: (0..workers)
.map(|i| GCWorker::new(i, scheduler.clone(), false, sender.clone()))
.collect(),
})
) -> Self {
let mut workers_shared = Vec::new();
let mut workers_to_spawn = Vec::new();

for ordinal in 0..workers {
let worker = Box::new(GCWorker::new(ordinal, scheduler.clone(), false, sender.clone()));
let worker_shared = worker.shared.clone();
workers_shared.push(worker_shared);
workers_to_spawn.push(worker);
// NOTE: We cannot call spawn_worker_thread here,
// because the worker will access the WorkerGroup in the scheduler which is not yet constructed.
}

Self {
workers_shared,
workers_to_spawn,
}
}

pub fn worker_count(&self) -> usize {
self.workers.len()
self.workers_shared.len()
}

pub fn parked_workers(&self) -> usize {
self.workers.iter().filter(|w| w.is_parked()).count()
self.workers_shared.iter().filter(|w| w.is_parked()).count()
}

pub fn all_parked(&self) -> bool {
self.parked_workers() == self.worker_count()
}

pub fn spawn_workers(&'static self, tls: VMThread, _mmtk: &'static MMTK<VM>) {
for i in 0..self.worker_count() {
let worker = &self.workers[i];
pub fn spawn_workers(&mut self, tls: VMThread, _mmtk: &'static MMTK<VM>) {
for worker in self.workers_to_spawn.drain(..) {
VM::VMCollection::spawn_worker_thread(tls, Some(worker));
}
assert!(self.workers_to_spawn.is_empty());
}
}
4 changes: 2 additions & 2 deletions src/util/sanity/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<P::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
Expand All @@ -133,7 +133,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<P::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers {
for w in &mmtk.scheduler.worker_group().workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/vm/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait Collection<VM: VMBinding> {
/// calls `initialize_collection()` and passes as an argument.
/// * `ctx`: The GC worker context for the GC thread. If `None` is passed, it means spawning a GC thread for the GC controller,
/// which does not have a worker context.
fn spawn_worker_thread(tls: VMThread, ctx: Option<&GCWorker<VM>>);
fn spawn_worker_thread(tls: VMThread, ctx: Option<Box<GCWorker<VM>>>);

/// Allow VM-specific behaviors for a mutator after all the mutators are stopped and before any actual GC work starts.
///
Expand Down

0 comments on commit b735c4b

Please sign in to comment.