From 68d213297f962c58c9d2f4571837c6bbd9eb07d0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 25 Nov 2023 11:57:40 +0100 Subject: [PATCH] Fix deadlock --- yamux/src/connection/stream.rs | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 3f6eb2a2..93891688 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -282,8 +282,8 @@ impl Stream { let config = &self.config; let shared = self.shared.lock(); - let accumulated_max_stream_windows = *self.accumulated_max_stream_windows.lock(); let rtt = self.rtt.get(); + let accumulated_max_stream_windows = *self.accumulated_max_stream_windows.lock(); assert!( shared.current_receive_window_size <= shared.max_receive_window_size, @@ -403,6 +403,7 @@ impl Stream { self.last_window_update = Instant::now(); shared.current_receive_window_size += next_window_update; + drop(shared); debug_assert(); @@ -610,8 +611,8 @@ impl AsyncWrite for Stream { impl Drop for Stream { fn drop(&mut self) { - let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock(); let max_receive_window_size = self.shared.lock().max_receive_window_size; + let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock(); debug_assert!( *accumulated_max_stream_windows >= (max_receive_window_size - DEFAULT_CREDIT) as usize, @@ -648,19 +649,24 @@ impl quickcheck::Arbitrary for Stream { let mut shared = Shared::arbitrary(g); let config = Arc::new(Config::arbitrary(g)); + let rtt = Rtt::arbitrary(g); - // Update `shared` to align with `config`. - shared.max_receive_window_size = g.gen_range( + // Update `shared` to align with `config` and rtt. + shared.max_receive_window_size = if rtt.get().is_none() { DEFAULT_CREDIT - ..cmp::min( - config.max_stream_receive_window.unwrap_or(u32::MAX), - (DEFAULT_CREDIT as usize + config.max_connection_receive_window - - (config.max_num_streams * (DEFAULT_CREDIT as usize))) - .try_into() - .unwrap_or(u32::MAX), - ) - .saturating_add(1), - ); + } else { + g.gen_range( + DEFAULT_CREDIT + ..cmp::min( + config.max_stream_receive_window.unwrap_or(u32::MAX), + (DEFAULT_CREDIT as usize + config.max_connection_receive_window + - (config.max_num_streams * (DEFAULT_CREDIT as usize))) + .try_into() + .unwrap_or(u32::MAX), + ) + .saturating_add(1), + ) + }; shared.current_receive_window_size = g.gen_range(0..shared.max_receive_window_size); Self { @@ -674,7 +680,7 @@ impl quickcheck::Arbitrary for Stream { - config.max_num_streams * DEFAULT_CREDIT as usize + 1), ))), - rtt: Rtt::arbitrary(g), + rtt, last_window_update: Instant::now() - std::time::Duration::from_secs(g.gen_range(0..(60 * 60 * 24))), config,