Skip to content

Commit

Permalink
Update Tx Service to one-shot response for SendDirect(..) & fix OMS bug
Browse files Browse the repository at this point in the history
This PR updates the Tx Service to use the embedded oneshot reply channel to determine if a Direct message was sent or not rather than relying on the MessagingEvent stream.

It also fixes a bug in the Output Manager service where an Inbound transaction output was not fully encumbered so if the app restarted it would be cleared as a short term encuberance.

Finally some Clippy cleanup.
  • Loading branch information
philipr-za committed Apr 19, 2020
1 parent 383d506 commit 0e22f95
Show file tree
Hide file tree
Showing 18 changed files with 145 additions and 210 deletions.
1 change: 0 additions & 1 deletion applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,6 @@ async fn register_wallet_services(
.add_initializer(TransactionServiceInitializer::new(
TransactionServiceConfig::default(),
subscription_factory,
wallet_comms.message_event_sender(),
TransactionServiceSqliteDatabase::new(wallet_db_conn.clone()),
wallet_comms.node_identity(),
factories,
Expand Down
5 changes: 5 additions & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ where
.accept_incoming_pending_transaction(tx_id, amount, key.clone(), OutputFeatures::default())
.await?;

self.confirm_encumberance(tx_id).await?;
Ok(key)
}

Expand Down Expand Up @@ -654,6 +655,10 @@ where

/// Cancel a pending transaction and place the encumbered outputs back into the unspent pool
pub async fn cancel_transaction(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
trace!(
target: LOG_TARGET,
"Cancelling pending transaction outputs for TxId: tx_id"
);
Ok(self.db.cancel_pending_transaction_outputs(tx_id).await?)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl OutputManagerBackend for OutputManagerMemoryDatabase {
db.unspent_outputs.push(*o);
},
DbKeyValuePair::PendingTransactionOutputs(t, p) => {
db.pending_transactions.insert(t, *p);
db.short_term_pending_transactions.insert(t, *p);
},
DbKeyValuePair::KeyManagerState(km) => db.key_manager_state = Some(km),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ impl From<UpdateOutput> for UpdateOutputSql {

/// This struct represents a PendingTransactionOutputs in the Sql database. A distinct struct is required to define the
/// Sql friendly equivalent datatypes for the members.
#[derive(Clone, Queryable, Insertable)]
#[derive(Debug, Clone, Queryable, Insertable)]
#[table_name = "pending_transaction_outputs"]
struct PendingTransactionOutputSql {
tx_id: i64,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/testnet_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ pub fn generate_wallet_test_data<
MicroTari::from(100),
messages[message_index].clone(),
))?;
outbound_tx_ids.push(tx_id.clone());
outbound_tx_ids.push(tx_id);
message_index = (message_index + 1) % messages.len();

let tx_id = wallet.runtime.block_on(wallet.transaction_service.send_transaction(
Expand All @@ -334,7 +334,7 @@ pub fn generate_wallet_test_data<
MicroTari::from(110),
messages[message_index].clone(),
))?;
outbound_tx_ids.push(tx_id.clone());
outbound_tx_ids.push(tx_id);
message_index = (message_index + 1) % messages.len();

wallet_alice.runtime.block_on(async {
Expand Down
11 changes: 1 addition & 10 deletions base_layer/wallet/src/transaction_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
use futures::{future, Future, Stream, StreamExt};
use log::*;
use std::sync::Arc;
use tari_comms::{peer_manager::NodeIdentity, protocol::messaging::MessagingEventSender};
use tari_comms::peer_manager::NodeIdentity;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_core::{
base_node::proto::base_node as BaseNodeProto,
Expand Down Expand Up @@ -69,7 +69,6 @@ where T: TransactionBackend
{
config: TransactionServiceConfig,
subscription_factory: Arc<TopicSubscriptionFactory<TariMessageType, Arc<PeerMessage>>>,
message_event_receiver: Option<MessagingEventSender>,
backend: Option<T>,
node_identity: Arc<NodeIdentity>,
factories: CryptoFactories,
Expand All @@ -81,7 +80,6 @@ where T: TransactionBackend
pub fn new(
config: TransactionServiceConfig,
subscription_factory: Arc<TopicSubscriptionFactory<TariMessageType, Arc<PeerMessage>>>,
message_event_receiver: MessagingEventSender,
backend: T,
node_identity: Arc<NodeIdentity>,
factories: CryptoFactories,
Expand All @@ -90,7 +88,6 @@ where T: TransactionBackend
Self {
config,
subscription_factory,
message_event_receiver: Some(message_event_receiver),
backend: Some(backend),
node_identity,
factories,
Expand Down Expand Up @@ -165,11 +162,6 @@ where T: TransactionBackend + Clone + 'static
.take()
.expect("Cannot start Transaction Service without providing a backend");

let message_event_receiver = self
.message_event_receiver
.take()
.expect("Cannot start Transaction Service without providing an Message Event Receiver");

let node_identity = self.node_identity.clone();
let factories = self.factories.clone();
let config = self.config.clone();
Expand All @@ -195,7 +187,6 @@ where T: TransactionBackend + Clone + 'static
base_node_response_stream,
output_manager_service,
outbound_message_service,
message_event_receiver,
publisher,
node_identity,
factories,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ use crate::transaction_service::{
storage::database::{CompletedTransaction, OutboundTransaction, TransactionBackend, TransactionStatus},
};
use futures::channel::oneshot;
use tari_comms::{
protocol::messaging::{MessagingEvent, MessagingEventReceiver},
types::CommsPublicKey,
};
use tari_comms::{peer_manager::NodeId, types::CommsPublicKey};
use tari_comms_dht::{domain_message::OutboundDomainMessage, envelope::NodeDestination, outbound::OutboundEncryption};
use tari_core::transactions::{
tari_amount::MicroTari,
Expand All @@ -61,7 +58,6 @@ where TBackend: TransactionBackend + Clone + 'static
resources: TransactionServiceResources<TBackend>,
transaction_reply_receiver: Option<Receiver<(CommsPublicKey, RecipientSignedMessage)>>,
cancellation_receiver: Option<oneshot::Receiver<()>>,
message_send_event_receiver: Option<MessagingEventReceiver>,
dest_pubkey: CommsPublicKey,
amount: MicroTari,
message: String,
Expand All @@ -78,7 +74,6 @@ where TBackend: TransactionBackend + Clone + 'static
resources: TransactionServiceResources<TBackend>,
transaction_reply_receiver: Receiver<(CommsPublicKey, RecipientSignedMessage)>,
cancellation_receiver: oneshot::Receiver<()>,
message_send_event_receiver: MessagingEventReceiver,
dest_pubkey: CommsPublicKey,
amount: MicroTari,
message: String,
Expand All @@ -91,7 +86,6 @@ where TBackend: TransactionBackend + Clone + 'static
resources,
transaction_reply_receiver: Some(transaction_reply_receiver),
cancellation_receiver: Some(cancellation_receiver),
message_send_event_receiver: Some(message_send_event_receiver),
dest_pubkey,
amount,
message,
Expand All @@ -108,6 +102,8 @@ where TBackend: TransactionBackend + Clone + 'static
self.stage
);

// Only Send the transaction of the protocol stage is Initial. If the protocol is started in a later stage
// ignore this
if self.stage == TransactionProtocolStage::Initial {
self.send_transaction().await?;
}
Expand Down Expand Up @@ -216,7 +212,7 @@ where TBackend: TransactionBackend + Clone + 'static

self.resources
.db
.complete_outbound_transaction(tx_id.clone(), completed_transaction.clone())
.complete_outbound_transaction(tx_id, completed_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
info!(
Expand Down Expand Up @@ -261,7 +257,9 @@ where TBackend: TransactionBackend + Clone + 'static
.resources
.outbound_message_service
.propagate(
NodeDestination::from(self.dest_pubkey.clone()),
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(
Expand Down Expand Up @@ -346,13 +344,6 @@ where TBackend: TransactionBackend + Clone + 'static
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionDirectSendResult(tx_id, false)));
error!(target: LOG_TARGET, "Transaction Send directly to recipient failed");
},
Some(send_states) if send_states.len() == 1 => {
info!(
target: LOG_TARGET,
Expand All @@ -362,54 +353,36 @@ where TBackend: TransactionBackend + Clone + 'static
send_states[0].tag,
);
direct_send_success = true;
let message_tag = send_states[0].tag;

let event_publisher = self.resources.event_publisher.clone();
// Launch a task to monitor if the message gets sent
if let Some(mut message_event_receiver) = self.message_send_event_receiver.take() {
tokio::spawn(async move {
loop {
let (received_tag, success) = match message_event_receiver.next().await {
Some(read_item) => match read_item {
Ok(event) => match &*event {
MessagingEvent::MessageSent(message_tag) => (*message_tag, true),
MessagingEvent::SendMessageFailed(outbound_message, _reason) => {
(outbound_message.tag, false)
},
_ => continue,
},
Err(e) => {
error!(
target: LOG_TARGET,
"Error reading from message send event stream: {:?}", e
);
break;
},
},
None => {
error!(target: LOG_TARGET, "Error reading from message send event stream");
break;
},
};
if received_tag != message_tag {
continue;
}
tokio::spawn(async move {
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send process for TX_ID: {} was successful", tx_id
);
let _ = event_publisher
.send(Arc::new(TransactionEvent::TransactionDirectSendResult(tx_id, true)));
},
false => {
error!(
target: LOG_TARGET,
"Direct Send process for TX_ID: {} was unsuccessful and no message was sent", tx_id
);
let _ = event_publisher
.send(Arc::new(TransactionEvent::TransactionDirectSendResult(tx_id, success)));
break;
}
});
}
.send(Arc::new(TransactionEvent::TransactionDirectSendResult(tx_id, false)));
},
}
});
},
Some(_tags) => {
error!(
target: LOG_TARGET,
"Direct Send process for TX_ID: {} was unsuccessful and no message was sent", tx_id
);
_ => {
let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionDirectSendResult(tx_id, false)));
error!(target: LOG_TARGET, "Transaction Send message failed to send");
error!(target: LOG_TARGET, "Transaction Send Direct for TxID: {} failed", tx_id);
},
},
Err(e) => {
Expand All @@ -427,7 +400,9 @@ where TBackend: TransactionBackend + Clone + 'static
.resources
.outbound_message_service
.propagate(
NodeDestination::from(self.dest_pubkey.clone()),
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::SenderPartialTransaction, proto_message),
Expand Down Expand Up @@ -469,6 +444,12 @@ where TBackend: TransactionBackend + Clone + 'static
};

if !direct_send_success && !store_and_forward_send_success {
error!(
target: LOG_TARGET,
"Failed to Send Transaction (TxId: {}) both Directly or via Store and Forward. Pending Transaction \
will be cancelled",
tx_id
);
if let Err(e) = self.resources.output_manager_service.cancel_transaction(tx_id).await {
error!(
target: LOG_TARGET,
Expand Down
Loading

0 comments on commit 0e22f95

Please sign in to comment.