Skip to content

Commit

Permalink
fix(core): use compact inputs for block propagation (#4714)
Browse files Browse the repository at this point in the history
Description
---
- Send compact inputs when the full block is requested
- Only load full inputs when required

Motivation and Context
---
Full blocks are requested if any transactions are missing. This PR reduces the size of those full blocks.
This also improves slightly optimizes block sync by removing the need to load inputs for blocks that are being sent. 

How Has This Been Tested?
---
Modified existing rust integration test to include a single transaction, forcing reconcile block to request the full block and testing the fetching of full inputs from the local db.
  • Loading branch information
sdbondi authored Sep 27, 2022
1 parent 79ff23a commit c659275
Show file tree
Hide file tree
Showing 26 changed files with 377 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ impl TryFrom<TransactionInput> for grpc::TransactionInput {
.commitment()
.map_err(|_| "Non-compact Transaction input should contain commitment".to_string())?
.to_vec(),
hash: input
.canonical_hash()
.map_err(|_| "Non-compact Transaction input should be able to be hashed".to_string())?
.to_vec(),
hash: input.canonical_hash().to_vec(),

script: input
.script()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CommandContext {
io::stdout().flush().await?;
// we can only check till the pruning horizon, 0 is archive node so it needs to check every block.
if height > horizon_height {
match self.node_service.get_block(height).await {
match self.node_service.get_block(height, false).await {
Err(err) => {
// We need to check the data itself, as FetchMatchingBlocks will suppress any error, only
// logging it.
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/commands/command/get_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl CommandContext {
pub async fn get_block(&self, height: u64, format: Format) -> Result<(), Error> {
let block = self
.blockchain_db
.fetch_blocks(height..=height)
.fetch_blocks(height..=height, false)
.await?
.pop()
.ok_or(ArgsError::NotFoundAt { height })?;
Expand All @@ -90,7 +90,7 @@ impl CommandContext {
pub async fn get_block_by_hash(&self, hash: HashOutput, format: Format) -> Result<(), Error> {
let block = self
.blockchain_db
.fetch_block_by_hash(hash)
.fetch_block_by_hash(hash, false)
.await?
.ok_or(ArgsError::NotFound)?;
match format {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl CommandContext {

let block = self
.node_service
.get_block(height)
.get_block(height, true)
.await?
.ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?;

let prev_block = self
.node_service
.get_block(height - 1)
.get_block(height - 1, true)
.await?
.ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?;

Expand Down
6 changes: 3 additions & 3 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
for (start, end) in page_iter {
debug!(target: LOG_TARGET, "Page: {}-{}", start, end);
// TODO: Better error handling
let result_data = match handler.get_blocks(start..=end).await {
let result_data = match handler.get_blocks(start..=end, true).await {
Err(err) => {
warn!(target: LOG_TARGET, "Internal base node service error: {}", err);
return;
Expand Down Expand Up @@ -850,7 +850,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
task::spawn(async move {
let page_iter = NonOverlappingIntegerPairIter::new(start, end + 1, GET_BLOCKS_PAGE_SIZE);
for (start, end) in page_iter {
let blocks = match handler.get_blocks(start..=end).await {
let blocks = match handler.get_blocks(start..=end, false).await {
Err(err) => {
warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1455,7 +1455,7 @@ async fn get_block_group(

let (start, end) = get_heights(&height_request, handler.clone()).await?;

let blocks = match handler.get_blocks(start..=end).await {
let blocks = match handler.get_blocks(start..=end, false).await {
Err(err) => {
warn!(
target: LOG_TARGET,
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn do_recovery<D: BlockchainBackend + 'static>(
io::stdout().flush().unwrap();
trace!(target: LOG_TARGET, "Asking for block with height: {}", counter);
let block = source_database
.fetch_block(counter)
.fetch_block(counter, true)
.map_err(|e| anyhow!("Could not get block from recovery db: {}", e))?
.try_into_block()?;
trace!(target: LOG_TARGET, "Adding block: {}", block);
Expand Down
22 changes: 16 additions & 6 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,24 @@ pub enum NodeCommsRequest {
FetchHeaders(RangeInclusive<u64>),
FetchHeadersByHashes(Vec<HashOutput>),
FetchMatchingUtxos(Vec<HashOutput>),
FetchMatchingBlocks(RangeInclusive<u64>),
FetchBlocksByHash(Vec<HashOutput>),
FetchMatchingBlocks {
range: RangeInclusive<u64>,
compact: bool,
},
FetchBlocksByHash {
block_hashes: Vec<HashOutput>,
compact: bool,
},
FetchBlocksByKernelExcessSigs(Vec<Signature>),
FetchBlocksByUtxos(Vec<Commitment>),
GetHeaderByHash(HashOutput),
GetBlockByHash(HashOutput),
GetNewBlockTemplate(GetNewBlockTemplateRequest),
GetNewBlock(NewBlockTemplate),
FetchKernelByExcessSig(Signature),
FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec<PrivateKey> },
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -75,10 +83,12 @@ impl Display for NodeCommsRequest {
},
FetchHeadersByHashes(v) => write!(f, "FetchHeadersByHashes (n={})", v.len()),
FetchMatchingUtxos(v) => write!(f, "FetchMatchingUtxos (n={})", v.len()),
FetchMatchingBlocks(range) => {
write!(f, "FetchMatchingBlocks ({:?})", range)
FetchMatchingBlocks { range, compact } => {
write!(f, "FetchMatchingBlocks ({:?}, {})", range, compact)
},
FetchBlocksByHash { block_hashes, compact } => {
write!(f, "FetchBlocksByHash (n={}, {})", block_hashes.len(), compact)
},
FetchBlocksByHash(v) => write!(f, "FetchBlocksByHash (n={})", v.len()),
FetchBlocksByKernelExcessSigs(v) => write!(f, "FetchBlocksByKernelExcessSigs (n={})", v.len()),
FetchBlocksByUtxos(v) => write!(f, "FetchBlocksByUtxos (n={})", v.len()),
GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()),
Expand Down
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_common_types::types::FixedHash;
use tari_comms_dht::outbound::DhtOutboundError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
Expand Down Expand Up @@ -67,4 +68,6 @@ pub enum CommsInterfaceError {
BlockError(#[from] BlockError),
#[error("Invalid request for {request}: {details}")]
InvalidRequest { request: &'static str, details: String },
#[error("Peer sent invalid full block {hash}: {details}")]
InvalidFullBlock { hash: FixedHash, details: String },
}
90 changes: 78 additions & 12 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::{
consensus::{ConsensusConstants, ConsensusManager},
mempool::Mempool,
proof_of_work::{Difficulty, PowAlgorithm},
transactions::aggregated_body::AggregateBody,
validation::helpers,
};

Expand Down Expand Up @@ -164,20 +165,20 @@ where B: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::TransactionOutputs(res))
},
NodeCommsRequest::FetchMatchingBlocks(range) => {
let blocks = self.blockchain_db.fetch_blocks(range).await?;
NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByHash(block_hashes) => {
NodeCommsRequest::FetchBlocksByHash { block_hashes, compact } => {
let mut blocks = Vec::with_capacity(block_hashes.len());
for block_hash in block_hashes {
let block_hex = block_hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", block_hex
"A peer has requested a block with hash {} (compact = {})", block_hex, compact
);

match self.blockchain_db.fetch_block_by_hash(block_hash).await {
match self.blockchain_db.fetch_block_by_hash(block_hash, compact).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -269,7 +270,7 @@ where B: BlockchainBackend + 'static
Ok(NodeCommsResponse::BlockHeader(header))
},
NodeCommsRequest::GetBlockByHash(hash) => {
let block = self.blockchain_db.fetch_block_by_hash(hash).await?;
let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
},
NodeCommsRequest::GetNewBlockTemplate(request) => {
Expand Down Expand Up @@ -422,7 +423,7 @@ where B: BlockchainBackend + 'static
&mut self,
source_peer: NodeId,
new_block: NewBlock,
) -> Result<Arc<Block>, CommsInterfaceError> {
) -> Result<Block, CommsInterfaceError> {
let NewBlock {
header,
coinbase_kernel,
Expand All @@ -436,7 +437,7 @@ where B: BlockchainBackend + 'static
.with_coinbase_utxo(coinbase_output, coinbase_kernel)
.with_header(header)
.build();
return Ok(Arc::new(block));
return Ok(block);
}

let block_hash = header.hash();
Expand All @@ -454,6 +455,7 @@ where B: BlockchainBackend + 'static
current_meta.best_block().to_hex(),
source_peer,
);
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
}
Expand Down Expand Up @@ -549,22 +551,22 @@ where B: BlockchainBackend + 'static
return Ok(block);
}

Ok(Arc::new(block))
Ok(block)
}

async fn request_full_block_from_peer(
&mut self,
source_peer: NodeId,
block_hash: BlockHash,
) -> Result<Arc<Block>, CommsInterfaceError> {
) -> Result<Block, CommsInterfaceError> {
let mut historical_block = self
.outbound_nci
.request_blocks_by_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

return match historical_block.pop() {
Some(block) => {
let block = Arc::new(block.try_into_block()?);
let block = block.try_into_block()?;
Ok(block)
},
None => {
Expand Down Expand Up @@ -600,7 +602,7 @@ where B: BlockchainBackend + 'static
/// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
pub async fn handle_block(
&mut self,
block: Arc<Block>,
block: Block,
source_peer: Option<NodeId>,
) -> Result<BlockHash, CommsInterfaceError> {
let block_hash = block.hash();
Expand All @@ -618,6 +620,8 @@ where B: BlockchainBackend + 'static
);
debug!(target: LOG_TARGET, "Incoming block: {}", block);
let timer = Instant::now();
let block = self.hydrate_block(block).await?;

let add_block_result = self.blockchain_db.add_block(block.clone()).await;
// Create block event on block event stream
match add_block_result {
Expand Down Expand Up @@ -691,6 +695,68 @@ where B: BlockchainBackend + 'static
}
}

async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
let block_hash = block.hash();
let block_height = block.header.height;
if block.body.inputs().is_empty() {
debug!(
target: LOG_TARGET,
"Block #{} ({}) contains no inputs so nothing to hydrate",
block_height,
block_hash.to_hex(),
);
return Ok(Arc::new(block));
}

let timer = Instant::now();
let (header, mut inputs, outputs, kernels) = block.dissolve();

let db = self.blockchain_db.inner().db_read_access()?;
for input in &mut inputs {
if !input.is_compact() {
continue;
}

let output_mined_info =
db.fetch_output(&input.output_hash())?
.ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
hash: block_hash,
details: format!("Output {} to be spent does not exist in db", input.output_hash()),
})?;

match output_mined_info.output {
PrunedOutput::Pruned { .. } => {
return Err(CommsInterfaceError::InvalidFullBlock {
hash: block_hash,
details: format!("Output {} to be spent is pruned", input.output_hash()),
});
},
PrunedOutput::NotPruned { output } => {
input.add_output_data(
output.version,
output.features,
output.commitment,
output.script,
output.sender_offset_public_key,
output.covenant,
output.encrypted_value,
output.minimum_value_promise,
);
},
}
}
debug!(
target: LOG_TARGET,
"Hydrated block #{} ({}) with {} input(s) in {:.2?}",
block_height,
block_hash.to_hex(),
inputs.len(),
timer.elapsed()
);
let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
Ok(Arc::new(block))
}

fn publish_block_event(&self, event: BlockEvent) {
if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
Expand Down
14 changes: 11 additions & 3 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ impl LocalNodeCommsInterface {
pub async fn get_blocks(
&mut self,
range: RangeInclusive<u64>,
compact: bool,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchMatchingBlocks(range))
.call(NodeCommsRequest::FetchMatchingBlocks { range, compact })
.await??
{
NodeCommsResponse::HistoricalBlocks(blocks) => Ok(blocks),
Expand All @@ -96,10 +97,17 @@ impl LocalNodeCommsInterface {
}

/// Request the block header at the given height
pub async fn get_block(&mut self, height: u64) -> Result<Option<HistoricalBlock>, CommsInterfaceError> {
pub async fn get_block(
&mut self,
height: u64,
compact: bool,
) -> Result<Option<HistoricalBlock>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchMatchingBlocks(height..=height))
.call(NodeCommsRequest::FetchMatchingBlocks {
range: height..=height,
compact,
})
.await??
{
NodeCommsResponse::HistoricalBlocks(mut blocks) => Ok(blocks.pop()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ impl OutboundNodeCommsInterface {
}
}

/// Fetch the Blocks corresponding to the provided block hashes from a specific base node. The requested blocks
/// could be chain blocks or orphan blocks.
/// Fetch the Blocks corresponding to the provided block hashes from a specific base node.
pub async fn request_blocks_by_hashes_from_peer(
&mut self,
block_hashes: Vec<BlockHash>,
node_id: Option<NodeId>,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
if let NodeCommsResponse::HistoricalBlocks(blocks) = self
.request_sender
.call((NodeCommsRequest::FetchBlocksByHash(block_hashes), node_id))
.call((
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
// We always request compact inputs from peer
compact: true,
},
node_id,
))
.await??
{
Ok(blocks)
Expand Down
Loading

0 comments on commit c659275

Please sign in to comment.