Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bad block list for invalid blocks after sync #3637

Merged
merged 7 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub enum BlockSyncError {
RpcRequestError(#[from] RpcStatus),
#[error("Chain storage error: {0}")]
ChainStorageError(#[from] ChainStorageError),
#[error("Peer sent invalid block body: {0}")]
ReceivedInvalidBlockBody(String),
#[error("Peer sent a block that did not form a chain. Expected hash = {expected}, got = {got}")]
PeerSentBlockThatDidNotFormAChain { expected: String, got: String },
#[error("Connectivity Error: {0}")]
Expand All @@ -48,4 +46,6 @@ pub enum BlockSyncError {
FailedToBan(ConnectivityError),
#[error("Failed to construct valid chain block")]
FailedToConstructChainBlock,
#[error("Peer violated the block sync protocol: {0}")]
ProtocolViolation(String),
}
58 changes: 50 additions & 8 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use crate::{
sync::{hooks::Hooks, rpc, SyncPeer},
BlockSyncConfig,
},
blocks::{Block, ChainBlock},
blocks::{Block, BlockValidationError, ChainBlock},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::SyncBlocksRequest,
tari_utilities::{hex::Hex, Hashable},
transactions::aggregated_body::AggregateBody,
validation::BlockSyncBodyValidation,
validation::{BlockSyncBodyValidation, ValidationError},
};
use futures::StreamExt;
use log::*;
Expand Down Expand Up @@ -96,7 +96,29 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
self.db.cleanup_orphans().await?;
Ok(())
},
Err(err @ BlockSyncError::ValidationError(_)) | Err(err @ BlockSyncError::ReceivedInvalidBlockBody(_)) => {
Err(BlockSyncError::ValidationError(err)) => {
match &err {
ValidationError::BlockHeaderError(_) => {},
ValidationError::BlockError(BlockValidationError::MismatchedMmrRoots) |
ValidationError::BadBlockFound { .. } |
ValidationError::BlockError(BlockValidationError::MismatchedMmrSize { .. }) => {
let num_cleared = self.db.clear_all_pending_headers().await?;
warn!(
target: LOG_TARGET,
"Cleared {} incomplete headers from bad chain", num_cleared
);
},
_ => {},
}
warn!(
target: LOG_TARGET,
"Banning peer because provided block failed validation: {}", err
);
self.ban_peer(node_id, &err).await?;
Err(err.into())
},
Err(err @ BlockSyncError::ProtocolViolation(_)) => {
warn!(target: LOG_TARGET, "Banning peer: {}", err);
self.ban_peer(node_id, &err).await?;
Err(err)
},
Expand Down Expand Up @@ -166,9 +188,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.fetch_chain_header_by_block_hash(block.hash.clone())
.await?
.ok_or_else(|| {
BlockSyncError::ReceivedInvalidBlockBody("Peer sent hash for block header we do not have".into())
BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into())
})?;

let current_height = header.height();
let header_hash = header.hash().clone();

if header.header().prev_hash != prev_hash {
Expand All @@ -183,21 +206,40 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
let body = block
.body
.map(AggregateBody::try_from)
.ok_or_else(|| BlockSyncError::ReceivedInvalidBlockBody("Block body was empty".to_string()))?
.map_err(BlockSyncError::ReceivedInvalidBlockBody)?;
.ok_or_else(|| BlockSyncError::ProtocolViolation("Block body was empty".to_string()))?
.map_err(BlockSyncError::ProtocolViolation)?;

debug!(
target: LOG_TARGET,
"Validating block body #{} (PoW = {}, {})",
header.height(),
current_height,
header.header().pow_algo(),
body.to_counts_string(),
);

let timer = Instant::now();
let (header, header_accum_data) = header.into_parts();

let block = self.block_validator.validate_body(Block::new(header, body)).await?;
let block = match self.block_validator.validate_body(Block::new(header, body)).await {
Ok(block) => block,
Err(err @ ValidationError::BadBlockFound { .. }) |
Err(err @ ValidationError::FatalStorageError(_)) |
Err(err @ ValidationError::AsyncTaskFailed(_)) |
Err(err @ ValidationError::CustomError(_)) => return Err(err.into()),
Err(err) => {
// Add to bad blocks
if let Err(err) = self
.db
.write_transaction()
.insert_bad_block(header_hash, current_height)
.commit()
.await
{
error!(target: LOG_TARGET, "Failed to insert bad block: {}", err);
}
return Err(err.into());
},
};

let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
.map(Arc::new)
Expand Down
11 changes: 9 additions & 2 deletions base_layer/core/src/base_node/sync/header_sync/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
tari_utilities::{epoch_time::EpochTime, hash::Hashable, hex::Hex},
validation::helpers::{
check_header_timestamp_greater_than_median,
check_not_bad_block,
check_pow_data,
check_target_difficulty,
check_timestamp_ftl,
Expand Down Expand Up @@ -138,7 +139,13 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
);
let achieved_target = check_target_difficulty(&header, target_difficulty, &self.randomx_factory)?;

check_pow_data(&header, &self.consensus_rules, &*self.db.inner().db_read_access()?)?;
let block_hash = header.hash();

{
let txn = self.db.inner().db_read_access()?;
check_not_bad_block(&*txn, &block_hash)?;
check_pow_data(&header, &self.consensus_rules, &*txn)?;
}

// Header is valid, add this header onto the validation state for the next round
// Mutable borrow done later in the function to allow multiple immutable borrows before this line. This has
Expand All @@ -159,7 +166,7 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
state.target_difficulties.add_back(&header, target_difficulty);

let accumulated_data = BlockHeaderAccumulatedData::builder(&state.previous_accum)
.with_hash(header.hash())
.with_hash(block_hash)
.with_achieved_target_difficulty(achieved_target)
.with_total_kernel_offset(header.total_kernel_offset.clone())
.build()?;
Expand Down
9 changes: 9 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_last_header() -> BlockHeader, "fetch_last_header");

make_async_fn!(clear_all_pending_headers() -> usize, "clear_all_pending_headers");

make_async_fn!(fetch_last_chain_header() -> ChainHeader, "fetch_last_chain_header");

make_async_fn!(fetch_tip_header() -> ChainHeader, "fetch_tip_header");
Expand All @@ -222,6 +224,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(block_exists(block_hash: BlockHash) -> bool, "block_exists");

make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists");

make_async_fn!(fetch_block(height: u64) -> HistoricalBlock, "fetch_block");

make_async_fn!(fetch_blocks<T: RangeBounds<u64>>(bounds: T) -> Vec<HistoricalBlock>, "fetch_blocks");
Expand Down Expand Up @@ -376,6 +380,11 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
self
}

pub fn insert_bad_block(&mut self, hash: HashOutput, height: u64) -> &mut Self {
self.transaction.insert_bad_block(hash, height);
self
}

pub async fn commit(&mut self) -> Result<(), ChainStorageError> {
let transaction = mem::take(&mut self.transaction);
self.db.write(transaction).await
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub trait BlockchainBackend: Send + Sync {
fn orphan_count(&self) -> Result<usize, ChainStorageError>;
/// Returns the stored header with the highest corresponding height.
fn fetch_last_header(&self) -> Result<BlockHeader, ChainStorageError>;

/// Clear all headers that are beyond the current height of longest chain, returning the number of headers that were
/// deleted.
fn clear_all_pending_headers(&self) -> Result<usize, ChainStorageError>;
/// Returns the stored header and accumulated data with the highest height.
fn fetch_last_chain_header(&self) -> Result<ChainHeader, ChainStorageError>;
/// Returns the stored header with the highest corresponding height.
Expand Down Expand Up @@ -181,4 +185,7 @@ pub trait BlockchainBackend: Send + Sync {
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError>;

/// Check if a block hash is in the bad block list
fn bad_block_exists(&self, block_hash: HashOutput) -> Result<bool, ChainStorageError>;
}
23 changes: 18 additions & 5 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ use crate::{
HeaderValidation,
OrphanValidation,
PostOrphanBodyValidation,
ValidationError,
},
};

Expand Down Expand Up @@ -853,6 +852,11 @@ where B: BlockchainBackend
Ok(())
}

pub fn clear_all_pending_headers(&self) -> Result<usize, ChainStorageError> {
let db = self.db_write_access()?;
db.clear_all_pending_headers()
}

/// Clean out the entire orphan pool
pub fn cleanup_all_orphans(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
Expand Down Expand Up @@ -949,6 +953,12 @@ where B: BlockchainBackend
Ok(db.contains(&DbKey::BlockHash(hash.clone()))? || db.contains(&DbKey::OrphanBlock(hash))?)
}

/// Returns true if this block exists in the chain, or is orphaned.
pub fn bad_block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
db.bad_block_exists(hash)
}

/// Atomically commit the provided transaction to the database backend. This function does not update the metadata.
pub fn commit(&self, txn: DbTransaction) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
Expand Down Expand Up @@ -1263,10 +1273,13 @@ fn insert_best_block(txn: &mut DbTransaction, block: Arc<ChainBlock>) -> Result<
block_hash.to_hex()
);
if block.header().pow_algo() == PowAlgorithm::Monero {
let monero_seed = MoneroPowData::from_header(block.header())
.map_err(|e| ValidationError::CustomError(e.to_string()))?
.randomx_key;
txn.insert_monero_seed_height(monero_seed.to_vec(), block.height());
let monero_header =
MoneroPowData::from_header(block.header()).map_err(|e| ChainStorageError::InvalidArguments {
func: "insert_best_block",
arg: "block",
message: format!("block contained invalid or malformed monero PoW data: {}", e),
})?;
txn.insert_monero_seed_height(monero_header.randomx_key.to_vec(), block.height());
}

let height = block.height();
Expand Down
14 changes: 14 additions & 0 deletions base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ impl DbTransaction {
self
}

/// Inserts a block hash into the bad block list
pub fn insert_bad_block(&mut self, block_hash: HashOutput, height: u64) -> &mut Self {
self.operations.push(WriteOperation::InsertBadBlock {
hash: block_hash,
height,
});
self
}

/// Stores an orphan block. No checks are made as to whether this is actually an orphan. That responsibility lies
/// with the calling function.
/// The transaction will rollback and write will return an error if the orphan already exists.
Expand Down Expand Up @@ -298,6 +307,10 @@ pub enum WriteOperation {
witness_hash: HashOutput,
mmr_position: u32,
},
InsertBadBlock {
hash: HashOutput,
height: u64,
},
DeleteHeader(u64),
DeleteOrphan(HashOutput),
DeleteBlock(HashOutput),
Expand Down Expand Up @@ -440,6 +453,7 @@ impl fmt::Display for WriteOperation {
SetPrunedHeight { height, .. } => write!(f, "Set pruned height to {}", height),
DeleteHeader(height) => write!(f, "Delete header at height: {}", height),
DeleteOrphan(hash) => write!(f, "Delete orphan with hash: {}", hash.to_hex()),
InsertBadBlock { hash, height } => write!(f, "Insert bad block #{} {}", height, hash.to_hex()),
}
}
}
Expand Down
41 changes: 39 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use lmdb_zero::{
del,
error::{self, LmdbResultExt},
put,
traits::{AsLmdbBytes, FromLmdbBytes},
traits::{AsLmdbBytes, CreateCursor, FromLmdbBytes},
ConstTransaction,
Cursor,
CursorIter,
Expand Down Expand Up @@ -397,7 +397,7 @@ where
Ok(result)
}

/// Fetches all the size of all key/values in the given DB. Returns the number of entries, the total size of all the
/// Fetches the size of all key/values in the given DB. Returns the number of entries, the total size of all the
/// keys and values in bytes.
pub fn fetch_db_entry_sizes(txn: &ConstTransaction<'_>, db: &Database) -> Result<(u64, u64, u64), ChainStorageError> {
let access = txn.access();
Expand All @@ -412,3 +412,40 @@ pub fn fetch_db_entry_sizes(txn: &ConstTransaction<'_>, db: &Database) -> Result
}
Ok((num_entries, total_key_size, total_value_size))
}

pub fn lmdb_delete_each_where<K, V, F>(
txn: &WriteTransaction<'_>,
db: &Database,
mut predicate: F,
) -> Result<usize, ChainStorageError>
where
K: FromLmdbBytes + ?Sized,
V: DeserializeOwned,
F: FnMut(&K, V) -> Option<bool>,
{
let mut cursor = txn.cursor(db)?;
let mut access = txn.access();
let mut num_deleted = 0;
while let Some((k, v)) = cursor.next::<K, [u8]>(&access).to_opt()? {
match deserialize(v) {
Ok(v) => match predicate(k, v) {
Some(true) => {
cursor.del(&mut access, del::Flags::empty())?;
num_deleted += 1;
},
Some(false) => continue,
None => {
break;
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Could not could not deserialize value from lmdb: {:?}", e
);
return Err(ChainStorageError::AccessError(e.to_string()));
},
}
}
Ok(num_deleted)
}
Loading