Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Spawn for each stream (#26086) (#26264)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2cc48a6)

Co-authored-by: sakridge <[email protected]>
  • Loading branch information
mergify[bot] and sakridge authored Jun 27, 2022
1 parent b5c789a commit d02a1f0
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,33 +237,39 @@ async fn handle_connection(
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
}
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
});
}
Err(e) => {
debug!("stream error: {:?}", e);
Expand Down

0 comments on commit d02a1f0

Please sign in to comment.