diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index 2c2e1cfaedcff..f20b7e2b5e688 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -25,7 +25,7 @@ use runtime_primitives::bft::Justification; /// Local client abstraction for the network. pub trait Client: Send + Sync { /// Import a new block. Parent is supposed to be existing in the blockchain. - fn import(&self, is_best: bool, header: Block::Header, justification: Justification, body: Option>) -> Result; + fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification, body: Option>) -> Result; /// Get blockchain info. fn info(&self) -> Result, Error>; @@ -54,10 +54,9 @@ impl Client for SubstrateClient where E: CallExecutor + Send + Sync + 'static, Block: BlockT, { - fn import(&self, is_best: bool, header: Block::Header, justification: Justification, body: Option>) -> Result { + fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification, body: Option>) -> Result { // TODO: defer justification check. let justified_header = self.check_justification(header, justification.into())?; - let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; (self as &SubstrateClient).import_block(origin, justified_header, body) } diff --git a/substrate/network/src/error.rs b/substrate/network/src/error.rs index e5c860cf9a8d0..fcf881c68d0a9 100644 --- a/substrate/network/src/error.rs +++ b/substrate/network/src/error.rs @@ -16,12 +16,14 @@ //! Polkadot service possible errors. +use std::io::Error as IoError; use network_libp2p::Error as NetworkError; use client; error_chain! { foreign_links { Network(NetworkError) #[doc = "Devp2p error."]; + Io(IoError) #[doc = "IO error."]; } links { diff --git a/substrate/network/src/import_queue.rs b/substrate/network/src/import_queue.rs new file mode 100644 index 0000000000000..9ffb83218b6cc --- /dev/null +++ b/substrate/network/src/import_queue.rs @@ -0,0 +1,566 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! Blocks import queue. + +use std::collections::{HashSet, VecDeque}; +use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicBool, Ordering}; +use parking_lot::{Condvar, Mutex, RwLock}; + +use client::{BlockOrigin, BlockStatus, ImportResult}; +use network_libp2p::PeerId; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; + +use blocks::BlockData; +use chain::Client; +use error::{ErrorKind, Error}; +use protocol::Context; +use service::ExecuteInContext; +use sync::ChainSync; + +/// Blocks import queue API. +pub trait ImportQueue: Send + Sync { + /// Clear the queue when sync is restarting. + fn clear(&self); + /// Get queue status. + fn status(&self) -> ImportQueueStatus; + /// Is block with given hash is currently in the queue. + fn is_importing(&self, hash: &B::Hash) -> bool; + /// Import bunch of blocks. + fn import_blocks(&self, sync: &mut ChainSync, protocol: &mut Context, blocks: (BlockOrigin, Vec>)); +} + +/// Import queue status. It isn't completely accurate. +pub struct ImportQueueStatus { + /// Number of blocks that are currently in the queue. + pub importing_count: usize, + /// The number of the best block that was ever in the queue since start/last failure. + pub best_importing_number: <::Header as HeaderT>::Number, +} + +/// Blocks import queue that is importing blocks in the separate thread. +pub struct AsyncImportQueue { + handle: Mutex>>, + data: Arc>, +} + +/// Locks order: queue, queue_blocks, best_importing_number +struct AsyncImportQueueData { + signal: Condvar, + queue: Mutex>)>>, + queue_blocks: RwLock>, + best_importing_number: RwLock<<::Header as HeaderT>::Number>, + is_stopping: AtomicBool, +} + +impl AsyncImportQueue { + pub fn new() -> Self { + Self { + handle: Mutex::new(None), + data: Arc::new(AsyncImportQueueData::new()), + } + } + + pub fn start>(&self, sync: Weak>>, service: Weak, chain: Weak>) -> Result<(), Error> { + debug_assert!(self.handle.lock().is_none()); + + let qdata = self.data.clone(); + *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { + import_thread(sync, service, chain, qdata) + }).map_err(|err| Error::from(ErrorKind::Io(err)))?); + Ok(()) + } +} + +impl AsyncImportQueueData { + pub fn new() -> Self { + Self { + signal: Default::default(), + queue: Mutex::new(VecDeque::new()), + queue_blocks: RwLock::new(HashSet::new()), + best_importing_number: RwLock::new(Zero::zero()), + is_stopping: Default::default(), + } + } +} + +impl ImportQueue for AsyncImportQueue { + fn clear(&self) { + let mut queue = self.data.queue.lock(); + let mut queue_blocks = self.data.queue_blocks.write(); + let mut best_importing_number = self.data.best_importing_number.write(); + queue_blocks.clear(); + queue.clear(); + *best_importing_number = Zero::zero(); + } + + fn status(&self) -> ImportQueueStatus { + ImportQueueStatus { + importing_count: self.data.queue_blocks.read().len(), + best_importing_number: *self.data.best_importing_number.read(), + } + } + + fn is_importing(&self, hash: &B::Hash) -> bool { + self.data.queue_blocks.read().contains(hash) + } + + fn import_blocks(&self, _sync: &mut ChainSync, _protocol: &mut Context, blocks: (BlockOrigin, Vec>)) { + trace!(target:"sync", "Scheduling {} blocks for import", blocks.1.len()); + + let mut queue = self.data.queue.lock(); + let mut queue_blocks = self.data.queue_blocks.write(); + let mut best_importing_number = self.data.best_importing_number.write(); + let new_best_importing_number = blocks.1.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero()); + queue_blocks.extend(blocks.1.iter().map(|b| b.block.hash.clone())); + if new_best_importing_number > *best_importing_number { + *best_importing_number = new_best_importing_number; + } + queue.push_back(blocks); + self.data.signal.notify_one(); + } +} + +impl Drop for AsyncImportQueue { + fn drop(&mut self) { + if let Some(handle) = self.handle.lock().take() { + self.data.is_stopping.store(true, Ordering::SeqCst); + let _ = handle.join(); + } + } +} + +/// Blocks import thread. +fn import_thread>(sync: Weak>>, service: Weak, chain: Weak>, qdata: Arc>) { + trace!(target: "sync", "Starting import thread"); + + loop { + let new_blocks = { + let mut queue_lock = qdata.queue.lock(); + if queue_lock.is_empty() { + qdata.signal.wait(&mut queue_lock); + } + + match queue_lock.pop_front() { + Some(new_blocks) => new_blocks, + None => break, + } + }; + if qdata.is_stopping.load(Ordering::SeqCst) { + break; + } + + match (sync.upgrade(), service.upgrade(), chain.upgrade()) { + (Some(sync), Some(service), Some(chain)) => { + let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); + if !import_many_blocks(&mut SyncLink::Indirect(&sync, &*chain, &*service), Some(&*qdata), new_blocks) { + break; + } + + let mut queue_blocks = qdata.queue_blocks.write(); + for blocks_hash in blocks_hashes { + queue_blocks.remove(&blocks_hash); + } + }, + _ => break, + } + } + + trace!(target: "sync", "Stopping import thread"); +} + +/// ChainSync link trait. +trait SyncLinkApi { + /// Get chain reference. + fn chain(&self) -> &Client; + /// Block imported. + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor); + /// Maintain sync. + fn maintain_sync(&mut self); + /// Disconnect from peer. + fn disconnect(&mut self, peer_id: PeerId); + /// Disconnect from peer and restart sync. + fn disconnect_and_restart(&mut self, peer_id: PeerId); + /// Restart sync. + fn restart(&mut self); +} + +/// Link with the ChainSync service. +enum SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext> { + /// Indirect link (through service). + Indirect(&'a RwLock>, &'a Client, &'a E), + /// Direct references are given. + #[cfg(test)] + Direct(&'a mut ChainSync, &'a mut Context), +} + +/// Block import successful result. +#[derive(Debug, PartialEq)] +enum BlockImportResult { + /// Block is not imported. + NotImported(H, N), + /// Imported known block. + ImportedKnown(H, N), + /// Imported unknown block. + ImportedUnknown(H, N), +} + +/// Block import error. +#[derive(Debug, PartialEq)] +enum BlockImportError { + /// Disconnect from peer and continue import of next bunch of blocks. + Disconnect(PeerId), + /// Disconnect from peer and restart sync. + DisconnectAndRestart(PeerId), + /// Restart sync. + Restart, +} + +/// Import a bunch of blocks. +fn import_many_blocks<'a, B: BlockT>( + link: &mut SyncLinkApi, + qdata: Option<&AsyncImportQueueData>, + blocks: (BlockOrigin, Vec>) +) -> bool +{ + let (blocks_origin, blocks) = blocks; + let count = blocks.len(); + let mut imported = 0; + + // Blocks in the response/drain should be in ascending order. + for block in blocks { + let import_result = import_single_block(link.chain(), blocks_origin.clone(), block); + let is_import_failed = import_result.is_err(); + imported += process_import_result(link, import_result); + if is_import_failed { + qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero()); + return true; + } + + if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() { + return false; + } + } + + trace!(target: "sync", "Imported {} of {}", imported, count); + link.maintain_sync(); + true +} + +/// Single block import function. +fn import_single_block( + chain: &Client, + block_origin: BlockOrigin, + block: BlockData +) -> Result::Header as HeaderT>::Number>, BlockImportError> +{ + let origin = block.origin; + let block = block.block; + match (block.header, block.justification) { + (Some(header), Some(justification)) => { + let number = header.number().clone(); + let hash = header.hash(); + let parent = header.parent_hash().clone(); + + // check whether the block is known before importing. + match chain.block_status(&BlockId::Hash(hash)) { + Ok(BlockStatus::InChain) => return Ok(BlockImportResult::NotImported(hash, number)), + Ok(_) => {}, + Err(e) => { + debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); + return Err(BlockImportError::Restart); + } + } + + let result = chain.import( + block_origin, + header, + justification, + block.body, + ); + match result { + Ok(ImportResult::AlreadyInChain) => { + trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedKnown(hash, number)) + }, + Ok(ImportResult::AlreadyQueued) => { + trace!(target: "sync", "Block already queued {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedKnown(hash, number)) + }, + Ok(ImportResult::Queued) => { + trace!(target: "sync", "Block queued {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedUnknown(hash, number)) + }, + Ok(ImportResult::UnknownParent) => { + debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); + Err(BlockImportError::Restart) + }, + Ok(ImportResult::KnownBad) => { + debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash); + Err(BlockImportError::DisconnectAndRestart(origin)) //TODO: use persistent ID + } + Err(e) => { + debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); + Err(BlockImportError::Restart) + } + } + }, + (None, _) => { + debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin); + Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID + }, + (_, None) => { + debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin); + Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID + } + } +} + +/// Process single block import result. +fn process_import_result<'a, B: BlockT>( + link: &mut SyncLinkApi, + result: Result::Header as HeaderT>::Number>, BlockImportError> +) -> usize +{ + match result { + Ok(BlockImportResult::NotImported(_, _)) => 0, + Ok(BlockImportResult::ImportedKnown(hash, number)) => { + link.block_imported(&hash, number); + 0 + }, + Ok(BlockImportResult::ImportedUnknown(hash, number)) => { + link.block_imported(&hash, number); + 1 + }, + Err(BlockImportError::Disconnect(peer_id)) => { + link.disconnect(peer_id); + 0 + }, + Err(BlockImportError::DisconnectAndRestart(peer_id)) => { + link.disconnect_and_restart(peer_id); + 0 + }, + Err(BlockImportError::Restart) => { + link.restart(); + 0 + }, + } +} + +impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext> SyncLink<'a, B, E> { + /// Execute closure with locked ChainSync. + fn with_sync, &mut Context)>(&mut self, closure: F) { + match *self { + #[cfg(test)] + SyncLink::Direct(ref mut sync, ref mut protocol) => + closure(*sync, *protocol), + SyncLink::Indirect(ref sync, _, ref service) => + service.execute_in_context(move |protocol| { + let mut sync = sync.write(); + closure(&mut *sync, protocol) + }), + } + } +} + +impl<'a, B: 'static + BlockT, E: ExecuteInContext> SyncLinkApi for SyncLink<'a, B, E> { + fn chain(&self) -> &Client { + match *self { + #[cfg(test)] + SyncLink::Direct(_, ref protocol) => protocol.client(), + SyncLink::Indirect(_, ref chain, _) => *chain, + } + } + + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + self.with_sync(|sync, _| sync.block_imported(&hash, number)) + } + + fn maintain_sync(&mut self) { + self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) + } + + fn disconnect(&mut self, peer_id: PeerId) { + self.with_sync(|_, protocol| protocol.disconnect_peer(peer_id)) + } + + fn disconnect_and_restart(&mut self, peer_id: PeerId) { + self.with_sync(|sync, protocol| { + protocol.disconnect_peer(peer_id); + sync.restart(protocol); + }) + } + + fn restart(&mut self) { + self.with_sync(|sync, protocol| sync.restart(protocol)) + } +} + +#[cfg(test)] +pub mod tests { + use client; + use message; + use test_client::{self, TestClient}; + use test_client::runtime::{Block, Hash}; + use super::*; + + /// Blocks import queue that is importing blocks in the same thread. + pub struct SyncImportQueue; + struct DummyExecuteInContext; + + impl ExecuteInContext for DummyExecuteInContext { + fn execute_in_context)>(&self, _closure: F) { } + } + + impl ImportQueue for SyncImportQueue { + fn clear(&self) { } + + fn status(&self) -> ImportQueueStatus { + ImportQueueStatus { + importing_count: 0, + best_importing_number: Zero::zero(), + } + } + + fn is_importing(&self, _hash: &B::Hash) -> bool { + false + } + + fn import_blocks(&self, sync: &mut ChainSync, protocol: &mut Context, blocks: (BlockOrigin, Vec>)) { + import_many_blocks(&mut SyncLink::Direct::<_, DummyExecuteInContext>(sync, protocol), None, blocks); + } + } + + struct TestLink { + chain: Arc>, + imported: usize, + maintains: usize, + disconnects: usize, + restarts: usize, + } + + impl TestLink { + fn new() -> TestLink { + TestLink { + chain: Arc::new(test_client::new()), + imported: 0, + maintains: 0, + disconnects: 0, + restarts: 0, + } + } + + fn total(&self) -> usize { + self.imported + self.maintains + self.disconnects + self.restarts + } + } + + impl SyncLinkApi for TestLink { + fn chain(&self) -> &Client { &*self.chain } + fn block_imported(&mut self, _hash: &Hash, _number: NumberFor) { self.imported += 1; } + fn maintain_sync(&mut self) { self.maintains += 1; } + fn disconnect(&mut self, _peer_id: PeerId) { self.disconnects += 1; } + fn disconnect_and_restart(&mut self, _peer_id: PeerId) { self.disconnects += 1; self.restarts += 1; } + fn restart(&mut self) { self.restarts += 1; } + } + + fn prepare_good_block() -> (client::Client, Hash, u64, BlockData) { + let client = test_client::new(); + let block = client.new_block().unwrap().bake().unwrap(); + client.justify_and_import(BlockOrigin::File, block).unwrap(); + + let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1); + let block = message::BlockData:: { + hash: client.block_hash(1).unwrap().unwrap(), + header: client.header(&BlockId::Number(1)).unwrap(), + body: None, + receipt: None, + message_queue: None, + justification: client.justification(&BlockId::Number(1)).unwrap(), + }; + + (client, hash, number, BlockData { block, origin: 0 }) + } + + #[test] + fn import_single_good_block_works() { + let (_, hash, number, block) = prepare_good_block(); + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Ok(BlockImportResult::ImportedUnknown(hash, number))); + } + + #[test] + fn import_single_good_known_block_is_ignored() { + let (client, hash, number, block) = prepare_good_block(); + assert_eq!(import_single_block(&client, BlockOrigin::File, block), Ok(BlockImportResult::NotImported(hash, number))); + } + + #[test] + fn import_single_good_block_without_header_fails() { + let (_, _, _, mut block) = prepare_good_block(); + block.block.header = None; + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0))); + } + + #[test] + fn import_single_good_block_without_justification_fails() { + let (_, _, _, mut block) = prepare_good_block(); + block.block.justification = None; + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0))); + } + + #[test] + fn process_import_result_works() { + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::NotImported(Default::default(), 0))), 0); + assert_eq!(link.total(), 0); + + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.imported, 1); + + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); + assert_eq!(link.total(), 1); + assert_eq!(link.imported, 1); + + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::Disconnect(0))), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.disconnects, 1); + + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::DisconnectAndRestart(0))), 0); + assert_eq!(link.total(), 2); + assert_eq!(link.disconnects, 1); + assert_eq!(link.restarts, 1); + + let mut link = TestLink::new(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::Restart)), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.restarts, 1); + } + + #[test] + fn import_many_blocks_stops_when_stopping() { + let (_, _, _, block) = prepare_good_block(); + let qdata = AsyncImportQueueData::new(); + qdata.is_stopping.store(true, Ordering::SeqCst); + assert!(!import_many_blocks(&mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block]))); + } +} diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 9319918db15d7..2bb025a835113 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -46,6 +46,7 @@ mod config; mod chain; mod blocks; mod on_demand; +mod import_queue; pub mod consensus_gossip; pub mod error; pub mod message; diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index 341dccaa2a2b6..e7e6addc39d9f 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -506,7 +506,7 @@ pub mod generic { 3 => Some(Message::BlockAnnounce(Decode::decode(input)?)), 4 => Some(Message::Transactions(Decode::decode(input)?)), 5 => Some(Message::BftMessage(Decode::decode(input)?)), - 6 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), + 6 => Some(Message::RemoteCallRequest(Decode::decode(input)?)), 7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), 255 => Some(Message::ChainSpecific(Decode::decode(input)?)), _ => None, diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index ff35f6a190def..597834090e0c3 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -29,6 +29,7 @@ use message::generic::Message as GenericMessage; use specialization::Specialization; use sync::{ChainSync, Status as SyncStatus, SyncState}; use service::{Roles, TransactionPool}; +use import_queue::ImportQueue; use config::ProtocolConfig; use chain::Client; use on_demand::OnDemandService; @@ -51,7 +52,7 @@ pub struct Protocol> { config: ProtocolConfig, on_demand: Option>>, genesis_hash: B::Hash, - sync: RwLock>, + sync: Arc>>, specialization: RwLock, context_data: ContextData, // Connected peers pending Status message. @@ -196,12 +197,13 @@ impl> Protocol { pub fn new( config: ProtocolConfig, chain: Arc>, + import_queue: Arc>, on_demand: Option>>, transaction_pool: Arc>, specialization: S, ) -> error::Result { let info = chain.info()?; - let sync = ChainSync::new(config.roles, &info); + let sync = ChainSync::new(config.roles, &info, import_queue); let protocol = Protocol { config: config, context_data: ContextData { @@ -210,7 +212,7 @@ impl> Protocol { }, on_demand, genesis_hash: info.chain.genesis_hash, - sync: RwLock::new(sync), + sync: Arc::new(RwLock::new(sync)), specialization: RwLock::new(specialization), handshaking_peers: RwLock::new(HashMap::new()), transaction_pool: transaction_pool, @@ -222,6 +224,10 @@ impl> Protocol { &self.context_data } + pub(crate) fn sync(&self) -> &Arc>> { + &self.sync + } + /// Returns protocol status pub fn status(&self) -> ProtocolStatus { let sync = self.sync.read(); diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index ea990a304a58c..788c80ea5e6a0 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -31,6 +31,7 @@ use chain::Client; use message::LocalizedBftMessage; use specialization::Specialization; use on_demand::OnDemandService; +use import_queue::AsyncImportQueue; use runtime_primitives::traits::{Block as BlockT}; /// Type that represents fetch completion future. @@ -147,21 +148,30 @@ pub struct Service> { impl> Service { /// Creates and register protocol with the network service pub fn new(params: Params, protocol_id: ProtocolId) -> Result>, Error> { + let chain = params.chain.clone(); let service = NetworkService::new(params.network_config.clone(), None)?; + let import_queue = Arc::new(AsyncImportQueue::new()); let sync = Arc::new(Service { network: service, protocol_id, handler: Arc::new(ProtocolHandler { protocol: Protocol::new( params.config, - params.chain.clone(), + params.chain, + import_queue.clone(), params.on_demand, params.transaction_pool, - params.specialization + params.specialization, )?, }), }); + import_queue.start( + Arc::downgrade(sync.handler.protocol.sync()), + Arc::downgrade(&sync), + Arc::downgrade(&chain) + )?; + Ok(sync) } diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index a54b920a39693..14f3ee78ea46c 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -15,17 +15,22 @@ // along with Polkadot. If not, see .? use std::collections::HashMap; +use std::sync::Arc; use protocol::Context; use network_libp2p::PeerId; -use client::{ImportResult, BlockStatus, ClientInfo}; +use client::{BlockStatus, BlockOrigin, ClientInfo}; +use client::error::Error as ClientError; use blocks::{self, BlockCollection}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; use service::Roles; +use import_queue::ImportQueue; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; +// Maximum blocks to store in the import queue. +const MAX_IMPORING_BLOCKS: usize = 2048; struct PeerSync { pub common_hash: B::Hash, @@ -51,6 +56,7 @@ pub struct ChainSync { best_queued_number: NumberFor, best_queued_hash: B::Hash, required_block_attributes: message::BlockAttributes, + import_queue: Arc>, } /// Reported sync state. @@ -73,7 +79,7 @@ pub struct Status { impl ChainSync { /// Create a new instance. - pub(crate) fn new(role: Roles, info: &ClientInfo) -> Self { + pub(crate) fn new(role: Roles, info: &ClientInfo, import_queue: Arc>) -> Self { let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; if role.intersects(Roles::FULL) { required_block_attributes |= message::BlockAttributes::BODY; @@ -86,6 +92,7 @@ impl ChainSync { best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), required_block_attributes, + import_queue, } } @@ -109,7 +116,7 @@ impl ChainSync { /// Handle new connected peer. pub(crate) fn new_peer(&mut self, protocol: &mut Context, peer_id: PeerId) { if let Some(info) = protocol.peer_info(peer_id) { - match (protocol.client().block_status(&BlockId::Hash(info.best_hash)), info.best_number) { + match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); protocol.disconnect_peer(peer_id); @@ -160,8 +167,6 @@ impl ChainSync { } pub(crate) fn on_block_data(&mut self, protocol: &mut Context, peer_id: PeerId, _request: message::BlockRequest, response: message::BlockResponse) { - let count = response.blocks.len(); - let mut imported: usize = 0; let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { match peer.state { PeerSyncState::DownloadingNew(start_block) => { @@ -225,87 +230,20 @@ impl ChainSync { }; let best_seen = self.best_seen_block(); - // Blocks in the response/drain should be in ascending order. - for block in new_blocks { - let origin = block.origin; - let block = block.block; - match (block.header, block.justification) { - (Some(header), Some(justification)) => { - let number = header.number().clone(); - let hash = header.hash(); - let parent = header.parent_hash().clone(); - let is_best = best_seen.as_ref().map_or(false, |n| number >= *n); - - // check whether the block is known before importing. - match protocol.client().block_status(&BlockId::Hash(hash)) { - Ok(BlockStatus::InChain) => continue, - Ok(_) => {}, - Err(e) => { - debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - self.restart(protocol); - return; - } - } - - let result = protocol.client().import( - is_best, - header, - justification, - block.body, - ); - match result { - Ok(ImportResult::AlreadyInChain) => { - trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::Queued) => { - trace!(target: "sync", "Block queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - imported = imported + 1; - }, - Ok(ImportResult::UnknownParent) => { - debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); - self.restart(protocol); - return; - }, - Ok(ImportResult::KnownBad) => { - protocol.disable_peer(origin, &format!("Peer gave us a bad block {}: {:?}", number, hash)); //TODO: use persistent ID - self.restart(protocol); - return; - } - Err(e) => { - debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - self.restart(protocol); - return; - } - } - }, - (None, _) => { - protocol.disable_peer(origin, &format!("Header {} was not provided by peer ", block.hash)); //TODO: use persistent ID - return; - }, - (_, None) => { - protocol.disable_peer(origin, &format!("Justification set for block {} was not provided by peer", block.hash)); //TODO: use persistent ID - return; - } - } - } - trace!(target: "sync", "Imported {} of {}", imported, count); - self.maintain_sync(protocol); + let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n)); + let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; + let import_queue = self.import_queue.clone(); + import_queue.import_blocks(self, protocol, (origin, new_blocks)) } - fn maintain_sync(&mut self, protocol: &mut Context) { + pub fn maintain_sync(&mut self, protocol: &mut Context) { let peers: Vec = self.peers.keys().map(|p| *p).collect(); for peer in peers { self.download_new(protocol, peer); } } - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; @@ -359,7 +297,7 @@ impl ChainSync { fn is_known_or_already_downloading(&self, protocol: &mut Context, hash: &B::Hash) -> bool { self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - || protocol.client().block_status(&BlockId::Hash(*hash)).ok().map_or(false, |s| s != BlockStatus::Unknown) + || block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, peer_id: PeerId) { @@ -369,6 +307,7 @@ impl ChainSync { } pub(crate) fn restart(&mut self, protocol: &mut Context) { + self.import_queue.clear(); self.blocks.clear(); let ids: Vec = self.peers.keys().map(|p| *p).collect(); for id in ids { @@ -416,10 +355,18 @@ impl ChainSync { // Issue a request for a peer to download new blocks, if any are available fn download_new(&mut self, protocol: &mut Context, peer_id: PeerId) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, peer.common_number, peer.best_number); + let import_status = self.import_queue.status(); + // when there are too many blocks in the queue => do not try to download new blocks + if import_status.importing_count > MAX_IMPORING_BLOCKS { + return; + } + // we should not download already queued blocks + let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number); + + trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, common_number, peer.best_number); match peer.state { PeerSyncState::Available => { - if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) { trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end); let request = message::generic::BlockRequest { id: 0, @@ -453,3 +400,16 @@ impl ChainSync { protocol.send_message(peer_id, GenericMessage::BlockRequest(request)); } } + +/// Get block status, taking into account import queue. +fn block_status( + chain: &::chain::Client, + queue: &ImportQueue, + hash: B::Hash) -> Result +{ + if queue.is_importing(&hash) { + return Ok(BlockStatus::Queued); + } + + chain.block_status(&BlockId::Hash(hash)) +} diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index b96332ff45e03..40a4ad61d10b4 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -31,6 +31,7 @@ use service::TransactionPool; use network_libp2p::{PeerId, SessionInfo, Error as NetworkError}; use keyring::Keyring; use codec::Encode; +use import_queue::tests::SyncImportQueue; use test_client::{self, TestClient}; use test_client::runtime::{Block, Hash, Transfer, Extrinsic}; use specialization::Specialization; @@ -247,7 +248,8 @@ impl TestNet { pub fn add_peer(&mut self, config: &ProtocolConfig) { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); - let sync = Protocol::new(config.clone(), client.clone(), None, tx_pool, DummySpecialization).unwrap(); + let import_queue = Arc::new(SyncImportQueue); + let sync = Protocol::new(config.clone(), client.clone(), import_queue, None, tx_pool, DummySpecialization).unwrap(); self.peers.push(Arc::new(Peer { sync: sync, client: client,