Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess #2784

Merged
merged 12 commits into from
Aug 16, 2022
Merged
80 changes: 45 additions & 35 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use libp2p::{
mdns::{Mdns, MdnsEvent},
mplex,
noise,
swarm::{dial_opts::DialOpts, NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
swarm::{SwarmBuilder, SwarmEvent},
// `TokioTcpTransport` is available through the `tcp-tokio` feature.
tcp::TokioTcpTransport,
Multiaddr,
Expand Down Expand Up @@ -82,47 +82,29 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");

// We create a custom network behaviour that combines floodsub and mDNS.
// The derive generates a delegating `NetworkBehaviour` impl which in turn
// requires the implementations of `NetworkBehaviourEventProcess` for
// the events of each behaviour.
// We create a custom behaviour that combines floodsub and mDNS.
// The derive generates a delegating `NetworkBehaviour` impl.
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: Mdns,
}

impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
// Called when `floodsub` produces an event.
fn inject_event(&mut self, message: FloodsubEvent) {
if let FloodsubEvent::Message(message) = message {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
enum MyBehaviourEvent {
Floodsub(FloodsubEvent),
Mdns(MdnsEvent),
}

impl From<FloodsubEvent> for MyBehaviourEvent {
fn from(event: FloodsubEvent) -> Self {
MyBehaviourEvent::Floodsub(event)
}
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.floodsub.add_node_to_partial_view(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
impl From<MdnsEvent> for MyBehaviourEvent {
fn from(event: MdnsEvent) -> Self {
MyBehaviourEvent::Mdns(event)
}
}

Expand Down Expand Up @@ -166,8 +148,36 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
}
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(event)) => {
if let FloodsubEvent::Message(message) = event {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !swarm.behaviour().mdns.has_node(&peer) {
swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
}
_ => {}
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let floodsub_topic = floodsub::Topic::new("chat");

// We create a custom network behaviour that combines floodsub and mDNS.
// In the future, we want to improve libp2p to make this easier to do.
// Use the derive to generate delegating NetworkBehaviour impl and require the
// NetworkBehaviourEventProcess implementations below.
// Use the derive to generate delegating NetworkBehaviour impl.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent")]
struct MyBehaviour {
Expand Down
94 changes: 49 additions & 45 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use libp2p::kad::{
use libp2p::{
development_transport, identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
swarm::SwarmEvent,
NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;
Expand All @@ -68,28 +68,60 @@ async fn main() -> Result<(), Box<dyn Error>> {

// We create a custom network behaviour that combines Kademlia and mDNS.
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
mdns: Mdns,
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
if let MdnsEvent::Discovered(list) = event {
for (peer_id, multiaddr) in list {
self.kademlia.add_address(&peer_id, multiaddr);
}
}
enum MyBehaviourEvent {
Kademlia(KademliaEvent),
Mdns(MdnsEvent),
}

impl From<KademliaEvent> for MyBehaviourEvent {
fn from(event: KademliaEvent) -> Self {
MyBehaviourEvent::Kademlia(event)
}
}

impl From<MdnsEvent> for MyBehaviourEvent {
fn from(event: MdnsEvent) -> Self {
MyBehaviourEvent::Mdns(event)
}
}

impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
println!(
Expand Down Expand Up @@ -137,38 +169,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
},
_ => {}
}
}
_ => {}
}
}

// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
_ => {}
}
}
}
}
Expand Down
Loading