From e6fb5f81b3bffa62cd0a6c49f8f933cdc7b301b4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 15:31:47 +0100 Subject: [PATCH] Unbounded stream receive window Depend on connection window only. --- test-harness/src/lib.rs | 6 +++++- yamux/src/connection/stream.rs | 4 ++-- yamux/src/lib.rs | 12 +++++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 72e2a596..acd43cc9 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -449,7 +449,11 @@ impl Arbitrary for TestConfig { fn arbitrary(g: &mut Gen) -> Self { let mut c = Config::default(); c.set_read_after_close(Arbitrary::arbitrary(g)); - c.set_receive_window(256 * 1024 + usize::arbitrary(g) % (768 * 1024)); + if bool::arbitrary(g) { + c.set_receive_window(Some(256 * 1024 + usize::arbitrary(g) % (768 * 1024))); + } else { + c.set_receive_window(None); + } TestConfig(c) } } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 0ccc638f..08c754ac 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -25,7 +25,6 @@ use futures::{ ready, SinkExt, }; use parking_lot::{Mutex, MutexGuard}; -use std::convert::TryInto; use std::time::Instant; use std::{ fmt, io, @@ -546,6 +545,7 @@ impl Shared { self.window_max, self.window ); + let bytes_received = self.window_max.saturating_sub(self.window); let buffer_len = self.buffer.len(); let mut new_credit = bytes_received.saturating_sub(buffer_len); @@ -571,7 +571,7 @@ impl Shared { self.window_max = std::cmp::min( std::cmp::min( self.window_max.saturating_mul(2), - self.config.receive_window, + self.config.receive_window.unwrap_or(usize::MAX), ), self.window_max + ((self.config.connection_window diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index d3935d77..fbb56a39 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -77,7 +77,7 @@ const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024; #[derive(Debug, Clone)] pub struct Config { // TODO: Rename to max_stream_receive_window - receive_window: usize, + receive_window: Option, // TODO: Rename to max_connection_receive_window connection_window: usize, max_buffer_size: usize, @@ -89,7 +89,8 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { - receive_window: 16 * 1024 * 1024, + // TODO: Add rational: given that we have a connection window, ... + receive_window: None, // TODO: reevaluate default. // TODO: Add setter. connection_window: 1 * 1024 * 1024 * 1024, @@ -108,12 +109,12 @@ impl Config { /// # Panics /// /// If the given receive window is < 256 KiB. - pub fn set_receive_window(&mut self, n: usize) -> &mut Self { + pub fn set_receive_window(&mut self, n: Option) -> &mut Self { self.receive_window = n; self.check(); self } - + pub fn set_connection_window(&mut self, n: usize) -> &mut Self { self.connection_window = n; self.check(); @@ -150,8 +151,9 @@ impl Config { self } + // TODO: Consider doing the check on creation, not on each builder method call. fn check(&self) { - assert!(self.receive_window >= DEFAULT_CREDIT); + assert!(self.receive_window.unwrap_or(usize::MAX) >= DEFAULT_CREDIT); assert!(self.connection_window >= self.max_num_streams * DEFAULT_CREDIT); } }