Skip to content

Commit

Permalink
refactor(request-response): don't use upgrade infrastructure
Browse files Browse the repository at this point in the history
This patch refactors `libp2p-request-response` to not use the "upgrade infrastructure" provided by `libp2p-swarm`. Instead, we directly convert the negotiated streams into futures that read and write the messages.

Related: libp2p#3268.
Related: libp2p#2863.

Pull-Request: libp2p#3914.

Co-authored-by: Yiannis Marangos <[email protected]>
  • Loading branch information
thomaseizinger and oblique authored Oct 26, 2023
1 parent eb0c66f commit 77a8818
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.12.0 - unreleased

- Remove `Clone`, `PartialEq` and `Eq` implementations on `Event` and its sub-structs.
The `Event` also contains errors which are not clonable or comparable.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).

## 0.11.0

Expand Down
16 changes: 9 additions & 7 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, ProtocolSupport, RequestId, ResponseChannel,
self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel,
};
use libp2p_swarm::{
behaviour::{
Expand Down Expand Up @@ -133,7 +133,7 @@ impl ProbeId {
}

/// Event produced by [`Behaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum Event {
/// Event on an inbound probe.
InboundProbe(InboundProbeEvent),
Expand Down Expand Up @@ -187,14 +187,14 @@ pub struct Behaviour {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
>,

// Ongoing outbound probes and mapped to the inner request id.
ongoing_outbound: HashMap<RequestId, ProbeId>,
ongoing_outbound: HashMap<OutboundRequestId, ProbeId>,

// Connected peers with the observed address of each connection.
// If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips),
Expand All @@ -220,9 +220,11 @@ pub struct Behaviour {
impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self {
let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
let mut cfg = request_response::Config::default();
cfg.set_request_timeout(config.timeout);
let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg);
let inner = request_response::Behaviour::with_codec(
AutoNatCodec,
protocols,
request_response::Config::default().with_request_timeout(config.timeout),
);
Self {
local_peer_id,
inner,
Expand Down
10 changes: 5 additions & 5 deletions src/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures_timer::Delay;
use instant::Instant;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response::{self as request_response, OutboundFailure, RequestId};
use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId};
use libp2p_swarm::{ConnectionId, ListenAddresses, ToSwarm};
use rand::{seq::SliceRandom, thread_rng};
use std::{
Expand All @@ -39,7 +39,7 @@ use std::{
};

/// Outbound probe failed or was aborted.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeError {
/// Probe was aborted because no server is known, or all servers
/// are throttled through [`Config::throttle_server_period`].
Expand All @@ -53,7 +53,7 @@ pub enum OutboundProbeError {
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeEvent {
/// A dial-back request was sent to a remote peer.
Request {
Expand Down Expand Up @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> {
pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>,
pub(crate) nat_status: &'a mut NatStatus,
pub(crate) confidence: &'a mut usize,
pub(crate) ongoing_outbound: &'a mut HashMap<RequestId, ProbeId>,
pub(crate) ongoing_outbound: &'a mut HashMap<OutboundRequestId, ProbeId>,
pub(crate) last_probe: &'a mut Option<Instant>,
pub(crate) schedule_probe: &'a mut Delay,
pub(crate) listen_addresses: &'a ListenAddresses,
Expand All @@ -117,7 +117,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
let probe_id = self
.ongoing_outbound
.remove(&request_id)
.expect("RequestId exists.");
.expect("OutboundRequestId exists.");

let event = match response.result.clone() {
Ok(address) => OutboundProbeEvent::Response {
Expand Down
8 changes: 4 additions & 4 deletions src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, InboundFailure, RequestId, ResponseChannel,
self as request_response, InboundFailure, InboundRequestId, ResponseChannel,
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
Expand All @@ -38,15 +38,15 @@ use std::{
};

/// Inbound probe failed.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeError {
/// Receiving the dial-back request or sending a response failed.
InboundRequest(InboundFailure),
/// We refused or failed to dial the client.
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeEvent {
/// A dial-back request was received from a remote peer.
Request {
Expand Down Expand Up @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
Expand Down
8 changes: 4 additions & 4 deletions tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_auto_probe() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoAddresses);
assert!(matches!(error, OutboundProbeError::NoAddresses));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -181,10 +181,10 @@ async fn test_confidence() {
peer,
error,
} if !test_public => {
assert_eq!(
assert!(matches!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
));
(peer.unwrap(), probe_id)
}
other => panic!("Unexpected Outbound Event: {other:?}"),
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_throttle_server_period() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoServer);
assert!(matches!(error, OutboundProbeError::NoServer));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down
9 changes: 6 additions & 3 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ async fn test_dial_error() {
}) => {
assert_eq!(probe_id, request_probe_id);
assert_eq!(peer, client_id);
assert_eq!(error, InboundProbeError::Response(ResponseError::DialError));
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialError)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() {
}) => {
assert_eq!(client_id, peer);
assert_ne!(first_probe_id, probe_id);
assert_eq!(
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialRefused)
)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
};
Expand Down

0 comments on commit 77a8818

Please sign in to comment.