diff --git a/dapps/src/api/api.rs b/dapps/src/api/api.rs index 7d38e288f1e..3452669b63d 100644 --- a/dapps/src/api/api.rs +++ b/dapps/src/api/api.rs @@ -21,7 +21,7 @@ use hyper::method::Method; use hyper::status::StatusCode; use api::{response, types}; -use api::time::TimeChecker; +use api::time::{TimeChecker, MAX_DRIFT}; use apps::fetcher::Fetcher; use handlers::{self, extract_url}; use endpoint::{Endpoint, Handler, EndpointPath}; @@ -122,7 +122,6 @@ impl RestApiRouter { // Check time let time = { - const MAX_DRIFT: i64 = 500; let (status, message, details) = match time { Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => { (HealthStatus::Ok, "".into(), diff) diff --git a/dapps/src/api/time.rs b/dapps/src/api/time.rs index b81b4a84429..8ce79f6adc4 100644 --- a/dapps/src/api/time.rs +++ b/dapps/src/api/time.rs @@ -33,10 +33,12 @@ use std::io; use std::{fmt, mem, time}; +use std::sync::atomic::{self, AtomicUsize}; use std::collections::VecDeque; use futures::{self, Future, BoxFuture}; -use futures_cpupool::CpuPool; +use futures::future::{self, IntoFuture}; +use futures_cpupool::{CpuPool, CpuFuture}; use ntp; use time::{Duration, Timespec}; use util::{Arc, RwLock}; @@ -44,6 +46,8 @@ use util::{Arc, RwLock}; /// Time checker error. #[derive(Debug, Clone, PartialEq)] pub enum Error { + /// No servers are currently available for a query. + NoServersAvailable, /// There was an error when trying to reach the NTP server. Ntp(String), /// IO error when reading NTP response. @@ -55,6 +59,7 @@ impl fmt::Display for Error { use self::Error::*; match *self { + NoServersAvailable => write!(fmt, "No NTP servers available"), Ntp(ref err) => write!(fmt, "NTP error: {}", err), Io(ref err) => write!(fmt, "Connection Error: {}", err), } @@ -71,58 +76,123 @@ impl From for Error { /// NTP time drift checker. pub trait Ntp { + /// Returned Future. + type Future: IntoFuture; + /// Returns the current time drift. - fn drift(&self) -> BoxFuture; + fn drift(&self) -> Self::Future; +} + +const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60; +#[derive(Debug)] +struct Server { + pub address: String, + next_call: RwLock, + failures: AtomicUsize, +} + +impl Server { + pub fn is_available(&self) -> bool { + *self.next_call.read() < time::Instant::now() + } + + pub fn report_success(&self) { + self.failures.store(0, atomic::Ordering::SeqCst); + self.update_next_call(1) + } + + pub fn report_failure(&self) { + let errors = self.failures.fetch_add(1, atomic::Ordering::SeqCst); + self.update_next_call(1 << errors) + } + + fn update_next_call(&self, delay: usize) { + *self.next_call.write() = time::Instant::now() + time::Duration::from_secs(delay as u64 * SERVER_MAX_POLL_INTERVAL_SECS); + } +} + +impl> From for Server { + fn from(t: T) -> Self { + Server { + address: t.as_ref().to_owned(), + next_call: RwLock::new(time::Instant::now()), + failures: Default::default(), + } + } } /// NTP client using the SNTP algorithm for calculating drift. #[derive(Clone)] pub struct SimpleNtp { - address: Arc, + addresses: Vec>, pool: CpuPool, } impl fmt::Debug for SimpleNtp { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Ntp {{ address: {} }}", self.address) + f + .debug_struct("SimpleNtp") + .field("addresses", &self.addresses) + .finish() } } impl SimpleNtp { - fn new(address: &str, pool: CpuPool) -> SimpleNtp { + fn new>(addresses: &[T], pool: CpuPool) -> SimpleNtp { SimpleNtp { - address: Arc::new(address.to_owned()), + addresses: addresses.iter().map(Server::from).map(Arc::new).collect(), pool: pool, } } } impl Ntp for SimpleNtp { - fn drift(&self) -> BoxFuture { - let address = self.address.clone(); - if &*address == "none" { - return futures::future::err(Error::Ntp("NTP server is not provided.".into())).boxed(); - } - - self.pool.spawn_fn(move || { - let packet = ntp::request(&*address)?; - let dest_time = ::time::now_utc().to_timespec(); - let orig_time = Timespec::from(packet.orig_time); - let recv_time = Timespec::from(packet.recv_time); - let transmit_time = Timespec::from(packet.transmit_time); - - let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2; - - Ok(drift) - }).boxed() + type Future = future::Either< + CpuFuture, + future::FutureResult, + >; + + fn drift(&self) -> Self::Future { + use self::future::Either::{A, B}; + + let server = self.addresses.iter().find(|server| server.is_available()); + server.map(|server| { + let server = server.clone(); + A(self.pool.spawn_fn(move || { + debug!(target: "dapps", "Fetching time from {}.", server.address); + + match ntp::request(&server.address) { + Ok(packet) => { + let dest_time = ::time::now_utc().to_timespec(); + let orig_time = Timespec::from(packet.orig_time); + let recv_time = Timespec::from(packet.recv_time); + let transmit_time = Timespec::from(packet.transmit_time); + + let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2; + + server.report_success(); + Ok(drift) + }, + Err(err) => { + server.report_failure(); + Err(err.into()) + }, + } + })) + }).unwrap_or_else(|| B(future::err(Error::NoServersAvailable))) } } // NOTE In a positive scenario first results will be seen after: -// MAX_RESULTS * UPDATE_TIMEOUT_OK_SECS seconds. -const MAX_RESULTS: usize = 7; -const UPDATE_TIMEOUT_OK_SECS: u64 = 30; -const UPDATE_TIMEOUT_ERR_SECS: u64 = 2; +// MAX_RESULTS * UPDATE_TIMEOUT_INCOMPLETE_SECS seconds. +const MAX_RESULTS: usize = 4; +const UPDATE_TIMEOUT_OK_SECS: u64 = 6 * 60 * 60; +const UPDATE_TIMEOUT_WARN_SECS: u64 = 15 * 60; +const UPDATE_TIMEOUT_ERR_SECS: u64 = 60; +const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10; + +/// Maximal valid time drift. +pub const MAX_DRIFT: i64 = 500; #[derive(Debug, Clone)] /// A time checker. @@ -133,13 +203,13 @@ pub struct TimeChecker { impl TimeChecker { /// Creates new time checker given the NTP server address. - pub fn new(ntp_address: String, pool: CpuPool) -> Self { + pub fn new>(ntp_addresses: &[T], pool: CpuPool) -> Self { let last_result = Arc::new(RwLock::new( // Assume everything is ok at the very beginning. (time::Instant::now(), vec![Ok(0)].into()) )); - let ntp = SimpleNtp::new(&ntp_address, pool); + let ntp = SimpleNtp::new(ntp_addresses, pool); TimeChecker { ntp, @@ -148,22 +218,34 @@ impl TimeChecker { } } -impl TimeChecker { +impl TimeChecker where ::Future: Send + 'static { /// Updates the time pub fn update(&self) -> BoxFuture { + trace!(target: "dapps", "Updating time from NTP."); let last_result = self.last_result.clone(); - self.ntp.drift().then(move |res| { + self.ntp.drift().into_future().then(move |res| { + let res = res.map(|d| d.num_milliseconds()); + + if let Err(Error::NoServersAvailable) = res { + debug!(target: "dapps", "No NTP servers available. Selecting an older result."); + return select_result(last_result.read().1.iter()); + } + + // Update the results. let mut results = mem::replace(&mut last_result.write().1, VecDeque::new()); + let has_all_results = results.len() >= MAX_RESULTS; let valid_till = time::Instant::now() + time::Duration::from_secs( - if res.is_ok() && results.len() == MAX_RESULTS { - UPDATE_TIMEOUT_OK_SECS - } else { - UPDATE_TIMEOUT_ERR_SECS + match res { + Ok(time) if has_all_results && time < MAX_DRIFT => UPDATE_TIMEOUT_OK_SECS, + Ok(_) if has_all_results => UPDATE_TIMEOUT_WARN_SECS, + Err(_) if has_all_results => UPDATE_TIMEOUT_ERR_SECS, + _ => UPDATE_TIMEOUT_INCOMPLETE_SECS, } ); + trace!(target: "dapps", "New time drift received: {:?}", res); // Push the result. - results.push_back(res.map(|d| d.num_milliseconds())); + results.push_back(res); while results.len() > MAX_RESULTS { results.pop_front(); } @@ -208,7 +290,7 @@ mod tests { use std::cell::{Cell, RefCell}; use std::time::Instant; use time::Duration; - use futures::{self, BoxFuture, Future}; + use futures::{future, Future}; use super::{Ntp, TimeChecker, Error}; use util::RwLock; @@ -223,9 +305,11 @@ mod tests { } impl Ntp for FakeNtp { - fn drift(&self) -> BoxFuture { + type Future = future::FutureResult; + + fn drift(&self) -> Self::Future { self.1.set(self.1.get() + 1); - futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed() + future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")) } } diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 0cb7024cc15..f34c24cae60 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -130,7 +130,7 @@ impl Middleware { /// Creates new middleware for UI server. pub fn ui( - ntp_server: &str, + ntp_servers: &[String], pool: CpuPool, remote: Remote, dapps_domain: &str, @@ -146,7 +146,7 @@ impl Middleware { ).embeddable_on(None).allow_dapps(false)); let special = { let mut special = special_endpoints( - ntp_server, + ntp_servers, pool, content_fetcher.clone(), remote.clone(), @@ -171,7 +171,7 @@ impl Middleware { /// Creates new Dapps server middleware. pub fn dapps( - ntp_server: &str, + ntp_servers: &[String], pool: CpuPool, remote: Remote, ui_address: Option<(String, u16)>, @@ -203,7 +203,7 @@ impl Middleware { let special = { let mut special = special_endpoints( - ntp_server, + ntp_servers, pool, content_fetcher.clone(), remote.clone(), @@ -237,8 +237,8 @@ impl http::RequestMiddleware for Middleware { } } -fn special_endpoints( - ntp_server: &str, +fn special_endpoints>( + ntp_servers: &[T], pool: CpuPool, content_fetcher: Arc, remote: Remote, @@ -250,7 +250,7 @@ fn special_endpoints( special.insert(router::SpecialEndpoint::Api, Some(api::RestApi::new( content_fetcher, sync_status, - api::TimeChecker::new(ntp_server.into(), pool), + api::TimeChecker::new(ntp_servers, pool), remote, ))); special diff --git a/dapps/src/tests/helpers/mod.rs b/dapps/src/tests/helpers/mod.rs index 2d9d5f3410f..38dd82de65a 100644 --- a/dapps/src/tests/helpers/mod.rs +++ b/dapps/src/tests/helpers/mod.rs @@ -255,7 +255,7 @@ impl Server { fetch: F, ) -> Result { let middleware = Middleware::dapps( - "pool.ntp.org:123", + &["0.pool.ntp.org:123".into(), "1.pool.ntp.org:123".into()], CpuPool::new(4), remote, signer_address, diff --git a/parity/cli/config.toml b/parity/cli/config.toml index 0ad9e775330..459deaea040 100644 --- a/parity/cli/config.toml +++ b/parity/cli/config.toml @@ -78,7 +78,7 @@ disable_periodic = true jit = false [misc] -ntp_server = "pool.ntp.org:123" +ntp_servers = ["0.parity.pool.ntp.org:123"] logging = "own_tx=trace" log_file = "/var/log/parity.log" color = true diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 3b2b4cd4817..01339e9cdd6 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -356,8 +356,8 @@ usage! { or |c: &Config| otry!(c.vm).jit.clone(), // -- Miscellaneous Options - flag_ntp_server: String = "none", - or |c: &Config| otry!(c.misc).ntp_server.clone(), + flag_ntp_servers: String = "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123", + or |c: &Config| otry!(c.misc).ntp_servers.clone().map(|vec| vec.join(",")), flag_logging: Option = None, or |c: &Config| otry!(c.misc).logging.clone().map(Some), flag_log_file: Option = None, @@ -595,7 +595,7 @@ struct VM { #[derive(Default, Debug, PartialEq, Deserialize)] struct Misc { - ntp_server: Option, + ntp_servers: Option>, logging: Option, log_file: Option, color: Option, @@ -897,7 +897,7 @@ mod tests { flag_dapps_apis_all: None, // -- Miscellaneous Options - flag_ntp_server: "none".into(), + flag_ntp_servers: "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123".into(), flag_version: false, flag_logging: Some("own_tx=trace".into()), flag_log_file: Some("/var/log/parity.log".into()), @@ -1075,7 +1075,7 @@ mod tests { jit: Some(false), }), misc: Some(Misc { - ntp_server: Some("pool.ntp.org:123".into()), + ntp_servers: Some(vec!["0.parity.pool.ntp.org:123".into()]), logging: Some("own_tx=trace".into()), log_file: Some("/var/log/parity.log".into()), color: Some(true), diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index dc4796e0513..c82fe57626c 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -470,8 +470,10 @@ Internal Options: --can-restart Executable will auto-restart if exiting with 69. Miscellaneous Options: - --ntp-server HOST NTP server to provide current time (host:port). Used to verify node health. - (default: {flag_ntp_server}) + --ntp-servers HOSTS Comma separated list of NTP servers to provide current time (host:port). + Used to verify node health. Parity uses pool.ntp.org NTP servers, + consider joining the pool: http://www.pool.ntp.org/join.html + (default: {flag_ntp_servers}) -l --logging LOGGING Specify the logging level. Must conform to the same format as RUST_LOG. (default: {flag_logging:?}) --log-file FILENAME Specify a filename into which logging should be diff --git a/parity/configuration.rs b/parity/configuration.rs index 09bac053d56..4fc29196315 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -556,10 +556,14 @@ impl Configuration { Ok(options) } + fn ntp_servers(&self) -> Vec { + self.args.flag_ntp_servers.split(",").map(str::to_owned).collect() + } + fn ui_config(&self) -> UiConfiguration { UiConfiguration { enabled: self.ui_enabled(), - ntp_server: self.args.flag_ntp_server.clone(), + ntp_servers: self.ntp_servers(), interface: self.ui_interface(), port: self.args.flag_ports_shift + self.args.flag_ui_port, hosts: self.ui_hosts(), @@ -569,7 +573,7 @@ impl Configuration { fn dapps_config(&self) -> DappsConfiguration { DappsConfiguration { enabled: self.dapps_enabled(), - ntp_server: self.args.flag_ntp_server.clone(), + ntp_servers: self.ntp_servers(), dapps_path: PathBuf::from(self.directories().dapps), extra_dapps: if self.args.cmd_dapp { self.args.arg_path.iter().map(|path| PathBuf::from(path)).collect() @@ -1272,7 +1276,12 @@ mod tests { support_token_api: true }, UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ], interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1510,10 +1519,16 @@ mod tests { let conf3 = parse(&["parity", "--ui-path", "signer", "--ui-interface", "test"]); // then + let ntp_servers = vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ]; assert_eq!(conf0.directories().signer, "signer".to_owned()); assert_eq!(conf0.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1522,7 +1537,7 @@ mod tests { assert_eq!(conf1.directories().signer, "signer".to_owned()); assert_eq!(conf1.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1532,7 +1547,7 @@ mod tests { assert_eq!(conf2.directories().signer, "signer".to_owned()); assert_eq!(conf2.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 3123, hosts: Some(vec![]), @@ -1541,7 +1556,7 @@ mod tests { assert_eq!(conf3.directories().signer, "signer".to_owned()); assert_eq!(conf3.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "test".into(), port: 8180, hosts: Some(vec![]), diff --git a/parity/dapps.rs b/parity/dapps.rs index 8d996a68b5c..4a86c68b12d 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -36,7 +36,7 @@ use util::{Bytes, Address}; #[derive(Debug, PartialEq, Clone)] pub struct Configuration { pub enabled: bool, - pub ntp_server: String, + pub ntp_servers: Vec, pub dapps_path: PathBuf, pub extra_dapps: Vec, pub extra_embed_on: Vec<(String, u16)>, @@ -47,7 +47,12 @@ impl Default for Configuration { let data_dir = default_data_path(); Configuration { enabled: true, - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ], dapps_path: replace_home(&data_dir, "$BASE/dapps").into(), extra_dapps: vec![], extra_embed_on: vec![], @@ -158,7 +163,7 @@ pub fn new(configuration: Configuration, deps: Dependencies) -> Result Result Result, String> { +pub fn new_ui(enabled: bool, ntp_servers: &[String], deps: Dependencies) -> Result, String> { if !enabled { return Ok(None); } server::ui_middleware( deps, - ntp_server, + ntp_servers, rpc::DAPPS_DOMAIN, ).map(Some) } @@ -201,7 +206,7 @@ mod server { pub fn dapps_middleware( _deps: Dependencies, - _ntp_server: &str, + _ntp_servers: &[String], _dapps_path: PathBuf, _extra_dapps: Vec, _dapps_domain: &str, @@ -212,7 +217,7 @@ mod server { pub fn ui_middleware( _deps: Dependencies, - _ntp_server: &str, + _ntp_servers: &[String], _dapps_domain: &str, ) -> Result { Err("Your Parity version has been compiled without UI support.".into()) @@ -238,7 +243,7 @@ mod server { pub fn dapps_middleware( deps: Dependencies, - ntp_server: &str, + ntp_servers: &[String], dapps_path: PathBuf, extra_dapps: Vec, dapps_domain: &str, @@ -249,7 +254,7 @@ mod server { let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token)); Ok(parity_dapps::Middleware::dapps( - ntp_server, + ntp_servers, deps.pool, parity_remote, deps.ui_address, @@ -266,12 +271,12 @@ mod server { pub fn ui_middleware( deps: Dependencies, - ntp_server: &str, + ntp_servers: &[String], dapps_domain: &str, ) -> Result { let parity_remote = parity_reactor::Remote::new(deps.remote.clone()); Ok(parity_dapps::Middleware::ui( - ntp_server, + ntp_servers, deps.pool, parity_remote, dapps_domain, diff --git a/parity/rpc.rs b/parity/rpc.rs index de501c5a552..f0d96b31c19 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -74,7 +74,7 @@ impl Default for HttpConfiguration { #[derive(Debug, PartialEq, Clone)] pub struct UiConfiguration { pub enabled: bool, - pub ntp_server: String, + pub ntp_servers: Vec, pub interface: String, pub port: u16, pub hosts: Option>, @@ -108,7 +108,12 @@ impl Default for UiConfiguration { fn default() -> Self { UiConfiguration { enabled: true && cfg!(feature = "ui-enabled"), - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ], port: 8180, interface: "127.0.0.1".into(), hosts: Some(vec![]), diff --git a/parity/run.rs b/parity/run.rs index c28f8e1ba1f..00db46b37ea 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -298,7 +298,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> }; let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?; - let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?; + let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?; // start RPCs let dapps_service = dapps::service(&dapps_middleware); @@ -660,7 +660,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R } }; let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?; - let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?; + let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?; let dapps_service = dapps::service(&dapps_middleware); let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies {