Skip to content

Commit

Permalink
feat: ban bad block-sync peers (#5871)
Browse files Browse the repository at this point in the history
Description
---
- Added a check to ban a misbehaving peer after block sync when it fails
to supply some or all of the blocks corresponding to the accumulated
difficulty it claimed to have.
- Added a check in the RPC block-sync server method not to try and
supply blocks if it does not have both blocks corresponding to the start
and end hash in its chain.
- Moved all block sync RPC errors from the no-ban category to the
short-ban category.
- Added happy path and ban integration-level unit tests for block sync.

Motivation and Context
---
The new unit tests that were added highlighted some issues where sync
peers are not banned for their bad behaviour.

How Has This Been Tested?
---
Added new integration-level unit tests.

What process can a PR reviewer use to test or verify this change?
---
- Code walk through.
- Review and run unit tests.

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

---------

Co-authored-by: SW van Heerden <[email protected]>
  • Loading branch information
hansieodendaal and SWvheerden authored Oct 27, 2023
1 parent 879e1e1 commit 5c2781e
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ where B: BlockchainBackend + 'static
}

async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
if self.blockchain_db.block_exists(block).await? {
if self.blockchain_db.chain_block_or_orphan_block_exists(block).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
Expand Down
10 changes: 7 additions & 3 deletions base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub enum BlockSyncError {
SyncRoundFailed,
#[error("Could not find peer info")]
PeerNotFound,
#[error("Peer did not supply all the blocks they claimed they had: {0}")]
PeerDidNotSupplyAllClaimedBlocks(String),
}

impl BlockSyncError {
Expand All @@ -93,6 +95,7 @@ impl BlockSyncError {
BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError",
BlockSyncError::SyncRoundFailed => "SyncRoundFailed",
BlockSyncError::PeerNotFound => "PeerNotFound",
BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) => "PeerDidNotSupplyAllClaimedBlocks",
}
}
}
Expand All @@ -102,8 +105,6 @@ impl BlockSyncError {
match self {
// no ban
BlockSyncError::AsyncTaskFailed(_) |
BlockSyncError::RpcError(_) |
BlockSyncError::RpcRequestError(_) |
BlockSyncError::ChainStorageError(_) |
BlockSyncError::ConnectivityError(_) |
BlockSyncError::NoMoreSyncPeers(_) |
Expand All @@ -113,7 +114,10 @@ impl BlockSyncError {
BlockSyncError::SyncRoundFailed => None,

// short ban
err @ BlockSyncError::MaxLatencyExceeded { .. } => Some(BanReason {
err @ BlockSyncError::MaxLatencyExceeded { .. } |
err @ BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) |
err @ BlockSyncError::RpcError(_) |
err @ BlockSyncError::RpcRequestError(_) => Some(BanReason {
reason: format!("{}", err),
ban_duration: short_ban,
}),
Expand Down
13 changes: 12 additions & 1 deletion base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
warn!(target: LOG_TARGET, "{} ({})", err, sync_round);
continue;
},
Err(err) => return Err(err),
Err(err) => {
return Err(err);
},
}
}
}
Expand Down Expand Up @@ -407,6 +409,15 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
last_sync_timer = Instant::now();
}

let accumulated_difficulty = self.db.get_chain_metadata().await?.accumulated_difficulty();
if accumulated_difficulty < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
"Their claimed difficulty: {}, our local difficulty after block sync: {}",
sync_peer.claimed_chain_metadata().accumulated_difficulty(),
accumulated_difficulty
)));
}

if let Some(block) = current_block {
self.hooks.call_on_complete_hooks(block, best_height);
}
Expand Down
10 changes: 7 additions & 3 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.start_hash
.try_into()
.map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
if db.fetch_block_by_hash(hash, true).await.is_err() {
return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
}
let start_header = db
.fetch_header_by_block_hash(hash)
.await
Expand All @@ -137,13 +140,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
)));
}

if start_height > metadata.height_of_longest_chain() {
return Ok(Streaming::empty());
}
let hash = message
.end_hash
.try_into()
.map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
if db.fetch_block_by_hash(hash, true).await.is_err() {
return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
}

let end_header = db
.fetch_header_by_block_hash(hash)
.await
Expand Down
5 changes: 2 additions & 3 deletions base_layer/core/src/base_node/sync/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mod sync_blocks {
}

#[tokio::test]
async fn it_sends_an_empty_response() {
async fn it_sends_bad_request_on_bad_response() {
let (service, db, rpc_request_mock, _tmp) = setup();

let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"])).await;
Expand All @@ -86,8 +86,7 @@ mod sync_blocks {
end_hash: block.hash().to_vec(),
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let mut streaming = service.sync_blocks(req).await.unwrap();
assert!(streaming.next().await.is_none());
assert!(service.sync_blocks(req).await.is_err());
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(cleanup_all_orphans() -> (), "cleanup_all_orphans");

make_async_fn!(block_exists(block_hash: BlockHash) -> bool, "block_exists");
make_async_fn!(chain_block_or_orphan_block_exists(block_hash: BlockHash) -> bool, "block_exists");

make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists");

Expand Down
35 changes: 18 additions & 17 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where B: BlockchainBackend
txn.set_horizon_data(kernel_sum, utxo_sum);
blockchain_db.write(txn)?;
blockchain_db.store_pruning_horizon(config.pruning_horizon)?;
} else if !blockchain_db.block_exists(genesis_block.accumulated_data().hash)? {
} else if !blockchain_db.chain_block_or_orphan_block_exists(genesis_block.accumulated_data().hash)? {
// Check the genesis block in the DB.
error!(
target: LOG_TARGET,
Expand Down Expand Up @@ -917,7 +917,7 @@ where B: BlockchainBackend
after_lock - before_lock,
);

if db.contains(&DbKey::BlockHash(block_hash))? {
if db.contains(&DbKey::HeaderHash(block_hash))? {
return Ok(BlockAddResult::BlockExists);
}
if db.bad_block_exists(block_hash)? {
Expand Down Expand Up @@ -1083,9 +1083,10 @@ where B: BlockchainBackend
}

/// Returns true if this block exists in the chain, or is orphaned.
pub fn block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
pub fn chain_block_or_orphan_block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.contains(&DbKey::BlockHash(hash))? || db.contains(&DbKey::OrphanBlock(hash))?)
// we need to check if the block accumulated data exists, and the header might exist without a body
Ok(db.fetch_block_accumulated_data(&hash)?.is_some() || db.contains(&DbKey::OrphanBlock(hash))?)
}

/// Returns true if this block exists in the chain, or is orphaned.
Expand Down Expand Up @@ -1404,7 +1405,7 @@ pub fn calculate_validator_node_mr(validator_nodes: &[(PublicKey, [u8; 32])]) ->
}

pub fn fetch_header<T: BlockchainBackend>(db: &T, block_num: u64) -> Result<BlockHeader, ChainStorageError> {
fetch!(db, block_num, BlockHeader)
fetch!(db, block_num, HeaderHeight)
}

pub fn fetch_headers<T: BlockchainBackend>(
Expand All @@ -1422,8 +1423,8 @@ pub fn fetch_headers<T: BlockchainBackend>(
#[allow(clippy::cast_possible_truncation)]
let mut headers = Vec::with_capacity((end_inclusive - start) as usize);
for h in start..=end_inclusive {
match db.fetch(&DbKey::BlockHeader(h))? {
Some(DbValue::BlockHeader(header)) => {
match db.fetch(&DbKey::HeaderHeight(h))? {
Some(DbValue::HeaderHeight(header)) => {
headers.push(*header);
},
Some(_) => unreachable!(),
Expand Down Expand Up @@ -1476,7 +1477,7 @@ fn fetch_header_by_block_hash<T: BlockchainBackend>(
db: &T,
hash: BlockHash,
) -> Result<Option<BlockHeader>, ChainStorageError> {
try_fetch!(db, hash, BlockHash)
try_fetch!(db, hash, HeaderHash)
}

fn fetch_orphan<T: BlockchainBackend>(db: &T, hash: BlockHash) -> Result<Block, ChainStorageError> {
Expand Down Expand Up @@ -2367,7 +2368,7 @@ fn get_orphan_link_main_chain<T: BlockchainBackend>(

// If this hash is part of the main chain, we're done - since curr_hash has already been set to the previous
// hash, the chain Vec does not include the fork block in common with both chains
if db.contains(&DbKey::BlockHash(curr_hash))? {
if db.contains(&DbKey::HeaderHash(curr_hash))? {
break;
}
}
Expand Down Expand Up @@ -2893,7 +2894,7 @@ mod test {
// Check 2b was added
let access = test.db_write_access();
let block = orphan_chain_b.get("2b").unwrap().clone();
assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());

// Check 7d is the tip
let block = orphan_chain_d.get("7d").unwrap().clone();
Expand All @@ -2902,7 +2903,7 @@ mod test {
let metadata = access.fetch_chain_metadata().unwrap();
assert_eq!(metadata.best_block(), block.hash());
assert_eq!(metadata.height_of_longest_chain(), block.height());
assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());

let mut all_blocks = main_chain
.into_iter()
Expand All @@ -2916,8 +2917,8 @@ mod test {
for (height, name) in expected_chain.iter().enumerate() {
let expected_block = all_blocks.get(*name).unwrap();
unpack_enum!(
DbValue::BlockHeader(found_block) =
access.fetch(&DbKey::BlockHeader(height as u64)).unwrap().unwrap()
DbValue::HeaderHeight(found_block) =
access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap()
);
assert_eq!(*found_block, *expected_block.header());
}
Expand Down Expand Up @@ -2988,7 +2989,7 @@ mod test {
// Check 2b was added
let access = test.db_write_access();
let block = orphan_chain_b.get("2b").unwrap().clone();
assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());

// Check 12b is the tip
let block = orphan_chain_b.get("12b").unwrap().clone();
Expand All @@ -2997,7 +2998,7 @@ mod test {
let metadata = access.fetch_chain_metadata().unwrap();
assert_eq!(metadata.best_block(), block.hash());
assert_eq!(metadata.height_of_longest_chain(), block.height());
assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());

let mut all_blocks = main_chain.into_iter().chain(orphan_chain_b).collect::<HashMap<_, _>>();
all_blocks.insert("GB".to_string(), genesis);
Expand All @@ -3008,8 +3009,8 @@ mod test {
for (height, name) in expected_chain.iter().enumerate() {
let expected_block = all_blocks.get(*name).unwrap();
unpack_enum!(
DbValue::BlockHeader(found_block) =
access.fetch(&DbKey::BlockHeader(height as u64)).unwrap().unwrap()
DbValue::HeaderHeight(found_block) =
access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap()
);
assert_eq!(*found_block, *expected_block.header());
}
Expand Down
20 changes: 10 additions & 10 deletions base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,16 @@ impl fmt::Display for WriteOperation {

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DbKey {
BlockHeader(u64),
BlockHash(BlockHash),
HeaderHeight(u64),
HeaderHash(BlockHash),
OrphanBlock(HashOutput),
}

impl DbKey {
pub fn to_value_not_found_error(&self) -> ChainStorageError {
let (entity, field, value) = match self {
DbKey::BlockHeader(v) => ("BlockHeader", "Height", v.to_string()),
DbKey::BlockHash(v) => ("Block", "Hash", v.to_hex()),
DbKey::HeaderHeight(v) => ("BlockHeader", "Height", v.to_string()),
DbKey::HeaderHash(v) => ("Header", "Hash", v.to_hex()),
DbKey::OrphanBlock(v) => ("Orphan", "Hash", v.to_hex()),
};
ChainStorageError::ValueNotFound { entity, field, value }
Expand All @@ -482,16 +482,16 @@ impl DbKey {

#[derive(Debug)]
pub enum DbValue {
BlockHeader(Box<BlockHeader>),
BlockHash(Box<BlockHeader>),
HeaderHeight(Box<BlockHeader>),
HeaderHash(Box<BlockHeader>),
OrphanBlock(Box<Block>),
}

impl Display for DbValue {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
match self {
DbValue::BlockHeader(_) => f.write_str("Block header"),
DbValue::BlockHash(_) => f.write_str("Block hash"),
DbValue::HeaderHeight(_) => f.write_str("Header by height"),
DbValue::HeaderHash(_) => f.write_str("Header by hash"),
DbValue::OrphanBlock(_) => f.write_str("Orphan block"),
}
}
Expand All @@ -500,8 +500,8 @@ impl Display for DbValue {
impl Display for DbKey {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
match self {
DbKey::BlockHeader(v) => f.write_str(&format!("Block header (#{})", v)),
DbKey::BlockHash(v) => f.write_str(&format!("Block hash (#{})", v.to_hex())),
DbKey::HeaderHeight(v) => f.write_str(&format!("Header height (#{})", v)),
DbKey::HeaderHash(v) => f.write_str(&format!("Header hash (#{})", v.to_hex())),
DbKey::OrphanBlock(v) => f.write_str(&format!("Orphan block hash ({})", v.to_hex())),
}
}
Expand Down
12 changes: 6 additions & 6 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1806,11 +1806,11 @@ impl BlockchainBackend for LMDBDatabase {
fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, ChainStorageError> {
let txn = self.read_transaction()?;
let res = match key {
DbKey::BlockHeader(k) => {
DbKey::HeaderHeight(k) => {
let val: Option<BlockHeader> = lmdb_get(&txn, &self.headers_db, k)?;
val.map(|val| DbValue::BlockHeader(Box::new(val)))
val.map(|val| DbValue::HeaderHeight(Box::new(val)))
},
DbKey::BlockHash(hash) => {
DbKey::HeaderHash(hash) => {
let k: Option<u64> = self.fetch_height_from_hash(&txn, hash)?;
match k {
Some(k) => {
Expand All @@ -1821,7 +1821,7 @@ impl BlockchainBackend for LMDBDatabase {
k
);
let val: Option<BlockHeader> = lmdb_get(&txn, &self.headers_db, &k)?;
val.map(|val| DbValue::BlockHash(Box::new(val)))
val.map(|val| DbValue::HeaderHash(Box::new(val)))
},
None => {
trace!(
Expand All @@ -1843,8 +1843,8 @@ impl BlockchainBackend for LMDBDatabase {
fn contains(&self, key: &DbKey) -> Result<bool, ChainStorageError> {
let txn = self.read_transaction()?;
Ok(match key {
DbKey::BlockHeader(k) => lmdb_exists(&txn, &self.headers_db, k)?,
DbKey::BlockHash(h) => lmdb_exists(&txn, &self.block_hashes_db, h.deref())?,
DbKey::HeaderHeight(k) => lmdb_exists(&txn, &self.headers_db, k)?,
DbKey::HeaderHash(h) => lmdb_exists(&txn, &self.block_hashes_db, h.deref())?,
DbKey::OrphanBlock(k) => lmdb_exists(&txn, &self.orphans_db, k.deref())?,
})
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub type ConfidentialOutputHasher = DomainSeparatedConsensusHasher<ConfidentialO

/// The reason for a peer being banned
#[cfg(feature = "base_node")]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BanReason {
/// The reason for the ban
pub reason: String,
Expand Down
Loading

0 comments on commit 5c2781e

Please sign in to comment.