Skip to content

Commit

Permalink
Merge #340
Browse files Browse the repository at this point in the history
340: feat: Integrate discovery and identify protocol r=jjyr a=TheWaWaR

## Changes
- [x] Add discovery protocol
- [x] Add identify protocol
- [x] Fix out bounded connect service
- [x] Fix can not shutdown network bug

## Known issues
- Shutdown network not very graceful.

Co-authored-by: Qian Linfeng <[email protected]>
Co-authored-by: Linfeng Qian <[email protected]>
  • Loading branch information
bors[bot] and TheWaWaR committed Mar 21, 2019
2 parents 99164a2 + 3999aee commit 89dd69d
Show file tree
Hide file tree
Showing 16 changed files with 847 additions and 356 deletions.
264 changes: 247 additions & 17 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ bytes = "0.4.12"
tokio = "0.1.17"
futures = { version = "0.1.19", features = ["use_std"] }
snap = "0.2"
p2p = { version = "0.1", package="tentacle" }
p2p-ping = { version = "0.1", package="tentacle-ping" }
secio = { version = "0.1", package="tentacle-secio" }
p2p = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle" }
secio = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-secio" }
p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-ping" }
p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-discovery" }
p2p-identify = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-identify" }
faketime = "0.2.0"
rusqlite = {version = "0.16.0", features = ["bundled"]}
lazy_static = "1.3.0"
Expand Down
125 changes: 87 additions & 38 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::protocol::Version as ProtocolVersion;
use crate::protocol_handler::{CKBProtocolHandler, DefaultCKBProtocolContext};
use crate::service::{
ckb_service::CKBService,
discovery_service::{DiscoveryEvent, DiscoveryProtocol, DiscoveryService},
identify_service::{IdentifyAddressManager, IdentifyEvent, IdentifyProtocol, IdentifyService},
outbound_peer_service::OutboundPeerService,
ping_service::PingService,
timer_service::{TimerRegistry, TimerService},
Expand All @@ -19,29 +21,31 @@ use bytes::Bytes;
use ckb_util::{Mutex, RwLock};
use fnv::FnvHashMap;
use futures::future::{select_all, Future};
use futures::sync::mpsc;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::Receiver;
use futures::sync::oneshot;
use futures::Stream;
use log::{debug, error, info, warn};
use multiaddr::multihash::Multihash;
use p2p::{
builder::ServiceBuilder,
builder::{MetaBuilder, ServiceBuilder},
multiaddr::{self, Multiaddr},
secio::{PeerId, PublicKey},
service::{Service, ServiceError, ServiceEvent},
service::{DialProtocol, ProtocolHandle, Service, ServiceError, ServiceEvent},
traits::ServiceHandle,
};
use p2p_ping::{Event as PingEvent, PingProtocol};
use p2p_ping::{Event as PingEvent, PingHandler};
use secio;
use std::boxed::Box;
use std::cmp::max;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::usize;
use tokio::codec::LengthDelimitedCodec;

const PING_PROTOCOL_ID: ProtocolId = 0;
const DISCOVERY_PROTOCOL_ID: ProtocolId = 1;
const IDENTIFY_PROTOCOL_ID: ProtocolId = 2;

pub type CKBProtocols = Vec<(CKBProtocol, Arc<dyn CKBProtocolHandler>)>;
type NetworkResult = Result<
Expand Down Expand Up @@ -80,7 +84,13 @@ impl PeerInfo {
}
}

type P2PService = Service<EventHandler, LengthDelimitedCodec>;
pub struct EventReceivers {
pub ping_event_receiver: Receiver<PingEvent>,
pub disc_event_receiver: mpsc::UnboundedReceiver<DiscoveryEvent>,
pub identify_event_receiver: mpsc::UnboundedReceiver<IdentifyEvent>,
}

type P2PService = Service<EventHandler>;

pub struct Network {
pub(crate) peers_registry: RwLock<PeersRegistry>,
Expand Down Expand Up @@ -313,7 +323,7 @@ impl Network {
}

pub fn dial_addr(&self, addr: Multiaddr) {
if let Err(err) = self.p2p_control.write().dial(addr) {
if let Err(err) = self.p2p_control.write().dial(addr, DialProtocol::All) {
error!(target: "network", "failed to dial: {}", err);
}
}
Expand All @@ -338,7 +348,7 @@ impl Network {
pub(crate) fn inner_build(
config: &NetworkConfig,
ckb_protocols: CKBProtocols,
) -> Result<(Arc<Self>, P2PService, TimerRegistry, Receiver<PingEvent>), Error> {
) -> Result<(Arc<Self>, P2PService, TimerRegistry, EventReceivers), Error> {
let local_private_key = match config.fetch_private_key() {
Some(private_key) => private_key?,
None => return Err(ConfigError::InvalidKey.into()),
Expand Down Expand Up @@ -369,17 +379,50 @@ impl Network {
config.reserved_only,
reserved_peers,
);
let mut p2p_service = ServiceBuilder::default().forever(true);
// register protocols
let (ping_sender, ping_receiver) = channel(std::u8::MAX as usize);
p2p_service = p2p_service.insert_protocol(PingProtocol::new(
PING_PROTOCOL_ID,
config.ping_interval,
config.ping_timeout,
ping_sender,
));
let ping_meta = MetaBuilder::default()
.id(PING_PROTOCOL_ID)
.service_handle(move || {
ProtocolHandle::Callback(Box::new(PingHandler::new(
PING_PROTOCOL_ID,
config.ping_interval,
config.ping_timeout,
ping_sender,
)))
})
.build();

let (disc_sender, disc_receiver) = mpsc::unbounded();
let disc_meta = MetaBuilder::default()
.id(DISCOVERY_PROTOCOL_ID)
.service_handle(move || {
ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
DISCOVERY_PROTOCOL_ID,
disc_sender,
)))
})
.build();

let (identify_sender, identify_receiver) = mpsc::unbounded();
let identify_addr_mgr = IdentifyAddressManager::new(identify_sender.clone());
let identify_meta = MetaBuilder::default()
.id(IDENTIFY_PROTOCOL_ID)
.service_handle(move || {
ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(
IDENTIFY_PROTOCOL_ID,
identify_addr_mgr.clone(),
)))
})
.build();

let mut p2p_service = ServiceBuilder::default()
.forever(true)
.insert_protocol(ping_meta)
.insert_protocol(disc_meta)
.insert_protocol(identify_meta);
for (ckb_protocol, _) in &ckb_protocols {
p2p_service = p2p_service.insert_protocol(ckb_protocol.clone());
p2p_service = p2p_service.insert_protocol(ckb_protocol.build());
}
let mut p2p_service = p2p_service
.key_pair(local_private_key.clone())
Expand Down Expand Up @@ -453,7 +496,16 @@ impl Network {
}
}

Ok((network, p2p_service, timer_registry, ping_receiver))
Ok((
network,
p2p_service,
timer_registry,
EventReceivers {
ping_event_receiver: ping_receiver,
disc_event_receiver: disc_receiver,
identify_event_receiver: identify_receiver,
},
))
}

pub(crate) fn build_network_future(
Expand All @@ -463,23 +515,21 @@ impl Network {
p2p_service: P2PService,
timer_registry: TimerRegistry,
ckb_event_receiver: Receiver<CKBEvent>,
ping_event_receiver: Receiver<PingEvent>,
receivers: EventReceivers,
) -> Result<Box<Future<Item = (), Error = Error> + Send>, Error> {
// initialize ckb_protocols
let ping_service = PingService {
network: Arc::clone(&network),
event_receiver: ping_event_receiver,
event_receiver: receivers.ping_event_receiver,
};
//let identify_service = Arc::new(IdentifyService {
// client_version,
// protocol_version,
// identify_timeout: config.identify_timeout,
// identify_interval: config.identify_interval,
//});
let disc_service =
DiscoveryService::new(Arc::clone(&network), receivers.disc_event_receiver);
let identify_service =
IdentifyService::new(Arc::clone(&network), receivers.identify_event_receiver);

let ckb_service = CKBService {
event_receiver: ckb_event_receiver,
network: Arc::clone(&network),
event_receiver: ckb_event_receiver,
};
let timer_service = TimerService::new(timer_registry, Arc::clone(&network));
let outbound_peer_service =
Expand All @@ -501,17 +551,16 @@ impl Network {
.for_each(|_| Ok(()))
.map_err(|_err| Error::Shutdown),
),
// Box::new(
// discovery_query_service
// .into_future()
// .map(|_| ())
// .map_err(|(err, _)| err),
// ) as Box<Future<Item = (), Error = IoError> + Send>,
//identify_service.start_protocol(
// Arc::clone(&network),
// swarm_controller.clone(),
// basic_transport.clone(),
//),
Box::new(
disc_service
.for_each(|_| Ok(()))
.map_err(|_err| Error::Shutdown),
),
Box::new(
identify_service
.for_each(|_| Ok(()))
.map_err(|_err| Error::Shutdown),
),
Box::new(timer_service.timer_futures.for_each(|_| Ok(()))),
Box::new(
outbound_peer_service
Expand Down Expand Up @@ -543,7 +592,7 @@ impl Network {
ckb_protocols: CKBProtocols,
ckb_event_receiver: Receiver<CKBEvent>,
) -> NetworkResult {
let (network, p2p_service, timer_registry, ping_event_receiver) =
let (network, p2p_service, timer_registry, receivers) =
Self::inner_build(config, ckb_protocols)?;
let (close_tx, close_rx) = oneshot::channel();
let network_future = Self::build_network_future(
Expand All @@ -553,7 +602,7 @@ impl Network {
p2p_service,
timer_registry,
ckb_event_receiver,
ping_event_receiver,
receivers,
)?;
Ok((network, close_tx, network_future))
}
Expand Down
78 changes: 41 additions & 37 deletions network/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,34 @@ use ckb_util::Mutex;
use futures::future::Future;
use futures::sync::mpsc::Receiver;
use futures::sync::oneshot;
use log::{debug, info};
use log::{debug, error, info};
use std::sync::Arc;
use std::thread;
use tokio::runtime;

pub struct StopHandler {
signal: oneshot::Sender<()>,
thread: thread::JoinHandle<()>,
network_runtime: runtime::Runtime,
}

impl StopHandler {
pub fn new(signal: oneshot::Sender<()>, thread: thread::JoinHandle<()>) -> StopHandler {
StopHandler { signal, thread }
pub fn new(signal: oneshot::Sender<()>, network_runtime: runtime::Runtime) -> StopHandler {
StopHandler {
signal,
network_runtime,
}
}

pub fn close(self) {
let StopHandler { signal, thread } = self;
let StopHandler {
signal,
network_runtime,
} = self;
if let Err(e) = signal.send(()) {
debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", e)
};
thread.join().expect("join network_service thread");
// TODO: not that gracefully shutdown, will output below error message:
// "terminate called after throwing an instance of 'std::system_error'"
network_runtime.shutdown_now();
}
}

Expand Down Expand Up @@ -67,42 +74,39 @@ impl NetworkService {
ckb_protocols: CKBProtocols,
ckb_event_receiver: Receiver<CKBEvent>,
) -> Result<NetworkService, Error> {
let (network, p2p_service, timer_registry, ping_event_receiver) =
let (network, p2p_service, timer_registry, receivers) =
Network::inner_build(config, ckb_protocols)?;
let (close_tx, close_rx) = oneshot::channel();
let (init_tx, init_rx) = oneshot::channel();
let join_handle = thread::spawn({
let network = Arc::clone(&network);
let config = config.clone();
move || {
info!(
target: "network",
"network peer_id {:?}",
network.local_public_key().peer_id()
);
let network_future = Network::build_network_future(
network,
&config,
close_rx,
p2p_service,
timer_registry,
ckb_event_receiver,
ping_event_receiver,
)
.expect("Network thread init");
init_tx.send(()).expect("Network init signal send");
// here we use default config
let network_runtime = runtime::Runtime::new().expect("Network tokio runtime init");;
match network_runtime.block_on_all(network_future) {
Ok(_) => info!(target: "network", "network service exit"),
Err(err) => panic!("network service exit unexpected {}", err),
}
}
});

info!(
target: "network",
"network peer_id {:?}",
network.local_public_key().peer_id()
);
let network_future = Network::build_network_future(
Arc::clone(&network),
&config,
close_rx,
p2p_service,
timer_registry,
ckb_event_receiver,
receivers,
)
.expect("Network thread init");
init_tx.send(()).expect("Network init signal send");
// here we use default config
let mut network_runtime = runtime::Runtime::new().expect("Network tokio runtime init");
network_runtime.spawn(
network_future
.map(|_| info!(target: "network", "network service exit"))
.map_err(|err| error!("network service exit unexpected {}", err)),
);

init_rx.wait().map_err(|_err| Error::Shutdown)?;
Ok(NetworkService {
network,
stop_handler: Mutex::new(Some(StopHandler::new(close_tx, join_handle))),
stop_handler: Mutex::new(Some(StopHandler::new(close_tx, network_runtime))),
})
}

Expand Down
Loading

0 comments on commit 89dd69d

Please sign in to comment.