feat: remove orphan validation and only validate on insertion (tari-project#5601)

Description
---
Currently, block validation is done in two steps: an orphan check and a full
check.
The orphan check is done when a block is first inserted into the DB (all
blocks are). Before insertion, the block must pass an internal
integrity check (signatures, rangeproofs, etc.).
The blocks are then stored in the orphan pool.
When they are added to the main chain, a full check is done on the
blocks to ensure the MMRs are correct and there are no double spends, plus
another integrity check.

Currently, bad blocks are also only tracked while syncing.

This PR removes the orphan check and only validates a block when we add
it to the main chain. To ensure a node is not spammed with useless
blocks, it verifies that a received block at least meets a minimum
difficulty. The orphan pool cleans out the oldest blocks, and the number
of stored blocks is configurable via the config.
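The bounded orphan pool described above can be sketched as a simple FIFO structure with oldest-first eviction. This is a hypothetical simplification (the names `OrphanPool`, `max_size`, and `insert` are invented here; the real pool lives in the chain storage layer and is keyed by block hash):

```rust
use std::collections::VecDeque;

/// Hypothetical sketch of a size-bounded orphan pool: once the configured
/// capacity is reached, inserting a new block evicts the oldest one.
struct OrphanPool {
    max_size: usize,          // configurable via the node config in the real code
    blocks: VecDeque<String>, // block hashes (hex), oldest at the front
}

impl OrphanPool {
    fn new(max_size: usize) -> Self {
        Self { max_size, blocks: VecDeque::new() }
    }

    /// Insert a block hash, returning the evicted (oldest) hash, if any.
    fn insert(&mut self, hash: &str) -> Option<String> {
        let evicted = if self.blocks.len() >= self.max_size {
            self.blocks.pop_front()
        } else {
            None
        };
        self.blocks.push_back(hash.to_string());
        evicted
    }
}

fn main() {
    let mut pool = OrphanPool::new(2);
    assert_eq!(pool.insert("a"), None);
    assert_eq!(pool.insert("b"), None);
    // The pool is full: the oldest block ("a") is cleaned out first.
    assert_eq!(pool.insert("c"), Some("a".to_string()));
    println!("ok");
}
```

A FIFO bound like this keeps memory use constant regardless of how many orphans peers send.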

As soon as we can trace a block to the main chain, we do a full
proof-of-work check.
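The cheap minimum-difficulty gate described above can be sketched as follows. This is a hedged simplification: `achieved` stands in for the value the real handler computes via `sha3x_difficulty` or `randomx_difficulty`, and the `GateError` type is invented here for illustration:

```rust
/// Hypothetical sketch of the minimum-difficulty gate: reject a propagated
/// block before any expensive validation if its achieved proof-of-work
/// difficulty is below the network minimum for its PoW algorithm.
#[derive(Debug, PartialEq)]
enum GateError {
    AchievedDifficultyBelowMin { achieved: u64, min: u64 },
}

fn check_min_block_difficulty(achieved: u64, min_difficulty: u64) -> Result<(), GateError> {
    if achieved < min_difficulty {
        // In the real handler the block's hash is also recorded as a bad block.
        return Err(GateError::AchievedDifficultyBelowMin {
            achieved,
            min: min_difficulty,
        });
    }
    Ok(())
}

fn main() {
    // A block below the minimum is rejected before any signature or
    // rangeproof checks are run; one at or above the minimum passes through.
    assert!(check_min_block_difficulty(50, 100).is_err());
    assert!(check_min_block_difficulty(100, 100).is_ok());
    println!("ok");
}
```

The point of the gate is economic: producing a hash that clears even the minimum difficulty costs the attacker more than the check costs the node.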

Then, when a chain/block's proof of work is higher than the current
chain's, the blocks are added to the main chain. This is when we do the
full validation of the block.

Bad-block handling has also changed, and we now properly keep track of
all bad blocks. When a block fails any part of validation, it is added
to the bad_block list. When we receive a new request for a block, this
list is one of the first things we check, before we spend resources
validating the block again.
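The check-before-validate flow can be sketched with a plain hash set. The names here (`BadBlockList`, `mark_bad`, `is_known_bad`) are hypothetical; in the real code the list is persisted in the blockchain DB via `add_bad_block` / `bad_block_exists`:

```rust
use std::collections::HashSet;

/// Hypothetical sketch of the bad-block short-circuit: any block that ever
/// failed validation is remembered, and re-received copies of it are
/// rejected before any validation resources are spent on them.
struct BadBlockList {
    hashes: HashSet<[u8; 32]>,
}

impl BadBlockList {
    fn new() -> Self {
        Self { hashes: HashSet::new() }
    }

    /// Record a block that failed any part of validation.
    fn mark_bad(&mut self, hash: [u8; 32]) {
        self.hashes.insert(hash);
    }

    /// One of the first checks on a newly received block: true means
    /// "already known bad, skip it".
    fn is_known_bad(&self, hash: &[u8; 32]) -> bool {
        self.hashes.contains(hash)
    }
}

fn main() {
    let mut list = BadBlockList::new();
    let bad = [1u8; 32];
    assert!(!list.is_known_bad(&bad));
    list.mark_bad(bad);
    // A repeat of the same bad block is now rejected immediately.
    assert!(list.is_known_bad(&bad));
    println!("ok");
}
```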

Motivation and Context
---
TARI-0003
A base node wastes resources validating a block more than once. From
testing, the most expensive checks to run are (in order): rangeproofs,
metadata signatures, script signatures, and kernel signatures. These
checks are all run on orphan blocks. It is very easy to put junk data in
these blocks and spam a node into wasting resources validating fake
blocks.

Using the tests added in tari-project#5599, this PR improves `add_block` time by
almost 100%.

How Has This Been Tested?
---
Unit tests.

What process can a PR reviewer use to test or verify this change?
---
Checking that we have not changed or removed any required validation
rules.
SWvheerden authored Aug 10, 2023
1 parent 2205b1e commit 41244a3
Showing 14 changed files with 181 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions applications/minotari_node/src/bootstrap.rs
@@ -130,6 +130,7 @@ where B: BlockchainBackend + 'static
self.mempool.clone(),
self.rules.clone(),
base_node_config.messaging_request_timeout,
self.randomx_factory.clone(),
))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
5 changes: 5 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
@@ -30,6 +30,7 @@ use crate::{
chain_storage::ChainStorageError,
consensus::ConsensusManagerError,
mempool::MempoolError,
proof_of_work::{monero_rx::MergeMineError, DifficultyError},
};

#[derive(Debug, Error)]
@@ -70,4 +71,8 @@ pub enum CommsInterfaceError {
InvalidRequest { request: &'static str, details: String },
#[error("Peer sent invalid full block {hash}: {details}")]
InvalidFullBlock { hash: FixedHash, details: String },
#[error("Invalid merge mined block: {0}")]
MergeMineError(#[from] MergeMineError),
#[error("Invalid difficulty: {0}")]
DifficultyError(#[from] DifficultyError),
}
84 changes: 69 additions & 15 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
@@ -46,13 +46,20 @@ use crate::{
},
metrics,
},
blocks::{Block, BlockBuilder, BlockHeader, ChainBlock, NewBlock, NewBlockTemplate},
blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError, PrunedOutput},
consensus::{ConsensusConstants, ConsensusManager},
mempool::Mempool,
proof_of_work::{Difficulty, PowAlgorithm},
proof_of_work::{
randomx_difficulty,
randomx_factory::RandomXFactory,
sha3x_difficulty,
Difficulty,
PowAlgorithm,
PowError,
},
transactions::aggregated_body::AggregateBody,
validation::helpers,
validation::{helpers, ValidationError},
};

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
@@ -85,6 +92,7 @@ pub struct InboundNodeCommsHandlers<B> {
list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
outbound_nci: OutboundNodeCommsInterface,
connectivity: ConnectivityRequester,
randomx_factory: RandomXFactory,
}

impl<B> InboundNodeCommsHandlers<B>
@@ -98,6 +106,7 @@ where B: BlockchainBackend + 'static
consensus_manager: ConsensusManager,
outbound_nci: OutboundNodeCommsInterface,
connectivity: ConnectivityRequester,
randomx_factory: RandomXFactory,
) -> Self {
Self {
block_event_sender,
@@ -107,6 +116,7 @@ where B: BlockchainBackend + 'static
list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
outbound_nci,
connectivity,
randomx_factory,
}
}

@@ -438,15 +448,17 @@ where B: BlockchainBackend + 'static
}

// Lets check if the block exists before we try and ask for a complete block
if self.blockchain_db.block_exists(block_hash).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
block_hash.to_hex()
);
if self.check_exists_and_not_bad_block(block_hash).await? {
return Ok(());
}

// lets check that the difficulty at least matches the min required difficulty
// We cannot check the target difficulty as orphan blocks dont have a target difficulty.
// All we care here is that bad blocks are not free to make, and that they are more expensive to make then they
// are to validate. As soon as a block can be linked to the main chain, a proper full proof of work check will
// be done before any other validation.
self.check_min_block_difficulty(&new_block).await?;

{
// we use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
// same block from multiple peer near simultaneously. We should only reconcile each unique block once.
@@ -462,14 +474,10 @@ }
}
{
let mut write_lock = self.list_of_reconciling_blocks.write().await;
if self.blockchain_db.block_exists(block_hash).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
block_hash.to_hex()
);
if self.check_exists_and_not_bad_block(block_hash).await? {
return Ok(());
}

if !write_lock.insert(block_hash) {
debug!(
target: LOG_TARGET,
@@ -498,6 +506,51 @@ Ok(())
Ok(())
}

async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
let constants = self.consensus_manager.consensus_constants(new_block.header.height);
let min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
let achieved = match new_block.header.pow_algo() {
PowAlgorithm::RandomX => randomx_difficulty(&new_block.header, &self.randomx_factory)?,
PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
};
if achieved < min_difficulty {
self.blockchain_db
.add_bad_block(
new_block.header.hash(),
self.blockchain_db.get_chain_metadata().await?.height_of_longest_chain(),
)
.await?;
return Err(CommsInterfaceError::InvalidBlockHeader(
BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin),
));
}
Ok(())
}

async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
if self.blockchain_db.block_exists(block).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
block.to_hex()
);
return Ok(true);
}
if self.blockchain_db.bad_block_exists(block).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already validated as a bad block",
block.to_hex()
);
return Err(CommsInterfaceError::ChainStorageError(
ChainStorageError::ValidationError {
source: ValidationError::BadBlockFound { hash: block.to_hex() },
},
));
}
Ok(false)
}

async fn reconcile_and_add_block(
&mut self,
source_peer: NodeId,
@@ -949,6 +1002,7 @@ impl<B> Clone for InboundNodeCommsHandlers<B> {
list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
outbound_nci: self.outbound_nci.clone(),
connectivity: self.connectivity.clone(),
randomx_factory: self.randomx_factory.clone(),
}
}
}
6 changes: 6 additions & 0 deletions base_layer/core/src/base_node/service/initializer.rs
@@ -51,6 +51,7 @@ use crate::{
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
consensus::ConsensusManager,
mempool::Mempool,
proof_of_work::randomx_factory::RandomXFactory,
proto as shared_protos,
proto::base_node as proto,
};
@@ -65,6 +66,7 @@ pub struct BaseNodeServiceInitializer<T> {
mempool: Mempool,
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
}

impl<T> BaseNodeServiceInitializer<T>
@@ -77,13 +79,15 @@ where T: BlockchainBackend
mempool: Mempool,
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
) -> Self {
Self {
inbound_message_subscription_factory,
blockchain_db,
mempool,
consensus_manager,
service_request_timeout,
randomx_factory,
}
}

@@ -175,6 +179,7 @@ where T: BlockchainBackend + 'static
let blockchain_db = self.blockchain_db.clone();
let mempool = self.mempool.clone();
let consensus_manager = self.consensus_manager.clone();
let randomx_factory = self.randomx_factory.clone();

context.spawn_when_ready(move |handles| async move {
let dht = handles.expect_handle::<Dht>();
Expand All @@ -190,6 +195,7 @@ where T: BlockchainBackend + 'static
consensus_manager,
outbound_nci.clone(),
connectivity,
randomx_factory,
);

let streams = BaseNodeStreams {
@@ -515,7 +515,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
header.pow_algo(),
header.hash().to_hex(),
);
self.header_validator.validate(header)?;
self.header_validator.validate(header).await?;
}

debug!(
@@ -662,7 +662,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
continue;
}
let current_height = header.height;
last_total_accumulated_difficulty = self.header_validator.validate(header)?;
last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

if has_switched_to_new_chain {
// If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
39 changes: 31 additions & 8 deletions base_layer/core/src/base_node/sync/header_sync/validator.rs
@@ -27,12 +27,12 @@ use tari_utilities::{epoch_time::EpochTime, hex::Hex};

use crate::{
base_node::sync::BlockHeaderSyncError,
blocks::{BlockHeader, BlockHeaderAccumulatedData, ChainHeader},
blocks::{BlockHeader, BlockHeaderAccumulatedData, BlockHeaderValidationError, ChainHeader},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, TargetDifficulties},
common::rolling_vec::RollingVec,
consensus::ConsensusManager,
proof_of_work::{randomx_factory::RandomXFactory, PowAlgorithm},
validation::{header::HeaderFullValidator, DifficultyCalculator, HeaderChainLinkedValidator},
validation::{header::HeaderFullValidator, DifficultyCalculator, HeaderChainLinkedValidator, ValidationError},
};

const LOG_TARGET: &str = "c::bn::header_sync";
@@ -109,7 +109,7 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
self.valid_headers().last()
}

pub fn validate(&mut self, header: BlockHeader) -> Result<u128, BlockHeaderSyncError> {
pub async fn validate(&mut self, header: BlockHeader) -> Result<u128, BlockHeaderSyncError> {
let state = self.state();
let constants = self.consensus_rules.consensus_constants(header.height);

@@ -118,15 +118,38 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
constants.max_pow_difficulty(header.pow_algo()),
);

let achieved_target = {
let result = {
let txn = self.db.inner().db_read_access()?;
self.validator.validate(
&*txn,
&header,
&state.previous_header,
&state.timestamps,
Some(target_difficulty),
)?
)
};
let achieved_target = match result {
Ok(achieved_target) => achieved_target,
// future timelimit validation can succeed at a later time. As the block is not yet valid, we discard it
// for now and ban the peer, but wont blacklist the block.
Err(e @ ValidationError::BlockHeaderError(BlockHeaderValidationError::InvalidTimestampFutureTimeLimit)) => {
return Err(e.into())
},
// We dont want to mark a block as bad for internal failures
Err(
e @ ValidationError::FatalStorageError(_) |
e @ ValidationError::NotEnoughTimestamps { .. } |
e @ ValidationError::AsyncTaskFailed(_),
) => return Err(e.into()),
// We dont have to mark the block twice
Err(e @ ValidationError::BadBlockFound { .. }) => return Err(e.into()),

Err(e) => {
let mut txn = self.db.write_transaction();
txn.insert_bad_block(header.hash(), header.height);
txn.commit().await?;
return Err(e.into());
},
};

// Header is valid, add this header onto the validation state for the next round
@@ -299,11 +322,11 @@ mod test {
validator.initialize_state(tip.hash()).await.unwrap();
assert!(validator.valid_headers().is_empty());
let next = BlockHeader::from_previous(tip.header());
validator.validate(next).unwrap();
validator.validate(next).await.unwrap();
assert_eq!(validator.valid_headers().len(), 1);
let tip = validator.valid_headers().last().cloned().unwrap();
let next = BlockHeader::from_previous(tip.header());
validator.validate(next).unwrap();
validator.validate(next).await.unwrap();
assert_eq!(validator.valid_headers().len(), 2);
}

@@ -313,7 +336,7 @@ mod test {
validator.initialize_state(tip.hash()).await.unwrap();
let mut next = BlockHeader::from_previous(tip.header());
next.height = 14;
let err = validator.validate(next).unwrap_err();
let err = validator.validate(next).await.unwrap_err();
unpack_enum!(BlockHeaderSyncError::ValidationFailed(val_err) = err);
unpack_enum!(ValidationError::BlockHeaderError(header_err) = val_err);
unpack_enum!(BlockHeaderValidationError::InvalidHeight { actual, expected } = header_err);
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
@@ -225,6 +225,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists");

make_async_fn!(add_bad_block(hash: BlockHash, height: u64) -> (), "add_bad_block");

make_async_fn!(fetch_block(height: u64, compact: bool) -> HistoricalBlock, "fetch_block");

make_async_fn!(fetch_blocks<T: RangeBounds<u64>>(bounds: T, compact: bool) -> Vec<HistoricalBlock>, "fetch_blocks");
