Skip to content

Commit

Permalink
Blockchain backend fetch MMR nodes
Browse files Browse the repository at this point in the history
Merge pull request #1918

- Added fetch_mmr_nodes and fetch_mmr_node_count to the blockchain backends to enable downloading of the Kernel, UTXO and Range Proof MMRs.
- Added chain backend tests for testing the lmdb and memory backend implementations of the fetch_mmr_nodes and fetch_mmr_node_count functions for the Kernel, UTXO and Range Proof MMRs.
- Added a pruned mode test for downloading of blocks when the node is running in pruned mode.

* pull/1918/head:
  - Added fetch_mmr_nodes and fetch_mmr_node_count to the blockchain backends to enable downloading of the Kernel, UTXO and Range Proof MMRs. - Added chain backend tests for testing the lmdb and memory backend implementations of the fetch_mmr_nodes and fetch_mmr_node_count functions for the Kernel, UTXO and Range Proof MMRs. - Added a pruned mode test for downloading of blocks when the node is running in pruned mode.
  • Loading branch information
sdbondi committed May 28, 2020
2 parents 4649238 + a28f7f5 commit 060ab70
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 8 deletions.
6 changes: 5 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ pub trait BlockchainBackend: Send + Sync {
/// Fetches the checkpoint corresponding to the provided height, the checkpoint consist of the list of nodes
/// added & deleted for the given Merkle tree.
fn fetch_checkpoint(&self, tree: MmrTree, height: u64) -> Result<MerkleCheckPoint, ChainStorageError>;
/// Fetches the total merkle mountain range node count upto the specified height.
fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result<u32, ChainStorageError>;
/// Fetches the leaf node hash and its deletion status for the nth leaf node in the given MMR tree.
fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Hash, bool), ChainStorageError>;
/// Fetches the set of leaf node hashes and their deletion status' for the nth to nth+count leaf node index in the
/// given MMR tree.
fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result<Vec<(Hash, bool)>, ChainStorageError>;
/// Performs the function F for each orphan block in the orphan pool.
fn for_each_orphan<F>(&self, f: F) -> Result<(), ChainStorageError>
where
Expand Down Expand Up @@ -271,7 +276,6 @@ where T: BlockchainBackend
);
blockchain_db.store_pruning_horizon(config.pruning_horizon)?;
}

Ok(blockchain_db)
}

Expand Down
38 changes: 37 additions & 1 deletion base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use croaring::Bitmap;
use digest::Digest;
use lmdb_zero::{Database, Environment, WriteTransaction};
use log::*;
use std::{collections::VecDeque, path::Path, sync::Arc};
use std::{cmp::min, collections::VecDeque, path::Path, sync::Arc};
use tari_crypto::tari_utilities::{epoch_time::EpochTime, hash::Hashable};
use tari_mmr::{
functions::{prune_mutable_mmr, PrunedMutableMmr},
Expand Down Expand Up @@ -645,6 +645,14 @@ where D: Digest + Send + Sync
.ok_or_else(|| ChainStorageError::OutOfRange)
}

fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result<u32, ChainStorageError> {
match tree {
MmrTree::Kernel => count_mmr_nodes_added(&self.kernel_checkpoints, height),
MmrTree::Utxo => count_mmr_nodes_added(&self.utxo_checkpoints, height),
MmrTree::RangeProof => count_mmr_nodes_added(&self.range_proof_checkpoints, height),
}
}

fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Vec<u8>, bool), ChainStorageError> {
let (hash, deleted) = match tree {
MmrTree::Kernel => self.kernel_mmr.fetch_mmr_node(pos)?,
Expand All @@ -657,6 +665,14 @@ where D: Digest + Send + Sync
Ok((hash, deleted))
}

fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result<Vec<(Vec<u8>, bool)>, ChainStorageError> {
let mut lead_nodes = Vec::<(Vec<u8>, bool)>::with_capacity(count as usize);
for pos in pos..pos + count {
lead_nodes.push(self.fetch_mmr_node(tree.clone(), pos)?);
}
Ok(lead_nodes)
}

/// Iterate over all the stored orphan blocks and execute the function `f` for each block.
fn for_each_orphan<F>(&self, f: F) -> Result<(), ChainStorageError>
where F: FnMut(Result<(HashOutput, Block), ChainStorageError>) {
Expand Down Expand Up @@ -796,3 +812,23 @@ fn rewind_checkpoint_index(cp_count: usize, steps_back: usize) -> usize {
1
}
}

// Calculate the total leaf node count upto a specified height.
fn count_mmr_nodes_added(checkpoints: &LMDBVec<MerkleCheckPoint>, height: u64) -> Result<u32, ChainStorageError> {
let mut node_count: u32 = 0;
let last_index = min(
checkpoints
.len()
.map_err(|e| ChainStorageError::AccessError(e.to_string()))?,
(height + 1) as usize,
);
for cp_index in 0..last_index {
if let Some(cp) = checkpoints
.get(cp_index)
.map_err(|e| ChainStorageError::AccessError(format!("Checkpoint error: {}", e.to_string())))?
{
node_count += cp.nodes_added().len() as u32;
}
}
Ok(node_count)
}
33 changes: 33 additions & 0 deletions base_layer/core/src/chain_storage/memory_db/memory_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::{
use croaring::Bitmap;
use digest::Digest;
use std::{
cmp::min,
collections::{HashMap, VecDeque},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
Expand Down Expand Up @@ -454,6 +455,15 @@ where D: Digest + Send + Sync
.ok_or_else(|| ChainStorageError::OutOfRange)
}

fn fetch_mmr_node_count(&self, tree: MmrTree, height: u64) -> Result<u32, ChainStorageError> {
let db = self.db_access()?;
match tree {
MmrTree::Kernel => count_mmr_nodes_added(&db.kernel_checkpoints, height),
MmrTree::Utxo => count_mmr_nodes_added(&db.utxo_checkpoints, height),
MmrTree::RangeProof => count_mmr_nodes_added(&db.range_proof_checkpoints, height),
}
}

fn fetch_mmr_node(&self, tree: MmrTree, pos: u32) -> Result<(Vec<u8>, bool), ChainStorageError> {
let db = self.db_access()?;
let (hash, deleted) = match tree {
Expand All @@ -467,6 +477,14 @@ where D: Digest + Send + Sync
Ok((hash, deleted))
}

fn fetch_mmr_nodes(&self, tree: MmrTree, pos: u32, count: u32) -> Result<Vec<(Vec<u8>, bool)>, ChainStorageError> {
let mut lead_nodes = Vec::<(Vec<u8>, bool)>::with_capacity(count as usize);
for pos in pos..pos + count {
lead_nodes.push(self.fetch_mmr_node(tree.clone(), pos)?);
}
Ok(lead_nodes)
}

/// Iterate over all the stored orphan blocks and execute the function `f` for each block.
fn for_each_orphan<F>(&self, mut f: F) -> Result<(), ChainStorageError>
where F: FnMut(Result<(HashOutput, Block), ChainStorageError>) {
Expand Down Expand Up @@ -713,3 +731,18 @@ fn rewind_checkpoint_index(cp_count: usize, steps_back: usize) -> usize {
1
}
}

// Calculate the total leaf node count upto a specified height.
fn count_mmr_nodes_added(checkpoints: &MemDbVec<MerkleCheckPoint>, height: u64) -> Result<u32, ChainStorageError> {
let mut node_count: u32 = 0;
let last_index = min(checkpoints.len()?, (height + 1) as usize);
for cp_index in 0..last_index {
if let Some(cp) = checkpoints
.get(cp_index)
.map_err(|e| ChainStorageError::AccessError(format!("Checkpoint error: {}", e.to_string())))?
{
node_count += cp.nodes_added().len() as u32;
}
}
Ok(node_count)
}
11 changes: 10 additions & 1 deletion base_layer/core/src/chain_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,16 @@ pub use blockchain_database::{
MutableMmrState,
Validators,
};
pub use db_transaction::{DbKey, DbKeyValuePair, DbTransaction, DbValue, MetadataKey, MetadataValue, MmrTree};
pub use db_transaction::{
DbKey,
DbKeyValuePair,
DbTransaction,
DbValue,
MetadataKey,
MetadataValue,
MmrTree,
WriteOperation,
};
pub use error::ChainStorageError;
pub use historical_block::HistoricalBlock;
pub use lmdb_db::{
Expand Down
14 changes: 14 additions & 0 deletions base_layer/core/src/helpers/mock_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,24 @@ impl BlockchainBackend for MockBackend {
unimplemented!()
}

fn fetch_mmr_node_count(&self, _tree: MmrTree, _height: u64) -> Result<u32, ChainStorageError> {
unimplemented!()
}

fn fetch_mmr_node(&self, _tree: MmrTree, _pos: u32) -> Result<(Hash, bool), ChainStorageError> {
unimplemented!()
}

fn fetch_mmr_nodes(
&self,
_tree: MmrTree,
_pos: u32,
_count: u32,
) -> Result<Vec<(Vec<u8>, bool)>, ChainStorageError>
{
unimplemented!()
}

fn for_each_orphan<F>(&self, _f: F) -> Result<(), ChainStorageError>
where
Self: Sized,
Expand Down
189 changes: 187 additions & 2 deletions base_layer/core/tests/chain_storage_tests/chain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tari_core::{
MetadataKey,
MetadataValue,
MmrTree,
WriteOperation,
},
consensus::{ConsensusConstants, Network},
helpers::create_orphan_block,
Expand Down Expand Up @@ -775,8 +776,6 @@ fn lmdb_commit_block_and_create_fetch_checkpoint_and_rewind_mmr() {
}
}

// TODO: Test Needed: fetch_mmr_node

fn for_each_orphan<T: BlockchainBackend>(mut db: T, consensus_constants: &ConsensusConstants) {
let orphan1 = create_orphan_block(
5,
Expand Down Expand Up @@ -1472,3 +1471,189 @@ fn lmdb_fetch_target_difficulties() {
std::fs::remove_dir_all(&temp_path).unwrap();
}
}

fn fetch_utxo_rp_mmr_nodes_and_count<T: BlockchainBackend>(mut db: T) {
let factories = CryptoFactories::default();

let (utxo1, _) = create_utxo(MicroTari(10_000), &factories, None);
let (utxo2, _) = create_utxo(MicroTari(20_000), &factories, None);
let (utxo3, _) = create_utxo(MicroTari(30_000), &factories, None);
let (utxo4, _) = create_utxo(MicroTari(40_000), &factories, None);
let (utxo5, _) = create_utxo(MicroTari(50_000), &factories, None);
let (utxo6, _) = create_utxo(MicroTari(60_000), &factories, None);
let utxo_hash1 = utxo1.hash();
let utxo_hash2 = utxo2.hash();
let utxo_hash3 = utxo3.hash();
let utxo_hash4 = utxo4.hash();
let utxo_hash5 = utxo5.hash();
let utxo_hash6 = utxo6.hash();
let utxo_leaf_nodes = vec![
(utxo_hash1.clone(), true),
(utxo_hash2.clone(), false),
(utxo_hash3.clone(), true),
(utxo_hash4.clone(), true),
(utxo_hash5.clone(), false),
(utxo_hash6.clone(), false),
];
let rp_leaf_nodes = vec![
(utxo1.proof.hash(), false),
(utxo2.proof.hash(), false),
(utxo3.proof.hash(), false),
(utxo4.proof.hash(), false),
(utxo5.proof.hash(), false),
(utxo6.proof.hash(), false),
];

let mut txn = DbTransaction::new();
txn.insert_utxo(utxo1, true);
txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo));
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_utxo(utxo2, true);
txn.insert_utxo(utxo3, true);
txn.spend_utxo(utxo_hash1.clone());
txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo));
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_utxo(utxo4, true);
txn.insert_utxo(utxo5, true);
txn.spend_utxo(utxo_hash3.clone());
txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo));
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_utxo(utxo6, true);
txn.spend_utxo(utxo_hash4.clone());
txn.operations.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Utxo));
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::RangeProof));
assert!(db.write(txn).is_ok());

for i in 0..=3 {
let mmr_node = db.fetch_mmr_node(MmrTree::Utxo, i).unwrap();
assert_eq!(mmr_node, utxo_leaf_nodes[i as usize]);
let mmr_node = db.fetch_mmr_node(MmrTree::RangeProof, i).unwrap();
assert_eq!(mmr_node, rp_leaf_nodes[i as usize]);

let mmr_node = db.fetch_mmr_nodes(MmrTree::Utxo, i, 3).unwrap();
assert_eq!(mmr_node.len(), 3);
assert_eq!(mmr_node[0], utxo_leaf_nodes[i as usize]);
assert_eq!(mmr_node[1], utxo_leaf_nodes[(i + 1) as usize]);
assert_eq!(mmr_node[2], utxo_leaf_nodes[(i + 2) as usize]);
let mmr_node = db.fetch_mmr_nodes(MmrTree::RangeProof, i, 3).unwrap();
assert_eq!(mmr_node.len(), 3);
assert_eq!(mmr_node[0], rp_leaf_nodes[i as usize]);
assert_eq!(mmr_node[1], rp_leaf_nodes[(i + 1) as usize]);
assert_eq!(mmr_node[2], rp_leaf_nodes[(i + 2) as usize]);
}

assert!(db.fetch_mmr_node(MmrTree::Utxo, 7).is_err());
assert!(db.fetch_mmr_nodes(MmrTree::Utxo, 5, 4).is_err());
assert!(db.fetch_mmr_node(MmrTree::RangeProof, 7).is_err());
assert!(db.fetch_mmr_nodes(MmrTree::RangeProof, 5, 4).is_err());
}

#[test]
fn memory_fetch_utxo_rp_mmr_nodes_and_count() {
let db = MemoryDatabase::<HashDigest>::default();
fetch_utxo_rp_mmr_nodes_and_count(db);
}

#[test]
fn lmdb_fetch_utxo_rp_nodes_and_count() {
// Create temporary test folder
let temp_path = create_temporary_data_path();

// Perform test
{
let db = create_lmdb_database(&temp_path, MmrCacheConfig::default()).unwrap();
fetch_utxo_rp_mmr_nodes_and_count(db);
}

// Cleanup test data - in Windows the LMBD `set_mapsize` sets file size equals to map size; Linux use sparse files
if std::path::Path::new(&temp_path).exists() {
std::fs::remove_dir_all(&temp_path).unwrap();
}
}

fn fetch_kernel_mmr_nodes_and_count<T: BlockchainBackend>(mut db: T) {
let kernel1 = create_test_kernel(100.into(), 0);
let kernel2 = create_test_kernel(200.into(), 1);
let kernel3 = create_test_kernel(300.into(), 1);
let kernel4 = create_test_kernel(400.into(), 2);
let kernel5 = create_test_kernel(500.into(), 2);
let kernel6 = create_test_kernel(600.into(), 3);
let leaf_nodes = vec![
(kernel1.hash(), false),
(kernel2.hash(), false),
(kernel3.hash(), false),
(kernel4.hash(), false),
(kernel5.hash(), false),
(kernel6.hash(), false),
];

let mut txn = DbTransaction::new();
txn.insert_kernel(kernel1, true);
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_kernel(kernel2, true);
txn.insert_kernel(kernel3, true);
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_kernel(kernel4, true);
txn.insert_kernel(kernel5, true);
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel));
assert!(db.write(txn).is_ok());
let mut txn = DbTransaction::new();
txn.insert_kernel(kernel6, true);
txn.operations
.push(WriteOperation::CreateMmrCheckpoint(MmrTree::Kernel));
assert!(db.write(txn).is_ok());

for i in 0..=3 {
let mmr_node = db.fetch_mmr_node(MmrTree::Kernel, i).unwrap();
assert_eq!(mmr_node, leaf_nodes[i as usize]);

let mmr_node = db.fetch_mmr_nodes(MmrTree::Kernel, i, 3).unwrap();
assert_eq!(mmr_node.len(), 3);
assert_eq!(mmr_node[0], leaf_nodes[i as usize]);
assert_eq!(mmr_node[1], leaf_nodes[(i + 1) as usize]);
assert_eq!(mmr_node[2], leaf_nodes[(i + 2) as usize]);
}

assert!(db.fetch_mmr_node(MmrTree::Kernel, 7).is_err());
assert!(db.fetch_mmr_nodes(MmrTree::Kernel, 5, 4).is_err());
}

#[test]
fn memory_fetch_kernel_mmr_nodes_and_count() {
let db = MemoryDatabase::<HashDigest>::default();
fetch_kernel_mmr_nodes_and_count(db);
}

#[test]
fn lmdb_fetch_kernel_nodes_and_count() {
// Create temporary test folder
let temp_path = create_temporary_data_path();

// Perform test
{
let db = create_lmdb_database(&temp_path, MmrCacheConfig::default()).unwrap();
fetch_kernel_mmr_nodes_and_count(db);
}

// Cleanup test data - in Windows the LMBD `set_mapsize` sets file size equals to map size; Linux use sparse files
if std::path::Path::new(&temp_path).exists() {
std::fs::remove_dir_all(&temp_path).unwrap();
}
}
Loading

0 comments on commit 060ab70

Please sign in to comment.