Skip to content

Commit

Permalink
feat(base-node): improve contract utxo scanning (#4208)
Browse files Browse the repository at this point in the history
Description
---
```
 feat(blockchain-db):      add {block_hash, output_type, contract_id} index to contract_index
 docs(chain-storage):      document mapping for each lmdb database
 feat(chain-storage):      replace fetch_all_constitutions with fetch_contract_outputs_for_block
 test(chain-storage):      add unit tests for fetch_contract_outputs_for_block
 feat(base-node-service):  replace get_constitutions with get_contract_outputs_for_block
 feat(base-node-grpc):     use get_contract_outputs_for_block to fetch constitutions
 test(cucumber):           mark `Publish contract acceptance` as broken
 test(cucumber):           fix command to publish contract utxos
```

Motivation and Context
---
`get_all_constitutions` is currently very inefficient and has to load the entire UTXO set. This PR adds a `<block_hash, output_type, contract_id>` index to contract_index db that allows efficient loading of contract outputs optionally filtered by type contained in a block. 

The get_constitutions grpc method now takes in a start_block_hash and streams utxos starting from that hash. 

How Has This Been Tested?
---
Additional unit tests
Manually, sync on igor



* feat(blockchain-db): add {block_hash, output_type, output_hash} index to contract_index

* docs(chain_storage/lmdb-db): document mapping for each lmdb database

* feat(chain-storage): replace fetch_all_constitutions with fetch_contract_outputs_for_block

* test(chain-storage): add unit tests for fetch_contract_outputs_for_block

* feat(base-node-service): replace get_constitutions with get_contract_outputs_for_block

* feat(base-node-grpc): use get_contract_outputs_for_block to fetch constitutions

* test(cucumber): mark `Publish contract acceptance` as broken

* test(cucumber):           fix command to publish contract utxos

* fix(chain-storage): fetch_chain_headers should return empty vec if query is out of range
  • Loading branch information
sdbondi authored Jun 20, 2022
1 parent 7317d6b commit 0fcde31
Show file tree
Hide file tree
Showing 21 changed files with 694 additions and 204 deletions.
12 changes: 10 additions & 2 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ service BaseNode {
rpc GetAssetMetadata(GetAssetMetadataRequest) returns (GetAssetMetadataResponse);

// Get all constitutions where the public key is in the committee
rpc GetConstitutions(GetConstitutionsRequest) returns (stream TransactionOutput);
rpc GetConstitutions(GetConstitutionsRequest) returns (stream GetConstitutionsResponse);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -446,5 +446,13 @@ message MempoolStatsResponse {
}

message GetConstitutionsRequest {
bytes dan_node_public_key = 1;
bytes start_block_hash = 1;
bytes dan_node_public_key = 2;
}

message GetConstitutionsResponse {
TransactionOutput output = 1;
uint32 mmr_position = 2;
uint64 mined_height = 3;
bytes header_hash = 4;
}
150 changes: 108 additions & 42 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tari_app_grpc::{
tari_rpc::{CalcType, Sorting},
};
use tari_app_utilities::consts;
use tari_common_types::types::{Commitment, PublicKey, Signature};
use tari_common_types::types::{Commitment, FixedHash, PublicKey, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
Expand All @@ -48,7 +48,10 @@ use tari_core::{
iterators::NonOverlappingIntegerPairIter,
mempool::{service::LocalMempoolService, TxStorageResponse},
proof_of_work::PowAlgorithm,
transactions::{aggregated_body::AggregateBody, transaction_components::Transaction},
transactions::{
aggregated_body::AggregateBody,
transaction_components::{OutputType, Transaction},
},
};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray, Hashable};
Expand Down Expand Up @@ -134,7 +137,7 @@ pub async fn get_heights(
impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;
type GetBlocksStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type GetConstitutionsStream = mpsc::Receiver<Result<tari_rpc::TransactionOutput, Status>>;
type GetConstitutionsStream = mpsc::Receiver<Result<tari_rpc::GetConstitutionsResponse, Status>>;
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
Expand Down Expand Up @@ -1827,64 +1830,127 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Ok(Response::new(response))
}

#[allow(clippy::too_many_lines)]
async fn get_constitutions(
&self,
request: Request<tari_rpc::GetConstitutionsRequest>,
) -> Result<Response<Self::GetConstitutionsStream>, Status> {
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key).map_err(|err| {
report_error(
report_error_flag,
Status::invalid_argument(format!("Dan node public key is not a valid public key:{}", err)),
)
})?;
let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key)
.map_err(|_| Status::invalid_argument("Dan node public key is not a valid public key"))?;

let mut handler = self.node_service.clone();
let start_hash = Some(request.start_block_hash)
.filter(|h| !h.is_empty())
.map(FixedHash::try_from)
.transpose()
.map_err(|_| Status::invalid_argument("Block hash has an invalid length"))?;

let mut node_service = self.node_service.clone();
// Check the start_hash is correct, or if not provided, start from genesis
let start_header = match start_hash {
Some(hash) => node_service
.get_header_by_hash(hash.to_vec())
.await
.map_err(|e| report_error(report_error_flag, Status::internal(e.to_string())))?
.ok_or_else(|| Status::not_found(format!("No block found with hash {}", hash)))?,
None => node_service
.get_header(0)
.await
.map_err(|e| report_error(report_error_flag, Status::internal(e.to_string())))?
.ok_or_else(|| Status::internal("Node does not have a genesis block!?"))?,
};
// The number of headers to load at once to query for UTXOs
const BATCH_SIZE: u64 = 50;
let (mut sender, receiver) = mpsc::channel(50);
task::spawn(async move {
let dan_node_public_key_hex = dan_node_public_key.to_hex();
debug!(
target: LOG_TARGET,
"Starting thread to process GetConstitutions: dan_node_public_key: {}", dan_node_public_key_hex,
);
let constitutions = match handler.get_constitutions(dan_node_public_key).await {
Ok(constitutions) => constitutions,
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {:?}", err,);
let _get_token_response =
sender.send(Err(report_error(report_error_flag, Status::internal("Internal error"))));
return;
},
};
let mut current_height = start_header.height();

debug!(
target: LOG_TARGET,
"Found {} constitutions for {}",
constitutions.len(),
dan_node_public_key_hex
);

for constitution in constitutions {
match sender.send(Ok(constitution.into())).await {
Ok(_) => (),
loop {
let headers = match node_service
.get_headers(current_height..=current_height + BATCH_SIZE)
.await
{
Ok(h) => h,
Err(err) => {
warn!(target: LOG_TARGET, "Error sending constitution via GRPC: {}", err);
match sender
.send(Err(report_error(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
error!(target: LOG_TARGET, "Error fetching headers: {}", err);
let _err = sender
.send(Err(report_error(report_error_flag, Status::internal(err.to_string()))))
.await;
return;
},
};

if headers.is_empty() {
break;
}
let num_headers = headers.len();

for header in headers {
let block_hash_hex = header.hash().to_hex();
match node_service
.get_contract_outputs_for_block(header.hash().clone(), OutputType::ContractConstitution)
.await
{
Ok(constitutions) => {
debug!(
target: LOG_TARGET,
"Found {} constitutions in block {}",
constitutions.len(),
block_hash_hex
);

let constitutions = constitutions.into_iter().filter(|utxo| {
// Filter for constitutions containing the dan_node_public_key
utxo.output
.as_transaction_output()
.and_then(|output| output.features.sidechain_features.as_ref())
.and_then(|sidechain| sidechain.constitution.as_ref())
.filter(|constitution| {
constitution.validator_committee.contains(&dan_node_public_key)
})
.is_some()
});

for utxo in constitutions {
if sender
.send(Ok(tari_rpc::GetConstitutionsResponse {
header_hash: utxo.header_hash,
mined_height: utxo.mined_height,
mmr_position: utxo.mmr_position,
output: utxo.output.into_unpruned_output().map(Into::into),
}))
.await
.is_err()
{
warn!(
target: LOG_TARGET,
"Client disconnected while sending constitution via GRPC"
);
break;
}
}
},
Err(err) => {
warn!(target: LOG_TARGET, "Error fetching contract outputs for block: {}", err);
let _err = sender
.send(Err(report_error(report_error_flag, Status::internal("Internal error"))))
.await;
return;
},
}
}

if num_headers < BATCH_SIZE as usize {
break;
}

current_height += BATCH_SIZE + 1;
}
});
Ok(Response::new(receiver))
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl DanNode {
for output in outputs {
if let Some(sidechain_features) = output.features.sidechain_features {
let contract_id = sidechain_features.contract_id;
// TODO: expect will crash the validator node if the base node misbehaves
let constitution = sidechain_features.constitution.expect("Constitution wasn't present");

if constitution.acceptance_requirements.acceptance_period_expiry < last_tip {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,16 @@ impl BaseNodeClient for GrpcBaseNodeClient {
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
let inner = self.connection().await?;
let request = grpc::GetConstitutionsRequest {
// TODO: pass in the last block hash that was scanned
start_block_hash: vec![],
dan_node_public_key: dan_node_public_key.as_bytes().to_vec(),
};
let mut result = inner.get_constitutions(request).await?.into_inner();
let mut outputs = vec![];
while let Some(output) = result.message().await? {
while let Some(mined_info) = result.message().await? {
let output = mined_info
.output
.ok_or_else(|| DigitalAssetError::InvalidPeerMessage("Mined info contained no output".to_string()))?;
outputs.push(output.try_into().map_err(DigitalAssetError::ConversionError)?);
}
Ok(outputs)
Expand Down
14 changes: 10 additions & 4 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ use serde::{Deserialize, Serialize};
use tari_common_types::types::{Commitment, HashOutput, PrivateKey, PublicKey, Signature};
use tari_utilities::hex::Hex;

use crate::{blocks::NewBlockTemplate, chain_storage::MmrTree, proof_of_work::PowAlgorithm};
use crate::{
blocks::NewBlockTemplate,
chain_storage::MmrTree,
proof_of_work::PowAlgorithm,
transactions::transaction_components::OutputType,
};

/// A container for the parameters required for a FetchMmrState request.
#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -70,8 +75,9 @@ pub enum NodeCommsRequest {
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
FetchConstitutions {
dan_node_public_key: PublicKey,
FetchContractOutputsForBlock {
block_hash: HashOutput,
output_type: OutputType,
},
}

Expand Down Expand Up @@ -122,7 +128,7 @@ impl Display for NodeCommsRequest {
FetchMempoolTransactionsByExcessSigs { .. } => {
write!(f, "FetchMempoolTransactionsByExcessSigs")
},
FetchConstitutions { .. } => {
FetchContractOutputsForBlock { .. } => {
write!(f, "FetchConstitutions")
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub enum NodeCommsResponse {
output: Box<Option<UtxoMinedInfo>>,
},
FetchMempoolTransactionsByExcessSigsResponse(FetchMempoolTransactionsResponse),
FetchConstitutionsResponse {
outputs: Vec<TransactionOutput>,
FetchOutputsForBlockResponse {
outputs: Vec<UtxoMinedInfo>,
},
}

Expand Down Expand Up @@ -106,7 +106,7 @@ impl Display for NodeCommsResponse {
resp.transactions.len(),
resp.not_found.len()
),
FetchConstitutionsResponse { .. } => write!(f, "FetchConstitutionsResponse"),
FetchOutputsForBlockResponse { .. } => write!(f, "FetchConstitutionsResponse"),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,15 @@ where B: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::FetchTokensResponse { outputs })
},
NodeCommsRequest::FetchConstitutions { dan_node_public_key } => {
debug!(target: LOG_TARGET, "Starting fetch constitutions");
Ok(NodeCommsResponse::FetchConstitutionsResponse {
outputs: self.blockchain_db.fetch_all_constitutions(dan_node_public_key).await?,
})
},
NodeCommsRequest::FetchContractOutputsForBlock {
block_hash,
output_type,
} => Ok(NodeCommsResponse::FetchOutputsForBlockResponse {
outputs: self
.blockchain_db
.fetch_contract_outputs_for_block(block_hash, output_type)
.await?,
}),
NodeCommsRequest::FetchAssetRegistrations { range } => {
let top_level_pubkey = PublicKey::default();
#[allow(clippy::range_plus_one)]
Expand Down
16 changes: 10 additions & 6 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::UtxoMinedInfo,
proof_of_work::PowAlgorithm,
transactions::transaction_components::{TransactionKernel, TransactionOutput},
transactions::transaction_components::{OutputType, TransactionKernel, TransactionOutput},
};

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
Expand Down Expand Up @@ -319,16 +319,20 @@ impl LocalNodeCommsInterface {
}
}

pub async fn get_constitutions(
pub async fn get_contract_outputs_for_block(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, CommsInterfaceError> {
block_hash: BlockHash,
output_type: OutputType,
) -> Result<Vec<UtxoMinedInfo>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchConstitutions { dan_node_public_key })
.call(NodeCommsRequest::FetchContractOutputsForBlock {
block_hash,
output_type,
})
.await??
{
NodeCommsResponse::FetchConstitutionsResponse { outputs } => Ok(outputs),
NodeCommsResponse::FetchOutputsForBlockResponse { outputs } => Ok(outputs),
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::{
},
common::rolling_vec::RollingVec,
proof_of_work::{PowAlgorithm, TargetDifficultyWindow},
transactions::transaction_components::{TransactionKernel, TransactionOutput},
transactions::transaction_components::{OutputType, TransactionKernel, TransactionOutput},
};

const LOG_TARGET: &str = "c::bn::async_db";
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(utxo_count() -> usize, "utxo_count");

make_async_fn!(fetch_all_constitutions(dan_node_public_key: PublicKey) -> Vec<TransactionOutput>, "fetch_all_constitutions");
make_async_fn!(fetch_contract_outputs_for_block(block_hash: BlockHash, output_type: OutputType) -> Vec<UtxoMinedInfo>, "fetch_contract_outputs_for_block");

//---------------------------------- Kernel --------------------------------------------//
make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig");
Expand Down
Loading

0 comments on commit 0fcde31

Please sign in to comment.