Skip to content

Commit

Permalink
feat(core): store and fetch templates from lmdb (#4726)
Browse files Browse the repository at this point in the history
Description
---
* Created a new `lmdb` database for template registrations, with methods for inserting and fetching them.
* Base layer stores in the database all templates that appear in new blocks.
* New gRPC method `GetTemplateRegistrations` to retrieve all new templates since a specific block height

Motivation and Context
---
In [previous work](#4470) we added template registration to UTXO sidechain features.

The next step is for the base layer to store and index all template registrations that appear in blocks, as well as provide a gRPC method to retrieve them.

As the main query for templates will come from the base layer scanner in the Validator Node, the gRPC query method should allow to filter all new templates since a specific block height.

How Has This Been Tested?
---
Manually ran the base layer and perform a gRPC query via Postman.
  • Loading branch information
mrnaveira authored Sep 27, 2022
1 parent 72018f4 commit 27f77b2
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 17 deletions.
7 changes: 7 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import "types.proto";
import "transaction.proto";
import "block.proto";
import "network.proto";
import "sidechain_types.proto";

package tari.rpc;

Expand Down Expand Up @@ -92,6 +93,8 @@ service BaseNode {
rpc GetActiveValidatorNodes(GetActiveValidatorNodesRequest) returns (stream GetActiveValidatorNodesResponse);
rpc GetCommittee(GetCommitteeRequest) returns (GetCommitteeResponse);
rpc GetShardKey(GetShardKeyRequest) returns (GetShardKeyResponse);
// Get templates
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream TemplateRegistration);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -465,3 +468,7 @@ message GetShardKeyRequest {
message GetShardKeyResponse {
bytes shard_key = 1;
}

message GetTemplateRegistrationsRequest {
uint64 from_height = 1;
}
78 changes: 77 additions & 1 deletion applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::TemplateRegistration, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
Expand Down Expand Up @@ -1488,7 +1489,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
},
Ok(data) => data,
};
dbg!(&active_validator_nodes);
// dbg!(&active_validator_nodes);
for (public_key, shard_key) in active_validator_nodes {
let active_validator_node = tari_rpc::GetActiveValidatorNodesResponse {
public_key: public_key.to_vec(),
Expand Down Expand Up @@ -1525,6 +1526,81 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
);
Ok(Response::new(rx))
}

async fn get_template_registrations(
&self,
request: Request<tari_rpc::GetTemplateRegistrationsRequest>,
) -> Result<Response<Self::GetTemplateRegistrationsStream>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");

let mut handler = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(1000);

task::spawn(async move {
let template_registrations = match handler.get_template_registrations(request.from_height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
return;
},
Ok(data) => data,
};

for template_registration in template_registrations {
let template_registration = match tari_rpc::TemplateRegistration::try_from(template_registration) {
Ok(t) => t,
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending converting template registration for GRPC: {}", e
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal("Error converting template_registration"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
};

match tx.send(Ok(template_registration)).await {
Ok(_) => (),
Err(err) => {
warn!(
target: LOG_TARGET,
"Error sending template registration via GRPC: {}", err
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
}
}
});
debug!(
target: LOG_TARGET,
"Sending GetTemplateRegistrations response stream to client"
);
Ok(Response::new(rx))
}
}

enum BlockGroupType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum NodeCommsRequest {
FetchValidatorNodesKeys { height: u64 },
FetchCommittee { height: u64, shard: [u8; 32] },
GetShardKey { height: u64, public_key: PublicKey },
FetchTemplateRegistrations { from_height: u64 },
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -106,6 +107,9 @@ impl Display for NodeCommsRequest {
GetShardKey { height, public_key } => {
write!(f, "GetShardKey height ({}), public key ({:?})", height, public_key)
},
FetchTemplateRegistrations { from_height } => {
write!(f, "FetchTemplateRegistrations ({})", from_height)
},
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::{ActiveValidatorNode, UtxoMinedInfo},
proof_of_work::Difficulty,
transactions::transaction_components::{Transaction, TransactionKernel, TransactionOutput},
transactions::transaction_components::{
CodeTemplateRegistration,
Transaction,
TransactionKernel,
TransactionOutput,
},
};

/// API Response enum
Expand Down Expand Up @@ -74,6 +79,7 @@ pub enum NodeCommsResponse {
FetchValidatorNodesKeysResponse(Vec<(PublicKey, [u8; 32])>),
FetchCommitteeResponse(Vec<ActiveValidatorNode>),
GetShardKeyResponse([u8; 32]),
FetchTemplateRegistrationsResponse(Vec<CodeTemplateRegistration>),
}

impl Display for NodeCommsResponse {
Expand Down Expand Up @@ -115,6 +121,7 @@ impl Display for NodeCommsResponse {
FetchValidatorNodesKeysResponse(_) => write!(f, "FetchValidatorNodesKeysResponse"),
FetchCommitteeResponse(_) => write!(f, "FetchCommitteeResponse"),
GetShardKeyResponse(_) => write!(f, "GetShardKeyResponse"),
FetchTemplateRegistrationsResponse(_) => write!(f, "FetchTemplateRegistrationsResponse"),
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,18 @@ where B: BlockchainBackend + 'static
let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
},
NodeCommsRequest::FetchTemplateRegistrations { from_height } => {
let template_registrations = self
.blockchain_db
.fetch_template_registrations(from_height)
.await?
.into_iter()
.map(|tr| tr.registration_data)
.collect();
Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
template_registrations,
))
},
}
}

Expand Down
16 changes: 15 additions & 1 deletion base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::ActiveValidatorNode,
proof_of_work::PowAlgorithm,
transactions::transaction_components::{TransactionKernel, TransactionOutput},
transactions::transaction_components::{CodeTemplateRegistration, TransactionKernel, TransactionOutput},
};

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
Expand Down Expand Up @@ -312,4 +312,18 @@ impl LocalNodeCommsInterface {
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}

pub async fn get_template_registrations(
&mut self,
from_height: u64,
) -> Result<Vec<CodeTemplateRegistration>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchTemplateRegistrations { from_height })
.await??
{
NodeCommsResponse::FetchTemplateRegistrationsResponse(template_registrations) => Ok(template_registrations),
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}
}
4 changes: 3 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common_types::{
};
use tari_utilities::epoch_time::EpochTime;

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -270,6 +270,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(fetch_committee(height: u64, shard: [u8;32]) -> Vec<ActiveValidatorNode>, "fetch_committee");

make_async_fn!(get_shard_key(height:u64, public_key: PublicKey) -> [u8;32], "get_shard_key");

make_async_fn!(fetch_template_registrations(from_height: u64) -> Vec<TemplateRegistration>, "fetch_template_registrations");
}

impl<B: BlockchainBackend + 'static> From<BlockchainDatabase<B>> for AsyncBlockchainDb<B> {
Expand Down
3 changes: 2 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_common_types::{
types::{Commitment, HashOutput, PublicKey, Signature},
};

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -196,4 +196,5 @@ pub trait BlockchainBackend: Send + Sync {
fn fetch_active_validator_nodes(&self, height: u64) -> Result<Vec<(PublicKey, [u8; 32])>, ChainStorageError>;
fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError>;
fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<[u8; 32], ChainStorageError>;
fn fetch_template_registrations(&self, from_height: u64) -> Result<Vec<TemplateRegistration>, ChainStorageError>;
}
10 changes: 9 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tari_common_types::{
use tari_mmr::pruned_hashset::PrunedHashSet;
use tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray};

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -1181,6 +1181,14 @@ where B: BlockchainBackend
let db = self.db_read_access()?;
db.fetch_committee(height, shard)
}

pub fn fetch_template_registrations(
&self,
from_height: u64,
) -> Result<Vec<TemplateRegistration>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_template_registrations(from_height)
}
}

fn unexpected_result<T>(request: DbKey, response: DbValue) -> Result<T, ChainStorageError> {
Expand Down
8 changes: 7 additions & 1 deletion base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use croaring::Bitmap;
use tari_common_types::types::{BlockHash, Commitment, HashOutput};
use tari_utilities::hex::Hex;

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader, UpdateBlockAccumulatedData},
chain_storage::{error::ChainStorageError, HorizonData, Reorg},
Expand Down Expand Up @@ -362,6 +362,9 @@ pub enum WriteOperation {
InsertValidatorNode {
validator_node: ActiveValidatorNode,
},
InsertTemplateRegistration {
template_registration: TemplateRegistration,
},
}

impl fmt::Display for WriteOperation {
Expand Down Expand Up @@ -461,6 +464,9 @@ impl fmt::Display for WriteOperation {
InsertValidatorNode { validator_node } => {
write!(f, "Inserting VN {:?}", validator_node)
},
InsertTemplateRegistration { template_registration } => {
write!(f, "Inserting Template {:?}", template_registration)
},
}
}
}
Expand Down
Loading

0 comments on commit 27f77b2

Please sign in to comment.