Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(yamux): increase max number of buffered inbound streams to 256 #3872

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Increase the amount of buffered inbound streams to 256.
See [PR 3872].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3872]: https://github.com/libp2p/rust-libp2p/pull/3872

## 0.43.1

Expand Down
6 changes: 5 additions & 1 deletion muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ pub struct Muxer<S> {
inbound_stream_waker: Option<Waker>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
/// Yamux does not have any built-in backpressure mechanism.
///
/// Implementations are encouraged to not open more than 256 unacknowledged streams, see <https://github.com/libp2p/specs/tree/master/yamux#ack-backlog>.
/// Thus, it makes sense to buffer up to 256 streams before dropping them for good interoperability.
const MAX_BUFFERED_INBOUND_STREAMS: usize = 256;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I follow the reasoning above. At the point where a new stream is retrieved through this.poll_inner, the stream will be acknowledged to the remote, thus allowing the remote to open another new stream.

As far as I can tell, bumping this limit just delays running into the limit itself, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for questioning this!

At the point where a new stream is retrieved through this.poll_inner, the stream will be acknowledged to the remote

I don't think this is true. We emit streams in rust-yamux without acknowledging them directly because we don't want to send a frame for just acknowledging a stream. I'll have to dig into this in detail but I think we delay the acknowledgement until we send the first payload on this stream.

The first payload will be multistream-select which in our case happens directly after we have removed a stream from this buffer. Meaning if our Connection is slow in removing streams from this buffer, the remote should eventually stop opening new streams.

I think it makes sense to write a test against this just to be sure but for that we land the ACK backlog changes first: libp2p/rust-yamux#150

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the WindowUpdateMode is set to OnRead (which is the default), then we are not sending a Frame on a newly opened inbound stream until the user first uses it. To acknowledge the stream in that case, we preemptively set the ACK flag:

https://github.com/libp2p/rust-yamux/blob/72ccd5734f71971fc1f02ac4a5f43b8eb4dff660/yamux/src/connection.rs#L656

Let me know if you see anything wrong with the reasoning here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized I still had a fully-working branch for the ACK backlog locally so I just quickly added a test for this as well and it confirmed my intuition: libp2p/rust-yamux#153


impl<S> fmt::Debug for Muxer<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down