Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Adapt QuicMuxer to upstream StreamMuxer changes #6

Closed
wants to merge 97 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
7c8a977
swarm/src/handler: Document responsibility limiting inbound streams (…
mxinden Jul 14, 2022
d4f8ec2
misc/metrics: Track # connected nodes supporting specific protocol (…
mxinden Jul 15, 2022
1a553db
core/muxing: Flatten `StreamMuxer` interface to `poll_{inbound,outbou…
thomaseizinger Jul 18, 2022
e95232c
build(deps): Bump Swatinem/rust-cache from 1.4.0 to 2.0.0 (#2759)
dependabot[bot] Jul 19, 2022
66c2319
transports/tcp: Bump to v0.35.0 (#2760)
mxinden Jul 19, 2022
c8066df
*: Update to `if-watch` `1.1.1` (#2754)
tgmichel Jul 19, 2022
163c5c1
README.md: Add crates.io and docs.rs badges (#2766)
LesnyRumcajs Jul 21, 2022
51a8471
build(deps): Update prometheus-client requirement from 0.16.0 to 0.17…
dependabot[bot] Jul 22, 2022
f15a3dc
core/muxing: Drop `Unpin` requirement from `SubstreamBox` (#2762)
thomaseizinger Jul 22, 2022
2e2c117
core/tests: Remove unnecessary `Arc` (#2763)
thomaseizinger Jul 22, 2022
95713ab
core: fix PR number in changelog entry (#2769)
elenaf9 Jul 23, 2022
f85a990
core/tests: Remove unnecessary util module (#2764)
thomaseizinger Jul 25, 2022
c19a211
misc/metrics: fix clippy::assign-op-pattern (#2773)
elenaf9 Jul 25, 2022
0ec3bbc
core/muxing: Remove `Unpin` requirement from `StreamMuxer::Substream`…
thomaseizinger Jul 25, 2022
74f01e4
transports/tcp: fix clippy::from-over-into (#2774)
elenaf9 Jul 25, 2022
ce963df
core: fix clippy::op-ref, clippy::needless-borrow (#2770)
elenaf9 Jul 25, 2022
56c492c
core/muxing: Drop `Sync` requirement for `StreamMuxer` on `StreamMuxe…
thomaseizinger Jul 27, 2022
09c6908
protocols/dcutr: Fix clippy lints (#2772)
elenaf9 Jul 28, 2022
eaf3f3a
.cargo: Check all features in custom-clippy (#2771)
elenaf9 Jul 28, 2022
7019d49
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/mult…
elenaf9 Jul 31, 2022
2b9e212
examples/README.md: Fix tutorial link (#2790)
lukehinds Aug 2, 2022
028dece
core/muxing: Have functions on `StreamMuxer` take `Pin<&mut Self>` (#…
thomaseizinger Aug 3, 2022
07c0dba
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 3, 2022
57840a3
transports/quic: adapt QuicMuxer to libp2p#2724
elenaf9 Aug 3, 2022
579b1be
swarm-derive/: Generate OutEvent if not provided (#2792)
mxinden Aug 8, 2022
e2b83b7
SECURITY.md: Document supported releases and security mail addr (#2800)
mxinden Aug 8, 2022
3da8b42
README: Point to [email protected] (#2799)
mxinden Aug 8, 2022
1012579
protocols/: Remove passing default variant to `WithPeerId::condition`…
K0UR05H Aug 10, 2022
a4110a2
*: Remove `inject_connected` / `inject_disconnected` from docs (#2805)
K0UR05H Aug 10, 2022
0a01c81
misc/multistream-select: Replace msg.get(0) with msg.first() (#2816)
mxinden Aug 13, 2022
3ce0ef9
transports/quic: apply suggestions from review
elenaf9 Aug 13, 2022
3060d12
transports/quic: rename QuicMuxerInner -> Inner
elenaf9 Aug 13, 2022
63c6edc
transports/quic: improve poll_{inbound, outbound}
elenaf9 Aug 13, 2022
06aaea6
*: Fix `clippy::derive-partial-eq-without-eq` (#2818)
elenaf9 Aug 14, 2022
cef5056
core/muxing: Generalise `StreamMuxer::poll_address_change` to `poll` …
thomaseizinger Aug 16, 2022
0e5a25d
examples/file-sharing: Support binary files (#2786)
qidu Aug 16, 2022
878c49f
swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess (#2784)
mxinden Aug 16, 2022
6a9fa3d
build(deps): Update prost requirement from 0.10 to 0.11 (#2788)
dependabot[bot] Aug 16, 2022
8dc0188
swarm/src/connection: Test max_negotiating_inbound_streams (#2785)
mxinden Aug 16, 2022
67266c6
swarm-derive/: Add where clause of behaviour to generated out event (…
mxinden Aug 17, 2022
d2c5053
build(deps): Update prometheus-client requirement from 0.17.0 to 0.18…
dependabot[bot] Aug 17, 2022
a2738fd
swarm-derive/: Derive Debug for generated OutEvent (#2821)
mxinden Aug 17, 2022
475289c
docs/coding-guidelines: Add document (#2780)
mxinden Aug 17, 2022
8931860
core/identity: Allow clippy::large-enum-variant on `Keypair` (#2827)
elenaf9 Aug 19, 2022
1aeaba3
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 19, 2022
95fc6da
transports/quic: drive connection in `QuicMuxer::poll`
elenaf9 Aug 19, 2022
3d3666e
*: Enforce no clippy warnings for examples (#2826)
thomaseizinger Aug 20, 2022
217dd2c
clippy.toml: Create config and disallow unbounded channels (#2823)
mxinden Aug 20, 2022
0d7c8a5
transports/quic: refactor `Connection::poll_event`
elenaf9 Aug 21, 2022
67b52aa
transports/quic: rm `Connection::is_handshaking`
elenaf9 Aug 21, 2022
66974fc
transports/quic: refactor connection closing
elenaf9 Aug 22, 2022
4253080
*: Prepare v0.47.0 (#2830)
mxinden Aug 22, 2022
c88efe8
transports/quic: rm mutex around to_endpoint tx
elenaf9 Aug 22, 2022
0a82be4
transports/quic/tests: drive peers concurrently
elenaf9 Aug 22, 2022
d610e4b
protocols/dcutr: Disable `libp2p-core` default features (#2836)
elenaf9 Aug 23, 2022
d92cab8
build(deps): Update p256 requirement from 0.10.0 to 0.11.0 (#2636)
dependabot[bot] Aug 23, 2022
ca07ce4
swarm/behaviour: Remove deprecated NetworkBehaviourEventProcess (#2840)
mxinden Aug 26, 2022
a3dec47
docs/coding-guidelines: Document limit on number of tasks (#2839)
mxinden Aug 26, 2022
247b553
swarm-derive/: Remove support for custom poll method (#2841)
mxinden Aug 28, 2022
6855ab9
swarm-derive/: Remove support for ignoring fields on struct (#2842)
mxinden Aug 29, 2022
e01f77b
transports/noise: Migrate away from deprecated `sodiumoxide` for test…
pinkforest Aug 30, 2022
f16561c
.github/workflows: Split advisory issues from PR workflows using `car…
pinkforest Aug 30, 2022
36a2773
*: Update changelogs for prost dep update (#2851)
divagant-martian Aug 30, 2022
89f898c
protocols/mdns: Allow users to choose between async-io and tokio runt…
gallegogt Sep 2, 2022
cee199a
protocols/kad: Support multiple protocol names (#2846)
dmitry-markin Sep 3, 2022
f04df29
.git-blame-ignore-revs/: Initialize and add rustfmt commit (#2864)
thomaseizinger Sep 4, 2022
b8c3b28
protocols/gossipsub: Allow publishing to anything that implements `In…
GamePad64 Sep 5, 2022
a40180c
.github/: Introduce interop tests (#2835)
laurentsenta Sep 7, 2022
8644c65
core/: Introduce `rsa` feature flag to avoid `ring` dependency (#2860)
GamePad64 Sep 7, 2022
2eca38c
core/upgrade/: Add `ReadyUpgrade` (#2855)
thomaseizinger Sep 7, 2022
d2eddf4
muxers/yamux: Remove `OpenSubstreamToken` (#2873)
thomaseizinger Sep 7, 2022
83c6795
*: Prepare v0.48.0 (#2869)
mxinden Sep 7, 2022
c650dc1
*: Replace _serde with dep:serde in Cargo.toml (#2868)
GamePad64 Sep 8, 2022
69caf98
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Sep 9, 2022
fe3e09b
transports/quic: upgrade to if-watch v2.0.0
elenaf9 Sep 9, 2022
b6924db
transports/quic: fix clippy
elenaf9 Sep 9, 2022
689460f
transports/quic: fix smoke test
elenaf9 Sep 9, 2022
457fb51
transports/tcp: Simplify IfWatcher integration (#2813)
elenaf9 Sep 10, 2022
41d39fb
transports/quic: add `Endpoint::try_send`
elenaf9 Sep 10, 2022
66c2755
swarm/: Fix rare test failure of `multiple_addresses_err` (#2882)
thomaseizinger Sep 11, 2022
72bade1
build(deps): Update env_logger to 0.9 and criterion to 0.4 (#2896)
kpp Sep 14, 2022
5906140
protocols/kad: Remove deprecated `set_protocol_name()` (#2866)
dmitry-markin Sep 15, 2022
2c739e9
protocols/noise: Introduce `NoiseAuthenticated::xx` constructor with …
thomaseizinger Sep 16, 2022
c81b06a
*: Fix various clippy warnings (#2900)
umgefahren Sep 16, 2022
2025de3
swarm-derive/: Allow for templated behaviours (#2907)
thomaseizinger Sep 16, 2022
4c617a0
subscribe
elenaf9 Sep 17, 2022
4e027b1
transports/quic: handle substream being dropped
elenaf9 Sep 19, 2022
bdba780
transports/quic: return err on read after reset
elenaf9 Sep 19, 2022
40cb4f3
transports/quic: apply comments from code review
elenaf9 Sep 19, 2022
f8d1430
transports/quic: better naming, fix docs
elenaf9 Sep 20, 2022
4c3229b
transports/quic: add doc for `Endpoint:try_send`
elenaf9 Sep 20, 2022
e393fe5
transports/quic: add `ip_to_listenaddr`
elenaf9 Sep 20, 2022
d28db18
transports/quic: disable connection migration
elenaf9 Sep 20, 2022
42db0ed
transports/quic: minor fix
elenaf9 Sep 20, 2022
d46b72e
transports/quic: minor fixes
elenaf9 Sep 20, 2022
ec3c74a
transports/quic: rework forwarding of new connections
elenaf9 Sep 20, 2022
b7103aa
transports/quic: fix broken intra-doc link
elenaf9 Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions transports/quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::{
pub struct Connection {
/// Endpoint this connection belongs to.
endpoint: Endpoint,
/// Future whose job is to send a message to the endpoint. Only one at a time.
/// Pending message to be sent to the background task that is driving the endpoint.
pending_to_endpoint: Option<ToEndpoint>,
/// Events that the endpoint will send in destination to our local [`quinn_proto::Connection`].
/// Passed at initialization.
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Connection {
connection_id: quinn_proto::ConnectionHandle,
from_endpoint: mpsc::Receiver<quinn_proto::ConnectionEvent>,
) -> Self {
assert!(!connection.is_closed());
debug_assert!(!connection.is_closed());
Connection {
endpoint,
pending_to_endpoint: None,
Expand All @@ -107,21 +107,24 @@ impl Connection {
}
}

/// The local address which was used when the peer established the connection.
/// The local address which was used when the remote established the connection to us.
///
/// Works for server connections only.
pub fn local_addr(&self) -> SocketAddr {
debug_assert_eq!(self.connection.side(), quinn_proto::Side::Server);
/// `None` for client connections.
pub fn local_addr(&self) -> Option<SocketAddr> {
if self.connection.side().is_client() {
return None;
}
let endpoint_addr = self.endpoint.socket_addr();
self.connection
.local_ip()
.map(|ip| SocketAddr::new(ip, endpoint_addr.port()))
.unwrap_or_else(|| {
// In a normal case scenario this should not happen, because
// we get want to get a local addr for a server connection only.
tracing::error!("trying to get quinn::local_ip for a client");
*endpoint_addr
})

// Local address may differ from the socket address if the socket is
// bound to a wildcard address.
let addr = match self.connection.local_ip() {
Some(ip) => SocketAddr::new(ip, endpoint_addr.port()),
// TODO: `quinn_proto::Connection::local_ip` is only supported for linux,
// so for other platforms we currently still return the endpoint address.
None => *endpoint_addr,
};
Some(addr)
}

/// Returns the address of the node we're connected to.
Expand Down Expand Up @@ -160,7 +163,7 @@ impl Connection {
///
/// If `None` is returned, then a [`ConnectionEvent::StreamAvailable`] event will later be
/// produced when a substream is available.
pub fn pop_incoming_substream(&mut self) -> Option<quinn_proto::StreamId> {
pub fn accept_substream(&mut self) -> Option<quinn_proto::StreamId> {
self.connection.streams().accept(quinn_proto::Dir::Bi)
}

Expand All @@ -171,7 +174,7 @@ impl Connection {
///
/// If `None` is returned, then a [`ConnectionEvent::StreamOpened`] event will later be
/// produced when a substream is available.
pub fn pop_outgoing_substream(&mut self) -> Option<quinn_proto::StreamId> {
pub fn open_substream(&mut self) -> Option<quinn_proto::StreamId> {
self.connection.streams().open(quinn_proto::Dir::Bi)
}

Expand All @@ -183,16 +186,14 @@ impl Connection {
/// On success, a [`quinn_proto::StreamEvent::Finished`] event will later be produced when the
/// substream has been effectively closed. A [`ConnectionEvent::StreamStopped`] event can also
/// be emitted.
pub fn shutdown_substream(
pub fn finish_substream(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the docs, this only closes the write-side. If that is the case, perhaps close or close_write would be a better name?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer to stick with finish_substream because it is consistent if the name of the event it produces quinn_proto::StreamEvent::Finished / ConnectionEvent::StreamFinished.
But not a strong opinion, happy to change it to close_write if you insist :)

&mut self,
id: quinn_proto::StreamId,
) -> Result<(), quinn_proto::FinishError> {
// closes the write end of the substream without waiting for the remote to receive the
// event. use flush substream to wait for the remote to receive the event.
self.connection.send_stream(id).finish()
}

/// Polls the connection for an event that happend on it.
/// Polls the connection for an event that happened on it.
pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionEvent> {
let mut closed = None;
loop {
Expand Down Expand Up @@ -303,10 +304,10 @@ pub enum ConnectionEvent {
/// Connection has been closed and can no longer be used.
ConnectionLost(Error),

/// Generated after [`Connection::pop_incoming_substream`] has been called and has returned
/// Generated after [`Connection::accept_substream`] has been called and has returned
/// `None`. After this event has been generated, this method is guaranteed to return `Some`.
StreamAvailable,
/// Generated after [`Connection::pop_outgoing_substream`] has been called and has returned
/// Generated after [`Connection::open_substream`] has been called and has returned
/// `None`. After this event has been generated, this method is guaranteed to return `Some`.
StreamOpened,

Expand All @@ -315,7 +316,7 @@ pub enum ConnectionEvent {
/// Generated after `write_substream` has returned a `Blocked` error.
StreamWritable(quinn_proto::StreamId),

/// Generated after [`Connection::shutdown_substream`] has been called.
/// Generated after [`Connection::finish_substream`] has been called.
StreamFinished(quinn_proto::StreamId),
/// A substream has been stopped. This concept is similar to the concept of a substream being
/// "reset", as in a TCP socket being reset for example.
Expand Down
13 changes: 7 additions & 6 deletions transports/quic/src/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl QuicMuxer {
}
}
}

impl StreamMuxer for QuicMuxer {
type Substream = Substream;
type Error = Error;
Expand All @@ -94,7 +95,8 @@ impl StreamMuxer for QuicMuxer {
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected | ConnectionEvent::HandshakeDataReady => {
tracing::warn!(
debug_assert!(
false,
"Unexpected event {:?} on established QUIC connection",
event
);
Expand Down Expand Up @@ -143,7 +145,6 @@ impl StreamMuxer for QuicMuxer {
}
}
inner.poll_connection_waker = Some(cx.waker().clone());
// TODO: poll address change
Poll::Pending
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -152,7 +153,7 @@ impl StreamMuxer for QuicMuxer {
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
let substream_id = match inner.connection.pop_incoming_substream() {
let substream_id = match inner.connection.accept_substream() {
Some(id) => {
inner.poll_outbound_waker = None;
id
Expand All @@ -172,7 +173,7 @@ impl StreamMuxer for QuicMuxer {
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
let substream_id = match inner.connection.pop_outgoing_substream() {
let substream_id = match inner.connection.open_substream() {
Some(id) => {
inner.poll_outbound_waker = None;
id
Expand All @@ -195,7 +196,7 @@ impl StreamMuxer for QuicMuxer {

if inner.connection.connection.streams().send_streams() != 0 {
for substream in inner.substreams.keys().cloned().collect::<Vec<_>>() {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = inner.connection.shutdown_substream(substream) {
if let Err(e) = inner.connection.finish_substream(substream) {
tracing::warn!("substream finish error on muxer close: {}", e);
}
}
Expand Down Expand Up @@ -321,7 +322,7 @@ impl AsyncWrite for Substream {

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let mut muxer = self.muxer.lock();
match muxer.connection.shutdown_substream(self.id) {
match muxer.connection.finish_substream(self.id) {
Ok(()) => {
let substream_state = muxer
.substreams
Expand Down
5 changes: 4 additions & 1 deletion transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,10 @@ impl Stream for Listener {
}
match self.new_connections_rx.poll_next_unpin(cx) {
Poll::Ready(Some(connection)) => {
let local_addr = socketaddr_to_multiaddr(&connection.local_addr());
let local_addr = connection
.local_addr()
.expect("exists for server connections.");
let local_addr = socketaddr_to_multiaddr(&local_addr);
let send_back_addr = socketaddr_to_multiaddr(&connection.remote_addr());
let event = TransportEvent::Incoming {
upgrade: Upgrade::from_connection(connection),
Expand Down