Skip to content

Commit

Permalink
fix(subscriber): fix build on tokio 1.21.0 (#374)
Browse files Browse the repository at this point in the history
Due to a change in the unstable task builder APIs, this no longer
compiles with the latest version of Tokio. Fortunately, it's a simple
fix.
  • Loading branch information
Noah-Kennedy authored and hawkw committed Sep 29, 2023
1 parent c903b33 commit c34ac2d
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 35 deletions.
70 changes: 56 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ serde_json = "1"
crossbeam-channel = "0.5"

[dev-dependencies]
tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] }
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
futures = "0.3"

[package.metadata.docs.rs]
Expand Down
22 changes: 16 additions & 6 deletions console-subscriber/examples/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
"blocks" => {
tokio::task::Builder::new()
.name("blocks")
.spawn(double_sleepy(1, 10));
.spawn(double_sleepy(1, 10))
.unwrap();
}
"coma" => {
tokio::task::Builder::new()
.name("coma")
.spawn(std::future::pending::<()>());
.spawn(std::future::pending::<()>())
.unwrap();
}
"burn" => {
tokio::task::Builder::new().name("burn").spawn(burn(1, 10));
tokio::task::Builder::new()
.name("burn")
.spawn(burn(1, 10))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
Expand All @@ -47,10 +52,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

let task1 = tokio::task::Builder::new()
.name("task1")
.spawn(spawn_tasks(1, 10));
.spawn(spawn_tasks(1, 10))
.unwrap();
let task2 = tokio::task::Builder::new()
.name("task2")
.spawn(spawn_tasks(10, 30));
.spawn(spawn_tasks(10, 30))
.unwrap();

let result = tokio::try_join! {
task1,
Expand All @@ -66,7 +73,10 @@ async fn spawn_tasks(min: u64, max: u64) {
loop {
for i in min..max {
tracing::trace!(i, "spawning wait task");
tokio::task::Builder::new().name("wait").spawn(wait(i));
tokio::task::Builder::new()
.name("wait")
.spawn(wait(i))
.unwrap();

let sleep = Duration::from_secs(max) - Duration::from_secs(i);
tracing::trace!(?sleep, "sleeping...");
Expand Down
14 changes: 10 additions & 4 deletions console-subscriber/examples/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
for i in 0..30 {
let c = barrier.clone();
let task_name = format!("task-{}", i);
handles.push(task::Builder::default().name(&task_name).spawn(async move {
tokio::time::sleep(Duration::from_secs(i)).await;
c.wait().await
}));
handles.push(
task::Builder::default()
.name(&task_name)
.spawn(async move {
tokio::time::sleep(Duration::from_secs(i)).await;
c.wait().await
})
.unwrap(),
);
}

// Will not resolve until all "after wait" messages have been printed
Expand All @@ -33,6 +38,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Exactly one barrier will resolve as the "leader"
assert_eq!(num_leaders, 1);
})
.unwrap()
.await?;

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion console-subscriber/examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
*lock += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
})
.unwrap();
}

while *count.lock().await < 50 {}
})
.unwrap()
.await?;

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion console-subscriber/examples/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
*lock += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
})
.unwrap();
}

loop {
Expand All @@ -31,6 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
}
})
.unwrap()
.await?;

Ok(())
Expand Down
19 changes: 12 additions & 7 deletions console-subscriber/examples/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.spawn(async move {
let _permit = acquire_sem.acquire_many(i).await.unwrap();
tokio::time::sleep(Duration::from_secs(i as u64 * 2)).await;
}),
})
.unwrap(),
);
tasks.push(
tokio::task::Builder::default()
.name(&add_task_name)
.spawn(async move {
tokio::time::sleep(Duration::from_secs(i as u64 * 5)).await;
add_sem.add_permits(i as usize);
})
.unwrap(),
);
tasks.push(tokio::task::Builder::default().name(&add_task_name).spawn(
async move {
tokio::time::sleep(Duration::from_secs(i as u64 * 5)).await;
add_sem.add_permits(i as usize);
},
));
}

let all_tasks = futures::future::try_join_all(tasks);
all_tasks.await.unwrap();
})
.unwrap()
.await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ where
T: Send + 'static,
{
#[cfg(tokio_unstable)]
return tokio::task::Builder::new().name(_name).spawn(task);
return tokio::task::Builder::new().name(_name).spawn(task).unwrap();

#[cfg(not(tokio_unstable))]
tokio::spawn(task)
Expand Down

0 comments on commit c34ac2d

Please sign in to comment.