diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 665fe9e4ca1..3a161f8545b 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -241,7 +241,6 @@ pub struct Service { progress: super::Progress, taking_snapshot: AtomicBool, restoring_snapshot: AtomicBool, - restoration_ready: AtomicBool, } impl Service { @@ -263,7 +262,6 @@ impl Service { progress: Default::default(), taking_snapshot: AtomicBool::new(false), restoring_snapshot: AtomicBool::new(false), - restoration_ready: AtomicBool::new(false), }; // create the root snapshot dir if it doesn't exist. @@ -416,81 +414,78 @@ impl Service { /// Initialize the restoration synchronously. /// The recover flag indicates whether to recover the restored snapshot. pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> { - { - let mut res = self.restoration.lock(); - - let rest_dir = self.restoration_dir(); - let rest_db = self.restoration_db(); - let recovery_temp = self.temp_recovery_dir(); - let prev_chunks = self.prev_chunks_dir(); - - // delete and restore the restoration dir. - if let Err(e) = fs::remove_dir_all(&prev_chunks) { - match e.kind() { - ErrorKind::NotFound => {}, - _ => return Err(e.into()), - } - } + let mut res = self.restoration.lock(); - self.restoration_ready.store(false, Ordering::SeqCst); + let rest_dir = self.restoration_dir(); + let rest_db = self.restoration_db(); + let recovery_temp = self.temp_recovery_dir(); + let prev_chunks = self.prev_chunks_dir(); - // Move the previous recovery temp directory - // to `prev_chunks` to be able to restart restoring - // with previously downloaded blocks - // This step is optional, so don't fail on error - fs::rename(&recovery_temp, &prev_chunks).ok(); + // delete and restore the restoration dir. + if let Err(e) = fs::remove_dir_all(&prev_chunks) { + match e.kind() { + ErrorKind::NotFound => {}, + _ => return Err(e.into()), + } + } - self.state_chunks.store(0, Ordering::SeqCst); - self.block_chunks.store(0, Ordering::SeqCst); + // Move the previous recovery temp directory + // to `prev_chunks` to be able to restart restoring + // with previously downloaded blocks + // This step is optional, so don't fail on error + fs::rename(&recovery_temp, &prev_chunks).ok(); - // tear down existing restoration. - *res = None; + self.state_chunks.store(0, Ordering::SeqCst); + self.block_chunks.store(0, Ordering::SeqCst); - // delete and restore the restoration dir. - if let Err(e) = fs::remove_dir_all(&rest_dir) { - match e.kind() { - ErrorKind::NotFound => {}, - _ => return Err(e.into()), - } - } + // tear down existing restoration. + *res = None; - fs::create_dir_all(&rest_dir)?; - - // make new restoration. - let writer = match recover { - true => Some(LooseWriter::new(self.temp_recovery_dir())?), - false => None - }; - - let params = RestorationParams { - manifest: manifest.clone(), - pruning: self.pruning, - db: self.restoration_db_handler.open(&self.restoration_db())?, - writer: writer, - genesis: &self.genesis_block, - guard: Guard::new(rest_db), - engine: &*self.engine, - }; - - let state_chunks = manifest.state_hashes.len(); - let block_chunks = manifest.block_hashes.len(); - - *res = Some(Restoration::new(params)?); - - *self.status.lock() = RestorationStatus::Ongoing { - state_chunks: state_chunks as u32, - block_chunks: block_chunks as u32, - state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, - block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, - }; - - self.restoring_snapshot.store(true, Ordering::SeqCst); - - // Import previous chunks, continue if it fails - self.import_prev_chunks(&mut res, manifest).ok(); - self.restoration_ready.store(true, Ordering::SeqCst); + // delete and restore the restoration dir. + if let Err(e) = fs::remove_dir_all(&rest_dir) { + match e.kind() { + ErrorKind::NotFound => {}, + _ => return Err(e.into()), + } } + { *self.status.lock() = RestorationStatus::Initializing; } + + fs::create_dir_all(&rest_dir)?; + + // make new restoration. + let writer = match recover { + true => Some(LooseWriter::new(self.temp_recovery_dir())?), + false => None + }; + + let params = RestorationParams { + manifest: manifest.clone(), + pruning: self.pruning, + db: self.restoration_db_handler.open(&self.restoration_db())?, + writer: writer, + genesis: &self.genesis_block, + guard: Guard::new(rest_db), + engine: &*self.engine, + }; + + let state_chunks = manifest.state_hashes.len(); + let block_chunks = manifest.block_hashes.len(); + + *res = Some(Restoration::new(params)?); + + self.restoring_snapshot.store(true, Ordering::SeqCst); + + // Import previous chunks, continue if it fails + self.import_prev_chunks(&mut res, manifest).ok(); + + *self.status.lock() = RestorationStatus::Ongoing { + state_chunks: state_chunks as u32, + block_chunks: block_chunks as u32, + state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, + block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, + }; + Ok(()) } @@ -583,7 +578,6 @@ impl Service { let _ = fs::remove_dir_all(self.restoration_dir()); *self.status.lock() = RestorationStatus::Inactive; - self.restoration_ready.store(false, Ordering::SeqCst); Ok(()) } @@ -603,7 +597,7 @@ impl Service { trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash); return Ok(()); }, - RestorationStatus::Ongoing { .. } => { + RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing => { let (res, db) = { let rest = match *restoration { Some(ref mut r) => r, @@ -682,10 +676,6 @@ impl SnapshotService for Service { self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok()) } - fn restoration_ready(&self) -> bool { - self.restoration_ready.load(Ordering::SeqCst) - } - fn completed_chunks(&self) -> Option> { let restoration = self.restoration.lock(); @@ -727,7 +717,6 @@ impl SnapshotService for Service { fn abort_restore(&self) { trace!(target: "snapshot", "Aborting restore"); self.restoring_snapshot.store(false, Ordering::SeqCst); - self.restoration_ready.store(false, Ordering::SeqCst); *self.restoration.lock() = None; *self.status.lock() = RestorationStatus::Inactive; } diff --git a/ethcore/src/snapshot/traits.rs b/ethcore/src/snapshot/traits.rs index c4f2a763078..2b6ee9df9f4 100644 --- a/ethcore/src/snapshot/traits.rs +++ b/ethcore/src/snapshot/traits.rs @@ -30,9 +30,6 @@ pub trait SnapshotService : Sync + Send { /// `None` indicates warp sync isn't supported by the consensus engine. fn supported_versions(&self) -> Option<(u64, u64)>; - /// Returns whether the Snapshot Service restoration is ready - fn restoration_ready(&self) -> bool; - /// Returns a list of the completed chunks fn completed_chunks(&self) -> Option>; diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index a6ae7822dc7..8ed3117ca5f 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -539,7 +539,7 @@ impl SyncHandler { } sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw())); io.snapshot_service().begin_restore(manifest); - sync.state = SyncState::SnapshotInit; + sync.state = SyncState::SnapshotData; // give a task to the same peer first. sync.sync_peer(io, peer_id, false); @@ -578,6 +578,10 @@ impl SyncHandler { sync.continue_sync(io); return Ok(()); }, + RestorationStatus::Initializing => { + trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id); + return Ok(()); + } RestorationStatus::Ongoing { .. } => { trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id); }, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 8ff7925b173..33d335fdce9 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -199,8 +199,6 @@ pub enum SyncState { WaitingPeers, /// Waiting for snapshot manifest download SnapshotManifest, - /// Snapshot service is initializing - SnapshotInit, /// Downloading snapshot data SnapshotData, /// Waiting for snapshot restoration progress. @@ -254,7 +252,6 @@ impl SyncStatus { match self.state { SyncState::SnapshotManifest | SyncState::SnapshotData | - SyncState::SnapshotInit | SyncState::SnapshotWaiting => true, _ => false, } @@ -653,17 +650,6 @@ impl ChainSync { } } - /// Check if the snapshot service is ready - fn check_snapshot_service(&mut self, io: &SyncIo) { - if io.snapshot_service().restoration_ready() { - trace!(target: "snapshot", "Snapshot Service is ready!"); - // Sync the previously resumed chunks - self.snapshot.sync(io); - // Move to fetching snapshot data - self.state = SyncState::SnapshotData; - } - } - /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { // Collect active peers that can sync @@ -726,10 +712,6 @@ impl ChainSync { trace!(target: "sync", "Waiting for the block queue"); return; } - if self.state == SyncState::SnapshotInit { - trace!(target: "sync", "Waiting for the snapshot service to initialize"); - return; - } if self.state == SyncState::SnapshotWaiting { trace!(target: "sync", "Waiting for the snapshot restoration"); return; @@ -788,13 +770,28 @@ impl ChainSync { } }, SyncState::SnapshotData => { - if let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = io.snapshot_service().status() { - if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD { - trace!(target: "sync", "Snapshot queue full, pausing sync"); - self.state = SyncState::SnapshotWaiting; + match io.snapshot_service().status() { + RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => { + // Initialize the snapshot if not already done + if !self.snapshot.is_initialized() { + self.snapshot.initialize(io); + } + + if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD { + trace!(target: "sync", "Snapshot queue full, pausing sync"); + self.state = SyncState::SnapshotWaiting; + return; + } + }, + RestorationStatus::Initializing => { + trace!(target: "warp", "Snapshot is stil initializing."); return; - } + }, + _ => { + return; + }, } + if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() { self.clear_peer_download(peer_id); SyncRequester::request_snapshot_data(self, io, peer_id); @@ -802,8 +799,7 @@ impl ChainSync { }, SyncState::SnapshotManifest | //already downloading from other peer SyncState::Waiting | - SyncState::SnapshotWaiting | - SyncState::SnapshotInit => () + SyncState::SnapshotWaiting => () } } else { trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); @@ -949,6 +945,9 @@ impl ChainSync { trace!(target:"sync", "Snapshot restoration is complete"); self.restart(io); }, + RestorationStatus::Initializing => { + trace!(target:"sync", "Snapshot restoration is initializing"); + }, RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => { if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD { trace!(target:"sync", "Resuming snapshot sync"); @@ -964,9 +963,6 @@ impl ChainSync { }, } }, - SyncState::SnapshotInit => { - self.check_snapshot_service(io); - } _ => (), } } diff --git a/ethcore/sync/src/snapshot.rs b/ethcore/sync/src/snapshot.rs index bde0ed00847..108bb1d2b1e 100644 --- a/ethcore/sync/src/snapshot.rs +++ b/ethcore/sync/src/snapshot.rs @@ -36,6 +36,7 @@ pub struct Snapshot { completed_chunks: HashSet, snapshot_hash: Option, bad_hashes: HashSet, + initialized: bool, } impl Snapshot { @@ -48,20 +49,27 @@ impl Snapshot { completed_chunks: HashSet::new(), snapshot_hash: None, bad_hashes: HashSet::new(), + initialized: false, } } /// Sync the Snapshot completed chunks with the Snapshot Service - pub fn sync (&mut self, io: &SyncIo) { + pub fn initialize (&mut self, io: &SyncIo) { + if self.initialized { + return; + } + if let Some(completed_chunks) = io.snapshot_service().completed_chunks() { self.completed_chunks = HashSet::from_iter(completed_chunks); } trace!( target: "snapshot", - "Synced ChainSync snapshot with {} completed chunks", + "Snapshot is now initialized with {} completed chunks.", self.completed_chunks.len(), ); + + self.initialized = true; } /// Clear everything. @@ -71,6 +79,7 @@ impl Snapshot { self.downloading_chunks.clear(); self.completed_chunks.clear(); self.snapshot_hash = None; + self.initialized = false; } /// Check if currently downloading a snapshot. @@ -153,6 +162,10 @@ impl Snapshot { pub fn is_complete(&self) -> bool { self.total_chunks() == self.completed_chunks.len() } + + pub fn is_initialized(&self) -> bool { + self.initialized + } } #[cfg(test)] diff --git a/ethcore/sync/src/tests/snapshot.rs b/ethcore/sync/src/tests/snapshot.rs index 9f69db115c5..864f3d4dc6e 100644 --- a/ethcore/sync/src/tests/snapshot.rs +++ b/ethcore/sync/src/tests/snapshot.rs @@ -80,10 +80,6 @@ impl SnapshotService for TestSnapshotService { Some((1, 2)) } - fn restoration_ready(&self) -> bool { - true - } - fn completed_chunks(&self) -> Option> { Some(vec![]) } diff --git a/ethcore/types/src/restoration_status.rs b/ethcore/types/src/restoration_status.rs index 0cc7fccc08e..e02770e0c96 100644 --- a/ethcore/types/src/restoration_status.rs +++ b/ethcore/types/src/restoration_status.rs @@ -21,6 +21,8 @@ pub enum RestorationStatus { /// No restoration. Inactive, + /// Restoration is initalizing + Initializing, /// Ongoing restoration. Ongoing { /// Total number of state chunks. diff --git a/parity/informant.rs b/parity/informant.rs index beeb258b522..37781ee7595 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -279,11 +279,12 @@ impl Informant { let rpc_stats = self.rpc_stats.as_ref(); - let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s| + let (snapshot_sync, snapshot_starting, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, false, 0, 0), |s| match s.status() { RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => - (true, state_chunks_done + block_chunks_done, state_chunks + block_chunks), - _ => (false, 0, 0), + (true, false, state_chunks_done + block_chunks_done, state_chunks + block_chunks), + RestorationStatus::Initializing => (true, true, 0, 0), + _ => (false, false, 0, 0), } ); let snapshot_sync = snapshot_sync && sync_info.as_ref().map_or(false, |s| s.snapshot_sync); @@ -318,7 +319,13 @@ impl Informant { paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)), paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size)) ), - true => format!("Syncing snapshot {}/{}", snapshot_current, snapshot_total), + true => { + if snapshot_starting { + String::new("Snapshot initializing") + } else { + format!("Syncing snapshot {}/{}", snapshot_current, snapshot_total) + } + }, }, false => String::new(), }, diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 423864679a2..9f3ce39c8ac 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -122,6 +122,7 @@ fn restore_using(snapshot: Arc, reader: &R, match snapshot.status() { RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()), + RestorationStatus::Initializing => Err("Snapshot restoration is still initializing.".into()), RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), RestorationStatus::Inactive => { info!("Restoration complete."); diff --git a/rpc/src/v1/tests/helpers/snapshot_service.rs b/rpc/src/v1/tests/helpers/snapshot_service.rs index 276d2c4a0c9..099773ab522 100644 --- a/rpc/src/v1/tests/helpers/snapshot_service.rs +++ b/rpc/src/v1/tests/helpers/snapshot_service.rs @@ -43,7 +43,6 @@ impl TestSnapshotService { impl SnapshotService for TestSnapshotService { fn manifest(&self) -> Option { None } fn supported_versions(&self) -> Option<(u64, u64)> { None } - fn restoration_ready(&self) -> bool { true } fn completed_chunks(&self) -> Option> { Some(vec![]) } fn chunk(&self, _hash: H256) -> Option { None } fn status(&self) -> RestorationStatus { self.status.lock().clone() }