Skip to content

Commit

Permalink
refactor(mempool)!: optimisations,excess sig index,fix weight calc (#…
Browse files Browse the repository at this point in the history
…3691)

Description
---

- fix weighting calculation for metadata (integer division bug: `(∑v_i) / d != v_1 / d + v_2/d +.... v_n/d` unless all `v`s are multiples of `d` due to integer rounding)
- updated fee estimation in wallet
- add excess signature index in mempool (required for compact block propagation)
- account for multiple kernels in transactions in mempool
- many smaller mempool optimisations (clones, etc)
- remove blocking thread pool usage for mempool
- remove unused mempool event stream (fills up and never empties, consuming some memory)
- clean up some legacy code (from when base node contained the wallet and miner)
- updated tests

Because the block weight calculation changed, this is a breaking change only affecting full/near full blocks

Motivation and Context
---
A bug in the way metadata weights are calculated meant that a set of transactions can have a different total weight from a block containing those same transactions. After a correctly weighted full block is returned from get block template, the block validator could incorrectly reject a block once mined as exceeding maximum weight.

Create a many to many index between transactions and kernels in the mempool.

How Has This Been Tested?
---
Unit: Existing tests updated
Manual: Producing 3 blocks worth of transactions, mining and checking the block is accepted and the transactions removed from the mempool
  • Loading branch information
sdbondi authored Jan 12, 2022
1 parent 86d864c commit 75ad348
Show file tree
Hide file tree
Showing 57 changed files with 1,181 additions and 1,178 deletions.
22 changes: 7 additions & 15 deletions applications/tari_app_grpc/src/conversions/aggregate_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,14 @@ impl TryFrom<AggregateBody> for grpc::AggregateBody {
type Error = String;

fn try_from(source: AggregateBody) -> Result<Self, Self::Error> {
let (inputs, outputs, kernels) = source.dissolve();
Ok(Self {
inputs: source
.inputs()
.iter()
.map(|input| grpc::TransactionInput::try_from(input.clone()))
.collect::<Result<Vec<grpc::TransactionInput>, _>>()?,
outputs: source
.outputs()
.iter()
.map(|output| grpc::TransactionOutput::from(output.clone()))
.collect(),
kernels: source
.kernels()
.iter()
.map(|kernel| grpc::TransactionKernel::from(kernel.clone()))
.collect(),
inputs: inputs
.into_iter()
.map(grpc::TransactionInput::try_from)
.collect::<Result<Vec<_>, _>>()?,
outputs: outputs.into_iter().map(grpc::TransactionOutput::from).collect(),
kernels: kernels.into_iter().map(grpc::TransactionKernel::from).collect(),
})
}
}
Expand Down
20 changes: 19 additions & 1 deletion applications/tari_app_grpc/src/conversions/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};

use tari_common_types::transaction::{TransactionDirection, TransactionStatus, TxId};
use tari_core::transactions::transaction::Transaction;
Expand All @@ -41,6 +44,21 @@ impl TryFrom<Transaction> for grpc::Transaction {
}
}

impl TryFrom<Arc<Transaction>> for grpc::Transaction {
type Error = String;

fn try_from(source: Arc<Transaction>) -> Result<Self, Self::Error> {
match Arc::try_unwrap(source) {
Ok(tx) => tx.try_into(),
Err(tx) => Ok(Self {
offset: Vec::from(tx.offset.as_bytes()),
body: Some(tx.body.clone().try_into()?),
script_offset: Vec::from(tx.script_offset.as_bytes()),
}),
}
}
}

impl TryFrom<grpc::Transaction> for Transaction {
type Error = String;

Expand Down
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ async fn build_node_context(
cleanup_orphans_at_startup: bool,
) -> Result<BaseNodeContext, anyhow::Error> {
//---------------------------------- Blockchain --------------------------------------------//

debug!(
target: LOG_TARGET,
"Building base node context for {} network", config.network
);
let rules = ConsensusManager::builder(config.network).build();
let factories = CryptoFactories::default();
let randomx_factory = RandomXFactory::new(config.max_randomx_vms);
Expand Down Expand Up @@ -250,7 +253,7 @@ async fn build_node_context(
Box::new(TxInputAndMaturityValidator::new(blockchain_db.clone())),
Box::new(TxConsensusValidator::new(blockchain_db.clone())),
]);
let mempool = Mempool::new(MempoolConfig::default(), rules.clone(), Arc::new(mempool_validator));
let mempool = Mempool::new(MempoolConfig::default(), rules.clone(), Box::new(mempool_validator));

//---------------------------------- Base Node --------------------------------------------//
debug!(target: LOG_TARGET, "Creating base node state machine.");
Expand Down
8 changes: 2 additions & 6 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ use tari_app_utilities::consts;
use tari_common_types::types::{Commitment, PublicKey, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
comms_interface::{Broadcast, CommsInterfaceError},
LocalNodeCommsInterface,
StateMachineHandle,
},
base_node::{comms_interface::CommsInterfaceError, LocalNodeCommsInterface, StateMachineHandle},
blocks::{Block, BlockHeader, NewBlockTemplate},
chain_storage::{ChainStorageError, PrunedOutput},
consensus::{emission::Emission, ConsensusManager, NetworkConsensus},
Expand Down Expand Up @@ -722,7 +718,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {

let mut handler = self.node_service.clone();
let block_hash = handler
.submit_block(block, Broadcast::from(true))
.submit_block(block)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Expand Down
17 changes: 8 additions & 9 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,17 @@ impl wallet_server::Wallet for WalletGrpcServer {
request: Request<GetCoinbaseRequest>,
) -> Result<Response<GetCoinbaseResponse>, Status> {
let request = request.into_inner();

let mut tx_service = self.get_transaction_service();
let response = tx_service

let coinbase = tx_service
.generate_coinbase_transaction(request.reward.into(), request.fee.into(), request.height)
.await;
.await
.map_err(|err| Status::unknown(err.to_string()))?;

match response {
Ok(resp) => Ok(Response::new(GetCoinbaseResponse {
transaction: Some(resp.try_into().map_err(Status::internal)?),
})),
Err(err) => Err(Status::unknown(err.to_string())),
}
let coinbase = coinbase.try_into().map_err(Status::internal)?;
Ok(Response::new(GetCoinbaseResponse {
transaction: Some(coinbase),
}))
}

async fn send_sha_atomic_swap_transaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,24 @@ impl TransactionsTab {
.constraints([pending_constraint, completed_constraint].as_ref())
.split(area);

self.draw_pending_transactions(f, list_areas[0], app_state);
self.draw_completed_transactions(f, list_areas[1], app_state);
}

fn draw_pending_transactions<B>(&mut self, f: &mut Frame<B>, area: Rect, app_state: &AppState)
where B: Backend {
let style = if self.selected_tx_list == SelectedTransactionList::PendingTxs {
Style::default().fg(Color::Magenta).add_modifier(Modifier::BOLD)
} else {
Style::default().fg(Color::White).add_modifier(Modifier::BOLD)
};
let block = Block::default()
.borders(Borders::ALL)
.title(Span::styled("(P)ending Transactions", style));
f.render_widget(block, list_areas[0]);

self.draw_pending_transactions(f, list_areas[0], app_state);
self.draw_completed_transactions(f, list_areas[1], app_state);
}
let title = Block::default().borders(Borders::ALL).title(Span::styled(
format!("(P)ending Transactions ({}) ", app_state.get_pending_txs().len()),
style,
));
f.render_widget(title, area);

fn draw_pending_transactions<B>(&mut self, f: &mut Frame<B>, area: Rect, app_state: &AppState)
where B: Backend {
// Pending Transactions
self.pending_list_state.set_num_items(app_state.get_pending_txs().len());
let mut pending_list_state = self
Expand Down Expand Up @@ -155,9 +157,10 @@ impl TransactionsTab {
} else {
Style::default().fg(Color::White).add_modifier(Modifier::BOLD)
};
let block = Block::default()
.borders(Borders::ALL)
.title(Span::styled("Completed (T)ransactions", style));
let block = Block::default().borders(Borders::ALL).title(Span::styled(
format!("Completed (T)ransactions ({}) ", app_state.get_completed_txs().len()),
style,
));
f.render_widget(block, area);

let completed_txs = app_state.get_completed_txs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl ChainMetadataService {
/// Handle BlockEvents
async fn handle_block_event(&mut self, event: &BlockEvent) -> Result<(), ChainMetadataSyncError> {
match event {
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_), _) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }, _) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) |
BlockEvent::BlockSyncComplete(_) => {
self.update_liveness_chain_metadata().await?;
},
Expand Down
72 changes: 27 additions & 45 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
fmt::{Display, Error, Formatter},
sync::Arc,
};
use std::sync::Arc;

use log::*;
use strum_macros::Display;
Expand All @@ -44,7 +41,7 @@ use crate::{
blocks::{Block, BlockHeader, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, PrunedOutput},
consensus::{ConsensusConstants, ConsensusManager},
mempool::{async_mempool, Mempool},
mempool::Mempool,
proof_of_work::{Difficulty, PowAlgorithm},
transactions::transaction::TransactionKernel,
};
Expand All @@ -56,42 +53,12 @@ const MAX_HEADERS_PER_RESPONSE: u32 = 100;
/// Broadcast is to notify subscribers if this is a valid propagated block event
#[derive(Debug, Clone, Display)]
pub enum BlockEvent {
ValidBlockAdded(Arc<Block>, BlockAddResult, Broadcast),
AddBlockFailed(Arc<Block>, Broadcast),
ValidBlockAdded(Arc<Block>, BlockAddResult),
AddBlockFailed(Arc<Block>),
BlockSyncComplete(Arc<ChainBlock>),
BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

/// Used to notify if the block event is for a propagated block.
#[derive(Debug, Clone, Copy)]
pub struct Broadcast(bool);

impl Broadcast {
#[inline]
pub fn is_true(&self) -> bool {
self.0
}
}

#[allow(clippy::identity_op)]
impl Display for Broadcast {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
write!(f, "Broadcast[{}]", self.0)
}
}

impl From<Broadcast> for bool {
fn from(v: Broadcast) -> Self {
v.0
}
}

impl From<bool> for Broadcast {
fn from(v: bool) -> Self {
Broadcast(v)
}
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<T> {
block_event_sender: BlockEventSender,
Expand Down Expand Up @@ -341,7 +308,13 @@ where T: BlockchainBackend + 'static
request.max_weight
};

let transactions = async_mempool::retrieve(self.mempool.clone(), asking_weight)
debug!(
target: LOG_TARGET,
"Fetching transactions with a maximum weight of {} for the template", asking_weight
);
let transactions = self
.mempool
.retrieve(asking_weight)
.await?
.into_iter()
.map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
Expand All @@ -350,7 +323,7 @@ where T: BlockchainBackend + 'static
debug!(
target: LOG_TARGET,
"Adding {} transaction(s) to new block template",
transactions.len()
transactions.len(),
);

let prev_hash = header.prev_hash.clone();
Expand All @@ -364,13 +337,23 @@ where T: BlockchainBackend + 'static
);
debug!(
target: LOG_TARGET,
"New block template requested at height {}", block_template.header.height,
"New block template requested at height {}, weight: {}",
block_template.header.height,
block_template.body.calculate_weight(constants.transaction_weight())
);
trace!(target: LOG_TARGET, "{}", block_template);
Ok(NodeCommsResponse::NewBlockTemplate(block_template))
},
NodeCommsRequest::GetNewBlock(block_template) => {
let block = self.blockchain_db.prepare_new_block(block_template).await?;
let constants = self.consensus_manager.consensus_constants(block.header.height);
debug!(
target: LOG_TARGET,
"Prepared new block from template (hash: {}, weight: {}, {})",
block.hash().to_hex(),
block.body.calculate_weight(constants.transaction_weight()),
block.body.to_counts_string()
);
Ok(NodeCommsResponse::NewBlock {
success: true,
error: None,
Expand Down Expand Up @@ -506,7 +489,7 @@ where T: BlockchainBackend + 'static

match block.pop() {
Some(block) => {
self.handle_block(Arc::new(block.try_into_block()?), true.into(), Some(source_peer))
self.handle_block(Arc::new(block.try_into_block()?), Some(source_peer))
.await?;
Ok(())
},
Expand All @@ -529,7 +512,6 @@ where T: BlockchainBackend + 'static
pub async fn handle_block(
&self,
block: Arc<Block>,
broadcast: Broadcast,
source_peer: Option<NodeId>,
) -> Result<BlockHash, CommsInterfaceError> {
let block_hash = block.hash();
Expand Down Expand Up @@ -560,9 +542,9 @@ where T: BlockchainBackend + 'static

self.blockchain_db.cleanup_orphans().await?;

self.publish_block_event(BlockEvent::ValidBlockAdded(block, block_add_result, broadcast));
self.publish_block_event(BlockEvent::ValidBlockAdded(block, block_add_result));

if should_propagate && broadcast.is_true() {
if should_propagate {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
Expand All @@ -582,7 +564,7 @@ where T: BlockchainBackend + 'static
block_hash.to_hex(),
e
);
self.publish_block_event(BlockEvent::AddBlockFailed(block, broadcast));
self.publish_block_event(BlockEvent::AddBlockFailed(block));
Err(CommsInterfaceError::ChainStorageError(e))
},
}
Expand Down
11 changes: 5 additions & 6 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::{
comms_request::GetNewBlockTemplateRequest,
error::CommsInterfaceError,
BlockEvent,
Broadcast,
NodeCommsRequest,
NodeCommsResponse,
},
Expand All @@ -52,15 +51,15 @@ pub type BlockEventReceiver = broadcast::Receiver<Arc<BlockEvent>>;
#[derive(Clone)]
pub struct LocalNodeCommsInterface {
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<BlockHash, CommsInterfaceError>>,
block_sender: SenderService<Block, Result<BlockHash, CommsInterfaceError>>,
block_event_sender: BlockEventSender,
}

impl LocalNodeCommsInterface {
/// Construct a new LocalNodeCommsInterface with the specified SenderService.
pub fn new(
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<BlockHash, CommsInterfaceError>>,
block_sender: SenderService<Block, Result<BlockHash, CommsInterfaceError>>,
block_event_sender: BlockEventSender,
) -> Self {
Self {
Expand Down Expand Up @@ -178,9 +177,9 @@ impl LocalNodeCommsInterface {
}
}

/// Submit a block to the base node service. Internal_only flag will prevent propagation.
pub async fn submit_block(&mut self, block: Block, propagate: Broadcast) -> Result<BlockHash, CommsInterfaceError> {
self.block_sender.call((block, propagate)).await?
/// Submit a block to the base node service.
pub async fn submit_block(&mut self, block: Block) -> Result<BlockHash, CommsInterfaceError> {
self.block_sender.call(block).await?
}

pub fn publish_block_event(&self, event: BlockEvent) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/comms_interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod error;
pub use error::CommsInterfaceError;

mod inbound_handlers;
pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers};
pub use inbound_handlers::{BlockEvent, InboundNodeCommsHandlers};

mod local_interface;
pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface};
Expand Down
Loading

0 comments on commit 75ad348

Please sign in to comment.