Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKCHANGE: refactor network config #361

Merged
merged 3 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 0 additions & 68 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,74 +30,6 @@ pub(crate) use p2p::{
context::{ServiceContext, SessionContext},
service::ServiceControl,
};
use serde_derive::Deserialize;
use std::time::Duration;

const DEFAULT_OUTGOING_PEERS_RATIO: u32 = 3;

// used in CKBProtocolContext
pub type PeerIndex = usize;

#[derive(Clone, Debug, PartialEq, Deserialize)]
pub struct Config {
pub listen_addresses: Vec<multiaddr::Multiaddr>,
pub secret_file: String,
pub peer_store_path: String,
pub try_outbound_connect_secs: Option<u64>,
/// List of initial node addresses
pub bootnodes: Vec<String>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: Option<String>,
/// Minimum number of connected peers to maintain
pub max_peers: u32,
pub outbound_peers_ratio: Option<u32>,
pub config_dir_path: Option<String>,
}

impl Config {
fn max_outbound_peers(&self) -> u32 {
self.max_peers
/ self
.outbound_peers_ratio
.unwrap_or_else(|| DEFAULT_OUTGOING_PEERS_RATIO)
}
fn max_inbound_peers(&self) -> u32 {
self.max_peers - self.max_outbound_peers()
}
}

impl From<Config> for NetworkConfig {
fn from(config: Config) -> Self {
let mut cfg = NetworkConfig::default();
cfg.max_outbound_peers = config.max_outbound_peers();
cfg.max_inbound_peers = config.max_inbound_peers();
cfg.listen_addresses = config.listen_addresses;
cfg.bootnodes = config.bootnodes;
cfg.reserved_peers = config.reserved_nodes;
if let Some(try_outbound_connect_secs) = config.try_outbound_connect_secs {
cfg.try_outbound_connect_interval = Duration::from_secs(try_outbound_connect_secs);
}
if let Some(value) = config.non_reserved_mode {
cfg.reserved_only = match value.as_str() {
"Accept" => false,
"Deny" => true,
_ => false,
};
}
if let Some(dir_path) = &config.config_dir_path {
cfg.secret_key_path = Some(format!("{}/{}", dir_path, config.secret_file));
cfg.peer_store_path = Some(format!("{}/{}", dir_path, config.peer_store_path));
}
cfg.client_version = "ckb network".to_string();
match cfg.read_secret_key() {
Some(raw_key) => cfg.secret_key = Some(raw_key),
None => {
cfg.generate_random_key().expect("generate random key");
cfg.write_secret_key_to_file().expect("write random key");
}
}
cfg
}
}
28 changes: 13 additions & 15 deletions network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::{ConfigError, Error, PeerError, ProtocolError};
use crate::errors::{Error, PeerError, ProtocolError};
use crate::peer_store::{sqlite::SqlitePeerStore, PeerStore};
use crate::peers_registry::{
ConnectionStatus, Peer, PeerIdentifyInfo, PeersRegistry, RegisterResult, Session,
Expand Down Expand Up @@ -354,21 +354,17 @@ impl Network {
config: &NetworkConfig,
ckb_protocols: CKBProtocols,
) -> Result<(Arc<Self>, P2PService, TimerRegistry, EventReceivers), Error> {
let local_private_key = match config.fetch_private_key() {
Some(private_key) => private_key?,
None => return Err(ConfigError::InvalidKey.into()),
};
config.create_dir_if_not_exists()?;
let local_private_key = config.fetch_private_key()?;
// set max score to public addresses
let listened_addresses: FnvHashMap<Multiaddr, u8> = config
.public_addresses
.iter()
.map(|addr| (addr.to_owned(), std::u8::MAX))
.collect();
let peer_store: Arc<RwLock<dyn PeerStore>> = {
let mut peer_store = match &config.peer_store_path {
Some(path) => SqlitePeerStore::file(path.to_string())?,
None => SqlitePeerStore::memory("default".to_string())?,
};
let mut peer_store =
SqlitePeerStore::file(config.peer_store_path().to_string_lossy().to_string())?;
let bootnodes = config.bootnodes()?;
for (peer_id, addr) in bootnodes {
peer_store.add_bootnode(peer_id, addr);
Expand All @@ -382,8 +378,8 @@ impl Network {
.collect::<Vec<_>>();
let peers_registry = PeersRegistry::new(
Arc::clone(&peer_store),
config.max_inbound_peers,
config.max_outbound_peers,
config.max_inbound_peers(),
config.max_outbound_peers(),
config.reserved_only,
reserved_peers,
);
Expand All @@ -394,8 +390,8 @@ impl Network {
.service_handle(move || {
ProtocolHandle::Callback(Box::new(PingHandler::new(
PING_PROTOCOL_ID,
config.ping_interval,
config.ping_timeout,
Duration::from_secs(config.ping_interval_secs),
Duration::from_secs(config.ping_timeout_secs),
ping_sender,
)))
})
Expand Down Expand Up @@ -539,8 +535,10 @@ impl Network {
event_receiver: ckb_event_receiver,
};
let timer_service = TimerService::new(timer_registry, Arc::clone(&network));
let outbound_peer_service =
OutboundPeerService::new(Arc::clone(&network), config.try_outbound_connect_interval);
let outbound_peer_service = OutboundPeerService::new(
Arc::clone(&network),
Duration::from_secs(config.connect_outbound_interval_secs),
);
// prepare services futures
let futures: Vec<Box<Future<Item = (), Error = Error> + Send>> = vec![
Box::new(
Expand Down
160 changes: 70 additions & 90 deletions network/src/network_config.rs
Original file line number Diff line number Diff line change
@@ -1,93 +1,102 @@
use crate::errors::{ConfigError, Error};
use crate::PeerId;
use bytes::Bytes;
use log::info;
use p2p::multiaddr::{Multiaddr, Protocol, ToMultiaddr};
use rand;
use rand::Rng;
use secio;
use serde_derive::Deserialize;
use std::fs;
use std::io::Error as IoError;
use std::io::Read;
use std::io::Write;
use std::iter;
use std::net::Ipv4Addr;
use std::time::Duration;
use std::path::PathBuf;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Deserialize)]
pub struct NetworkConfig {
pub listen_addresses: Vec<Multiaddr>,
pub public_addresses: Vec<Multiaddr>,
pub client_version: String,
pub protocol_version: String,
pub bootnodes: Vec<Multiaddr>,
pub reserved_peers: Vec<Multiaddr>,
pub reserved_only: bool,
pub max_inbound_peers: u32,
pub max_peers: u32,
pub max_outbound_peers: u32,
pub reserved_peers: Vec<String>,
pub secret_key: Option<Bytes>,
pub secret_key_path: Option<String>,
pub peer_store_path: Option<String>,
pub config_dir_path: Option<String>,
pub bootnodes: Vec<String>,
pub ping_interval: Duration,
pub ping_timeout: Duration,
pub discovery_timeout: Duration,
pub discovery_response_count: usize,
pub discovery_interval: Duration,
pub try_outbound_connect_interval: Duration,
pub path: PathBuf,
pub ping_interval_secs: u64,
pub ping_timeout_secs: u64,
pub connect_outbound_interval_secs: u64,
}

impl NetworkConfig {
pub(crate) fn read_secret_key(&self) -> Option<Bytes> {
if self.secret_key.is_some() {
self.secret_key.clone()
} else if let Some(ref path) = self.secret_key_path {
match fs::File::open(path).and_then(|mut file| {
let mut buf = Vec::new();
file.read_to_end(&mut buf).map(|_| buf)
}) {
Ok(secret) => Some(secret.into()),
Err(_err) => None,
}
} else {
None
fn generate_random_key() -> [u8; 32] {
loop {
let mut key: [u8; 32] = [0; 32];
rand::thread_rng().fill(&mut key);
if secio::SecioKeyPair::secp256k1_raw_key(&key).is_ok() {
return key;
}
}
}

pub fn new() -> Self {
Self::default()
impl NetworkConfig {
pub fn secret_key_path(&self) -> PathBuf {
let mut path = self.path.clone();
path.push("secret_key");
path
}

pub fn generate_random_key(&mut self) -> Result<secio::SecioKeyPair, Error> {
info!(target: "network", "Generate random key");
let mut key: [u8; 32] = [0; 32];
rand::thread_rng().fill(&mut key);
self.secret_key = Some(Bytes::from(key.to_vec()));
secio::SecioKeyPair::secp256k1_raw_key(&key).map_err(|_err| ConfigError::InvalidKey.into())
pub fn peer_store_path(&self) -> PathBuf {
let mut path = self.path.clone();
path.push("peer_store.db");
path
}

pub fn write_secret_key_to_file(&mut self) -> Result<(), IoError> {
if let Some(ref secret_key_path) = self.secret_key_path {
if let Some(secret_key) = self.secret_key.clone() {
info!(target: "network", "write random secret key to {}", secret_key_path);
return fs::OpenOptions::new()
.create(true)
.write(true)
.open(secret_key_path)
.and_then(|mut file| file.write_all(&secret_key));
}
pub fn create_dir_if_not_exists(&self) -> Result<(), Error> {
if !self.path.exists() {
fs::create_dir(&self.path)?;
}
Ok(())
}

pub fn fetch_private_key(&self) -> Option<Result<secio::SecioKeyPair, Error>> {
if let Some(secret) = self.read_secret_key() {
Some(
secio::SecioKeyPair::secp256k1_raw_key(&secret)
.map_err(|_err| ConfigError::InvalidKey.into()),
)
} else {
None
pub fn max_inbound_peers(&self) -> u32 {
self.max_peers - self.max_outbound_peers
}

pub fn max_outbound_peers(&self) -> u32 {
self.max_outbound_peers
}

fn read_secret_key(&self) -> Result<Option<secio::SecioKeyPair>, Error> {
let path = self.secret_key_path();
let mut file = match fs::File::open(path) {
Ok(file) => file,
Err(_) => return Ok(None),
};
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
Ok(Some(secio::SecioKeyPair::secp256k1_raw_key(&buf).map_err(
|_err: secio::error::SecioError| ConfigError::InvalidKey,
)?))
}

fn write_secret_key_to_file(&self) -> Result<(), Error> {
let path = self.secret_key_path();
info!(target: "network", "Generate random key");
let random_key_pair = generate_random_key();
info!(target: "network", "write random secret key to {:?}", path);
fs::OpenOptions::new()
.create(true)
.write(true)
.open(path)
.and_then(|mut file| file.write_all(&random_key_pair))
.map_err(Into::into)
}

pub fn fetch_private_key(&self) -> Result<secio::SecioKeyPair, Error> {
match self.read_secret_key()? {
Some(key) => Ok(key),
None => {
self.write_secret_key_to_file()?;
Ok(self.read_secret_key()?.expect("key must exists"))
}
}
}

Expand Down Expand Up @@ -125,32 +134,3 @@ impl NetworkConfig {
Ok(peers)
}
}

impl Default for NetworkConfig {
fn default() -> Self {
NetworkConfig {
listen_addresses: vec![iter::once(Protocol::Ip4(Ipv4Addr::new(0, 0, 0, 0)))
.chain(iter::once(Protocol::Tcp(30333)))
.collect()],
public_addresses: Vec::new(),
client_version: "ckb<unknown>".to_owned(),
protocol_version: "ckb".to_owned(),
reserved_only: false,
max_outbound_peers: 15,
max_inbound_peers: 10,
reserved_peers: vec![],
secret_key: None,
secret_key_path: None,
bootnodes: vec![],
config_dir_path: None,
// protocol services config
ping_interval: Duration::from_secs(15),
ping_timeout: Duration::from_secs(20),
discovery_timeout: Duration::from_secs(20),
discovery_response_count: 20,
discovery_interval: Duration::from_secs(15),
try_outbound_connect_interval: Duration::from_secs(15),
peer_store_path: None,
}
}
}
15 changes: 9 additions & 6 deletions nodes_template/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ filter = "info"
color = true

[network]
path = "default/network"
listen_addresses = ["/ip4/0.0.0.0/tcp/8115"]
public_addresses = []
bootnodes = []
reserved_nodes = []
only_reserved_peers = false
min_peers = 4
max_peers = 8
secret_file = "secret_key"
peer_store_path = "peer_store.db"
reserved_peers = []
reserved_only = false
max_peers = 125
max_outbound_peers = 30
ping_interval_secs = 15
ping_timeout_secs = 20
connect_outbound_interval_secs = 15

[rpc]
listen_address = "0.0.0.0:8114"
Expand Down
4 changes: 1 addition & 3 deletions src/cli/run_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use ckb_db::diskdb::RocksDB;
use ckb_miner::BlockAssembler;
use ckb_network::futures::sync::mpsc::channel;
use ckb_network::CKBProtocol;
use ckb_network::NetworkConfig;
use ckb_network::NetworkService;
use ckb_network::ProtocolId;
use ckb_notify::{NotifyController, NotifyService};
Expand Down Expand Up @@ -55,7 +54,6 @@ pub fn run(setup: Setup) {

let net_time_checker = Arc::new(NetTimeProtocol::default());

let network_config = NetworkConfig::from(setup.configs.network);
let (sender, receiver) = channel(std::u8::MAX as usize);
let protocols = vec![
(
Expand Down Expand Up @@ -87,7 +85,7 @@ pub fn run(setup: Setup) {
),
];
let network = Arc::new(
NetworkService::run_in_thread(&network_config, protocols, receiver)
NetworkService::run_in_thread(&setup.configs.network, protocols, receiver)
.expect("Create and start network"),
);

Expand Down
Loading