Skip to content

Commit

Permalink
feat!: add less aggressive txn cancelling (#3904)
Browse files Browse the repository at this point in the history
Description
---
- Added less aggressive transaction canceling, i.e. removed automatic transaction cancelling, when:
  - a wallet has difficulty discovering the receiving party (for direct P2P comms);
  - a wallet has difficulty discovering base node peers that could be used to send the transaction via store and forward;
  - tor is not available.
- Updated the wallet ffi with the new callbacks.
- Added a cucumber test to test sending transactions while offline.

Users will now manually cancel transactions stuck in pending due to the wallet continuing to be being offline.

_**Note:** Wallet ffi interface changed._

Motivation and Context
---
During a recent semi-stress test [[report here](https://github.com/tari-project/tari-data-analysis/blob/master/reports/stress_tests/20211221-make-it-rain/Stress%20test%20of%2020211221%20-%20analysis.md)] adverse connectivity and/or discovery symptoms caused ~55% of transactions to be abandoned. Attempted transactions should not be canceled if they failed to be sent both directly and via store and forward; a peer-to-peer connection may fail for valid reasons but failing to submit a transaction to a base node for store and forward delivery does not warrant the transaction to be canceled.

How Has This Been Tested?
---
System-level testing
Unit tests
Cucumber test: `Scenario: Wallet send transactions while offline`
  • Loading branch information
hansieodendaal authored Apr 1, 2022
1 parent cbf75ca commit 40bedde
Show file tree
Hide file tree
Showing 35 changed files with 1,288 additions and 639 deletions.
20 changes: 10 additions & 10 deletions applications/ffi_client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,9 @@ try {
console.log("txFauxUnconfirmed: ", ptr, confirmations);
}
);
// callback_direct_send_result: unsafe extern "C" fn(c_ulonglong, bool),
const directSendResult = ffi.Callback("void", [u64, bool], function (i, j) {
console.log("directSendResult: ", i, j);
});
// callback_store_and_forward_send_result: unsafe extern "C" fn(c_ulonglong, bool),
const safResult = ffi.Callback("void", [u64, bool], function (i, j) {
console.log("safResult: ", i, j);
// callback_transaction_send_result: unsafe extern "C" fn(c_ulonglong, *mut TariTransactionSendStatus),
const transactionSendResult = ffi.Callback("void", [u64, ["pointer"]], function (i, ptr) {
console.log("transactionSendResult: ", i, ptr);
});
// callback_transaction_cancellation: unsafe extern "C" fn(*mut TariCompletedTransaction),
const txCancelled = ffi.Callback("void", ["pointer"], function (ptr) {
Expand All @@ -100,7 +96,7 @@ try {
const txoValidation = ffi.Callback("void", [u64, u8], function (i, j) {
console.log("txoValidation: ", i, j);
});
// callback_contacts_liveness_data_updated: unsafe extern "C" fn(*mut ContactsLivenessData),
// callback_contacts_liveness_data_updated: unsafe extern "C" fn(*mut TariContactsLivenessData),
const contactsLivenessDataUpdated = ffi.Callback("void", ["pointer"], function (ptr) {
console.log("contactsLivenessDataUpdated: ", ptr);
});
Expand All @@ -116,6 +112,10 @@ try {
const safsReceived = ffi.Callback("void", [], function () {
console.log("safsReceived");
});
// callback_connectivity_status: unsafe extern "C" fn(),
const connectivityStatus = ffi.Callback("void", [u64], function () {
console.log("connectivityStatus");
});

console.log("Create Wallet...");
let wallet = lib.wallet_create(
Expand All @@ -133,14 +133,14 @@ try {
txMinedUnconfirmed,
txFauxConfirmed,
txFauxUnconfirmed,
directSendResult,
safResult,
transactionSendResult,
txCancelled,
txoValidation,
contactsLivenessDataUpdated,
balanceUpdated,
txValidation,
safsReceived,
connectivityStatus,
recoveryInProgress,
err
);
Expand Down
1 change: 0 additions & 1 deletion applications/ffi_client/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ const libWallet = ffi.Library("./libtari_wallet_ffi.dylib", {
fn,
fn,
fn,
fn,
bool,
errPtr,
],
Expand Down
19 changes: 9 additions & 10 deletions applications/ffi_client/recovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,9 @@ try {
console.log("txFauxUnconfirmed: ", ptr, confirmations);
}
);
// callback_direct_send_result: unsafe extern "C" fn(c_ulonglong, bool),
const directSendResult = ffi.Callback("void", [u64, bool], function (i, j) {
console.log("directSendResult: ", i, j);
});
// callback_store_and_forward_send_result: unsafe extern "C" fn(c_ulonglong, bool),
const safResult = ffi.Callback("void", [u64, bool], function (i, j) {
console.log("safResult: ", i, j);
});
// callback_transaction_send_result: unsafe extern "C" fn(c_ulonglong, *mut TariTransactionSendStatus),
const transactionSendResult = ffi.Callback("void", [u64, ["pointer"]], function (i, ptr) {
console.log("transactionSendResult: ", i, ptr);
// callback_transaction_cancellation: unsafe extern "C" fn(*mut TariCompletedTransaction),
const txCancelled = ffi.Callback("void", ["pointer"], function (ptr) {
console.log("txCancelled: ", ptr);
Expand All @@ -129,6 +124,10 @@ try {
const safsReceived = ffi.Callback("void", [], function () {
console.log("safsReceived");
});
// callback_connectivity_status: unsafe extern "C" fn(),
const connectivityStatus = ffi.Callback("void", [u64], function () {
console.log("connectivityStatus");
});

const recovery = ffi.Callback("void", [u64, u64], function (current, total) {
console.log("recovery scanning UTXOs: ", { current }, { total });
Expand Down Expand Up @@ -163,14 +162,14 @@ try {
txMinedUnconfirmed,
txFauxConfirmed,
txFauxUnconfirmed,
directSendResult,
safResult,
transactionSendResult,
txCancelled,
txoValidation,
contactsLivenessDataUpdated,
balanceUpdated,
txValidation,
safsReceived,
connectivityStatus,
recoveryInProgress,
err
);
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ enum TransactionStatus {
TRANSACTION_STATUS_FAUX_UNCONFIRMED = 9;
// All Imported and FauxUnconfirmed transactions will end up with this status when the outputs have been confirmed
TRANSACTION_STATUS_FAUX_CONFIRMED = 10;
// This transaction is still being queued for sending
TRANSACTION_STATUS_QUEUED = 11;
}

message GetCompletedTransactionsRequest { }
Expand Down
1 change: 1 addition & 0 deletions applications/tari_app_grpc/src/conversions/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl From<TransactionStatus> for grpc::TransactionStatus {
Rejected => grpc::TransactionStatus::Rejected,
FauxUnconfirmed => grpc::TransactionStatus::FauxUnconfirmed,
FauxConfirmed => grpc::TransactionStatus::FauxConfirmed,
Queued => grpc::TransactionStatus::Queued,
}
}
}
Expand Down
23 changes: 5 additions & 18 deletions applications/tari_console_wallet/src/automation/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,24 +549,11 @@ pub async fn monitor_transactions(
loop {
match event_stream.recv().await {
Ok(event) => match &*event {
TransactionEvent::TransactionDirectSendResult(id, success) if tx_ids.contains(id) => {
debug!(
target: LOG_TARGET,
"tx direct send event for tx_id: {}, success: {}", *id, success
);
if wait_stage == TransactionStage::DirectSendOrSaf {
results.push(SentTransaction {});
if results.len() == tx_ids.len() {
break;
}
}
},
TransactionEvent::TransactionStoreForwardSendResult(id, success) if tx_ids.contains(id) => {
debug!(
target: LOG_TARGET,
"tx store and forward event for tx_id: {}, success: {}", *id, success
);
if wait_stage == TransactionStage::DirectSendOrSaf {
TransactionEvent::TransactionSendResult(id, status) if tx_ids.contains(id) => {
debug!(target: LOG_TARGET, "tx send event for tx_id: {}, {}", *id, status);
if wait_stage == TransactionStage::DirectSendOrSaf &&
(status.direct_send_result || status.store_and_forward_send_result)
{
results.push(SentTransaction {});
if results.len() == tx_ids.len() {
break;
Expand Down
18 changes: 14 additions & 4 deletions applications/tari_console_wallet/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tokio::runtime::Handle;
pub const LOG_TARGET: &str = "wallet::notifier";
const RECEIVED: &str = "received";
const SENT: &str = "sent";
const QUEUED: &str = "queued";
const CONFIRMATION: &str = "confirmation";
const MINED: &str = "mined";
const CANCELLED: &str = "cancelled";
Expand Down Expand Up @@ -132,9 +133,18 @@ impl Notifier {
}
}

/// Trigger a notification that a pending transaction was sent.
pub fn transaction_sent(&self, tx_id: TxId) {
debug!(target: LOG_TARGET, "transaction_sent tx_id: {}", tx_id);
/// Trigger a notification that a pending transaction was sent or queued.
pub fn transaction_sent_or_queued(&self, tx_id: TxId, is_sent: bool) {
let event = if is_sent {
debug!(target: LOG_TARGET, "Transaction sent tx_id: {}", tx_id);
SENT
} else {
debug!(
target: LOG_TARGET,
"Transaction queued for further retry sending tx_id: {}", tx_id
);
QUEUED
};

if let Some(program) = self.path.clone() {
let mut transaction_service = self.wallet.transaction_service.clone();
Expand All @@ -143,7 +153,7 @@ impl Notifier {
match transaction_service.get_pending_outbound_transactions().await {
Ok(txs) => {
if let Some(tx) = txs.get(&tx_id) {
let args = args_from_outbound(tx, SENT);
let args = args_from_outbound(tx, event);
let result = Command::new(program).args(&args).output();
log(result);
} else {
Expand Down
25 changes: 25 additions & 0 deletions applications/tari_console_wallet/src/ui/components/send_tab.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_core::transactions::tari_amount::MicroTari;
use tari_utilities::hex::Hex;
use tari_wallet::tokens::Token;
Expand All @@ -21,6 +22,8 @@ use crate::ui::{
widgets::{draw_dialog, WindowedListState},
};

const LOG_TARGET: &str = "wallet::console_wallet::send_tab ";

pub struct SendTab {
balance: Balance,
send_input_mode: SendInputMode,
Expand All @@ -31,6 +34,7 @@ pub struct SendTab {
message_field: String,
error_message: Option<String>,
success_message: Option<String>,
offline_message: Option<String>,
contacts_list_state: WindowedListState,
send_result_watch: Option<watch::Receiver<UiTransactionSendStatus>>,
confirmation_dialog: Option<ConfirmationDialogType>,
Expand All @@ -50,6 +54,7 @@ impl SendTab {
message_field: String::new(),
error_message: None,
success_message: None,
offline_message: None,
contacts_list_state: WindowedListState::new(),
send_result_watch: None,
confirmation_dialog: None,
Expand Down Expand Up @@ -451,6 +456,7 @@ impl<B: Backend> Component<B> for SendTab {

let rx_option = self.send_result_watch.take();
if let Some(rx) = rx_option {
trace!(target: LOG_TARGET, "{:?}", (*rx.borrow()).clone());
let status = match (*rx.borrow()).clone() {
UiTransactionSendStatus::Initiated => "Initiated",
UiTransactionSendStatus::DiscoveryInProgress => "Discovery In Progress",
Expand All @@ -463,6 +469,14 @@ impl<B: Backend> Component<B> for SendTab {
Some("Transaction successfully sent!\nPlease press Enter to continue".to_string());
return;
},
UiTransactionSendStatus::Queued => {
self.offline_message = Some(
"This wallet appears to be offline; transaction queued for further retry sending.\n Please \
press Enter to continue"
.to_string(),
);
return;
},
UiTransactionSendStatus::TransactionComplete => {
self.success_message =
Some("Transaction completed successfully!\nPlease press Enter to continue".to_string());
Expand All @@ -485,6 +499,10 @@ impl<B: Backend> Component<B> for SendTab {
draw_dialog(f, area, "Success!".to_string(), msg, Color::Green, 120, 9);
}

if let Some(msg) = self.offline_message.clone() {
draw_dialog(f, area, "Offline!".to_string(), msg, Color::Green, 120, 9);
}

if let Some(msg) = self.error_message.clone() {
draw_dialog(f, area, "Error!".to_string(), msg, Color::Red, 120, 9);
}
Expand Down Expand Up @@ -531,6 +549,13 @@ impl<B: Backend> Component<B> for SendTab {
return;
}

if self.offline_message.is_some() {
if '\n' == c {
self.offline_message = None;
}
return;
}

if self.send_result_watch.is_some() {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ impl<B: Backend> Component<B> for TransactionsTab {

match c {
'p' => {
if let Err(e) = Handle::current().block_on(app_state.restart_transaction_protocols()) {
error!(target: LOG_TARGET, "Error rebroadcasting transactions: {}", e);
}
self.completed_list_state.select(None);
self.selected_tx_list = SelectedTransactionList::PendingTxs;
self.pending_list_state.set_num_items(app_state.get_pending_txs().len());
Expand Down
23 changes: 22 additions & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ impl AppState {
Ok(())
}

pub async fn restart_transaction_protocols(&mut self) -> Result<(), UiError> {
let inner = self.inner.write().await;
let mut tx_service = inner.wallet.transaction_service.clone();
tx_service.restart_transaction_protocols().await?;
Ok(())
}

pub fn get_identity(&self) -> &MyIdentity {
&self.cached_data.my_identity
}
Expand Down Expand Up @@ -857,6 +864,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
Expand All @@ -881,6 +889,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
Expand Down Expand Up @@ -922,6 +931,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_peer_custom = None;
Expand Down Expand Up @@ -956,6 +966,16 @@ impl AppStateInner {
});
}

pub fn spawn_restart_transaction_protocols_task(&mut self) {
let mut txn_service = self.wallet.transaction_service.clone();

task::spawn(async move {
if let Err(e) = txn_service.restart_transaction_protocols().await {
error!(target: LOG_TARGET, "Problem restarting transaction protocols: {}", e);
}
});
}

pub fn add_notification(&mut self, notification: String) {
self.data.notifications.push((Local::now(), notification));
self.data.new_notification_count += 1;
Expand Down Expand Up @@ -1173,9 +1193,10 @@ pub struct MyIdentity {
pub node_id: String,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum UiTransactionSendStatus {
Initiated,
Queued,
SentDirect,
TransactionComplete,
DiscoveryInProgress,
Expand Down
Loading

0 comments on commit 40bedde

Please sign in to comment.