diff --git a/Cargo.lock b/Cargo.lock index bf5d5f9707..8420b0ff40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,19 +476,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "ckb-discovery" -version = "0.31.0-pre" -dependencies = [ - "ckb-logger", - "ckb-types", - "futures 0.3.4", - "rand 0.6.5", - "tentacle", - "tokio 0.2.17", - "tokio-util", -] - [[package]] name = "ckb-error" version = "0.31.0-pre" @@ -555,15 +542,6 @@ dependencies = [ "blake2b-rs", ] -[[package]] -name = "ckb-identify" -version = "0.31.0-pre" -dependencies = [ - "ckb-logger", - "ckb-types", - "tentacle", -] - [[package]] name = "ckb-indexer" version = "0.31.0-pre" @@ -689,11 +667,8 @@ version = "0.31.0-pre" dependencies = [ "bs58", "ckb-build-info", - "ckb-discovery", "ckb-hash", - "ckb-identify", "ckb-logger", - "ckb-ping", "ckb-stop-handler", "ckb-types", "ckb-util", @@ -776,16 +751,6 @@ dependencies = [ "syn 0.15.29", ] -[[package]] -name = "ckb-ping" -version = "0.31.0-pre" -dependencies = [ - "ckb-logger", - "ckb-types", - "futures 0.3.4", - "tentacle", -] - [[package]] name = "ckb-pow" version = "0.31.0-pre" diff --git a/Cargo.toml b/Cargo.toml index ed3954c431..6dad97b771 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,9 +62,6 @@ members = [ "benches", "error", "notify", - "protocols/ping", - "protocols/identify", - "protocols/discovery", ] [profile.release] diff --git a/network/Cargo.toml b/network/Cargo.toml index 237f485d07..872e9edff0 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -11,14 +11,11 @@ serde = { version = "1.0", features = ["derive"] } ckb-util = { path = "../util" } ckb-stop-handler = { path = "../util/stop-handler" } ckb-logger = { path = "../util/logger" } -tokio = { version = "0.2.11", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking"] } +tokio = { version = "0.2.11", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking", "stream"] } tokio-util = { version = "0.2.0", features = ["codec"] } futures = "0.3" crossbeam-channel = "0.3" p2p = { version="0.3.0-alpha.2", package="tentacle", features = ["molc"] } -p2p-ping = { package="ckb-ping", path = "../protocols/ping" } -p2p-discovery = { package="ckb-discovery", path = "../protocols/discovery" } -p2p-identify = { package="ckb-identify", path = "../protocols/identify" } faketime = "0.2.0" lazy_static = "1.3.0" bs58 = "0.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index 664355ca3e..9722ba5f6c 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -6,10 +6,10 @@ use crate::peer_store::{ }; use crate::protocols::{ disconnect_message::DisconnectMessageProtocol, - discovery::{DiscoveryProtocol, DiscoveryService}, + discovery::DiscoveryProtocol, feeler::Feeler, - identify::IdentifyCallback, - ping::PingService, + identify::{IdentifyCallback, IdentifyProtocol}, + ping::{PingHandler, PingService}, }; use crate::services::{ dns_seeding::DnsSeedingService, dump_peer_store::DumpPeerStoreService, @@ -25,10 +25,7 @@ use ckb_logger::{debug, error, info, trace, warn}; use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{ - channel::{ - mpsc::{self, channel}, - oneshot, - }, + channel::{mpsc::channel, oneshot}, Future, StreamExt, }; use ipnetwork::IpNetwork; @@ -47,8 +44,6 @@ use p2p::{ utils::extract_peer_id, SessionId, }; -use p2p_identify::IdentifyProtocol; -use p2p_ping::PingHandler; use std::{ cmp::max, collections::{HashMap, HashSet}, @@ -881,7 +876,7 @@ impl NetworkService { .build(); // Discovery protocol - let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_network_state = Arc::clone(&network_state); let disc_meta = MetaBuilder::default() .id(DISCOVERY_PROTOCOL_ID.into()) .name(move |_| "/ckb/discovery".to_string()) @@ -893,10 +888,10 @@ impl NetworkService { ) }) .service_handle(move || { - ProtocolHandle::Both(Box::new( - DiscoveryProtocol::new(disc_sender.clone()) - .global_ip_only(!config.discovery_local_address), - )) + ProtocolHandle::Both(Box::new(DiscoveryProtocol::new( + disc_network_state, + config.discovery_local_address, + ))) }) .build(); @@ -977,11 +972,6 @@ impl NetworkService { .build(event_handler); // == Build background service tasks - let disc_service = DiscoveryService::new( - Arc::clone(&network_state), - disc_receiver, - config.discovery_local_address, - ); let mut ping_service = PingService::new( Arc::clone(&network_state), p2p_service.control().to_owned(), @@ -1001,7 +991,6 @@ impl NetworkService { } } }) as Pin>, - Box::pin(disc_service) as Pin>, Box::pin(dump_peer_store_service) as Pin>, Box::pin(protocol_type_checker_service) as Pin>, ]; diff --git a/network/src/protocols/discovery.rs b/network/src/protocols/discovery.rs deleted file mode 100644 index 7023276da9..0000000000 --- a/network/src/protocols/discovery.rs +++ /dev/null @@ -1,324 +0,0 @@ -// use crate::peer_store::Behaviour; -use crate::NetworkState; -use ckb_logger::{debug, error, trace, warn}; -use crossbeam_channel::{self, bounded}; -use futures::{channel::mpsc, Future, FutureExt, StreamExt}; -use std::{ - collections::HashMap, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - time::Duration, -}; - -use p2p::{ - bytes::Bytes, - context::{ProtocolContext, ProtocolContextMutRef}, - multiaddr::Multiaddr, - secio::PeerId, - traits::ServiceProtocol, - utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr}, - SessionId, -}; -use p2p_discovery::{ - AddressManager, Discovery, DiscoveryHandle, MisbehaveResult, Misbehavior, Substream, -}; - -pub struct DiscoveryProtocol { - discovery: Option>, - discovery_handle: DiscoveryHandle, - discovery_senders: HashMap>>, - event_sender: mpsc::UnboundedSender, -} - -impl DiscoveryProtocol { - pub fn new(event_sender: mpsc::UnboundedSender) -> DiscoveryProtocol { - let addr_mgr = DiscoveryAddressManager { - event_sender: event_sender.clone(), - }; - let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7))); - let discovery_handle = discovery.handle(); - DiscoveryProtocol { - discovery: Some(discovery), - discovery_handle, - discovery_senders: HashMap::default(), - event_sender, - } - } - - pub fn global_ip_only(mut self, global_ip_only: bool) -> Self { - self.discovery = self - .discovery - .map(move |protocol| protocol.global_ip_only(global_ip_only)); - self - } -} - -impl ServiceProtocol for DiscoveryProtocol { - fn init(&mut self, context: &mut ProtocolContext) { - debug!("protocol [discovery({})]: init", context.proto_id); - - let discovery_task = self - .discovery - .take() - .map(|mut discovery| { - debug!("Start discovery future_task"); - async move { - loop { - if discovery.next().await.is_none() { - warn!("discovery stream shutdown"); - break; - } - } - } - .boxed() - }) - .expect("Discovery init only once"); - if let Err(err) = context.future_task(discovery_task) { - error!("Start discovery_task failed: {:?}", err); - } - } - - fn connected(&mut self, context: ProtocolContextMutRef, _: &str) { - let session = context.session; - debug!( - "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", - session.id, session.address, session.ty - ); - let event = DiscoveryEvent::Connected { - session_id: session.id, - peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()), - }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (ServiceProtocol::connected)"); - return; - } - - let (sender, receiver) = mpsc::channel(8); - self.discovery_senders.insert(session.id, sender); - let substream = Substream::new(context, receiver); - match self.discovery_handle.substream_sender.try_send(substream) { - Ok(_) => { - debug!("Send substream success"); - } - Err(err) => { - // TODO: handle channel is full (wait for poll API?) - warn!("Send substream failed : {:?}", err); - } - } - } - - fn disconnected(&mut self, context: ProtocolContextMutRef) { - let session = context.session; - let event = DiscoveryEvent::Disconnected(session.id); - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (ServiceProtocol::disconnected)"); - return; - } - self.discovery_senders.remove(&session.id); - debug!("protocol [discovery] close on session [{}]", session.id); - } - - fn received(&mut self, context: ProtocolContextMutRef, data: Bytes) { - let session = context.session; - trace!("[received message]: length={}", data.len()); - - if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) { - // TODO: handle channel is full (wait for poll API?) - if let Err(err) = sender.try_send(data.to_vec()) { - if err.is_full() { - warn!("channel is full"); - } else if err.is_disconnected() { - warn!("channel is disconnected"); - } else { - warn!("other channel error: {:?}", err); - } - self.discovery_senders.remove(&session.id); - } - } - } -} - -pub enum DiscoveryEvent { - Connected { - session_id: SessionId, - peer_id: Option, - }, - Disconnected(SessionId), - AddNewAddrs { - session_id: SessionId, - addrs: Vec, - }, - Misbehave { - session_id: SessionId, - kind: Misbehavior, - result: crossbeam_channel::Sender, - }, - GetRandom { - n: usize, - result: crossbeam_channel::Sender>, - }, -} - -pub struct DiscoveryService { - event_receiver: mpsc::UnboundedReceiver, - network_state: Arc, - sessions: HashMap, - discovery_local_address: bool, -} - -impl DiscoveryService { - pub fn new( - network_state: Arc, - event_receiver: mpsc::UnboundedReceiver, - discovery_local_address: bool, - ) -> DiscoveryService { - DiscoveryService { - event_receiver, - network_state, - sessions: HashMap::default(), - discovery_local_address, - } - } - - fn is_valid_addr(&self, addr: &Multiaddr) -> bool { - if !self.discovery_local_address { - let local_or_invalid = multiaddr_to_socketaddr(&addr) - .map(|socket_addr| !is_reachable(socket_addr.ip())) - .unwrap_or(true); - !local_or_invalid - } else { - true - } - } - - fn handle_event(&mut self, event: DiscoveryEvent) { - match event { - DiscoveryEvent::Connected { - session_id, - peer_id, - } => { - if let Some(peer_id) = peer_id { - self.sessions.insert(session_id, peer_id); - } - } - DiscoveryEvent::Disconnected(session_id) => { - self.sessions.remove(&session_id); - } - DiscoveryEvent::AddNewAddrs { session_id, addrs } => { - if let Some(_peer_id) = self.sessions.get(&session_id) { - // TODO: wait for peer store update - for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) { - trace!("Add discovered address:{:?}", addr); - if let Some(peer_id) = extract_peer_id(&addr) { - self.network_state.with_peer_store_mut(|peer_store| { - if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { - debug!( - "Failed to add discoved address to peer_store {:?} {:?}", - err, peer_id - ); - } - }); - } - } - } - } - DiscoveryEvent::Misbehave { - session_id: _session_id, - kind: _kind, - result: _result, - } => { - // FIXME: - } - DiscoveryEvent::GetRandom { n, result } => { - let fetch_random_addrs = self - .network_state - .with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n)); - let addrs = fetch_random_addrs - .into_iter() - .filter_map(|paddr| { - if !self.is_valid_addr(&paddr.addr) { - return None; - } - match paddr.multiaddr() { - Ok(addr) => Some(addr), - Err(err) => { - error!("return discovery addresses error: {:?}", err); - None - } - } - }) - .collect(); - trace!("discovery send random addrs: {:?}", addrs); - result - .send(addrs) - .expect("Send failed (should not happened)"); - } - } - } -} - -impl Future for DiscoveryService { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match self.event_receiver.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - self.handle_event(event); - } - Poll::Ready(None) => { - debug!("discovery service shutdown"); - return Poll::Ready(()); - } - Poll::Pending => break, - } - } - Poll::Pending - } -} - -pub struct DiscoveryAddressManager { - pub event_sender: mpsc::UnboundedSender, -} - -impl AddressManager for DiscoveryAddressManager { - fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { - self.add_new_addrs(session_id, vec![addr]) - } - - fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { - if addrs.is_empty() { - return; - } - let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::add_new_addrs)"); - } - } - - fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { - let (sender, receiver) = bounded(1); - let event = DiscoveryEvent::Misbehave { - session_id, - kind, - result: sender, - }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::misbehave)"); - MisbehaveResult::Disconnect - } else { - tokio::task::block_in_place(|| receiver.recv().unwrap_or(MisbehaveResult::Disconnect)) - } - } - - fn get_random(&mut self, n: usize) -> Vec { - let (sender, receiver) = bounded(1); - let event = DiscoveryEvent::GetRandom { n, result: sender }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::get_random)"); - Vec::new() - } else { - tokio::task::block_in_place(|| receiver.recv().ok().unwrap_or_else(Vec::new)) - } - } -} diff --git a/protocols/discovery/src/addr.rs b/network/src/protocols/discovery/addr.rs similarity index 77% rename from protocols/discovery/src/addr.rs rename to network/src/protocols/discovery/addr.rs index ebcd3fed48..d4975bad1e 100644 --- a/protocols/discovery/src/addr.rs +++ b/network/src/protocols/discovery/addr.rs @@ -6,11 +6,7 @@ use std::{ time::Instant, }; -use p2p::{ - multiaddr::Multiaddr, - utils::{is_reachable, multiaddr_to_socketaddr}, - SessionId, -}; +use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr, SessionId}; // See: bitcoin/netaddress.cpp pchIPv4[12] pub(crate) const PCH_IPV4: [u8; 18] = [ @@ -33,23 +29,15 @@ pub enum Misbehavior { /// Misbehavior report result pub enum MisbehaveResult { - /// Continue to run - Continue, /// Disconnect this peer Disconnect, } impl MisbehaveResult { - pub fn is_continue(&self) -> bool { - match self { - MisbehaveResult::Continue => true, - _ => false, - } - } pub fn is_disconnect(&self) -> bool { match self { MisbehaveResult::Disconnect => true, - _ => false, + // _ => false, } } } @@ -158,37 +146,3 @@ impl From for RawAddr { RawAddr(data) } } - -impl RawAddr { - pub fn socket_addr(&self) -> SocketAddr { - SocketAddr::new(self.ip(), self.port()) - } - - pub fn ip(&self) -> IpAddr { - let mut is_ipv4 = true; - for (i, value) in PCH_IPV4.iter().enumerate().take(12) { - if self.0[i] != *value { - is_ipv4 = false; - break; - } - } - if is_ipv4 { - let mut buf = [0u8; 4]; - buf.copy_from_slice(&self.0[12..16]); - From::from(buf) - } else { - let mut buf = [0u8; 16]; - buf.copy_from_slice(&self.0[0..16]); - From::from(buf) - } - } - - pub fn port(&self) -> u16 { - 0x100 * u16::from(self.0[16]) + u16::from(self.0[17]) - } - - // Copy from std::net::IpAddr::is_global - pub fn is_reachable(&self) -> bool { - is_reachable(self.socket_addr().ip()) - } -} diff --git a/protocols/discovery/src/lib.rs b/network/src/protocols/discovery/mod.rs similarity index 70% rename from protocols/discovery/src/lib.rs rename to network/src/protocols/discovery/mod.rs index c769965977..0a0e62e0e3 100644 --- a/protocols/discovery/src/lib.rs +++ b/network/src/protocols/discovery/mod.rs @@ -1,29 +1,34 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use futures::{ + channel::mpsc::{self, channel, Receiver, Sender}, + prelude::*, + stream::FusedStream, + Stream, +}; use std::{ + collections::{HashMap, HashSet, VecDeque}, convert::TryFrom, pin::Pin, + sync::Arc, task::{Context, Poll}, + time::{Duration, Instant}, }; -use ckb_logger::{debug, warn}; -use futures::{ - channel::mpsc::{channel, Receiver, Sender}, - prelude::*, - stream::FusedStream, - Stream, -}; +use ckb_logger::{debug, error, trace, warn}; +use ckb_util::RwLock; use p2p::{ bytes, context::{ProtocolContext, ProtocolContextMutRef}, multiaddr::Multiaddr, + secio::PeerId, traits::ServiceProtocol, - utils::{is_reachable, multiaddr_to_socketaddr}, + utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr}, SessionId, }; use rand::seq::SliceRandom; use tokio::time::Interval; -use std::time::{Duration, Instant}; +// use crate::peer_store::Behaviour; +use crate::NetworkState; const CHECK_INTERVAL: Duration = Duration::from_secs(3); @@ -31,103 +36,13 @@ mod addr; mod protocol; mod substream; -pub use crate::{ +use self::{addr::DEFAULT_MAX_KNOWN, substream::RemoteAddress}; +pub use self::{ addr::{AddrKnown, AddressManager, MisbehaveResult, Misbehavior, RawAddr}, protocol::{DiscoveryMessage, Node, Nodes}, substream::{Substream, SubstreamKey, SubstreamValue}, }; -use crate::{addr::DEFAULT_MAX_KNOWN, substream::RemoteAddress}; - -pub struct DiscoveryProtocol { - discovery: Option>, - discovery_handle: DiscoveryHandle, - discovery_senders: HashMap>>, -} - -impl DiscoveryProtocol { - pub fn new(discovery: Discovery) -> DiscoveryProtocol { - let discovery_handle = discovery.handle(); - DiscoveryProtocol { - discovery: Some(discovery), - discovery_handle, - discovery_senders: HashMap::default(), - } - } -} - -impl ServiceProtocol for DiscoveryProtocol { - fn init(&mut self, context: &mut ProtocolContext) { - debug!("protocol [discovery({})]: init", context.proto_id); - - let discovery_task = self - .discovery - .take() - .map(|mut discovery| { - debug!("Start discovery future_task"); - async move { - loop { - if discovery.next().await.is_none() { - warn!("discovery stream shutdown"); - break; - } - } - } - .boxed() - }) - .unwrap(); - if context.future_task(discovery_task).is_err() { - warn!("start discovery fail"); - }; - } - - fn connected(&mut self, context: ProtocolContextMutRef, _: &str) { - let session = context.session; - debug!( - "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", - session.id, session.address, session.ty - ); - - let (sender, receiver) = channel(8); - self.discovery_senders.insert(session.id, sender); - let substream = Substream::new(context, receiver); - match self.discovery_handle.substream_sender.try_send(substream) { - Ok(_) => { - debug!("Send substream success"); - } - Err(err) => { - // TODO: handle channel is full (wait for poll API?) - warn!("Send substream failed : {:?}", err); - } - } - } - - fn disconnected(&mut self, context: ProtocolContextMutRef) { - self.discovery_senders.remove(&context.session.id); - debug!( - "protocol [discovery] close on session [{}]", - context.session.id - ); - } - - fn received(&mut self, context: ProtocolContextMutRef, data: bytes::Bytes) { - debug!("[received message]: length={}", data.len()); - - if let Some(ref mut sender) = self.discovery_senders.get_mut(&context.session.id) { - // TODO: handle channel is full (wait for poll API?) - if let Err(err) = sender.try_send(data.to_vec()) { - if err.is_full() { - warn!("channel is full"); - } else if err.is_disconnected() { - warn!("channel is disconnected"); - } else { - warn!("other channel error: {:?}", err); - } - } - } - } -} - pub struct Discovery { // Default: 5000 max_known: usize, @@ -184,10 +99,6 @@ impl Discovery { self } - pub fn addr_mgr(&self) -> &M { - &self.addr_mgr - } - pub fn handle(&self) -> DiscoveryHandle { DiscoveryHandle { substream_sender: self.substream_sender.clone(), @@ -206,7 +117,7 @@ impl Discovery { { Poll::Ready(Some(substream)) => { let key = substream.key(); - debug!("Received a substream: key={:?}", key); + trace!("Received a substream: key={:?}", key); let value = SubstreamValue::new( key.direction, substream, @@ -217,7 +128,7 @@ impl Discovery { } Poll::Ready(None) => unreachable!(), Poll::Pending => { - debug!("Discovery.substream_receiver Async::NotReady"); + trace!("Discovery.substream_receiver Async::NotReady"); break; } } @@ -233,7 +144,7 @@ impl Discovery { match Pin::new(&mut interval).as_mut().poll_next(cx) { Poll::Ready(Some(_)) => {} Poll::Ready(None) => { - debug!("Discovery check_interval poll finished"); + trace!("Discovery check_interval poll finished"); break; } Poll::Pending => break, @@ -343,7 +254,7 @@ impl Stream for Discovery { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - debug!("Discovery.poll()"); + trace!("Discovery.poll()"); self.recv_substreams(cx); self.check_interval(cx); @@ -355,14 +266,14 @@ impl Stream for Discovery { let mut rng = rand::thread_rng(); let mut remain_keys = self.substreams.keys().cloned().collect::>(); - debug!("announce_multiaddrs: {:?}", announce_multiaddrs); + trace!("announce_multiaddrs: {:?}", announce_multiaddrs); for announce_multiaddr in announce_multiaddrs.into_iter() { let announce_addr = RawAddr::try_from(announce_multiaddr.clone()).unwrap(); remain_keys.shuffle(&mut rng); for i in 0..2 { if let Some(key) = remain_keys.get(i) { if let Some(value) = self.substreams.get_mut(key) { - debug!( + trace!( ">> send {} to: {:?}, contains: {}", announce_multiaddr, value.remote_addr, @@ -395,3 +306,186 @@ impl Stream for Discovery { } } } + +pub struct DiscoveryProtocol { + discovery: Option>, + discovery_handle: DiscoveryHandle, + discovery_senders: HashMap>>, + sessions: Arc>>, +} + +impl DiscoveryProtocol { + pub fn new( + network_state: Arc, + discovery_local_address: bool, + ) -> DiscoveryProtocol { + let sessions = Arc::new(RwLock::new(HashMap::default())); + let addr_mgr = DiscoveryAddressManager { + sessions: Arc::clone(&sessions), + network_state, + discovery_local_address, + }; + let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7))) + .global_ip_only(!discovery_local_address); + let discovery_handle = discovery.handle(); + DiscoveryProtocol { + discovery: Some(discovery), + discovery_handle, + discovery_senders: HashMap::default(), + sessions, + } + } +} + +impl ServiceProtocol for DiscoveryProtocol { + fn init(&mut self, context: &mut ProtocolContext) { + debug!("protocol [discovery({})]: init", context.proto_id); + + let discovery_task = self + .discovery + .take() + .map(|mut discovery| { + debug!("Start discovery future_task"); + async move { + loop { + if discovery.next().await.is_none() { + warn!("discovery stream shutdown"); + break; + } + } + } + .boxed() + }) + .expect("Discovery init only once"); + if let Err(err) = context.future_task(discovery_task) { + error!("Start discovery_task failed: {:?}", err); + } + } + + fn connected(&mut self, context: ProtocolContextMutRef, _: &str) { + let session = context.session; + debug!( + "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", + session.id, session.address, session.ty + ); + + if let Some(pubkey) = session.remote_pubkey.as_ref() { + self.sessions.write().insert(session.id, pubkey.peer_id()); + } + + let (sender, receiver) = mpsc::channel(8); + self.discovery_senders.insert(session.id, sender); + let substream = Substream::new(context, receiver); + match self.discovery_handle.substream_sender.try_send(substream) { + Ok(_) => { + trace!("Send substream success"); + } + Err(err) => { + // TODO: handle channel is full (wait for poll API?) + warn!("Send substream failed : {:?}", err); + } + } + } + + fn disconnected(&mut self, context: ProtocolContextMutRef) { + let session = context.session; + self.sessions.write().remove(&session.id); + self.discovery_senders.remove(&session.id); + debug!("protocol [discovery] close on session [{}]", session.id); + } + + fn received(&mut self, context: ProtocolContextMutRef, data: bytes::Bytes) { + let session = context.session; + trace!("[received message]: length={}", data.len()); + + if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) { + // TODO: handle channel is full (wait for poll API?) + if let Err(err) = sender.try_send(data.to_vec()) { + if err.is_full() { + warn!("channel is full"); + } else if err.is_disconnected() { + warn!("channel is disconnected"); + } else { + warn!("other channel error: {:?}", err); + } + self.discovery_senders.remove(&session.id); + } + } + } +} + +pub struct DiscoveryAddressManager { + network_state: Arc, + sessions: Arc>>, + discovery_local_address: bool, +} + +impl DiscoveryAddressManager { + fn is_valid_addr(&self, addr: &Multiaddr) -> bool { + if !self.discovery_local_address { + let local_or_invalid = multiaddr_to_socketaddr(&addr) + .map(|socket_addr| !is_reachable(socket_addr.ip())) + .unwrap_or(true); + !local_or_invalid + } else { + true + } + } +} + +impl AddressManager for DiscoveryAddressManager { + fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) { + self.add_new_addrs(session_id, vec![addr]) + } + + fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { + if addrs.is_empty() { + return; + } + + if let Some(_peer_id) = self.sessions.read().get(&session_id) { + // TODO: wait for peer store update + for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) { + trace!("Add discovered address:{:?}", addr); + if let Some(peer_id) = extract_peer_id(&addr) { + self.network_state.with_peer_store_mut(|peer_store| { + if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { + debug!( + "Failed to add discoved address to peer_store {:?} {:?}", + err, peer_id + ); + } + }); + } + } + } + } + + fn misbehave(&mut self, _session_id: SessionId, _kind: Misbehavior) -> MisbehaveResult { + // FIXME: + MisbehaveResult::Disconnect + } + + fn get_random(&mut self, n: usize) -> Vec { + let fetch_random_addrs = self + .network_state + .with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n)); + let addrs = fetch_random_addrs + .into_iter() + .filter_map(|paddr| { + if !self.is_valid_addr(&paddr.addr) { + return None; + } + match paddr.multiaddr() { + Ok(addr) => Some(addr), + Err(err) => { + error!("return discovery addresses error: {:?}", err); + None + } + } + }) + .collect(); + trace!("discovery send random addrs: {:?}", addrs); + addrs + } +} diff --git a/protocols/discovery/src/protocol.rs b/network/src/protocols/discovery/protocol.rs similarity index 100% rename from protocols/discovery/src/protocol.rs rename to network/src/protocols/discovery/protocol.rs diff --git a/protocols/discovery/src/substream.rs b/network/src/protocols/discovery/substream.rs similarity index 98% rename from protocols/discovery/src/substream.rs rename to network/src/protocols/discovery/substream.rs index ac72715c76..8b22f489a1 100644 --- a/protocols/discovery/src/substream.rs +++ b/network/src/protocols/discovery/substream.rs @@ -21,8 +21,8 @@ use p2p::{ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; -use crate::addr::{AddrKnown, AddressManager, Misbehavior, RawAddr}; -use crate::protocol::{DiscoveryCodec, DiscoveryMessage, Node, Nodes}; +use super::addr::{AddrKnown, AddressManager, Misbehavior, RawAddr}; +use super::protocol::{DiscoveryCodec, DiscoveryMessage, Node, Nodes}; // FIXME: should be a more high level version number const VERSION: u32 = 0; @@ -177,7 +177,7 @@ impl SubstreamValue { .map(|time| time.elapsed() > self.announce_interval) .unwrap_or(true) { - debug!("announce this session: {:?}", self.session_id); + trace!("announce this session: {:?}", self.session_id); self.announce = true; } } @@ -186,7 +186,7 @@ impl SubstreamValue { let mut sink = Pin::new(&mut self.framed_stream); while let Some(message) = self.pending_messages.pop_front() { - debug!("Discovery sending message: {}", message); + trace!("Discovery sending message: {}", message); match sink.as_mut().poll_ready(cx)? { Poll::Pending => { diff --git a/network/src/protocols/identify.rs b/network/src/protocols/identify.rs deleted file mode 100644 index 182344f9f1..0000000000 --- a/network/src/protocols/identify.rs +++ /dev/null @@ -1,270 +0,0 @@ -// use crate::peer_store::Behaviour; -use crate::{network::FEELER_PROTOCOL_ID, NetworkState, PeerIdentifyInfo}; -use ckb_logger::{debug, trace}; -use ckb_types::{bytes::Bytes, packed, prelude::*}; -use p2p::{ - context::ProtocolContextMutRef, - multiaddr::{Multiaddr, Protocol}, - secio::{PeerId, PublicKey}, - service::{SessionType, TargetProtocol}, - utils::{is_reachable, multiaddr_to_socketaddr}, -}; -use p2p_identify::{Callback, MisbehaveResult, Misbehavior}; -use std::{sync::Arc, time::Duration}; - -const MAX_RETURN_LISTEN_ADDRS: usize = 10; -const BAN_ON_NOT_SAME_NET: Duration = Duration::from_secs(5 * 60); - -#[derive(Clone)] -pub(crate) struct IdentifyCallback { - network_state: Arc, - identify: Identify, -} - -impl IdentifyCallback { - pub(crate) fn new( - network_state: Arc, - name: String, - client_version: String, - ) -> IdentifyCallback { - let flags = Flags(Flag::FullNode as u64); - - IdentifyCallback { - network_state, - identify: Identify::new(name, flags, client_version), - } - } - - fn listen_addrs(&self) -> Vec { - let mut addrs = self.network_state.public_addrs(MAX_RETURN_LISTEN_ADDRS * 2); - addrs.sort_by(|a, b| a.1.cmp(&b.1)); - addrs - .into_iter() - .take(MAX_RETURN_LISTEN_ADDRS) - .map(|(addr, _)| addr) - .collect::>() - } -} - -impl Callback for IdentifyCallback { - fn identify(&mut self) -> &[u8] { - self.identify.encode() - } - - fn received_identify( - &mut self, - context: &mut ProtocolContextMutRef, - identify: &[u8], - ) -> MisbehaveResult { - match self.identify.verify(identify) { - None => { - self.network_state.ban_session( - context.control(), - context.session.id, - BAN_ON_NOT_SAME_NET, - "The nodes are not on the same network".to_string(), - ); - MisbehaveResult::Disconnect - } - Some((flags, client_version)) => { - let registry_client_version = |version: String| { - self.network_state.with_peer_registry_mut(|registry| { - if let Some(peer) = registry.get_peer_mut(context.session.id) { - peer.identify_info = Some(PeerIdentifyInfo { - client_version: version, - }) - } - }); - }; - - if context.session.ty.is_outbound() { - let peer_id = context - .session - .remote_pubkey - .as_ref() - .map(PublicKey::peer_id) - .expect("Secio must enabled"); - if self - .network_state - .with_peer_registry(|reg| reg.is_feeler(&peer_id)) - { - let _ = context.open_protocols( - context.session.id, - TargetProtocol::Single(FEELER_PROTOCOL_ID.into()), - ); - } else if flags.contains(self.identify.flags) { - registry_client_version(client_version); - - // The remote end can support all local protocols. - let protos = self - .network_state - .get_protocol_ids(|id| id != FEELER_PROTOCOL_ID.into()); - - let _ = context - .open_protocols(context.session.id, TargetProtocol::Multi(protos)); - } else { - // The remote end cannot support all local protocols. - return MisbehaveResult::Disconnect; - } - } else { - registry_client_version(client_version); - } - MisbehaveResult::Continue - } - } - } - - /// Get local listen addresses - fn local_listen_addrs(&mut self) -> Vec { - self.listen_addrs() - } - - fn add_remote_listen_addrs(&mut self, peer_id: &PeerId, addrs: Vec) { - trace!( - "got remote listen addrs from peer_id={:?}, addrs={:?}", - peer_id, - addrs, - ); - self.network_state.with_peer_registry_mut(|reg| { - if let Some(peer) = reg - .get_key_by_peer_id(peer_id) - .and_then(|session_id| reg.get_peer_mut(session_id)) - { - peer.listened_addrs = addrs.clone(); - } - }); - self.network_state.with_peer_store_mut(|peer_store| { - for addr in addrs { - if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { - debug!("Failed to add addrs to peer_store {:?} {:?}", err, peer_id); - } - } - }) - } - - fn add_observed_addr( - &mut self, - peer_id: &PeerId, - addr: Multiaddr, - ty: SessionType, - ) -> MisbehaveResult { - debug!( - "peer({:?}, {:?}) reported observed addr {}", - peer_id, ty, addr, - ); - - if ty.is_inbound() { - // The address already been discovered by other peer - return MisbehaveResult::Continue; - } - - // observed addr is not a reachable ip - if !multiaddr_to_socketaddr(&addr) - .map(|socket_addr| is_reachable(socket_addr.ip())) - .unwrap_or(false) - { - return MisbehaveResult::Continue; - } - - let observed_addrs_iter = self - .listen_addrs() - .into_iter() - .filter_map(|listen_addr| multiaddr_to_socketaddr(&listen_addr)) - .map(|socket_addr| { - addr.iter() - .filter_map(|proto| match proto { - Protocol::P2P(_) => None, - Protocol::TCP(_) => Some(Protocol::TCP(socket_addr.port())), - value => Some(value), - }) - .collect::() - }); - self.network_state.add_observed_addrs(observed_addrs_iter); - // NOTE: for future usage - MisbehaveResult::Continue - } - - fn misbehave(&mut self, _peer_id: &PeerId, _kind: Misbehavior) -> MisbehaveResult { - MisbehaveResult::Disconnect - } -} - -#[derive(Clone)] -struct Identify { - name: String, - client_version: String, - flags: Flags, - encode_data: Bytes, -} - -impl Identify { - fn new(name: String, flags: Flags, client_version: String) -> Self { - Identify { - name, - client_version, - flags, - encode_data: Bytes::default(), - } - } - - fn encode(&mut self) -> &[u8] { - if self.encode_data.is_empty() { - self.encode_data = packed::Identify::new_builder() - .name(self.name.as_str().pack()) - .flag(self.flags.0.pack()) - .client_version(self.client_version.as_str().pack()) - .build() - .as_bytes(); - } - - &self.encode_data - } - - fn verify<'a>(&self, data: &'a [u8]) -> Option<(Flags, String)> { - let reader = packed::IdentifyReader::from_slice(data).ok()?; - - let name = reader.name().as_utf8().ok()?.to_owned(); - if self.name != name { - debug!("Not the same chain, self: {}, remote: {}", self.name, name); - return None; - } - - let flag: u64 = reader.flag().unpack(); - if flag == 0 { - return None; - } - - let raw_client_version = reader.client_version().as_utf8().ok()?.to_owned(); - - Some((Flags::from(flag), raw_client_version)) - } -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -#[repr(u64)] -enum Flag { - /// Support all protocol - FullNode = 0x1, -} - -#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] -struct Flags(u64); - -impl Flags { - /// Check if contains a target flag - fn contains(self, flags: Flags) -> bool { - (self.0 & flags.0) == flags.0 - } -} - -impl From for Flags { - fn from(value: Flag) -> Flags { - Flags(value as u64) - } -} - -impl From for Flags { - fn from(value: u64) -> Flags { - Flags(value) - } -} diff --git a/protocols/identify/src/lib.rs b/network/src/protocols/identify/mod.rs similarity index 55% rename from protocols/identify/src/lib.rs rename to network/src/protocols/identify/mod.rs index a776dc3d31..0ae86c7d3b 100644 --- a/protocols/identify/src/lib.rs +++ b/network/src/protocols/identify/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use std::time::{Duration, Instant}; use ckb_logger::{debug, error, trace, warn}; @@ -6,8 +7,8 @@ use p2p::{ bytes::Bytes, context::{ProtocolContext, ProtocolContextMutRef, SessionContext}, multiaddr::{Multiaddr, Protocol}, - secio::PeerId, - service::SessionType, + secio::{PeerId, PublicKey}, + service::{SessionType, TargetProtocol}, traits::ServiceProtocol, utils::{is_reachable, multiaddr_to_socketaddr}, SessionId, @@ -15,8 +16,13 @@ use p2p::{ mod protocol; +use crate::{network::FEELER_PROTOCOL_ID, NetworkState, PeerIdentifyInfo}; +use ckb_types::{packed, prelude::*}; + use protocol::IdentifyMessage; +const MAX_RETURN_LISTEN_ADDRS: usize = 10; +const BAN_ON_NOT_SAME_NET: Duration = Duration::from_secs(5 * 60); const CHECK_TIMEOUT_TOKEN: u64 = 100; // Check timeout interval (seconds) const CHECK_TIMEOUT_INTERVAL: u64 = 1; @@ -46,12 +52,6 @@ pub enum MisbehaveResult { } impl MisbehaveResult { - pub fn is_continue(&self) -> bool { - match self { - MisbehaveResult::Continue => true, - _ => false, - } - } pub fn is_disconnect(&self) -> bool { match self { MisbehaveResult::Disconnect => true, @@ -104,10 +104,10 @@ impl IdentifyProtocol { } /// Turning off global ip only mode will allow any ip to be broadcast, default is true - pub fn global_ip_only(mut self, global_ip_only: bool) -> Self { - self.global_ip_only = global_ip_only; - self - } + // pub fn global_ip_only(mut self, global_ip_only: bool) -> Self { + // self.global_ip_only = global_ip_only; + // self + // } fn process_listens( &mut self, @@ -339,3 +339,257 @@ impl ServiceProtocol for IdentifyProtocol { } } } + +#[derive(Clone)] +pub(crate) struct IdentifyCallback { + network_state: Arc, + identify: Identify, +} + +impl IdentifyCallback { + pub(crate) fn new( + network_state: Arc, + name: String, + client_version: String, + ) -> IdentifyCallback { + let flags = Flags(Flag::FullNode as u64); + + IdentifyCallback { + network_state, + identify: Identify::new(name, flags, client_version), + } + } + + fn listen_addrs(&self) -> Vec { + let mut addrs = self.network_state.public_addrs(MAX_RETURN_LISTEN_ADDRS * 2); + addrs.sort_by(|a, b| a.1.cmp(&b.1)); + addrs + .into_iter() + .take(MAX_RETURN_LISTEN_ADDRS) + .map(|(addr, _)| addr) + .collect::>() + } +} + +impl Callback for IdentifyCallback { + fn identify(&mut self) -> &[u8] { + self.identify.encode() + } + + fn received_identify( + &mut self, + context: &mut ProtocolContextMutRef, + identify: &[u8], + ) -> MisbehaveResult { + match self.identify.verify(identify) { + None => { + self.network_state.ban_session( + context.control(), + context.session.id, + BAN_ON_NOT_SAME_NET, + "The nodes are not on the same network".to_string(), + ); + MisbehaveResult::Disconnect + } + Some((flags, client_version)) => { + let registry_client_version = |version: String| { + self.network_state.with_peer_registry_mut(|registry| { + if let Some(peer) = registry.get_peer_mut(context.session.id) { + peer.identify_info = Some(PeerIdentifyInfo { + client_version: version, + }) + } + }); + }; + + if context.session.ty.is_outbound() { + let peer_id = context + .session + .remote_pubkey + .as_ref() + .map(PublicKey::peer_id) + .expect("Secio must enabled"); + if self + .network_state + .with_peer_registry(|reg| reg.is_feeler(&peer_id)) + { + let _ = context.open_protocols( + context.session.id, + TargetProtocol::Single(FEELER_PROTOCOL_ID.into()), + ); + } else if flags.contains(self.identify.flags) { + registry_client_version(client_version); + + // The remote end can support all local protocols. + let protos = self + .network_state + .get_protocol_ids(|id| id != FEELER_PROTOCOL_ID.into()); + + let _ = context + .open_protocols(context.session.id, TargetProtocol::Multi(protos)); + } else { + // The remote end cannot support all local protocols. + return MisbehaveResult::Disconnect; + } + } else { + registry_client_version(client_version); + } + MisbehaveResult::Continue + } + } + } + + /// Get local listen addresses + fn local_listen_addrs(&mut self) -> Vec { + self.listen_addrs() + } + + fn add_remote_listen_addrs(&mut self, peer_id: &PeerId, addrs: Vec) { + trace!( + "got remote listen addrs from peer_id={:?}, addrs={:?}", + peer_id, + addrs, + ); + self.network_state.with_peer_registry_mut(|reg| { + if let Some(peer) = reg + .get_key_by_peer_id(peer_id) + .and_then(|session_id| reg.get_peer_mut(session_id)) + { + peer.listened_addrs = addrs.clone(); + } + }); + self.network_state.with_peer_store_mut(|peer_store| { + for addr in addrs { + if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { + debug!("Failed to add addrs to peer_store {:?} {:?}", err, peer_id); + } + } + }) + } + + fn add_observed_addr( + &mut self, + peer_id: &PeerId, + addr: Multiaddr, + ty: SessionType, + ) -> MisbehaveResult { + debug!( + "peer({:?}, {:?}) reported observed addr {}", + peer_id, ty, addr, + ); + + if ty.is_inbound() { + // The address already been discovered by other peer + return MisbehaveResult::Continue; + } + + // observed addr is not a reachable ip + if !multiaddr_to_socketaddr(&addr) + .map(|socket_addr| is_reachable(socket_addr.ip())) + .unwrap_or(false) + { + return MisbehaveResult::Continue; + } + + let observed_addrs_iter = self + .listen_addrs() + .into_iter() + .filter_map(|listen_addr| multiaddr_to_socketaddr(&listen_addr)) + .map(|socket_addr| { + addr.iter() + .filter_map(|proto| match proto { + Protocol::P2P(_) => None, + Protocol::TCP(_) => Some(Protocol::TCP(socket_addr.port())), + value => Some(value), + }) + .collect::() + }); + self.network_state.add_observed_addrs(observed_addrs_iter); + // NOTE: for future usage + MisbehaveResult::Continue + } + + fn misbehave(&mut self, _peer_id: &PeerId, _kind: Misbehavior) -> MisbehaveResult { + MisbehaveResult::Disconnect + } +} + +#[derive(Clone)] +struct Identify { + name: String, + client_version: String, + flags: Flags, + encode_data: ckb_types::bytes::Bytes, +} + +impl Identify { + fn new(name: String, flags: Flags, client_version: String) -> Self { + Identify { + name, + client_version, + flags, + encode_data: ckb_types::bytes::Bytes::default(), + } + } + + fn encode(&mut self) -> &[u8] { + if self.encode_data.is_empty() { + self.encode_data = packed::Identify::new_builder() + .name(self.name.as_str().pack()) + .flag(self.flags.0.pack()) + .client_version(self.client_version.as_str().pack()) + .build() + .as_bytes(); + } + + &self.encode_data + } + + fn verify<'a>(&self, data: &'a [u8]) -> Option<(Flags, String)> { + let reader = packed::IdentifyReader::from_slice(data).ok()?; + + let name = reader.name().as_utf8().ok()?.to_owned(); + if self.name != name { + debug!("Not the same chain, self: {}, remote: {}", self.name, name); + return None; + } + + let flag: u64 = reader.flag().unpack(); + if flag == 0 { + return None; + } + + let raw_client_version = reader.client_version().as_utf8().ok()?.to_owned(); + + Some((Flags::from(flag), raw_client_version)) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[repr(u64)] +enum Flag { + /// Support all protocol + FullNode = 0x1, +} + +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +struct Flags(u64); + +impl Flags { + /// Check if contains a target flag + fn contains(self, flags: Flags) -> bool { + (self.0 & flags.0) == flags.0 + } +} + +impl From for Flags { + fn from(value: Flag) -> Flags { + Flags(value as u64) + } +} + +impl From for Flags { + fn from(value: u64) -> Flags { + Flags(value) + } +} diff --git a/protocols/identify/src/protocol.rs b/network/src/protocols/identify/protocol.rs similarity index 100% rename from protocols/identify/src/protocol.rs rename to network/src/protocols/identify/protocol.rs diff --git a/network/src/protocols/ping.rs b/network/src/protocols/ping.rs index d914bd05a7..c49ec1f508 100644 --- a/network/src/protocols/ping.rs +++ b/network/src/protocols/ping.rs @@ -1,9 +1,15 @@ use crate::network::disconnect_with_message; use crate::NetworkState; -use ckb_logger::{debug, trace}; -use futures::{channel::mpsc::Receiver, Stream, StreamExt}; -use p2p::service::ServiceControl; -use p2p_ping::Event; +use ckb_logger::{debug, error, trace, warn}; +use futures::{ + channel::mpsc::{Receiver, Sender}, + Stream, StreamExt, +}; +use std::{ + collections::HashMap, + str, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use std::{ pin::Pin, sync::Arc, @@ -11,6 +17,306 @@ use std::{ time::Instant, }; +use ckb_types::{packed, prelude::*}; +use p2p::{ + bytes::Bytes, + context::{ProtocolContext, ProtocolContextMutRef}, + secio::PeerId, + service::{ServiceControl, TargetSession}, + traits::ServiceProtocol, + SessionId, +}; + +const SEND_PING_TOKEN: u64 = 0; +const CHECK_TIMEOUT_TOKEN: u64 = 1; + +/// Ping protocol events +#[derive(Debug)] +pub enum Event { + /// Peer send ping to us. + Ping(PeerId), + /// Peer send pong to us. + Pong(PeerId, Duration), + /// Peer is timeout. + Timeout(PeerId), + /// Peer cause a unexpected error. + UnexpectedError(PeerId), +} + +/// Ping protocol handler. +/// +/// The interval means that we send ping to peers. +/// The timeout means that consider peer is timeout if during a timeout we still have not received pong from a peer +pub struct PingHandler { + interval: Duration, + timeout: Duration, + connected_session_ids: HashMap, + event_sender: Sender, +} + +impl PingHandler { + pub fn new(interval: Duration, timeout: Duration, event_sender: Sender) -> PingHandler { + PingHandler { + interval, + timeout, + connected_session_ids: Default::default(), + event_sender, + } + } + + pub fn send_event(&mut self, event: Event) { + if let Err(err) = self.event_sender.try_send(event) { + error!("send ping event error: {}", err); + } + } +} + +/// PingStatus of a peer +#[derive(Clone, Debug)] +struct PingStatus { + /// Are we currently pinging this peer? + processing: bool, + /// The time we last send ping to this peer. + last_ping: SystemTime, + peer_id: PeerId, + version: String, +} + +impl PingStatus { + /// A meaningless value, peer must send a pong has same nonce to respond a ping. + fn nonce(&self) -> u32 { + self.last_ping + .duration_since(UNIX_EPOCH) + .map(|dur| dur.as_secs()) + .unwrap_or(0) as u32 + } + + /// Time duration since we last send ping. + fn elapsed(&self) -> Duration { + self.last_ping.elapsed().unwrap_or(Duration::from_secs(0)) + } +} + +impl ServiceProtocol for PingHandler { + fn init(&mut self, context: &mut ProtocolContext) { + // periodicly send ping to peers + let proto_id = context.proto_id; + if context + .set_service_notify(proto_id, self.interval, SEND_PING_TOKEN) + .is_err() + { + warn!("start ping fail"); + } + if context + .set_service_notify(proto_id, self.timeout, CHECK_TIMEOUT_TOKEN) + .is_err() + { + warn!("start ping fail"); + } + } + + fn connected(&mut self, context: ProtocolContextMutRef, version: &str) { + let session = context.session; + match session.remote_pubkey { + Some(ref pubkey) => { + let peer_id = pubkey.peer_id(); + self.connected_session_ids + .entry(session.id) + .or_insert_with(|| PingStatus { + last_ping: SystemTime::now(), + processing: false, + peer_id, + version: version.to_owned(), + }); + debug!( + "proto id [{}] open on session [{}], address: [{}], type: [{:?}], version: {}", + context.proto_id, session.id, session.address, session.ty, version + ); + debug!("connected sessions are: {:?}", self.connected_session_ids); + } + None => { + if context.disconnect(session.id).is_err() { + debug!("disconnect fail"); + } + } + } + } + + fn disconnected(&mut self, context: ProtocolContextMutRef) { + let session = context.session; + self.connected_session_ids.remove(&session.id); + debug!( + "proto id [{}] close on session [{}]", + context.proto_id, session.id + ); + } + + fn received(&mut self, context: ProtocolContextMutRef, data: Bytes) { + let session = context.session; + if let Some(peer_id) = self + .connected_session_ids + .get(&session.id) + .map(|ps| ps.peer_id.clone()) + { + match PingMessage::decode(data.as_ref()) { + None => { + error!("decode message error"); + self.send_event(Event::UnexpectedError(peer_id)); + } + Some(msg) => { + match msg { + PingPayload::Ping(nonce) => { + if context + .send_message(PingMessage::build_pong(nonce)) + .is_err() + { + debug!("send message fail"); + } + self.send_event(Event::Ping(peer_id)); + } + PingPayload::Pong(nonce) => { + // check pong + if self + .connected_session_ids + .get(&session.id) + .map(|ps| (ps.processing, ps.nonce())) + == Some((true, nonce)) + { + let ping_time = + match self.connected_session_ids.get_mut(&session.id) { + Some(ps) => { + ps.processing = false; + ps.elapsed() + } + None => return, + }; + self.send_event(Event::Pong(peer_id, ping_time)); + } else { + // ignore if nonce is incorrect + self.send_event(Event::UnexpectedError(peer_id)); + } + } + } + } + } + } + } + + fn notify(&mut self, context: &mut ProtocolContext, token: u64) { + match token { + SEND_PING_TOKEN => { + debug!("proto [{}] start ping peers", context.proto_id); + let now = SystemTime::now(); + let peers: Vec<(SessionId, u32)> = self + .connected_session_ids + .iter_mut() + .filter_map(|(session_id, ps)| { + if ps.processing { + None + } else { + ps.processing = true; + ps.last_ping = now; + Some((*session_id, ps.nonce())) + } + }) + .collect(); + if !peers.is_empty() { + let ping_msg = PingMessage::build_ping(peers[0].1); + let peer_ids: Vec = peers + .into_iter() + .map(|(session_id, _)| session_id) + .collect(); + let proto_id = context.proto_id; + if context + .filter_broadcast(TargetSession::Multi(peer_ids), proto_id, ping_msg) + .is_err() + { + debug!("send message fail"); + } + } + } + CHECK_TIMEOUT_TOKEN => { + debug!("proto [{}] check ping timeout", context.proto_id); + let timeout = self.timeout; + for peer_id in self + .connected_session_ids + .values() + .filter(|ps| ps.processing && ps.elapsed() >= timeout) + .map(|ps| ps.peer_id.clone()) + .collect::>() + { + self.send_event(Event::Timeout(peer_id)); + } + } + _ => panic!("unknown token {}", token), + } + } +} + +enum PingPayload { + Ping(u32), + Pong(u32), +} + +struct PingMessage; + +impl PingMessage { + fn build_ping(nonce: u32) -> Bytes { + let nonce_le = nonce.to_le_bytes(); + let nonce = packed::Uint32::new_builder() + .nth0(nonce_le[0].into()) + .nth1(nonce_le[1].into()) + .nth2(nonce_le[2].into()) + .nth3(nonce_le[3].into()) + .build(); + let ping = packed::Ping::new_builder().nonce(nonce).build(); + let payload = packed::PingPayload::new_builder().set(ping).build(); + // TODO update-after-upgrade-p2p + packed::PingMessage::new_builder() + .payload(payload) + .build() + .as_bytes() + .as_ref() + .to_owned() + .into() + } + + fn build_pong(nonce: u32) -> Bytes { + let nonce_le = nonce.to_le_bytes(); + let nonce = packed::Uint32::new_builder() + .nth0(nonce_le[0].into()) + .nth1(nonce_le[1].into()) + .nth2(nonce_le[2].into()) + .nth3(nonce_le[3].into()) + .build(); + let pong = packed::Pong::new_builder().nonce(nonce).build(); + let payload = packed::PingPayload::new_builder().set(pong).build(); + // TODO update-after-upgrade-p2p + packed::PingMessage::new_builder() + .payload(payload) + .build() + .as_bytes() + .as_ref() + .to_owned() + .into() + } + + #[allow(clippy::cast_ptr_alignment)] + fn decode(data: &[u8]) -> Option { + let reader = packed::PingMessageReader::from_compatible_slice(data).ok()?; + match reader.payload().to_enum() { + packed::PingPayloadUnionReader::Ping(reader) => { + let le = reader.nonce().raw_data().as_ptr() as *const u32; + Some(PingPayload::Ping(u32::from_le(unsafe { *le }))) + } + packed::PingPayloadUnionReader::Pong(reader) => { + let le = reader.nonce().raw_data().as_ptr() as *const u32; + Some(PingPayload::Pong(u32::from_le(unsafe { *le }))) + } + } + } +} + pub struct PingService { network_state: Arc, p2p_control: ServiceControl, diff --git a/network/src/protocols/test.rs b/network/src/protocols/test.rs index 3c7c1e122c..89d44b22c3 100644 --- a/network/src/protocols/test.rs +++ b/network/src/protocols/test.rs @@ -1,8 +1,8 @@ use super::{ - discovery::{DiscoveryProtocol, DiscoveryService}, + discovery::DiscoveryProtocol, feeler::Feeler, - identify::IdentifyCallback, - ping::PingService, + identify::{IdentifyCallback, IdentifyProtocol}, + ping::{PingHandler, PingService}, }; use crate::{ @@ -19,10 +19,7 @@ use std::{ }; use ckb_util::{Condvar, Mutex}; -use futures::{ - channel::mpsc::{self, channel}, - StreamExt, -}; +use futures::{channel::mpsc::channel, StreamExt}; use p2p::{ builder::{MetaBuilder, ServiceBuilder}, multiaddr::{Multiaddr, Protocol}, @@ -30,8 +27,6 @@ use p2p::{ utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; -use p2p_identify::IdentifyProtocol; -use p2p_ping::PingHandler; use tempfile::tempdir; struct Node { @@ -161,13 +156,11 @@ fn net_service_start(name: String) -> Node { .build(); // Discovery protocol - let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_network_state = Arc::clone(&network_state); let disc_meta = MetaBuilder::default() .id(DISCOVERY_PROTOCOL_ID.into()) .service_handle(move || { - ProtocolHandle::Both(Box::new( - DiscoveryProtocol::new(disc_sender).global_ip_only(false), - )) + ProtocolHandle::Both(Box::new(DiscoveryProtocol::new(disc_network_state, true))) }) .build(); @@ -206,12 +199,6 @@ fn net_service_start(name: String) -> Node { exit_condvar: Arc::new((Mutex::new(()), Condvar::new())), }); - let disc_service = DiscoveryService::new( - Arc::clone(&network_state), - disc_receiver, - config.discovery_local_address, - ); - let mut ping_service = PingService::new( Arc::clone(&network_state), p2p_service.control().to_owned(), @@ -231,7 +218,6 @@ fn net_service_start(name: String) -> Node { .threaded_scheduler() .build() .unwrap(); - rt.spawn(disc_service); rt.spawn(async move { loop { if ping_service.next().await.is_none() { diff --git a/protocols/discovery/Cargo.toml b/protocols/discovery/Cargo.toml deleted file mode 100644 index 984df77b3b..0000000000 --- a/protocols/discovery/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "ckb-discovery" -version = "0.31.0-pre" -license = "MIT" -authors = ["Nervos Core Dev "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -futures = { version = "0.3.0" } -tokio = { version = "0.2.0", features = ["time", "io-util", "tcp", "dns", "stream"] } -tokio-util = { version = "0.2.0", features = ["codec"] } -rand = "0.6.1" -ckb-types = { path = "../../util/types" } diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml deleted file mode 100644 index b4de072341..0000000000 --- a/protocols/identify/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "ckb-identify" -version = "0.31.0-pre" -license = "MIT" -authors = ["Nervos Core Dev "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -ckb-types = { path = "../../util/types" } diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml deleted file mode 100644 index 9da18702e6..0000000000 --- a/protocols/ping/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "ckb-ping" -version = "0.31.0-pre" -authors = ["Nervos Core Dev "] -license = "MIT" -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -futures = "0.3" -ckb-types = { path = "../../util/types" } diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs deleted file mode 100644 index 02c12b7a89..0000000000 --- a/protocols/ping/src/lib.rs +++ /dev/null @@ -1,302 +0,0 @@ -use ckb_logger::{debug, error, warn}; -use futures::channel::mpsc::Sender; -use p2p::{ - bytes::Bytes, - context::{ProtocolContext, ProtocolContextMutRef}, - secio::PeerId, - service::TargetSession, - traits::ServiceProtocol, - SessionId, -}; - -use std::{ - collections::HashMap, - str, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; - -use ckb_types::{packed, prelude::*}; - -const SEND_PING_TOKEN: u64 = 0; -const CHECK_TIMEOUT_TOKEN: u64 = 1; - -/// Ping protocol events -#[derive(Debug)] -pub enum Event { - /// Peer send ping to us. - Ping(PeerId), - /// Peer send pong to us. - Pong(PeerId, Duration), - /// Peer is timeout. - Timeout(PeerId), - /// Peer cause a unexpected error. - UnexpectedError(PeerId), -} - -/// Ping protocol handler. -/// -/// The interval means that we send ping to peers. -/// The timeout means that consider peer is timeout if during a timeout we still have not received pong from a peer -pub struct PingHandler { - interval: Duration, - timeout: Duration, - connected_session_ids: HashMap, - event_sender: Sender, -} - -impl PingHandler { - pub fn new(interval: Duration, timeout: Duration, event_sender: Sender) -> PingHandler { - PingHandler { - interval, - timeout, - connected_session_ids: Default::default(), - event_sender, - } - } - - pub fn send_event(&mut self, event: Event) { - if let Err(err) = self.event_sender.try_send(event) { - error!("send ping event error: {}", err); - } - } -} - -/// PingStatus of a peer -#[derive(Clone, Debug)] -struct PingStatus { - /// Are we currently pinging this peer? - processing: bool, - /// The time we last send ping to this peer. - last_ping: SystemTime, - peer_id: PeerId, - version: String, -} - -impl PingStatus { - /// A meaningless value, peer must send a pong has same nonce to respond a ping. - fn nonce(&self) -> u32 { - self.last_ping - .duration_since(UNIX_EPOCH) - .map(|dur| dur.as_secs()) - .unwrap_or(0) as u32 - } - - /// Time duration since we last send ping. - fn elapsed(&self) -> Duration { - self.last_ping.elapsed().unwrap_or(Duration::from_secs(0)) - } -} - -impl ServiceProtocol for PingHandler { - fn init(&mut self, context: &mut ProtocolContext) { - // periodicly send ping to peers - let proto_id = context.proto_id; - if context - .set_service_notify(proto_id, self.interval, SEND_PING_TOKEN) - .is_err() - { - warn!("start ping fail"); - } - if context - .set_service_notify(proto_id, self.timeout, CHECK_TIMEOUT_TOKEN) - .is_err() - { - warn!("start ping fail"); - } - } - - fn connected(&mut self, context: ProtocolContextMutRef, version: &str) { - let session = context.session; - match session.remote_pubkey { - Some(ref pubkey) => { - let peer_id = pubkey.peer_id(); - self.connected_session_ids - .entry(session.id) - .or_insert_with(|| PingStatus { - last_ping: SystemTime::now(), - processing: false, - peer_id, - version: version.to_owned(), - }); - debug!( - "proto id [{}] open on session [{}], address: [{}], type: [{:?}], version: {}", - context.proto_id, session.id, session.address, session.ty, version - ); - debug!("connected sessions are: {:?}", self.connected_session_ids); - } - None => { - if context.disconnect(session.id).is_err() { - debug!("disconnect fail"); - } - } - } - } - - fn disconnected(&mut self, context: ProtocolContextMutRef) { - let session = context.session; - self.connected_session_ids.remove(&session.id); - debug!( - "proto id [{}] close on session [{}]", - context.proto_id, session.id - ); - } - - fn received(&mut self, context: ProtocolContextMutRef, data: Bytes) { - let session = context.session; - if let Some(peer_id) = self - .connected_session_ids - .get(&session.id) - .map(|ps| ps.peer_id.clone()) - { - match PingMessage::decode(data.as_ref()) { - None => { - error!("decode message error"); - self.send_event(Event::UnexpectedError(peer_id)); - } - Some(msg) => { - match msg { - PingPayload::Ping(nonce) => { - if context - .send_message(PingMessage::build_pong(nonce)) - .is_err() - { - debug!("send message fail"); - } - self.send_event(Event::Ping(peer_id)); - } - PingPayload::Pong(nonce) => { - // check pong - if self - .connected_session_ids - .get(&session.id) - .map(|ps| (ps.processing, ps.nonce())) - == Some((true, nonce)) - { - let ping_time = - match self.connected_session_ids.get_mut(&session.id) { - Some(ps) => { - ps.processing = false; - ps.elapsed() - } - None => return, - }; - self.send_event(Event::Pong(peer_id, ping_time)); - } else { - // ignore if nonce is incorrect - self.send_event(Event::UnexpectedError(peer_id)); - } - } - } - } - } - } - } - - fn notify(&mut self, context: &mut ProtocolContext, token: u64) { - match token { - SEND_PING_TOKEN => { - debug!("proto [{}] start ping peers", context.proto_id); - let now = SystemTime::now(); - let peers: Vec<(SessionId, u32)> = self - .connected_session_ids - .iter_mut() - .filter_map(|(session_id, ps)| { - if ps.processing { - None - } else { - ps.processing = true; - ps.last_ping = now; - Some((*session_id, ps.nonce())) - } - }) - .collect(); - if !peers.is_empty() { - let ping_msg = PingMessage::build_ping(peers[0].1); - let peer_ids: Vec = peers - .into_iter() - .map(|(session_id, _)| session_id) - .collect(); - let proto_id = context.proto_id; - if context - .filter_broadcast(TargetSession::Multi(peer_ids), proto_id, ping_msg) - .is_err() - { - debug!("send message fail"); - } - } - } - CHECK_TIMEOUT_TOKEN => { - debug!("proto [{}] check ping timeout", context.proto_id); - let timeout = self.timeout; - for peer_id in self - .connected_session_ids - .values() - .filter(|ps| ps.processing && ps.elapsed() >= timeout) - .map(|ps| ps.peer_id.clone()) - .collect::>() - { - self.send_event(Event::Timeout(peer_id)); - } - } - _ => panic!("unknown token {}", token), - } - } -} - -enum PingPayload { - Ping(u32), - Pong(u32), -} - -struct PingMessage; - -impl PingMessage { - fn build_ping(nonce: u32) -> Bytes { - let nonce_le = nonce.to_le_bytes(); - let nonce = packed::Uint32::new_builder() - .nth0(nonce_le[0].into()) - .nth1(nonce_le[1].into()) - .nth2(nonce_le[2].into()) - .nth3(nonce_le[3].into()) - .build(); - let ping = packed::Ping::new_builder().nonce(nonce).build(); - let payload = packed::PingPayload::new_builder().set(ping).build(); - - packed::PingMessage::new_builder() - .payload(payload) - .build() - .as_bytes() - } - - fn build_pong(nonce: u32) -> Bytes { - let nonce_le = nonce.to_le_bytes(); - let nonce = packed::Uint32::new_builder() - .nth0(nonce_le[0].into()) - .nth1(nonce_le[1].into()) - .nth2(nonce_le[2].into()) - .nth3(nonce_le[3].into()) - .build(); - let pong = packed::Pong::new_builder().nonce(nonce).build(); - let payload = packed::PingPayload::new_builder().set(pong).build(); - - packed::PingMessage::new_builder() - .payload(payload) - .build() - .as_bytes() - } - - #[allow(clippy::cast_ptr_alignment)] - fn decode(data: &[u8]) -> Option { - let reader = packed::PingMessageReader::from_compatible_slice(data).ok()?; - match reader.payload().to_enum() { - packed::PingPayloadUnionReader::Ping(reader) => { - let le = reader.nonce().raw_data().as_ptr() as *const u32; - Some(PingPayload::Ping(u32::from_le(unsafe { *le }))) - } - packed::PingPayloadUnionReader::Pong(reader) => { - let le = reader.nonce().raw_data().as_ptr() as *const u32; - Some(PingPayload::Pong(u32::from_le(unsafe { *le }))) - } - } - } -}