diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index c6f2e00df53a26..89f9de78491101 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -408,11 +408,16 @@ async fn handle_connection( )); match futures::future::try_join(send_requests_task, recv_requests_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), - Ok(((), Err(err))) => { - debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); - record_error(&err, &stats); + Ok(out) => { + if let (Err(ref err), _) = out { + debug!("send_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } + if let (_, Err(ref err)) = out { + debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } } - Ok(((), Ok(()))) => (), } drop_connection(remote_pubkey, &connection, &cache).await; if let Entry::Occupied(entry) = router.write().await.entry(remote_address) { @@ -513,15 +518,27 @@ async fn send_requests_task( connection: Connection, mut receiver: AsyncReceiver, stats: Arc, -) { - while let Some(request) = receiver.recv().await { - tokio::task::spawn(send_request_task( - endpoint.clone(), - remote_address, - connection.clone(), - request, - stats.clone(), - )); +) -> Result<(), Error> { + tokio::pin! { + let connection_closed = connection.closed(); + } + loop { + tokio::select! { + biased; + request = receiver.recv() => { + match request { + None => return Ok(()), + Some(request) => tokio::task::spawn(send_request_task( + endpoint.clone(), + remote_address, + connection.clone(), + request, + stats.clone(), + )), + }; + } + err = &mut connection_closed => return Err(Error::from(err)), + } } }