Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge grpc list headers commands #4515

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ package tari.rpc;
// The gRPC interface for interacting with the base node.
service BaseNode {
// Lists headers in the current best chain
rpc ListHeaders(ListHeadersRequest) returns (stream BlockHeader);
rpc ListHeaders(ListHeadersRequest) returns (stream BlockHeaderResponse);
// Get header by hash
rpc GetHeaderByHash(GetHeaderByHashRequest) returns (BlockHeaderResponse);
// Get header by height
rpc GetHeaderByHeight(GetHeaderByHeightRequest) returns (BlockHeaderResponse);
// Returns blocks in the current best chain. Currently only supports querying by height
rpc GetBlocks(GetBlocksRequest) returns (stream HistoricalBlock);
// Returns the block timing for the chain heights
Expand Down Expand Up @@ -256,11 +254,6 @@ message GetHeaderByHashRequest {
bytes hash = 1;
}

// Request that returns a header based by height
message GetHeaderByHeightRequest {
// The height of the block header
uint64 height = 1;
}

message BlockHeaderResponse {
// The block header
Expand Down
84 changes: 43 additions & 41 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeader, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;

Expand Down Expand Up @@ -381,7 +381,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
},
}
};

let consensus_rules = self.consensus_rules.clone();
task::spawn(async move {
debug!(
target: LOG_TARGET,
Expand All @@ -402,25 +402,59 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
for (start, end) in page_iter {
debug!(target: LOG_TARGET, "Page: {}-{}", start, end);
// TODO: Better error handling
let result_headers = match handler.get_headers(start..=end).await {
let result_data = match handler.get_blocks(start..=end).await {
Err(err) => {
warn!(target: LOG_TARGET, "Internal base node service error: {}", err);
return;
},
Ok(data) => {
if is_reversed {
data.into_iter().rev().collect::<Vec<_>>()
data.into_iter()
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
.map(|chain_block| {
let (block, acc_data, confirmations, _) = chain_block.dissolve();
let total_block_reward = consensus_rules
.calculate_coinbase_and_fees(block.header.height, block.body.kernels());

tari_rpc::BlockHeaderResponse {
difficulty: acc_data.achieved_difficulty.into(),
num_transactions: block.body.kernels().len() as u32,
confirmations,
header: Some(block.header.into()),
reward: total_block_reward.into(),
}
})
.rev()
.collect::<Vec<_>>()
} else {
data
data.into_iter()
.map(|chain_block| {
let (block, acc_data, confirmations, _) = chain_block.dissolve();
let total_block_reward = consensus_rules
.calculate_coinbase_and_fees(block.header.height, block.body.kernels());

tari_rpc::BlockHeaderResponse {
difficulty: acc_data.achieved_difficulty.into(),
num_transactions: block.body.kernels().len() as u32,
confirmations,
header: Some(block.header.into()),
reward: total_block_reward.into(),
}
})
.collect()
}
},
};
let result_size = result_headers.len();
let result_size = result_data.len();
debug!(target: LOG_TARGET, "Result headers: {}", result_size);

for header in result_headers {
debug!(target: LOG_TARGET, "Sending block header: {}", header.height());
match tx.send(Ok(header.into_header().into())).await {
for response in result_data {
// header wont be none here as we just filled it in above
debug!(
target: LOG_TARGET,
"Sending block header: {}",
response.header.as_ref().map(|h| h.height).unwrap_or(0)
);
match tx.send(Ok(response)).await {
Ok(_) => (),
Err(err) => {
warn!(target: LOG_TARGET, "Error sending block header via GRPC: {}", err);
Expand Down Expand Up @@ -1443,38 +1477,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
}
}

async fn get_header_by_height(
&self,
request: Request<tari_rpc::GetHeaderByHeightRequest>,
) -> Result<Response<tari_rpc::BlockHeaderResponse>, Status> {
let tari_rpc::GetHeaderByHeightRequest { height } = request.into_inner();
let mut node_service = self.node_service.clone();
let block = node_service
.get_block(height)
.await
.map_err(|err| Status::internal(err.to_string()))?;

match block {
Some(block) => {
let (block, acc_data, confirmations, _) = block.dissolve();
let total_block_reward = self
.consensus_rules
.calculate_coinbase_and_fees(block.header.height, block.body.kernels());

let resp = tari_rpc::BlockHeaderResponse {
difficulty: acc_data.achieved_difficulty.into(),
num_transactions: block.body.kernels().len() as u32,
confirmations,
header: Some(block.header.into()),
reward: total_block_reward.into(),
};

Ok(Response::new(resp))
},
None => Err(Status::not_found(format!("Header not found with height `{}`", height))),
}
}

async fn identify(&self, _: Request<tari_rpc::Empty>) -> Result<Response<tari_rpc::NodeIdentity>, Status> {
let identity = self.comms.node_identity_ref();
Ok(Response::new(tari_rpc::NodeIdentity {
Expand Down