This repository has been archived by the owner on Nov 6, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Resumable warp-sync / Seed downloaded snapshots #8544
Merged
Merged
Changes from all commits
Commits
Show all changes
59 commits
Select commit
Hold shift + click to select a range
1f735cf
Start dividing sync chain : first supplier method
ngotchac d5bcfa5
WIP - updated chain sync supplier
ngotchac 8217b82
Finish refactoring the Chain Sync Supplier
ngotchac da0eb24
Create Chain Sync Requester
ngotchac 017fd34
Add Propagator for Chain Sync
ngotchac 96b9e0d
Add the Chain Sync Handler
ngotchac 5a74ba9
Move tests from mod -> handler
ngotchac ef779c7
Move tests to propagator
ngotchac e240635
Refactor SyncRequester arguments
ngotchac f37c0ad
Refactoring peer fork header handler
ngotchac 487e51d
Fix wrong highest block number in snapshot sync
ngotchac 157cf32
Small refactor...
ngotchac 68aeaee
Merge branch 'master' into ng-warp-update
ngotchac 290adf7
Resume warp-sync downloaded chunks
ngotchac 5bc5983
Add comments
ngotchac 2c46379
Refactoring the previous chunks import
ngotchac 87efaf5
Fix tests
ngotchac 0d28cdd
Merge branch 'master' into ng-warp-update
ngotchac 2ea7e6f
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac 18af9cb
Address PR grumbles
ngotchac d81b784
Merge branch 'master' into ng-warp-update
ngotchac 60950b6
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac fa2b5a9
Fix not seeding current snapshot
ngotchac 4be5dd4
Address PR Grumbles
ngotchac f977338
Address PR grumble
ngotchac 8ad988d
Retry failed CI job
ngotchac 0f1e5b4
Update SnapshotService readiness check
ngotchac 20145c8
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac 14979ff
Fix tests
ngotchac a6cdd99
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac 91f91a3
Fix tests
ngotchac 39f67cc
Fix test
ngotchac ce2bce4
Early abort importing previous chunks
ngotchac 79a46f9
PR Grumbles
ngotchac 329a8f4
Merge branch 'master' into ng-warp-update
ngotchac 6f171b9
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac 72302ae
Update Gitlab CI config
ngotchac 9735382
SyncState back to Waiting when Manifest peers disconnect
ngotchac a83c49c
Move fix
ngotchac 50eadfb
Better fix
ngotchac f9500bc
Revert GitLab CI changes
ngotchac 67980cc
Merge branch 'master' into ng-warp-resume
ngotchac ddf2f3d
Fix Warning
ngotchac 75fd4b5
Refactor resuming snapshots
ngotchac 4fc215c
Fix string construction
ngotchac 21d1cee
Revert "Refactor resuming snapshots"
ngotchac dc5a035
Update informant log
ngotchac abf1647
Merge branch 'ng-warp-resume-refacto' into ng-warp-resume
ngotchac 572fde0
Fix string construction
ngotchac b59e743
Refactor resuming snapshots
ngotchac 6cef217
Fix informant
ngotchac 9757a5c
PR Grumbles
ngotchac 6b0d11f
Merge branch 'master' into ng-warp-resume
ngotchac 721ae85
Update informant message : show chunks done
ngotchac e3947c9
PR Grumbles
ngotchac 3afd81f
Fix
ngotchac ae3b919
Fix Warning
ngotchac 259432e
Merge branch 'master' into ng-warp-resume
ngotchac 3475321
PR Grumbles
ngotchac File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,8 @@ | |
//! Snapshot network service implementation. | ||
|
||
use std::collections::HashSet; | ||
use std::io::ErrorKind; | ||
use std::fs; | ||
use std::io::{self, Read, ErrorKind}; | ||
use std::fs::{self, File}; | ||
use std::path::PathBuf; | ||
use std::sync::Arc; | ||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||
|
@@ -30,6 +30,7 @@ use blockchain::BlockChain; | |
use client::{Client, ChainInfo, ClientIoMessage}; | ||
use engines::EthEngine; | ||
use error::Error; | ||
use hash::keccak; | ||
use ids::BlockId; | ||
|
||
use io::IoChannel; | ||
|
@@ -270,8 +271,8 @@ impl Service { | |
} | ||
} | ||
|
||
// delete the temporary restoration dir if it does exist. | ||
if let Err(e) = fs::remove_dir_all(service.restoration_dir()) { | ||
// delete the temporary restoration DB dir if it does exist. | ||
if let Err(e) = fs::remove_dir_all(service.restoration_db()) { | ||
if e.kind() != ErrorKind::NotFound { | ||
return Err(e.into()) | ||
} | ||
|
@@ -325,6 +326,13 @@ impl Service { | |
dir | ||
} | ||
|
||
// previous snapshot chunks path. | ||
fn prev_chunks_dir(&self) -> PathBuf { | ||
let mut dir = self.snapshot_root.clone(); | ||
dir.push("prev_chunks"); | ||
dir | ||
} | ||
|
||
// replace one the client's database with our own. | ||
fn replace_client_db(&self) -> Result<(), Error> { | ||
let our_db = self.restoration_db(); | ||
|
@@ -406,9 +414,26 @@ impl Service { | |
/// Initialize the restoration synchronously. | ||
/// The recover flag indicates whether to recover the restored snapshot. | ||
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> { | ||
let mut res = self.restoration.lock(); | ||
|
||
let rest_dir = self.restoration_dir(); | ||
let rest_db = self.restoration_db(); | ||
let recovery_temp = self.temp_recovery_dir(); | ||
let prev_chunks = self.prev_chunks_dir(); | ||
|
||
let mut res = self.restoration.lock(); | ||
// delete and restore the restoration dir. | ||
if let Err(e) = fs::remove_dir_all(&prev_chunks) { | ||
match e.kind() { | ||
ErrorKind::NotFound => {}, | ||
_ => return Err(e.into()), | ||
} | ||
} | ||
|
||
// Move the previous recovery temp directory | ||
// to `prev_chunks` to be able to restart restoring | ||
// with previously downloaded blocks | ||
// This step is optional, so don't fail on error | ||
fs::rename(&recovery_temp, &prev_chunks).ok(); | ||
|
||
self.state_chunks.store(0, Ordering::SeqCst); | ||
self.block_chunks.store(0, Ordering::SeqCst); | ||
|
@@ -424,40 +449,104 @@ impl Service { | |
} | ||
} | ||
|
||
*self.status.lock() = RestorationStatus::Initializing { | ||
chunks_done: 0, | ||
}; | ||
|
||
fs::create_dir_all(&rest_dir)?; | ||
|
||
// make new restoration. | ||
let writer = match recover { | ||
true => Some(LooseWriter::new(self.temp_recovery_dir())?), | ||
true => Some(LooseWriter::new(recovery_temp)?), | ||
false => None | ||
}; | ||
|
||
let params = RestorationParams { | ||
manifest: manifest, | ||
manifest: manifest.clone(), | ||
pruning: self.pruning, | ||
db: self.restoration_db_handler.open(&self.restoration_db())?, | ||
db: self.restoration_db_handler.open(&rest_db)?, | ||
writer: writer, | ||
genesis: &self.genesis_block, | ||
guard: Guard::new(rest_dir), | ||
guard: Guard::new(rest_db), | ||
engine: &*self.engine, | ||
}; | ||
|
||
let state_chunks = params.manifest.state_hashes.len(); | ||
let block_chunks = params.manifest.block_hashes.len(); | ||
let state_chunks = manifest.state_hashes.len(); | ||
let block_chunks = manifest.block_hashes.len(); | ||
|
||
*res = Some(Restoration::new(params)?); | ||
|
||
self.restoring_snapshot.store(true, Ordering::SeqCst); | ||
|
||
// Import previous chunks, continue if it fails | ||
self.import_prev_chunks(&mut res, manifest).ok(); | ||
|
||
*self.status.lock() = RestorationStatus::Ongoing { | ||
state_chunks: state_chunks as u32, | ||
block_chunks: block_chunks as u32, | ||
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, | ||
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, | ||
}; | ||
|
||
self.restoring_snapshot.store(true, Ordering::SeqCst); | ||
Ok(()) | ||
} | ||
|
||
/// Import the previous chunks into the current restoration | ||
fn import_prev_chunks(&self, restoration: &mut Option<Restoration>, manifest: ManifestData) -> Result<(), Error> { | ||
let prev_chunks = self.prev_chunks_dir(); | ||
|
||
// Restore previous snapshot chunks | ||
let files = fs::read_dir(prev_chunks.as_path())?; | ||
let mut num_temp_chunks = 0; | ||
|
||
for prev_chunk_file in files { | ||
// Don't go over all the files if the restoration has been aborted | ||
if !self.restoring_snapshot.load(Ordering::SeqCst) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think restoration can be aborted since this function is called holding the restoration lock. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually it first sets |
||
trace!(target:"snapshot", "Aborting importing previous chunks"); | ||
return Ok(()); | ||
} | ||
// Import the chunk, don't fail and continue if one fails | ||
match self.import_prev_chunk(restoration, &manifest, prev_chunk_file) { | ||
Ok(true) => num_temp_chunks += 1, | ||
Err(e) => trace!(target: "snapshot", "Error importing chunk: {:?}", e), | ||
_ => (), | ||
} | ||
} | ||
|
||
trace!(target:"snapshot", "Imported {} previous chunks", num_temp_chunks); | ||
|
||
// Remove the prev temp directory | ||
fs::remove_dir_all(&prev_chunks)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Import a previous chunk at the given path. Returns whether the block was imported or not | ||
fn import_prev_chunk(&self, restoration: &mut Option<Restoration>, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<bool, Error> { | ||
let file = file?; | ||
let path = file.path(); | ||
|
||
let mut file = File::open(path.clone())?; | ||
let mut buffer = Vec::new(); | ||
file.read_to_end(&mut buffer)?; | ||
|
||
let hash = keccak(&buffer); | ||
|
||
let is_state = if manifest.block_hashes.contains(&hash) { | ||
false | ||
} else if manifest.state_hashes.contains(&hash) { | ||
true | ||
} else { | ||
return Ok(false); | ||
}; | ||
|
||
self.feed_chunk_with_restoration(restoration, hash, &buffer, is_state)?; | ||
|
||
trace!(target: "snapshot", "Fed chunk {:?}", hash); | ||
|
||
Ok(true) | ||
} | ||
|
||
// finalize the restoration. this accepts an already-locked | ||
// restoration as an argument -- so acquiring it again _will_ | ||
// lead to deadlock. | ||
|
@@ -499,12 +588,19 @@ impl Service { | |
/// Feed a chunk of either kind. no-op if no restoration or status is wrong. | ||
fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { | ||
// TODO: be able to process block chunks and state chunks at same time? | ||
let (result, db) = { | ||
let mut restoration = self.restoration.lock(); | ||
let mut restoration = self.restoration.lock(); | ||
self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state) | ||
} | ||
|
||
/// Feed a chunk with the Restoration | ||
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { | ||
let (result, db) = { | ||
match self.status() { | ||
RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()), | ||
RestorationStatus::Ongoing { .. } => { | ||
RestorationStatus::Inactive | RestorationStatus::Failed => { | ||
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash); | ||
return Ok(()); | ||
}, | ||
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => { | ||
let (res, db) = { | ||
let rest = match *restoration { | ||
Some(ref mut r) => r, | ||
|
@@ -583,11 +679,41 @@ impl SnapshotService for Service { | |
self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok()) | ||
} | ||
|
||
fn completed_chunks(&self) -> Option<Vec<H256>> { | ||
let restoration = self.restoration.lock(); | ||
|
||
match *restoration { | ||
Some(ref restoration) => { | ||
let completed_chunks = restoration.manifest.block_hashes | ||
.iter() | ||
.filter(|h| !restoration.block_chunks_left.contains(h)) | ||
.chain( | ||
restoration.manifest.state_hashes | ||
.iter() | ||
.filter(|h| !restoration.state_chunks_left.contains(h)) | ||
) | ||
.map(|h| *h) | ||
.collect(); | ||
|
||
Some(completed_chunks) | ||
}, | ||
None => None, | ||
} | ||
} | ||
|
||
fn status(&self) -> RestorationStatus { | ||
let mut cur_status = self.status.lock(); | ||
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status { | ||
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32; | ||
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32; | ||
|
||
match *cur_status { | ||
RestorationStatus::Initializing { ref mut chunks_done } => { | ||
*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 + | ||
self.block_chunks.load(Ordering::SeqCst) as u32; | ||
} | ||
RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } => { | ||
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32; | ||
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32; | ||
}, | ||
_ => (), | ||
} | ||
|
||
cur_status.clone() | ||
|
@@ -600,6 +726,7 @@ impl SnapshotService for Service { | |
} | ||
|
||
fn abort_restore(&self) { | ||
trace!(target: "snapshot", "Aborting restore"); | ||
self.restoring_snapshot.store(false, Ordering::SeqCst); | ||
*self.restoration.lock() = None; | ||
*self.status.lock() = RestorationStatus::Inactive; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this should be moved under
restoration.lock()
, previously it was ok to remove and create the same directory multiple times; now we are doing a rename, which should probably be done only once. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(this is done now)