Skip to content

Commit

Permalink
Implement transaction negotiation message resending in the wallet
Browse files Browse the repository at this point in the history
Currently the logic for sending transactions and negotiating them between parties is not very robust. Each stage in the negotiation is sent only once. When sending we first try to send the message directly and if that fails, or discovery needs to occur, the message is sent indirectly via Store and Forward (SAF). We hoped to get SAF 100% reliable but due to reasons laid out in tari-project#2137 it is not tenable. 

This PR adds in the logic to periodically resends the initial Transaction Sender Message and the Transaction Reply in order to illicit the next step in the negotiation protocol from the counterparty. The PR also adds the logic to respond to repeated messages but with a cool down period in which repeats will be ignored to protect against DoS attacks. 

The PR also adds in a new comms level message to inform the counterparty if a transaction protocol has been cancelled. Initially this is sent if the Sender cancels an in progress transaction so that the receiver knows to stop expecting it. This message is also sent if a Transaction Reply is received for a cancelled message to let the Received know about the cancellation.

In order to stop a wallet resending these message indefinitely a transaction will be cancelled if it hasn’t resolved after a long timeout period (Default is being set to 3 days).

A large portion of the PR are tests so its not as big as it looks.
  • Loading branch information
philipr-za committed Aug 31, 2020
1 parent 6ae1a54 commit b9c3931
Show file tree
Hide file tree
Showing 37 changed files with 3,075 additions and 960 deletions.
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
};
use tari_wallet::{
contacts_service::storage::database::Contact,
transaction_service::storage::database::CompletedTransaction,
transaction_service::storage::models::CompletedTransaction,
util::emoji::EmojiId,
};

Expand Down
26 changes: 25 additions & 1 deletion applications/tari_console_wallet/src/dummy_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tari_core::transactions::{
use tari_crypto::keys::{PublicKey as PublicKeyTrait, SecretKey as SecretKeyTrait};
use tari_wallet::{
contacts_service::storage::database::Contact,
transaction_service::storage::database::{
transaction_service::storage::models::{
CompletedTransaction,
InboundTransaction,
OutboundTransaction,
Expand All @@ -61,6 +61,8 @@ pub fn dummy_inbound_txs() -> Vec<InboundTransaction> {
.unwrap(),
cancelled: false,
direct_send_success: false,
send_count: 0,
last_send_timestamp: None,
});

inbound_txs.push(InboundTransaction {
Expand All @@ -78,6 +80,8 @@ pub fn dummy_inbound_txs() -> Vec<InboundTransaction> {
.unwrap(),
cancelled: false,
direct_send_success: false,
send_count: 0,
last_send_timestamp: None,
});

inbound_txs.push(InboundTransaction {
Expand All @@ -95,6 +99,8 @@ pub fn dummy_inbound_txs() -> Vec<InboundTransaction> {
.unwrap(),
cancelled: false,
direct_send_success: false,
send_count: 0,
last_send_timestamp: None,
});

inbound_txs.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap());
Expand All @@ -121,6 +127,8 @@ pub fn dummy_outbound_txs() -> Vec<OutboundTransaction> {
.unwrap(),
cancelled: false,
direct_send_success: false,
send_count: 0,
last_send_timestamp: None,
});

outbound_txs.push(OutboundTransaction {
Expand All @@ -139,6 +147,8 @@ pub fn dummy_outbound_txs() -> Vec<OutboundTransaction> {
.unwrap(),
cancelled: false,
direct_send_success: false,
send_count: 0,
last_send_timestamp: None,
});
outbound_txs.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap());
outbound_txs
Expand All @@ -165,6 +175,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Outbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -185,6 +197,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Inbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -205,6 +219,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Inbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -225,6 +241,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Outbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -245,6 +263,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Outbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -265,6 +285,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Inbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});

completed_txs.push(CompletedTransaction {
Expand All @@ -285,6 +307,8 @@ pub fn dummy_completed_txs() -> Vec<CompletedTransaction> {
cancelled: false,
direction: TransactionDirection::Outbound,
coinbase_block_height: None,
send_count: 0,
last_send_timestamp: None,
});
completed_txs.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap());

Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
utils::{formatting::display_compressed_string, widgets::MultiColumnList},
};
use tari_core::transactions::tari_amount::MicroTari;
use tari_wallet::transaction_service::storage::database::{TransactionDirection, TransactionStatus};
use tari_wallet::transaction_service::storage::models::{TransactionDirection, TransactionStatus};
use tui::{
backend::Backend,
layout::{Constraint, Direction, Layout, Rect},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pub struct TransactionFinalizedMessage {
pub transaction: ::std::option::Option<super::types::Transaction>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionCancelledMessage {
/// The transaction id for the cancelled transaction
#[prost(uint64, tag = "1")]
pub tx_id: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionMetadata {
/// The absolute fee for the transaction
#[prost(uint64, tag = "1")]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";

package tari.transaction_protocol;

message TransactionCancelledMessage {
// The transaction id for the cancelled transaction
uint64 tx_id = 1;
}

1 change: 1 addition & 0 deletions base_layer/p2p/src/proto/message_type.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum TariMessageType {
TariMessageTypeMempoolRequest= 71;
TariMessageTypeMempoolResponse = 72;
TariMessageTypeTransactionFinalized = 73;
TariMessageTypeTransactionCancelled = 74;
// -- DAN Messages --

// -- Extended --
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/proto/tari.p2p.message_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ pub enum TariMessageType {
BaseNodeResponse = 70,
MempoolRequest = 71,
MempoolResponse = 72,
/// -- DAN Messages --
TransactionFinalized = 73,
/// -- DAN Messages --
TransactionCancelled = 74,
// -- Extended --
Text = 225,
TextAck = 226,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
PRAGMA foreign_keys=off;
ALTER TABLE completed_transactions RENAME TO completed_transactions_old;
CREATE TABLE completed_transactions (
tx_id INTEGER PRIMARY KEY NOT NULL,
source_public_key BLOB NOT NULL,
destination_public_key BLOB NOT NULL,
amount INTEGER NOT NULL,
fee INTEGER NOT NULL,
transaction_protocol TEXT NOT NULL,
status INTEGER NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL,
cancelled INTEGER NOT NULL DEFAULT 0,
direction INTEGER NULL DEFAULT NULL,
coinbase_block_height INTEGER NULL DEFAULT NULL
);
INSERT INTO completed_transactions (tx_id, source_public_key, destination_public_key, amount, fee, transaction_protocol, status, message, timestamp, cancelled, direction, coinbase_block_height)
SELECT tx_id, source_public_key, destination_public_key, amount, fee, transaction_protocol, status, message, timestamp, cancelled, direction, coinbase_block_height
FROM completed_transactions_old;

DROP TABLE completed_transactions_old;

ALTER TABLE inbound_transactions RENAME TO inbound_transactions_old;
CREATE TABLE inbound_transactions (
tx_id INTEGER PRIMARY KEY NOT NULL,
source_public_key BLOB NOT NULL,
amount INTEGER NOT NULL,
receiver_protocol TEXT NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL,
cancelled INTEGER NOT NULL DEFAULT 0,
direct_send_success INTEGER NOT NULL DEFAULT 0
);
INSERT INTO inbound_transactions (tx_id, source_public_key, amount, receiver_protocol, message, timestamp, cancelled, direct_send_success)
SELECT tx_id, source_public_key, amount, receiver_protocol, message, timestamp, cancelled, direct_send_success
FROM inbound_transactions_old;

DROP TABLE inbound_transactions_old;

ALTER TABLE outbound_transactions RENAME TO outbound_transactions_old;
CREATE TABLE outbound_transactions (
tx_id INTEGER PRIMARY KEY NOT NULL,
destination_public_key BLOB NOT NULL,
amount INTEGER NOT NULL,
fee INTEGER NOT NULL,
sender_protocol TEXT NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL,
cancelled INTEGER NOT NULL DEFAULT 0,
direct_send_success INTEGER NOT NULL DEFAULT 0
);
INSERT INTO outbound_transactions (tx_id, destination_public_key, amount, fee, sender_protocol, message, timestamp, cancelled, direct_send_success)
SELECT tx_id, destination_public_key, amount, fee, sender_protocol, message, timestamp, cancelled, direct_send_success
FROM outbound_transactions_old;

DROP TABLE outbound_transactions_old;

PRAGMA foreign_keys=on;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
ALTER TABLE completed_transactions
ADD COLUMN send_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE completed_transactions
ADD COLUMN last_send_timestamp DATETIME NULL DEFAULT NULL;

ALTER TABLE inbound_transactions
ADD COLUMN send_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE inbound_transactions
ADD COLUMN last_send_timestamp DATETIME NULL DEFAULT NULL;

ALTER TABLE outbound_transactions
ADD COLUMN send_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE outbound_transactions
ADD COLUMN last_send_timestamp DATETIME NULL DEFAULT NULL;
2 changes: 1 addition & 1 deletion base_layer/wallet/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![recursion_limit = "1024"]
#![recursion_limit = "2048"]
#![feature(drain_filter)]
#![feature(type_alias_impl_trait)]

Expand Down
6 changes: 6 additions & 0 deletions base_layer/wallet/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ table! {
cancelled -> Integer,
direction -> Nullable<Integer>,
coinbase_block_height -> Nullable<BigInt>,
send_count -> Integer,
last_send_timestamp -> Nullable<Timestamp>,
}
}

Expand All @@ -32,6 +34,8 @@ table! {
timestamp -> Timestamp,
cancelled -> Integer,
direct_send_success -> Integer,
send_count -> Integer,
last_send_timestamp -> Nullable<Timestamp>,
}
}

Expand All @@ -56,6 +60,8 @@ table! {
timestamp -> Timestamp,
cancelled -> Integer,
direct_send_success -> Integer,
send_count -> Integer,
last_send_timestamp -> Nullable<Timestamp>,
}
}

Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/testnet_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use crate::{
transaction_service::{
handle::TransactionEvent,
storage::{
database::{CompletedTransaction, TransactionBackend, TransactionDirection, TransactionStatus},
database::TransactionBackend,
memory_db::TransactionMemoryDatabase,
models::{CompletedTransaction, TransactionDirection, TransactionStatus},
},
},
wallet::WalletConfig,
Expand Down
9 changes: 7 additions & 2 deletions base_layer/wallet/src/transaction_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ pub struct TransactionServiceConfig {
pub base_node_monitoring_timeout: Duration,
pub direct_send_timeout: Duration,
pub broadcast_send_timeout: Duration,
pub low_power_polling_timeout: Duration, /* This is the timeout period that will be used when the wallet is in
* low_power mode */
pub low_power_polling_timeout: Duration,
pub transaction_resend_period: Duration,
pub resend_response_cooldown: Duration,
pub pending_transaction_cancellation_timeout: Duration,
}

impl Default for TransactionServiceConfig {
Expand All @@ -41,6 +43,9 @@ impl Default for TransactionServiceConfig {
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(30),
low_power_polling_timeout: Duration::from_secs(300),
transaction_resend_period: Duration::from_secs(3600),
resend_response_cooldown: Duration::from_secs(300),
pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,16 @@ pub enum TransactionServiceError {
NodeIdError(#[from] NodeIdError),
#[error("Broadcast recv error: `{0}`")]
BroadcastRecvError(#[from] RecvError),
#[error("Broadcast send error: `{0}`")]
BroadcastSendError(String),
#[error("Oneshot cancelled error: `{0}`")]
OneshotCancelled(#[from] Canceled),
#[error("Liveness error: `{0}`")]
LivenessError(#[from] LivenessError),
#[error("Coinbase build error: `{0}`")]
CoinbaseBuildError(#[from] CoinbaseBuildError),
#[error("Pending Transaction Timed out")]
Timeout,
}

#[derive(Debug, Error)]
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
output_manager_service::TxId,
transaction_service::{
error::TransactionServiceError,
storage::database::{CompletedTransaction, InboundTransaction, OutboundTransaction},
storage::models::{CompletedTransaction, InboundTransaction, OutboundTransaction},
},
};
use aes_gcm::Aes256Gcm;
Expand Down
16 changes: 16 additions & 0 deletions base_layer/wallet/src/transaction_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod handle;
pub mod protocols;
pub mod service;
pub mod storage;
pub mod tasks;

use crate::{
output_manager_service::handle::OutputManagerHandle,
Expand Down Expand Up @@ -163,6 +164,19 @@ where T: TransactionBackend
.map(map_decode::<BaseNodeProto::BaseNodeServiceResponse>)
.filter_map(ok_or_skip_result)
}

fn transaction_cancelled_stream(&self) -> impl Stream<Item = DomainMessage<proto::TransactionCancelledMessage>> {
trace!(
target: LOG_TARGET,
"Subscription '{}' for topic '{:?}' created.",
SUBSCRIPTION_LABEL,
TariMessageType::TransactionCancelled
);
self.subscription_factory
.get_subscription(TariMessageType::TransactionCancelled, SUBSCRIPTION_LABEL)
.map(map_decode::<proto::TransactionCancelledMessage>)
.filter_map(ok_or_skip_result)
}
}

impl<T> ServiceInitializer for TransactionServiceInitializer<T>
Expand All @@ -183,6 +197,7 @@ where T: TransactionBackend + Clone + 'static
let transaction_finalized_stream = self.transaction_finalized_stream();
let mempool_response_stream = self.mempool_response_stream();
let base_node_response_stream = self.base_node_response_stream();
let transaction_cancelled_stream = self.transaction_cancelled_stream();

let (publisher, _) = broadcast::channel(200);

Expand Down Expand Up @@ -219,6 +234,7 @@ where T: TransactionBackend + Clone + 'static
transaction_finalized_stream,
mempool_response_stream,
base_node_response_stream,
transaction_cancelled_stream,
output_manager_service,
outbound_message_service,
publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
service::TransactionServiceResources,
storage::database::{TransactionBackend, TransactionStatus},
storage::{database::TransactionBackend, models::TransactionStatus},
};
use futures::{channel::mpsc::Receiver, FutureExt, StreamExt};
use log::*;
Expand Down
Loading

0 comments on commit b9c3931

Please sign in to comment.