Skip to content

Commit

Permalink
fix(core): add utxo and block into to get_remplate_registrations request
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Oct 10, 2022
1 parent 7fffec8 commit f18aded
Show file tree
Hide file tree
Showing 20 changed files with 217 additions and 187 deletions.
16 changes: 14 additions & 2 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ service BaseNode {
rpc GetCommittee(GetCommitteeRequest) returns (GetCommitteeResponse);
rpc GetShardKey(GetShardKeyRequest) returns (GetShardKeyResponse);
// Get templates
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream TemplateRegistration);
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream GetTemplateRegistrationResponse);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -471,5 +471,17 @@ message GetShardKeyResponse {
}

message GetTemplateRegistrationsRequest {
uint64 from_height = 1;
uint64 start_height = 1;
uint64 end_height = 2;
}

message GetTemplateRegistrationResponse {
BlockInfo block_info = 1;
bytes utxo_hash = 2;
TemplateRegistration registration = 3;
}

message BlockInfo {
uint64 height = 1;
bytes hash = 2;
}
2 changes: 1 addition & 1 deletion applications/tari_app_grpc/proto/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ message Authority {
bytes proxied_by = 3;
}

message InvokeMethodRequest{
message InvokeMethodRequest {
bytes contract_id = 1;
uint32 template_id = 2;
string method = 3;
Expand Down
96 changes: 38 additions & 58 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::TemplateRegistration, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
Expand Down Expand Up @@ -1484,7 +1484,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
request: Request<tari_rpc::GetActiveValidatorNodesRequest>,
) -> Result<Response<Self::GetActiveValidatorNodesStream>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetActiveValidatorNodes");

let mut handler = self.node_service.clone();
Expand All @@ -1493,39 +1492,24 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
task::spawn(async move {
let active_validator_nodes = match handler.get_active_validator_nodes(request.height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
warn!(target: LOG_TARGET, "Base node service error: {}", err,);
return;
},
Ok(data) => data,
};
// dbg!(&active_validator_nodes);

for (public_key, shard_key) in active_validator_nodes {
let active_validator_node = tari_rpc::GetActiveValidatorNodesResponse {
public_key: public_key.to_vec(),
shard_key: shard_key.to_vec(),
};

match tx.send(Ok(active_validator_node)).await {
Ok(_) => (),
Err(err) => {
warn!(
target: LOG_TARGET,
"Error sending mempool transaction via GRPC: {}", err
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
if tx.send(Ok(active_validator_node)).await.is_err() {
debug!(
target: LOG_TARGET,
"[get_active_validator_nodes] Client has disconnected before stream completed"
);
return;
}
}
});
Expand All @@ -1547,60 +1531,56 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let mut handler = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(1000);

let start_height = request.start_height;
let end_height = request.end_height;
if end_height < start_height {
return Err(Status::invalid_argument(
"End height must be greater than or equal to start height",
));
}

task::spawn(async move {
let template_registrations = match handler.get_template_registrations(request.from_height).await {
let template_registrations = match handler.get_template_registrations(start_height, end_height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
warn!(target: LOG_TARGET, "Base node service error: {}", err);
return;
},
Ok(data) => data,
};

for template_registration in template_registrations {
let template_registration = match tari_rpc::TemplateRegistration::try_from(template_registration) {
let registration = match template_registration.registration_data.try_into() {
Ok(t) => t,
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending converting template registration for GRPC: {}", e
);
match tx
let _ignore = tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal("Error converting template_registration"),
Status::internal(format!("Error converting template_registration: {}", e)),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
.await;
return;
},
};

match tx.send(Ok(template_registration)).await {
Ok(_) => (),
Err(err) => {
warn!(
target: LOG_TARGET,
"Error sending template registration via GRPC: {}", err
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
let resp = tari_rpc::GetTemplateRegistrationResponse {
block_info: Some(tari_rpc::BlockInfo {
height: template_registration.block_height,
hash: template_registration.block_hash.to_vec(),
}),
utxo_hash: template_registration.output_hash.to_vec(),
registration: Some(registration),
};

if tx.send(Ok(resp)).await.is_err() {
debug!(
target: LOG_TARGET,
"[get_template_registrations] Client has disconnected before stream completed"
);
return;
}
}
});
Expand Down
10 changes: 7 additions & 3 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ pub enum NodeCommsRequest {
public_key: PublicKey,
},
FetchTemplateRegistrations {
from_height: u64,
start_height: u64,
end_height: u64,
},
}

Expand Down Expand Up @@ -127,8 +128,11 @@ impl Display for NodeCommsRequest {
GetShardKey { height, public_key } => {
write!(f, "GetShardKey height ({}), public key ({:?})", height, public_key)
},
FetchTemplateRegistrations { from_height } => {
write!(f, "FetchTemplateRegistrations ({})", from_height)
FetchTemplateRegistrations {
start_height: start,
end_height: end,
} => {
write!(f, "FetchTemplateRegistrations ({}..={})", start, end)
},
}
}
Expand Down
11 changes: 3 additions & 8 deletions base_layer/core/src/base_node/comms_interface/comms_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,9 @@ use tari_common_types::{

use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::{ActiveValidatorNode, UtxoMinedInfo},
chain_storage::{ActiveValidatorNode, TemplateRegistrationEntry, UtxoMinedInfo},
proof_of_work::Difficulty,
transactions::transaction_components::{
CodeTemplateRegistration,
Transaction,
TransactionKernel,
TransactionOutput,
},
transactions::transaction_components::{Transaction, TransactionKernel, TransactionOutput},
};

/// API Response enum
Expand Down Expand Up @@ -79,7 +74,7 @@ pub enum NodeCommsResponse {
FetchValidatorNodesKeysResponse(Vec<(PublicKey, [u8; 32])>),
FetchCommitteeResponse(Vec<ActiveValidatorNode>),
GetShardKeyResponse(Option<[u8; 32]>),
FetchTemplateRegistrationsResponse(Vec<CodeTemplateRegistration>),
FetchTemplateRegistrationsResponse(Vec<TemplateRegistrationEntry>),
}

impl Display for NodeCommsResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,14 @@ where B: BlockchainBackend + 'static
let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
},
NodeCommsRequest::FetchTemplateRegistrations { from_height } => {
NodeCommsRequest::FetchTemplateRegistrations {
start_height,
end_height,
} => {
let template_registrations = self
.blockchain_db
.fetch_template_registrations(from_height)
.await?
.into_iter()
.map(|tr| tr.registration_data)
.collect();
.fetch_template_registrations(start_height..=end_height)
.await?;
Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
template_registrations,
))
Expand Down
14 changes: 9 additions & 5 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use crate::{
NodeCommsResponse,
},
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::ActiveValidatorNode,
chain_storage::{ActiveValidatorNode, TemplateRegistrationEntry},
proof_of_work::PowAlgorithm,
transactions::transaction_components::{CodeTemplateRegistration, TransactionKernel, TransactionOutput},
transactions::transaction_components::{TransactionKernel, TransactionOutput},
};

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
Expand Down Expand Up @@ -327,11 +327,15 @@ impl LocalNodeCommsInterface {

pub async fn get_template_registrations(
&mut self,
from_height: u64,
) -> Result<Vec<CodeTemplateRegistration>, CommsInterfaceError> {
start_height: u64,
end_height: u64,
) -> Result<Vec<TemplateRegistrationEntry>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchTemplateRegistrations { from_height })
.call(NodeCommsRequest::FetchTemplateRegistrations {
start_height,
end_height,
})
.await??
{
NodeCommsResponse::FetchTemplateRegistrationsResponse(template_registrations) => Ok(template_registrations),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common_types::{
};
use tari_utilities::epoch_time::EpochTime;

use super::{ActiveValidatorNode, TemplateRegistration};
use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(get_shard_key(height:u64, public_key: PublicKey) -> Option<[u8;32]>, "get_shard_key");

make_async_fn!(fetch_template_registrations(from_height: u64) -> Vec<TemplateRegistration>, "fetch_template_registrations");
make_async_fn!(fetch_template_registrations<T: RangeBounds<u64>>(range: T) -> Vec<TemplateRegistrationEntry>, "fetch_template_registrations");
}

impl<B: BlockchainBackend + 'static> From<BlockchainDatabase<B>> for AsyncBlockchainDb<B> {
Expand Down
8 changes: 6 additions & 2 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_common_types::{
types::{Commitment, HashOutput, PublicKey, Signature},
};

use super::{ActiveValidatorNode, TemplateRegistration};
use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -196,5 +196,9 @@ pub trait BlockchainBackend: Send + Sync {
fn fetch_active_validator_nodes(&self, height: u64) -> Result<Vec<(PublicKey, [u8; 32])>, ChainStorageError>;
fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError>;
fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<Option<[u8; 32]>, ChainStorageError>;
fn fetch_template_registrations(&self, from_height: u64) -> Result<Vec<TemplateRegistration>, ChainStorageError>;
fn fetch_template_registrations(
&self,
start_height: u64,
end_height: u64,
) -> Result<Vec<TemplateRegistrationEntry>, ChainStorageError>;
}
16 changes: 11 additions & 5 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tari_common_types::{
use tari_mmr::pruned_hashset::PrunedHashSet;
use tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray};

use super::{ActiveValidatorNode, TemplateRegistration};
use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -1188,12 +1188,18 @@ where B: BlockchainBackend
db.fetch_committee(height, shard)
}

pub fn fetch_template_registrations(
pub fn fetch_template_registrations<T: RangeBounds<u64>>(
&self,
from_height: u64,
) -> Result<Vec<TemplateRegistration>, ChainStorageError> {
range: T,
) -> Result<Vec<TemplateRegistrationEntry>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_template_registrations(from_height)
let (start, mut end) = convert_to_option_bounds(range);
if end.is_none() {
// `(n..)` means fetch block headers until this node's tip
end = Some(db.fetch_last_header()?.height);
}
let (start, end) = (start.unwrap_or(0), end.unwrap());
db.fetch_template_registrations(start, end)
}
}

Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use croaring::Bitmap;
use tari_common_types::types::{BlockHash, Commitment, HashOutput};
use tari_utilities::hex::Hex;

use super::{ActiveValidatorNode, TemplateRegistration};
use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use crate::{
blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader, UpdateBlockAccumulatedData},
chain_storage::{error::ChainStorageError, HorizonData, Reorg},
Expand Down Expand Up @@ -363,7 +363,7 @@ pub enum WriteOperation {
validator_node: ActiveValidatorNode,
},
InsertTemplateRegistration {
template_registration: TemplateRegistration,
template_registration: TemplateRegistrationEntry,
},
}

Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ pub enum ChainStorageError {
UnspendableDueToDependentUtxos { details: String },
#[error("FixedHashSize Error: {0}")]
FixedHashSizeError(#[from] FixedHashSizeError),
#[error("Composite key length was exceeded (THIS SHOULD NEVER HAPPEN)")]
CompositeKeyLengthExceeded,
}

impl ChainStorageError {
Expand Down
Loading

0 comments on commit f18aded

Please sign in to comment.