diff --git a/applications/tari_app_grpc/proto/base_node.proto b/applications/tari_app_grpc/proto/base_node.proto index 13abf1da19..fcc83a6f96 100644 --- a/applications/tari_app_grpc/proto/base_node.proto +++ b/applications/tari_app_grpc/proto/base_node.proto @@ -93,7 +93,7 @@ service BaseNode { rpc GetAssetMetadata(GetAssetMetadataRequest) returns (GetAssetMetadataResponse); // Get all constitutions where the public key is in the committee - rpc GetConstitutions(GetConstitutionsRequest) returns (stream TransactionOutput); + rpc GetConstitutions(GetConstitutionsRequest) returns (stream GetConstitutionsResponse); } message GetAssetMetadataRequest { @@ -446,5 +446,13 @@ message MempoolStatsResponse { } message GetConstitutionsRequest { - bytes dan_node_public_key = 1; + bytes start_block_hash = 1; + bytes dan_node_public_key = 2; +} + +message GetConstitutionsResponse { + TransactionOutput output = 1; + uint32 mmr_position = 2; + uint64 mined_height = 3; + bytes header_hash = 4; } diff --git a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs index cc5447452d..9e780bfbca 100644 --- a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs +++ b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs @@ -33,7 +33,7 @@ use tari_app_grpc::{ tari_rpc::{CalcType, Sorting}, }; use tari_app_utilities::consts; -use tari_common_types::types::{Commitment, PublicKey, Signature}; +use tari_common_types::types::{Commitment, FixedHash, PublicKey, Signature}; use tari_comms::{Bytes, CommsNode}; use tari_core::{ base_node::{ @@ -48,7 +48,10 @@ use tari_core::{ iterators::NonOverlappingIntegerPairIter, mempool::{service::LocalMempoolService, TxStorageResponse}, proof_of_work::PowAlgorithm, - transactions::{aggregated_body::AggregateBody, transaction_components::Transaction}, + transactions::{ + aggregated_body::AggregateBody, + transaction_components::{OutputType, Transaction}, + }, }; use 
tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle}; use tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray, Hashable}; @@ -134,7 +137,7 @@ pub async fn get_heights( impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { type FetchMatchingUtxosStream = mpsc::Receiver>; type GetBlocksStream = mpsc::Receiver>; - type GetConstitutionsStream = mpsc::Receiver>; + type GetConstitutionsStream = mpsc::Receiver>; type GetMempoolTransactionsStream = mpsc::Receiver>; type GetNetworkDifficultyStream = mpsc::Receiver>; type GetPeersStream = mpsc::Receiver>; @@ -1827,20 +1830,38 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { Ok(Response::new(response)) } + #[allow(clippy::too_many_lines)] async fn get_constitutions( &self, request: Request, ) -> Result, Status> { let report_error_flag = self.report_error_flag(); let request = request.into_inner(); - let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key).map_err(|err| { - report_error( - report_error_flag, - Status::invalid_argument(format!("Dan node public key is not a valid public key:{}", err)), - ) - })?; + let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key) + .map_err(|_| Status::invalid_argument("Dan node public key is not a valid public key"))?; - let mut handler = self.node_service.clone(); + let start_hash = Some(request.start_block_hash) + .filter(|h| !h.is_empty()) + .map(FixedHash::try_from) + .transpose() + .map_err(|_| Status::invalid_argument("Block hash has an invalid length"))?; + + let mut node_service = self.node_service.clone(); + // Check the start_hash is correct, or if not provided, start from genesis + let start_header = match start_hash { + Some(hash) => node_service + .get_header_by_hash(hash.to_vec()) + .await + .map_err(|e| report_error(report_error_flag, Status::internal(e.to_string())))? 
+ .ok_or_else(|| Status::not_found(format!("No block found with hash {}", hash)))?, + None => node_service + .get_header(0) + .await + .map_err(|e| report_error(report_error_flag, Status::internal(e.to_string())))? + .ok_or_else(|| Status::internal("Node does not have a genesis block!?"))?, + }; + // The number of headers to load at once to query for UTXOs + const BATCH_SIZE: u64 = 50; let (mut sender, receiver) = mpsc::channel(50); task::spawn(async move { let dan_node_public_key_hex = dan_node_public_key.to_hex(); @@ -1848,43 +1869,88 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { target: LOG_TARGET, "Starting thread to process GetConstitutions: dan_node_public_key: {}", dan_node_public_key_hex, ); - let constitutions = match handler.get_constitutions(dan_node_public_key).await { - Ok(constitutions) => constitutions, - Err(err) => { - warn!(target: LOG_TARGET, "Error communicating with base node: {:?}", err,); - let _get_token_response = - sender.send(Err(report_error(report_error_flag, Status::internal("Internal error")))); - return; - }, - }; + let mut current_height = start_header.height(); - debug!( - target: LOG_TARGET, - "Found {} constitutions for {}", - constitutions.len(), - dan_node_public_key_hex - ); - - for constitution in constitutions { - match sender.send(Ok(constitution.into())).await { - Ok(_) => (), + loop { + let headers = match node_service + .get_headers(current_height..=current_height + BATCH_SIZE) + .await + { + Ok(h) => h, Err(err) => { - warn!(target: LOG_TARGET, "Error sending constitution via GRPC: {}", err); - match sender - .send(Err(report_error( - report_error_flag, - Status::unknown("Error sending data"), - ))) - .await - { - Ok(_) => (), - Err(send_err) => { - warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err) - }, - } + error!(target: LOG_TARGET, "Error fetching headers: {}", err); + let _err = sender + .send(Err(report_error(report_error_flag, 
Status::internal(err.to_string())))) + .await; return; }, + }; + + if headers.is_empty() { + break; + } + let num_headers = headers.len(); + + for header in headers { + let block_hash_hex = header.hash().to_hex(); + match node_service + .get_contract_outputs_for_block(header.hash().clone(), OutputType::ContractConstitution) + .await + { + Ok(constitutions) => { + debug!( + target: LOG_TARGET, + "Found {} constitutions in block {}", + constitutions.len(), + block_hash_hex + ); + + let constitutions = constitutions.into_iter().filter(|utxo| { + // Filter for constitutions containing the dan_node_public_key + utxo.output + .as_transaction_output() + .and_then(|output| output.features.sidechain_features.as_ref()) + .and_then(|sidechain| sidechain.constitution.as_ref()) + .filter(|constitution| { + constitution.validator_committee.contains(&dan_node_public_key) + }) + .is_some() + }); + + for utxo in constitutions { + if sender + .send(Ok(tari_rpc::GetConstitutionsResponse { + header_hash: utxo.header_hash, + mined_height: utxo.mined_height, + mmr_position: utxo.mmr_position, + output: utxo.output.into_unpruned_output().map(Into::into), + })) + .await + .is_err() + { + warn!( + target: LOG_TARGET, + "Client disconnected while sending constitution via GRPC" + ); + break; + } + } + }, + Err(err) => { + warn!(target: LOG_TARGET, "Error fetching contract outputs for block: {}", err); + let _err = sender + .send(Err(report_error(report_error_flag, Status::internal("Internal error")))) + .await; + return; + }, + } + } + + if num_headers < BATCH_SIZE as usize { + break; } + + current_height += BATCH_SIZE + 1; } }); Ok(Response::new(receiver)) diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 44125c3abc..2c3b078fb3 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -97,6 +97,7 @@ impl DanNode { for output in outputs { if let 
Some(sidechain_features) = output.features.sidechain_features { let contract_id = sidechain_features.contract_id; + // TODO: expect will crash the validator node if the base node misbehaves let constitution = sidechain_features.constitution.expect("Constitution wasn't present"); if constitution.acceptance_requirements.acceptance_period_expiry < last_tip { diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs index 803eb06c55..3136adf419 100644 --- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs +++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs @@ -106,11 +106,16 @@ impl BaseNodeClient for GrpcBaseNodeClient { ) -> Result, DigitalAssetError> { let inner = self.connection().await?; let request = grpc::GetConstitutionsRequest { + // TODO: pass in the last block hash that was scanned + start_block_hash: vec![], dan_node_public_key: dan_node_public_key.as_bytes().to_vec(), }; let mut result = inner.get_constitutions(request).await?.into_inner(); let mut outputs = vec![]; - while let Some(output) = result.message().await? { + while let Some(mined_info) = result.message().await? 
{ + let output = mined_info + .output + .ok_or_else(|| DigitalAssetError::InvalidPeerMessage("Mined info contained no output".to_string()))?; outputs.push(output.try_into().map_err(DigitalAssetError::ConversionError)?); } Ok(outputs) diff --git a/base_layer/core/src/base_node/comms_interface/comms_request.rs b/base_layer/core/src/base_node/comms_interface/comms_request.rs index d48f297f2b..6cb12fc6a3 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_request.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_request.rs @@ -29,7 +29,12 @@ use serde::{Deserialize, Serialize}; use tari_common_types::types::{Commitment, HashOutput, PrivateKey, PublicKey, Signature}; use tari_utilities::hex::Hex; -use crate::{blocks::NewBlockTemplate, chain_storage::MmrTree, proof_of_work::PowAlgorithm}; +use crate::{ + blocks::NewBlockTemplate, + chain_storage::MmrTree, + proof_of_work::PowAlgorithm, + transactions::transaction_components::OutputType, +}; /// A container for the parameters required for a FetchMmrState request. #[derive(Debug, Serialize, Deserialize)] @@ -70,8 +75,9 @@ pub enum NodeCommsRequest { FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec, }, - FetchConstitutions { - dan_node_public_key: PublicKey, + FetchContractOutputsForBlock { + block_hash: HashOutput, + output_type: OutputType, }, } @@ -122,7 +128,7 @@ impl Display for NodeCommsRequest { FetchMempoolTransactionsByExcessSigs { .. } => { write!(f, "FetchMempoolTransactionsByExcessSigs") }, - FetchConstitutions { .. } => { + FetchContractOutputsForBlock { .. 
} => { write!(f, "FetchConstitutions") }, } diff --git a/base_layer/core/src/base_node/comms_interface/comms_response.rs b/base_layer/core/src/base_node/comms_interface/comms_response.rs index 3f9fb4d7f3..c7d805c3bf 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_response.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_response.rs @@ -66,8 +66,8 @@ pub enum NodeCommsResponse { output: Box>, }, FetchMempoolTransactionsByExcessSigsResponse(FetchMempoolTransactionsResponse), - FetchConstitutionsResponse { - outputs: Vec, + FetchOutputsForBlockResponse { + outputs: Vec, }, } @@ -106,7 +106,7 @@ impl Display for NodeCommsResponse { resp.transactions.len(), resp.not_found.len() ), - FetchConstitutionsResponse { .. } => write!(f, "FetchConstitutionsResponse"), + FetchOutputsForBlockResponse { .. } => write!(f, "FetchConstitutionsResponse"), } } } diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 5a09669887..7cd703dce0 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -463,12 +463,15 @@ where B: BlockchainBackend + 'static } Ok(NodeCommsResponse::FetchTokensResponse { outputs }) }, - NodeCommsRequest::FetchConstitutions { dan_node_public_key } => { - debug!(target: LOG_TARGET, "Starting fetch constitutions"); - Ok(NodeCommsResponse::FetchConstitutionsResponse { - outputs: self.blockchain_db.fetch_all_constitutions(dan_node_public_key).await?, - }) - }, + NodeCommsRequest::FetchContractOutputsForBlock { + block_hash, + output_type, + } => Ok(NodeCommsResponse::FetchOutputsForBlockResponse { + outputs: self + .blockchain_db + .fetch_contract_outputs_for_block(block_hash, output_type) + .await?, + }), NodeCommsRequest::FetchAssetRegistrations { range } => { let top_level_pubkey = PublicKey::default(); #[allow(clippy::range_plus_one)] diff 
--git a/base_layer/core/src/base_node/comms_interface/local_interface.rs b/base_layer/core/src/base_node/comms_interface/local_interface.rs index d047327a95..2629454dda 100644 --- a/base_layer/core/src/base_node/comms_interface/local_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/local_interface.rs @@ -40,7 +40,7 @@ use crate::{ blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate}, chain_storage::UtxoMinedInfo, proof_of_work::PowAlgorithm, - transactions::transaction_components::{TransactionKernel, TransactionOutput}, + transactions::transaction_components::{OutputType, TransactionKernel, TransactionOutput}, }; pub type BlockEventSender = broadcast::Sender>; @@ -319,16 +319,20 @@ impl LocalNodeCommsInterface { } } - pub async fn get_constitutions( + pub async fn get_contract_outputs_for_block( &mut self, - dan_node_public_key: PublicKey, - ) -> Result, CommsInterfaceError> { + block_hash: BlockHash, + output_type: OutputType, + ) -> Result, CommsInterfaceError> { match self .request_sender - .call(NodeCommsRequest::FetchConstitutions { dan_node_public_key }) + .call(NodeCommsRequest::FetchContractOutputsForBlock { + block_hash, + output_type, + }) .await?? 
{ - NodeCommsResponse::FetchConstitutionsResponse { outputs } => Ok(outputs), + NodeCommsResponse::FetchOutputsForBlockResponse { outputs } => Ok(outputs), _ => Err(CommsInterfaceError::UnexpectedApiResponse), } } diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 70373d6e93..933d285377 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -66,7 +66,7 @@ use crate::{ }, common::rolling_vec::RollingVec, proof_of_work::{PowAlgorithm, TargetDifficultyWindow}, - transactions::transaction_components::{TransactionKernel, TransactionOutput}, + transactions::transaction_components::{OutputType, TransactionKernel, TransactionOutput}, }; const LOG_TARGET: &str = "c::bn::async_db"; @@ -176,7 +176,7 @@ impl AsyncBlockchainDb { make_async_fn!(utxo_count() -> usize, "utxo_count"); - make_async_fn!(fetch_all_constitutions(dan_node_public_key: PublicKey) -> Vec, "fetch_all_constitutions"); + make_async_fn!(fetch_contract_outputs_for_block(block_hash: BlockHash, output_type: OutputType) -> Vec, "fetch_contract_outputs_for_block"); //---------------------------------- Kernel --------------------------------------------// make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig"); diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs index eddd0ace3b..293b6d7be3 100644 --- a/base_layer/core/src/chain_storage/blockchain_backend.rs +++ b/base_layer/core/src/chain_storage/blockchain_backend.rs @@ -6,7 +6,7 @@ use std::ops::Range; use croaring::Bitmap; use tari_common_types::{ chain_metadata::ChainMetadata, - types::{Commitment, FixedHash, HashOutput, PublicKey, Signature}, + types::{BlockHash, Commitment, FixedHash, HashOutput, PublicKey, Signature}, }; use tari_mmr::Hash; @@ -33,7 +33,7 @@ use crate::{ Reorg, 
UtxoMinedInfo, }, - transactions::transaction_components::{OutputType, TransactionInput, TransactionKernel, TransactionOutput}, + transactions::transaction_components::{OutputType, TransactionInput, TransactionKernel}, }; /// Identify behaviour for Blockchain database backends. Implementations must support `Send` and `Sync` so that @@ -138,10 +138,12 @@ pub trait BlockchainBackend: Send + Sync { range: Range, ) -> Result, ChainStorageError>; - fn fetch_all_constitutions( + /// Fetches contract UTXOs mined within the given block. + fn fetch_contract_outputs_for_block( &self, - dan_node_public_key: &PublicKey, - ) -> Result, ChainStorageError>; + block_hash: &BlockHash, + output_type: OutputType, + ) -> Result, ChainStorageError>; /// Fetch all outputs in a block fn fetch_outputs_in_block(&self, header_hash: &HashOutput) -> Result, ChainStorageError>; diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index e29bfce801..12251def0d 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -79,7 +79,7 @@ use crate::{ common::rolling_vec::RollingVec, consensus::{chain_strength_comparer::ChainStrengthComparer, ConsensusConstants, ConsensusManager}, proof_of_work::{monero_rx::MoneroPowData, PowAlgorithm, TargetDifficultyWindow}, - transactions::transaction_components::{OutputType, TransactionInput, TransactionKernel, TransactionOutput}, + transactions::transaction_components::{OutputType, TransactionInput, TransactionKernel}, validation::{ helpers::calc_median_timestamp, DifficultyCalculator, @@ -423,12 +423,13 @@ where B: BlockchainBackend Ok(result) } - pub fn fetch_all_constitutions( + pub fn fetch_contract_outputs_for_block( &self, - dan_node_public_key: PublicKey, - ) -> Result, ChainStorageError> { + block_hash: BlockHash, + output_type: OutputType, + ) -> Result, ChainStorageError> { let db = 
self.db_read_access()?; - db.fetch_all_constitutions(&dan_node_public_key) + db.fetch_contract_outputs_for_block(&block_hash, output_type) } pub fn fetch_kernel_by_excess( @@ -1371,9 +1372,19 @@ pub fn fetch_chain_headers( )); } - (start..=end_inclusive) - .map(|h| db.fetch_chain_header_by_height(h)) - .collect() + #[allow(clippy::cast_possible_truncation)] + let mut headers = Vec::with_capacity((end_inclusive - start) as usize); + for h in start..=end_inclusive { + match db.fetch_chain_header_by_height(h) { + Ok(header) => { + headers.push(header); + }, + Err(ChainStorageError::ValueNotFound { .. }) => break, + Err(e) => return Err(e), + } + } + + Ok(headers) } fn insert_headers(db: &mut T, headers: Vec) -> Result<(), ChainStorageError> { diff --git a/base_layer/core/src/chain_storage/lmdb_db/composite_key.rs b/base_layer/core/src/chain_storage/lmdb_db/composite_key.rs new file mode 100644 index 0000000000..565feb8104 --- /dev/null +++ b/base_layer/core/src/chain_storage/lmdb_db/composite_key.rs @@ -0,0 +1,93 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{ + fmt::{Display, Formatter}, + ops::{Deref, DerefMut}, +}; + +use tari_utilities::hex::to_hex; + +#[derive(Debug, Clone, Copy)] +pub(super) struct CompositeKey { + bytes: [u8; KEY_LEN], + len: usize, +} + +impl CompositeKey { + pub fn new() -> Self { + Self { + bytes: Self::new_buf(), + len: 0, + } + } + + pub fn push>(&mut self, bytes: T) -> bool { + let b = bytes.as_ref(); + let new_len = self.len + b.len(); + if new_len > KEY_LEN { + return false; + } + self.bytes[self.len..new_len].copy_from_slice(b); + self.len = new_len; + true + } + + fn as_bytes(&self) -> &[u8] { + &self.bytes[..self.len] + } + + fn as_bytes_mut(&mut self) -> &mut [u8] { + &mut self.bytes[..self.len] + } + + /// Returns a fixed 0-filled byte array. 
+ const fn new_buf() -> [u8; KEY_LEN] { + [0x0u8; KEY_LEN] + } +} + +impl Display for CompositeKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", to_hex(self.as_bytes())) + } +} + +impl Deref for CompositeKey { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_bytes() + } +} + +impl DerefMut for CompositeKey { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_bytes_mut() + } +} + +impl AsRef<[u8]> for CompositeKey { + fn as_ref(&self) -> &[u8] { + self.as_bytes() + } +} diff --git a/base_layer/core/src/chain_storage/lmdb_db/contract_index.rs b/base_layer/core/src/chain_storage/lmdb_db/contract_index.rs index 9d5661aa38..5f07a0fff4 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/contract_index.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/contract_index.rs @@ -23,20 +23,23 @@ use std::{ collections::{hash_map::DefaultHasher, HashSet}, convert::{TryFrom, TryInto}, - fmt::{Debug, Display, Formatter}, - hash::BuildHasherDefault, + fmt::Debug, + hash::{BuildHasherDefault, Hash}, ops::Deref, }; use lmdb_zero::{traits::AsLmdbBytes, ConstTransaction, Database, WriteTransaction}; use log::*; -use serde::{de::DeserializeOwned, Serialize}; -use tari_common_types::types::FixedHash; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tari_common_types::types::{BlockHash, FixedHash}; use tari_utilities::{hex::to_hex, Hashable}; use crate::{ chain_storage::{ - lmdb_db::lmdb::{lmdb_delete, lmdb_exists, lmdb_get, lmdb_insert, lmdb_replace}, + lmdb_db::{ + composite_key::CompositeKey, + lmdb::{lmdb_delete, lmdb_exists, lmdb_fetch_matching_after, lmdb_get, lmdb_insert, lmdb_replace}, + }, ChainStorageError, }, transactions::transaction_components::{OutputType, TransactionInput, TransactionOutput}, @@ -53,6 +56,12 @@ pub(super) struct ContractIndex<'a, T> { db: &'a Database<'a>, } +/// A hash set using the DefaultHasher. 
Since output hashes are not user controlled and uniformly random there is no +/// need to use RandomState hasher. +type DefaultHashSet = HashSet>; +type FixedHashSet = DefaultHashSet; +type ContractValueHashSet = DefaultHashSet; + impl<'a, T> ContractIndex<'a, T> where T: Deref> { @@ -61,18 +70,52 @@ where T: Deref> Self { txn, db } } - pub fn fetch(&self, contract_id: FixedHash, output_type: OutputType) -> Result, ChainStorageError> { + pub fn find_by_contract_id( + &self, + contract_id: FixedHash, + output_type: OutputType, + ) -> Result, ChainStorageError> { let key = ContractIndexKey::new(contract_id, output_type); - match output_type { OutputType::ContractDefinition | OutputType::ContractCheckpoint | OutputType::ContractConstitution => { - Ok(self.find::(&key)?.into_iter().collect()) + Ok(self + .get::<_, ContractIndexValue>(&key)? + .into_iter() + .map(|v| v.output_hash) + .collect()) }, OutputType::ContractValidatorAcceptance | OutputType::ContractConstitutionProposal | - OutputType::ContractConstitutionChangeAcceptance => { - Ok(self.find::(&key)?.into_iter().flatten().collect()) + OutputType::ContractConstitutionChangeAcceptance => Ok(self + .get::<_, ContractValueHashSet>(&key)? + .into_iter() + .flatten() + .map(|v| v.output_hash) + .collect()), + _ => Err(ChainStorageError::InvalidOperation(format!( + "Cannot fetch output type {} from contract index", + output_type + ))), + } + } + + pub fn find_by_block( + &self, + block_hash: FixedHash, + output_type: OutputType, + ) -> Result, ChainStorageError> { + let key = BlockContractIndexKey::prefixed(block_hash, output_type); + match output_type { + OutputType::ContractDefinition | OutputType::ContractCheckpoint | OutputType::ContractConstitution => { + self.get_all_matching::<_, FixedHash>(&key) }, + OutputType::ContractValidatorAcceptance | + OutputType::ContractConstitutionProposal | + OutputType::ContractConstitutionChangeAcceptance => Ok(self + .get_all_matching::<_, FixedHashSet>(&key)? 
+ .into_iter() + .flatten() + .collect()), _ => Err(ChainStorageError::InvalidOperation(format!( "Cannot fetch output type {} from contract index", output_type @@ -80,18 +123,24 @@ where T: Deref> } } - fn find(&self, key: &ContractIndexKey) -> Result, ChainStorageError> { + fn get(&self, key: &K) -> Result, ChainStorageError> { lmdb_get(&**self.txn, self.db, key) } - fn exists(&self, key: &ContractIndexKey) -> Result { + fn get_all_matching(&self, key: &K) -> Result, ChainStorageError> { + lmdb_fetch_matching_after(&**self.txn, self.db, key.as_lmdb_bytes()) + } + + fn exists(&self, key: &K) -> Result { lmdb_exists(&**self.txn, self.db, key) } } impl<'a> ContractIndex<'a, WriteTransaction<'a>> { /// Called when a new output must be added to the index - pub fn add_output(&self, output: &TransactionOutput) -> Result<(), ChainStorageError> { + pub fn add_output(&self, block_hash: &BlockHash, output: &TransactionOutput) -> Result<(), ChainStorageError> { + let block_hash = FixedHash::try_from(block_hash.as_slice()) + .map_err(|_| ChainStorageError::CriticalError("block_hash was not 32-bytes".to_string()))?; let output_hash = FixedHash::try_from(output.hash()) .map_err(|_| ChainStorageError::CriticalError("output.hash() was not 32-bytes".to_string()))?; @@ -101,12 +150,14 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { output_hash )) })?; - self.add_to_index(contract_id, output.features.output_type, output_hash) + self.add_to_index(block_hash, contract_id, output.features.output_type, output_hash) } /// Updates the index, removing references to the output that the given input spends. 
pub fn spend(&self, input: &TransactionInput) -> Result<(), ChainStorageError> { - let output_hash = FixedHash::try_from(input.output_hash()).unwrap(); + let output_hash = FixedHash::try_from(input.output_hash()) + .map_err(|_| ChainStorageError::CriticalError("input.output_hash() was not 32-bytes".to_string()))?; + let features = input.features()?; let contract_id = features.contract_id().ok_or_else(|| { ChainStorageError::InvalidOperation(format!( @@ -119,7 +170,8 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { /// Updates the index, rewinding (undoing) the effect of the output on the index state. pub fn rewind_output(&self, output: &TransactionOutput) -> Result<(), ChainStorageError> { - let output_hash = FixedHash::try_from(output.hash()).unwrap(); + let output_hash = FixedHash::try_from(output.hash()) + .map_err(|_| ChainStorageError::CriticalError("output.hash() was not 32-bytes".to_string()))?; let features = &output.features; let contract_id = features.contract_id().ok_or_else(|| { ChainStorageError::InvalidOperation(format!( @@ -131,7 +183,11 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { } /// Updates the index, rewinding (undoing) the effect of the input on the index state. 
- pub fn rewind_input(&self, input: &TransactionInput) -> Result<(), ChainStorageError> { + pub fn rewind_input(&self, block_hash: &[u8], input: &TransactionInput) -> Result<(), ChainStorageError> { + let block_hash = block_hash + .try_into() + .map_err(|_| ChainStorageError::CriticalError("block_hash was not 32-bytes".to_string()))?; + let output_hash = input .output_hash() .try_into() @@ -144,31 +200,40 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { output_hash )) })?; - self.add_to_index(contract_id, features.output_type, output_hash) + self.add_to_index(block_hash, contract_id, features.output_type, output_hash) } fn add_to_index( &self, + block_hash: FixedHash, contract_id: FixedHash, output_type: OutputType, output_hash: FixedHash, ) -> Result<(), ChainStorageError> { - let key = ContractIndexKey::new(contract_id, output_type); + let contract_key = ContractIndexKey::new(contract_id, output_type); + let block_key = BlockContractIndexKey::new(block_hash, output_type, contract_id); match output_type { OutputType::ContractDefinition => { debug!( target: LOG_TARGET, "inserting index for new contract_id {} in output {}.", contract_id, output_hash ); - self.insert(&key, &output_hash)?; - + self.insert(&contract_key, &ContractIndexValue { + block_hash, + output_hash, + })?; + self.insert(&block_key, &output_hash)?; Ok(()) }, // Only one contract checkpoint and constitution can exist at a time and can be overwritten. Consensus rules // decide whether this is valid but we just assume this is valid here. 
OutputType::ContractConstitution | OutputType::ContractCheckpoint => { self.assert_definition_exists(contract_id)?; - self.set(&key, &*output_hash)?; + self.set(&contract_key, &ContractIndexValue { + block_hash, + output_hash, + })?; + self.set(&block_key, &output_hash)?; Ok(()) }, // These are collections of output hashes @@ -177,16 +242,11 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { OutputType::ContractConstitutionChangeAcceptance | OutputType::ContractAmendment => { self.assert_definition_exists(contract_id)?; - let mut hashes = self.find::(&key)?.unwrap_or_default(); - - if !hashes.insert(output_hash) { - return Err(ChainStorageError::InvalidOperation(format!( - "{} UTXO for contract {} with hash {} has already been added to index", - output_type, contract_id, output_hash - ))); - } - - self.set(&key, &hashes)?; + self.add_to_set(&contract_key, ContractIndexValue { + block_hash, + output_hash, + })?; + self.add_to_set(&block_key, output_hash)?; Ok(()) }, _ => Err(ChainStorageError::InvalidOperation(format!( @@ -202,11 +262,11 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { output_type: OutputType, output_hash: FixedHash, ) -> Result<(), ChainStorageError> { - let key = ContractIndexKey::new(contract_id, output_type); + let contract_key = ContractIndexKey::new(contract_id, output_type); match output_type { OutputType::ContractDefinition => { - if self.has_dependent_outputs(&key)? { + if self.has_dependent_outputs(&contract_key)? 
{ return Err(ChainStorageError::UnspendableDueToDependentUtxos { details: format!( "Cannot deregister contract definition for contract {} because there are dependent outputs", @@ -215,26 +275,21 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { }); } - self.delete(&key)?; + let contract = self.get_and_delete::<_, ContractIndexValue>(&contract_key)?; + let block_key = BlockContractIndexKey::new(contract.block_hash, output_type, contract_id); + self.delete(&block_key)?; Ok(()) }, OutputType::ContractConstitution | OutputType::ContractCheckpoint => { - self.delete(&key)?; + let contract = self.get_and_delete::<_, ContractIndexValue>(&contract_key)?; + let block_key = BlockContractIndexKey::new(contract.block_hash, output_type, contract_id); + self.delete(&block_key)?; Ok(()) }, OutputType::ContractValidatorAcceptance | OutputType::ContractConstitutionProposal => { - let mut hash_set = self.find::(&key)?.unwrap_or_default(); - if !hash_set.remove(&output_hash) { - return Err(ChainStorageError::InvalidOperation(format!( - "Output {} was not found in {} UTXO set for contract_id {}", - output_hash, output_type, contract_id - ))); - } - if hash_set.is_empty() { - self.delete(&key)?; - } else { - self.set(&key, &hash_set)?; - } + let contract = self.remove_from_contract_index(&contract_key, &output_hash)?; + let block_key = BlockContractIndexKey::new(contract.block_hash, output_type, contract_id); + self.remove_from_set(&block_key, &output_hash)?; Ok(()) }, _ => Err(ChainStorageError::InvalidOperation(format!( @@ -244,6 +299,68 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { } } + fn add_to_set( + &self, + key: &K, + value: V, + ) -> Result<(), ChainStorageError> { + let mut hash_set = self.get::<_, DefaultHashSet>(key)?.unwrap_or_default(); + if !hash_set.insert(value) { + return Err(ChainStorageError::InvalidOperation(format!( + "UTXO with has already been added to contract index at key {}", + to_hex(key.as_lmdb_bytes()) + ))); + } + + self.set(key, &hash_set)?; 
+ Ok(()) + } + + fn remove_from_contract_index( + &self, + key: &ContractIndexKey, + output_hash: &FixedHash, + ) -> Result { + let mut hash_set = self.get::<_, ContractValueHashSet>(key)?.unwrap_or_default(); + let value = hash_set + .iter() + .find(|v| v.output_hash == *output_hash) + .cloned() + .ok_or_else(|| { + ChainStorageError::InvalidOperation(format!( + "Contract output was not found in UTXO set with key {}", + to_hex(key.as_lmdb_bytes()) + )) + })?; + hash_set.remove(&value); + if hash_set.is_empty() { + self.delete(key)?; + } else { + self.set(key, &hash_set)?; + } + Ok(value) + } + + fn remove_from_set( + &self, + key: &K, + value: &V, + ) -> Result<(), ChainStorageError> { + let mut hash_set = self.get::<_, DefaultHashSet>(key)?.unwrap_or_default(); + if !hash_set.remove(value) { + return Err(ChainStorageError::InvalidOperation(format!( + "Contract output was not found in UTXO set with key {}", + to_hex(key.as_lmdb_bytes()) + ))); + } + if hash_set.is_empty() { + self.delete(key)?; + } else { + self.set(key, &hash_set)?; + } + Ok(()) + } + fn has_dependent_outputs(&self, key: &ContractIndexKey) -> Result { let constitution_key = key.to_key_with_output_type(OutputType::ContractConstitution); if self.exists(&constitution_key)? 
{ @@ -268,72 +385,119 @@ impl<'a> ContractIndex<'a, WriteTransaction<'a>> { } } - fn insert(&self, key: &ContractIndexKey, value: &V) -> Result<(), ChainStorageError> { + fn insert( + &self, + key: &K, + value: &V, + ) -> Result<(), ChainStorageError> { lmdb_insert(self.txn, self.db, key, value, "contract_index") } - fn set(&self, key: &ContractIndexKey, value: &V) -> Result<(), ChainStorageError> { + fn set(&self, key: &K, value: &V) -> Result<(), ChainStorageError> { lmdb_replace(self.txn, self.db, key, value) } - fn delete(&self, key: &ContractIndexKey) -> Result<(), ChainStorageError> { + fn delete(&self, key: &K) -> Result<(), ChainStorageError> { lmdb_delete(self.txn, self.db, key, "contract_index") } + + fn get_and_delete(&self, key: &K) -> Result { + let value = self.get(key)?.ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "contract_index", + field: "", + value: to_hex(key.as_lmdb_bytes()), + })?; + self.delete(key)?; + Ok(value) + } } -/// A hash set using the DefaultHasher. Since output hashes are not user controlled and uniformly random there is no -/// need to use RandomState hasher. -type FixedHashSet = HashSet>; +#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] +struct ContractIndexValue { + pub block_hash: FixedHash, + pub output_hash: FixedHash, +} -/// A 33-byte contract ID index key. -/// -/// The first 32-bytes are the contract ID, the next byte is the `OutputType`. #[derive(Debug, Clone, Copy)] -pub(self) struct ContractIndexKey { - bytes: [u8; Self::FULL_KEY_LEN], +#[repr(u8)] +enum KeyType { + PerContract = 0, + PerBlock = 1, +} + +/// An index key consisting of {block_hash, output_type, contract_id}.
+#[derive(Debug, Clone, Copy)] +struct BlockContractIndexKey { + key: CompositeKey<{ Self::KEY_LEN }>, +} + +impl BlockContractIndexKey { + const KEY_LEN: usize = 1 + 32 + 1 + 32; + + pub fn new(block_hash: FixedHash, output_type: OutputType, contract_id: FixedHash) -> Self { + let mut key = Self::prefixed(block_hash, output_type); + assert!(key.key.push(&contract_id)); + key + } + + pub fn prefixed(block_hash: FixedHash, output_type: OutputType) -> Self { + let mut key = CompositeKey::new(); + assert!(key.push(&[KeyType::PerBlock as u8])); + assert!(key.push(&block_hash)); + assert!(key.push(&[output_type.as_byte()])); + Self { key } + } +} + +impl Deref for BlockContractIndexKey { + type Target = CompositeKey<{ Self::KEY_LEN }>; + + fn deref(&self) -> &Self::Target { + &self.key + } +} + +impl AsLmdbBytes for BlockContractIndexKey { + fn as_lmdb_bytes(&self) -> &[u8] { + &self.key + } +} + +/// An index key consisting of {contract_id, output_type}. +#[derive(Debug, Clone, Copy)] +struct ContractIndexKey { + key: CompositeKey<{ Self::KEY_LEN }>, } impl ContractIndexKey { - const FULL_KEY_LEN: usize = FixedHash::byte_size() + 1; + const KEY_LEN: usize = 1 + 32 + 1; pub fn new(contract_id: FixedHash, output_type: OutputType) -> Self { - Self { - bytes: Self::bytes_from_parts(contract_id, output_type), - } + let mut key = CompositeKey::new(); + assert!(key.push(&[KeyType::PerContract as u8])); + assert!(key.push(&*contract_id)); + assert!(key.push(&[output_type.as_byte()])); + Self { key } } pub fn to_key_with_output_type(self, output_type: OutputType) -> Self { let mut key = self; - key.bytes[FixedHash::byte_size()] = output_type.as_byte(); + key.key[FixedHash::byte_size() + 1] = output_type.as_byte(); key } +} - pub fn as_bytes(&self) -> &[u8] { - &self.bytes[..]
- } +impl Deref for ContractIndexKey { + type Target = CompositeKey<{ Self::KEY_LEN }>; - fn bytes_from_parts(contract_id: FixedHash, output_type: OutputType) -> [u8; Self::FULL_KEY_LEN] { - let mut buf = Self::new_buf(); - buf[..FixedHash::byte_size()].copy_from_slice(&*contract_id); - buf[FixedHash::byte_size()] = output_type.as_byte(); - buf - } - - /// Returns a fixed 0-filled byte array. - const fn new_buf() -> [u8; Self::FULL_KEY_LEN] { - [0x0u8; Self::FULL_KEY_LEN] + fn deref(&self) -> &Self::Target { + &self.key } } impl AsLmdbBytes for ContractIndexKey { fn as_lmdb_bytes(&self) -> &[u8] { - self.as_bytes() - } -} - -impl Display for ContractIndexKey { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", to_hex(self.as_bytes())) + &self.key } } @@ -344,16 +508,16 @@ mod tests { use super::*; mod contract_index_key { - use super::*; #[test] fn it_represents_a_well_formed_contract_index_key() { let hash = HashDigest::new().chain(b"foobar").finalize().into(); let key = ContractIndexKey::new(hash, OutputType::ContractCheckpoint); - assert_eq!(key.as_lmdb_bytes()[..32], *hash.as_slice()); + assert_eq!(key.as_lmdb_bytes()[0], KeyType::PerContract as u8); + assert_eq!(key.as_lmdb_bytes()[1..33], *hash.as_slice()); assert_eq!( - OutputType::from_byte(key.as_lmdb_bytes()[32]).unwrap(), + OutputType::from_byte(key.as_lmdb_bytes()[33]).unwrap(), OutputType::ContractCheckpoint ); } diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 9958b920f4..ff0cb549ec 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -196,28 +196,52 @@ pub struct LMDBDatabase { env: Arc, env_config: LMDBConfig, metadata_db: DatabaseRef, + /// Maps height -> BlockHeader headers_db: DatabaseRef, + /// Maps height -> BlockHeaderAccumulatedData header_accumulated_data_db: DatabaseRef, + /// Maps height -> BlockAccumulatedData 
block_accumulated_data_db: DatabaseRef, + /// Maps block_hash -> height block_hashes_db: DatabaseRef, + /// Maps OutputKey -> TransactionOutputRowData utxos_db: DatabaseRef, + /// Maps InputKey -> TransactionInputRowData inputs_db: DatabaseRef, + /// Maps OutputHash -> txos_hash_to_index_db: DatabaseRef, + /// Maps KernelKey -> TransactionKernelRowData kernels_db: DatabaseRef, + /// Maps excess -> kernel_excess_index: DatabaseRef, + /// Maps excess_sig -> kernel_excess_sig_index: DatabaseRef, + /// Maps kernel_mmr_size -> height kernel_mmr_size_index: DatabaseRef, + /// Maps output_mmr_size -> height output_mmr_size_index: DatabaseRef, + /// Maps commitment -> output_hash utxo_commitment_index: DatabaseRef, + /// Maps unique_id -> output_hash unique_id_index: DatabaseRef, + /// Maps -> (block_hash, output_hash) + /// and -> output_hash contract_index: DatabaseRef, + /// Maps output_mmr_pos -> deleted_txo_mmr_position_to_height_index: DatabaseRef, + /// Maps block_hash -> Block orphans_db: DatabaseRef, + /// Maps randomx_seed -> height monero_seed_height_db: DatabaseRef, + /// Maps block_hash -> BlockHeaderAccumulatedData orphan_header_accumulated_data_db: DatabaseRef, + /// Stores the orphan tip block hashes orphan_chain_tips_db: DatabaseRef, + /// Maps parent_block_hash -> block_hash orphan_parent_map_index: DatabaseRef, + /// Stores bad blocks by block_hash and height bad_blocks: DatabaseRef, + /// Stores reorgs by epochtime and Reorg reorgs: DatabaseRef, _file_lock: Arc, } @@ -550,7 +574,7 @@ impl LMDBDatabase { } if output.features.contract_id().is_some() { - self.get_contract_index(txn).add_output(output)?; + self.get_contract_index(txn).add_output(header_hash, output)?; } lmdb_insert( @@ -986,11 +1010,11 @@ impl LMDBDatabase { &self, txn: &WriteTransaction<'_>, height: u64, - hash: &[u8], + block_hash: &[u8], ) -> Result<(), ChainStorageError> { - let output_rows = lmdb_delete_keys_starting_with::(txn, &self.utxos_db, hash)?; + let output_rows = 
lmdb_delete_keys_starting_with::(txn, &self.utxos_db, block_hash)?; debug!(target: LOG_TARGET, "Deleted {} outputs...", output_rows.len()); - let inputs = lmdb_delete_keys_starting_with::(txn, &self.inputs_db, hash)?; + let inputs = lmdb_delete_keys_starting_with::(txn, &self.inputs_db, block_hash)?; debug!(target: LOG_TARGET, "Deleted {} input(s)...", inputs.len()); for utxo in &output_rows { @@ -1087,7 +1111,7 @@ impl LMDBDatabase { } if input.features()?.is_sidechain_contract() { - self.get_contract_index(txn).rewind_input(&input)?; + self.get_contract_index(txn).rewind_input(block_hash, &input)?; } } Ok(()) @@ -2046,29 +2070,32 @@ impl BlockchainBackend for LMDBDatabase { Ok(result) } - fn fetch_all_constitutions( + fn fetch_contract_outputs_for_block( &self, - dan_node_public_key: &PublicKey, - ) -> Result, ChainStorageError> { + block_hash: &BlockHash, + output_type: OutputType, + ) -> Result, ChainStorageError> { let txn = self.read_transaction()?; - lmdb_filter_map_values(&txn, &self.utxos_db, |output: TransactionOutputRowData| { - match output.output { - None => None, - Some(output) => match output.features.sidechain_features.clone() { - None => None, - Some(sidechain_features) => match sidechain_features.constitution { - None => None, - Some(constitution) => { - if constitution.validator_committee.members().contains(dan_node_public_key) { - Some(output) - } else { - None - } - }, - }, - }, - } - }) + let block_hash = + FixedHash::try_from(block_hash.as_slice()).map_err(|e| ChainStorageError::InvalidQuery(e.to_string()))?; + if !output_type.is_contract_utxo() { + return Err(ChainStorageError::InvalidQuery(format!( + "Cannot query for OutputType {} using fetch_contract_outputs_for_block", + output_type + ))); + } + let output_hashes = self.get_contract_index(&txn).find_by_block(block_hash, output_type)?; + + output_hashes + .into_iter() + .map(|hash| { + self.fetch_output_in_txn(&txn, &*hash)? 
+ .ok_or_else(|| ChainStorageError::DataInconsistencyDetected { + function: "fetch_contract_outputs_for_block", + details: format!("Output with hash {} exists in contract_index but not in utxo_db", hash), + }) + }) + .collect() } fn fetch_outputs_in_block(&self, header_hash: &HashOutput) -> Result, ChainStorageError> { @@ -2414,7 +2441,9 @@ impl BlockchainBackend for LMDBDatabase { output_type: OutputType, ) -> Result, ChainStorageError> { let txn = self.read_transaction()?; - let output_hashes = self.get_contract_index(&txn).fetch(contract_id, output_type)?; + let output_hashes = self + .get_contract_index(&txn) + .find_by_contract_id(contract_id, output_type)?; output_hashes .into_iter() .map(|output_hash| { diff --git a/base_layer/core/src/chain_storage/lmdb_db/mod.rs b/base_layer/core/src/chain_storage/lmdb_db/mod.rs index db29bfc3af..b349466398 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/mod.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/mod.rs @@ -26,6 +26,7 @@ use tari_common_types::types::HashOutput; use crate::transactions::transaction_components::{TransactionInput, TransactionKernel, TransactionOutput}; +mod composite_key; mod contract_index; pub(crate) mod helpers; pub(crate) mod key_prefix_cursor; diff --git a/base_layer/core/src/chain_storage/tests/blockchain_database.rs b/base_layer/core/src/chain_storage/tests/blockchain_database.rs index 1309378505..78b08dd93b 100644 --- a/base_layer/core/src/chain_storage/tests/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/tests/blockchain_database.rs @@ -899,7 +899,7 @@ mod with_contract_utxos { fn it_allows_spend_of_contract_definition_without_dependent_utxos() { let contract_id = FixedHash::zero(); let mut blockchain = helpers::create_blockchain_without_validation(); - let (_, coinbase_a) = blockchain.append_to_tip(block_spec!("A", parent: "GB")).unwrap(); + let (_, coinbase_a) = blockchain.append_to_tip(block_spec!("A->GB")).unwrap(); // Spend coinbase_a to new contract 
definition let (contract_definition, outputs) = helpers::create_contract_definition_transaction(vec![coinbase_a], vec![2 * T], contract_id); @@ -948,6 +948,76 @@ mod with_contract_utxos { } } + mod fetch_contract_outputs_for_block { + use super::*; + + #[test] + fn it_returns_none_if_block_does_not_exist() { + let blockchain = TestBlockchain::default(); + let utxo = blockchain + .db() + .fetch_contract_outputs_for_block(vec![0u8; 32], OutputType::ContractDefinition) + .unwrap(); + assert!(utxo.is_empty()); + } + + #[test] + fn it_returns_none_if_contract_does_not_exist() { + let mut blockchain = helpers::create_blockchain_without_validation(); + let (block_a, coinbase_a) = blockchain.append_to_tip(block_spec!("A->GB")).unwrap(); + // Spend coinbase_a to new contract definition + let (contract_definition, _) = + helpers::create_contract_definition_transaction(vec![coinbase_a], vec![2 * T], [0u8; 32].into()); + + let _block = blockchain + .append_to_tip(block_spec!("B", transactions: vec![contract_definition])) + .unwrap(); + let utxo = blockchain + .db() + .fetch_contract_outputs_for_block(block_a.hash().clone(), OutputType::ContractDefinition) + .unwrap(); + assert!(utxo.is_empty()); + } + + #[test] + fn it_finds_contract_utxos_by_block_hash_and_type() { + let contract_id = FixedHash::from([1u8; 32]); + let mut blockchain = helpers::create_blockchain_without_validation(); + let (_, coinbase_a) = blockchain.append_to_tip(block_spec!("A->GB")).unwrap(); + // Spend coinbase_a to new contract definition + let (contract_definition, outputs) = + helpers::create_contract_definition_transaction(vec![coinbase_a], vec![2 * T], contract_id); + + let (block, _) = blockchain + .append_to_tip(block_spec!("B", transactions: vec![contract_definition])) + .unwrap(); + let (contract_definition, change) = outputs + .into_iter() + .partition::, _>(|output| output.features.is_sidechain_contract()); + let contract_def_hash = contract_definition[0].hash(&CryptoFactories::default()); + 
let utxo = blockchain + .db() + .fetch_contract_outputs_for_block(block.hash().clone(), OutputType::ContractDefinition) + .unwrap(); + assert_eq!(utxo[0].output.hash(), contract_def_hash); + + let (constitution, outputs) = helpers::create_contract_constitution_transaction(change, contract_id); + let (block, _) = blockchain + .append_to_tip(block_spec!("C", transactions: vec![constitution])) + .unwrap(); + let contract_const_hash = outputs + .into_iter() + .find(|o| o.features.is_sidechain_contract()) + .map(|o| o.hash(&CryptoFactories::default())) + .unwrap(); + let utxos = blockchain + .db() + .fetch_contract_outputs_for_block(block.hash().clone(), OutputType::ContractConstitution) + .unwrap(); + assert_eq!(utxos[0].output.hash(), contract_const_hash); + } + } + mod fetch_contract_outputs_by_contract_id_and_type { use super::*; @@ -962,7 +1032,7 @@ mod with_contract_utxos { } #[test] - fn it_errors_on_spend_of_contract_definition_with_dependent_utxos() { + fn it_finds_contract_utxos_by_contract_id_and_type() { let contract_id = FixedHash::zero(); let mut blockchain = helpers::create_blockchain_without_validation(); let (_, coinbase_a) = blockchain.append_to_tip(block_spec!("A", parent: "GB")).unwrap(); diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index 4f70f3365f..8366c8383e 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -32,7 +32,7 @@ use croaring::Bitmap; use tari_common::configuration::Network; use tari_common_types::{ chain_metadata::ChainMetadata, - types::{Commitment, FixedHash, HashOutput, PublicKey, Signature}, + types::{BlockHash, Commitment, FixedHash, HashOutput, PublicKey, Signature}, }; use tari_storage::lmdb_store::LMDBConfig; use tari_test_utils::paths::create_temporary_data_path; @@ -74,7 +74,7 @@ use crate::{ proof_of_work::{AchievedTargetDifficulty, Difficulty, PowAlgorithm}, test_helpers::{block_spec::BlockSpecs, 
create_consensus_rules, BlockSpec}, transactions::{ - transaction_components::{OutputType, TransactionInput, TransactionKernel, TransactionOutput, UnblindedOutput}, + transaction_components::{OutputType, TransactionInput, TransactionKernel, UnblindedOutput}, CryptoFactories, }, validation::{ @@ -326,11 +326,15 @@ impl BlockchainBackend for TempDatabase { .fetch_all_unspent_by_parent_public_key(parent_public_key, range) } - fn fetch_all_constitutions( + fn fetch_contract_outputs_for_block( &self, - dan_node_public_key: &PublicKey, - ) -> Result, ChainStorageError> { - self.db.as_ref().unwrap().fetch_all_constitutions(dan_node_public_key) + block_hash: &BlockHash, + output_type: OutputType, + ) -> Result, ChainStorageError> { + self.db + .as_ref() + .unwrap() + .fetch_contract_outputs_for_block(block_hash, output_type) } fn fetch_outputs_in_block(&self, header_hash: &HashOutput) -> Result, ChainStorageError> { diff --git a/base_layer/core/src/transactions/transaction_components/output_type.rs b/base_layer/core/src/transactions/transaction_components/output_type.rs index 86351ab875..7e29a0b0c4 100644 --- a/base_layer/core/src/transactions/transaction_components/output_type.rs +++ b/base_layer/core/src/transactions/transaction_components/output_type.rs @@ -79,6 +79,20 @@ impl OutputType { pub fn from_byte(value: u8) -> Option { FromPrimitive::from_u8(value) } + + pub fn is_contract_utxo(self) -> bool { + #[allow(clippy::enum_glob_use)] + use OutputType::*; + matches!( + self, + ContractDefinition | + ContractConstitution | + ContractValidatorAcceptance | + ContractCheckpoint | + ContractConstitutionProposal | + ContractConstitutionChangeAcceptance + ) + } } impl Default for OutputType { diff --git a/base_layer/core/src/transactions/transaction_components/side_chain/committee_members.rs b/base_layer/core/src/transactions/transaction_components/side_chain/committee_members.rs index af6b453885..29075e4bd4 100644 --- 
a/base_layer/core/src/transactions/transaction_components/side_chain/committee_members.rs +++ b/base_layer/core/src/transactions/transaction_components/side_chain/committee_members.rs @@ -48,6 +48,10 @@ impl CommitteeMembers { pub fn members(&self) -> &[PublicKey] { &self.members } + + pub fn contains(&self, x: &PublicKey) -> bool { + self.members.contains(x) + } } impl TryFrom> for CommitteeMembers { diff --git a/integration_tests/features/ValidatorNode.feature b/integration_tests/features/ValidatorNode.feature index b1b7fcf008..3407c5a544 100644 --- a/integration_tests/features/ValidatorNode.feature +++ b/integration_tests/features/ValidatorNode.feature @@ -21,7 +21,9 @@ Feature: Validator Node And I create 40 NFTs And I mine 3 blocks - @dan @critical + # Broken: needs a contract definition before publishing acceptance, however this is currently not easily done because + # GRPC methods need to be added and you cannot use the cli for a wallet while that wallet is already running + @dan @critical @broken Scenario: Publish contract acceptance Given I have a seed node NODE1 And I have wallet WALLET1 connected to all seed nodes diff --git a/integration_tests/helpers/walletProcess.js b/integration_tests/helpers/walletProcess.js index b916a3223b..906aae0072 100644 --- a/integration_tests/helpers/walletProcess.js +++ b/integration_tests/helpers/walletProcess.js @@ -291,8 +291,11 @@ class WalletProcess { const overrides = this.getOverrides(); Object.keys(overrides).forEach((k) => { - args.push("-p"); - args.push(`${k}=${overrides[k]}`); + let v = overrides[k]; + if (typeof v !== "undefined") { + args.push("-p"); + args.push(`${k}=${v}`); + } }); // Append command arguments