Skip to content

Commit

Permalink
- These changes enforces a storage limit on the orphan block pool, li…
Browse files Browse the repository at this point in the history
…miting the number of orphan blocks that are kept.

- Orphan block cleanup is performed when a single orphan block is added and on chain reorgs. When a single orphan block is added, then the cheaper cleanup_orphans_single function is called to remove a single orphan block from the orphan pool when the storage limits are reached. The more expensive cleanup_orphans_comprehensive function is called when a chain reorg is performed and many orphan blocks have been added to the block orphan pool that could have resulted in the storage limits being exceeded.
- Added get_orphan_count to the blockchain backends and provided implementations for lmdb_db and memory_db that can be used to query the number of blocks in the orphan block pool.
- Added BlockchainDatabaseConfig to the blockchain db to enable the orphan storage capacity to be set.
- Provided two tests for testing the two different orphan block cleanup mechanisms. The orphan_cleanup_on_add test checks the removal of a single orphan block when the storage capacity is reached on an add block operation. The orphan_cleanup_on_reorg test checks the removal of a set of orphan blocks when the storage capacity is reached on a chain reorg operation, when many blocks were added to the orphan pool.
  • Loading branch information
Yuko Roodt committed Apr 7, 2020
1 parent 3c18269 commit ac9e660
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 18 deletions.
5 changes: 4 additions & 1 deletion applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use tari_core::{
create_lmdb_database,
BlockchainBackend,
BlockchainDatabase,
BlockchainDatabaseConfig,
LMDBDatabase,
MemoryDatabase,
Validators,
Expand Down Expand Up @@ -403,7 +404,9 @@ where
FullConsensusValidator::new(rules.clone(), factories.clone()),
StatelessBlockValidator::new(&rules.consensus_constants()),
);
let db = BlockchainDatabase::new(backend, &rules, validators).map_err(|e| e.to_string())?;
// TODO - make BlockchainDatabaseConfig configurable
let db = BlockchainDatabase::new(backend, &rules, validators, BlockchainDatabaseConfig::default())
.map_err(|e| e.to_string())?;
let mempool_validator =
MempoolValidators::new(FullTxValidator::new(factories.clone()), TxInputAndMaturityValidator {});
let mempool = Mempool::new(db.clone(), MempoolConfig::default(), mempool_validator);
Expand Down
111 changes: 106 additions & 5 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use crate::{
blocks::{blockheader::BlockHash, Block, BlockHeader, NewBlockTemplate},
chain_storage::{
consts::BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY,
db_transaction::{DbKey, DbKeyValuePair, DbTransaction, DbValue, MetadataKey, MetadataValue, MmrTree},
error::ChainStorageError,
ChainMetadata,
Expand Down Expand Up @@ -53,6 +54,20 @@ use tari_mmr::{Hash, MerkleCheckPoint, MerkleProof, MutableMmrLeafNodes};

const LOG_TARGET: &str = "c::cs::database";

/// Configuration for the BlockchainDatabase.
#[derive(Clone, Copy)]
pub struct BlockchainDatabaseConfig {
pub orphan_storage_capacity: usize,
}

impl Default for BlockchainDatabaseConfig {
fn default() -> Self {
Self {
orphan_storage_capacity: BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY,
}
}
}

#[derive(Clone, Debug, PartialEq, Display)]
pub enum BlockAddResult {
Ok,
Expand Down Expand Up @@ -141,6 +156,8 @@ pub trait BlockchainBackend: Send + Sync {
where
Self: Sized,
F: FnMut(Result<(HashOutput, Block), ChainStorageError>);
/// Returns the number of blocks in the block orphan pool.
fn get_orphan_count(&self) -> Result<usize, ChainStorageError>;
/// Performs the function F for each transaction kernel.
fn for_each_kernel<F>(&self, f: F) -> Result<(), ChainStorageError>
where
Expand Down Expand Up @@ -186,7 +203,7 @@ macro_rules! fetch {
///
/// ```
/// use tari_core::{
/// chain_storage::{BlockchainDatabase, MemoryDatabase, Validators},
/// chain_storage::{BlockchainDatabase, BlockchainDatabaseConfig, MemoryDatabase, Validators},
/// consensus::{ConsensusManagerBuilder, Network},
/// transactions::types::HashDigest,
/// validation::{mocks::MockValidator, Validation},
Expand All @@ -196,14 +213,15 @@ macro_rules! fetch {
/// let db = MemoryDatabase::<HashDigest>::default();
/// let network = Network::LocalNet;
/// let rules = ConsensusManagerBuilder::new(network).build();
/// let db = BlockchainDatabase::new(db_backend, &rules, validators).unwrap();
/// let db = BlockchainDatabase::new(db_backend, &rules, validators, BlockchainDatabaseConfig::default()).unwrap();
/// // Do stuff with db
/// ```
pub struct BlockchainDatabase<T>
where T: BlockchainBackend
{
db: Arc<RwLock<T>>,
validators: Validators<T>,
config: BlockchainDatabaseConfig,
}

impl<T> BlockchainDatabase<T>
Expand All @@ -214,11 +232,13 @@ where T: BlockchainBackend
db: T,
consensus_manager: &ConsensusManager,
validators: Validators<T>,
config: BlockchainDatabaseConfig,
) -> Result<Self, ChainStorageError>
{
let blockchain_db = BlockchainDatabase {
db: Arc::new(RwLock::new(db)),
validators,
config,
};
if blockchain_db.get_height()?.is_none() {
let genesis_block = consensus_manager.get_genesis_block();
Expand Down Expand Up @@ -397,7 +417,12 @@ where T: BlockchainBackend
.map_err(ChainStorageError::ValidationError)?;

let mut db = self.db_write_access()?;
add_block(&mut db, &self.validators.block, block)
add_block(
&mut db,
&self.validators.block,
block,
self.config.orphan_storage_capacity,
)
}

fn store_new_block(&self, block: Block) -> Result<(), ChainStorageError> {
Expand Down Expand Up @@ -565,14 +590,22 @@ fn add_block<T: BlockchainBackend>(
db: &mut RwLockWriteGuard<T>,
block_validator: &Arc<Validator<Block, T>>,
block: Block,
orphan_storage_capacity: usize,
) -> Result<BlockAddResult, ChainStorageError>
{
let block_hash = block.hash();
if db.contains(&DbKey::BlockHash(block_hash))? {
return Ok(BlockAddResult::BlockExists);
}

handle_possible_reorg(db, block_validator, block)
let block_add_result = handle_possible_reorg(db, block_validator, block)?;
// Cleanup orphan block pool
match block_add_result {
BlockAddResult::Ok => {},
BlockAddResult::BlockExists => {},
BlockAddResult::OrphanBlock => cleanup_orphans_single(db, orphan_storage_capacity)?,
BlockAddResult::ChainReorg(_) => cleanup_orphans_comprehensive(db, orphan_storage_capacity)?,
}
Ok(block_add_result)
}

// Adds a new block onto the chain tip.
Expand Down Expand Up @@ -1150,6 +1183,73 @@ fn find_strongest_orphan_tip<T: BlockchainBackend>(
Ok((best_accum_difficulty, best_tip_hash))
}

// Discards the orphan block with the minimum height from the block orphan pool to maintain the configured orphan pool
// storage limit.
fn cleanup_orphans_single<T: BlockchainBackend>(
db: &mut RwLockWriteGuard<T>,
orphan_storage_capacity: usize,
) -> Result<(), ChainStorageError>
{
if db.get_orphan_count()? > orphan_storage_capacity {
trace!(
target: LOG_TARGET,
"Orphan block storage limit reached, performing simple cleanup.",
);
let mut min_height: u64 = u64::max_value();
let mut remove_hash: Option<BlockHash> = None;
db.for_each_orphan(|pair| {
let (_, block) = pair.unwrap();
if block.header.height < min_height {
min_height = block.header.height;
remove_hash = Some(block.hash());
}
})
.expect("Unexpected result for database query");
if let Some(hash) = remove_hash {
trace!(target: LOG_TARGET, "Discarding orphan block ({}).", hash.to_hex());
remove_orphan(db, hash)?;
}
}
Ok(())
}

// Perform a comprehensive search to remove all the minimum height orphans to maintain the configured orphan pool
// storage limit.
fn cleanup_orphans_comprehensive<T: BlockchainBackend>(
db: &mut RwLockWriteGuard<T>,
orphan_storage_capacity: usize,
) -> Result<(), ChainStorageError>
{
let orphan_count = db.get_orphan_count()?;
if orphan_count > orphan_storage_capacity {
trace!(
target: LOG_TARGET,
"Orphan block storage limit reached, performing comprehensive cleanup.",
);
let remove_count = orphan_count - orphan_storage_capacity;

let mut orphans = Vec::<(u64, BlockHash)>::with_capacity(orphan_count);
db.for_each_orphan(|pair| {
let (_, block) = pair.unwrap();
orphans.push((block.header.height, block.hash()));
})
.expect("Unexpected result for database query");
orphans.sort_by(|a, b| a.0.cmp(&b.0));

let mut txn = DbTransaction::new();
for i in 0..remove_count {
trace!(
target: LOG_TARGET,
"Discarding orphan block ({}).",
orphans[i].1.to_hex()
);
txn.delete(DbKey::OrphanBlock(orphans[i].1.clone()));
}
commit(db, txn)?;
}
Ok(())
}

fn log_error<T>(req: DbKey, err: ChainStorageError) -> Result<T, ChainStorageError> {
error!(
target: LOG_TARGET,
Expand All @@ -1167,6 +1267,7 @@ where T: BlockchainBackend
BlockchainDatabase {
db: self.db.clone(),
validators: self.validators.clone(),
config: self.config.clone(),
}
}
}
24 changes: 24 additions & 0 deletions base_layer/core/src/chain_storage/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/// The maximum number of orphans that can be stored in the Orphan block pool.
pub const BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY: usize = 720;
5 changes: 5 additions & 0 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,11 @@ where D: Digest + Send + Sync
lmdb_for_each::<F, HashOutput, Block>(&self.env, &self.orphans_db, f)
}

/// Returns the number of blocks in the block orphan pool.
fn get_orphan_count(&self) -> Result<usize, ChainStorageError> {
lmdb_len(&self.env, &self.orphans_db)
}

/// Iterate over all the stored transaction kernels and execute the function `f` for each kernel.
fn for_each_kernel<F>(&self, f: F) -> Result<(), ChainStorageError>
where F: FnMut(Result<(HashOutput, TransactionKernel), ChainStorageError>) {
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/chain_storage/memory_db/memory_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,12 @@ where D: Digest + Send + Sync
Ok(())
}

/// Returns the number of blocks in the block orphan pool.
fn get_orphan_count(&self) -> Result<usize, ChainStorageError> {
let db = self.db_access()?;
Ok(db.orphans.len())
}

/// Iterate over all the stored transaction kernels and execute the function `f` for each kernel.
fn for_each_kernel<F>(&self, mut f: F) -> Result<(), ChainStorageError>
where F: FnMut(Result<(HashOutput, TransactionKernel), ChainStorageError>) {
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! backed by LMDB, while the merkle trees are stored in flat files for example.
mod blockchain_database;
mod consts;
mod db_transaction;
mod error;
mod historical_block;
Expand All @@ -46,6 +47,7 @@ pub use blockchain_database::{
BlockAddResult,
BlockchainBackend,
BlockchainDatabase,
BlockchainDatabaseConfig,
MutableMmrState,
Validators,
};
Expand Down
4 changes: 4 additions & 0 deletions base_layer/core/src/helpers/mock_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl BlockchainBackend for MockBackend {
unimplemented!()
}

fn get_orphan_count(&self) -> Result<usize, ChainStorageError> {
unimplemented!()
}

fn for_each_kernel<F>(&self, _f: F) -> Result<(), ChainStorageError>
where
Self: Sized,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod mock_backend;

use crate::{
blocks::{Block, BlockHeader},
chain_storage::{BlockchainDatabase, MemoryDatabase, Validators},
chain_storage::{BlockchainDatabase, BlockchainDatabaseConfig, MemoryDatabase, Validators},
consensus::{ConsensusConstants, ConsensusManager},
transactions::{transaction::Transaction, types::HashDigest},
validation::mocks::MockValidator,
Expand All @@ -51,5 +51,5 @@ pub fn create_orphan_block(
pub fn create_mem_db(consensus_manager: &ConsensusManager) -> BlockchainDatabase<MemoryDatabase<HashDigest>> {
let validators = Validators::new(MockValidator::new(true), MockValidator::new(true));
let db = MemoryDatabase::<HashDigest>::default();
BlockchainDatabase::new(db, consensus_manager, validators).unwrap()
BlockchainDatabase::new(db, consensus_manager, validators, BlockchainDatabaseConfig::default()).unwrap()
}
1 change: 0 additions & 1 deletion base_layer/core/src/mempool/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

use std::time::Duration;

Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/tests/block_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_core::{
chain_storage::{BlockchainDatabase, MemoryDatabase, Validators},
chain_storage::{BlockchainDatabase, BlockchainDatabaseConfig, MemoryDatabase, Validators},
consensus::{ConsensusManagerBuilder, Network},
proof_of_work::DiffAdjManager,
transactions::types::{CryptoFactories, HashDigest},
Expand All @@ -38,7 +38,7 @@ fn test_genesis_block() {
FullConsensusValidator::new(rules.clone(), factories),
StatelessBlockValidator::new(&rules.consensus_constants()),
);
let db = BlockchainDatabase::new(backend, &rules, validators).unwrap();
let db = BlockchainDatabase::new(backend, &rules, validators, BlockchainDatabaseConfig::default()).unwrap();
let diff_adj_manager = DiffAdjManager::new(&rules.consensus_constants()).unwrap();
rules.set_diff_manager(diff_adj_manager).unwrap();
let block = rules.get_genesis_block();
Expand Down
Loading

0 comments on commit ac9e660

Please sign in to comment.