From 8cdd98402e8ce0f08e0ef82153d8f682e151a92b Mon Sep 17 00:00:00 2001 From: Bastien Faivre Date: Tue, 10 Dec 2024 14:56:52 +0100 Subject: [PATCH] feat(request-response): Add connection id to behaviour events Closes #5716. Added connection id to the events emitted by a request-response Behaviour and adapted the code accordingly. Pull-Request: #5719. --- Cargo.lock | 6 +- Cargo.toml | 6 +- protocols/autonat/CHANGELOG.md | 4 + protocols/autonat/Cargo.toml | 2 +- .../autonat/src/v1/behaviour/as_client.rs | 2 + .../autonat/src/v1/behaviour/as_server.rs | 2 + protocols/rendezvous/CHANGELOG.md | 4 + protocols/rendezvous/Cargo.toml | 2 +- protocols/rendezvous/src/server.rs | 3 + protocols/request-response/CHANGELOG.md | 5 ++ protocols/request-response/Cargo.toml | 2 +- protocols/request-response/src/lib.rs | 81 ++++++++++++++----- .../request-response/tests/error_reporting.rs | 3 + protocols/request-response/tests/ping.rs | 9 ++- 14 files changed, 100 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45f185d9780..efa03d89d79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2611,7 +2611,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" -version = "0.13.1" +version = "0.13.2" dependencies = [ "async-trait", "asynchronous-codec", @@ -3145,7 +3145,7 @@ dependencies = [ [[package]] name = "libp2p-rendezvous" -version = "0.15.0" +version = "0.15.1" dependencies = [ "async-trait", "asynchronous-codec", @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.27.1" +version = "0.28.0" dependencies = [ "anyhow", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 0b5eb844b25..1f43231bd39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ rust-version = "1.80.0" [workspace.dependencies] libp2p = { version = "0.54.2", path = "libp2p" } libp2p-allow-block-list = { version = "0.4.2", path = "misc/allow-block-list" } -libp2p-autonat = { version = "0.13.1", path = "protocols/autonat" } +libp2p-autonat = { version = "0.13.2", path = "protocols/autonat" } libp2p-connection-limits = { version = "0.4.1", path = "misc/connection-limits" } libp2p-core = { version = "0.42.1", path = "core" } libp2p-dcutr = { version = "0.12.1", path = "protocols/dcutr" } @@ -95,8 +95,8 @@ libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.25.0", path = "transports/pnet" } libp2p-quic = { version = "0.11.2", path = "transports/quic" } libp2p-relay = { version = "0.18.1", path = "protocols/relay" } -libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" } -libp2p-request-response = { version = "0.27.1", path = "protocols/request-response" } +libp2p-rendezvous = { version = "0.15.1", path = "protocols/rendezvous" } +libp2p-request-response = { version = "0.28.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.8", path = "misc/server" } libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.45.2", path = "swarm" } diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 9b2bc4cb2ea..f946f59c9ef 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.13.2 + +- Update to `libp2p-request-response` `v0.28.0`. + ## 0.13.1 - Verify that an incoming AutoNAT dial comes from a connected peer. See [PR 5597](https://github.com/libp2p/rust-libp2p/pull/5597). diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 92ca163d8ec..88564b18541 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-autonat" edition = "2021" rust-version = { workspace = true } description = "NAT and firewall detection for libp2p" -version = "0.13.1" +version = "0.13.2" authors = [ "David Craven ", "Elena Frank ", diff --git a/protocols/autonat/src/v1/behaviour/as_client.rs b/protocols/autonat/src/v1/behaviour/as_client.rs index 3377964373c..ca8daf6e1ac 100644 --- a/protocols/autonat/src/v1/behaviour/as_client.rs +++ b/protocols/autonat/src/v1/behaviour/as_client.rs @@ -112,6 +112,7 @@ impl HandleInnerEvent for AsClient<'_> { request_id, response, }, + .. } => { tracing::debug!(?response, "Outbound dial-back request returned response"); @@ -154,6 +155,7 @@ impl HandleInnerEvent for AsClient<'_> { peer, error, request_id, + .. } => { tracing::debug!( %peer, diff --git a/protocols/autonat/src/v1/behaviour/as_server.rs b/protocols/autonat/src/v1/behaviour/as_server.rs index 663f94122c7..32b4120c552 100644 --- a/protocols/autonat/src/v1/behaviour/as_server.rs +++ b/protocols/autonat/src/v1/behaviour/as_server.rs @@ -107,6 +107,7 @@ impl HandleInnerEvent for AsServer<'_> { request, channel, }, + .. } => { let probe_id = self.probe_id.next(); if !self.connected.contains_key(&peer) { @@ -183,6 +184,7 @@ impl HandleInnerEvent for AsServer<'_> { peer, error, request_id, + .. } => { tracing::debug!( %peer, diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index 1ed9e5bc3b0..ca01538a76d 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.15.1 + +- Update to `libp2p-request-response` `v0.28.0`. + ## 0.15.0 diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 5fa40c3785b..53a579918c5 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-rendezvous" edition = "2021" rust-version = { workspace = true } description = "Rendezvous protocol for libp2p" -version = "0.15.0" +version = "0.15.1" authors = ["The COMIT guys "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 8aafcfb48e3..1be7220cfcb 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -183,6 +183,7 @@ impl NetworkBehaviour for Behaviour { libp2p_request_response::Message::Request { request, channel, .. }, + .. }) => { if let Some((event, response)) = handle_request(peer_id, request, &mut self.registrations) @@ -202,6 +203,7 @@ impl NetworkBehaviour for Behaviour { peer, request_id, error, + .. }) => { tracing::warn!( %peer, @@ -217,6 +219,7 @@ impl NetworkBehaviour for Behaviour { | ToSwarm::GenerateEvent(libp2p_request_response::Event::Message { peer: _, message: libp2p_request_response::Message::Response { .. }, + .. }) | ToSwarm::GenerateEvent(libp2p_request_response::Event::OutboundFailure { .. diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 9ed658fc90f..15cb0c91797 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.28.0 + +- Add connection id to the events emitted by a request-response `Behaviour`. + See [PR 5719](https://github.com/libp2p/rust-libp2p/pull/5719). + ## 0.27.1 - Deprecate `void` crate. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index b2e6fd0b0ac..48ef4c2c066 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = { workspace = true } description = "Generic Request/Response Protocols" -version = "0.27.1" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 052e1e87e2b..39a773d99b4 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -131,6 +131,8 @@ pub enum Event { Message { /// The peer who sent the message. peer: PeerId, + /// The connection used. + connection_id: ConnectionId, /// The incoming message. message: Message, }, @@ -138,6 +140,8 @@ pub enum Event { OutboundFailure { /// The peer to whom the request was sent. peer: PeerId, + /// The connection used. + connection_id: ConnectionId, /// The (local) ID of the failed request. request_id: OutboundRequestId, /// The error that occurred. @@ -147,6 +151,8 @@ pub enum Event { InboundFailure { /// The peer from whom the request was received. peer: PeerId, + /// The connection used. + connection_id: ConnectionId, /// The ID of the failed inbound request. request_id: InboundRequestId, /// The error that occurred. @@ -159,6 +165,8 @@ pub enum Event { ResponseSent { /// The peer to whom the response was sent. peer: PeerId, + /// The connection used. + connection_id: ConnectionId, /// The ID of the inbound request whose response was sent. request_id: InboundRequestId, }, @@ -569,10 +577,10 @@ where fn remove_pending_outbound_response( &mut self, peer: &PeerId, - connection: ConnectionId, + connection_id: ConnectionId, request: OutboundRequestId, ) -> bool { - self.get_connection_mut(peer, connection) + self.get_connection_mut(peer, connection_id) .map(|c| c.pending_outbound_responses.remove(&request)) .unwrap_or(false) } @@ -585,10 +593,10 @@ where fn remove_pending_inbound_response( &mut self, peer: &PeerId, - connection: ConnectionId, + connection_id: ConnectionId, request: InboundRequestId, ) -> bool { - self.get_connection_mut(peer, connection) + self.get_connection_mut(peer, connection_id) .map(|c| c.pending_inbound_responses.remove(&request)) .unwrap_or(false) } @@ -598,11 +606,11 @@ where fn get_connection_mut( &mut self, peer: &PeerId, - connection: ConnectionId, + connection_id: ConnectionId, ) -> Option<&mut Connection> { self.connected .get_mut(peer) - .and_then(|connections| connections.iter_mut().find(|c| c.id == connection)) + .and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id)) } fn on_address_change( @@ -659,6 +667,7 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer: peer_id, + connection_id, request_id, error: InboundFailure::ConnectionClosed, })); @@ -668,13 +677,21 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer: peer_id, + connection_id, request_id, error: OutboundFailure::ConnectionClosed, })); } } - fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) { + fn on_dial_failure( + &mut self, + DialFailure { + peer_id, + connection_id, + .. + }: DialFailure, + ) { if let Some(peer) = peer_id { // If there are pending outgoing requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending @@ -687,6 +704,7 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer, + connection_id, request_id: request.request_id, error: OutboundFailure::DialFailure, })); @@ -811,7 +829,7 @@ where fn on_connection_handler_event( &mut self, peer: PeerId, - connection: ConnectionId, + connection_id: ConnectionId, event: THandlerOutEvent, ) { match event { @@ -819,7 +837,8 @@ where request_id, response, } => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_outbound_response(&peer, connection_id, request_id); debug_assert!( removed, "Expect request_id to be pending before receiving response.", @@ -830,13 +849,17 @@ where response, }; self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); + .push_back(ToSwarm::GenerateEvent(Event::Message { + peer, + connection_id, + message, + })); } handler::Event::Request { request_id, request, sender, - } => match self.get_connection_mut(&peer, connection) { + } => match self.get_connection_mut(&peer, connection_id) { Some(connection) => { let inserted = connection.pending_inbound_responses.insert(request_id); debug_assert!(inserted, "Expect id of new request to be unknown."); @@ -848,14 +871,19 @@ where channel, }; self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); + .push_back(ToSwarm::GenerateEvent(Event::Message { + peer, + connection_id, + message, + })); } None => { - tracing::debug!("Connection ({connection}) closed after `Event::Request` ({request_id}) has been emitted."); + tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted."); } }, handler::Event::ResponseSent(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_inbound_response(&peer, connection_id, request_id); debug_assert!( removed, "Expect request_id to be pending before response is sent." @@ -864,11 +892,13 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::ResponseSent { peer, + connection_id, request_id, })); } handler::Event::ResponseOmission(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_inbound_response(&peer, connection_id, request_id); debug_assert!( removed, "Expect request_id to be pending before response is omitted.", @@ -877,12 +907,14 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer, + connection_id, request_id, error: InboundFailure::ResponseOmission, })); } handler::Event::OutboundTimeout(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_outbound_response(&peer, connection_id, request_id); debug_assert!( removed, "Expect request_id to be pending before request times out." @@ -891,12 +923,14 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer, + connection_id, request_id, error: OutboundFailure::Timeout, })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_outbound_response(&peer, connection_id, request_id); debug_assert!( removed, "Expect request_id to be pending before failing to connect.", @@ -905,28 +939,33 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer, + connection_id, request_id, error: OutboundFailure::UnsupportedProtocols, })); } handler::Event::OutboundStreamFailed { request_id, error } => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_outbound_response(&peer, connection_id, request_id); debug_assert!(removed, "Expect request_id to be pending upon failure"); self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer, + connection_id, request_id, error: OutboundFailure::Io(error), })) } handler::Event::InboundTimeout(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_inbound_response(&peer, connection_id, request_id); if removed { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer, + connection_id, request_id, error: InboundFailure::Timeout, })); @@ -938,12 +977,14 @@ where } } handler::Event::InboundStreamFailed { request_id, error } => { - let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + let removed = + self.remove_pending_inbound_response(&peer, connection_id, request_id); if removed { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer, + connection_id, request_id, error: InboundFailure::Io(error), })); diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index d1f26378a77..2108b6006c5 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -566,6 +566,7 @@ async fn wait_request( request, channel, }, + .. }) => { return Ok((peer, request_id, request, channel)); } @@ -600,6 +601,7 @@ async fn wait_inbound_failure( peer, request_id, error, + .. }) => { return Ok((peer, request_id, error)); } @@ -618,6 +620,7 @@ async fn wait_outbound_failure( peer, request_id, error, + .. }) => { return Ok((peer, request_id, error)); } diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index e53fe99d6cf..94adedac2d7 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -65,6 +65,7 @@ async fn is_response_outbound() { peer, request_id: req_id, error: _error, + .. } => { assert_eq!(&offline_peer, &peer); assert_eq!(req_id, request_id1); @@ -116,6 +117,7 @@ async fn ping_protocol() { request_response::Message::Request { request, channel, .. }, + .. }) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); @@ -157,6 +159,7 @@ async fn ping_protocol() { request_id, response, }, + .. } => { count += 1; assert_eq!(&response, &expected_pong); @@ -205,7 +208,8 @@ async fn emits_inbound_connection_closed_failure() { event = swarm1.select_next_some() => match event { SwarmEvent::Behaviour(request_response::Event::Message { peer, - message: request_response::Message::Request { request, channel, .. } + message: request_response::Message::Request { request, channel, .. }, + .. }) => { assert_eq!(&request, &ping); assert_eq!(&peer, &peer2_id); @@ -270,7 +274,8 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { event = swarm1.select_next_some() => { if let SwarmEvent::Behaviour(request_response::Event::Message { peer, - message: request_response::Message::Request { request, channel, .. } + message: request_response::Message::Request { request, channel, .. }, + .. }) = event { assert_eq!(&request, &ping); assert_eq!(&peer, &peer2_id);