Skip to content

Commit

Permalink
runtime: make scheduler non-optional (#3980)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Jul 21, 2021
1 parent 2087f3e commit 0cefa85
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 79 deletions.
70 changes: 3 additions & 67 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::runtime::task::Schedule;
use crate::util::linked_list;

use std::pin::Pin;
Expand All @@ -36,10 +36,6 @@ pub(super) struct Cell<T: Future, S> {
pub(super) trailer: Trailer,
}

pub(super) struct Scheduler<S> {
scheduler: UnsafeCell<Option<S>>,
}

pub(super) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
Expand All @@ -49,7 +45,7 @@ pub(super) struct CoreStage<T: Future> {
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
pub(super) scheduler: Scheduler<S>,
pub(super) scheduler: S,

/// Either the future or the output
pub(super) stage: CoreStage<T>,
Expand Down Expand Up @@ -106,9 +102,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
id,
},
core: Core {
scheduler: Scheduler {
scheduler: UnsafeCell::new(Some(scheduler)),
},
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
Expand All @@ -120,64 +114,6 @@ impl<T: Future, S: Schedule> Cell<T, S> {
}
}

impl<S: Schedule> Scheduler<S> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
self.scheduler.with_mut(f)
}

/// Returns true if the task is bound to a scheduler.
pub(super) fn is_bound(&self) -> bool {
// Safety: never called concurrently w/ a mutation.
self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
}

/// Schedule the future for execution
pub(super) fn schedule(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.schedule(task),
None => panic!("no scheduler set"),
}
});
}

/// Schedule the future for execution in the near future, yielding the
/// thread to other tasks.
pub(super) fn yield_now(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.yield_now(task),
None => panic!("no scheduler set"),
}
});
}

/// Release the task
///
/// If the `Scheduler` implementation is able to, it returns the `Task`
/// handle immediately. The caller of this function will batch a ref-dec
/// with a state change.
pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
use std::mem::ManuallyDrop;

let task = ManuallyDrop::new(task);

self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.release(&*task),
// Task was never polled
None => None,
}
})
}
}

impl<T: Future> CoreStage<T> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
self.stage.with_mut(f)
Expand Down
21 changes: 9 additions & 12 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
use crate::runtime::task::state::Snapshot;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
Expand Down Expand Up @@ -95,7 +95,6 @@ where

// Check causality
self.core().stage.with_mut(drop);
self.core().scheduler.with_mut(drop);

unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
Expand Down Expand Up @@ -219,7 +218,7 @@ enum TransitionToRunning {

struct SchedulerView<'a, S> {
header: &'a Header,
scheduler: &'a Scheduler<S>,
scheduler: &'a S,
}

impl<'a, S> SchedulerView<'a, S>
Expand All @@ -233,17 +232,17 @@ where

/// Returns true if the task should be deallocated.
fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
let ref_dec = if self.scheduler.is_bound() {
if let Some(task) = self.scheduler.release(self.to_task()) {
mem::forget(task);
true
} else {
false
}
let me = self.to_task();

let ref_dec = if let Some(task) = self.scheduler.release(&me) {
mem::forget(task);
true
} else {
false
};

mem::forget(me);

// This might deallocate
let snapshot = self
.header
Expand All @@ -254,8 +253,6 @@ where
}

fn transition_to_running(&self) -> TransitionToRunning {
debug_assert!(self.scheduler.is_bound());

// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
Expand Down

0 comments on commit 0cefa85

Please sign in to comment.