From d1d468c13c8df04ef8b52566c58961b24652515c Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 8 Mar 2023 16:29:38 +0800 Subject: [PATCH] Use signed peer records on handle prune --- core/src/signed_envelope.rs | 2 +- protocols/gossipsub/src/behaviour.rs | 29 +++++++-- protocols/gossipsub/src/behaviour/tests.rs | 72 ++++++++++++++++++++-- protocols/gossipsub/src/protocol.rs | 25 +++++--- protocols/gossipsub/src/types.rs | 11 ++-- 5 files changed, 114 insertions(+), 25 deletions(-) diff --git a/core/src/signed_envelope.rs b/core/src/signed_envelope.rs index 12c1324efa97..927416f7191e 100644 --- a/core/src/signed_envelope.rs +++ b/core/src/signed_envelope.rs @@ -9,7 +9,7 @@ use unsigned_varint::encode::usize_buffer; /// A signed envelope contains an arbitrary byte string payload, a signature of the payload, and the public key that can be used to verify the signature. /// /// For more details see libp2p RFC0002: -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SignedEnvelope { key: PublicKey, payload_type: Vec, diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9e778fd1b0ff..52035aacad9a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -36,7 +36,7 @@ use rand::{seq::SliceRandom, thread_rng}; use libp2p_core::{ identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr, - PeerId, + PeerId, PeerRecord, }; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, @@ -1073,7 +1073,13 @@ where |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0, ) .into_iter() - .map(|p| PeerInfo { peer_id: Some(p) }) + .map(|p| PeerInfo { + peer_id: Some(p), + // TODO: Retrieve signed_peer_record from Registrations store + // go-gossipsub uses a global store https://github.com/libp2p/go-libp2p-pubsub/blob/829f9026a3dcf12b268efad6a140dd99446cf17b/gossipsub.go#L1874 + // rust-libp2p collects records with the server behaviour of rendezvous protocol + signed_peer_record: None, + }) .collect() } else { Vec::new() @@ -1623,15 +1629,26 @@ where } for p in px { - // TODO: Once signed records are spec'd: extract signed peer record if given and handle - // it, see https://github.com/libp2p/specs/pull/217 - if let Some(peer_id) = p.peer_id { + let dial_opts = if let Some(signed_peer_record) = p.signed_peer_record { + // If signed envelop is invalid, return None and ignore peer exchange + if let Ok(peer_record) = PeerRecord::from_signed_envelope(signed_peer_record) { + Some((peer_record.peer_id(), peer_record.addresses().to_vec())) + } else { + None + } + } else if let Some(peer_id) = p.peer_id { + Some::<(PeerId, Vec)>((peer_id, vec![])) + } else { + None + }; + + if let Some((peer_id, addresses)) = dial_opts { // mark as px peer self.px_peers.insert(peer_id); // dial peer self.events.push_back(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(peer_id).build(), + opts: DialOpts::peer_id(peer_id).addresses(addresses).build(), }); } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 6502c5986ec1..11b9163bc59d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -345,11 +345,10 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { .filter_map(|info| { info.peer_id .and_then(|id| PeerId::from_bytes(&id).ok()) - .map(|peer_id| - //TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217 - PeerInfo { - peer_id: Some(peer_id), - }) + .map(|peer_id| PeerInfo { + peer_id: Some(peer_id), + signed_peer_record: None, + }) }) .collect::>(); @@ -1814,6 +1813,7 @@ fn test_connect_to_px_peers_on_handle_prune() { for _ in 0..config.prune_peers() + 5 { px.push(PeerInfo { peer_id: Some(PeerId::random()), + signed_peer_record: None, }); } @@ -1852,6 +1852,65 @@ fn test_connect_to_px_peers_on_handle_prune() { )); } +#[test] +fn test_connect_to_px_peers_with_peer_record_on_handle_prune() { + let config: Config = Config::default(); + + let (mut gs, peers, topics) = inject_nodes1() + .peer_no(1) + .topics(vec!["test".into()]) + .to_subscribe(true) + .create_network(); + + // handle prune from single peer with px peers + + let mut px_peer_ids = Vec::new(); + let mut px = Vec::new(); + // propose more px peers than config.prune_peers() + for i in 0..config.prune_peers() + 5 { + let key = Keypair::generate_ed25519(); + let address: Multiaddr = format!("/ip4/1.2.3.4/tcp/{i}").try_into().unwrap(); + let peer_record = PeerRecord::new(&key, vec![address]).unwrap(); + + px_peer_ids.push(key.public().to_peer_id()); + px.push(PeerInfo { + peer_id: None, + signed_peer_record: Some(peer_record.into_signed_envelope()), + }); + } + + gs.handle_prune( + &peers[0], + vec![( + topics[0].clone(), + px.clone(), + Some(config.prune_backoff().as_secs()), + )], + ); + + // Check DialPeer events for px peers + let dials: Vec<_> = gs + .events + .iter() + .filter_map(|e| match e { + // TODO: How to extract addresses from DialOpts to assert them in the test? + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(), + _ => None, + }) + .collect(); + + // Exactly config.prune_peers() many random peers should be dialled + assert_eq!(dials.len(), config.prune_peers()); + + let dials_set: HashSet<_> = dials.into_iter().collect(); + + // No duplicates + assert_eq!(dials_set.len(), config.prune_peers()); + + // all dial peers must be in px + assert!(dials_set.is_subset(&px_peer_ids.iter().map(|i| *i).collect::>())); +} + #[test] fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); @@ -2474,6 +2533,7 @@ fn test_ignore_px_from_negative_scored_peer() { //handle prune from single peer with px peers let px = vec![PeerInfo { peer_id: Some(PeerId::random()), + signed_peer_record: None, }]; gs.handle_prune( @@ -3067,6 +3127,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { // Handle prune from peer peers[0] with px peers let px = vec![PeerInfo { peer_id: Some(PeerId::random()), + signed_peer_record: None, }]; gs.handle_prune( &peers[0], @@ -3089,6 +3150,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { //handle prune from peer peers[1] with px peers let px = vec![PeerInfo { peer_id: Some(PeerId::random()), + signed_peer_record: None, }]; gs.handle_prune( &peers[1], diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 9fafbc04fa23..a05fbab348c0 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -31,6 +31,7 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; use futures::future; use futures::prelude::*; +use libp2p_core::SignedEnvelope; use libp2p_core::{ identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, }; @@ -509,14 +510,24 @@ impl Decoder for GossipsubCodec { .peers .into_iter() .filter_map(|info| { - info.peer_id + let peer_id = info + .peer_id .as_ref() - .and_then(|id| PeerId::from_bytes(id).ok()) - .map(|peer_id| - //TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217 - PeerInfo { - peer_id: Some(peer_id), - }) + .and_then(|peer_id| PeerId::from_bytes(peer_id).ok()); + + let signed_peer_record = info + .signed_peer_record + .as_ref() + .and_then(|spr| SignedEnvelope::from_protobuf_encoding(spr).ok()); + + if peer_id.is_none() && signed_peer_record.is_none() { + None + } else { + Some(PeerInfo { + peer_id, + signed_peer_record, + }) + } }) .collect::>(); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 5a3f54752daf..df94ae5ed118 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -20,7 +20,7 @@ //! A collection of types using the Gossipsub system. use crate::TopicHash; -use libp2p_core::PeerId; +use libp2p_core::{PeerId, SignedEnvelope}; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; @@ -200,9 +200,7 @@ pub enum SubscriptionAction { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PeerInfo { pub peer_id: Option, - //TODO add this when RFC: Signed Address Records got added to the spec (see pull request - // https://github.com/libp2p/specs/pull/217) - //pub signed_peer_record: ?, + pub signed_peer_record: Option, } /// A Control message received by the gossipsub system. @@ -330,8 +328,9 @@ impl From for proto::RPC { .into_iter() .map(|info| proto::PeerInfo { peer_id: info.peer_id.map(|id| id.to_bytes()), - /// TODO, see https://github.com/libp2p/specs/pull/217 - signed_peer_record: None, + signed_peer_record: info + .signed_peer_record + .map(|spr| spr.into_protobuf_encoding()), }) .collect(), backoff,