Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-tune (dynamic) stream receive window #176

Merged
merged 39 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
71f60c4
feat: dynamic stream window
mxinden Nov 22, 2023
c6ca2e5
expose rtt
mxinden Nov 22, 2023
b8764d2
Introduce connection limit
mxinden Nov 23, 2023
1839e8b
improve logging
mxinden Nov 23, 2023
5dc40c2
less logging
mxinden Nov 23, 2023
dd4cf3a
Unbounded stream receive window
mxinden Nov 23, 2023
24ca871
Rename config options
mxinden Nov 24, 2023
5ee74d1
Remove max_buffer_size and rename variables
mxinden Nov 24, 2023
504e876
Revert benchmark changes
mxinden Nov 24, 2023
3e9a870
Refactor rtt
mxinden Nov 24, 2023
8de55cf
randomize nonce
mxinden Nov 24, 2023
6630c9d
Revert to u32 and add quickcheck
mxinden Nov 24, 2023
df2e62c
Minor clean-ups
mxinden Nov 25, 2023
3fda972
Add assertions
mxinden Nov 25, 2023
2892b8d
Fix deadlock
mxinden Nov 25, 2023
85aef76
Move rtt into own module
mxinden Nov 27, 2023
a43fbee
Improve docs
mxinden Nov 27, 2023
bb0c6a5
Undo test changes
mxinden Nov 27, 2023
5d93aa2
Remove Stream::set_flag
mxinden Nov 29, 2023
cf76ccf
Document MAX_FRAME_BODY_LEN
mxinden Nov 29, 2023
b9b74db
Introduce flow_control module
mxinden Dec 1, 2023
55872bf
Remove Config::max_stream_receive_window
mxinden Dec 3, 2023
4d59c6d
Reduce diff
mxinden Dec 3, 2023
0ac5534
Remove unreachable buffer length exceeded
mxinden Dec 3, 2023
c8269b6
docs
mxinden Dec 3, 2023
fc6aaf7
Add explainer for asserts
mxinden Dec 3, 2023
d3a22b8
Remove obsolete code
mxinden Dec 3, 2023
db2971b
fmt
mxinden Dec 3, 2023
8d69551
fmt
mxinden Dec 3, 2023
8261e12
Define and use KIB MIB and GIB constants
mxinden Dec 4, 2023
7581f76
Document DOS mitigation
mxinden Dec 4, 2023
c9c71b6
Document no ping on idle connection
mxinden Dec 4, 2023
82106b0
Derive Clone
mxinden Dec 4, 2023
c3f581f
Remove RttState::Initial
mxinden Dec 4, 2023
33c6b05
Use `Duration`'s `fmt::Debug` implementation
mxinden Dec 4, 2023
68e5e1c
Refactor string formatting
mxinden Dec 4, 2023
aec5fd5
Add changelog entry
mxinden Dec 4, 2023
79c0d11
Use shadowing over mut
mxinden Dec 4, 2023
6b64a9f
Remove current_ and _size
mxinden Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::trace!("{}: creating new outbound stream", self.id);

let id = self.next_stream_id()?;
let mut stream = self.make_new_outbound_stream(id);
stream.set_flag(stream::Flag::Syn);
let stream = self.make_new_outbound_stream(id);

log::debug!("{}: new outbound {} of {}", self.id, stream, self);
self.streams.insert(id, stream.clone_shared());
Expand Down Expand Up @@ -625,7 +624,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::error!("{}: maximum number of streams reached", self.id);
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
{
let mut shared = stream.shared();
if is_finish {
Expand All @@ -636,7 +635,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream);
}
Expand Down Expand Up @@ -726,8 +724,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}

let credit = frame.header().credit() + DEFAULT_CREDIT;
let mut stream = self.make_new_inbound_stream(stream_id, credit);
stream.set_flag(stream::Flag::Ack);
let stream = self.make_new_inbound_stream(stream_id, credit);

if is_finish {
stream
Expand Down
9 changes: 2 additions & 7 deletions yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl Stream {
conn,
config: config.clone(),
sender,
flag: Flag::None,
flag: Flag::Ack,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
shared: Arc::new(Mutex::new(Shared::new(
DEFAULT_CREDIT,
curent_send_window_size,
Expand All @@ -185,7 +185,7 @@ impl Stream {
conn,
config: config.clone(),
sender,
flag: Flag::None,
flag: Flag::Syn,
shared: Arc::new(Mutex::new(Shared::new(DEFAULT_CREDIT, DEFAULT_CREDIT))),
accumulated_max_stream_windows,
rtt,
Expand All @@ -211,11 +211,6 @@ impl Stream {
self.shared().is_pending_ack()
}

/// Set the flag that should be set on the next outbound frame header.
pub(crate) fn set_flag(&mut self, flag: Flag) {
self.flag = flag
}

pub(crate) fn shared(&self) -> MutexGuard<'_, Shared> {
self.shared.lock()
}
Expand Down