-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support client mode in kademlia #2184
Changes from 13 commits
f8cf486
1a042f8
3e759a0
b8cbe62
fc218a0
e58e544
af1fcbd
c944ecb
4be8fca
e213350
e47148d
cdaa0a1
1a09907
46167a1
8bf3ef4
9ef4c4c
c387f41
5b21774
5ea645e
aa6500f
146405f
2cea417
54331e5
9d23741
b202c5b
8234a17
d5e6509
06c1a30
d420fa6
190c3dd
ab97b10
f3259b1
f308b0c
8f33ba1
520fb8e
d52c6af
81eae5f
f0c2fc3
f0bf7df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
#![allow(dead_code, unused_variables)] | ||
|
||
// Use the following command to run: | ||
// cargo run --example kademlia-example --features="tcp-tokio mdns" | ||
// To run in client mode: | ||
// cargo run --example kademlia-example --features="tcp-tokio mdns" --client | ||
|
||
// Based on the following example code: | ||
// https://github.com/zupzup/rust-peer-to-peer-example/blob/main/src/main.rs | ||
|
||
use libp2p::{ | ||
core::upgrade, | ||
identity, | ||
kad::{ | ||
protocol::Mode, record::store::MemoryStore, Kademlia, KademliaConfig, KademliaEvent, | ||
QueryResult, | ||
}, | ||
mdns::{Mdns, MdnsConfig, MdnsEvent}, | ||
mplex, | ||
noise::{Keypair, NoiseConfig, X25519Spec}, | ||
swarm::SwarmEvent, | ||
tcp::TokioTcpConfig, | ||
NetworkBehaviour, PeerId, Swarm, Transport, | ||
}; | ||
|
||
use async_std::io; | ||
use futures::{FutureExt, StreamExt}; | ||
use std::env; | ||
|
||
#[derive(NetworkBehaviour)] | ||
#[behaviour(event_process = false, out_event = "MyOutEvent")] | ||
struct MyBehaviour { | ||
kademlia: Kademlia<MemoryStore>, | ||
// Using MDNS for peer discovery. | ||
mdns: Mdns, | ||
} | ||
|
||
enum MyOutEvent { | ||
Kademlia(KademliaEvent), | ||
Mdns(MdnsEvent), | ||
} | ||
|
||
impl From<KademliaEvent> for MyOutEvent { | ||
fn from(event: KademliaEvent) -> Self { | ||
MyOutEvent::Kademlia(event) | ||
} | ||
} | ||
|
||
impl From<MdnsEvent> for MyOutEvent { | ||
fn from(event: MdnsEvent) -> Self { | ||
MyOutEvent::Mdns(event) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let client_mode: bool = if let Some(kad_mode) = env::args().nth(1) { | ||
kad_mode.starts_with("--client") | ||
} else { | ||
false | ||
}; | ||
|
||
create_peer(client_mode).await | ||
} | ||
|
||
async fn create_peer(client_mode: bool) { | ||
let key = identity::Keypair::generate_ed25519(); | ||
let peer_id = PeerId::from_public_key(&key.public()); | ||
|
||
// Print the current peer ID. | ||
println!("{:?}", peer_id.clone()); | ||
|
||
let auth_keys = Keypair::<X25519Spec>::new() // create new auth keys | ||
.into_authentic(&key) // sign the keys | ||
.unwrap(); | ||
|
||
let transport = TokioTcpConfig::new() | ||
.upgrade(upgrade::Version::V1) // upgrade will only show up if you import `Transport` | ||
.authenticate(NoiseConfig::xx(auth_keys).into_authenticated()) | ||
.multiplex(mplex::MplexConfig::new()) | ||
.boxed(); | ||
|
||
let mut config = KademliaConfig::default(); | ||
|
||
let kad_config = if client_mode { | ||
config.set_mode(Mode::Client); | ||
println!("Setting to client mode."); | ||
config | ||
} else { | ||
config | ||
}; | ||
|
||
let behaviour = MyBehaviour { | ||
kademlia: Kademlia::with_config( | ||
peer_id.clone(), | ||
MemoryStore::new(peer_id.clone()), | ||
kad_config, | ||
), | ||
mdns: Mdns::new(MdnsConfig::default()).await.unwrap(), | ||
}; | ||
|
||
let mut swarm = Swarm::new(transport, behaviour, peer_id.clone()); | ||
|
||
swarm | ||
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) | ||
.unwrap(); | ||
|
||
let stdin = io::stdin(); | ||
let mut buffer = String::new(); | ||
|
||
loop { | ||
futures::select! { | ||
line = stdin.read_line(&mut buffer).fuse() => handle_command(buffer.clone(), &mut swarm), | ||
event = swarm.next() => match event.unwrap() { | ||
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on: {}", address), | ||
SwarmEvent::Behaviour(event) => handle_event(event, &mut swarm), | ||
_ => {} | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn handle_command(command: String, swarm: &mut Swarm<MyBehaviour>) { | ||
if command.contains("list peer") { | ||
list_peers(swarm); | ||
} | ||
} | ||
|
||
fn handle_event(event: MyOutEvent, swarm: &mut Swarm<MyBehaviour>) { | ||
match event { | ||
MyOutEvent::Kademlia(kad_event) => match kad_event { | ||
KademliaEvent::OutboundQueryCompleted { id, result, stats } => todo!(), | ||
_ => {} | ||
}, | ||
MyOutEvent::Mdns(mdns_event) => match mdns_event { | ||
MdnsEvent::Discovered(nodes) => { | ||
for (peer_id, multiaddr) in nodes { | ||
swarm | ||
.behaviour_mut() | ||
.kademlia | ||
.add_address(&peer_id, multiaddr); | ||
} | ||
} | ||
_ => {} | ||
}, | ||
} | ||
} | ||
|
||
fn list_peers(swarm: &mut Swarm<MyBehaviour>) { | ||
for bucket in swarm.behaviour_mut().kademlia.kbuckets() { | ||
if bucket.num_entries() > 0 { | ||
for item in bucket.iter() { | ||
println!("Peer ID: {:?}", item.node.key.preimage()); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
|
||
use crate::protocol::{ | ||
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, | ||
KademliaProtocolConfig, | ||
KademliaProtocolConfig, Mode, | ||
}; | ||
use crate::record::{self, Record}; | ||
use futures::prelude::*; | ||
|
@@ -62,10 +62,15 @@ impl<T: Clone + fmt::Debug + Send + 'static> IntoProtocolsHandler for KademliaHa | |
} | ||
|
||
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol { | ||
if self.config.allow_listening { | ||
upgrade::EitherUpgrade::A(self.config.protocol_config.clone()) | ||
} else { | ||
upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade) | ||
match self.config.client { | ||
Mode::Client => upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), | ||
Mode::Server => { | ||
if self.config.allow_listening { | ||
whereistejas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
upgrade::EitherUpgrade::A(self.config.protocol_config.clone()) | ||
} else { | ||
upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -123,6 +128,9 @@ pub struct KademliaHandlerConfig { | |
|
||
/// Time after which we close an idle connection. | ||
pub idle_timeout: Duration, | ||
|
||
// If Mode::Client, node will act in `client` mode in the Kademlia network. | ||
pub client: Mode, | ||
whereistejas marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
} | ||
|
||
/// State of an active substream, opened either by us or by the remote. | ||
|
@@ -480,11 +488,20 @@ where | |
type InboundOpenInfo = (); | ||
|
||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> { | ||
if self.config.allow_listening { | ||
SubstreamProtocol::new(self.config.protocol_config.clone(), ()) | ||
.map_upgrade(upgrade::EitherUpgrade::A) | ||
} else { | ||
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) | ||
match self.config.client { | ||
Mode::Server => { | ||
if self.config.allow_listening { | ||
SubstreamProtocol::new(self.config.protocol_config.clone(), ()) | ||
.map_upgrade(upgrade::EitherUpgrade::A) | ||
} else { | ||
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) | ||
} | ||
} | ||
// If we are in client mode, I don't want to advertise Kademlia so that other peers will | ||
// not send me Kademlia requests. | ||
Mode::Client => { | ||
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -520,12 +537,16 @@ where | |
self.next_connec_unique_id.0 += 1; | ||
self.substreams | ||
.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); | ||
if let ProtocolStatus::Unconfirmed = self.protocol_status { | ||
// Upon the first successfully negotiated substream, we know that the | ||
// remote is configured with the same protocol name and we want | ||
// the behaviour to add this peer to the routing table, if possible. | ||
self.protocol_status = ProtocolStatus::Confirmed; | ||
} | ||
// TODO: | ||
// Just because another peer is sending us Kademlia requests, doesn't necessarily | ||
// mean that it will answer the Kademlia requests that we send to it. | ||
// Thus, Commenting this code out for now. | ||
// if let ProtocolStatus::Unconfirmed = self.protocol_status { | ||
// // Upon the first successfully negotiated substream, we know that the | ||
// // remote is configured with the same protocol name and we want | ||
// // the behaviour to add this peer to the routing table, if possible. | ||
// self.protocol_status = ProtocolStatus::Confirmed; | ||
// } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. I am in favor of documenting this, though obviously both the |
||
} | ||
|
||
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) { | ||
|
@@ -763,6 +784,7 @@ impl Default for KademliaHandlerConfig { | |
protocol_config: Default::default(), | ||
allow_listening: true, | ||
idle_timeout: Duration::from_secs(10), | ||
client: Mode::default(), | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -46,6 +46,18 @@ pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0"; | |||||||
/// The default maximum size for a varint length-delimited packet. | ||||||||
pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024; | ||||||||
|
||||||||
#[derive(Debug, Clone)] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
pub enum Mode { | ||||||||
Client, | ||||||||
Server, | ||||||||
} | ||||||||
|
||||||||
impl Default for Mode { | ||||||||
fn default() -> Self { | ||||||||
Mode::Server | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/// Status of our connection to a node reported by the Kademlia protocol. | ||||||||
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] | ||||||||
pub enum KadConnectionType { | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of an example, could you add a test to
kad/src/behaviour/tests.rs
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Writing this test in the
kademlia
module will be kind of difficult, as I'm using a lot of other things as well, for building the swarm or transport. Is it okay if I put this is in a test directory?I can further enhance
kademlia-example
to create four peers and cross-check if the peer inclient
mode is actually, absent from the other's routing tables.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for most of it. You can take the
manual_bucket_insert
andbootstrap
test as an example.I don't think there is any need for an example. The "cross-check" sounds great, but should really be done in a test instead of an example.