Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TickedAsyncExecutor::spawn API #6

Merged
merged 3 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
files: target/cobertura.xml

- name: Miri
run: |
rustup toolchain install nightly --component miri
rustup override set nightly
cargo miri setup
cargo miri test
# - name: Miri
# run: |
# rustup toolchain install nightly --component miri
# rustup override set nightly
# cargo miri setup
# cargo miri test
41 changes: 11 additions & 30 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Payload = (TaskIdentifier, async_task::Runnable);
pub struct TickedAsyncExecutor<O> {
channel: (mpsc::Sender<Payload>, mpsc::Receiver<Payload>),
num_woken_tasks: Arc<AtomicUsize>,

num_spawned_tasks: Arc<AtomicUsize>,

// TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel
Expand Down Expand Up @@ -52,22 +53,6 @@ where
}
}

pub fn spawn<T>(
&self,
identifier: impl Into<TaskIdentifier>,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T>
where
T: Send + 'static,
{
let identifier = identifier.into();
let future = self.droppable_future(identifier.clone(), future);
let schedule = self.runnable_schedule_cb(identifier);
let (runnable, task) = async_task::spawn(future, schedule);
runnable.schedule();
task
}

pub fn spawn_local<T>(
&self,
identifier: impl Into<TaskIdentifier>,
Expand Down Expand Up @@ -172,7 +157,7 @@ mod tests {
fn test_multiple_tasks() {
let executor = TickedAsyncExecutor::default();
executor
.spawn("A", async move {
.spawn_local("A", async move {
tokio::task::yield_now().await;
})
.detach();
Expand Down Expand Up @@ -226,15 +211,6 @@ mod tests {
fn test_ticked_timer() {
let executor = TickedAsyncExecutor::default();

for _ in 0..10 {
let timer: TickedTimer = executor.create_timer();
executor
.spawn("ThreadedTimer", async move {
timer.sleep_for(256.0).await;
})
.detach();
}

for _ in 0..10 {
let timer = executor.create_timer();
executor
Expand All @@ -255,25 +231,30 @@ mod tests {
let elapsed = now.elapsed();
println!("Elapsed: {:?}", elapsed);
println!("Total: {:?}", instances);
println!(
"Min: {:?}, Max: {:?}",
instances.iter().min(),
instances.iter().max()
);

// Test Timer cancellation
let timer = executor.create_timer();
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let timer = executor.create_timer();
executor
.spawn_local("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let mut tick_event = executor.tick_channel();
executor
.spawn("ThreadedTickFuture", async move {
.spawn_local("LocalTickFuture1", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
Expand All @@ -285,7 +266,7 @@ mod tests {

let mut tick_event = executor.tick_channel();
executor
.spawn_local("LocalTickFuture", async move {
.spawn_local("LocalTickFuture2", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
Expand Down
8 changes: 4 additions & 4 deletions tests/tokio_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn test_tokio_join() {
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
let (tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
let (a, b) = tokio::join!(rx1.recv(), rx2.recv());
assert_eq!(a.unwrap(), 10);
assert_eq!(b.unwrap(), 20);
Expand All @@ -19,7 +19,7 @@ fn test_tokio_join() {
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
let (tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
let (a, b) = tokio::join!(rx3.recv(), rx4.recv());
assert_eq!(a.unwrap(), 10);
assert_eq!(b.unwrap(), 20);
Expand All @@ -46,7 +46,7 @@ fn test_tokio_select() {
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
let (_tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
tokio::select! {
data = rx1.recv() => {
assert_eq!(data.unwrap(), 10);
Expand All @@ -59,7 +59,7 @@ fn test_tokio_select() {
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
let (_tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
tokio::select! {
data = rx3.recv() => {
assert_eq!(data.unwrap(), 10);
Expand Down