From e9c602bb4bd4b9f0ce019f9bf1223f5f87d72943 Mon Sep 17 00:00:00 2001
From: Nicolas Gotchac
Date: Wed, 16 May 2018 22:01:55 +0200
Subject: [PATCH] Resumable warp-sync / Seed downloaded snapshots (#8544)

* Start dividing sync chain : first supplier method
* WIP - updated chain sync supplier
* Finish refactoring the Chain Sync Supplier
* Create Chain Sync Requester
* Add Propagator for Chain Sync
* Add the Chain Sync Handler
* Move tests from mod -> handler
* Move tests to propagator
* Refactor SyncRequester arguments
* Refactoring peer fork header handler
* Fix wrong highest block number in snapshot sync
* Small refactor...
* Resume warp-sync downloaded chunks
* Add comments
* Refactoring the previous chunks import
* Fix tests
* Address PR grumbles
* Fix not seeding current snapshot
* Address PR Grumbles
* Address PR grumble
* Retry failed CI job
* Update SnapshotService readiness check
  Fix restoration locking issue for previous chunks restoration
* Fix tests
* Fix tests
* Fix test
* Early abort importing previous chunks
* PR Grumbles
* Update Gitlab CI config
* SyncState back to Waiting when Manifest peers disconnect
* Move fix
* Better fix
* Revert GitLab CI changes
* Fix Warning
* Refactor resuming snapshots
* Fix string construction
* Revert "Refactor resuming snapshots"
  This reverts commit 75fd4b553a38e4a49dc5d6a878c70e830ff382eb.
* Update informant log
* Fix string construction
* Refactor resuming snapshots
* Fix informant
* PR Grumbles
* Update informant message : show chunks done
* PR Grumbles
* Fix
* Fix Warning
* PR Grumbles
---
 ethcore/src/snapshot/service.rs              | 165 ++++++++++++++++---
 ethcore/src/snapshot/tests/service.rs        |   8 +-
 ethcore/src/snapshot/traits.rs               |   3 +
 ethcore/sync/src/chain/handler.rs            |  52 ++++--
 ethcore/sync/src/chain/mod.rs                | 104 +++++++-----
 ethcore/sync/src/chain/supplier.rs           |  26 +--
 ethcore/sync/src/snapshot.rs                 |  71 ++++++--
 ethcore/sync/src/tests/snapshot.rs           |   4 +
 ethcore/types/src/restoration_status.rs      |   5 +
 parity/informant.rs                          |  23 ++-
 parity/snapshot.rs                           |   1 +
 rpc/src/v1/tests/helpers/snapshot_service.rs |   1 +
 12 files changed, 361 insertions(+), 102 deletions(-)

diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs
index c5b9123e10f..17c362e0440 100644
--- a/ethcore/src/snapshot/service.rs
+++ b/ethcore/src/snapshot/service.rs
@@ -17,8 +17,8 @@
 //! Snapshot network service implementation.
 
 use std::collections::HashSet;
-use std::io::ErrorKind;
-use std::fs;
+use std::io::{self, Read, ErrorKind};
+use std::fs::{self, File};
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -30,6 +30,7 @@ use blockchain::BlockChain;
 use client::{Client, ChainInfo, ClientIoMessage};
 use engines::EthEngine;
 use error::Error;
+use hash::keccak;
 use ids::BlockId;
 use io::IoChannel;
@@ -270,8 +271,8 @@ impl Service {
 			}
 		}
 
-		// delete the temporary restoration dir if it does exist.
-		if let Err(e) = fs::remove_dir_all(service.restoration_dir()) {
+		// delete the temporary restoration DB dir if it does exist.
+		if let Err(e) = fs::remove_dir_all(service.restoration_db()) {
 			if e.kind() != ErrorKind::NotFound {
 				return Err(e.into())
 			}
@@ -325,6 +326,13 @@ impl Service {
 		dir
 	}
 
+	// previous snapshot chunks path.
+	fn prev_chunks_dir(&self) -> PathBuf {
+		let mut dir = self.snapshot_root.clone();
+		dir.push("prev_chunks");
+		dir
+	}
+
 	// replace one of the client's database with our own.
 	fn replace_client_db(&self) -> Result<(), Error> {
 		let our_db = self.restoration_db();
 
@@ -406,9 +414,26 @@ impl Service {
 	/// Initialize the restoration synchronously.
 	/// The recover flag indicates whether to recover the restored snapshot.
 	pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
+		let mut res = self.restoration.lock();
+
 		let rest_dir = self.restoration_dir();
+		let rest_db = self.restoration_db();
+		let recovery_temp = self.temp_recovery_dir();
+		let prev_chunks = self.prev_chunks_dir();
 
-		let mut res = self.restoration.lock();
+		// delete the previous chunks directory if it exists.
+		if let Err(e) = fs::remove_dir_all(&prev_chunks) {
+			match e.kind() {
+				ErrorKind::NotFound => {},
+				_ => return Err(e.into()),
+			}
+		}
+
+		// Move the previous recovery temp directory
+		// to `prev_chunks` to be able to restart restoring
+		// with previously downloaded chunks
+		// This step is optional, so don't fail on error
+		fs::rename(&recovery_temp, &prev_chunks).ok();
 
 		self.state_chunks.store(0, Ordering::SeqCst);
 		self.block_chunks.store(0, Ordering::SeqCst);
@@ -424,29 +449,38 @@ impl Service {
 			}
 		}
 
+		*self.status.lock() = RestorationStatus::Initializing {
+			chunks_done: 0,
+		};
+
 		fs::create_dir_all(&rest_dir)?;
 
 		// make new restoration.
 		let writer = match recover {
-			true => Some(LooseWriter::new(self.temp_recovery_dir())?),
+			true => Some(LooseWriter::new(recovery_temp)?),
 			false => None
 		};
 
 		let params = RestorationParams {
-			manifest: manifest,
+			manifest: manifest.clone(),
 			pruning: self.pruning,
-			db: self.restoration_db_handler.open(&self.restoration_db())?,
+			db: self.restoration_db_handler.open(&rest_db)?,
 			writer: writer,
 			genesis: &self.genesis_block,
-			guard: Guard::new(rest_dir),
+			guard: Guard::new(rest_db),
 			engine: &*self.engine,
 		};
 
-		let state_chunks = params.manifest.state_hashes.len();
-		let block_chunks = params.manifest.block_hashes.len();
+		let state_chunks = manifest.state_hashes.len();
+		let block_chunks = manifest.block_hashes.len();
 
 		*res = Some(Restoration::new(params)?);
 
+		self.restoring_snapshot.store(true, Ordering::SeqCst);
+
+		// Import previous chunks, continue if it fails
+		self.import_prev_chunks(&mut res, manifest).ok();
+
 		*self.status.lock() = RestorationStatus::Ongoing {
 			state_chunks: state_chunks as u32,
 			block_chunks: block_chunks as u32,
@@ -454,10 +488,65 @@ impl Service {
 			block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
 		};
 
-		self.restoring_snapshot.store(true, Ordering::SeqCst);
 		Ok(())
 	}
 
+	/// Import the previous chunks into the current restoration
+	fn import_prev_chunks(&self, restoration: &mut Option<Restoration>, manifest: ManifestData) -> Result<(), Error> {
+		let prev_chunks = self.prev_chunks_dir();
+
+		// Restore previous snapshot chunks
+		let files = fs::read_dir(prev_chunks.as_path())?;
+		let mut num_temp_chunks = 0;
+
+		for prev_chunk_file in files {
+			// Don't go over all the files if the restoration has been aborted
+			if !self.restoring_snapshot.load(Ordering::SeqCst) {
+				trace!(target:"snapshot", "Aborting importing previous chunks");
+				return Ok(());
+			}
+			// Import the chunk; if it fails, log the error and continue
+			match self.import_prev_chunk(restoration, &manifest, prev_chunk_file) {
+				Ok(true) => num_temp_chunks += 1,
+				Err(e) => trace!(target: "snapshot", "Error importing chunk: {:?}", e),
+				_ => (),
+			}
+		}
+
+		trace!(target:"snapshot", "Imported {} previous chunks", num_temp_chunks);
+
+		// Remove the prev temp directory
+		fs::remove_dir_all(&prev_chunks)?;
+
+		Ok(())
+	}
+
+	/// Import a previous chunk at the given path. Returns whether the chunk was imported or not
+	fn import_prev_chunk(&self, restoration: &mut Option<Restoration>, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<bool, Error> {
+		let file = file?;
+		let path = file.path();
+
+		let mut file = File::open(path.clone())?;
+		let mut buffer = Vec::new();
+		file.read_to_end(&mut buffer)?;
+
+		let hash = keccak(&buffer);
+
+		let is_state = if manifest.block_hashes.contains(&hash) {
+			false
+		} else if manifest.state_hashes.contains(&hash) {
+			true
+		} else {
+			return Ok(false);
+		};
+
+		self.feed_chunk_with_restoration(restoration, hash, &buffer, is_state)?;
+
+		trace!(target: "snapshot", "Fed chunk {:?}", hash);
+
+		Ok(true)
+	}
+
 	// finalize the restoration. this accepts an already-locked
 	// restoration as an argument -- so acquiring it again _will_
 	// lead to deadlock.
@@ -499,12 +588,19 @@ impl Service {
 	/// Feed a chunk of either kind. no-op if no restoration or status is wrong.
 	fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
 		// TODO: be able to process block chunks and state chunks at same time?
-		let (result, db) = {
-			let mut restoration = self.restoration.lock();
+		let mut restoration = self.restoration.lock();
+		self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state)
+	}
 
+	/// Feed a chunk with the given Restoration
+	fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
+		let (result, db) = {
 			match self.status() {
-				RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
-				RestorationStatus::Ongoing { .. } => {
+				RestorationStatus::Inactive | RestorationStatus::Failed => {
+					trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
+					return Ok(());
+				},
+				RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
 					let (res, db) = {
 						let rest = match *restoration {
 							Some(ref mut r) => r,
@@ -583,11 +679,41 @@ impl SnapshotService for Service {
 		self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
 	}
 
+	fn completed_chunks(&self) -> Option<Vec<H256>> {
+		let restoration = self.restoration.lock();
+
+		match *restoration {
+			Some(ref restoration) => {
+				let completed_chunks = restoration.manifest.block_hashes
+					.iter()
+					.filter(|h| !restoration.block_chunks_left.contains(h))
+					.chain(
+						restoration.manifest.state_hashes
+							.iter()
+							.filter(|h| !restoration.state_chunks_left.contains(h))
+					)
+					.map(|h| *h)
+					.collect();
+
+				Some(completed_chunks)
+			},
+			None => None,
+		}
+	}
+
 	fn status(&self) -> RestorationStatus {
 		let mut cur_status = self.status.lock();
-		if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
-			*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
-			*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
+
+		match *cur_status {
+			RestorationStatus::Initializing { ref mut chunks_done } => {
+				*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
+					self.block_chunks.load(Ordering::SeqCst) as u32;
+			}
+			RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } => {
+				*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
+				*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
+			},
+			_ => (),
 		}
 
 		cur_status.clone()
@@ -600,6 +726,7 @@ impl SnapshotService for Service {
 	}
 
 	fn abort_restore(&self) {
+		trace!(target: "snapshot", "Aborting restore");
 		self.restoring_snapshot.store(false, Ordering::SeqCst);
 		*self.restoration.lock() = None;
 		*self.status.lock() = RestorationStatus::Inactive;
diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs
index 3e3087f2bf7..3fcb0addfab 100644
--- a/ethcore/src/snapshot/tests/service.rs
+++ b/ethcore/src/snapshot/tests/service.rs
@@ -130,12 +130,16 @@ fn guards_delete_folders() {
 	service.init_restore(manifest.clone(), true).unwrap();
 	assert!(path.exists());
 
+	// The `db` folder should have been deleted,
+	// while the `temp` one is kept
 	service.abort_restore();
-	assert!(!path.exists());
+	assert!(!path.join("db").exists());
+	assert!(path.join("temp").exists());
 
 	service.init_restore(manifest.clone(), true).unwrap();
 	assert!(path.exists());
 	drop(service);
-	assert!(!path.exists());
+	assert!(!path.join("db").exists());
+	assert!(path.join("temp").exists());
 }
diff --git a/ethcore/src/snapshot/traits.rs b/ethcore/src/snapshot/traits.rs
index a81c14f08d5..2b6ee9df9f4 100644
--- a/ethcore/src/snapshot/traits.rs
+++ b/ethcore/src/snapshot/traits.rs
@@ -30,6 +30,9 @@ pub trait SnapshotService : Sync + Send {
 	/// `None` indicates warp sync isn't supported by the consensus engine.
 	fn supported_versions(&self) -> Option<(u64, u64)>;
 
+	/// Returns a list of the completed chunks
+	fn completed_chunks(&self) -> Option<Vec<H256>>;
+
 	/// Get raw chunk for a given hash.
 	fn chunk(&self, hash: H256) -> Option<Bytes>;
diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs
index a55c0319b37..7668a645945 100644
--- a/ethcore/sync/src/chain/handler.rs
+++ b/ethcore/sync/src/chain/handler.rs
@@ -100,14 +100,27 @@ impl SyncHandler {
 	}
 
 	/// Called by peer when it is disconnecting
-	pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
-		trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
-		sync.handshaking_peers.remove(&peer);
-		if sync.peers.contains_key(&peer) {
-			debug!(target: "sync", "Disconnected {}", peer);
-			sync.clear_peer_download(peer);
-			sync.peers.remove(&peer);
-			sync.active_peers.remove(&peer);
+	pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
+		trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_info(peer_id));
+		sync.handshaking_peers.remove(&peer_id);
+		if sync.peers.contains_key(&peer_id) {
+			debug!(target: "sync", "Disconnected {}", peer_id);
+			sync.clear_peer_download(peer_id);
+			sync.peers.remove(&peer_id);
+			sync.active_peers.remove(&peer_id);
+
+			if sync.state == SyncState::SnapshotManifest {
+				// Check if we are asking other peers for
+				// the snapshot manifest as well.
+				// If not, return to initial state
+				let still_asking_manifest = sync.peers.iter()
+					.filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
+					.next().is_none();
+
+				if still_asking_manifest {
+					sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
+				}
+			}
 			sync.continue_sync(io);
 		}
 	}
@@ -320,6 +333,10 @@ impl SyncHandler {
 	}
 
 	fn on_peer_confirmed(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
+		{
+			let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
+			peer.confirmation = ForkConfirmation::Confirmed;
+		}
 		sync.sync_peer(io, peer_id, false);
 	}
 
@@ -344,8 +361,8 @@ impl SyncHandler {
 		}
 
 		trace!(target: "sync", "{}: Confirmed peer", peer_id);
-		peer.confirmation = ForkConfirmation::Confirmed;
 		if !io.chain_overlay().read().contains_key(&fork_number) {
+			trace!(target: "sync", "Inserting (fork) block {} header", fork_number);
 			io.chain_overlay().write().insert(fork_number, header.to_vec());
 		}
 	}
@@ -560,6 +577,10 @@ impl SyncHandler {
 				sync.continue_sync(io);
 				return Ok(());
 			},
+			RestorationStatus::Initializing { .. } => {
+				trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
+				return Ok(());
+			}
 			RestorationStatus::Ongoing { .. } => {
 				trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
 			},
@@ -659,11 +680,16 @@ impl SyncHandler {
 		// Let the current sync round complete first.
 		sync.active_peers.insert(peer_id.clone());
 		debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
-		if let Some((fork_block, _)) = sync.fork_block {
-			SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
-		} else {
-			SyncHandler::on_peer_confirmed(sync, io, peer_id);
+
+		match sync.fork_block {
+			Some((fork_block, _)) => {
+				SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
+			},
+			_ => {
+				SyncHandler::on_peer_confirmed(sync, io, peer_id);
+			}
 		}
+
 		Ok(())
 	}
diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs
index 381936949bd..5af28925493 100644
--- a/ethcore/sync/src/chain/mod.rs
+++ b/ethcore/sync/src/chain/mod.rs
@@ -245,9 +245,12 @@ pub struct SyncStatus {
 impl SyncStatus {
 	/// Indicates if snapshot download is in progress
 	pub fn is_snapshot_syncing(&self) -> bool {
-		self.state == SyncState::SnapshotManifest
-			|| self.state == SyncState::SnapshotData
-			|| self.state == SyncState::SnapshotWaiting
+		match self.state {
+			SyncState::SnapshotManifest |
+			SyncState::SnapshotData |
+			SyncState::SnapshotWaiting => true,
+			_ => false,
+		}
 	}
 
 	/// Returns max no of peers to display in informants
@@ -643,7 +646,7 @@ impl ChainSync {
 		}
 	}
 
-	/// Resume downloading 
+	/// Resume downloading
 	fn continue_sync(&mut self, io: &mut SyncIo) {
 		// Collect active peers that can sync
 		let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
@@ -751,26 +754,45 @@ impl ChainSync {
 			}
 		}
 
-				if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
-					SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
-					return;
+				// Only ask for old blocks if the peer has a higher difficulty
+				if force || higher_difficulty {
+					if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
+						SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
+						return;
+					}
+				} else {
+					trace!(target: "sync", "peer {} is not suitable for asking old blocks", peer_id);
+					self.deactivate_peer(io, peer_id);
 				}
 			},
 			SyncState::SnapshotData => {
-				if let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = io.snapshot_service().status() {
-					if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
-						trace!(target: "sync", "Snapshot queue full, pausing sync");
-						self.state = SyncState::SnapshotWaiting;
+				match io.snapshot_service().status() {
+					RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
+						// Initialize the snapshot if not already done
+						self.snapshot.initialize(io.snapshot_service());
+
+						if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
+							trace!(target: "sync", "Snapshot queue full, pausing sync");
+							self.state = SyncState::SnapshotWaiting;
+							return;
+						}
+					},
+					RestorationStatus::Initializing { .. } => {
+						trace!(target: "warp", "Snapshot is still initializing.");
 						return;
-					}
+					},
+					_ => {
+						return;
+					},
 				}
+
 				if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
 					self.clear_peer_download(peer_id);
 					SyncRequester::request_snapshot_data(self, io, peer_id);
 				}
 			},
 			SyncState::SnapshotManifest | //already downloading from other peer
-			SyncState::Waiting | SyncState::SnapshotWaiting => ()
+			SyncState::Waiting |
+			SyncState::SnapshotWaiting => ()
 			}
 		} else {
 			trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
@@ -861,10 +883,7 @@ impl ChainSync {
 		packet.append(&chain.best_block_hash);
 		packet.append(&chain.genesis_hash);
 		if warp_protocol {
-			let manifest = match self.old_blocks.is_some() {
-				true => None,
-				false => io.snapshot_service().manifest(),
-			};
+			let manifest = io.snapshot_service().manifest();
 			let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
 			let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp()));
 			packet.append(&manifest_hash);
@@ -908,29 +927,36 @@ impl ChainSync {
 	}
 
 	fn check_resume(&mut self, io: &mut SyncIo) {
-		if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() {
-			self.state = SyncState::Blocks;
-			self.continue_sync(io);
-		} else if self.state == SyncState::SnapshotWaiting {
-			match io.snapshot_service().status() {
-				RestorationStatus::Inactive => {
-					trace!(target:"sync", "Snapshot restoration is complete");
-					self.restart(io);
-				},
-				RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
-					if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
-						trace!(target:"sync", "Resuming snapshot sync");
-						self.state = SyncState::SnapshotData;
+		match self.state {
+			SyncState::Waiting if !io.chain().queue_info().is_full() => {
+				self.state = SyncState::Blocks;
+				self.continue_sync(io);
+			},
+			SyncState::SnapshotWaiting => {
+				match io.snapshot_service().status() {
+					RestorationStatus::Inactive => {
+						trace!(target:"sync", "Snapshot restoration is complete");
+						self.restart(io);
+					},
+					RestorationStatus::Initializing { .. } => {
+						trace!(target:"sync", "Snapshot restoration is initializing");
+					},
+					RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
+						if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
+							trace!(target:"sync", "Resuming snapshot sync");
+							self.state = SyncState::SnapshotData;
+							self.continue_sync(io);
+						}
+					},
+					RestorationStatus::Failed => {
+						trace!(target: "sync", "Snapshot restoration aborted");
+						self.state = SyncState::WaitingPeers;
+						self.snapshot.clear();
+						self.continue_sync(io);
+					},
+				}
+			},
+			_ => (),
 		}
 	}
diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs
index 0bfb8569823..e0245fdbcd9 100644
--- a/ethcore/sync/src/chain/supplier.rs
+++ b/ethcore/sync/src/chain/supplier.rs
@@ -120,8 +120,9 @@ impl SyncSupplier {
 				None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
 			}
 		} else {
-			trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, r.val_at::<BlockNumber>(0)?, max_headers, skip, reverse);
-			r.val_at(0)?
+			let number = r.val_at::<BlockNumber>(0)?;
+			trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, number, max_headers, skip, reverse);
+			number
 		};
 
 		let mut number = if reverse {
@@ -135,7 +136,10 @@ impl SyncSupplier {
 		let inc = (skip + 1) as BlockNumber;
 		let overlay = io.chain_overlay().read();
 
-		while number <= last && count < max_count {
+		// We are checking the `overlay` as well, since it's where the ForkBlock
+		// header is cached: this way peers can confirm we are on the right fork,
+		// even if we are not yet synced up to the fork block
+		while (number <= last || overlay.contains_key(&number)) && count < max_count {
 			if let Some(hdr) = overlay.get(&number) {
 				trace!(target: "sync", "{}: Returning cached fork header", peer_id);
 				data.extend_from_slice(hdr);
@@ -152,8 +156,7 @@ impl SyncSupplier {
 					break;
 				}
 				number -= inc;
-			}
-			else {
+			} else {
 				number += inc;
 			}
 		}
@@ -237,20 +240,20 @@ impl SyncSupplier {
 	/// Respond to GetSnapshotManifest request
 	fn return_snapshot_manifest(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
 		let count = r.item_count().unwrap_or(0);
-		trace!(target: "sync", "{} -> GetSnapshotManifest", peer_id);
+		trace!(target: "warp", "{} -> GetSnapshotManifest", peer_id);
 		if count != 0 {
-			debug!(target: "sync", "Invalid GetSnapshotManifest request, ignoring.");
+			debug!(target: "warp", "Invalid GetSnapshotManifest request, ignoring.");
 			return Ok(None);
 		}
 		let rlp = match io.snapshot_service().manifest() {
 			Some(manifest) => {
-				trace!(target: "sync", "{} <- SnapshotManifest", peer_id);
+				trace!(target: "warp", "{} <- SnapshotManifest", peer_id);
 				let mut rlp = RlpStream::new_list(1);
 				rlp.append_raw(&manifest.into_rlp(), 1);
 				rlp
 			},
 			None => {
-				trace!(target: "sync", "{}: No manifest to return", peer_id);
+				trace!(target: "warp", "{}: No snapshot manifest to return", peer_id);
 				RlpStream::new_list(0)
 			}
 		};
@@ -260,15 +263,16 @@ impl SyncSupplier {
 	/// Respond to GetSnapshotData request
 	fn return_snapshot_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
 		let hash: H256 = r.val_at(0)?;
-		trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
+		trace!(target: "warp", "{} -> GetSnapshotData {:?}", peer_id, hash);
 		let rlp = match io.snapshot_service().chunk(hash) {
 			Some(data) => {
 				let mut rlp = RlpStream::new_list(1);
-				trace!(target: "sync", "{} <- SnapshotData", peer_id);
+				trace!(target: "warp", "{} <- SnapshotData", peer_id);
 				rlp.append(&data);
 				rlp
 			},
 			None => {
+				trace!(target: "warp", "{}: No snapshot data to return", peer_id);
 				RlpStream::new_list(0)
 			}
 		};
diff --git a/ethcore/sync/src/snapshot.rs b/ethcore/sync/src/snapshot.rs
index 917e84c8846..b603a2a007a 100644
--- a/ethcore/sync/src/snapshot.rs
+++ b/ethcore/sync/src/snapshot.rs
@@ -14,10 +14,13 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
-use hash::keccak;
+use ethcore::snapshot::{ManifestData, SnapshotService};
 use ethereum_types::H256;
+use hash::keccak;
+use rand::{thread_rng, Rng};
+
 use std::collections::HashSet;
-use ethcore::snapshot::ManifestData;
+use std::iter::FromIterator;
 
 #[derive(PartialEq, Eq, Debug)]
 pub enum ChunkType {
@@ -32,6 +35,7 @@ pub struct Snapshot {
 	completed_chunks: HashSet<H256>,
 	snapshot_hash: Option<H256>,
 	bad_hashes: HashSet<H256>,
+	initialized: bool,
 }
 
 impl Snapshot {
@@ -44,7 +48,27 @@ impl Snapshot {
 			completed_chunks: HashSet::new(),
 			snapshot_hash: None,
 			bad_hashes: HashSet::new(),
+			initialized: false,
+		}
+	}
+
+	/// Sync the Snapshot completed chunks with the Snapshot Service
+	pub fn initialize(&mut self, snapshot_service: &SnapshotService) {
+		if self.initialized {
+			return;
 		}
+
+		if let Some(completed_chunks) = snapshot_service.completed_chunks() {
+			self.completed_chunks = HashSet::from_iter(completed_chunks);
+		}
+
+		trace!(
+			target: "snapshot",
+			"Snapshot is now initialized with {} completed chunks.",
+			self.completed_chunks.len(),
+		);
+
+		self.initialized = true;
 	}
 
 	/// Clear everything.
@@ -54,6 +78,7 @@ impl Snapshot {
 		self.downloading_chunks.clear();
 		self.completed_chunks.clear();
 		self.snapshot_hash = None;
+		self.initialized = false;
 	}
 
 	/// Check if currently downloading a snapshot.
@@ -89,18 +114,35 @@ impl Snapshot {
 		Err(())
 	}
 
-	/// Find a chunk to download
+	/// Find a random chunk to download
 	pub fn needed_chunk(&mut self) -> Option<H256> {
-		// check state chunks first
-		let chunk = self.pending_state_chunks.iter()
-			.chain(self.pending_block_chunks.iter())
-			.find(|&h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h))
-			.cloned();
+		// Collect all the candidate chunks: block chunks first, then state chunks
+		let needed_chunks = {
+			let chunk_filter = |h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h);
+
+			let needed_block_chunks = self.pending_block_chunks.iter()
+				.filter(|&h| chunk_filter(h))
+				.map(|h| *h)
+				.collect::<Vec<H256>>();
+
+			// If no block chunks to download, get the state chunks
+			if needed_block_chunks.len() == 0 {
+				self.pending_state_chunks.iter()
+					.filter(|&h| chunk_filter(h))
+					.map(|h| *h)
+					.collect::<Vec<H256>>()
+			} else {
+				needed_block_chunks
+			}
+		};
+
+		// Get a random chunk
+		let chunk = thread_rng().choose(&needed_chunks);
 
 		if let Some(hash) = chunk {
 			self.downloading_chunks.insert(hash.clone());
 		}
-		chunk
+
+		chunk.map(|h| *h)
 	}
 
 	pub fn clear_chunk_download(&mut self, hash: &H256) {
@@ -185,8 +227,15 @@ mod test {
 		let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
 		assert!(snapshot.needed_chunk().is_none());
-		assert_eq!(&requested[0..20], &manifest.state_hashes[..]);
-		assert_eq!(&requested[20..40], &manifest.block_hashes[..]);
+
+		let requested_all_block_chunks = manifest.block_hashes.iter()
+			.all(|h| requested.iter().any(|rh| rh == h));
+		assert!(requested_all_block_chunks);
+
+		let requested_all_state_chunks = manifest.state_hashes.iter()
+			.all(|h| requested.iter().any(|rh| rh == h));
+		assert!(requested_all_state_chunks);
+
 		assert_eq!(snapshot.downloading_chunks.len(), 40);
 
 		assert_eq!(snapshot.validate_chunk(&state_chunks[4]), Ok(ChunkType::State(manifest.state_hashes[4].clone())));
diff --git a/ethcore/sync/src/tests/snapshot.rs b/ethcore/sync/src/tests/snapshot.rs
index 804ebe9c535..864f3d4dc6e 100644
--- a/ethcore/sync/src/tests/snapshot.rs
+++ b/ethcore/sync/src/tests/snapshot.rs
@@ -80,6 +80,10 @@ impl SnapshotService for TestSnapshotService {
 		Some((1, 2))
 	}
 
+	fn completed_chunks(&self) -> Option<Vec<H256>> {
+		Some(vec![])
+	}
+
 	fn chunk(&self, hash: H256) -> Option<Bytes> {
 		self.chunks.get(&hash).cloned()
 	}
diff --git a/ethcore/types/src/restoration_status.rs b/ethcore/types/src/restoration_status.rs
index 0cc7fccc08e..51f5b8aa0a3 100644
--- a/ethcore/types/src/restoration_status.rs
+++ b/ethcore/types/src/restoration_status.rs
@@ -21,6 +21,11 @@ pub enum RestorationStatus {
 	/// No restoration.
 	Inactive,
+	/// Restoration is initializing.
+	Initializing {
+		/// Number of chunks done/imported
+		chunks_done: u32,
+	},
 	/// Ongoing restoration.
 	Ongoing {
 		/// Total number of state chunks.
diff --git a/parity/informant.rs b/parity/informant.rs
index beeb258b522..a4d2727c359 100644
--- a/parity/informant.rs
+++ b/parity/informant.rs
@@ -278,15 +278,12 @@ impl Informant {
 		} = full_report;
 
 		let rpc_stats = self.rpc_stats.as_ref();
-
-		let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
+		let snapshot_sync = sync_info.as_ref().map_or(false, |s| s.snapshot_sync) && self.snapshot.as_ref().map_or(false, |s|
 			match s.status() {
-				RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
-					(true, state_chunks_done + block_chunks_done, state_chunks + block_chunks),
-				_ => (false, 0, 0),
+				RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => true,
+				_ => false,
 			}
 		);
-		let snapshot_sync = snapshot_sync && sync_info.as_ref().map_or(false, |s| s.snapshot_sync);
 
 		if !importing && !snapshot_sync && elapsed < Duration::from_secs(30) {
 			return;
 		}
@@ -318,7 +315,19 @@ impl Informant {
 					paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
 					paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
 				),
-				true => format!("Syncing snapshot {}/{}", snapshot_current, snapshot_total),
+				true => {
+					self.snapshot.as_ref().map_or(String::new(), |s|
+						match s.status() {
+							RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => {
+								format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks)
+							},
+							RestorationStatus::Initializing { chunks_done } => {
+								format!("Snapshot initializing ({} chunks restored)", chunks_done)
+							},
+							_ => String::new(),
+						}
+					)
+				},
 			},
 			false => String::new(),
 		},
diff --git a/parity/snapshot.rs b/parity/snapshot.rs
index 423864679a2..3c0dadedaa3 100644
--- a/parity/snapshot.rs
+++ b/parity/snapshot.rs
@@ -122,6 +122,7 @@ fn restore_using(snapshot: Arc<SnapshotService>, reader: &R,
 	match snapshot.status() {
 		RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
+		RestorationStatus::Initializing { .. } => Err("Snapshot restoration is still initializing.".into()),
 		RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
 		RestorationStatus::Inactive => {
 			info!("Restoration complete.");
diff --git a/rpc/src/v1/tests/helpers/snapshot_service.rs b/rpc/src/v1/tests/helpers/snapshot_service.rs
index 25abab578e9..099773ab522 100644
--- a/rpc/src/v1/tests/helpers/snapshot_service.rs
+++ b/rpc/src/v1/tests/helpers/snapshot_service.rs
@@ -43,6 +43,7 @@ impl TestSnapshotService {
 impl SnapshotService for TestSnapshotService {
 	fn manifest(&self) -> Option<ManifestData> { None }
 	fn supported_versions(&self) -> Option<(u64, u64)> { None }
+	fn completed_chunks(&self) -> Option<Vec<H256>> { Some(vec![]) }
 	fn chunk(&self, _hash: H256) -> Option<Bytes> { None }
 	fn status(&self) -> RestorationStatus { self.status.lock().clone() }
 	fn begin_restore(&self, _manifest: ManifestData) { }
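
The core of the resumable restoration in this patch is `import_prev_chunk`: a leftover chunk file is identified purely by the keccak hash of its contents, then matched against the manifest's block/state hash lists to decide whether, and as which kind, it should be fed back in. What follows is a minimal standalone sketch of that idea, not the service code itself: `Manifest` is a hypothetical cut-down stand-in for `ManifestData`, and `chunk_hash` replaces `keccak` with std's `DefaultHasher` purely so the sketch compiles without the real hashing crate.

use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io::Read;
use std::path::Path;

/// Hypothetical cut-down stand-in for `ManifestData`: just the two chunk-hash lists.
struct Manifest {
	state_hashes: HashSet<u64>,
	block_hashes: HashSet<u64>,
}

/// Stand-in for `keccak` so the sketch builds with std alone.
fn chunk_hash(bytes: &[u8]) -> u64 {
	let mut hasher = DefaultHasher::new();
	bytes.hash(&mut hasher);
	hasher.finish()
}

/// Re-import every chunk file left in `prev_chunks`, skipping anything
/// the new manifest does not list. Returns how many chunks were recovered.
fn import_prev_chunks(prev_chunks: &Path, manifest: &Manifest) -> std::io::Result<usize> {
	let mut recovered = 0;

	for entry in fs::read_dir(prev_chunks)? {
		// Read the chunk file and identify it by its content hash.
		let mut buffer = Vec::new();
		fs::File::open(entry?.path())?.read_to_end(&mut buffer)?;
		let hash = chunk_hash(&buffer);

		// Only chunks the manifest still wants are worth feeding back.
		let is_state = if manifest.block_hashes.contains(&hash) {
			false
		} else if manifest.state_hashes.contains(&hash) {
			true
		} else {
			continue; // stale chunk from some other snapshot; skip it
		};

		// The real service feeds the buffer to `feed_chunk_with_restoration` here.
		println!("would re-feed {} chunk {:x}", if is_state { "state" } else { "block" }, hash);
		recovered += 1;
	}

	Ok(recovered)
}

Matching by content hash rather than by file name is what makes resuming safe: files left over from a different snapshot simply fail the membership test and are skipped. The same membership idea drives the new `completed_chunks` method, which is how a node that is still restoring can already seed its finished chunks to peers.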