diff --git a/base_layer/core/src/base_node/proto/rpc.proto b/base_layer/core/src/base_node/proto/rpc.proto
index ac7fb7ec9d..f1c4ee9d6f 100644
--- a/base_layer/core/src/base_node/proto/rpc.proto
+++ b/base_layer/core/src/base_node/proto/rpc.proto
@@ -45,7 +45,7 @@ message FindChainSplitResponse {
 }

 message SyncKernelsRequest {
-    bytes start_header_hash = 1;
+    uint64 start = 1;
     bytes end_header_hash = 2;
 }

diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs
index bd1ed5a5f9..e4a163f003 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs
@@ -84,7 +84,10 @@ impl HorizonStateSync {
         // We're already synced because we have full blocks higher than our target pruned height
         if local_metadata.height_of_longest_chain() >= horizon_sync_height {
-            info!(target: LOG_TARGET, "Horizon state was already synchronized.");
+            info!(
+                target: LOG_TARGET,
+                "Tip height is higher than our pruned height. Horizon state is already synchronized."
+            );
             return StateEvent::HorizonStateSynchronized;
         }

diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs
index e0371fff7a..df62e5495a 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs
@@ -20,24 +20,20 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::num::TryFromIntError;
-
-use thiserror::Error;
-use tokio::task;
-
-use tari_comms::{
-    connectivity::ConnectivityError,
-    peer_manager::NodeId,
-    protocol::rpc::{RpcError, RpcStatus},
-};
-use tari_mmr::error::MerkleMountainRangeError;
-
 use crate::{
     base_node::{comms_interface::CommsInterfaceError, state_machine_service::states::helpers::BaseNodeRequestError},
     chain_storage::{ChainStorageError, MmrTree},
     transactions::transaction_entities::error::TransactionError,
     validation::ValidationError,
 };
+use std::num::TryFromIntError;
+use tari_comms::{
+    connectivity::ConnectivityError,
+    protocol::rpc::{RpcError, RpcStatus},
+};
+use tari_mmr::error::MerkleMountainRangeError;
+use thiserror::Error;
+use tokio::task;

 #[derive(Debug, Error)]
 pub enum HorizonSyncError {
@@ -74,15 +70,6 @@ pub enum HorizonSyncError {
     MerkleMountainRangeError(#[from] MerkleMountainRangeError),
     #[error("Connectivity error: {0}")]
     ConnectivityError(#[from] ConnectivityError),
-    #[error(
-        "Sync peer {peer} has a tip height of {remote_peer_height} which is less than the target height of \
-         {target_pruning_horizon}"
-    )]
-    InappropriateSyncPeer {
-        peer: NodeId,
-        target_pruning_horizon: u64,
-        remote_peer_height: u64,
-    },
 }

 impl From<TryFromIntError> for HorizonSyncError {
diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
index 78d80f72c4..2013ce3264 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
@@ -135,8 +135,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         client: &mut rpc::BaseNodeSyncRpcClient,
         to_header: &BlockHeader,
     ) -> Result<(), HorizonSyncError> {
-        debug!(target: LOG_TARGET, "Initializing");
-        self.initialize().await?;
+        // debug!(target: LOG_TARGET, "Initializing");
+        // self.initialize().await?;
         debug!(target: LOG_TARGET, "Synchronizing kernels");
         self.synchronize_kernels(client, to_header).await?;
         debug!(target: LOG_TARGET, "Synchronizing outputs");
@@ -144,65 +144,30 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         Ok(())
     }

-    async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
-        let db = self.db();
-        let local_metadata = db.get_chain_metadata().await?;
-
-        if local_metadata.height_of_longest_chain() == 0 {
-            let horizon_data = db.fetch_horizon_data().await?;
-            self.utxo_sum = horizon_data.utxo_sum().clone();
-            self.kernel_sum = horizon_data.kernel_sum().clone();
-
-            return Ok(());
-        }
-
-        // let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
-        // let acc = db.fetch_block_accumulated_data(header.hash().clone()).await?;
-        let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
-        if local_metadata.pruned_height() < new_prune_height {
-            debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
-            db.prune_to_height(new_prune_height).await?;
-        }
-
-        // let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;
-
-        // prune_to_height updates the horizon data
-        let horizon_data = db.fetch_horizon_data().await?;
-        // if *horizon_data.kernel_sum() != acc.cumulative_kernel_sum {
-        //     error!(target: LOG_TARGET, "KERNEL SUM NOT EQUAL CALCULATED");
-        // }
-        // if *horizon_data.utxo_sum() != acc.cumulative_utxo_sum {
-        //     error!(target: LOG_TARGET, "UTXO SUM NOT EQUAL CALCULATED");
-        // }
-
-        // let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;
-        // if calc_kernel_sum != acc.cumulative_kernel_sum {
-        //     error!(target: LOG_TARGET, "KERNEL SUM NOT EQUAL CALCULATED");
-        // }
-        // if calc_utxo_sum != acc.cumulative_utxo_sum {
-        //     error!(target: LOG_TARGET, "UTXO SUM NOT EQUAL CALCULATED");
-        // }
-
-        // if calc_kernel_sum != *horizon_data.kernel_sum() {
-        //     error!(target: LOG_TARGET, "HORIZON KERNEL SUM NOT EQUAL CALCULATED");
-        // }
-        // if calc_utxo_sum != *horizon_data.utxo_sum() {
-        //     error!(target: LOG_TARGET, "HORIZON UTXO SUM NOT EQUAL CALCULATED");
-        // }
-
-        debug!(
-            target: LOG_TARGET,
-            "Loaded from horizon data utxo_sum = {}, kernel_sum = {}",
-            horizon_data.utxo_sum().to_hex(),
-            horizon_data.kernel_sum().to_hex(),
-        );
-        // self.utxo_sum = calc_utxo_sum;
-        // self.kernel_sum = calc_kernel_sum;
-        self.utxo_sum = horizon_data.utxo_sum().clone();
-        self.kernel_sum = horizon_data.kernel_sum().clone();
-
-        Ok(())
-    }
+    // async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
+    //     let db = self.db();
+    //     let local_metadata = db.get_chain_metadata().await?;
+    //
+    //     let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
+    //     if local_metadata.pruned_height() < new_prune_height {
+    //         debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
+    //         db.prune_to_height(new_prune_height).await?;
+    //     }
+    //
+    //     // prune_to_height updates horizon data
+    //     let horizon_data = db.fetch_horizon_data().await?;
+    //
+    //     debug!(
+    //         target: LOG_TARGET,
+    //         "Loaded from horizon data utxo_sum = {}, kernel_sum = {}",
+    //         horizon_data.utxo_sum().to_hex(),
+    //         horizon_data.kernel_sum().to_hex(),
+    //     );
+    //     self.utxo_sum = horizon_data.utxo_sum().clone();
+    //     self.kernel_sum = horizon_data.kernel_sum().clone();
+    //
+    //     Ok(())
+    // }

     async fn synchronize_kernels(
         &mut self,
@@ -247,7 +212,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
             .fetch_header_containing_kernel_mmr(local_num_kernels + 1)
             .await?;
         let req = SyncKernelsRequest {
-            start_header_hash: current_header.hash().clone(),
+            start: local_num_kernels,
             end_header_hash: to_header.hash(),
         };
         let mut kernel_stream = client.sync_kernels(req).await?;
@@ -407,15 +372,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         debug!(
             target: LOG_TARGET,
             "Found header for utxos at mmr pos: {} - {} height: {}",
-            start + 1,
+            start,
             current_header.header().output_mmr_size,
             current_header.height()
         );

         let db = self.db().clone();
-        let mut output_hashes = vec![];
-        let mut witness_hashes = vec![];
         let mut txn = db.write_transaction();
         let mut unpruned_outputs = vec![];
         let mut mmr_position = start;
@@ -435,7 +398,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         while let Some(response) = output_stream.next().await {
             let res: SyncUtxosResponse = response?;

-            if res.mmr_index > 0 && res.mmr_index != mmr_position {
+            if res.mmr_index != 0 && res.mmr_index != mmr_position {
                 return Err(HorizonSyncError::IncorrectResponse(format!(
                     "Expected MMR position of {} but got {}",
                     mmr_position, res.mmr_index,
@@ -458,9 +421,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                     );
                     height_utxo_counter += 1;
                     let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
-                    output_hashes.push(output.hash());
-                    witness_hashes.push(output.witness_hash());
                     unpruned_outputs.push(output.clone());
+
+                    output_mmr.push(output.hash())?;
+                    witness_mmr.push(output.witness_hash())?;
                     self.utxo_sum = &self.utxo_sum + &output.commitment;

                     txn.insert_output_via_horizon_sync(
@@ -481,8 +445,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                         current_header.height()
                     );
                     height_txo_counter += 1;
-                    output_hashes.push(utxo.hash.clone());
-                    witness_hashes.push(utxo.witness_hash.clone());
+                    output_mmr.push(utxo.hash.clone())?;
+                    witness_mmr.push(utxo.witness_hash.clone())?;
+
                     txn.insert_pruned_output_via_horizon_sync(
                         utxo.hash,
                         utxo.witness_hash,
@@ -501,15 +466,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 )));
             }

-            // Validate root
-            for hash in output_hashes.drain(..) {
-                output_mmr.push(hash)?;
-            }
-
-            for hash in witness_hashes.drain(..) {
-                witness_mmr.push(hash)?;
-            }
-
             // Check that the difference bitmap isn't excessively large. Bitmap::deserialize panics if greater
             // than isize::MAX, however isize::MAX is still an inordinate amount of data. An
             // arbitrary 4 MiB limit is used.
@@ -559,7 +515,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 });
             }

-            // self.validate_rangeproofs(mem::take(&mut unpruned_outputs)).await?;
+            self.validate_rangeproofs(mem::take(&mut unpruned_outputs)).await?;

             txn.update_deleted_bitmap(diff_bitmap.clone());

@@ -577,12 +533,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
             txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());
             txn.commit().await?;

-            let data = self.db().fetch_horizon_data().await?;
-            error!(
-                target: LOG_TARGET,
-                "***************** utxo = {} ********************* ",
-                data.utxo_sum().to_hex(),
-            );
             debug!(
                 target: LOG_TARGET,
                 "UTXO: {}/{}, Header #{}, added {} utxos, added {} txos in {:.2?}",
@@ -593,17 +543,28 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 height_txo_counter,
                 timer.elapsed()
             );
-
             height_txo_counter = 0;
             height_utxo_counter = 0;
             timer = Instant::now();
-            current_header = db.fetch_chain_header(current_header.height() + 1).await?;
-            debug!(
-                target: LOG_TARGET,
-                "Expecting to receive the next UTXO set for header #{}",
-                current_header.height()
-            );
+
+            if mmr_position == end {
+                debug!(
+                    target: LOG_TARGET,
+                    "Sync complete at mmr position {}, height #{}",
+                    mmr_position,
+                    current_header.height()
+                );
+                break;
+            } else {
+                current_header = db.fetch_chain_header(current_header.height() + 1).await?;
+                debug!(
+                    target: LOG_TARGET,
+                    "Expecting to receive the next UTXO set {}-{} for header #{}",
+                    mmr_position,
+                    current_header.header().output_mmr_size,
+                    current_header.height()
+                );
+            }
                 },
                 v => {
                     error!(target: LOG_TARGET, "Remote node returned an invalid response {:?}", v);
@@ -671,25 +632,17 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
             )));

         let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;

-        // TODO: Use accumulated sums
-        // let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;
-        // let utxo_sum = &self.utxo_sum;
-        // let kernel_sum = &self.kernel_sum;
-        // if *utxo_sum != calc_utxo_sum {
-        //     error!(target: LOG_TARGET, "UTXO sum isnt equal!");
-        // }
-        // if *kernel_sum != calc_kernel_sum {
-        //     error!(target: LOG_TARGET, "KERNEL sum isnt equal!");
-        // }
+        // TODO: Use cumulative kernel and utxo sums
+        let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;

         self.shared
             .sync_validators
             .final_horizon_state
             .validate(
                 &*self.db().inner().db_read_access()?,
-                header.height() - 1,
-                &self.utxo_sum,
-                &self.kernel_sum,
+                header.height(),
+                &calc_utxo_sum,
+                &calc_kernel_sum,
             )
             .map_err(HorizonSyncError::FinalStateValidationFailed)?;
@@ -742,7 +695,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
             );
             let (utxos, _) = self
                 .db()
-                .fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone())
+                .fetch_utxos_in_block(curr_header.hash().clone(), bitmap.clone())
                 .await?;
             trace!(
                 target: LOG_TARGET,
@@ -752,19 +705,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 prev_kernel_mmr,
                 curr_header.header().kernel_mmr_size - 1
            );
-            let kernels = self
-                .db()
-                .fetch_kernels_by_mmr_position(prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1)
-                .await?;
-            let mut utxo_sum = HomomorphicCommitment::default();
-            trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
             trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
             let mut prune_counter = 0;
             for u in utxos {
                 match u {
                     PrunedOutput::NotPruned { output } => {
-                        utxo_sum = &output.commitment + &utxo_sum;
+                        pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
                     },
                     _ => {
                         prune_counter += 1;
@@ -776,8 +723,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
             }

             prev_mmr = curr_header.header().output_mmr_size;
-            pruned_utxo_sum = &utxo_sum + &pruned_utxo_sum;
-
+            let kernels = self.db().fetch_kernels_in_block(curr_header.hash().clone()).await?;
+            trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
             for k in kernels {
                 pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
             }
@@ -791,7 +738,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 pruned_utxo_sum
             );
         }
-
         Ok((pruned_utxo_sum, pruned_kernel_sum))
     }

diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
index b17da13bc3..b476f4078f 100644
--- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
@@ -31,7 +31,7 @@ use crate::{
     proto::base_node::SyncBlocksRequest,
     tari_utilities::{hex::Hex, Hashable},
     transactions::aggregated_body::AggregateBody,
-    validation::BlockSyncBodyValidation,
+    validation::{BlockSyncBodyValidation, ValidationError},
 };
 use futures::StreamExt;
 use log::*;
@@ -96,6 +96,7 @@ impl BlockSynchronizer {
                 self.db.cleanup_orphans().await?;
                 Ok(())
             },
+            Err(err @ BlockSyncError::ValidationError(ValidationError::AsyncTaskFailed(_))) => Err(err),
             Err(err @ BlockSyncError::ValidationError(_)) | Err(err @ BlockSyncError::ReceivedInvalidBlockBody(_)) => {
                 self.ban_peer(node_id, &err).await?;
                 Err(err)
diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs
index 47f4b6c75d..cc62c819d0 100644
--- a/base_layer/core/src/base_node/sync/rpc/service.rs
+++ b/base_layer/core/src/base_node/sync/rpc/service.rs
@@ -392,10 +392,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
         let db = self.db();

         let start_header = db
-            .fetch_header_by_block_hash(req.start_header_hash.clone())
+            .fetch_header_containing_kernel_mmr(req.start + 1)
             .await
             .map_err(RpcStatus::log_internal_error(LOG_TARGET))?
-            .ok_or_else(|| RpcStatus::not_found("Unknown start header"))?;
+            .into_header();

         let end_header = db
             .fetch_header_by_block_hash(req.end_header_hash.clone())
diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
index 9a6a59f612..eb13ab4941 100644
--- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
+++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
@@ -26,9 +26,8 @@ use crate::{
     proto,
     proto::base_node::{SyncUtxo, SyncUtxosRequest, SyncUtxosResponse},
 };
-use croaring::Bitmap;
 use log::*;
-use std::{cmp, sync::Arc, time::Instant};
+use std::{sync::Arc, time::Instant};
 use tari_comms::{protocol::rpc::RpcStatus, utils};
 use tari_crypto::tari_utilities::{hex::Hex, Hashable};
 use tokio::{sync::mpsc, task};
@@ -88,20 +87,6 @@ where B: BlockchainBackend + 'static
             (skip, prev_header.output_mmr_size)
         };

-        // we need to fetch the spent bitmap for the height the client requested
-        let bitmap = self
-            .db
-            .fetch_complete_deleted_bitmap_at(end_header.hash())
-            .await
-            .map_err(|_| {
-                RpcStatus::general(format!(
-                    "Could not get tip deleted bitmap at hash {}",
-                    end_header.hash().to_hex()
-                ))
-            })?
-            .into_bitmap();
-        let bitmap = Arc::new(bitmap);
-
         let include_pruned_utxos = request.include_pruned_utxos;
         let include_deleted_bitmaps = request.include_deleted_bitmaps;
         task::spawn(async move {
@@ -112,7 +97,6 @@ where B: BlockchainBackend + 'static
                     skip_outputs,
                     prev_utxo_mmr_size,
                     end_header,
-                    bitmap,
                     include_pruned_utxos,
                     include_deleted_bitmaps,
                 )
@@ -125,6 +109,7 @@ where B: BlockchainBackend + 'static
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
     async fn start_streaming(
         &self,
         tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
@@ -132,10 +117,23 @@ where B: BlockchainBackend + 'static
         mut skip_outputs: u64,
         mut prev_utxo_mmr_size: u64,
         end_header: BlockHeader,
-        bitmap: Arc<Bitmap>,
         include_pruned_utxos: bool,
         include_deleted_bitmaps: bool,
     ) -> Result<(), RpcStatus> {
+        // we need to fetch the spent bitmap for the height the client requested
+        let bitmap = self
+            .db
+            .fetch_complete_deleted_bitmap_at(end_header.hash())
+            .await
+            .map_err(|err| {
+                error!(target: LOG_TARGET, "Failed to get deleted bitmap: {}", err);
+                RpcStatus::general(format!(
+                    "Could not get deleted bitmap at hash {}",
+                    end_header.hash().to_hex()
+                ))
+            })?
+            .into_bitmap();
+        let bitmap = Arc::new(bitmap);
         debug!(
             target: LOG_TARGET,
             "Starting stream task with current_header: {}, skip_outputs: {}, prev_utxo_mmr_size: {}, end_header: {}, \
diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs
index f4f2521149..3d5c61d3c2 100644
--- a/base_layer/core/src/chain_storage/async_db.rs
+++ b/base_layer/core/src/chain_storage/async_db.rs
@@ -168,13 +168,9 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
     make_async_fn!(fetch_utxos_in_block(hash: HashOutput, deleted: Arc<Bitmap>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_in_block");

-    make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, deleted: Arc<Bitmap>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_by_mmr_position");
-
     //---------------------------------- Kernel --------------------------------------------//

     make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig");

-    make_async_fn!(fetch_kernels_by_mmr_position(start: u64, end: u64) -> Vec<TransactionKernel>, "fetch_kernels_by_mmr_position");
-
     make_async_fn!(fetch_kernels_in_block(hash: HashOutput) -> Vec<TransactionKernel>, "fetch_kernels_in_block");

     //---------------------------------- MMR --------------------------------------------//
diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs
index c4fa2920f9..e89b8220f2 100644
--- a/base_layer/core/src/chain_storage/blockchain_backend.rs
+++ b/base_layer/core/src/chain_storage/blockchain_backend.rs
@@ -100,9 +100,6 @@ pub trait BlockchainBackend: Send + Sync {
         excess_sig: &Signature,
     ) -> Result<Option<(TransactionKernel, HashOutput)>, ChainStorageError>;

-    /// Fetch kernels by MMR position
-    fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result<Vec<TransactionKernel>, ChainStorageError>;
-
     /// Fetch all UTXOs and spends in the block
     fn fetch_utxos_in_block(
         &self,
@@ -110,13 +107,6 @@ pub trait BlockchainBackend: Send + Sync {
         deleted: &Bitmap,
     ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError>;

-    fn fetch_utxos_by_mmr_position(
-        &self,
-        start: u64,
-        end: u64,
-        deleted: &Bitmap,
-    ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError>;
-
     /// Fetch a specific output. Returns the output and the leaf index in the output MMR
     fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError>;

diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs
index 4ceba06227..116231350e 100644
--- a/base_layer/core/src/chain_storage/blockchain_database.rs
+++ b/base_layer/core/src/chain_storage/blockchain_database.rs
@@ -15,7 +15,7 @@ use tari_crypto::tari_utilities::{hex::Hex, ByteArray, Hashable};
 use tari_common_types::{
     chain_metadata::ChainMetadata,
-    types::{BlockHash, Commitment, HashDigest, HashOutput, Signature, BLOCK_HASH_LENGTH},
+    types::{BlockHash, Commitment, HashDigest, HashOutput, Signature},
 };
 use tari_mmr::{pruned_hashset::PrunedHashSet, MerkleMountainRange, MutableMmr};

@@ -389,15 +389,6 @@ where B: BlockchainBackend
         db.fetch_kernels_in_block(&hash)
     }

-    pub fn fetch_kernels_by_mmr_position(
-        &self,
-        start: u64,
-        end: u64,
-    ) -> Result<Vec<TransactionKernel>, ChainStorageError> {
-        let db = self.db_read_access()?;
-        db.fetch_kernels_by_mmr_position(start, end)
-    }
-
     pub fn fetch_utxos_in_block(
         &self,
         hash: HashOutput,
@@ -407,16 +398,6 @@ where B: BlockchainBackend
         db.fetch_utxos_in_block(&hash, &deleted)
     }

-    pub fn fetch_utxos_by_mmr_position(
-        &self,
-        start: u64,
-        end: u64,
-        deleted: Arc<Bitmap>,
-    ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError> {
-        let db = self.db_read_access()?;
-        db.fetch_utxos_by_mmr_position(start, end, deleted.as_ref())
-    }
-
     /// Returns the block header at the given block height.
     pub fn fetch_header(&self, height: u64) -> Result<Option<BlockHeader>, ChainStorageError> {
         let db = self.db_read_access()?;
@@ -2105,8 +2086,7 @@ fn prune_database_if_needed(
         pruning_interval,
     );
     if metadata.pruned_height() < abs_pruning_horizon.saturating_sub(pruning_interval) {
-        debug!(target: LOG_TARGET, "GONNA PRUNNEEEEE",);
-        // prune_to_height(db, abs_pruning_horizon)?;
+        prune_to_height(db, abs_pruning_horizon)?;
     }

     Ok(())
@@ -2117,12 +2097,11 @@ fn prune_to_height(db: &mut T, target_horizon_height: u64)
     let last_pruned = metadata.pruned_height();
     if target_horizon_height < last_pruned {
         return Err(ChainStorageError::InvalidArguments {
-            func: "prune_to_block",
+            func: "prune_to_height",
             arg: "target_horizon_height",
             message: format!(
                 "Target pruning horizon {} is less than current pruning horizon {}",
-                target_horizon_height,
-                last_pruned + 1
+                target_horizon_height, last_pruned
             ),
         });
     }
@@ -2135,12 +2114,12 @@ fn prune_to_height(db: &mut T, target_horizon_height: u64)
         return Ok(());
     }

-    if metadata.height_of_longest_chain() > 0 && target_horizon_height > metadata.height_of_longest_chain() {
+    if target_horizon_height > metadata.height_of_longest_chain() {
         return Err(ChainStorageError::InvalidArguments {
-            func: "prune_to_block",
+            func: "prune_to_height",
             arg: "target_horizon_height",
             message: format!(
-                "Target pruning horizon {} is less than current block height {}",
+                "Target pruning horizon {} is greater than current block height {}",
                 target_horizon_height,
                 metadata.height_of_longest_chain()
             ),
@@ -2156,9 +2135,8 @@ fn prune_to_height(db: &mut T, target_horizon_height: u64)
         "height",
         last_pruned.to_string(),
     )?;
-    let mut block_before_last = None;
     let mut txn = DbTransaction::new();
-    for block_to_prune in (last_pruned + 1)..target_horizon_height {
+    for block_to_prune in (last_pruned + 1)..=target_horizon_height {
         let header = db.fetch_chain_header_by_height(block_to_prune)?;
         let curr_block = db.fetch_block_accumulated_data_by_height(block_to_prune).or_not_found(
             "BlockAccumulatedData",
             "height",
@@ -2168,30 +2146,27 @@ fn prune_to_height(db: &mut T, target_horizon_height: u64)
         // Note, this could actually be done in one step instead of each block, since deleted is
         // accumulated
         let output_mmr_positions = curr_block.deleted() - last_block.deleted();
-        block_before_last = Some(last_block);
         last_block = curr_block;

         txn.prune_outputs_at_positions(output_mmr_positions.to_vec());
         txn.delete_all_inputs_in_block(header.hash().clone());
     }
-    if let Some(block) = block_before_last {
-        txn.set_pruned_height(
-            target_horizon_height,
-            block.cumulative_kernel_sum().clone(),
-            block.cumulative_utxo_sum().clone(),
-        );
-    }
+    txn.set_pruned_height(
+        target_horizon_height,
+        last_block.cumulative_kernel_sum().clone(),
+        last_block.cumulative_utxo_sum().clone(),
+    );

     // If we prune to the tip, we cannot provide any full blocks
-    if metadata.height_of_longest_chain() == target_horizon_height {
-        let genesis = db.fetch_chain_header_by_height(0)?;
-        txn.set_best_block(
-            0,
-            genesis.hash().clone(),
-            genesis.accumulated_data().total_accumulated_difficulty,
-            vec![0; BLOCK_HASH_LENGTH],
-        );
-    }
+    // if metadata.height_of_longest_chain() == target_horizon_height - 1 {
+    //     let genesis = db.fetch_chain_header_by_height(0)?;
+    //     txn.set_best_block(
+    //         0,
+    //         genesis.hash().clone(),
+    //         genesis.accumulated_data().total_accumulated_difficulty,
+    //         vec![0; BLOCK_HASH_LENGTH],
+    //     );
+    // }

     db.write(txn)?;
     Ok(())
diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
index a231eb87ea..291714c4de 100644
--- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
+++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
@@ -24,22 +24,14 @@
 // let's ignore this clippy error in this module
 #![allow(clippy::ptr_arg)]

-use std::{convert::TryFrom, fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant};
-
 use croaring::Bitmap;
 use fs2::FileExt;
 use lmdb_zero::{ConstTransaction, Database, Environment, ReadTransaction, WriteTransaction};
 use log::*;
 use serde::{Deserialize, Serialize};
+use std::{fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant};
 use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex, ByteArray};

-use tari_common_types::{
-    chain_metadata::ChainMetadata,
-    types::{BlockHash, Commitment, HashDigest, HashOutput, Signature, BLOCK_HASH_LENGTH},
-};
-use tari_mmr::{Hash, MerkleMountainRange, MutableMmr};
-use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore};
-
 use crate::{
     blocks::{
         Block,
@@ -95,6 +87,12 @@ use crate::{
         },
     },
 };
+use tari_common_types::{
+    chain_metadata::ChainMetadata,
+    types::{BlockHash, Commitment, HashDigest, HashOutput, Signature, BLOCK_HASH_LENGTH},
+};
+use tari_mmr::{Hash, MerkleMountainRange, MutableMmr};
+use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore};

 type DatabaseRef = Arc<Database<'static>>;

@@ -1638,66 +1636,6 @@ impl BlockchainBackend for LMDBDatabase {
         }
     }

-    fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result<Vec<TransactionKernel>, ChainStorageError> {
-        let txn = self.read_transaction()?;
-
-        let start_height = match lmdb_first_after(&txn, &self.kernel_mmr_size_index, &(start + 1).to_be_bytes())? {
-            Some(h) => h,
-            None => return Ok(vec![]),
-        };
-        let end_height: u64 =
-            lmdb_first_after(&txn, &self.kernel_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height);
-
-        let previous_mmr_count = if start_height == 0 {
-            0
-        } else {
-            let header: BlockHeader =
-                lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist");
-            debug!(target: LOG_TARGET, "Previous header:{}", header);
-            header.kernel_mmr_size
-        };
-
-        let total_size = (end - start) as usize + 1;
-        let mut result = Vec::with_capacity(total_size);
-
-        let mut skip_amount = (start - previous_mmr_count) as usize;
-        debug!(
-            target: LOG_TARGET,
-            "Fetching kernels by MMR position. Start {}, end {}, in headers at height {}-{}, prev mmr count: {}, \
-             skipping the first:{}",
-            start,
-            end,
-            start_height,
-            end_height,
-            previous_mmr_count,
-            skip_amount
-        );
-
-        for height in start_height..=end_height {
-            let acc_data = lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)?
-                .ok_or_else(|| ChainStorageError::ValueNotFound {
-                    entity: "BlockHeader",
-                    field: "height",
-                    value: height.to_string(),
-                })?;
-
-            result.extend(
-                lmdb_fetch_keys_starting_with::<TransactionKernelRowData>(
-                    acc_data.hash.to_hex().as_str(),
-                    &txn,
-                    &self.kernels_db,
-                )?
-                .into_iter()
-                .skip(skip_amount)
-                .take(total_size - result.len())
-                .map(|f| f.kernel),
-            );
-
-            skip_amount = 0;
-        }
-        Ok(result)
-    }
-
     fn fetch_utxos_in_block(
         &self,
         header_hash: &HashOutput,
@@ -1705,14 +1643,6 @@ impl BlockchainBackend for LMDBDatabase {
     ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError> {
         let txn = self.read_transaction()?;

-        let height =
-            self.fetch_height_from_hash(&txn, header_hash)?
-                .ok_or_else(|| ChainStorageError::ValueNotFound {
-                    entity: "BlockHeader",
-                    field: "hash",
-                    value: header_hash.to_hex(),
-                })?;
-
         let utxos = lmdb_fetch_keys_starting_with::<TransactionOutputRowData>(
             header_hash.to_hex().as_str(),
             &txn,
@@ -1737,6 +1667,14 @@ impl BlockchainBackend for LMDBDatabase {
             })
             .collect();

+        let height =
+            self.fetch_height_from_hash(&txn, header_hash)?
+                .ok_or_else(|| ChainStorageError::ValueNotFound {
+                    entity: "BlockHeader",
+                    field: "hash",
+                    value: header_hash.to_hex(),
+                })?;
+
         // Builds a BitMap of the deleted UTXO MMR indexes that occurred at the current height
         let acc_data =
             self.fetch_block_accumulated_data(&txn, height)?
@@ -1752,105 +1690,6 @@ impl BlockchainBackend for LMDBDatabase {
         Ok((utxos, difference_bitmap))
     }

-    fn fetch_utxos_by_mmr_position(
-        &self,
-        start: u64,
-        end: u64,
-        deleted: &Bitmap,
-    ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError> {
-        let txn = self.read_transaction()?;
-        let start_height = lmdb_first_after(&txn, &self.output_mmr_size_index, &(start + 1).to_be_bytes())?
-            .ok_or_else(|| {
-                ChainStorageError::InvalidQuery(format!(
-                    "Unable to find block height from start output MMR index {}",
-                    start
-                ))
-            })?;
-        let end_height: u64 =
-            lmdb_first_after(&txn, &self.output_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height);
-
-        let previous_mmr_count = if start_height == 0 {
-            0
-        } else {
-            let header: BlockHeader =
-                lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist");
-            debug!(target: LOG_TARGET, "Previous header:{}", header);
-            header.output_mmr_size
-        };
-
-        let total_size = end
-            .checked_sub(start)
-            .and_then(|v| v.checked_add(1))
-            .and_then(|v| usize::try_from(v).ok())
-            .ok_or_else(|| {
-                ChainStorageError::InvalidQuery("fetch_utxos_by_mmr_position: end is less than start".to_string())
-            })?;
-        let mut result = Vec::with_capacity(total_size);
-
-        let mut skip_amount = (start - previous_mmr_count) as usize;
-        debug!(
-            target: LOG_TARGET,
-            "Fetching outputs by MMR position. Start {}, end {}, starting in header at height {}, prev mmr count: \
-             {}, skipping the first:{}",
-            start,
-            end,
-            start_height,
-            previous_mmr_count,
-            skip_amount
-        );
-        let mut difference_bitmap = Bitmap::create();
-
-        for height in start_height..=end_height {
-            let accum_data =
-                lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)?
-                    .ok_or_else(|| ChainStorageError::ValueNotFound {
-                        entity: "BlockHeader",
-                        field: "height",
-                        value: height.to_string(),
-                    })?;
-
-            result.extend(
-                lmdb_fetch_keys_starting_with::<TransactionOutputRowData>(
-                    accum_data.hash.to_hex().as_str(),
-                    &txn,
-                    &self.utxos_db,
-                )?
-                .into_iter()
-                .skip(skip_amount)
-                .take(total_size - result.len())
-                .map(|row| {
-                    if deleted.contains(row.mmr_position) {
-                        return PrunedOutput::Pruned {
-                            output_hash: row.hash,
-                            witness_hash: row.witness_hash,
-                        };
-                    }
-                    if let Some(output) = row.output {
-                        PrunedOutput::NotPruned { output }
-                    } else {
-                        PrunedOutput::Pruned {
-                            output_hash: row.hash,
-                            witness_hash: row.witness_hash,
-                        }
-                    }
-                }),
-            );
-
-            // Builds a BitMap of the deleted UTXO MMR indexes that occurred at the current height
-            let diff_bitmap = self
-                .fetch_block_accumulated_data(&txn, height)
-                .or_not_found("BlockAccumulatedData", "height", height.to_string())?
-                .deleted()
-                .clone();
-            difference_bitmap.or_inplace(&diff_bitmap);
-
-            skip_amount = 0;
-        }
-
-        difference_bitmap.run_optimize();
-        Ok((result, difference_bitmap))
-    }
-
     fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> {
         debug!(target: LOG_TARGET, "Fetch output: {}", output_hash.to_hex());
         let txn = self.read_transaction()?;
@@ -2192,7 +2031,7 @@ impl BlockchainBackend for LMDBDatabase {
     fn fetch_horizon_data(&self) -> Result<Option<HorizonData>, ChainStorageError> {
         let txn = self.read_transaction()?;
-        fetch_horizon_data(&txn, &self.metadata_db)
+        Ok(Some(fetch_horizon_data(&txn, &self.metadata_db)?))
     }

     fn get_stats(&self) -> Result {
@@ -2249,7 +2088,7 @@ fn fetch_chain_height(txn: &ConstTransaction<'_>, db: &Database) -> Result
-fn fetch_pruned_height(txn: &ConstTransaction<'_>, db: &Database) -> Result<Option<u64>, ChainStorageError> {
+fn fetch_pruned_height(txn: &ConstTransaction<'_>, db: &Database) -> Result<u64, ChainStorageError> {
     let k = MetadataKey::PrunedHeight;
     let val: Option<MetadataValue> = lmdb_get(txn, db, &k.as_u32())?;
@@ -2258,13 +2097,18 @@ fn fetch_pruned_height(txn: &ConstTransaction<'_>, db: &Database) -> Result
         _ => Ok(0),
     }
 }
-// Fetches the best block hash from the provided metadata db.
-fn fetch_horizon_data(txn: &ConstTransaction<'_>, db: &Database) -> Result<Option<HorizonData>, ChainStorageError> {
+
+/// Fetches the horizon data from the provided metadata db.
+fn fetch_horizon_data(txn: &ConstTransaction<'_>, db: &Database) -> Result<HorizonData, ChainStorageError> {
     let k = MetadataKey::HorizonData;
     let val: Option<MetadataValue> = lmdb_get(txn, db, &k.as_u32())?;
     match val {
-        Some(MetadataValue::HorizonData(data)) => Ok(Some(data)),
-        None => Ok(None),
+        Some(MetadataValue::HorizonData(data)) => Ok(data),
+        None => Err(ChainStorageError::ValueNotFound {
+            entity: "HorizonData",
+            field: "metadata",
+            value: "".to_string(),
+        }),
         Some(k) => Err(ChainStorageError::DataInconsistencyDetected {
             function: "fetch_horizon_data",
             details: format!("Received incorrect value {:?} for key horizon data", k),
diff --git a/base_layer/core/src/lib.rs b/base_layer/core/src/lib.rs
index 965a6b980c..1522d8420b 100644
--- a/base_layer/core/src/lib.rs
+++ b/base_layer/core/src/lib.rs
@@ -20,10 +20,10 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-// #![cfg_attr(not(debug_assertions), deny(unused_variables))]
-// #![cfg_attr(not(debug_assertions), deny(unused_imports))]
-// #![cfg_attr(not(debug_assertions), deny(dead_code))]
-// #![cfg_attr(not(debug_assertions), deny(unused_extern_crates))]
+#![cfg_attr(not(debug_assertions), deny(unused_variables))]
+#![cfg_attr(not(debug_assertions), deny(unused_imports))]
+#![cfg_attr(not(debug_assertions), deny(dead_code))]
+#![cfg_attr(not(debug_assertions), deny(unused_extern_crates))]
 #![deny(unused_must_use)]
 #![deny(unreachable_patterns)]
 #![deny(unknown_lints)]
diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs
index 51f09fe0bd..3673f7e950 100644
--- a/base_layer/core/src/test_helpers/blockchain.rs
+++ b/base_layer/core/src/test_helpers/blockchain.rs
@@ -268,10 +268,6 @@ impl BlockchainBackend for TempDatabase {
         self.db.as_ref().unwrap().fetch_kernel_by_excess_sig(excess_sig)
     }

-    fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result<Vec<TransactionKernel>, ChainStorageError> {
-        self.db.as_ref().unwrap().fetch_kernels_by_mmr_position(start, end)
-    }
-
     fn fetch_utxos_in_block(
         &self,
         header_hash: &HashOutput,
@@ -280,18 +276,6 @@ impl BlockchainBackend for TempDatabase {
         self.db.as_ref().unwrap().fetch_utxos_in_block(header_hash, deleted)
     }

-    fn fetch_utxos_by_mmr_position(
-        &self,
-        start: u64,
-        end: u64,
-        deleted: &Bitmap,
-    ) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError> {
-        self.db
-            .as_ref()
-            .unwrap()
-            .fetch_utxos_by_mmr_position(start, end, deleted)
-    }
-
     fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> {
         self.db.as_ref().unwrap().fetch_output(output_hash)
     }
diff --git a/base_layer/core/src/validation/error.rs b/base_layer/core/src/validation/error.rs
index 369cf1831a..bc2f3bb0d5 100644
--- a/base_layer/core/src/validation/error.rs
+++ b/base_layer/core/src/validation/error.rs
@@ -57,7 +57,7 @@ pub enum ValidationError {
     InvalidAccountingBalance,
     #[error("Transaction contains already spent inputs")]
     ContainsSTxO,
-    #[error("Transaction contains already outputs that already exist")]
+    #[error("Transaction contains outputs that already exist")]
     ContainsTxO,
     #[error("Transaction contains an output commitment that already exists")]
     ContainsDuplicateUtxoCommitment,
diff --git a/base_layer/core/tests/base_node_rpc.rs b/base_layer/core/tests/base_node_rpc.rs
index 7643b5ca28..0a7ecdd856 100644
--- a/base_layer/core/tests/base_node_rpc.rs
+++ b/base_layer/core/tests/base_node_rpc.rs
@@ -303,8 +303,7 @@ async fn test_get_height_at_time() {
     let (_, service, base_node, request_mock, consensus_manager, block0, _utxo0, _temp_dir) = setup().await;

     let mut prev_block = block0.clone();
-    let mut times = Vec::new();
-    times.push(prev_block.header().timestamp);
+    let mut times = vec![prev_block.header().timestamp];

     for _ in 0..10 {
         tokio::time::sleep(Duration::from_secs(2)).await;
         let new_block = base_node
diff --git a/base_layer/key_manager/src/mnemonic.rs b/base_layer/key_manager/src/mnemonic.rs
index 9338e82217..8df3f90734 100644
--- a/base_layer/key_manager/src/mnemonic.rs
+++ b/base_layer/key_manager/src/mnemonic.rs
@@ -350,7 +350,7 @@ mod test {
             "abandon".to_string(),
             "tipico".to_string(),
         ];
-        assert_eq!(MnemonicLanguage::detect_language(&words2).is_err(), true);
+        assert!(MnemonicLanguage::detect_language(&words2).is_err());

         // bounds check (last word is invalid)
         let words3 = vec![
@@ -360,7 +360,7 @@ mod test {
             "abandon".to_string(),
             "topazio".to_string(),
         ];
-        assert_eq!(MnemonicLanguage::detect_language(&words3).is_err(), true);
+        assert!(MnemonicLanguage::detect_language(&words3).is_err());

         // building up a word list: English/French + French -> French
         let mut words = Vec::with_capacity(3);
diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs
index d54c307c01..c33481f3c2 100644
--- a/base_layer/wallet/tests/wallet/mod.rs
+++ b/base_layer/wallet/tests/wallet/mod.rs
@@ -69,7 +69,6 @@ use tari_wallet::{
         handle::TransactionEvent,
         storage::sqlite_db::TransactionServiceSqliteDatabase,
     },
-    utxo_scanner_service::utxo_scanning::UtxoScannerService,
     Wallet,
     WalletConfig,
     WalletSqlite,