Skip to content

Commit

Permalink
Swarm api changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Mar 20, 2021
1 parent fc6be78 commit 801f6c1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 27 deletions.
6 changes: 3 additions & 3 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
use libp2p_swarm::{
DialPeerCondition,
NetworkBehaviour,
Expand Down Expand Up @@ -1888,11 +1888,11 @@ where
};
}

fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_new_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.insert(addr.clone());
}

fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_expired_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.remove(addr);
}

Expand Down
30 changes: 24 additions & 6 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(addr); },
None => quote!{ self.#field_n.inject_new_listen_addr(addr); },
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_new_listen_addr(id, addr); },
})
})
};
Expand All @@ -266,8 +266,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(addr); },
None => quote!{ self.#field_n.inject_expired_listen_addr(addr); },
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_expired_listen_addr(id, addr); },
})
})
};
Expand All @@ -286,6 +286,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of statements to put in the body of `inject_expired_external_addr()`.
let inject_expired_external_addr_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_external_addr(addr); },
None => quote!{ self.#field_n.inject_expired_external_addr(addr); },
})
})
};

// Build the list of statements to put in the body of `inject_listener_error()`.
let inject_listener_error_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
Expand Down Expand Up @@ -504,18 +518,22 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_dial_failure_stmts);*
}

fn inject_new_listen_addr(&mut self, addr: &#multiaddr) {
fn inject_new_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_new_listen_addr_stmts);*
}

fn inject_expired_listen_addr(&mut self, addr: &#multiaddr) {
fn inject_expired_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_expired_listen_addr_stmts);*
}

fn inject_new_external_addr(&mut self, addr: &#multiaddr) {
#(#inject_new_external_addr_stmts);*
}

fn inject_expired_external_addr(&mut self, addr: &#multiaddr) {
#(#inject_expired_external_addr_stmts);*
}

fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) {
#(#inject_listener_error_stmts);*
}
Expand Down
8 changes: 6 additions & 2 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,22 @@ pub trait NetworkBehaviour: Send + 'static {
}

/// Indicates to the behaviour that we have started listening on a new multiaddr.
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) {
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
}

/// Indicates to the behaviour that a new multiaddr we were listening on has expired,
/// which means that we are no longer listening in it.
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) {
fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
}

/// Indicates to the behaviour that we have discovered a new external address for us.
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
}

/// Indicates to the behaviour that an external address was removed.
fn inject_expired_external_addr(&mut self, _addr: &Multiaddr) {
}

/// A listener experienced an error.
fn inject_listener_error(&mut self, _id: ListenerId, _err: &(dyn std::error::Error + 'static)) {
}
Expand Down
8 changes: 5 additions & 3 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
/// through this method.
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
self.behaviour.inject_new_external_addr(&a);
self.external_addrs.add(a, s)
}

Expand All @@ -422,6 +423,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Returns `true` if the address existed and was removed, `false`
/// otherwise.
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
self.behaviour.inject_expired_external_addr(addr);
self.external_addrs.remove(addr)
}

Expand Down Expand Up @@ -565,19 +567,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(&listen_addr);
this.behaviour.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr);
this.behaviour.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(addr);
this.behaviour.inject_expired_listen_addr(listener_id, addr);
}
this.behaviour.inject_listener_closed(listener_id, match &reason {
Ok(()) => Ok(()),
Expand Down
23 changes: 15 additions & 8 deletions swarm/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ where
pub inject_event: Vec<(PeerId, ConnectionId, <<TInner::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent)>,
pub inject_addr_reach_failure: Vec<(Option<PeerId>, Multiaddr)>,
pub inject_dial_failure: Vec<PeerId>,
pub inject_new_listen_addr: Vec<Multiaddr>,
pub inject_new_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_new_external_addr: Vec<Multiaddr>,
pub inject_expired_listen_addr: Vec<Multiaddr>,
pub inject_expired_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_expired_external_addr: Vec<Multiaddr>,
pub inject_listener_error: Vec<ListenerId>,
pub inject_listener_closed: Vec<(ListenerId, bool)>,
pub poll: usize,
Expand All @@ -141,6 +142,7 @@ where
inject_new_listen_addr: Vec::new(),
inject_new_external_addr: Vec::new(),
inject_expired_listen_addr: Vec::new(),
inject_expired_external_addr: Vec::new(),
inject_listener_error: Vec::new(),
inject_listener_closed: Vec::new(),
poll: 0,
Expand Down Expand Up @@ -217,21 +219,26 @@ where
self.inner.inject_dial_failure(p);
}

fn inject_new_listen_addr(&mut self, a: &Multiaddr) {
self.inject_new_listen_addr.push(a.clone());
self.inner.inject_new_listen_addr(a);
fn inject_new_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_new_listen_addr.push((id, a.clone()));
self.inner.inject_new_listen_addr(id, a);
}

fn inject_expired_listen_addr(&mut self, a: &Multiaddr) {
self.inject_expired_listen_addr.push(a.clone());
self.inner.inject_expired_listen_addr(a);
fn inject_expired_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_expired_listen_addr.push((id, a.clone()));
self.inner.inject_expired_listen_addr(id, a);
}

fn inject_new_external_addr(&mut self, a: &Multiaddr) {
self.inject_new_external_addr.push(a.clone());
self.inner.inject_new_external_addr(a);
}

fn inject_expired_external_addr(&mut self, a: &Multiaddr) {
self.inject_expired_external_addr.push(a.clone());
self.inner.inject_expired_external_addr(a);
}

fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) {
self.inject_listener_error.push(l.clone());
self.inner.inject_listener_error(l, e);
Expand Down
34 changes: 29 additions & 5 deletions swarm/src/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_core::{
ConnectedPoint,
PeerId,
Multiaddr,
connection::ConnectionId,
connection::{ConnectionId, ListenerId},
either::{EitherError, EitherOutput},
upgrade::{DeniedUpgrade, EitherUpgrade}
};
Expand Down Expand Up @@ -110,6 +110,12 @@ where
}
}

fn inject_address_change(&mut self, peer_id: &PeerId, connection: &ConnectionId, old: &ConnectedPoint, new: &ConnectedPoint) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_address_change(peer_id, connection, old, new)
}
}

fn inject_event(
&mut self,
peer_id: PeerId,
Expand All @@ -133,15 +139,15 @@ where
}
}

fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_new_listen_addr(addr)
inner.inject_new_listen_addr(id, addr)
}
}

fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_expired_listen_addr(addr)
inner.inject_expired_listen_addr(id, addr)
}
}

Expand All @@ -151,6 +157,24 @@ where
}
}

fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_expired_external_addr(addr)
}
}

fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_listener_error(id, err)
}
}

fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_listener_closed(id, reason)
}
}

fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters)
-> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>
{
Expand Down

0 comments on commit 801f6c1

Please sign in to comment.