From 5d7fb207c9994ea7f0cd6a0d7c05786d8da60792 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 19 Oct 2021 20:30:53 +0400 Subject: [PATCH] fix: remove unbounded vec allocations from base node grpc/p2p messaging (#3467) Description --- - replaces unbounded height lists with bound height ranges in internal base node service interfaces and p2p messaging - removes unused and deprecated base node p2p messaging requests/responses - load chain headers when requesting headers, this simplifies and reduces the database load when calling grpc get_blocks - use iterator with almost no footprint for paging in grpc calls - implement `DoubleSidedIterator` for `NonOverlappingIntegerPairIter` - add overflow protection to `NonOverlappingIntegerPairIter` and additional tests This PR does not contain any breaking changes, as no nodes using the base node p2p messaging interface except for block propagation (which is unchanged). GRPC protobuf contract is preserved. **New dependency** `either = "1.6.1"` added to `tari_base_node` crate and used for its `Either` iterator impl Motivation and Context --- - removing unused "outbound comms interface" request/responses reduces attack surface (TODO: remove this interface entirely - specifically the coupling for internal service requests and external p2p comms is troublesome, but mostly removed in the PR, but removing it entirely will make making changes to the base node service much easier) - remote peers could request any number of heights allowing an external party to allocate memory without bounds Closes #3310 How Has This Been Tested? --- Existing GRPC tests Manually tested block explorer api connected to local base node Additional unit tests --- Cargo.lock | 1 + applications/tari_base_node/Cargo.toml | 1 + .../tari_base_node/src/command_handler.rs | 70 ++--- .../src/grpc/base_node_grpc_server.rs | 273 ++++++++---------- .../tari_base_node/src/grpc/blocks.rs | 14 +- .../comms_interface/comms_request.rs | 21 +- .../comms_interface/comms_response.rs | 6 +- .../comms_interface/inbound_handlers.rs | 54 +--- .../comms_interface/local_interface.rs | 47 ++- .../core/src/base_node/comms_interface/mod.rs | 1 + .../comms_interface/outbound_interface.rs | 167 +---------- base_layer/core/src/base_node/mod.rs | 2 +- .../core/src/base_node/proto/request.proto | 32 +- .../core/src/base_node/proto/request.rs | 91 +----- .../core/src/base_node/proto/response.proto | 22 -- .../core/src/base_node/proto/response.rs | 72 +---- .../core/src/base_node/service/service.rs | 47 +-- .../state_machine_service/initializer.rs | 3 - .../state_machine_service/state_machine.rs | 5 +- base_layer/core/src/blocks/block_header.rs | 8 - base_layer/core/src/iterators/chunk.rs | 141 ++++++++- base_layer/core/tests/helpers/nodes.rs | 2 +- base_layer/core/tests/node_comms_interface.rs | 148 +--------- base_layer/core/tests/node_service.rs | 173 +---------- base_layer/core/tests/node_state_machine.rs | 2 - 25 files changed, 429 insertions(+), 974 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e86dd62bbf..d7645fcc5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4282,6 +4282,7 @@ dependencies = [ "bincode", "chrono", "config", + "either", "futures 0.3.16", "log 0.4.14", "num_cpus", diff --git a/applications/tari_base_node/Cargo.toml b/applications/tari_base_node/Cargo.toml index 72166a6fa1..350f6417a3 100644 --- a/applications/tari_base_node/Cargo.toml +++ b/applications/tari_base_node/Cargo.toml @@ -25,6 +25,7 @@ anyhow = "1.0.32" bincode = "1.3.1" chrono = "0.4" config = { version = "0.9.3" } +either = "1.6.1" futures = { version = "^0.3.16", default-features = false, features = ["alloc"] } log = { version = "0.4.8", features = ["std"] } num_cpus = "1" diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs index 103da488a7..8db816de57 100644 --- a/applications/tari_base_node/src/command_handler.rs +++ b/applications/tari_base_node/src/command_handler.rs @@ -131,14 +131,9 @@ impl CommandHandler { status_line.add_field("State", state_info.borrow().state_info.short_desc()); let metadata = node.get_metadata().await.unwrap(); - - let last_header = node - .get_headers(vec![metadata.height_of_longest_chain()]) - .await - .unwrap() - .pop() - .unwrap(); - let last_block_time = DateTime::::from(last_header.timestamp); + let height = metadata.height_of_longest_chain(); + let last_header = node.get_header(height).await.unwrap().unwrap(); + let last_block_time = DateTime::::from(last_header.header().timestamp); status_line.add_field( "Tip", format!( @@ -870,21 +865,20 @@ impl CommandHandler { io::stdout().flush().unwrap(); // we can only check till the pruning horizon, 0 is archive node so it needs to check every block. if height > horizon_height { - match node.get_blocks(vec![height]).await { - Err(_err) => { - missing_blocks.push(height); - }, - Ok(mut data) => match data.pop() { - // We need to check the data it self, as FetchMatchingBlocks will suppress any error, only + match node.get_block(height).await { + Err(err) => { + // We need to check the data itself, as FetchMatchingBlocks will suppress any error, only // logging it. - Some(_historical_block) => {}, - None => missing_blocks.push(height), + error!(target: LOG_TARGET, "{}", err); + missing_blocks.push(height); }, + Ok(Some(_)) => {}, + Ok(None) => missing_blocks.push(height), }; } height -= 1; - let next_header = node.get_headers(vec![height]).await; - if next_header.is_err() { + let next_header = node.get_header(height).await.ok().flatten(); + if next_header.is_none() { // this header is missing, so we stop here and need to ask for this header missing_headers.push(height); }; @@ -921,34 +915,30 @@ impl CommandHandler { print!("{}", height); io::stdout().flush().unwrap(); - let block = match node.get_blocks(vec![height]).await { - Err(_err) => { - println!("Error in db, could not get block"); + let block = match node.get_block(height).await { + Err(err) => { + println!("Error in db, could not get block: {}", err); break; }, - Ok(mut data) => match data.pop() { - // We need to check the data it self, as FetchMatchingBlocks will suppress any error, only - // logging it. - Some(historical_block) => historical_block, - None => { - println!("Error in db, could not get block"); - break; - }, + // We need to check the data it self, as FetchMatchingBlocks will suppress any error, only + // logging it. + Ok(Some(historical_block)) => historical_block, + Ok(None) => { + println!("Error in db, block not found at height {}", height); + break; }, }; - let prev_block = match node.get_blocks(vec![height - 1]).await { - Err(_err) => { - println!("Error in db, could not get block"); + let prev_block = match node.get_block(height - 1).await { + Err(err) => { + println!("Error in db, could not get block: {}", err); break; }, - Ok(mut data) => match data.pop() { - // We need to check the data it self, as FetchMatchingBlocks will suppress any error, only - // logging it. - Some(historical_block) => historical_block, - None => { - println!("Error in db, could not get block"); - break; - }, + // We need to check the data it self, as FetchMatchingBlocks will suppress any error, only + // logging it. + Ok(Some(historical_block)) => historical_block, + Ok(None) => { + println!("Error in db, block not found at height {}", height - 1); + break; }, }; height -= 1; diff --git a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs index 7db518cbcc..11a962d59a 100644 --- a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs +++ b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs @@ -26,6 +26,7 @@ use crate::{ helpers::{mean, median}, }, }; +use either::Either; use futures::{channel::mpsc, SinkExt}; use log::*; use std::{ @@ -49,6 +50,7 @@ use tari_core::{ chain_storage::ChainStorageError, consensus::{emission::Emission, ConsensusManager, NetworkConsensus}, crypto::tari_utilities::{hex::Hex, ByteArray}, + iterators::NonOverlappingIntegerPairIter, mempool::{service::LocalMempoolService, TxStorageResponse}, proof_of_work::PowAlgorithm, transactions::transaction::Transaction, @@ -64,7 +66,7 @@ const GET_TOKENS_IN_CIRCULATION_PAGE_SIZE: usize = 1_000; // The maximum number of difficulty ints that can be requested at a time. These will be streamed to the // client, so memory is not really a concern here, but a malicious client could request a large // number here to keep the node busy -const GET_DIFFICULTY_MAX_HEIGHTS: usize = 10_000; +const GET_DIFFICULTY_MAX_HEIGHTS: u64 = 10_000; const GET_DIFFICULTY_PAGE_SIZE: usize = 1_000; // The maximum number of headers a client can request at a time. If the client requests more than // this, this is the maximum that will be returned. @@ -104,7 +106,7 @@ impl BaseNodeGrpcServer { pub async fn get_heights( request: &tari_rpc::HeightRequest, handler: LocalNodeCommsInterface, -) -> Result, Status> { +) -> Result<(u64, u64), Status> { block_heights(handler, request.start_height, request.end_height, request.from_tip).await } @@ -132,111 +134,74 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { request.end_height ); let mut handler = self.node_service.clone(); - let mut heights: Vec = get_heights(&request, handler.clone()).await?; - heights = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_MAX_HEIGHTS)) - .collect(); - let (mut tx, rx) = mpsc::channel(GET_DIFFICULTY_MAX_HEIGHTS); + let (start_height, end_height) = get_heights(&request, handler.clone()).await?; + // Overflow safety: checked in get_heights + let num_requested = end_height - start_height; + if num_requested > GET_DIFFICULTY_MAX_HEIGHTS { + return Err(Status::invalid_argument(format!( + "Number of headers requested exceeds maximum. Expected less than {} but got {}", + GET_DIFFICULTY_MAX_HEIGHTS, num_requested + ))); + } + let (mut tx, rx) = mpsc::channel(cmp::min(num_requested as usize, GET_DIFFICULTY_PAGE_SIZE)); task::spawn(async move { - let mut page: Vec = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_PAGE_SIZE)) - .collect(); - while !page.is_empty() { - let mut difficulties = match handler.get_headers(page.clone()).await { + let page_iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, GET_DIFFICULTY_PAGE_SIZE); + for (start, end) in page_iter { + // headers are returned by height + let headers = match handler.get_headers(start..=end).await { + Ok(headers) => headers, Err(err) => { - warn!( - target: LOG_TARGET, - "Error communicating with local base node: {:?}", err, - ); + warn!(target: LOG_TARGET, "Base node service error: {:?}", err,); + let _ = tx + .send(Err(Status::internal("Internal error when fetching blocks"))) + .await; return; }, - Ok(mut data) => { - data.sort_by(|a, b| a.height.cmp(&b.height)); - let mut iter = data.iter().peekable(); - let mut result = Vec::new(); - while let Some(next) = iter.next() { - match handler.get_blocks(vec![next.height]).await { - Err(err) => { - warn!( - target: LOG_TARGET, - "Error communicating with local base node: {:?}", err, - ); - return; - }, - Ok(blocks) => { - match blocks.first() { - Some(block) => { - let current_difficulty: u64 = - block.accumulated_data.target_difficulty.as_u64(); - let current_timestamp = next.timestamp.as_u64(); - let current_height = next.height; - let pow_algo = next.pow.pow_algo.as_u64(); - let estimated_hash_rate = if let Some(peek) = iter.peek() { - let peeked_timestamp = peek.timestamp.as_u64(); - // Sometimes blocks can have the same timestamp, lucky miner and some - // clock drift. - if peeked_timestamp > current_timestamp { - current_difficulty / (peeked_timestamp - current_timestamp) - } else { - 0 - } - } else { - 0 - }; - result.push(( - current_difficulty, - estimated_hash_rate, - current_height, - current_timestamp, - pow_algo, - )) - }, - None => { - return; - }, - } - }, - }; - } - result - }, }; - difficulties.sort_by(|a, b| b.2.cmp(&a.2)); - let result_size = difficulties.len(); - for difficulty in difficulties { - match tx - .send(Ok({ - tari_rpc::NetworkDifficultyResponse { - difficulty: difficulty.0, - estimated_hash_rate: difficulty.1, - height: difficulty.2, - timestamp: difficulty.3, - pow_algo: difficulty.4, - } - })) - .await - { - Ok(_) => (), - Err(err) => { - warn!(target: LOG_TARGET, "Error sending difficulty via GRPC: {}", err); - match tx.send(Err(Status::unknown("Error sending data"))).await { - Ok(_) => (), - Err(send_err) => { - warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err) - }, - } - return; - }, - } + if headers.is_empty() { + let _ = tx.send(Err(Status::invalid_argument(format!( + "No blocks found within range {} - {}", + start, end + )))); + return; } - if result_size < GET_DIFFICULTY_PAGE_SIZE { - break; + + let mut headers_iter = headers.iter().peekable(); + + while let Some(chain_header) = headers_iter.next() { + let current_difficulty = chain_header.accumulated_data().target_difficulty.as_u64(); + let current_timestamp = chain_header.header().timestamp.as_u64(); + let current_height = chain_header.header().height; + let pow_algo = chain_header.header().pow.pow_algo.as_u64(); + + let estimated_hash_rate = headers_iter + .peek() + .map(|chain_header| chain_header.header().timestamp.as_u64()) + .and_then(|peeked_timestamp| { + // Sometimes blocks can have the same timestamp, lucky miner and some + // clock drift. + peeked_timestamp + .checked_sub(current_timestamp) + .filter(|td| *td > 0) + .map(|time_diff| current_timestamp / time_diff) + }) + .unwrap_or(0); + + let difficulty = tari_rpc::NetworkDifficultyResponse { + difficulty: current_difficulty, + estimated_hash_rate, + height: current_height, + timestamp: current_timestamp, + pow_algo, + }; + + if let Err(err) = tx.send(Ok(difficulty)).await { + warn!(target: LOG_TARGET, "Error sending difficulties via GRPC: {}", err); + return; + } } - page = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_PAGE_SIZE)) - .collect(); } }); @@ -326,21 +291,18 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { let from_height = cmp::min(request.from_height, tip); - let headers: Vec = if from_height != 0 { + let (header_range, is_reversed) = if from_height != 0 { match sorting { Sorting::Desc => { let from = match from_height.overflowing_sub(num_headers) { (_, true) => 0, (res, false) => res + 1, }; - (from..=from_height).rev().collect() + (from..=from_height, true) }, Sorting::Asc => { - let to = match from_height.overflowing_add(num_headers) { - (_, true) => u64::MAX, - (res, false) => res, - }; - (from_height..to).collect() + let to = from_height.saturating_add(num_headers).saturating_sub(1); + (from_height..=to, false) }, } } else { @@ -350,34 +312,50 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { (_, true) => 0, (res, false) => res + 1, }; - (from..=tip).rev().collect() + (from..=tip, true) }, - Sorting::Asc => (0..num_headers).collect(), + Sorting::Asc => (0..=num_headers.saturating_sub(1), false), } }; task::spawn(async move { - trace!(target: LOG_TARGET, "Starting base node request"); - let mut headers = headers; - trace!(target: LOG_TARGET, "Headers:{:?}", headers); - let mut page: Vec = headers - .drain(..cmp::min(headers.len(), LIST_HEADERS_PAGE_SIZE)) - .collect(); - while !page.is_empty() { - trace!(target: LOG_TARGET, "Page: {:?}", page); - let result_headers = match handler.get_headers(page).await { + debug!( + target: LOG_TARGET, + "Starting base node request {}-{}", + header_range.start(), + header_range.end() + ); + let page_iter = NonOverlappingIntegerPairIter::new( + *header_range.start(), + *header_range.end() + 1, + LIST_HEADERS_PAGE_SIZE, + ); + let page_iter = if is_reversed { + Either::Left(page_iter.rev()) + } else { + Either::Right(page_iter) + }; + for (start, end) in page_iter { + debug!(target: LOG_TARGET, "Page: {}-{}", start, end); + let result_headers = match handler.get_headers(start..=end).await { Err(err) => { - warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,); + warn!(target: LOG_TARGET, "Internal base node service error: {}", err); return; }, - Ok(data) => data, + Ok(data) => { + if is_reversed { + data.into_iter().rev().collect::>() + } else { + data + } + }, }; - trace!(target: LOG_TARGET, "Result headers: {}", result_headers.len()); let result_size = result_headers.len(); + debug!(target: LOG_TARGET, "Result headers: {}", result_size); for header in result_headers { - trace!(target: LOG_TARGET, "Sending block header: {}", header.height); - match tx.send(Ok(header.into())).await { + debug!(target: LOG_TARGET, "Sending block header: {}", header.height()); + match tx.send(Ok(header.into_header().into())).await { Ok(_) => (), Err(err) => { warn!(target: LOG_TARGET, "Error sending block header via GRPC: {}", err); @@ -391,12 +369,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { }, } } - if result_size < LIST_HEADERS_PAGE_SIZE { - break; - } - page = headers - .drain(..cmp::min(headers.len(), LIST_HEADERS_PAGE_SIZE)) - .collect(); } }); @@ -670,18 +642,24 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { target: LOG_TARGET, "Incoming GRPC request for GetBlocks: {:?}", request.heights ); + let mut heights = request.heights; - heights = heights - .drain(..cmp::min(heights.len(), GET_BLOCKS_MAX_HEIGHTS)) - .collect(); + if heights.is_empty() { + return Err(Status::invalid_argument("heights cannot be empty")); + } + + heights.truncate(GET_BLOCKS_MAX_HEIGHTS); + heights.sort_unstable(); + // unreachable panic: `heights` is not empty + let start = *heights.first().expect("unreachable"); + let end = *heights.last().expect("unreachable"); let mut handler = self.node_service.clone(); let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE); task::spawn(async move { - let mut page: Vec = heights.drain(..cmp::min(heights.len(), GET_BLOCKS_PAGE_SIZE)).collect(); - - while !page.is_empty() { - let blocks = match handler.get_blocks(page.clone()).await { + let page_iter = NonOverlappingIntegerPairIter::new(start, end + 1, GET_BLOCKS_PAGE_SIZE); + for (start, end) in page_iter { + let blocks = match handler.get_blocks(start..=end).await { Err(err) => { warn!( target: LOG_TARGET, @@ -689,10 +667,19 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { ); return; }, - Ok(data) => data, + Ok(data) => { + // TODO: Change this interface to a start-end ranged one (clients like the block explorer + // convert start end ranges to integer lists anyway) + data.into_iter().filter(|b| heights.contains(&b.header().height)) + }, }; - let result_size = blocks.len(); + for block in blocks { + debug!( + target: LOG_TARGET, + "GetBlock GRPC sending block #{}", + block.header().height + ); match tx .send( block @@ -714,10 +701,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { }, } } - if result_size < GET_BLOCKS_PAGE_SIZE { - break; - } - page = heights.drain(..cmp::min(heights.len(), GET_BLOCKS_PAGE_SIZE)).collect(); } }); @@ -888,10 +871,10 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { ); let mut handler = self.node_service.clone(); - let heights: Vec = get_heights(&request, handler.clone()).await?; + let (start, end) = get_heights(&request, handler.clone()).await?; - let headers = match handler.get_headers(heights).await { - Ok(headers) => headers, + let headers = match handler.get_headers(start..=end).await { + Ok(headers) => headers.into_iter().map(|h| h.into_header()).collect::>(), Err(err) => { warn!(target: LOG_TARGET, "Error getting headers for GRPC client: {}", err); Vec::new() @@ -1177,9 +1160,9 @@ async fn get_block_group( height_request.end_height ); - let heights = get_heights(&height_request, handler.clone()).await?; + let (start, end) = get_heights(&height_request, handler.clone()).await?; - let blocks = match handler.get_blocks(heights).await { + let blocks = match handler.get_blocks(start..=end).await { Err(err) => { warn!( target: LOG_TARGET, diff --git a/applications/tari_base_node/src/grpc/blocks.rs b/applications/tari_base_node/src/grpc/blocks.rs index 2baa015dd2..4a58a0e54b 100644 --- a/applications/tari_base_node/src/grpc/blocks.rs +++ b/applications/tari_base_node/src/grpc/blocks.rs @@ -21,10 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::cmp; -use tari_core::{ - base_node::LocalNodeCommsInterface, - blocks::{BlockHeader, HistoricalBlock}, -}; +use tari_core::{base_node::LocalNodeCommsInterface, blocks::HistoricalBlock}; use tonic::Status; // The maximum number of blocks that can be requested at a time. These will be streamed to the @@ -46,9 +43,12 @@ pub async fn block_heights( start_height: u64, end_height: u64, from_tip: u64, -) -> Result, Status> { +) -> Result<(u64, u64), Status> { if end_height > 0 { - Ok(BlockHeader::get_height_range(start_height, end_height)) + if start_height > end_height { + return Err(Status::invalid_argument("Start height was greater than end height")); + } + Ok((start_height, end_height)) } else if from_tip > 0 { let metadata = handler .get_metadata() @@ -58,7 +58,7 @@ pub async fn block_heights( // Avoid overflow let height_from_tip = cmp::min(tip, from_tip); let start = cmp::max(tip - height_from_tip, 0); - Ok(BlockHeader::get_height_range(start, tip)) + Ok((start, tip)) } else { Err(Status::invalid_argument("Invalid arguments provided")) } diff --git a/base_layer/core/src/base_node/comms_interface/comms_request.rs b/base_layer/core/src/base_node/comms_interface/comms_request.rs index eef287d8f1..a309eeb606 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_request.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_request.rs @@ -22,7 +22,10 @@ use crate::{blocks::NewBlockTemplate, chain_storage::MmrTree, proof_of_work::PowAlgorithm}; use serde::{Deserialize, Serialize}; -use std::fmt::{Display, Error, Formatter}; +use std::{ + fmt::{Display, Error, Formatter}, + ops::RangeInclusive, +}; use tari_common_types::types::{Commitment, HashOutput, Signature}; use tari_crypto::tari_utilities::hex::Hex; @@ -38,13 +41,13 @@ pub struct MmrStateRequest { #[derive(Debug, Serialize, Deserialize)] pub enum NodeCommsRequest { GetChainMetadata, - FetchHeaders(Vec), + FetchHeaders(RangeInclusive), FetchHeadersWithHashes(Vec), FetchHeadersAfter(Vec, HashOutput), FetchMatchingUtxos(Vec), FetchMatchingTxos(Vec), - FetchMatchingBlocks(Vec), - FetchBlocksWithHashes(Vec), + FetchMatchingBlocks(RangeInclusive), + FetchBlocksByHash(Vec), FetchBlocksWithKernels(Vec), FetchBlocksWithUtxos(Vec), GetHeaderByHash(HashOutput), @@ -65,13 +68,17 @@ impl Display for NodeCommsRequest { use NodeCommsRequest::*; match self { GetChainMetadata => write!(f, "GetChainMetadata"), - FetchHeaders(v) => write!(f, "FetchHeaders (n={})", v.len()), + FetchHeaders(range) => { + write!(f, "FetchHeaders ({:?})", range) + }, FetchHeadersWithHashes(v) => write!(f, "FetchHeadersWithHashes (n={})", v.len()), FetchHeadersAfter(v, _hash) => write!(f, "FetchHeadersAfter (n={})", v.len()), FetchMatchingUtxos(v) => write!(f, "FetchMatchingUtxos (n={})", v.len()), FetchMatchingTxos(v) => write!(f, "FetchMatchingTxos (n={})", v.len()), - FetchMatchingBlocks(v) => write!(f, "FetchMatchingBlocks (n={})", v.len()), - FetchBlocksWithHashes(v) => write!(f, "FetchBlocksWithHashes (n={})", v.len()), + FetchMatchingBlocks(range) => { + write!(f, "FetchMatchingBlocks ({:?})", range) + }, + FetchBlocksByHash(v) => write!(f, "FetchBlocksByHash (n={})", v.len()), FetchBlocksWithKernels(v) => write!(f, "FetchBlocksWithKernels (n={})", v.len()), FetchBlocksWithUtxos(v) => write!(f, "FetchBlocksWithUtxos (n={})", v.len()), GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()), diff --git a/base_layer/core/src/base_node/comms_interface/comms_response.rs b/base_layer/core/src/base_node/comms_interface/comms_response.rs index 1d0fa7f53d..e5a1fbaa1c 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_response.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_response.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - blocks::{Block, BlockHeader, HistoricalBlock, NewBlockTemplate}, + blocks::{Block, BlockHeader, ChainHeader, HistoricalBlock, NewBlockTemplate}, proof_of_work::Difficulty, transactions::transaction::{TransactionKernel, TransactionOutput}, }; @@ -34,8 +34,8 @@ use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput}; pub enum NodeCommsResponse { ChainMetadata(ChainMetadata), TransactionKernels(Vec), - BlockHeaders(Vec), - BlockHeader(Option), + BlockHeaders(Vec), + BlockHeader(Option), TransactionOutputs(Vec), HistoricalBlocks(Vec), HistoricalBlock(Box>), diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 84e0da17a3..6dfec0b3b4 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -20,13 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - base_node::{ - comms_interface::{ - error::CommsInterfaceError, - local_interface::BlockEventSender, - NodeCommsRequest, - NodeCommsResponse, - }, + base_node::comms_interface::{ + error::CommsInterfaceError, + local_interface::BlockEventSender, + NodeCommsRequest, + NodeCommsResponse, OutboundNodeCommsInterface, }, blocks::{Block, BlockHeader, ChainBlock, NewBlock, NewBlockTemplate}, @@ -128,27 +126,15 @@ where T: BlockchainBackend + 'static NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata( self.blockchain_db.get_chain_metadata().await?, )), - NodeCommsRequest::FetchHeaders(block_nums) => { - let mut block_headers = Vec::::with_capacity(block_nums.len()); - for block_num in block_nums { - match self.blockchain_db.fetch_header(block_num).await { - Ok(Some(block_header)) => { - block_headers.push(block_header); - }, - Ok(None) => return Err(CommsInterfaceError::BlockHeaderNotFound(block_num)), - Err(err) => { - error!(target: LOG_TARGET, "Could not fetch headers: {}", err.to_string()); - return Err(err.into()); - }, - } - } - Ok(NodeCommsResponse::BlockHeaders(block_headers)) + NodeCommsRequest::FetchHeaders(range) => { + let headers = self.blockchain_db.fetch_chain_headers(range).await?; + Ok(NodeCommsResponse::BlockHeaders(headers)) }, NodeCommsRequest::FetchHeadersWithHashes(block_hashes) => { - let mut block_headers = Vec::::with_capacity(block_hashes.len()); + let mut block_headers = Vec::with_capacity(block_hashes.len()); for block_hash in block_hashes { let block_hex = block_hash.to_hex(); - match self.blockchain_db.fetch_header_by_block_hash(block_hash).await? { + match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? { Some(block_header) => { block_headers.push(block_header); }, @@ -248,23 +234,11 @@ where T: BlockchainBackend + 'static .collect(); Ok(NodeCommsResponse::TransactionOutputs(res)) }, - NodeCommsRequest::FetchMatchingBlocks(block_nums) => { - let mut blocks = Vec::with_capacity(block_nums.len()); - for block_num in block_nums { - debug!(target: LOG_TARGET, "A peer has requested block {}", block_num); - match self.blockchain_db.fetch_block(block_num).await { - Ok(block) => blocks.push(block), - // We need to suppress the error as another node might ask for a block we dont have, so we - // return ok([]) - Err(e) => debug!( - target: LOG_TARGET, - "Could not provide requested block {} to peer because: {}", block_num, e - ), - } - } + NodeCommsRequest::FetchMatchingBlocks(range) => { + let blocks = self.blockchain_db.fetch_blocks(range).await?; Ok(NodeCommsResponse::HistoricalBlocks(blocks)) }, - NodeCommsRequest::FetchBlocksWithHashes(block_hashes) => { + NodeCommsRequest::FetchBlocksByHash(block_hashes) => { let mut blocks = Vec::with_capacity(block_hashes.len()); for block_hash in block_hashes { let block_hex = block_hash.to_hex(); @@ -340,7 +314,7 @@ where T: BlockchainBackend + 'static Ok(NodeCommsResponse::HistoricalBlocks(blocks)) }, NodeCommsRequest::GetHeaderByHash(hash) => { - let header = self.blockchain_db.fetch_header_by_block_hash(hash).await?; + let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?; Ok(NodeCommsResponse::BlockHeader(header)) }, NodeCommsRequest::GetBlockByHash(hash) => { diff --git a/base_layer/core/src/base_node/comms_interface/local_interface.rs b/base_layer/core/src/base_node/comms_interface/local_interface.rs index b5ec655633..04e72860a3 100644 --- a/base_layer/core/src/base_node/comms_interface/local_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/local_interface.rs @@ -28,11 +28,11 @@ use crate::{ NodeCommsRequest, NodeCommsResponse, }, - blocks::{Block, BlockHeader, HistoricalBlock, NewBlockTemplate}, + blocks::{Block, HistoricalBlock, NewBlockTemplate}, proof_of_work::PowAlgorithm, transactions::transaction::TransactionKernel, }; -use std::sync::Arc; +use std::{ops::RangeInclusive, sync::Arc}; use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash}; use tari_service_framework::{reply_channel::SenderService, Service}; use tokio::sync::broadcast; @@ -41,6 +41,7 @@ pub type BlockEventSender = broadcast::Sender>; pub type BlockEventReceiver = broadcast::Receiver>; use crate::{ base_node::comms_interface::comms_request::GetNewBlockTemplateRequest, + blocks::ChainHeader, transactions::transaction::TransactionOutput, }; use tari_common_types::types::{Commitment, HashOutput, Signature}; @@ -80,11 +81,14 @@ impl LocalNodeCommsInterface { } } - /// Request the block header of the current tip at the block height - pub async fn get_blocks(&mut self, block_heights: Vec) -> Result, CommsInterfaceError> { + /// Request the block headers within the given range + pub async fn get_blocks( + &mut self, + range: RangeInclusive, + ) -> Result, CommsInterfaceError> { match self .request_sender - .call(NodeCommsRequest::FetchMatchingBlocks(block_heights)) + .call(NodeCommsRequest::FetchMatchingBlocks(range)) .await?? { NodeCommsResponse::HistoricalBlocks(blocks) => Ok(blocks), @@ -92,11 +96,24 @@ impl LocalNodeCommsInterface { } } - /// Request the block header of the current tip at the block height - pub async fn get_headers(&mut self, block_heights: Vec) -> Result, CommsInterfaceError> { + /// Request the block header at the given height + pub async fn get_block(&mut self, height: u64) -> Result, CommsInterfaceError> { + match self + .request_sender + .call(NodeCommsRequest::FetchMatchingBlocks(height..=height)) + .await?? + { + NodeCommsResponse::HistoricalBlocks(mut blocks) => Ok(blocks.pop()), + _ => Err(CommsInterfaceError::UnexpectedApiResponse), + } + } + + /// Request the block headers with the given range of heights. The returned headers are ordered from lowest to + /// highest block height + pub async fn get_headers(&mut self, range: RangeInclusive) -> Result, CommsInterfaceError> { match self .request_sender - .call(NodeCommsRequest::FetchHeaders(block_heights)) + .call(NodeCommsRequest::FetchHeaders(range)) .await?? { NodeCommsResponse::BlockHeaders(headers) => Ok(headers), @@ -104,6 +121,18 @@ impl LocalNodeCommsInterface { } } + /// Request the block header with the height. + pub async fn get_header(&mut self, height: u64) -> Result, CommsInterfaceError> { + match self + .request_sender + .call(NodeCommsRequest::FetchHeaders(height..=height)) + .await?? + { + NodeCommsResponse::BlockHeaders(mut headers) => Ok(headers.pop()), + _ => Err(CommsInterfaceError::UnexpectedApiResponse), + } + } + /// Request the construction of a new mineable block template from the base node service. pub async fn get_new_block_template( &mut self, @@ -203,7 +232,7 @@ impl LocalNodeCommsInterface { } /// Return header matching the given hash. If the header cannot be found `Ok(None)` is returned. - pub async fn get_header_by_hash(&mut self, hash: HashOutput) -> Result, CommsInterfaceError> { + pub async fn get_header_by_hash(&mut self, hash: HashOutput) -> Result, CommsInterfaceError> { match self .request_sender .call(NodeCommsRequest::GetHeaderByHash(hash)) diff --git a/base_layer/core/src/base_node/comms_interface/mod.rs b/base_layer/core/src/base_node/comms_interface/mod.rs index fca07c92f1..66185bdb01 100644 --- a/base_layer/core/src/base_node/comms_interface/mod.rs +++ b/base_layer/core/src/base_node/comms_interface/mod.rs @@ -35,5 +35,6 @@ pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers}; mod local_interface; pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface}; +// TODO: Remove this entirely when able mod outbound_interface; pub use outbound_interface::OutboundNodeCommsInterface; diff --git a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs index 7f8a9d0c33..cf67713c93 100644 --- a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs @@ -22,20 +22,13 @@ use crate::{ base_node::comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse}, - blocks::{BlockHeader, HistoricalBlock, NewBlock}, - transactions::transaction::TransactionOutput, -}; -use log::*; -use tari_common_types::{ - chain_metadata::ChainMetadata, - types::{BlockHash, HashOutput}, + blocks::{HistoricalBlock, NewBlock}, }; +use tari_common_types::types::BlockHash; use tari_comms::peer_manager::NodeId; use tari_service_framework::{reply_channel::SenderService, Service}; use tokio::sync::mpsc::UnboundedSender; -pub const LOG_TARGET: &str = "c::bn::comms_interface::outbound_interface"; - /// The OutboundNodeCommsInterface provides an interface to request information from remove nodes. #[derive(Clone)] pub struct OutboundNodeCommsInterface { @@ -58,160 +51,6 @@ impl OutboundNodeCommsInterface { } } - /// Request metadata from remote base nodes. - pub async fn get_metadata(&mut self) -> Result { - self.request_metadata_from_peer(None).await - } - - /// Request metadata from a specific base node, if None is provided as a node_id then a random base node will be - /// queried. - pub async fn request_metadata_from_peer( - &mut self, - node_id: Option, - ) -> Result { - if let NodeCommsResponse::ChainMetadata(metadata) = self - .request_sender - .call((NodeCommsRequest::GetChainMetadata, node_id)) - .await?? - { - trace!(target: LOG_TARGET, "Remote metadata requested: {:?}", metadata,); - Ok(metadata) - } else { - // TODO: Potentially ban peer - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the block headers corresponding to the provided block numbers from remote base nodes. - pub async fn fetch_headers(&mut self, block_nums: Vec) -> Result, CommsInterfaceError> { - self.request_headers_from_peer(block_nums, None).await - } - - /// Fetch the block headers corresponding to the provided block numbers from a specific base node, if None is - /// provided as a node_id then a random base node will be queried. - pub async fn request_headers_from_peer( - &mut self, - block_nums: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::BlockHeaders(headers) = self - .request_sender - .call((NodeCommsRequest::FetchHeaders(block_nums), node_id)) - .await?? - { - Ok(headers) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Headers corresponding to the provided block hashes from remote base nodes. - pub async fn fetch_headers_with_hashes( - &mut self, - block_hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_headers_with_hashes_from_peer(block_hashes, None).await - } - - /// Fetch the Headers corresponding to the provided block hashes from a specific base node, if None is provided as a - /// node_id then a random base node will be queried. - pub async fn request_headers_with_hashes_from_peer( - &mut self, - block_hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::BlockHeaders(headers) = self - .request_sender - .call((NodeCommsRequest::FetchHeadersWithHashes(block_hashes), node_id)) - .await?? - { - Ok(headers) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the UTXOs with the provided hashes from remote base nodes. - pub async fn fetch_utxos( - &mut self, - hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_utxos_from_peer(hashes, None).await - } - - /// Fetch the UTXOs with the provided hashes from a specific base node, if None is provided as a node_id then a - /// random base node will be queried. - pub async fn request_utxos_from_peer( - &mut self, - hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::TransactionOutputs(utxos) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingUtxos(hashes), node_id)) - .await?? - { - Ok(utxos) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the UTXOs or STXOs with the provided hashes from remote base nodes. - pub async fn fetch_txos(&mut self, hashes: Vec) -> Result, CommsInterfaceError> { - self.request_txos_from_peer(hashes, None).await - } - - /// Fetch the UTXOs or STXOS with the provided hashes from a specific base node, if None is provided as a node_id - /// then a random base node will be queried. - pub async fn request_txos_from_peer( - &mut self, - hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::TransactionOutputs(txos) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingTxos(hashes), node_id)) - .await?? - { - Ok(txos) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Historical Blocks corresponding to the provided block numbers from remote base nodes. - pub async fn fetch_blocks(&mut self, block_nums: Vec) -> Result, CommsInterfaceError> { - self.request_blocks_from_peer(block_nums, None).await - } - - /// Fetch the Historical Blocks corresponding to the provided block numbers from a specific base node, if None is - /// provided as a node_id then a random base node will be queried. - pub async fn request_blocks_from_peer( - &mut self, - block_nums: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::HistoricalBlocks(blocks) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingBlocks(block_nums), node_id)) - .await?? - { - Ok(blocks) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Blocks corresponding to the provided block hashes from remote base nodes. The requested blocks could - /// be chain blocks or orphan blocks. - pub async fn fetch_blocks_with_hashes( - &mut self, - block_hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_blocks_with_hashes_from_peer(block_hashes, None).await - } - /// Fetch the Blocks corresponding to the provided block hashes from a specific base node. The requested blocks /// could be chain blocks or orphan blocks. pub async fn request_blocks_with_hashes_from_peer( @@ -221,7 +60,7 @@ impl OutboundNodeCommsInterface { ) -> Result, CommsInterfaceError> { if let NodeCommsResponse::HistoricalBlocks(blocks) = self .request_sender - .call((NodeCommsRequest::FetchBlocksWithHashes(block_hashes), node_id)) + .call((NodeCommsRequest::FetchBlocksByHash(block_hashes), node_id)) .await?? { Ok(blocks) diff --git a/base_layer/core/src/base_node/mod.rs b/base_layer/core/src/base_node/mod.rs index 826a044704..0275c4e9d0 100644 --- a/base_layer/core/src/base_node/mod.rs +++ b/base_layer/core/src/base_node/mod.rs @@ -38,7 +38,7 @@ pub mod chain_metadata_service; #[cfg(feature = "base_node")] pub mod comms_interface; #[cfg(feature = "base_node")] -pub use comms_interface::{LocalNodeCommsInterface, OutboundNodeCommsInterface}; +pub use comms_interface::LocalNodeCommsInterface; #[cfg(feature = "base_node")] pub mod service; diff --git a/base_layer/core/src/base_node/proto/request.proto b/base_layer/core/src/base_node/proto/request.proto index 323c6c6a64..3f7893abe4 100644 --- a/base_layer/core/src/base_node/proto/request.proto +++ b/base_layer/core/src/base_node/proto/request.proto @@ -10,36 +10,8 @@ package tari.base_node; message BaseNodeServiceRequest { uint64 request_key = 1; oneof request { - // Indicates a GetChainMetadata request. The value of the bool should be ignored. - bool get_chain_metadata = 2; - // Indicates a FetchHeaders request. - BlockHeights fetch_headers = 4; - // Indicates a FetchHeadersWithHashes request. - HashOutputs fetch_headers_with_hashes = 5; - // Indicates a FetchMatchingUtxos request. - HashOutputs fetch_matching_utxos = 6; - // Indicates a FetchMatchingBlocks request. - BlockHeights fetch_matching_blocks = 7; - // Indicates a FetchBlocksWithHashes request. - HashOutputs fetch_blocks_with_hashes = 8; - // Indicates a GetNewBlockTemplate request. - NewBlockTemplateRequest get_new_block_template = 9; - // Indicates a GetNewBlock request. - tari.core.NewBlockTemplate get_new_block = 10; - // Get headers in best chain following any headers in this list - FetchHeadersAfter fetch_headers_after = 12; - // Indicates a FetchMatchingTxos request. - HashOutputs fetch_matching_txos = 15; - // Indicates a Fetch block with kernels request - Signatures fetch_blocks_with_kernels = 16; - // Indicates a Fetch block with kernels request - Commitments fetch_blocks_with_utxos = 18; - // Indicates a Fetch kernel by excess signature request - tari.types.Signature fetch_kernel_by_excess_sig = 19; - // Indicates a GetHeaderByHash request. - bytes get_header_by_hash = 20; - // Indicates a GetBlockByHash request. - bytes get_block_by_hash = 21; + // Indicates a FetchBlocksByHash request. + HashOutputs fetch_blocks_by_hash = 8; } } diff --git a/base_layer/core/src/base_node/proto/request.rs b/base_layer/core/src/base_node/proto/request.rs index bf766bffc2..79eccc40dc 100644 --- a/base_layer/core/src/base_node/proto/request.rs +++ b/base_layer/core/src/base_node/proto/request.rs @@ -21,21 +21,11 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - base_node::{comms_interface as ci, comms_interface::GetNewBlockTemplateRequest}, - proof_of_work::PowAlgorithm, - proto::{ - base_node as proto, - base_node::{ - base_node_service_request::Request as ProtoNodeCommsRequest, - BlockHeights, - FetchHeadersAfter as ProtoFetchHeadersAfter, - HashOutputs, - }, - }, + base_node::comms_interface as ci, + proto::base_node::{base_node_service_request::Request as ProtoNodeCommsRequest, BlockHeights, HashOutputs}, }; use std::convert::{From, TryFrom, TryInto}; -use tari_common_types::types::{Commitment, HashOutput, Signature}; -use tari_crypto::tari_utilities::ByteArrayError; +use tari_common_types::types::HashOutput; //---------------------------------- BaseNodeRequest --------------------------------------------// impl TryInto for ProtoNodeCommsRequest { @@ -44,81 +34,20 @@ impl TryInto for ProtoNodeCommsRequest { fn try_into(self) -> Result { use ProtoNodeCommsRequest::*; let request = match self { - // Field was not specified - GetChainMetadata(_) => ci::NodeCommsRequest::GetChainMetadata, - FetchHeaders(block_heights) => ci::NodeCommsRequest::FetchHeaders(block_heights.heights), - FetchHeadersWithHashes(block_hashes) => ci::NodeCommsRequest::FetchHeadersWithHashes(block_hashes.outputs), - FetchHeadersAfter(request) => { - ci::NodeCommsRequest::FetchHeadersAfter(request.hashes, request.stopping_hash) - }, - FetchMatchingUtxos(hash_outputs) => ci::NodeCommsRequest::FetchMatchingUtxos(hash_outputs.outputs), - FetchMatchingTxos(hash_outputs) => ci::NodeCommsRequest::FetchMatchingTxos(hash_outputs.outputs), - FetchMatchingBlocks(block_heights) => ci::NodeCommsRequest::FetchMatchingBlocks(block_heights.heights), - FetchBlocksWithHashes(block_hashes) => ci::NodeCommsRequest::FetchBlocksWithHashes(block_hashes.outputs), - FetchBlocksWithKernels(signatures) => { - let mut sigs = Vec::new(); - for sig in signatures.sigs { - sigs.push(Signature::try_from(sig).map_err(|err: ByteArrayError| err.to_string())?) - } - ci::NodeCommsRequest::FetchBlocksWithKernels(sigs) - }, - FetchBlocksWithUtxos(commitments) => { - let mut commits = Vec::new(); - for stxo in commitments.commitments { - commits.push(Commitment::try_from(stxo).map_err(|err: ByteArrayError| err.to_string())?) - } - ci::NodeCommsRequest::FetchBlocksWithUtxos(commits) - }, - GetHeaderByHash(hash) => ci::NodeCommsRequest::GetHeaderByHash(hash), - GetBlockByHash(hash) => ci::NodeCommsRequest::GetBlockByHash(hash), - GetNewBlockTemplate(message) => { - let request = GetNewBlockTemplateRequest { - algo: PowAlgorithm::try_from(message.algo)?, - max_weight: message.max_weight, - }; - ci::NodeCommsRequest::GetNewBlockTemplate(request) - }, - GetNewBlock(block_template) => ci::NodeCommsRequest::GetNewBlock(block_template.try_into()?), - FetchKernelByExcessSig(sig) => ci::NodeCommsRequest::FetchKernelByExcessSig( - Signature::try_from(sig).map_err(|err: ByteArrayError| err.to_string())?, - ), + FetchBlocksByHash(block_hashes) => ci::NodeCommsRequest::FetchBlocksByHash(block_hashes.outputs), }; Ok(request) } } -impl From for ProtoNodeCommsRequest { - fn from(request: ci::NodeCommsRequest) -> Self { +impl TryFrom for ProtoNodeCommsRequest { + type Error = String; + + fn try_from(request: ci::NodeCommsRequest) -> Result { use ci::NodeCommsRequest::*; match request { - GetChainMetadata => ProtoNodeCommsRequest::GetChainMetadata(true), - FetchHeaders(block_heights) => ProtoNodeCommsRequest::FetchHeaders(block_heights.into()), - FetchHeadersWithHashes(block_hashes) => ProtoNodeCommsRequest::FetchHeadersWithHashes(block_hashes.into()), - FetchHeadersAfter(hashes, stopping_hash) => { - ProtoNodeCommsRequest::FetchHeadersAfter(ProtoFetchHeadersAfter { hashes, stopping_hash }) - }, - FetchMatchingUtxos(hash_outputs) => ProtoNodeCommsRequest::FetchMatchingUtxos(hash_outputs.into()), - FetchMatchingTxos(hash_outputs) => ProtoNodeCommsRequest::FetchMatchingTxos(hash_outputs.into()), - FetchMatchingBlocks(block_heights) => ProtoNodeCommsRequest::FetchMatchingBlocks(block_heights.into()), - FetchBlocksWithHashes(block_hashes) => ProtoNodeCommsRequest::FetchBlocksWithHashes(block_hashes.into()), - FetchBlocksWithKernels(signatures) => { - let sigs = signatures.into_iter().map(Into::into).collect(); - ProtoNodeCommsRequest::FetchBlocksWithKernels(proto::Signatures { sigs }) - }, - FetchBlocksWithUtxos(commitments) => { - let commits = commitments.into_iter().map(Into::into).collect(); - ProtoNodeCommsRequest::FetchBlocksWithUtxos(proto::Commitments { commitments: commits }) - }, - GetHeaderByHash(hash) => ProtoNodeCommsRequest::GetHeaderByHash(hash), - GetBlockByHash(hash) => ProtoNodeCommsRequest::GetBlockByHash(hash), - GetNewBlockTemplate(request) => { - ProtoNodeCommsRequest::GetNewBlockTemplate(proto::NewBlockTemplateRequest { - algo: request.algo as u64, - max_weight: request.max_weight, - }) - }, - GetNewBlock(block_template) => ProtoNodeCommsRequest::GetNewBlock(block_template.into()), - FetchKernelByExcessSig(signature) => ProtoNodeCommsRequest::FetchKernelByExcessSig(signature.into()), + FetchBlocksByHash(block_hashes) => Ok(ProtoNodeCommsRequest::FetchBlocksByHash(block_hashes.into())), + e => Err(format!("{} request is not supported", e)), } } } diff --git a/base_layer/core/src/base_node/proto/response.proto b/base_layer/core/src/base_node/proto/response.proto index fd581e3fef..985225f58a 100644 --- a/base_layer/core/src/base_node/proto/response.proto +++ b/base_layer/core/src/base_node/proto/response.proto @@ -10,30 +10,8 @@ package tari.base_node; message BaseNodeServiceResponse { uint64 request_key = 1; oneof response { - // Indicates a ChainMetadata response. - ChainMetadata chain_metadata = 2; - // Indicates a TransactionKernels response. - TransactionKernels transaction_kernels = 3; - // Indicates a BlockHeaders response. - BlockHeaders block_headers = 4; - // Indicates a TransactionOutputs response. - TransactionOutputs transaction_outputs = 5; // Indicates a HistoricalBlocks response. HistoricalBlocks historical_blocks = 6; - // Indicates a NewBlockTemplate response. - tari.core.NewBlockTemplate new_block_template = 7; - // Indicates a NewBlock response. - NewBlockResponse new_block = 8; - // Indicates a TargetDifficulty response. - uint64 target_difficulty = 9; - // Block headers in range response - BlockHeaders fetch_headers_after_response = 10; - // Indicates a MmrNodes response - MmrNodes MmrNodes = 12; - // A single header response - BlockHeaderResponse block_header = 14; - // A single historical block response - HistoricalBlockResponse historical_block = 15; } bool is_synced = 13; } diff --git a/base_layer/core/src/base_node/proto/response.rs b/base_layer/core/src/base_node/proto/response.rs index 459bed3b11..93fb3f7d0c 100644 --- a/base_layer/core/src/base_node/proto/response.rs +++ b/base_layer/core/src/base_node/proto/response.rs @@ -24,15 +24,12 @@ pub use crate::proto::base_node::base_node_service_response::Response as ProtoNo use crate::{ base_node::comms_interface as ci, blocks::{BlockHeader, HistoricalBlock}, - proof_of_work::Difficulty, proto, proto::{ base_node as base_node_proto, base_node::{ BlockHeaders as ProtoBlockHeaders, HistoricalBlocks as ProtoHistoricalBlocks, - MmrNodes as ProtoMmrNodes, - NewBlockResponse as ProtoNewBlockResponse, TransactionKernels as ProtoTransactionKernels, TransactionOutputs as ProtoTransactionOutputs, }, @@ -41,7 +38,7 @@ use crate::{ tari_utilities::convert::try_convert_all, }; use std::{ - convert::TryInto, + convert::{TryFrom, TryInto}, iter::{FromIterator, Iterator}, }; @@ -51,81 +48,28 @@ impl TryInto for ProtoNodeCommsResponse { fn try_into(self) -> Result { use ProtoNodeCommsResponse::*; let response = match self { - ChainMetadata(chain_metadata) => ci::NodeCommsResponse::ChainMetadata(chain_metadata.try_into()?), - TransactionKernels(kernels) => { - let kernels = try_convert_all(kernels.kernels)?; - ci::NodeCommsResponse::TransactionKernels(kernels) - }, - BlockHeaders(headers) => { - let headers = try_convert_all(headers.headers)?; - ci::NodeCommsResponse::BlockHeaders(headers) - }, - BlockHeader(header) => ci::NodeCommsResponse::BlockHeader(header.try_into()?), - HistoricalBlock(block) => ci::NodeCommsResponse::HistoricalBlock(Box::new(block.try_into()?)), - FetchHeadersAfterResponse(headers) => { - let headers = try_convert_all(headers.headers)?; - ci::NodeCommsResponse::FetchHeadersAfterResponse(headers) - }, - TransactionOutputs(outputs) => { - let outputs = try_convert_all(outputs.outputs)?; - ci::NodeCommsResponse::TransactionOutputs(outputs) - }, HistoricalBlocks(blocks) => { let blocks = try_convert_all(blocks.blocks)?; ci::NodeCommsResponse::HistoricalBlocks(blocks) }, - NewBlockTemplate(block_template) => ci::NodeCommsResponse::NewBlockTemplate(block_template.try_into()?), - NewBlock(block) => ci::NodeCommsResponse::NewBlock { - success: block.success, - error: Some(block.error), - block: match block.block { - Some(b) => Some(b.try_into()?), - None => None, - }, - }, - TargetDifficulty(difficulty) => ci::NodeCommsResponse::TargetDifficulty(Difficulty::from(difficulty)), - MmrNodes(response) => ci::NodeCommsResponse::MmrNodes(response.added, response.deleted), }; Ok(response) } } -impl From for ProtoNodeCommsResponse { - fn from(response: ci::NodeCommsResponse) -> Self { +impl TryFrom for ProtoNodeCommsResponse { + type Error = String; + + fn try_from(response: ci::NodeCommsResponse) -> Result { use ci::NodeCommsResponse::*; match response { - ChainMetadata(chain_metadata) => ProtoNodeCommsResponse::ChainMetadata(chain_metadata.into()), - TransactionKernels(kernels) => { - let kernels = kernels.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::TransactionKernels(kernels) - }, - BlockHeaders(headers) => { - let block_headers = headers.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::BlockHeaders(block_headers) - }, - BlockHeader(header) => ProtoNodeCommsResponse::BlockHeader(header.into()), - HistoricalBlock(block) => ProtoNodeCommsResponse::HistoricalBlock((*block).into()), - FetchHeadersAfterResponse(headers) => { - let block_headers = headers.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::FetchHeadersAfterResponse(block_headers) - }, - TransactionOutputs(outputs) => { - let outputs = outputs.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::TransactionOutputs(outputs) - }, HistoricalBlocks(historical_blocks) => { let historical_blocks = historical_blocks.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::HistoricalBlocks(historical_blocks) + Ok(ProtoNodeCommsResponse::HistoricalBlocks(historical_blocks)) }, - NewBlockTemplate(block_template) => ProtoNodeCommsResponse::NewBlockTemplate(block_template.into()), - NewBlock { success, error, block } => ProtoNodeCommsResponse::NewBlock(ProtoNewBlockResponse { - success, - error: error.unwrap_or_else(|| "".to_string()), - block: block.map(|b| b.into()), - }), - TargetDifficulty(difficulty) => ProtoNodeCommsResponse::TargetDifficulty(difficulty.as_u64()), - MmrNodes(added, deleted) => ProtoNodeCommsResponse::MmrNodes(ProtoMmrNodes { added, deleted }), + // This would only occur if a programming error sent out the unsupported response + resp => Err(format!("Response not supported {:?}", resp)), } } } diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index 1d66cbf1b1..c556d66704 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -36,7 +36,7 @@ use crate::{ blocks::{Block, NewBlock}, chain_storage::BlockchainBackend, proto as shared_protos, - proto::{base_node as proto, base_node::base_node_service_request::Request}, + proto::base_node as proto, }; use futures::{pin_mut, stream::StreamExt, Stream}; use log::*; @@ -400,7 +400,7 @@ async fn handle_incoming_request( let message = proto::BaseNodeServiceResponse { request_key: inner_msg.request_key, - response: Some(response.into()), + response: Some(response.try_into().map_err(BaseNodeServiceError::InvalidResponse)?), is_synced, }; @@ -501,7 +501,7 @@ async fn handle_outbound_request( let request_key = generate_request_key(&mut OsRng); let service_request = proto::BaseNodeServiceRequest { request_key, - request: Some(request.into()), + request: Some(request.try_into().map_err(CommsInterfaceError::ApiError)?), }; let mut send_msg_params = SendMessageParams::new(); @@ -533,39 +533,14 @@ async fn handle_outbound_request( // Wait for matching responses to arrive waiting_requests.insert(request_key, reply_tx).await; // Spawn timeout for waiting_request - if let Some(r) = service_request.request.clone() { - match r { - Request::FetchMatchingBlocks(_) | - Request::FetchBlocksWithHashes(_) | - Request::FetchBlocksWithKernels(_) | - Request::FetchBlocksWithUtxos(_) => { - trace!( - target: LOG_TARGET, - "Timeout for service request FetchBlocks... ({}) set at {:?}", - request_key, - config.fetch_blocks_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.fetch_blocks_timeout) - }, - Request::FetchMatchingUtxos(_) => { - trace!( - target: LOG_TARGET, - "Timeout for service request FetchMatchingUtxos ({}) set at {:?}", - request_key, - config.fetch_utxos_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.fetch_utxos_timeout) - }, - _ => { - trace!( - target: LOG_TARGET, - "Timeout for service request ... ({}) set at {:?}", - request_key, - config.service_request_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout) - }, - }; + if service_request.request.is_some() { + trace!( + target: LOG_TARGET, + "Timeout for service request ... ({}) set at {:?}", + request_key, + config.service_request_timeout + ); + spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout) }; // Log messages let msg_tag = send_states[0].tag; diff --git a/base_layer/core/src/base_node/state_machine_service/initializer.rs b/base_layer/core/src/base_node/state_machine_service/initializer.rs index d731f6e0cd..b3dea2f6af 100644 --- a/base_layer/core/src/base_node/state_machine_service/initializer.rs +++ b/base_layer/core/src/base_node/state_machine_service/initializer.rs @@ -38,7 +38,6 @@ use crate::{ }, sync::SyncValidators, LocalNodeCommsInterface, - OutboundNodeCommsInterface, }, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, consensus::ConsensusManager, @@ -95,7 +94,6 @@ where B: BlockchainBackend + 'static let config = self.config.clone(); context.spawn_when_ready(move |handles| async move { - let outbound_interface = handles.expect_handle::(); let chain_metadata_service = handles.expect_handle::(); let node_local_interface = handles.expect_handle::(); let connectivity = handles.expect_handle::(); @@ -113,7 +111,6 @@ where B: BlockchainBackend + 'static let node = BaseNodeStateMachine::new( db, node_local_interface, - outbound_interface, connectivity, peer_manager, chain_metadata_service.get_event_stream(), diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index 35125655b3..f886bb8fe7 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -22,7 +22,7 @@ use crate::{ base_node::{ chain_metadata_service::ChainMetadataEvent, - comms_interface::{LocalNodeCommsInterface, OutboundNodeCommsInterface}, + comms_interface::LocalNodeCommsInterface, state_machine_service::{ states, states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus}, @@ -83,7 +83,6 @@ impl Default for BaseNodeStateMachineConfig { pub struct BaseNodeStateMachine { pub(super) db: AsyncBlockchainDb, pub(super) local_node_interface: LocalNodeCommsInterface, - pub(super) _outbound_nci: OutboundNodeCommsInterface, pub(super) connectivity: ConnectivityRequester, pub(super) peer_manager: Arc, pub(super) metadata_event_stream: broadcast::Receiver>, @@ -104,7 +103,6 @@ impl BaseNodeStateMachine { pub fn new( db: AsyncBlockchainDb, local_node_interface: LocalNodeCommsInterface, - outbound_nci: OutboundNodeCommsInterface, connectivity: ConnectivityRequester, peer_manager: Arc, metadata_event_stream: broadcast::Receiver>, @@ -119,7 +117,6 @@ impl BaseNodeStateMachine { Self { db, local_node_interface, - _outbound_nci: outbound_nci, connectivity, peer_manager, metadata_event_stream, diff --git a/base_layer/core/src/blocks/block_header.rs b/base_layer/core/src/blocks/block_header.rs index fb7aad681e..f8365104ae 100644 --- a/base_layer/core/src/blocks/block_header.rs +++ b/base_layer/core/src/blocks/block_header.rs @@ -164,14 +164,6 @@ impl BlockHeader { BlockBuilder::new(self.version).with_header(self) } - /// Returns a height range in descending order - pub fn get_height_range(start: u64, end_inclusive: u64) -> Vec { - let mut heights: Vec = - (std::cmp::min(start, end_inclusive)..=std::cmp::max(start, end_inclusive)).collect(); - heights.reverse(); - heights - } - /// Given a slice of headers (in reverse order), calculate the maximum, minimum and average periods between them pub fn timing_stats(headers: &[BlockHeader]) -> (u64, u64, f64) { if headers.len() < 2 { diff --git a/base_layer/core/src/iterators/chunk.rs b/base_layer/core/src/iterators/chunk.rs index 59044bc7c6..1d36e2f142 100644 --- a/base_layer/core/src/iterators/chunk.rs +++ b/base_layer/core/src/iterators/chunk.rs @@ -28,7 +28,7 @@ pub struct VecChunkIter { inner: NonOverlappingIntegerPairIter, } -impl VecChunkIter { +impl VecChunkIter { pub fn new(start: Idx, end_exclusive: Idx, chunk_size: usize) -> Self { Self { inner: NonOverlappingIntegerPairIter::new(start, end_exclusive, chunk_size), @@ -56,15 +56,21 @@ vec_chunk_impl!(usize); /// Iterator that produces non-overlapping integer pairs. pub struct NonOverlappingIntegerPairIter { current: Idx, + current_end: Idx, end: Idx, size: usize, } -impl NonOverlappingIntegerPairIter { +impl NonOverlappingIntegerPairIter { + /// Create a new iterator that emits non-overlapping integers. + /// + /// ## Panics + /// Panics if start > end_exclusive pub fn new(start: Idx, end_exclusive: Idx, chunk_size: usize) -> Self { assert!(start <= end_exclusive, "`start` must be less than `end`"); Self { current: start, + current_end: end_exclusive, end: end_exclusive, size: chunk_size, } @@ -80,20 +86,73 @@ macro_rules! non_overlapping_iter_impl { if self.size == 0 { return None; } + if self.current == <$ty>::MAX { + return None; + } + if self.current == self.end { + return None; + } + let size = self.size as $ty; + match self.current.checked_add(size) { + Some(next) => { + let next = cmp::min(next, self.end); + + if self.current == next { + return None; + } + let chunk = (self.current, next - 1); + self.current = next; + Some(chunk) + }, + None => { + let chunk = (self.current, <$ty>::MAX); + self.current = <$ty>::MAX; + Some(chunk) + }, + } + } + } + impl DoubleEndedIterator for NonOverlappingIntegerPairIter<$ty> { + fn next_back(&mut self) -> Option { + if self.size == 0 || self.current_end == 0 { + return None; + } + + let size = self.size as $ty; + // Is this the first iteration? + if self.end == self.current_end { + let rem = (self.end - self.current) % size; - let next = cmp::min(self.current + self.size as $ty, self.end); + // Would there be an overflow (if iterating from the forward to back) + if self.current_end.saturating_sub(rem).checked_add(size).is_none() { + self.current_end = self.current_end.saturating_sub(rem); + let chunk = (self.current_end, <$ty>::MAX); + return Some(chunk); + } - if self.current == next { + if rem > 0 { + self.current_end = self.end - rem; + let chunk = (self.current_end, self.end - 1); + return Some(chunk); + } + } + + // Check if end will go beyond start + if self.current_end == self.current { return None; } - let chunk = (self.current, next - 1); - self.current = next; + + let next = self.current_end.saturating_sub(size); + let chunk = (next, self.current_end - 1); + self.current_end = next; Some(chunk) } } }; } +non_overlapping_iter_impl!(u8); +non_overlapping_iter_impl!(u16); non_overlapping_iter_impl!(u32); non_overlapping_iter_impl!(u64); non_overlapping_iter_impl!(usize); @@ -101,6 +160,7 @@ non_overlapping_iter_impl!(usize); #[cfg(test)] mod test { use super::*; + use rand::{rngs::OsRng, Rng}; #[test] fn zero_size() { let mut iter = NonOverlappingIntegerPairIter::new(10u32, 10, 0); @@ -134,19 +194,26 @@ mod test { #[test] fn chunk_size_not_multiple_of_end() { - let mut iter = NonOverlappingIntegerPairIter::new(0u32, 11, 3); + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 10, 3); assert_eq!(iter.next().unwrap(), (0, 2)); assert_eq!(iter.next().unwrap(), (3, 5)); assert_eq!(iter.next().unwrap(), (6, 8)); - assert_eq!(iter.next().unwrap(), (9, 10)); + assert_eq!(iter.next().unwrap(), (9, 9)); assert!(iter.next().is_none()); - let mut iter = VecChunkIter::new(0u32, 11, 3); + let mut iter = VecChunkIter::new(0u32, 10, 3); assert_eq!(iter.next().unwrap(), vec![0, 1, 2]); assert_eq!(iter.next().unwrap(), vec![3, 4, 5]); assert_eq!(iter.next().unwrap(), vec![6, 7, 8]); - assert_eq!(iter.next().unwrap(), vec![9, 10]); + assert_eq!(iter.next().unwrap(), vec![9]); assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 16, 5); + assert_eq!(iter.next().unwrap(), (0, 4)); + assert_eq!(iter.next().unwrap(), (5, 9)); + assert_eq!(iter.next().unwrap(), (10, 14)); + assert_eq!(iter.next().unwrap(), (15, 15)); + assert_eq!(iter.next(), None); } #[test] @@ -164,4 +231,58 @@ mod test { assert_eq!(iter.next().unwrap(), vec![19, 20]); assert!(iter.next().is_none()); } + + #[test] + fn overflow() { + let mut iter = NonOverlappingIntegerPairIter::new(250u8, 255, 3); + assert_eq!(iter.next().unwrap(), (250, 252)); + assert_eq!(iter.next().unwrap(), (253, 255)); + assert!(iter.next().is_none()); + } + + #[test] + fn double_ended() { + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 9, 3).rev(); + assert_eq!(iter.next().unwrap(), (6, 8)); + assert_eq!(iter.next().unwrap(), (3, 5)); + assert_eq!(iter.next().unwrap(), (0, 2)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 10, 3).rev(); + assert_eq!(iter.next().unwrap(), (9, 9)); + assert_eq!(iter.next().unwrap(), (6, 8)); + assert_eq!(iter.next().unwrap(), (3, 5)); + assert_eq!(iter.next().unwrap(), (0, 2)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 16, 5).rev(); + assert_eq!(iter.next().unwrap(), (15, 15)); + assert_eq!(iter.next().unwrap(), (10, 14)); + assert_eq!(iter.next().unwrap(), (5, 9)); + assert_eq!(iter.next().unwrap(), (0, 4)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(1001u32, 4000, 1000).rev(); + assert_eq!(iter.next().unwrap(), (3001, 3999)); + assert_eq!(iter.next().unwrap(), (2001, 3000)); + assert_eq!(iter.next().unwrap(), (1001, 2000)); + assert!(iter.next().is_none()); + } + + #[test] + fn iterator_symmetry() { + let size = OsRng.gen_range(3usize..=10); + let rand_start = OsRng.gen::(); + let rand_end = OsRng.gen::().saturating_add(rand_start); + let iter_rev = NonOverlappingIntegerPairIter::new(rand_start, rand_end, size).rev(); + let iter = NonOverlappingIntegerPairIter::new(rand_start, rand_end, size); + assert_eq!( + iter.collect::>(), + iter_rev.collect::>().into_iter().rev().collect::>(), + "Failed with rand_start = {}, rand_end = {}, size = {}", + rand_start, + rand_end, + size + ); + } } diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 2850c90470..393a95e81a 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -34,9 +34,9 @@ use tari_comms_dht::{outbound::OutboundMessageRequester, Dht}; use tari_core::{ base_node::{ chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer}, + comms_interface::OutboundNodeCommsInterface, service::{BaseNodeServiceConfig, BaseNodeServiceInitializer}, LocalNodeCommsInterface, - OutboundNodeCommsInterface, StateMachineHandle, }, chain_storage::{BlockchainDatabase, Validators}, diff --git a/base_layer/core/tests/node_comms_interface.rs b/base_layer/core/tests/node_comms_interface.rs index 759e1fd646..b5461f4f02 100644 --- a/base_layer/core/tests/node_comms_interface.rs +++ b/base_layer/core/tests/node_comms_interface.rs @@ -20,21 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::sync::Arc; - -use futures::StreamExt; use helpers::block_builders::append_block; +use std::sync::Arc; use tari_common::configuration::Network; -use tari_common_types::{chain_metadata::ChainMetadata, types::PublicKey}; -use tari_comms::peer_manager::NodeId; +use tari_common_types::types::PublicKey; use tari_core::{ - base_node::{ - comms_interface::{CommsInterfaceError, InboundNodeCommsHandlers, NodeCommsRequest, NodeCommsResponse}, + base_node::comms_interface::{ + InboundNodeCommsHandlers, + NodeCommsRequest, + NodeCommsResponse, OutboundNodeCommsInterface, }, - blocks::{BlockBuilder, BlockHeader, HistoricalBlock}, chain_storage::{BlockchainDatabaseConfig, DbTransaction, Validators}, - consensus::{ConsensusManager, NetworkConsensus}, + consensus::ConsensusManager, mempool::{Mempool, MempoolConfig}, test_helpers::blockchain::{create_store_with_consensus_and_validators_and_config, create_test_blockchain_db}, transactions::{ @@ -53,7 +51,7 @@ use tari_crypto::{ script::TariScript, tari_utilities::hash::Hashable, }; -use tari_service_framework::{reply_channel, reply_channel::Receiver}; +use tari_service_framework::reply_channel; use tokio::sync::broadcast; use tari_core::test_helpers::create_consensus_rules; @@ -61,15 +59,6 @@ use tokio::sync::mpsc; #[allow(dead_code)] mod helpers; -// use crate::helpers::database::create_test_db; - -async fn test_request_responder( - receiver: &mut Receiver<(NodeCommsRequest, Option), Result>, - response: NodeCommsResponse, -) { - let req_context = receiver.next().await.unwrap(); - req_context.reply(Ok(response)).unwrap() -} fn new_mempool() -> Mempool { let rules = create_consensus_rules(); @@ -77,21 +66,6 @@ fn new_mempool() -> Mempool { Mempool::new(MempoolConfig::default(), rules, Arc::new(mempool_validator)) } -#[tokio::test] -async fn outbound_get_metadata() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let metadata = ChainMetadata::new(5, vec![0u8], 3, 0, 5); - let metadata_response = NodeCommsResponse::ChainMetadata(metadata.clone()); - let (received_metadata, _) = futures::join!( - outbound_nci.get_metadata(), - test_request_responder(&mut request_receiver, metadata_response) - ); - assert_eq!(received_metadata.unwrap(), metadata); -} - #[tokio::test] async fn inbound_get_metadata() { let store = create_test_blockchain_db(); @@ -155,24 +129,6 @@ async fn inbound_fetch_kernel_by_excess_sig() { } } -#[tokio::test] -async fn outbound_fetch_headers() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let mut header = BlockHeader::new(0); - header.height = 0; - let header_response = NodeCommsResponse::BlockHeaders(vec![header.clone()]); - let (received_headers, _) = futures::join!( - outbound_nci.fetch_headers(vec![0]), - test_request_responder(&mut request_receiver, header_response) - ); - let received_headers = received_headers.unwrap(); - assert_eq!(received_headers.len(), 1); - assert_eq!(received_headers[0], header); -} - #[tokio::test] async fn inbound_fetch_headers() { let store = create_test_blockchain_db(); @@ -192,41 +148,16 @@ async fn inbound_fetch_headers() { ); let header = store.fetch_block(0).unwrap().header().clone(); - if let Ok(NodeCommsResponse::BlockHeaders(received_headers)) = inbound_nch - .handle_request(NodeCommsRequest::FetchHeaders(vec![0])) - .await + if let Ok(NodeCommsResponse::BlockHeaders(received_headers)) = + inbound_nch.handle_request(NodeCommsRequest::FetchHeaders(0..=0)).await { assert_eq!(received_headers.len(), 1); - assert_eq!(received_headers[0], header); + assert_eq!(*received_headers[0].header(), header); } else { panic!(); } } -#[tokio::test] -async fn outbound_fetch_utxos() { - let factories = CryptoFactories::default(); - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let (utxo, _, _) = create_utxo( - MicroTari(10_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let hash = utxo.hash(); - let utxo_response = NodeCommsResponse::TransactionOutputs(vec![utxo.clone()]); - let (received_utxos, _) = futures::join!( - outbound_nci.fetch_utxos(vec![hash]), - test_request_responder(&mut request_receiver, utxo_response) - ); - let received_utxos = received_utxos.unwrap(); - assert_eq!(received_utxos.len(), 1); - assert_eq!(received_utxos[0], utxo); -} - #[tokio::test] async fn inbound_fetch_utxos() { let factories = CryptoFactories::default(); @@ -270,38 +201,6 @@ async fn inbound_fetch_utxos() { } } -#[tokio::test] -async fn outbound_fetch_txos() { - let factories = CryptoFactories::default(); - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let (txo1, _, _) = create_utxo( - MicroTari(10_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let (txo2, _, _) = create_utxo( - MicroTari(15_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let hash1 = txo1.hash(); - let hash2 = txo2.hash(); - let txo_response = NodeCommsResponse::TransactionOutputs(vec![txo1.clone(), txo2.clone()]); - let (received_txos, _) = futures::join!( - outbound_nci.fetch_txos(vec![hash1, hash2]), - test_request_responder(&mut request_receiver, txo_response) - ); - let received_txos = received_txos.unwrap(); - assert_eq!(received_txos.len(), 2); - assert_eq!(received_txos[0], txo1); - assert_eq!(received_txos[1], txo2); -} - #[tokio::test] async fn inbound_fetch_txos() { let factories = CryptoFactories::default(); @@ -372,25 +271,6 @@ async fn inbound_fetch_txos() { } } -#[tokio::test] -async fn outbound_fetch_blocks() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - let network = Network::LocalNet; - let consensus_constants = NetworkConsensus::from(network).create_consensus_constants(); - let gb = BlockBuilder::new(consensus_constants[0].blockchain_version()).build(); - let block = HistoricalBlock::new(gb, 0, Default::default(), vec![], 0); - let block_response = NodeCommsResponse::HistoricalBlocks(vec![block.clone()]); - let (received_blocks, _) = futures::join!( - outbound_nci.fetch_blocks(vec![0]), - test_request_responder(&mut request_receiver, block_response) - ); - let received_blocks = received_blocks.unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0], block); -} - #[tokio::test] async fn inbound_fetch_blocks() { let store = create_test_blockchain_db(); @@ -411,7 +291,7 @@ async fn inbound_fetch_blocks() { let block = store.fetch_block(0).unwrap().block().clone(); if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![0])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks(0..=0)) .await { assert_eq!(received_blocks.len(), 1); @@ -491,7 +371,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { let _block5 = append_block(&store, &block4, vec![], &consensus_manager, 1.into()).unwrap(); if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![1])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks(1..=1)) .await { assert_eq!(received_blocks.len(), 1); @@ -501,7 +381,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { } if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![2])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks(2..=2)) .await { assert_eq!(received_blocks.len(), 1); diff --git a/base_layer/core/tests/node_service.rs b/base_layer/core/tests/node_service.rs index 2333cb8ebf..3b0ca36393 100644 --- a/base_layer/core/tests/node_service.rs +++ b/base_layer/core/tests/node_service.rs @@ -24,21 +24,9 @@ mod helpers; use crate::helpers::block_builders::{construct_chained_blocks, create_coinbase}; use helpers::{ - block_builders::{ - append_block, - chain_block, - create_genesis_block, - create_genesis_block_with_utxos, - generate_block, - }, + block_builders::{append_block, chain_block, create_genesis_block, create_genesis_block_with_utxos}, event_stream::event_stream_next, - nodes::{ - create_network_with_2_base_nodes_with_config, - create_network_with_3_base_nodes_with_config, - random_node_identity, - wait_until_online, - BaseNodeBuilder, - }, + nodes::{create_network_with_2_base_nodes_with_config, random_node_identity, wait_until_online, BaseNodeBuilder}, }; use randomx_rs::RandomXFlag; use std::{sync::Arc, time::Duration}; @@ -72,154 +60,6 @@ use tari_p2p::services::liveness::LivenessConfig; use tari_test_utils::unpack_enum; use tempfile::tempdir; -#[tokio::test] -async fn request_response_get_metadata() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0) - .build(); - let (mut alice_node, bob_node, carol_node, _consensus_manager) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager, - temp_dir.path().to_str().unwrap(), - ) - .await; - - let received_metadata = alice_node.outbound_nci.get_metadata().await.unwrap(); - assert_eq!(received_metadata.height_of_longest_chain(), 0); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - -#[tokio::test] -async fn request_and_response_fetch_blocks() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0.clone()) - .build(); - let (mut alice_node, mut bob_node, carol_node, _) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager.clone(), - temp_dir.path().to_str().unwrap(), - ) - .await; - - let mut blocks = vec![block0]; - let db = &mut bob_node.blockchain_db; - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - - carol_node - .blockchain_db - .add_block(blocks[1].to_arc_block()) - .unwrap() - .assert_added(); - carol_node - .blockchain_db - .add_block(blocks[2].to_arc_block()) - .unwrap() - .assert_added(); - - let received_blocks = alice_node.outbound_nci.fetch_blocks(vec![0]).await.unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0].block(), blocks[0].block()); - - let received_blocks = alice_node.outbound_nci.fetch_blocks(vec![0, 1]).await.unwrap(); - assert_eq!(received_blocks.len(), 2); - assert_ne!(*received_blocks[0].block(), *received_blocks[1].block()); - assert!(received_blocks[0].block() == blocks[0].block() || received_blocks[1].block() == blocks[0].block()); - assert!(received_blocks[0].block() == blocks[1].block() || received_blocks[1].block() == blocks[1].block()); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - -#[tokio::test] -async fn request_and_response_fetch_blocks_with_hashes() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0.clone()) - .build(); - let (mut alice_node, mut bob_node, carol_node, _) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager.clone(), - temp_dir.path().to_str().unwrap(), - ) - .await; - - let mut blocks = vec![block0]; - let db = &mut bob_node.blockchain_db; - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - let block0_hash = blocks[0].hash(); - let block1_hash = blocks[1].hash(); - - carol_node - .blockchain_db - .add_block(blocks[1].to_arc_block()) - .unwrap() - .assert_added(); - carol_node - .blockchain_db - .add_block(blocks[2].to_arc_block()) - .unwrap() - .assert_added(); - - let received_blocks = alice_node - .outbound_nci - .fetch_blocks_with_hashes(vec![block0_hash.clone()]) - .await - .unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0].block(), blocks[0].block()); - - let received_blocks = alice_node - .outbound_nci - .fetch_blocks_with_hashes(vec![block0_hash.clone(), block1_hash.clone()]) - .await - .unwrap(); - assert_eq!(received_blocks.len(), 2); - assert_ne!(received_blocks[0], received_blocks[1]); - assert!(received_blocks[0].block() == blocks[0].block() || received_blocks[1].block() == blocks[0].block()); - assert!(received_blocks[0].block() == blocks[1].block() || received_blocks[1].block() == blocks[1].block()); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - #[tokio::test] async fn propagate_and_forward_many_valid_blocks() { let temp_dir = tempdir().unwrap(); @@ -583,9 +423,16 @@ async fn service_request_timeout() { ) .await; + let bob_node_id = bob_node.node_identity.node_id().clone(); // Bob should not be reachable bob_node.shutdown().await; - unpack_enum!(CommsInterfaceError::RequestTimedOut = alice_node.outbound_nci.get_metadata().await.unwrap_err()); + unpack_enum!( + CommsInterfaceError::RequestTimedOut = alice_node + .outbound_nci + .request_blocks_with_hashes_from_peer(vec![], Some(bob_node_id)) + .await + .unwrap_err() + ); alice_node.shutdown().await; } diff --git a/base_layer/core/tests/node_state_machine.rs b/base_layer/core/tests/node_state_machine.rs index b7d11e7c17..a126a3c145 100644 --- a/base_layer/core/tests/node_state_machine.rs +++ b/base_layer/core/tests/node_state_machine.rs @@ -89,7 +89,6 @@ async fn test_listening_lagging() { let mut alice_state_machine = BaseNodeStateMachine::new( alice_node.blockchain_db.clone().into(), alice_node.local_nci.clone(), - alice_node.outbound_nci.clone(), alice_node.comms.connectivity(), alice_node.comms.peer_manager(), alice_node.chain_metadata_handle.get_event_stream(), @@ -148,7 +147,6 @@ async fn test_event_channel() { let state_machine = BaseNodeStateMachine::new( db.into(), node.local_nci.clone(), - node.outbound_nci.clone(), node.comms.connectivity(), node.comms.peer_manager(), mock.subscription(),