Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Better logging when backfilling ancient blocks fail #10796

Merged
merged 22 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ethcore/blockchain/src/best_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use common_types::header::Header;
/// For GHOST fork-choice rule it would typically describe the block with highest
/// combined difficulty (usually the block with the highest block number).
///
/// Sometimes refered as 'latest block'.
/// Sometimes referred as 'latest block'.
#[derive(Debug)]
pub struct BestBlock {
/// Best block decoded header.
pub header: Header,
Expand All @@ -35,7 +36,7 @@ pub struct BestBlock {
}

/// Best ancient block info. If the blockchain has a gap this keeps track of where it starts.
#[derive(Default)]
#[derive(Debug, Default)]
pub struct BestAncientBlock {
/// Best block hash.
pub hash: H256,
Expand Down
5 changes: 1 addition & 4 deletions ethcore/blockchain/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,10 +652,7 @@ impl BlockChain {
// and write them
if let (Some(hash), Some(number)) = (best_ancient, best_ancient_number) {
let mut best_ancient_block = bc.best_ancient_block.write();
*best_ancient_block = Some(BestAncientBlock {
hash: hash,
number: number,
});
*best_ancient_block = Some(BestAncientBlock { hash, number });
}
}

Expand Down
6 changes: 2 additions & 4 deletions ethcore/db/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub struct TransactionAddress {
}

/// Contains all block receipts.
#[derive(Clone, RlpEncodableWrapper, RlpDecodableWrapper, MallocSizeOf)]
#[derive(Debug, Clone, RlpEncodableWrapper, RlpDecodableWrapper, MallocSizeOf)]
pub struct BlockReceipts {
/// Block receipts
pub receipts: Vec<Receipt>,
Expand All @@ -214,9 +214,7 @@ pub struct BlockReceipts {
impl BlockReceipts {
/// Create new block receipts wrapper.
pub fn new(receipts: Vec<Receipt>) -> Self {
BlockReceipts {
receipts: receipts
}
BlockReceipts { receipts }
}
}

Expand Down
3 changes: 2 additions & 1 deletion ethcore/src/snapshot/consensus/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl Rebuilder for ChunkRebuilder {
Ok(())
}

fn finalize(&mut self, _engine: &dyn EthEngine) -> Result<(), ::error::Error> {
fn finalize(&mut self) -> Result<(), ::error::Error> {
if !self.had_genesis {
return Err(Error::WrongChunkFormat("No genesis transition included.".into()).into());
}
Expand All @@ -358,6 +358,7 @@ impl Rebuilder for ChunkRebuilder {
None => return Err(Error::WrongChunkFormat("Warp target block not included.".into()).into()),
};

trace!(target: "snapshot", "rebuilder, finalize: verifying {} unverified first blocks", self.unverified_firsts.len());
// verify the first entries of chunks we couldn't before.
// we store all last verifiers, but not all firsts.
// match each unverified first epoch with a last epoch verifier.
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/snapshot/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,5 @@ pub trait Rebuilder: Send {
///
/// This should apply the necessary "glue" between chunks,
/// and verify against the restored state.
fn finalize(&mut self, engine: &dyn EthEngine) -> Result<(), ::error::Error>;
fn finalize(&mut self) -> Result<(), ::error::Error>;
}
10 changes: 5 additions & 5 deletions ethcore/src/snapshot/consensus/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ impl PowRebuilder {
/// Create a new PowRebuilder.
fn new(chain: BlockChain, db: Arc<dyn KeyValueDB>, manifest: &ManifestData, snapshot_blocks: u64) -> Result<Self, ::error::Error> {
Ok(PowRebuilder {
chain: chain,
db: db,
chain,
db,
rng: OsRng::new().map_err(|e| format!("{}", e))?,
disconnected: Vec::new(),
best_number: manifest.block_number,
best_hash: manifest.block_hash,
best_root: manifest.state_root,
fed_blocks: 0,
snapshot_blocks: snapshot_blocks,
snapshot_blocks,
})
}
}
Expand Down Expand Up @@ -298,9 +298,9 @@ impl Rebuilder for PowRebuilder {
}

/// Glue together any disconnected chunks and check that the chain is complete.
fn finalize(&mut self, _: &dyn EthEngine) -> Result<(), ::error::Error> {
fn finalize(&mut self) -> Result<(), ::error::Error> {
let mut batch = self.db.transaction();

trace!(target: "snapshot", "rebuilder, finalize: inserting {} disconnected chunks", self.disconnected.len());
for (first_num, first_hash) in self.disconnected.drain(..) {
let parent_num = first_num - 1;

Expand Down
6 changes: 3 additions & 3 deletions ethcore/src/snapshot/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ pub enum Error {
BadEpochProof(u64),
/// Wrong chunk format.
WrongChunkFormat(String),
/// Unlinked ancient block chain
UnlinkedAncientBlockChain,
/// Unlinked ancient block chain; includes the parent hash where linkage failed
UnlinkedAncientBlockChain(H256),
}

impl error::Error for Error {
Expand Down Expand Up @@ -108,7 +108,7 @@ impl fmt::Display for Error {
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
Error::UnlinkedAncientBlockChain(parent_hash) => write!(f, "Unlinked ancient blocks chain at parent_hash={}", parent_hash),
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
87 changes: 57 additions & 30 deletions ethcore/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,17 @@ impl Restoration {

let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?;

let root = manifest.state_root.clone();
let final_state_root = manifest.state_root.clone();

Ok(Restoration {
manifest: manifest,
manifest,
state_chunks_left: state_chunks,
block_chunks_left: block_chunks,
state: StateRebuilder::new(raw_db.key_value().clone(), params.pruning),
secondary: secondary,
secondary,
writer: params.writer,
snappy_buffer: Vec::new(),
final_state_root: root,
final_state_root,
guard: params.guard,
db: raw_db,
})
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Restoration {
}

// finish up restoration.
fn finalize(mut self, engine: &dyn EthEngine) -> Result<(), Error> {
fn finalize(mut self) -> Result<(), Error> {
use trie::TrieError;

if !self.is_done() { return Ok(()) }
Expand All @@ -186,13 +186,14 @@ impl Restoration {
self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?;

// connect out-of-order chunks and verify chain integrity.
self.secondary.finalize(engine)?;
self.secondary.finalize()?;

if let Some(writer) = self.writer {
writer.finish(self.manifest)?;
}

self.guard.disarm();
trace!(target: "snapshot", "restoration finalised correctly");
Ok(())
}

Expand Down Expand Up @@ -337,16 +338,6 @@ impl Service {
dir
}

// replace one the client's database with our own.
fn replace_client_db(&self) -> Result<(), Error> {
let migrated_blocks = self.migrate_blocks()?;
info!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);

let rest_db = self.restoration_db();
self.client.restore_db(&*rest_db.to_string_lossy())?;
Ok(())
}

// Migrate the blocks in the current DB into the new chain
fn migrate_blocks(&self) -> Result<usize, Error> {
// Count the number of migrated blocks
Expand All @@ -361,11 +352,27 @@ impl Service {

// The old database looks like this:
// [genesis, best_ancient_block] ... [first_block, best_block]
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can assume that the whole range from [genesis, best_block] is imported.
// The new database only contains the tip of the chain ([first_block, best_block]),
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can
// assume that the whole range from [genesis, best_block] is imported.
// The new database only contains the tip of the chain ([new_first_block, new_best_block]),
// so the useful set of blocks is defined as:
// [0 ... min(new.first_block, best_ancient_block or best_block)]
//
// If, for whatever reason, the old db does not have ancient blocks (i.e.
// `best_ancient_block` is `None`) AND a non-zero `first_block`, such that the old db looks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think best_ancient_block being None can either mean: "no ancient blocks imported" or "all ancient blocks imported (no gaps)". So comment is misleading, although in the case when first_block is Some then the former is the true.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I should move the parens I think.

// like [old_first_block..old_best_block] (which may or may not partially overlap with
// [new_first_block..new_best_block]) we do the conservative thing and do not migrate the
// old blocks.
let find_range = || -> Option<(H256, H256)> {
// TODO: In theory, if the current best_block is > new first_block (i.e. they overlap)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep the comment, but drop the TODO: prefix :)

// we could salvage them but what if there's been a re-org at the boundary and the two
// chains do not match anymore? Would it actually work?
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
if cur_chain_info.ancient_block_number.is_none() && cur_chain_info.first_block_number.unwrap_or(0) > 0 {
warn!(target: "blockchain", "blocks in the current DB do not stretch back to genesis; can't salvage them into the new DB. In current DB first block : {:?}/#{:?}, best block: {:?}, #{:?}",
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
cur_chain_info.first_block_hash, cur_chain_info.first_block_number,
cur_chain_info.best_block_number, cur_chain_info.best_block_hash);
return None;
}
let next_available_from = next_chain_info.first_block_number?;
let cur_available_to = cur_chain_info.ancient_block_number.unwrap_or(cur_chain_info.best_block_number);

Expand All @@ -375,10 +382,11 @@ impl Service {
return None;
}

trace!(target: "snapshot", "Trying to import ancient blocks until {}", highest_block_num);
trace!(target: "snapshot", "Trying to import ancient blocks until {}. First block in new chain=#{}, first block in old chain=#{:?}, best block in old chain=#{}", highest_block_num,
next_available_from, cur_chain_info.first_block_number, cur_chain_info.best_block_number);

// Here we start from the highest block number and go backward to 0,
// thus starting at `highest_block_num` and targetting `0`.
// thus starting at `highest_block_num` and targeting `0`.
let target_hash = self.client.block_hash(BlockId::Number(0))?;
let start_hash = self.client.block_hash(BlockId::Number(highest_block_num))?;

Expand All @@ -398,7 +406,10 @@ impl Service {
return Ok(count);
}

let block = self.client.block(BlockId::Hash(parent_hash)).ok_or(::snapshot::error::Error::UnlinkedAncientBlockChain)?;
let block = self.client.block(BlockId::Hash(parent_hash)).ok_or_else(|| {
error!(target: "snapshot", "migrate_blocks: did not find block from parent_hash={} (start_hash={})", parent_hash, start_hash);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
::snapshot::error::Error::UnlinkedAncientBlockChain(parent_hash)
})?;
parent_hash = block.parent_hash();

let block_number = block.number();
Expand All @@ -412,7 +423,11 @@ impl Service {
next_chain.insert_unordered_block(&mut batch, block, block_receipts, Some(parent_total_difficulty), false, true);
count += 1;
},
_ => break,
_ => {
error!(target: "snapshot", "migrate_blocks: failed to find receipts and parent total difficulty. Block #{}, parent_hash={:?}, parent_total_difficulty={:?}",
block_number, parent_hash, parent_total_difficulty);
break
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
},
}

// Writing changes to DB and logging every now and then
Expand All @@ -435,7 +450,11 @@ impl Service {

// We couldn't reach the targeted hash
if parent_hash != target_hash {
return Err(::snapshot::error::Error::UnlinkedAncientBlockChain.into());
error!(target: "snapshot", "migrate_blocks: could not reach the target_hash, parent_hash={:?}, target_hash={:?}, start_hash={:?}, ancient_block_number={:?}, best_block_number={:?}",
parent_hash, target_hash, start_hash,
cur_chain_info.ancient_block_number, cur_chain_info.best_block_number,
);
return Err(::snapshot::error::Error::UnlinkedAncientBlockChain(parent_hash).into());
}

// Update best ancient block in the Next Chain
Expand Down Expand Up @@ -549,6 +568,8 @@ impl Service {

*self.status.lock() = RestorationStatus::Initializing {
chunks_done: 0,
state_chunks: manifest.state_hashes.len() as u32,
block_chunks: manifest.block_hashes.len() as u32,
};

fs::create_dir_all(&rest_dir)?;
Expand All @@ -563,7 +584,7 @@ impl Service {
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&rest_db)?,
writer: writer,
writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: &*self.engine,
Expand Down Expand Up @@ -654,15 +675,20 @@ impl Service {
// lead to deadlock.
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
trace!(target: "snapshot", "finalizing restoration");
*self.status.lock() = RestorationStatus::Finalizing;

let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());

// destroy the restoration before replacing databases and snapshot.
rest.take()
.map(|r| r.finalize(&*self.engine))
.map(|r| r.finalize())
.unwrap_or(Ok(()))?;

self.replace_client_db()?;
let migrated_blocks = self.migrate_blocks()?;
info!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);

// replace the Client's database with the new one (restart the Client).
self.client.restore_db(&*self.restoration_db().to_string_lossy())?;

if recover {
let mut reader = self.reader.write();
Expand Down Expand Up @@ -695,8 +721,9 @@ impl Service {
Ok(()) |
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => (),
Err(e) => {
// TODO: after this we're sometimes deadlocked
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should actually always deadlock - because we already have self.restoration.lock() acquired in line 720. parking_lot locks are not re-entrant.

Instead of calling abort_restore() we should rather use *restoration = None here. and maybe add a trace line if you care about it.
Alternatively we could have abort_restore take the locked restoration. IMHO if we do that for some methods already we should do that for all of them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should actually always deadlock

Well, for some reason we do not. It's pretty rare, I've seen it twice.

Instead of calling abort_restore() we should rather use *restoration = None

That's what abort_restore() does though: here. Why is it better to do it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it better to do it here?

To avoid locking twice?

As I said the second ption should be abort_restore_with_restoration(locked_res: &mut Option<Restoration>) so we can keep the logic there, but just pass the existing lock.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the reason I removed *restoration = None from here and replaced it with a call to abort_restore() was to collect all shutdown actions in one place. Not that abort_restore() is doing very much atm, but I still think it's good to have a single point of abortion.
is locking twice really a concern though: this is an error handler and I don't think we care much about performance?

abort_restore_with_restoration(locked_res…)

The restoration is a member of the struct here, so I'm not sure what the point of passing it as a param tbh. Now you got me confused!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since parking_lot::Mutex is not re-entrant locking it twice in the same thread can (or does) lead to a deadlock. So the call like that:

fn some(&self) {
  let mut restorating = self.restoration.lock();
  ....
  self.abort_restore();
}

fn abort_restore(&self) {
  *self.restoration.lock() = None
}

is guaranteed to deadlock, I thought always, but I suspect it might be just random.

In the same file I suppose we already had deadlock issues, so the with_restoration pattern emerged, where instead of locking resources locally in a particular function you get passed a (mutable) reference to the locked resource:

let mut restoration = self.restoration.lock();
self.feed_chunk_with_restoration(&mut restoration)

I propose to use exactly the same patter for abort_restore - I'm totally fine grouping all the shutdown actions there, but to prevent potential double-locking and potential deadlocks we can pass the locked resource, so the first example becomes:

fn some(&self) {
  let mut restoration = self.restoration.lock();
  ....
  self.abort_restore(&mut restoration);
}

fn abort_restore(&self, res: &mut Option<Restoration>) {
  *res = None
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thank you for explaining.

Is this an option?

		let r = {
			let mut restoration = self.restoration.lock();
			self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state)
		};
		match r {
			…
			Err(e) => {
				…
				self.abort_restore();
				…
			}
		}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to docs:

Attempts to lock a mutex in the thread which already holds the lock will result in a deadlock.

So I'm not sure either why it didn't always deadlock. The proposed fix looks good.

warn!("Encountered error during snapshot restoration: {}", e);
*self.restoration.lock() = None;
self.abort_restore();
*self.status.lock() = RestorationStatus::Failed;
let _ = fs::remove_dir_all(self.restoration_dir());
}
Expand All @@ -707,8 +734,8 @@ impl Service {
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
let (result, db) = {
match self.status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
RestorationStatus::Inactive | RestorationStatus::Failed | RestorationStatus::Finalizing => {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive, failed or finalizing", hash);
return Ok(());
},
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
Expand Down Expand Up @@ -803,7 +830,7 @@ impl SnapshotService for Service {
let mut cur_status = self.status.lock();

match *cur_status {
RestorationStatus::Initializing { ref mut chunks_done } => {
RestorationStatus::Initializing { ref mut chunks_done, .. } => {
*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
self.block_chunks.load(Ordering::SeqCst) as u32;
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/snapshot/tests/proof_of_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn chunk_and_restore(amount: u64) {
rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
}

rebuilder.finalize(engine.as_ref()).unwrap();
rebuilder.finalize().unwrap();
drop(rebuilder);

// and test it.
Expand Down
8 changes: 6 additions & 2 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl SyncHandler {
return Err(DownloaderImportError::Invalid);
}
match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain => {
BlockStatus::InChain => {
trace!(target: "sync", "New block hash already in chain {:?}", hash);
},
BlockStatus::Queued => {
Expand Down Expand Up @@ -529,10 +529,14 @@ impl SyncHandler {
sync.snapshot.clear();
return Ok(());
},
RestorationStatus::Initializing { .. } => {
RestorationStatus::Initializing { .. } => {
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
return Ok(());
}
RestorationStatus::Finalizing => {
trace!(target: "warp", "{}: Snapshot finalizing restoration", peer_id);
return Ok(());
}
RestorationStatus::Ongoing { .. } => {
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
},
Expand Down
5 changes: 4 additions & 1 deletion ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ impl ChainSync {
RestorationStatus::Inactive | RestorationStatus::Failed => {
self.set_state(SyncState::SnapshotWaiting);
},
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (),
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } | RestorationStatus::Finalizing => (),
},
SyncState::SnapshotWaiting => {
match io.snapshot_service().status() {
Expand All @@ -1221,6 +1221,9 @@ impl ChainSync {
RestorationStatus::Initializing { .. } => {
trace!(target:"sync", "Snapshot restoration is initializing");
},
RestorationStatus::Finalizing { .. } => {
trace!(target:"sync", "Snapshot finalizing restoration");
},
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
Expand Down
Loading