Skip to content

Commit

Permalink
chore: Replace FuturesUnordered with JoinSet
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jul 23, 2023
1 parent 21e6815 commit fe3f0e0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 16 deletions.
4 changes: 2 additions & 2 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ codegen = ["dep:async-trait"]
gzip = ["dep:flate2"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:async-stream"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:async-stream", "tokio/rt"]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
Expand All @@ -36,7 +36,7 @@ transport = [
"channel",
"dep:h2",
"dep:hyper",
"dep:tokio",
"tokio",
"dep:tower",
"dep:hyper-timeout",
]
Expand Down
22 changes: 8 additions & 14 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::{Connected, Server};
use crate::transport::service::ServerIo;
use futures_util::stream::TryStreamExt;
use hyper::server::{
accept::Accept,
conn::{AddrIncoming, AddrStream},
Expand All @@ -26,6 +25,7 @@ where
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IE: Into<crate::Error>,
{
use futures_util::stream::TryStreamExt;
incoming.err_into().map_ok(ServerIo::new_io)
}

Expand All @@ -41,21 +41,17 @@ where
async_stream::try_stream! {
futures_util::pin_mut!(incoming);

#[cfg(feature = "tls")]
let mut tasks = futures_util::stream::futures_unordered::FuturesUnordered::new();
let mut tasks = tokio::task::JoinSet::new();

loop {
match select(&mut incoming, &mut tasks).await {
SelectOutput::Incoming(stream) => {
if let Some(tls) = &server.tls {
let tls = tls.clone();

let accept = tokio::spawn(async move {
tasks.spawn(async move {
let io = tls.accept(stream).await?;
Ok(ServerIo::new_tls_io(io))
});

tasks.push(accept);
} else {
yield ServerIo::new_io(stream);
}
Expand All @@ -78,16 +74,14 @@ where
}

#[cfg(feature = "tls")]
async fn select<IO, IE>(
async fn select<IO: 'static, IE>(
incoming: &mut (impl Stream<Item = Result<IO, IE>> + Unpin),
tasks: &mut futures_util::stream::futures_unordered::FuturesUnordered<
tokio::task::JoinHandle<Result<ServerIo<IO>, crate::Error>>,
>,
tasks: &mut tokio::task::JoinSet<Result<ServerIo<IO>, crate::Error>>,
) -> SelectOutput<IO>
where
IE: Into<crate::Error>,
{
use futures_util::StreamExt;
use tokio_stream::StreamExt;

if tasks.is_empty() {
return match incoming.try_next().await {
Expand All @@ -106,8 +100,8 @@ where
}
}

accept = tasks.next() => {
match accept.expect("FuturesUnordered stream should never end") {
accept = tasks.join_next() => {
match accept.expect("JoinSet should never end") {
Ok(Ok(io)) => SelectOutput::Io(io),
Ok(Err(e)) => SelectOutput::Err(e),
Err(e) => SelectOutput::Err(e.into()),
Expand Down

0 comments on commit fe3f0e0

Please sign in to comment.