Skip to content

Commit

Permalink
fix(pruned mode): prune inputs, keep track of kernel/utxo sum
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 1, 2021
1 parent c162f07 commit 9e1ceaa
Show file tree
Hide file tree
Showing 23 changed files with 259 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::{
},
sync::rpc,
},
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput},
blocks::{BlockHeader, UpdateBlockAccumulatedData},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
proto::base_node::{
sync_utxo as proto_sync_utxo,
sync_utxos_response::UtxoOrDeleted,
Expand All @@ -44,16 +44,10 @@ use crate::{
use croaring::Bitmap;
use futures::StreamExt;
use log::*;
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use tari_common_types::types::{HashDigest, RangeProofService};
use std::convert::{TryFrom, TryInto};
use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
use tari_comms::PeerConnection;
use tari_crypto::{
commitment::HomomorphicCommitment,
tari_utilities::{hex::Hex, Hashable},
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tari_mmr::{MerkleMountainRange, MutableMmr};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";
Expand All @@ -65,6 +59,8 @@ pub struct HorizonStateSynchronization<'a, B: BlockchainBackend> {
prover: &'a RangeProofService,
num_kernels: u64,
num_outputs: u64,
kernel_sum: Commitment,
utxo_sum: Commitment,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Expand All @@ -81,6 +77,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
prover,
num_kernels: 0,
num_outputs: 0,
kernel_sum: Default::default(),
utxo_sum: Default::default(),
}
}

Expand Down Expand Up @@ -119,19 +117,38 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
self.initialize().await?;
debug!(target: LOG_TARGET, "Synchronizing kernels");
self.synchronize_kernels(client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
self.synchronize_outputs(client, to_header).await?;
Ok(())
}

async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
let metadata = self.db().get_chain_metadata().await?;
let data = self
.db()
.fetch_block_accumulated_data(metadata.best_block().clone())
.await?;
self.utxo_sum = data.utxo_sum().clone();
self.kernel_sum = data.kernel_sum().clone();
debug!(
target: LOG_TARGET,
"Loaded utxo_sum = {}, kernel_sum = {}",
self.utxo_sum.to_hex(),
self.kernel_sum.to_hex()
);
Ok(())
}

async fn synchronize_kernels(
&mut self,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
let metadata = self.db().get_chain_metadata().await?;

let remote_num_kernels = to_header.kernel_mmr_size;
self.num_kernels = remote_num_kernels;
Expand Down Expand Up @@ -192,6 +209,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.map_err(HorizonSyncError::InvalidKernelSignature)?;

kernels.push(kernel.clone());
self.kernel_sum = &self.kernel_sum + &kernel.excess;
error!(target: LOG_TARGET, "DEBUG: kernel_sum = {}", self.kernel_sum.to_hex());
txn.insert_kernel_via_horizon_sync(kernel, current_header.hash().clone(), mmr_position as u32);
if mmr_position == current_header.header().kernel_mmr_size - 1 {
debug!(
Expand Down Expand Up @@ -221,11 +240,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
});
}

txn.update_pruned_hash_set(
MmrTree::Kernel,
let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
kernel_mmr.get_pruned_hash_set()?,
UpdateBlockAccumulatedData {
kernel_sum: Some(self.kernel_sum.clone()),
kernel_hash_set: Some(kernel_hash_set),
..Default::default()
},
);
txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());

txn.commit().await?;
if mmr_position < end - 1 {
Expand Down Expand Up @@ -258,6 +282,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
) -> Result<(), HorizonSyncError> {
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let metadata = self.db().get_chain_metadata().await?;

let remote_num_outputs = to_header.output_mmr_size;
self.num_outputs = remote_num_outputs;

Expand Down Expand Up @@ -322,10 +348,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let block_data = db
.fetch_block_accumulated_data(current_header.header().prev_hash.clone())
.await?;
let (_, output_pruned_set, rp_pruned_set, mut full_bitmap) = block_data.dissolve();
let (_, output_pruned_set, witness_pruned_set, _) = block_data.dissolve();
let mut full_bitmap = self.db().fetch_deleted_bitmap_at_tip().await?.into_bitmap();

let mut output_mmr = MerkleMountainRange::<HashDigest, _>::new(output_pruned_set);
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(rp_pruned_set);
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);

while let Some(response) = output_stream.next().await {
let res: SyncUtxosResponse = response?;
Expand Down Expand Up @@ -356,6 +383,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
output_hashes.push(output.hash());
witness_hashes.push(output.witness_hash());
unpruned_outputs.push(output.clone());
self.utxo_sum = &self.utxo_sum + &output.commitment;
txn.insert_output_via_horizon_sync(
output,
current_header.hash().clone(),
Expand Down Expand Up @@ -415,8 +443,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
witness_mmr.push(hash)?;
}

// Check that the difference bitmap is excessively large. Bitmap::deserialize panics if greater than
// isize::MAX, however isize::MAX is still an inordinate amount of data. An
// Check that the difference bitmap isn't excessively large. Bitmap::deserialize panics if greater
// than isize::MAX, however isize::MAX is still an inordinate amount of data. An
// arbitrary 4 MiB limit is used.
const MAX_DIFF_BITMAP_BYTE_LEN: usize = 4 * 1024 * 1024;
if diff_bitmap.len() > MAX_DIFF_BITMAP_BYTE_LEN {
Expand Down Expand Up @@ -471,14 +499,19 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

txn.update_deleted_bitmap(diff_bitmap.clone());
txn.update_pruned_hash_set(MmrTree::Utxo, current_header.hash().clone(), pruned_output_set);
txn.update_pruned_hash_set(
MmrTree::Witness,

let witness_hash_set = witness_mmr.get_pruned_hash_set()?;
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
witness_mmr.get_pruned_hash_set()?,
UpdateBlockAccumulatedData {
utxo_sum: Some(self.utxo_sum.clone()),
utxo_hash_set: Some(pruned_output_set),
witness_hash_set: Some(witness_hash_set),
deleted_diff: Some(diff_bitmap.into()),
..Default::default()
},
);
txn.update_block_accumulated_data_with_deleted_diff(current_header.hash().clone(), diff_bitmap);

txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());
txn.commit().await?;

current_header = db.fetch_chain_header(current_header.height() + 1).await?;
Expand Down Expand Up @@ -518,99 +551,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn finalize_horizon_sync(&mut self) -> Result<(), HorizonSyncError> {
debug!(target: LOG_TARGET, "Validating horizon state");

let info = HorizonSyncInfo::new(
self.shared.set_state_info(StateInfo::HorizonSync(HorizonSyncInfo::new(
vec![self.sync_peer.peer_node_id().clone()],
HorizonSyncStatus::Finalizing,
);
self.shared.set_state_info(StateInfo::HorizonSync(info));
)));

let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
let mut pruned_utxo_sum = HomomorphicCommitment::default();
let mut pruned_kernel_sum = HomomorphicCommitment::default();

let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;
let bitmap = Arc::new(
self.db()
.fetch_complete_deleted_bitmap_at(header.hash().clone())
.await?
.into_bitmap(),
);
let expected_prev_best_block = self.shared.db.get_chain_metadata().await?.best_block().clone();
for h in 0..=header.height() {
let curr_header = self.db().fetch_chain_header(h).await?;

trace!(
target: LOG_TARGET,
"Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().output_mmr_size,
prev_mmr,
curr_header.header().output_mmr_size - 1
);
let (utxos, _) = self
.db()
.fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone())
.await?;
trace!(
target: LOG_TARGET,
"Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
);
let kernels = self
.db()
.fetch_kernels_by_mmr_position(prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1)
.await?;

let mut utxo_sum = HomomorphicCommitment::default();
debug!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
debug!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
let mut prune_counter = 0;
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
utxo_sum = &output.commitment + &utxo_sum;
},
_ => {
prune_counter += 1;
},
}
}
if prune_counter > 0 {
debug!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
}
prev_mmr = curr_header.header().output_mmr_size;

pruned_utxo_sum = &utxo_sum + &pruned_utxo_sum;

for k in kernels {
pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;

trace!(
target: LOG_TARGET,
"Height: {} Kernel sum:{:?} Pruned UTXO sum: {:?}",
h,
pruned_kernel_sum,
pruned_utxo_sum
);
}

self.shared
.sync_validators
.final_horizon_state
.validate(
&*self.db().clone().into_inner().db_read_access()?,
&*self.db().inner().db_read_access()?,
header.height(),
&pruned_utxo_sum,
&pruned_kernel_sum,
&self.utxo_sum,
&self.kernel_sum,
)
.map_err(HorizonSyncError::FinalStateValidationFailed)?;

let metadata = self.db().get_chain_metadata().await?;
info!(
target: LOG_TARGET,
"Horizon state validation succeeded! Committing horizon state."
Expand All @@ -621,9 +580,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
header.height(),
header.hash().clone(),
header.accumulated_data().total_accumulated_difficulty,
expected_prev_best_block,
metadata.best_block().clone(),
)
.set_pruned_height(header.height(), pruned_kernel_sum, pruned_utxo_sum)
.set_pruned_height(header.height(), self.kernel_sum.clone(), self.utxo_sum.clone())
.commit()
.await?;

Expand Down
41 changes: 32 additions & 9 deletions base_layer/core/src/blocks/accumulated_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,55 @@ use tari_mmr::{pruned_hashset::PrunedHashSet, ArrayLike};

const LOG_TARGET: &str = "c::bn::acc_data";

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlockAccumulatedData {
pub(crate) kernels: PrunedHashSet,
pub(crate) outputs: PrunedHashSet,
pub(crate) witness: PrunedHashSet,
pub(crate) deleted: DeletedBitmap,
pub(crate) range_proofs: PrunedHashSet,
pub(crate) kernel_sum: Commitment,
pub(crate) utxo_sum: Commitment,
}

impl BlockAccumulatedData {
pub fn new(
kernels: PrunedHashSet,
outputs: PrunedHashSet,
range_proofs: PrunedHashSet,
witness: PrunedHashSet,
deleted: Bitmap,
total_kernel_sum: Commitment,
kernel_sum: Commitment,
utxo_sum: Commitment,
) -> Self {
Self {
kernels,
outputs,
range_proofs,
witness,
deleted: DeletedBitmap { deleted },
kernel_sum: total_kernel_sum,
kernel_sum,
utxo_sum,
}
}

pub fn deleted(&self) -> &Bitmap {
&self.deleted.deleted
}

pub fn set_deleted(&mut self, deleted: DeletedBitmap) -> &mut Self {
self.deleted = deleted;
self
}

pub fn dissolve(self) -> (PrunedHashSet, PrunedHashSet, PrunedHashSet, Bitmap) {
(self.kernels, self.outputs, self.range_proofs, self.deleted.deleted)
(self.kernels, self.outputs, self.witness, self.deleted.deleted)
}

pub fn kernel_sum(&self) -> &Commitment {
&self.kernel_sum
}

pub fn utxo_sum(&self) -> &Commitment {
&self.utxo_sum
}
}

impl Default for BlockAccumulatedData {
Expand All @@ -96,8 +108,9 @@ impl Default for BlockAccumulatedData {
deleted: DeletedBitmap {
deleted: Bitmap::create(),
},
range_proofs: Default::default(),
witness: Default::default(),
kernel_sum: Default::default(),
utxo_sum: Default::default(),
}
}
}
Expand All @@ -110,11 +123,21 @@ impl Display for BlockAccumulatedData {
self.outputs.len().unwrap_or(0),
self.deleted.deleted.cardinality(),
self.kernels.len().unwrap_or(0),
self.range_proofs.len().unwrap_or(0)
self.witness.len().unwrap_or(0)
)
}
}

#[derive(Debug, Clone, Default)]
pub struct UpdateBlockAccumulatedData {
pub kernel_hash_set: Option<PrunedHashSet>,
pub utxo_hash_set: Option<PrunedHashSet>,
pub witness_hash_set: Option<PrunedHashSet>,
pub deleted_diff: Option<DeletedBitmap>,
pub utxo_sum: Option<Commitment>,
pub kernel_sum: Option<Commitment>,
}

/// Wrapper struct to serialize and deserialize Bitmap
#[derive(Debug, Clone)]
pub struct DeletedBitmap {
Expand Down
Loading

0 comments on commit 9e1ceaa

Please sign in to comment.