Resumable warp-sync / Seed downloaded snapshots #8544
Changes from 54 commits
First changed file: the snapshot network service implementation.
@@ -17,8 +17,8 @@
 //! Snapshot network service implementation.

 use std::collections::HashSet;
-use std::io::ErrorKind;
-use std::fs;
+use std::io::{self, Read, ErrorKind};
+use std::fs::{self, File};
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -30,6 +30,7 @@ use blockchain::BlockChain;
 use client::{Client, ChainInfo, ClientIoMessage};
 use engines::EthEngine;
 use error::Error;
+use hash::keccak;
 use ids::BlockId;

 use io::IoChannel;
@@ -270,8 +271,8 @@ impl Service {
         }
     }

-    // delete the temporary restoration dir if it does exist.
-    if let Err(e) = fs::remove_dir_all(service.restoration_dir()) {
+    // delete the temporary restoration DB dir if it does exist.
+    if let Err(e) = fs::remove_dir_all(service.restoration_db()) {
        if e.kind() != ErrorKind::NotFound {
            return Err(e.into())
        }
@@ -325,6 +326,13 @@ impl Service {
        dir
    }

+   // previous snapshot chunks path.
+   fn prev_chunks_dir(&self) -> PathBuf {
+       let mut dir = self.snapshot_root.clone();
+       dir.push("prev_chunks");
+       dir
+   }
+
    // replace one of the client's databases with our own.
    fn replace_client_db(&self) -> Result<(), Error> {
        let our_db = self.restoration_db();
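For orientation: the service keeps several purpose-specific subdirectories under one snapshot root, and `prev_chunks` is simply one more of these. A minimal sketch of the layout pattern; only `prev_chunks` is taken from the diff, the sibling name `restoration` is hypothetical:

```rust
use std::path::PathBuf;

struct Paths {
    snapshot_root: PathBuf,
}

impl Paths {
    // Every helper derives a child directory from the same root, so each
    // concern (restoration DB, temp recovery, previous chunks) can be
    // created and deleted independently of the others.
    fn subdir(&self, name: &str) -> PathBuf {
        let mut dir = self.snapshot_root.clone();
        dir.push(name);
        dir
    }

    fn prev_chunks_dir(&self) -> PathBuf {
        self.subdir("prev_chunks")
    }

    fn restoration_dir(&self) -> PathBuf {
        self.subdir("restoration") // hypothetical directory name
    }
}
```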
@@ -406,9 +414,26 @@ impl Service {
    /// Initialize the restoration synchronously.
    /// The recover flag indicates whether to recover the restored snapshot.
    pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
-       let mut res = self.restoration.lock();
-
+       let rest_dir = self.restoration_dir();
+       let rest_db = self.restoration_db();
+       let recovery_temp = self.temp_recovery_dir();
+       let prev_chunks = self.prev_chunks_dir();
+
+       let mut res = self.restoration.lock();
+
+       // delete and restore the restoration dir.
+       if let Err(e) = fs::remove_dir_all(&prev_chunks) {
+           match e.kind() {
+               ErrorKind::NotFound => {},
+               _ => return Err(e.into()),
+           }
+       }
+
+       // Move the previous recovery temp directory
+       // to `prev_chunks` to be able to restart restoring
+       // with previously downloaded blocks.
+       // This step is optional, so don't fail on error
+       fs::rename(&recovery_temp, &prev_chunks).ok();
+
        self.state_chunks.store(0, Ordering::SeqCst);
        self.block_chunks.store(0, Ordering::SeqCst);
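The two filesystem steps above are the heart of resumability: any stale `prev_chunks` leftovers are deleted (tolerating `NotFound`), then the previous run's temp recovery directory is moved into place with a best-effort rename. A self-contained sketch of that priming step, using `std::fs` and hypothetical path arguments:

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::Path;

// Prime a resumable restoration: drop stale chunks from an older run,
// then try to reuse the chunks the aborted run already wrote.
fn prime_resume(recovery_temp: &Path, prev_chunks: &Path) -> std::io::Result<()> {
    // Tolerate the directory not existing; fail on anything else.
    if let Err(e) = fs::remove_dir_all(prev_chunks) {
        if e.kind() != ErrorKind::NotFound {
            return Err(e);
        }
    }
    // Best-effort: if there is nothing to move, resume simply starts cold.
    let _ = fs::rename(recovery_temp, prev_chunks);
    Ok(())
}
```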
@@ -424,37 +449,102 @@
            }
        }

+       {
+           *self.status.lock() = RestorationStatus::Initializing {
+               chunks_done: 0,
+           };
+       }
+
        fs::create_dir_all(&rest_dir)?;

        // make new restoration.
        let writer = match recover {
-           true => Some(LooseWriter::new(self.temp_recovery_dir())?),
+           true => Some(LooseWriter::new(recovery_temp)?),
            false => None
        };

        let params = RestorationParams {
-           manifest: manifest,
+           manifest: manifest.clone(),
            pruning: self.pruning,
-           db: self.restoration_db_handler.open(&self.restoration_db())?,
+           db: self.restoration_db_handler.open(&rest_db)?,
            writer: writer,
            genesis: &self.genesis_block,
-           guard: Guard::new(rest_dir),
+           guard: Guard::new(rest_db),
            engine: &*self.engine,
        };

-       let state_chunks = params.manifest.state_hashes.len();
-       let block_chunks = params.manifest.block_hashes.len();
+       let state_chunks = manifest.state_hashes.len();
+       let block_chunks = manifest.block_hashes.len();

        *res = Some(Restoration::new(params)?);

+       self.restoring_snapshot.store(true, Ordering::SeqCst);
+
+       // Import previous chunks, continue if it fails
+       self.import_prev_chunks(&mut res, manifest).ok();
+
        *self.status.lock() = RestorationStatus::Ongoing {
            state_chunks: state_chunks as u32,
            block_chunks: block_chunks as u32,
            state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
            block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
        };

-       self.restoring_snapshot.store(true, Ordering::SeqCst);
        Ok(())
    }

Review comment (on the scope around the `Initializing` status): nit: I don't think scope here is necessary since …
+   /// Import the previous chunks into the current restoration
+   fn import_prev_chunks(&self, restoration: &mut Option<Restoration>, manifest: ManifestData) -> Result<(), Error> {
+       let prev_chunks = self.prev_chunks_dir();
+
+       // Restore previous snapshot chunks
+       let files = fs::read_dir(prev_chunks.as_path())?;
+       let mut num_temp_chunks = 0;
+
+       for prev_chunk_file in files {
+           // Don't go over all the files if the restoration has been aborted
+           if !self.restoring_snapshot.load(Ordering::SeqCst) {
+               trace!(target: "snapshot", "Aborting importing previous chunks");
+               return Ok(());
+           }
+           // Import the chunk, don't fail and continue if one fails
+           match self.import_prev_chunk(restoration, &manifest, prev_chunk_file) {
+               Ok(_) => num_temp_chunks += 1,
+               Err(e) => trace!(target: "snapshot", "Error importing chunk: {:?}", e),
+           }
+       }
+
+       trace!(target: "snapshot", "Imported {} previous chunks", num_temp_chunks);
+
+       // Remove the prev temp directory
+       fs::remove_dir_all(&prev_chunks)?;
+
+       Ok(())
+   }

Review comment (on the abort check): I don't think restoration can be aborted since this function is called holding the restoration lock.

Reply: Actually it first sets …

Review comment (on the trace message): Judging by the way it is computed, I think, …
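The abort check in the loop is plain cooperative cancellation through a shared `AtomicBool`: `abort_restore` (further down) flips the flag to `false`, and the import loop notices it between files. A minimal stand-alone sketch of the pattern, with hypothetical names:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// One side runs the long loop; another side may cancel it at any time.
fn import_all(files: &[&str], keep_going: &AtomicBool) -> usize {
    let mut imported = 0;
    for f in files {
        // Re-check the flag between items so an abort takes effect quickly.
        if !keep_going.load(Ordering::SeqCst) {
            return imported;
        }
        // ... import `f` here; tolerate individual failures ...
        let _ = f;
        imported += 1;
    }
    imported
}

fn main() {
    let flag = Arc::new(AtomicBool::new(true));
    let n = import_all(&["a", "b"], &flag);
    assert_eq!(n, 2);
}
```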
+   /// Import a previous chunk at the given path
+   fn import_prev_chunk(&self, restoration: &mut Option<Restoration>, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<(), Error> {
+       let file = file?;
+       let path = file.path();
+
+       let mut file = File::open(path.clone())?;
+       let mut buffer = Vec::new();
+       file.read_to_end(&mut buffer)?;
+
+       let hash = keccak(&buffer);
+
+       let is_state = if manifest.block_hashes.contains(&hash) {
+           false
+       } else if manifest.state_hashes.contains(&hash) {
+           true
+       } else {
+           return Ok(());
+       };
+
+       self.feed_chunk_with_restoration(restoration, hash, &buffer, is_state)?;
+
+       trace!(target: "snapshot", "Fed chunk {:?}", hash);
+
+       Ok(())
+   }

Review comment (on the `return Ok(())` for an unrecognized chunk): nit: I believe something like …
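Classification works because snapshot chunks are content-addressed: a leftover file is trusted only if its Keccak digest appears in the new manifest's state or block hash lists, so stale or corrupt files fall through and are silently skipped. The same check as a sketch, with `u64` digests standing in for `H256`:

```rust
use std::collections::HashSet;

// Stand-ins for the manifest's content-addressed chunk lists.
struct Manifest {
    state_hashes: HashSet<u64>,
    block_hashes: HashSet<u64>,
}

enum ChunkKind { State, Block }

// Decide what a leftover file is by its digest alone; anything the
// manifest does not list is ignored rather than treated as an error.
fn classify(manifest: &Manifest, digest: u64) -> Option<ChunkKind> {
    if manifest.block_hashes.contains(&digest) {
        Some(ChunkKind::Block)
    } else if manifest.state_hashes.contains(&digest) {
        Some(ChunkKind::State)
    } else {
        None // stale or corrupt: skip it, don't fail the whole restoration
    }
}
```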
@@ -499,12 +589,19 @@ impl Service {
    /// Feed a chunk of either kind. no-op if no restoration or status is wrong.
    fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
        // TODO: be able to process block chunks and state chunks at same time?
-       let (result, db) = {
-           let mut restoration = self.restoration.lock();
+       let mut restoration = self.restoration.lock();
+       self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state)
+   }
+
+   /// Feed a chunk with the Restoration
+   fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
+       let (result, db) = {
        match self.status() {
-           RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
-           RestorationStatus::Ongoing { .. } => {
+           RestorationStatus::Inactive | RestorationStatus::Failed => {
+               trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
+               return Ok(());
+           },
+           RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
                let (res, db) = {
                    let rest = match *restoration {
                        Some(ref mut r) => r,
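Splitting `feed_chunk` this way is the usual trick for avoiding a double-lock: the public entry point acquires the mutex, while the `_with_restoration` variant takes the already-guarded state by `&mut`, so `import_prev_chunks` (called while `init_restore` still holds the lock) can reuse the same feeding logic. A minimal sketch of the pattern with hypothetical names and `std::sync::Mutex`:

```rust
use std::sync::Mutex;

struct Service {
    restoration: Mutex<Option<String>>, // stand-in for Option<Restoration>
}

impl Service {
    // Public entry point: takes the lock, then delegates.
    fn feed(&self, chunk: &str) {
        let mut guard = self.restoration.lock().unwrap();
        Self::feed_with(&mut guard, chunk);
    }

    // Inner variant: usable by callers that already hold the guard,
    // which would deadlock if they called `feed` again.
    fn feed_with(restoration: &mut Option<String>, chunk: &str) {
        if let Some(r) = restoration.as_mut() {
            r.push_str(chunk);
        }
    }
}
```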
@@ -583,11 +680,41 @@ impl SnapshotService for Service {
        self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
    }

+   fn completed_chunks(&self) -> Option<Vec<H256>> {
+       let restoration = self.restoration.lock();
+
+       match *restoration {
+           Some(ref restoration) => {
+               let completed_chunks = restoration.manifest.block_hashes
+                   .iter()
+                   .filter(|h| !restoration.block_chunks_left.contains(h))
+                   .chain(
+                       restoration.manifest.state_hashes
+                           .iter()
+                           .filter(|h| !restoration.state_chunks_left.contains(h))
+                   )
+                   .map(|h| *h)
+                   .collect();
+
+               Some(completed_chunks)
+           },
+           None => None,
+       }
+   }
+
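Note that `completed_chunks` never tracks completion directly: a chunk is complete exactly when it is listed in the manifest but no longer in the corresponding `*_chunks_left` set, and the two kinds are spliced together with `chain`. The same derivation on plain integers, as a sketch:

```rust
use std::collections::HashSet;

// "Completed" is derived: in the manifest, but not still pending.
fn completed(manifest: &[u64], left: &HashSet<u64>) -> Vec<u64> {
    manifest.iter().filter(|h| !left.contains(*h)).cloned().collect()
}

fn main() {
    let block_hashes = [1, 2, 3];
    let state_hashes = [10, 11];
    let block_left: HashSet<u64> = [3].into_iter().collect();
    let state_left: HashSet<u64> = [11].into_iter().collect();

    // Splice block and state results together, as the diff does with `chain`.
    let done: Vec<u64> = completed(&block_hashes, &block_left)
        .into_iter()
        .chain(completed(&state_hashes, &state_left))
        .collect();
    assert_eq!(done, vec![1, 2, 10]);
}
```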
    fn status(&self) -> RestorationStatus {
        let mut cur_status = self.status.lock();
-       if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
-           *state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
-           *block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
+
+       match *cur_status {
+           RestorationStatus::Initializing { ref mut chunks_done } => {
+               *chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
+                   self.block_chunks.load(Ordering::SeqCst) as u32;
+           }
+           RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } => {
+               *state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
+               *block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
+           },
+           _ => (),
        }

        cur_status.clone()
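Putting the pieces together: the status type now distinguishes a warm-up phase from normal progress, and while `Initializing` only a single combined `chunks_done` counter (state plus block chunks re-imported so far) is meaningful. A sketch of the variants as they appear in this diff, with progress refreshed from the two atomic counters:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Variants as used in this diff (counter fields are u32 per the casts).
#[derive(Clone)]
enum RestorationStatus {
    Inactive,
    Failed,
    Initializing { chunks_done: u32 },
    Ongoing { state_chunks: u32, block_chunks: u32, state_chunks_done: u32, block_chunks_done: u32 },
}

fn refresh(status: &mut RestorationStatus, state: &AtomicUsize, block: &AtomicUsize) {
    match status {
        // During init, callers only see one aggregate counter.
        RestorationStatus::Initializing { chunks_done } => {
            *chunks_done = (state.load(Ordering::SeqCst) + block.load(Ordering::SeqCst)) as u32;
        }
        RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
            *state_chunks_done = state.load(Ordering::SeqCst) as u32;
            *block_chunks_done = block.load(Ordering::SeqCst) as u32;
        }
        _ => (),
    }
}
```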
@@ -600,6 +727,7 @@ impl SnapshotService for Service {
    }

    fn abort_restore(&self) {
+       trace!(target: "snapshot", "Aborting restore");
        self.restoring_snapshot.store(false, Ordering::SeqCst);
        *self.restoration.lock() = None;
        *self.status.lock() = RestorationStatus::Inactive;
Second changed file: the chain sync handler (`impl SyncHandler`).
@@ -101,14 +101,27 @@ impl SyncHandler {
    }

    /// Called by peer when it is disconnecting
-   pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
-       trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
-       sync.handshaking_peers.remove(&peer);
-       if sync.peers.contains_key(&peer) {
-           debug!(target: "sync", "Disconnected {}", peer);
-           sync.clear_peer_download(peer);
-           sync.peers.remove(&peer);
-           sync.active_peers.remove(&peer);
+   pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
+       trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_info(peer_id));
+       sync.handshaking_peers.remove(&peer_id);
+       if sync.peers.contains_key(&peer_id) {
+           debug!(target: "sync", "Disconnected {}", peer_id);
+           sync.clear_peer_download(peer_id);
+           sync.peers.remove(&peer_id);
+           sync.active_peers.remove(&peer_id);
+
+           if sync.state == SyncState::SnapshotManifest {
+               // Check if other we are asking other peers for
+               // the snapshot manifest as well.
+               // If not, return to initial state
+               let still_asking_manifest = sync.peers.iter()
+                   .filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
+                   .next().is_none();
+
+               if still_asking_manifest {
+                   sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
+               }
+           }
            sync.continue_sync(io);
        }
    }

Review comment (on the new code comment): typo: "Check if other we are" should read "Check if we are".
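The `still_asking_manifest` check scans the peer table for any active peer whose `asking` is still `SnapshotManifest`; `.filter(..).next().is_none()` is a spelled-out `!iter.any(..)`. Note the name reads inverted: the flag is true when nobody is still asking, which is what triggers the state reset. A sketch of the equivalence:

```rust
#[derive(PartialEq)]
enum Asking { SnapshotManifest, Nothing }

fn main() {
    let peers = vec![(1u32, Asking::Nothing), (2, Asking::SnapshotManifest)];
    let active: Vec<u32> = vec![1, 2];

    // As written in the diff: look for one matching peer, then test for absence.
    let none_asking = peers.iter()
        .filter(|(id, p)| active.contains(id) && *p == Asking::SnapshotManifest)
        .next()
        .is_none();

    // Equivalent and arguably clearer:
    let none_asking_2 = !peers.iter()
        .any(|(id, p)| active.contains(id) && *p == Asking::SnapshotManifest);

    assert_eq!(none_asking, none_asking_2);
}
```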
@@ -321,6 +334,10 @@ impl SyncHandler {
    }

+   fn on_peer_confirmed(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
+       {
+           let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
+           peer.confirmation = ForkConfirmation::Confirmed;
+       }
+       sync.sync_peer(io, peer_id, false);
+   }
+
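The inner braces in `on_peer_confirmed` scope the mutable borrow of the peer entry before `sync_peer` borrows `sync` mutably again; on compilers before non-lexical lifetimes this was required for the second call to type-check. A minimal reproduction of the pattern:

```rust
use std::collections::HashMap;

struct Sync {
    peers: HashMap<u32, bool>,
}

impl Sync {
    fn sync_peer(&mut self, id: u32) {
        let _ = self.peers.get(&id);
    }

    fn confirm(&mut self, id: u32) {
        {
            // Mutable borrow of one entry lives only inside this block;
            // pre-NLL compilers insisted on the explicit scope.
            let peer = self.peers.get_mut(&id).expect("peer present");
            *peer = true;
        } // borrow ends here...
        self.sync_peer(id); // ...so `self` can be borrowed mutably again.
    }
}
```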
@@ -345,8 +362,8 @@ impl SyncHandler {
        }

        trace!(target: "sync", "{}: Confirmed peer", peer_id);
-       peer.confirmation = ForkConfirmation::Confirmed;
        if !io.chain_overlay().read().contains_key(&fork_number) {
+           trace!(target: "sync", "Inserting (fork) block {} header", fork_number);
            io.chain_overlay().write().insert(fork_number, header.to_vec());
        }
    }
@@ -561,6 +578,10 @@ impl SyncHandler {
                sync.continue_sync(io);
                return Ok(());
            },
+           RestorationStatus::Initializing { .. } => {
+               trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
+               return Ok(());
+           }
            RestorationStatus::Ongoing { .. } => {
                trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
            },
@@ -657,11 +678,16 @@ impl SyncHandler {
        // Let the current sync round complete first.
        sync.active_peers.insert(peer_id.clone());
        debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
-       if let Some((fork_block, _)) = sync.fork_block {
-           SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
-       } else {
-           SyncHandler::on_peer_confirmed(sync, io, peer_id);
-       }
+
+       match sync.fork_block {
+           Some((fork_block, _)) => {
+               SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
+           },
+           _ => {
+               SyncHandler::on_peer_confirmed(sync, io, peer_id);
+           }
+       }
+
        Ok(())
    }
Review comment (on the `prev_chunks` delete/rename in `init_restore`): IMHO this should be moved under `restoration.lock()`; previously it was OK to remove and create the same directory multiple times, now we are doing a rename, which should probably be done only once.

Reply: (this is done now)