Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1313 from ethcore/net
Browse files Browse the repository at this point in the history
Network start/stop
  • Loading branch information
arkpar authored Jun 18, 2016
2 parents 42478ad + 3f77f7c commit 591fa96
Show file tree
Hide file tree
Showing 19 changed files with 298 additions and 109 deletions.
25 changes: 16 additions & 9 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,31 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
/// Start network command.
StartNetwork,
/// Stop network command.
StopNetwork,
}

/// IO Message type used for Network service
pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;

/// Client service setup. Creates and registers client and network services with the IO subsystem.
pub struct ClientService {
net_service: NetworkService<SyncMessage>,
net_service: Arc<NetworkService<SyncMessage>>,
client: Arc<Client>,
panic_handler: Arc<PanicHandler>
}

impl ClientService {
/// Start the service in a separate thread.
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>) -> Result<ClientService, Error> {
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>, enable_network: bool) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config));
let net_service = try!(NetworkService::new(net_config));
panic_handler.forward_from(&net_service);
if enable_network {
try!(net_service.start());
}

info!("Starting {}", net_service.host_info());
info!("Configured for {} using {:?} engine", spec.name, spec.engine.name());
Expand All @@ -70,7 +77,7 @@ impl ClientService {
try!(net_service.io().register_handler(client_io));

Ok(ClientService {
net_service: net_service,
net_service: Arc::new(net_service),
client: client,
panic_handler: panic_handler,
})
Expand All @@ -82,8 +89,8 @@ impl ClientService {
}

/// Get general IO interface
pub fn io(&mut self) -> &mut IoService<NetSyncMessage> {
self.net_service.io()
pub fn register_io_handler(&self, handler: Arc<IoHandler<NetSyncMessage> + Send>) -> Result<(), IoError> {
self.net_service.io().register_handler(handler)
}

/// Get client interface
Expand All @@ -92,8 +99,8 @@ impl ClientService {
}

/// Get network service component
pub fn network(&mut self) -> &mut NetworkService<SyncMessage> {
&mut self.net_service
pub fn network(&mut self) -> Arc<NetworkService<SyncMessage>> {
self.net_service.clone()
}
}

Expand Down Expand Up @@ -149,7 +156,7 @@ mod tests {
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()));
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false);
assert!(service.is_ok());
}
}
2 changes: 2 additions & 0 deletions parity/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Account Options:
--no-import-keys Do not import keys from legacy clients.
Networking Options:
--no-network Disable p2p networking.
--port PORT Override the port on which the node should listen
[default: 30303].
--peers NUM Try to maintain that many peers [default: 25].
Expand Down Expand Up @@ -268,6 +269,7 @@ pub struct Args {
pub flag_format: Option<String>,
pub flag_jitvm: bool,
pub flag_no_color: bool,
pub flag_no_network: bool,
// legacy...
pub flag_geth: bool,
pub flag_nodekey: Option<String>,
Expand Down
20 changes: 18 additions & 2 deletions parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::NetSyncMessage;
use ethcore::service::{NetSyncMessage, SyncMessage};
use ethsync::EthSync;
use util::keys::store::AccountService;
use util::{TimerToken, IoHandler, IoContext};
use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage};

use informant::Informant;

Expand All @@ -33,6 +33,7 @@ pub struct ClientIoHandler {
pub sync: Arc<EthSync>,
pub accounts: Arc<AccountService>,
pub info: Informant,
pub network: Arc<NetworkService<SyncMessage>>,
}

impl IoHandler<NetSyncMessage> for ClientIoHandler {
Expand All @@ -48,6 +49,21 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
_ => {}
}
}

fn message(&self, _io: &IoContext<NetSyncMessage>, message: &NetSyncMessage) {
match *message {
NetworkIoMessage::User(SyncMessage::StartNetwork) => {
info!("Starting network");
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e));
},
NetworkIoMessage::User(SyncMessage::StopNetwork) => {
info!("Stopping network");
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
},
_ => {/* Ignore other messages */},
}
}
}


14 changes: 8 additions & 6 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use std::thread::sleep;
use std::time::Duration;
use rustc_serialize::hex::FromHex;
use ctrlc::CtrlC;
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes};
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError};
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path};
use ethcore::error::{Error, ImportError};
Expand Down Expand Up @@ -199,7 +199,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)

// Build client
let mut service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone()
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand All @@ -209,7 +209,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
let network_settings = Arc::new(conf.network_settings());

// Sync
let sync = EthSync::register(service.network(), sync_config, client.clone());
let sync = EthSync::new(sync_config, client.clone());
EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into()));

let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_port: conf.signer_port(),
Expand Down Expand Up @@ -270,8 +271,9 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
info: Informant::new(conf.have_color()),
sync: sync.clone(),
accounts: account_service.clone(),
network: service.network(),
});
service.io().register_handler(io_handler).expect("Error registering IO handler");
service.register_io_handler(io_handler).expect("Error registering IO handler");

if conf.args.cmd_ui {
url::open("http://localhost:8080/")
Expand Down Expand Up @@ -314,7 +316,7 @@ fn execute_export(conf: Configuration) {

// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand Down Expand Up @@ -385,7 +387,7 @@ fn execute_import(conf: Configuration) {

// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand Down
10 changes: 10 additions & 0 deletions rpc/src/v1/impls/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,14 @@ impl<S> Net for NetClient<S> where S: SyncProvider + 'static {
// right now (11 march 2016), we are always listening for incoming connections
Ok(Value::Bool(true))
}

fn start_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).start_network();
Ok(Value::Bool(true))
}

fn stop_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).stop_network();
Ok(Value::Bool(true))
}
}
6 changes: 6 additions & 0 deletions rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,11 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone()
}

fn start_network(&self) {
}

fn stop_network(&self) {
}
}

6 changes: 6 additions & 0 deletions rpc/src/v1/traits/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub trait Net: Sized + Send + Sync + 'static {
/// Otherwise false.
fn is_listening(&self, _: Params) -> Result<Value, Error>;

/// Start the network.
fn start_network(&self, _: Params) -> Result<Value, Error>;

/// Stop the network.
fn stop_network(&self, _: Params) -> Result<Value, Error>;

/// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self));
Expand Down
4 changes: 4 additions & 0 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ impl ChainSync {
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if io.is_expired() {
trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}

if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
Expand Down
6 changes: 6 additions & 0 deletions sync/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub trait SyncIo {
fn is_chain_queue_empty(&self) -> bool {
self.chain().queue_info().is_empty()
}
/// Check if the session is expired
fn is_expired(&self) -> bool;
}

/// Wraps `NetworkContext` and the blockchain client
Expand Down Expand Up @@ -83,6 +85,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_info(peer_id)
}

fn is_expired(&self) -> bool {
self.network.is_expired()
}
}


36 changes: 30 additions & 6 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
//! use ethcore::miner::Miner;
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap();
//! service.start().unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
//! let miner = Miner::new(false, ethereum::new_frontier());
//! EthSync::register(&mut service, SyncConfig::default(), client);
//! let sync = EthSync::new(SyncConfig::default(), client);
//! EthSync::register(&mut service, sync);
//! }
//! ```
Expand All @@ -66,8 +68,10 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
use util::TimerToken;
use util::{U256, ONE_U256};
use ethcore::client::Client;
use ethcore::service::SyncMessage;
use ethcore::service::{SyncMessage, NetSyncMessage};
use io::NetSyncIo;
use util::io::IoChannel;
use util::{NetworkIoMessage, NetworkError};
use chain::ChainSync;

mod chain;
Expand Down Expand Up @@ -98,30 +102,41 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync {
/// Get sync status
fn status(&self) -> SyncStatus;
/// Start the network
fn start_network(&self);
/// Stop the network
fn stop_network(&self);
}

/// Ethereum network protocol handler
pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
chain: Arc<Client>,
/// Sync strategy
sync: RwLock<ChainSync>
sync: RwLock<ChainSync>,
/// IO communication chnnel.
io_channel: RwLock<IoChannel<NetSyncMessage>>,
}

pub use self::chain::{SyncStatus, SyncState};

impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
pub fn new(config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = ChainSync::new(config, chain.deref());
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(sync),
io_channel: RwLock::new(IoChannel::disconnected()),
});
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync
}

/// Register protocol with the network service
pub fn register(service: &NetworkService<SyncMessage>, sync: Arc<EthSync>) -> Result<(), NetworkError> {
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8])
}

/// Stop sync
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
Expand All @@ -138,11 +153,20 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status()
}

fn start_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification");
}

fn stop_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification");
}
}

impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
io.register_timer(0, 1000).expect("Error registering sync timer");
*self.io_channel.write().unwrap() = io.io_channel();
}

fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
Expand Down
10 changes: 10 additions & 0 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,13 @@ fn restart_on_broken_chain() {

assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}

#[test]
fn high_td_attach() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block_parent(6);
net.sync_steps(20);

assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}
4 changes: 4 additions & 0 deletions sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> {
fn disconnect_peer(&mut self, _peer_id: PeerId) {
}

fn is_expired(&self) -> bool {
false
}

fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
self.queue.push_back(TestPacket {
data: data,
Expand Down
2 changes: 1 addition & 1 deletion util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ mod tests {

#[test]
fn test_service_register_handler () {
let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(Arc::new(MyHandler)).unwrap();
}

Expand Down
Loading

0 comments on commit 591fa96

Please sign in to comment.