Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh-blobs): use async_channel instead of flume for local_pool #2533

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
async-channel = "2.3.1"
bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
Expand Down
23 changes: 11 additions & 12 deletions iroh-blobs/src/util/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Deref for LocalPool {
#[derive(Debug, Clone)]
pub struct LocalPoolHandle {
/// The sender half of the channel used to send tasks to the pool
send: flume::Sender<Message>,
send: async_channel::Sender<Message>,
}

/// What to do when a panic occurs in a pool thread
Expand Down Expand Up @@ -124,7 +124,7 @@ impl LocalPool {
panic_mode,
} = config;
let cancel_token = CancellationToken::new();
let (send, recv) = flume::unbounded::<Message>();
let (send, recv) = async_channel::unbounded::<Message>();
let shutdown_sem = Arc::new(Semaphore::new(0));
let handle = tokio::runtime::Handle::current();
let handles = (0..threads)
Expand Down Expand Up @@ -159,7 +159,7 @@ impl LocalPool {
/// Spawn a new pool thread.
fn spawn_pool_thread(
thread_name: String,
recv: flume::Receiver<Message>,
recv: async_channel::Receiver<Message>,
cancel_token: CancellationToken,
panic_mode: PanicMode,
shutdown_sem: Arc<Semaphore>,
Expand Down Expand Up @@ -198,18 +198,18 @@ impl LocalPool {
// if the cancel token is cancelled, break the loop immediately
_ = cancel_token.cancelled() => break ShutdownMode::Stop,
// if we receive a message, execute it
msg = recv.recv_async() => {
msg = recv.recv() => {
match msg {
// just push into the FuturesUnordered
// just push into the join set
Ok(Message::Execute(f)) => {
s.spawn_local((f)());
}
// break with optional semaphore
Ok(Message::Finish) => break ShutdownMode::Finish,
// if the sender is dropped, break the loop immediately
Err(flume::RecvError::Disconnected) => break ShutdownMode::Stop,
Err(async_channel::RecvError) => break ShutdownMode::Stop,
}
}
},
}
}
}));
Expand Down Expand Up @@ -247,7 +247,7 @@ impl LocalPool {

/// Immediately stop polling all tasks and wait for all threads to finish.
///
/// This is like droo, but waits for thread completion asynchronously.
/// This is like drop, but waits for thread completion asynchronously.
///
/// If there was a panic on any of the threads, it will be re-thrown here.
pub async fn shutdown(self) {
Expand All @@ -270,14 +270,13 @@ impl LocalPool {
// we assume that there are exactly as many threads as there are handles.
// also, we assume that the threads are still running.
for _ in 0..self.threads_u32() {
println!("sending shutdown message");
// send the shutdown message
// sending will fail if all threads are already finished, but
// in that case we don't need to do anything.
//
// Threads will add a permit in any case, so await_thread_completion
// will then immediately return.
self.send.send(Message::Finish).ok();
self.send.send_blocking(Message::Finish).ok();
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
}
self.await_thread_completion().await;
}
Expand Down Expand Up @@ -460,7 +459,7 @@ impl LocalPoolHandle {
/// spawn a task in the pool.
pub fn try_spawn_detached_boxed(&self, gen: SpawnFn) -> SpawnResult<()> {
self.send
.send(Message::Execute(gen))
.send_blocking(Message::Execute(gen))
.map_err(|_| SpawnError::Cancelled)
}
}
Expand Down Expand Up @@ -593,7 +592,7 @@ mod tests {
}

#[tokio::test]
async fn test_shutdown() {
async fn test_finish() {
let _ = tracing_subscriber::fmt::try_init();
let pool = LocalPool::new(Config::default());
let counter = Arc::new(AtomicU64::new(0));
Expand Down
Loading