Skip to content

Commit

Permalink
rename later to yield_task_queue_now; add yield_now
Browse files Browse the repository at this point in the history
Also add a free function `yield_if_needed` since it's one of the most
used in Glommio apps.
  • Loading branch information
HippoBaro committed Sep 27, 2021
1 parent 78a1f99 commit 2ffbfac
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 64 deletions.
24 changes: 14 additions & 10 deletions examples/cooperative_preempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ fn main() {
// explicitly yield control if they are going to do something that may take
// too long (that is usually a loop!)
//
// There are two ways of yielding control:
// There are three ways of yielding control:
//
// * glommio::executor().yield_if_needed(), which will yield if the task has
// run for too long. What "too long" means is an implementation detail, but
// it will be always somehow related to the latency guarantees that the task
// queues want to uphold in their `Latency::Matters` parameter (or
// Latency::NotImportant).
// * glommio::executor().yield_if_needed(), which will yield if the current
// task queue has run for too long. What "too long" means is an
// implementation detail, but it will be always somehow related to the
// latency guarantees that the task queues want to uphold in their
// `Latency::Matters` parameter (or Latency::NotImportant).
//
// * glommio::executor().later(), which will yield immediately (execute the
// rest of the function later).
// * glommio::executor().yield_task_queue_now(), works like yield_if_needed()
// but yields unconditionally.
//
// * glommio::executor().yield_now(), which unconditional yield the current
// task within the current task queue, forcing the scheduler to run another
// task on the same task queue.
//
// Because yield_if_needed() returns a future that has to be .awaited, it cannot
// be used in situations where .await is illegal. For instance, if we are
Expand All @@ -51,7 +55,7 @@ fn main() {
let start = Instant::now();
let mut lap = start;
while start.elapsed().as_millis() < 50 {
glommio::executor().yield_if_needed().await;
glommio::yield_if_needed().await;
if lap.elapsed().as_millis() > 1 {
lap = Instant::now();
println!("tq1: 1ms");
Expand All @@ -72,7 +76,7 @@ fn main() {
let mut v = value.borrow_mut();
if glommio::executor().need_preempt() {
drop(v);
glommio::executor().later().await;
glommio::executor().yield_task_queue_now().await;
} else {
*v += 1;
}
Expand Down
6 changes: 3 additions & 3 deletions examples/deadline_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl IntWriter {
}

burn_cpu(Duration::from_micros(500));
glommio::executor().later().await;
glommio::executor().yield_task_queue_now().await;
}
}
}
Expand Down Expand Up @@ -119,7 +119,7 @@ fn competing_cpu_hog(
async move {
while !stop.get() {
burn_cpu(Duration::from_micros(500));
glommio::executor().later().await;
glommio::executor().yield_task_queue_now().await;
}
},
cpuhog_tq,
Expand Down Expand Up @@ -233,7 +233,7 @@ fn main() {
loop {
let stop = Rc::new(Cell::new(false));
let hog = competing_cpu_hog(stop.clone(), cpuhog_tq);
glommio::executor().later().await;
glommio::executor().yield_task_queue_now().await;

let deadline = DeadlineQueue::new("example", Duration::from_millis(250));
let test = IntWriter::new(to_write, Duration::from_secs(duration as u64));
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn hello() {
for t in 0..5 {
tasks.push(glommio::spawn_local(async move {
println!("{}: Hello {} ...", glommio::executor().id(), t);
glommio::executor().later().await;
glommio::executor().yield_task_queue_now().await;
println!("{}: ... {} World!", glommio::executor().id(), t);
}));
}
Expand Down
4 changes: 2 additions & 2 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn main() {
println!("reset");
*(right.borrow_mut()) = true
}
glommio::executor().yield_if_needed().await;
glommio::yield_if_needed().await;

}
}
Expand All @@ -38,7 +38,7 @@ fn main() {
println!("right");
*(right.borrow_mut()) = true
}
glommio::executor().yield_if_needed().await;
glommio::yield_if_needed().await;
}
}
}(left.clone(), right.clone()))
Expand Down
2 changes: 1 addition & 1 deletion glommio/benches/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
let mut expected : u32 = 0;
while expected != runs {
while expected != acquisitions.get() {
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
s.signal(1);
expected += 1;
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/channels/local_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ mod test {

loop {
match sender.try_send(1) {
Ok(_) => crate::executor().later().await,
Ok(_) => crate::executor().yield_task_queue_now().await,
err => {
matches!(err, Err(GlommioError::WouldBlock(ResourceType::Channel(1))));
break;
Expand Down
97 changes: 72 additions & 25 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ impl<T> Task<T> {
/// glommio::spawn_local(async {
/// loop {
/// println!("I'm a background task looping forever.");
/// glommio::executor().later().await;
/// glommio::executor().yield_task_queue_now().await;
/// }
/// })
/// .detach();
Expand Down Expand Up @@ -1535,6 +1535,26 @@ impl<'a, T> Future for ScopedTask<'a, T> {
}
}

/// Conditionally yields the current task queue. The scheduler may then
/// process other task queues according to their latency requirements.
/// If a call to this function results in the current queue to yield,
/// then the calling task is moved to the back of the yielded task
/// queue.
///
/// Under which condition this function yield is an implementation detail
/// subject to changes, but it will always be somehow related to the latency
/// guarantees that the task queues want to uphold in their
/// `Latency::Matters` parameter (or `Latency::NotImportant`).
///
/// This function is the central mechanism of task cooperation in Glommio
/// and should be preferred over unconditional yielding methods like
/// [`ExecutorProxy::yield_now`] and
/// [`ExecutorProxy::yield_task_queue_now`].
#[inline]
pub async fn yield_if_needed() {
executor().yield_if_needed().await
}

/// Spawns a task onto the current single-threaded executor.
///
/// If called from a [`LocalExecutor`], the task is spawned on it.
Expand Down Expand Up @@ -1676,7 +1696,8 @@ pub unsafe fn spawn_scoped_local_into<'a, T>(
executor().spawn_scoped_local_into(future, handle)
}

/// A proxy struct to the underlying [`LocalExecutor`]
/// A proxy struct to the underlying [`LocalExecutor`]. It is accessible from
/// anywhere within a Glommio context using [`executor()`].
#[derive(Debug)]
pub struct ExecutorProxy {}

Expand All @@ -1699,14 +1720,7 @@ impl ExecutorProxy {
}
}

/// Unconditionally yields the current task, moving it back to the end of
/// its queue. It is not possible to yield futures that are not spawn'd,
/// as they don't have a task associated with them.
pub async fn later(&self) {
Self::cond_yield(|_| true).await
}

/// checks if this task has ran for too long and need to be preempted. This
/// Checks if this task has ran for too long and need to be preempted. This
/// is useful for situations where we can't call .await, for instance,
/// if a [`RefMut`] is held. If this tests true, then the user is
/// responsible for making any preparations necessary for calling .await
Expand Down Expand Up @@ -1758,13 +1772,46 @@ impl ExecutorProxy {
LOCAL_EX.with(|local_ex| local_ex.need_preempt())
}

/// Conditionally yields the current task, moving it back to the end of its
/// queue, if the task has run for too long
/// Conditionally yields the current task queue. The scheduler may then
/// process other task queues according to their latency requirements.
/// If a call to this function results in the current queue to yield,
/// then the calling task is moved to the back of the yielded task
/// queue.
///
/// Under which condition this function yield is an implementation detail
/// subject to changes, but it will always be somehow related to the latency
/// guarantees that the task queues want to uphold in their
/// `Latency::Matters` parameter (or `Latency::NotImportant`).
///
/// This function is the central mechanism of task cooperation in Glommio
/// and should be preferred over unconditional yielding methods like
/// [`ExecutorProxy::yield_now`] and
/// [`ExecutorProxy::yield_task_queue_now`].
#[inline]
pub async fn yield_if_needed(&self) {
Self::cond_yield(|local_ex| local_ex.need_preempt()).await;
}

/// Unconditionally yields the current task and forces the scheduler
/// to poll another task within the current task queue.
/// Calling this wakes the current task and returns [`Poll::Pending`] once.
///
/// Unless you know you need to yield right now, using
/// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
pub async fn yield_now(&self) {
futures_lite::future::yield_now().await
}

/// Unconditionally yields the current task queue and forces the scheduler
/// to poll another queue. Use [`ExecutorProxy::yield_now`] to yield within
/// a queue.
///
/// Unless you know you need to yield right now, using
/// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
pub async fn yield_task_queue_now(&self) {
Self::cond_yield(|_| true).await
}

#[inline]
pub(crate) fn reactor(&self) -> Rc<reactor::Reactor> {
LOCAL_EX.with(|local_ex| local_ex.get_reactor())
Expand Down Expand Up @@ -2293,7 +2340,7 @@ mod test {
assert_ne!(id, *last);
*last = id;
drop(last);
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
}));
}
Expand Down Expand Up @@ -2338,7 +2385,7 @@ mod test {
if start.elapsed().as_secs() > 1 {
panic!("Never received preempt signal");
}
crate::executor().yield_if_needed().await;
crate::yield_if_needed().await;
}
}
},
Expand All @@ -2353,7 +2400,7 @@ mod test {
// In case we are executed first, yield to the the other task
loop {
if !(*(nolat_started.borrow())) {
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
} else {
break;
}
Expand Down Expand Up @@ -2442,7 +2489,7 @@ mod test {
if *(second_status.borrow()) {
panic!("I was preempted but should not have been");
}
crate::executor().yield_if_needed().await;
crate::yield_if_needed().await;
}
}
},
Expand All @@ -2457,7 +2504,7 @@ mod test {
// In case we are executed first, yield to the the other task
loop {
if !(*(first_started.borrow())) {
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
} else {
break;
}
Expand All @@ -2480,7 +2527,7 @@ mod test {
ex.run(async {
crate::spawn_local(async {
loop {
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
})
.detach();
Expand Down Expand Up @@ -2575,7 +2622,7 @@ mod test {

let now = Instant::now();
while now.elapsed().as_millis() < 200 {}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
assert!(
crate::executor().executor_stats().total_runtime() >= Duration::from_millis(200),
"expected runtime on LE0 {:#?} is greater than 200 ms",
Expand Down Expand Up @@ -2606,7 +2653,7 @@ mod test {

let now = Instant::now();
while now.elapsed().as_millis() < 200 {}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
assert!(
crate::executor().executor_stats().total_runtime()
>= Duration::from_millis(200),
Expand All @@ -2629,7 +2676,7 @@ mod test {
async fn work_quanta() {
let now = Instant::now();
while now.elapsed().as_millis() < 2 {}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}

macro_rules! test_static_shares {
Expand Down Expand Up @@ -2758,7 +2805,7 @@ mod test {
break;
}
(*tq1_count.borrow_mut())[secs as usize] += 1;
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
}},
tq1,
Expand All @@ -2775,7 +2822,7 @@ mod test {
}
bm.tick(elapsed.as_millis() as u64);
(*tq2_count.borrow_mut())[secs as usize] += 1;
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
}},
tq2,
Expand Down Expand Up @@ -3141,7 +3188,7 @@ mod test {
})
.await;
}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
assert_eq!(a, 2);

let mut a = 1;
Expand All @@ -3151,7 +3198,7 @@ mod test {
})
};

crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
do_later.await;
assert_eq!(a, 2);
});
Expand Down
6 changes: 4 additions & 2 deletions glommio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ macro_rules! wait_on_cond {
if *($var.borrow()) == $val {
break;
}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
};
($var:expr, $val:expr, $instantval:expr) => {
Expand All @@ -429,7 +429,7 @@ macro_rules! wait_on_cond {
if start.elapsed().as_secs() > $instantval {
panic!("test timed out");
}
crate::executor().later().await;
crate::executor().yield_task_queue_now().await;
}
};
}
Expand Down Expand Up @@ -485,6 +485,7 @@ pub use crate::{
spawn_local_into,
spawn_scoped_local,
spawn_scoped_local_into,
yield_if_needed,
CpuSet,
ExecutorProxy,
ExecutorStats,
Expand Down Expand Up @@ -513,6 +514,7 @@ pub mod prelude {
executor,
spawn_local,
spawn_local_into,
yield_if_needed,
ByteSliceExt,
ByteSliceMutExt,
ExecutorProxy,
Expand Down
Loading

0 comments on commit 2ffbfac

Please sign in to comment.