Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mempool)!: optimisations,excess sig index,fix weight calc #3691

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions applications/tari_app_grpc/src/conversions/aggregate_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,14 @@ impl TryFrom<AggregateBody> for grpc::AggregateBody {
type Error = String;

fn try_from(source: AggregateBody) -> Result<Self, Self::Error> {
let (inputs, outputs, kernels) = source.dissolve();
Ok(Self {
inputs: source
.inputs()
.iter()
.map(|input| grpc::TransactionInput::try_from(input.clone()))
.collect::<Result<Vec<grpc::TransactionInput>, _>>()?,
outputs: source
.outputs()
.iter()
.map(|output| grpc::TransactionOutput::from(output.clone()))
.collect(),
kernels: source
.kernels()
.iter()
.map(|kernel| grpc::TransactionKernel::from(kernel.clone()))
.collect(),
inputs: inputs
.into_iter()
.map(grpc::TransactionInput::try_from)
.collect::<Result<Vec<_>, _>>()?,
outputs: outputs.into_iter().map(grpc::TransactionOutput::from).collect(),
kernels: kernels.into_iter().map(grpc::TransactionKernel::from).collect(),
})
}
}
Expand Down
20 changes: 19 additions & 1 deletion applications/tari_app_grpc/src/conversions/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};

use tari_common_types::transaction::{TransactionDirection, TransactionStatus, TxId};
use tari_core::transactions::transaction::Transaction;
Expand All @@ -41,6 +44,21 @@ impl TryFrom<Transaction> for grpc::Transaction {
}
}

impl TryFrom<Arc<Transaction>> for grpc::Transaction {
type Error = String;

fn try_from(source: Arc<Transaction>) -> Result<Self, Self::Error> {
match Arc::try_unwrap(source) {
Ok(tx) => tx.try_into(),
Err(tx) => Ok(Self {
offset: Vec::from(tx.offset.as_bytes()),
body: Some(tx.body.clone().try_into()?),
script_offset: Vec::from(tx.script_offset.as_bytes()),
}),
}
}
}

impl TryFrom<grpc::Transaction> for Transaction {
type Error = String;

Expand Down
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ async fn build_node_context(
cleanup_orphans_at_startup: bool,
) -> Result<BaseNodeContext, anyhow::Error> {
//---------------------------------- Blockchain --------------------------------------------//

debug!(
target: LOG_TARGET,
"Building base node context for {} network", config.network
);
let rules = ConsensusManager::builder(config.network).build();
let factories = CryptoFactories::default();
let randomx_factory = RandomXFactory::new(config.max_randomx_vms);
Expand Down Expand Up @@ -250,7 +253,7 @@ async fn build_node_context(
Box::new(TxInputAndMaturityValidator::new(blockchain_db.clone())),
Box::new(TxConsensusValidator::new(blockchain_db.clone())),
]);
let mempool = Mempool::new(MempoolConfig::default(), rules.clone(), Arc::new(mempool_validator));
let mempool = Mempool::new(MempoolConfig::default(), rules.clone(), Box::new(mempool_validator));

//---------------------------------- Base Node --------------------------------------------//
debug!(target: LOG_TARGET, "Creating base node state machine.");
Expand Down
8 changes: 2 additions & 6 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ use tari_app_utilities::consts;
use tari_common_types::types::{Commitment, PublicKey, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
comms_interface::{Broadcast, CommsInterfaceError},
LocalNodeCommsInterface,
StateMachineHandle,
},
base_node::{comms_interface::CommsInterfaceError, LocalNodeCommsInterface, StateMachineHandle},
blocks::{Block, BlockHeader, NewBlockTemplate},
chain_storage::{ChainStorageError, PrunedOutput},
consensus::{emission::Emission, ConsensusManager, NetworkConsensus},
Expand Down Expand Up @@ -722,7 +718,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {

let mut handler = self.node_service.clone();
let block_hash = handler
.submit_block(block, Broadcast::from(true))
.submit_block(block)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Expand Down
17 changes: 8 additions & 9 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,17 @@ impl wallet_server::Wallet for WalletGrpcServer {
request: Request<GetCoinbaseRequest>,
) -> Result<Response<GetCoinbaseResponse>, Status> {
let request = request.into_inner();

let mut tx_service = self.get_transaction_service();
let response = tx_service

let coinbase = tx_service
.generate_coinbase_transaction(request.reward.into(), request.fee.into(), request.height)
.await;
.await
.map_err(|err| Status::unknown(err.to_string()))?;

match response {
Ok(resp) => Ok(Response::new(GetCoinbaseResponse {
transaction: Some(resp.try_into().map_err(Status::internal)?),
})),
Err(err) => Err(Status::unknown(err.to_string())),
}
let coinbase = coinbase.try_into().map_err(Status::internal)?;
Ok(Response::new(GetCoinbaseResponse {
transaction: Some(coinbase),
}))
}

async fn send_sha_atomic_swap_transaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,24 @@ impl TransactionsTab {
.constraints([pending_constraint, completed_constraint].as_ref())
.split(area);

self.draw_pending_transactions(f, list_areas[0], app_state);
self.draw_completed_transactions(f, list_areas[1], app_state);
}

fn draw_pending_transactions<B>(&mut self, f: &mut Frame<B>, area: Rect, app_state: &AppState)
where B: Backend {
let style = if self.selected_tx_list == SelectedTransactionList::PendingTxs {
Style::default().fg(Color::Magenta).add_modifier(Modifier::BOLD)
} else {
Style::default().fg(Color::White).add_modifier(Modifier::BOLD)
};
let block = Block::default()
.borders(Borders::ALL)
.title(Span::styled("(P)ending Transactions", style));
f.render_widget(block, list_areas[0]);

self.draw_pending_transactions(f, list_areas[0], app_state);
self.draw_completed_transactions(f, list_areas[1], app_state);
}
let title = Block::default().borders(Borders::ALL).title(Span::styled(
format!("(P)ending Transactions ({}) ", app_state.get_pending_txs().len()),
style,
));
f.render_widget(title, area);

fn draw_pending_transactions<B>(&mut self, f: &mut Frame<B>, area: Rect, app_state: &AppState)
where B: Backend {
// Pending Transactions
self.pending_list_state.set_num_items(app_state.get_pending_txs().len());
let mut pending_list_state = self
Expand Down Expand Up @@ -155,9 +157,10 @@ impl TransactionsTab {
} else {
Style::default().fg(Color::White).add_modifier(Modifier::BOLD)
};
let block = Block::default()
.borders(Borders::ALL)
.title(Span::styled("Completed (T)ransactions", style));
let block = Block::default().borders(Borders::ALL).title(Span::styled(
format!("Completed (T)ransactions ({}) ", app_state.get_completed_txs().len()),
style,
));
f.render_widget(block, area);

let completed_txs = app_state.get_completed_txs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl ChainMetadataService {
/// Handle BlockEvents
async fn handle_block_event(&mut self, event: &BlockEvent) -> Result<(), ChainMetadataSyncError> {
match event {
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_), _) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }, _) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) |
BlockEvent::BlockSyncComplete(_) => {
self.update_liveness_chain_metadata().await?;
},
Expand Down
72 changes: 27 additions & 45 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
fmt::{Display, Error, Formatter},
sync::Arc,
};
use std::sync::Arc;

use log::*;
use strum_macros::Display;
Expand All @@ -44,7 +41,7 @@ use crate::{
blocks::{Block, BlockHeader, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, PrunedOutput},
consensus::{ConsensusConstants, ConsensusManager},
mempool::{async_mempool, Mempool},
mempool::Mempool,
proof_of_work::{Difficulty, PowAlgorithm},
transactions::transaction::TransactionKernel,
};
Expand All @@ -56,42 +53,12 @@ const MAX_HEADERS_PER_RESPONSE: u32 = 100;
/// Broadcast is to notify subscribers if this is a valid propagated block event
#[derive(Debug, Clone, Display)]
pub enum BlockEvent {
ValidBlockAdded(Arc<Block>, BlockAddResult, Broadcast),
AddBlockFailed(Arc<Block>, Broadcast),
ValidBlockAdded(Arc<Block>, BlockAddResult),
AddBlockFailed(Arc<Block>),
BlockSyncComplete(Arc<ChainBlock>),
BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

/// Used to notify if the block event is for a propagated block.
#[derive(Debug, Clone, Copy)]
pub struct Broadcast(bool);

impl Broadcast {
#[inline]
pub fn is_true(&self) -> bool {
self.0
}
}

#[allow(clippy::identity_op)]
impl Display for Broadcast {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
write!(f, "Broadcast[{}]", self.0)
}
}

impl From<Broadcast> for bool {
fn from(v: Broadcast) -> Self {
v.0
}
}

impl From<bool> for Broadcast {
fn from(v: bool) -> Self {
Broadcast(v)
}
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<T> {
block_event_sender: BlockEventSender,
Expand Down Expand Up @@ -341,7 +308,13 @@ where T: BlockchainBackend + 'static
request.max_weight
};

let transactions = async_mempool::retrieve(self.mempool.clone(), asking_weight)
debug!(
target: LOG_TARGET,
"Fetching transactions with a maximum weight of {} for the template", asking_weight
);
let transactions = self
.mempool
.retrieve(asking_weight)
.await?
.into_iter()
.map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
Expand All @@ -350,7 +323,7 @@ where T: BlockchainBackend + 'static
debug!(
target: LOG_TARGET,
"Adding {} transaction(s) to new block template",
transactions.len()
transactions.len(),
);

let prev_hash = header.prev_hash.clone();
Expand All @@ -364,13 +337,23 @@ where T: BlockchainBackend + 'static
);
debug!(
target: LOG_TARGET,
"New block template requested at height {}", block_template.header.height,
"New block template requested at height {}, weight: {}",
block_template.header.height,
block_template.body.calculate_weight(constants.transaction_weight())
);
trace!(target: LOG_TARGET, "{}", block_template);
Ok(NodeCommsResponse::NewBlockTemplate(block_template))
},
NodeCommsRequest::GetNewBlock(block_template) => {
let block = self.blockchain_db.prepare_new_block(block_template).await?;
let constants = self.consensus_manager.consensus_constants(block.header.height);
debug!(
target: LOG_TARGET,
"Prepared new block from template (hash: {}, weight: {}, {})",
block.hash().to_hex(),
block.body.calculate_weight(constants.transaction_weight()),
block.body.to_counts_string()
);
Ok(NodeCommsResponse::NewBlock {
success: true,
error: None,
Expand Down Expand Up @@ -506,7 +489,7 @@ where T: BlockchainBackend + 'static

match block.pop() {
Some(block) => {
self.handle_block(Arc::new(block.try_into_block()?), true.into(), Some(source_peer))
self.handle_block(Arc::new(block.try_into_block()?), Some(source_peer))
.await?;
Ok(())
},
Expand All @@ -529,7 +512,6 @@ where T: BlockchainBackend + 'static
pub async fn handle_block(
&self,
block: Arc<Block>,
broadcast: Broadcast,
source_peer: Option<NodeId>,
) -> Result<BlockHash, CommsInterfaceError> {
let block_hash = block.hash();
Expand Down Expand Up @@ -560,9 +542,9 @@ where T: BlockchainBackend + 'static

self.blockchain_db.cleanup_orphans().await?;

self.publish_block_event(BlockEvent::ValidBlockAdded(block, block_add_result, broadcast));
self.publish_block_event(BlockEvent::ValidBlockAdded(block, block_add_result));

if should_propagate && broadcast.is_true() {
if should_propagate {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
Expand All @@ -582,7 +564,7 @@ where T: BlockchainBackend + 'static
block_hash.to_hex(),
e
);
self.publish_block_event(BlockEvent::AddBlockFailed(block, broadcast));
self.publish_block_event(BlockEvent::AddBlockFailed(block));
Err(CommsInterfaceError::ChainStorageError(e))
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::{
comms_request::GetNewBlockTemplateRequest,
error::CommsInterfaceError,
BlockEvent,
Broadcast,
NodeCommsRequest,
NodeCommsResponse,
},
Expand All @@ -52,15 +51,15 @@ pub type BlockEventReceiver = broadcast::Receiver<Arc<BlockEvent>>;
#[derive(Clone)]
pub struct LocalNodeCommsInterface {
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<BlockHash, CommsInterfaceError>>,
block_sender: SenderService<Block, Result<BlockHash, CommsInterfaceError>>,
block_event_sender: BlockEventSender,
}

impl LocalNodeCommsInterface {
/// Construct a new LocalNodeCommsInterface with the specified SenderService.
pub fn new(
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<BlockHash, CommsInterfaceError>>,
block_sender: SenderService<Block, Result<BlockHash, CommsInterfaceError>>,
block_event_sender: BlockEventSender,
) -> Self {
Self {
Expand Down Expand Up @@ -178,9 +177,9 @@ impl LocalNodeCommsInterface {
}
}

/// Submit a block to the base node service. Internal_only flag will prevent propagation.
pub async fn submit_block(&mut self, block: Block, propagate: Broadcast) -> Result<BlockHash, CommsInterfaceError> {
self.block_sender.call((block, propagate)).await?
/// Submit a block to the base node service.
pub async fn submit_block(&mut self, block: Block) -> Result<BlockHash, CommsInterfaceError> {
self.block_sender.call(block).await?
}

pub fn publish_block_event(&self, event: BlockEvent) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/comms_interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod error;
pub use error::CommsInterfaceError;

mod inbound_handlers;
pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers};
pub use inbound_handlers::{BlockEvent, InboundNodeCommsHandlers};

mod local_interface;
pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface};
Expand Down
Loading