diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 8e5a21c2c1..bd757d3a6f 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -155,8 +155,13 @@ pub trait BlockchainBackend: Send + Sync { /// Fetches the checkpoint corresponding to the provided height, the checkpoint consist of the list of nodes /// added & deleted for the given Merkle tree. fn fetch_checkpoint(&self, tree: MmrTree, height: u64) -> Result; + /// Fetches the total merkle mountain range node count upto the specified height. + fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result; /// Fetches the leaf node hash and its deletion status for the nth leaf node in the given MMR tree. fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Hash, bool), ChainStorageError>; + /// Fetches the set of leaf node hashes and their deletion status' for the nth to nth+count leaf node index in the + /// given MMR tree. + fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result, ChainStorageError>; /// Performs the function F for each orphan block in the orphan pool. fn for_each_orphan(&self, f: F) -> Result<(), ChainStorageError> where @@ -271,7 +276,6 @@ where T: BlockchainBackend ); blockchain_db.store_pruning_horizon(config.pruning_horizon)?; } - Ok(blockchain_db) } diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 19ffe909b0..eeb54806b7 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -66,7 +66,7 @@ use croaring::Bitmap; use digest::Digest; use lmdb_zero::{Database, Environment, WriteTransaction}; use log::*; -use std::{collections::VecDeque, path::Path, sync::Arc}; +use std::{cmp::min, collections::VecDeque, path::Path, sync::Arc}; use tari_crypto::tari_utilities::{epoch_time::EpochTime, hash::Hashable}; use tari_mmr::{ functions::{prune_mutable_mmr, PrunedMutableMmr}, @@ -645,6 +645,14 @@ where D: Digest + Send + Sync .ok_or_else(|| ChainStorageError::OutOfRange) } + fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result { + match tree { + MmrTree::Kernel => count_mmr_nodes_added(&self.kernel_checkpoints, height), + MmrTree::Utxo => count_mmr_nodes_added(&self.utxo_checkpoints, height), + MmrTree::RangeProof => count_mmr_nodes_added(&self.range_proof_checkpoints, height), + } + } + fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Vec, bool), ChainStorageError> { let (hash, deleted) = match tree { MmrTree::Kernel => self.kernel_mmr.fetch_mmr_node(pos)?, @@ -657,6 +665,14 @@ where D: Digest + Send + Sync Ok((hash, deleted)) } + fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result, bool)>, ChainStorageError> { + let mut lead_nodes = Vec::<(Vec, bool)>::with_capacity(count as usize); + for pos in pos..pos + count { + lead_nodes.push(self.fetch_mmr_node(tree.clone(), pos)?); + } + Ok(lead_nodes) + } + /// Iterate over all the stored orphan blocks and execute the function `f` for each block. fn for_each_orphan(&self, f: F) -> Result<(), ChainStorageError> where F: FnMut(Result<(HashOutput, Block), ChainStorageError>) { @@ -796,3 +812,23 @@ fn rewind_checkpoint_index(cp_count: usize, steps_back: usize) -> usize { 1 } } + +// Calculate the total leaf node count upto a specified height. +fn count_mmr_nodes_added(checkpoints: &LMDBVec, height: u64) -> Result { + let mut node_count: u32 = 0; + let last_index = min( + checkpoints + .len() + .map_err(|e| ChainStorageError::AccessError(e.to_string()))?, + (height + 1) as usize, + ); + for cp_index in 0..last_index { + if let Some(cp) = checkpoints + .get(cp_index) + .map_err(|e| ChainStorageError::AccessError(format!("Checkpoint error: {}", e.to_string())))? + { + node_count += cp.nodes_added().len() as u32; + } + } + Ok(node_count) +} diff --git a/base_layer/core/src/chain_storage/memory_db/memory_db.rs b/base_layer/core/src/chain_storage/memory_db/memory_db.rs index 2372fc7b01..97d776725d 100644 --- a/base_layer/core/src/chain_storage/memory_db/memory_db.rs +++ b/base_layer/core/src/chain_storage/memory_db/memory_db.rs @@ -49,6 +49,7 @@ use crate::{ use croaring::Bitmap; use digest::Digest; use std::{ + cmp::min, collections::{HashMap, VecDeque}, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; @@ -454,6 +455,15 @@ where D: Digest + Send + Sync .ok_or_else(|| ChainStorageError::OutOfRange) } + fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result { + let db = self.db_access()?; + match tree { + MmrTree::Kernel => count_mmr_nodes_added(&db.kernel_checkpoints, height), + MmrTree::Utxo => count_mmr_nodes_added(&db.utxo_checkpoints, height), + MmrTree::RangeProof => count_mmr_nodes_added(&db.range_proof_checkpoints, height), + } + } + fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Vec, bool), ChainStorageError> { let db = self.db_access()?; let (hash, deleted) = match tree { @@ -467,6 +477,14 @@ where D: Digest + Send + Sync Ok((hash, deleted)) } + fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result, bool)>, ChainStorageError> { + let mut lead_nodes = Vec::<(Vec, bool)>::with_capacity(count as usize); + for pos in pos..pos + count { + lead_nodes.push(self.fetch_mmr_node(tree.clone(), pos)?); + } + Ok(lead_nodes) + } + /// Iterate over all the stored orphan blocks and execute the function `f` for each block. fn for_each_orphan(&self, mut f: F) -> Result<(), ChainStorageError> where F: FnMut(Result<(HashOutput, Block), ChainStorageError>) { @@ -713,3 +731,18 @@ fn rewind_checkpoint_index(cp_count: usize, steps_back: usize) -> usize { 1 } } + +// Calculate the total leaf node count upto a specified height. +fn count_mmr_nodes_added(checkpoints: &MemDbVec, height: u64) -> Result { + let mut node_count: u32 = 0; + let last_index = min(checkpoints.len()?, (height + 1) as usize); + for cp_index in 0..last_index { + if let Some(cp) = checkpoints + .get(cp_index) + .map_err(|e| ChainStorageError::AccessError(format!("Checkpoint error: {}", e.to_string())))? + { + node_count += cp.nodes_added().len() as u32; + } + } + Ok(node_count) +} diff --git a/base_layer/core/src/chain_storage/mod.rs b/base_layer/core/src/chain_storage/mod.rs index 3a8f9f4e1d..478fb74e30 100644 --- a/base_layer/core/src/chain_storage/mod.rs +++ b/base_layer/core/src/chain_storage/mod.rs @@ -53,7 +53,16 @@ pub use blockchain_database::{ MutableMmrState, Validators, }; -pub use db_transaction::{DbKey, DbKeyValuePair, DbTransaction, DbValue, MetadataKey, MetadataValue, MmrTree}; +pub use db_transaction::{ + DbKey, + DbKeyValuePair, + DbTransaction, + DbValue, + MetadataKey, + MetadataValue, + MmrTree, + WriteOperation, +}; pub use error::ChainStorageError; pub use historical_block::HistoricalBlock; pub use lmdb_db::{ diff --git a/base_layer/core/src/helpers/mock_backend.rs b/base_layer/core/src/helpers/mock_backend.rs index f3dde27577..78a962a14d 100644 --- a/base_layer/core/src/helpers/mock_backend.rs +++ b/base_layer/core/src/helpers/mock_backend.rs @@ -75,10 +75,24 @@ impl BlockchainBackend for MockBackend { unimplemented!() } + fn fetch_mmr_node_count(&self, _tree: MmrTree, _height: u64) -> Result { + unimplemented!() + } + fn fetch_mmr_node(&self, _tree: MmrTree, _pos: u32) -> Result<(Hash, bool), ChainStorageError> { unimplemented!() } + fn fetch_mmr_nodes( + &self, + _tree: MmrTree, + _pos: u32, + _count: u32, + ) -> Result, bool)>, ChainStorageError> + { + unimplemented!() + } + fn for_each_orphan(&self, _f: F) -> Result<(), ChainStorageError> where Self: Sized, diff --git a/base_layer/core/tests/chain_storage_tests/chain_backend.rs b/base_layer/core/tests/chain_storage_tests/chain_backend.rs index 4a6d4a7368..3edcdb4a2e 100644 --- a/base_layer/core/tests/chain_storage_tests/chain_backend.rs +++ b/base_layer/core/tests/chain_storage_tests/chain_backend.rs @@ -34,6 +34,7 @@ use tari_core::{ MetadataKey, MetadataValue, MmrTree, + WriteOperation, }, consensus::{ConsensusConstants, Network}, helpers::create_orphan_block, @@ -775,8 +776,6 @@ fn lmdb_commit_block_and_create_fetch_checkpoint_and_rewind_mmr() { } } -// TODO: Test Needed: fetch_mmr_node - fn for_each_orphan(mut db: T, consensus_constants: &ConsensusConstants) { let orphan1 = create_orphan_block( 5, @@ -1472,3 +1471,189 @@ fn lmdb_fetch_target_difficulties() { std::fs::remove_dir_all(&temp_path).unwrap(); } } + +fn fetch_utxo_rp_mmr_nodes_and_count(mut db: T) { + let factories = CryptoFactories::default(); + + let (utxo1, _) = create_utxo(MicroTari(10_000), &factories, None); + let (utxo2, _) = create_utxo(MicroTari(20_000), &factories, None); + let (utxo3, _) = create_utxo(MicroTari(30_000), &factories, None); + let (utxo4, _) = create_utxo(MicroTari(40_000), &factories, None); + let (utxo5, _) = create_utxo(MicroTari(50_000), &factories, None); + let (utxo6, _) = create_utxo(MicroTari(60_000), &factories, None); + let utxo_hash1 = utxo1.hash(); + let utxo_hash2 = utxo2.hash(); + let utxo_hash3 = utxo3.hash(); + let utxo_hash4 = utxo4.hash(); + let utxo_hash5 = utxo5.hash(); + let utxo_hash6 = utxo6.hash(); + let utxo_leaf_nodes = vec![ + (utxo_hash1.clone(), true), + (utxo_hash2.clone(), false), + (utxo_hash3.clone(), true), + (utxo_hash4.clone(), true), + (utxo_hash5.clone(), false), + (utxo_hash6.clone(), false), + ]; + let rp_leaf_nodes = vec![ + (utxo1.proof.hash(), false), + (utxo2.proof.hash(), false), + (utxo3.proof.hash(), false), + (utxo4.proof.hash(), false), + (utxo5.proof.hash(), false), + (utxo6.proof.hash(), false), + ]; + + let mut txn = DbTransaction::new(); + txn.insert_utxo(utxo1, true); + txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo)); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_utxo(utxo2, true); + txn.insert_utxo(utxo3, true); + txn.spend_utxo(utxo_hash1.clone()); + txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo)); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_utxo(utxo4, true); + txn.insert_utxo(utxo5, true); + txn.spend_utxo(utxo_hash3.clone()); + txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo)); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_utxo(utxo6, true); + txn.spend_utxo(utxo_hash4.clone()); + txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo)); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof)); + assert!(db.write(txn).is_ok()); + + for i in 0..=3 { + let mmr_node = db.fetch_mmr_node(MmrTree::Utxo, i).unwrap(); + assert_eq!(mmr_node, utxo_leaf_nodes[i as usize]); + let mmr_node = db.fetch_mmr_node(MmrTree::RangeProof, i).unwrap(); + assert_eq!(mmr_node, rp_leaf_nodes[i as usize]); + + let mmr_node = db.fetch_mmr_nodes(MmrTree::Utxo, i, 3).unwrap(); + assert_eq!(mmr_node.len(), 3); + assert_eq!(mmr_node[0], utxo_leaf_nodes[i as usize]); + assert_eq!(mmr_node[1], utxo_leaf_nodes[(i + 1) as usize]); + assert_eq!(mmr_node[2], utxo_leaf_nodes[(i + 2) as usize]); + let mmr_node = db.fetch_mmr_nodes(MmrTree::RangeProof, i, 3).unwrap(); + assert_eq!(mmr_node.len(), 3); + assert_eq!(mmr_node[0], rp_leaf_nodes[i as usize]); + assert_eq!(mmr_node[1], rp_leaf_nodes[(i + 1) as usize]); + assert_eq!(mmr_node[2], rp_leaf_nodes[(i + 2) as usize]); + } + + assert!(db.fetch_mmr_node(MmrTree::Utxo, 7).is_err()); + assert!(db.fetch_mmr_nodes(MmrTree::Utxo, 5, 4).is_err()); + assert!(db.fetch_mmr_node(MmrTree::RangeProof, 7).is_err()); + assert!(db.fetch_mmr_nodes(MmrTree::RangeProof, 5, 4).is_err()); +} + +#[test] +fn memory_fetch_utxo_rp_mmr_nodes_and_count() { + let db = MemoryDatabase::::default(); + fetch_utxo_rp_mmr_nodes_and_count(db); +} + +#[test] +fn lmdb_fetch_utxo_rp_nodes_and_count() { + // Create temporary test folder + let temp_path = create_temporary_data_path(); + + // Perform test + { + let db = create_lmdb_database(&temp_path, MmrCacheConfig::default()).unwrap(); + fetch_utxo_rp_mmr_nodes_and_count(db); + } + + // Cleanup test data - in Windows the LMBD `set_mapsize` sets file size equals to map size; Linux use sparse files + if std::path::Path::new(&temp_path).exists() { + std::fs::remove_dir_all(&temp_path).unwrap(); + } +} + +fn fetch_kernel_mmr_nodes_and_count(mut db: T) { + let kernel1 = create_test_kernel(100.into(), 0); + let kernel2 = create_test_kernel(200.into(), 1); + let kernel3 = create_test_kernel(300.into(), 1); + let kernel4 = create_test_kernel(400.into(), 2); + let kernel5 = create_test_kernel(500.into(), 2); + let kernel6 = create_test_kernel(600.into(), 3); + let leaf_nodes = vec![ + (kernel1.hash(), false), + (kernel2.hash(), false), + (kernel3.hash(), false), + (kernel4.hash(), false), + (kernel5.hash(), false), + (kernel6.hash(), false), + ]; + + let mut txn = DbTransaction::new(); + txn.insert_kernel(kernel1, true); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_kernel(kernel2, true); + txn.insert_kernel(kernel3, true); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_kernel(kernel4, true); + txn.insert_kernel(kernel5, true); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel)); + assert!(db.write(txn).is_ok()); + let mut txn = DbTransaction::new(); + txn.insert_kernel(kernel6, true); + txn.operations + .push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel)); + assert!(db.write(txn).is_ok()); + + for i in 0..=3 { + let mmr_node = db.fetch_mmr_node(MmrTree::Kernel, i).unwrap(); + assert_eq!(mmr_node, leaf_nodes[i as usize]); + + let mmr_node = db.fetch_mmr_nodes(MmrTree::Kernel, i, 3).unwrap(); + assert_eq!(mmr_node.len(), 3); + assert_eq!(mmr_node[0], leaf_nodes[i as usize]); + assert_eq!(mmr_node[1], leaf_nodes[(i + 1) as usize]); + assert_eq!(mmr_node[2], leaf_nodes[(i + 2) as usize]); + } + + assert!(db.fetch_mmr_node(MmrTree::Kernel, 7).is_err()); + assert!(db.fetch_mmr_nodes(MmrTree::Kernel, 5, 4).is_err()); +} + +#[test] +fn memory_fetch_kernel_mmr_nodes_and_count() { + let db = MemoryDatabase::::default(); + fetch_kernel_mmr_nodes_and_count(db); +} + +#[test] +fn lmdb_fetch_kernel_nodes_and_count() { + // Create temporary test folder + let temp_path = create_temporary_data_path(); + + // Perform test + { + let db = create_lmdb_database(&temp_path, MmrCacheConfig::default()).unwrap(); + fetch_kernel_mmr_nodes_and_count(db); + } + + // Cleanup test data - in Windows the LMBD `set_mapsize` sets file size equals to map size; Linux use sparse files + if std::path::Path::new(&temp_path).exists() { + std::fs::remove_dir_all(&temp_path).unwrap(); + } +} diff --git a/base_layer/core/tests/node_comms_interface.rs b/base_layer/core/tests/node_comms_interface.rs index 182263d9b0..1d3842815b 100644 --- a/base_layer/core/tests/node_comms_interface.rs +++ b/base_layer/core/tests/node_comms_interface.rs @@ -24,6 +24,7 @@ mod helpers; use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, executor::block_on, StreamExt}; +use helpers::block_builders::append_block; use tari_comms::peer_manager::NodeId; use tari_core::{ base_node::{ @@ -31,7 +32,15 @@ use tari_core::{ OutboundNodeCommsInterface, }, blocks::{BlockBuilder, BlockHeader}, - chain_storage::{BlockchainDatabase, ChainMetadata, DbTransaction, HistoricalBlock, MemoryDatabase}, + chain_storage::{ + BlockchainDatabase, + BlockchainDatabaseConfig, + ChainMetadata, + DbTransaction, + HistoricalBlock, + MemoryDatabase, + Validators, + }, consensus::{ConsensusManagerBuilder, Network}, helpers::create_mem_db, mempool::{Mempool, MempoolConfig, MempoolValidators}, @@ -40,7 +49,11 @@ use tari_core::{ tari_amount::MicroTari, types::{CryptoFactories, HashDigest}, }, - validation::transaction_validators::TxInputAndMaturityValidator, + validation::{ + accum_difficulty_validators::MockAccumDifficultyValidator, + mocks::MockValidator, + transaction_validators::TxInputAndMaturityValidator, + }, }; use tari_crypto::tari_utilities::hash::Hashable; use tari_service_framework::{reply_channel, reply_channel::Receiver}; @@ -352,3 +365,63 @@ fn inbound_fetch_blocks() { }); }); } + +#[test] +fn inbound_fetch_blocks_before_horizon_height() { + let network = Network::LocalNet; + let consensus_constants = network.create_consensus_constants(); + let consensus_manager = ConsensusManagerBuilder::new(network) + .with_consensus_constants(consensus_constants.clone()) + .build(); + let validators = Validators::new( + MockValidator::new(true), + MockValidator::new(true), + MockAccumDifficultyValidator {}, + ); + let db = MemoryDatabase::::default(); + let mut config = BlockchainDatabaseConfig::default(); + config.pruning_horizon = 2; + let store = BlockchainDatabase::new(db, &consensus_manager, validators, config).unwrap(); + let mempool_validator = MempoolValidators::new(TxInputAndMaturityValidator {}, TxInputAndMaturityValidator {}); + let mempool = Mempool::new(store.clone(), MempoolConfig::default(), mempool_validator); + let (block_event_sender, _) = broadcast::channel(50); + let (request_sender, _) = reply_channel::unbounded(); + let (block_sender, _) = futures_mpsc_channel_unbounded(); + let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); + let inbound_nch = InboundNodeCommsHandlers::new( + block_event_sender, + store.clone(), + mempool, + consensus_manager, + outbound_nci, + ); + + let block0 = store.fetch_block(0).unwrap().block().clone(); + let block1 = append_block(&store, &block0, vec![], &consensus_constants, 1.into()).unwrap(); + let block2 = append_block(&store, &block1, vec![], &consensus_constants, 1.into()).unwrap(); + let block3 = append_block(&store, &block2, vec![], &consensus_constants, 1.into()).unwrap(); + let _block4 = append_block(&store, &block3, vec![], &consensus_constants, 1.into()).unwrap(); + + test_async(move |rt| { + rt.spawn(async move { + if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch + .handle_request(&NodeCommsRequest::FetchBlocks(vec![1])) + .await + { + assert_eq!(received_blocks.len(), 0); + } else { + assert!(false); + } + + if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch + .handle_request(&NodeCommsRequest::FetchBlocks(vec![2])) + .await + { + assert_eq!(received_blocks.len(), 1); + assert_eq!(*received_blocks[0].block(), block2); + } else { + assert!(false); + } + }); + }); +} diff --git a/base_layer/mmr/src/merkle_mountain_range.rs b/base_layer/mmr/src/merkle_mountain_range.rs index 6658de866a..145d8aa5de 100644 --- a/base_layer/mmr/src/merkle_mountain_range.rs +++ b/base_layer/mmr/src/merkle_mountain_range.rs @@ -27,7 +27,6 @@ use crate::{ Hash, }; use digest::Digest; -use log::*; use std::{ cmp::{max, min}, marker::PhantomData,