diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index e208a09ccf..54caae3e11 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -45,10 +45,7 @@ use tower::Service; use crate::output_manager_service::{ error::OutputManagerError, service::{Balance, OutputInfoByTxId}, - storage::{ - database::OutputBackendQuery, - models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority}, - }, + storage::models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority}, UtxoSelectionCriteria, }; @@ -90,7 +87,6 @@ pub enum OutputManagerRequest { CancelTransaction(TxId), GetSpentOutputs, GetUnspentOutputs, - GetOutputsBy(OutputBackendQuery), GetInvalidOutputs, ValidateUtxos, RevalidateTxos, @@ -152,7 +148,6 @@ impl fmt::Display for OutputManagerRequest { CancelTransaction(v) => write!(f, "CancelTransaction ({})", v), GetSpentOutputs => write!(f, "GetSpentOutputs"), GetUnspentOutputs => write!(f, "GetUnspentOutputs"), - GetOutputsBy(q) => write!(f, "GetOutputs({:#?})", q), GetInvalidOutputs => write!(f, "GetInvalidOutputs"), ValidateUtxos => write!(f, "ValidateUtxos"), RevalidateTxos => write!(f, "RevalidateTxos"), diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 9ce219b3f6..641913cb34 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -306,10 +306,6 @@ where let outputs = self.fetch_unspent_outputs()?; Ok(OutputManagerResponse::UnspentOutputs(outputs)) }, - OutputManagerRequest::GetOutputsBy(q) => { - let outputs = self.fetch_outputs_by(q)?.into_iter().map(|v| v.into()).collect(); - Ok(OutputManagerResponse::Outputs(outputs)) - }, OutputManagerRequest::ValidateUtxos => { self.validate_outputs().map(OutputManagerResponse::TxoValidationStarted) }, @@ -1381,8 +1377,8 @@ where Ok(self.resources.db.fetch_all_unspent_outputs()?) } - pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result, OutputManagerError> { - Ok(self.resources.db.fetch_outputs_by(q)?) + pub fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result, OutputManagerError> { + Ok(self.resources.db.fetch_outputs_by_query(q)?) } pub fn fetch_invalid_outputs(&self) -> Result, OutputManagerError> { diff --git a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs index 722817846e..03eb5c434b 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs @@ -101,5 +101,5 @@ pub trait OutputManagerBackend: Send + Sync + Clone { current_tip_height: Option, ) -> Result, OutputManagerStorageError>; fn fetch_outputs_by_tx_id(&self, tx_id: TxId) -> Result, OutputManagerStorageError>; - fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result, OutputManagerStorageError>; + fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result, OutputManagerStorageError>; } diff --git a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs index 6548ed4200..6029caa2c4 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs @@ -434,8 +434,11 @@ where T: OutputManagerBackend + 'static Ok(outputs) } - pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result, OutputManagerStorageError> { - self.db.fetch_outputs_by(q) + pub fn fetch_outputs_by_query( + &self, + q: OutputBackendQuery, + ) -> Result, OutputManagerStorageError> { + self.db.fetch_outputs_by_query(q) } } diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 45db3bb940..963c046408 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -1037,9 +1037,9 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { .collect::, _>>() } - fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result, OutputManagerStorageError> { + fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result, OutputManagerStorageError> { let mut conn = self.database_connection.get_pooled_connection()?; - Ok(OutputSql::fetch_outputs_by(q, &mut conn)? + Ok(OutputSql::fetch_outputs_by_query(q, &mut conn)? .into_iter() .filter_map(|x| { x.to_db_wallet_output() diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs index c8ed573044..7fe605b9af 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs @@ -120,7 +120,7 @@ impl OutputSql { /// Retrieves UTXOs by a set of given rules #[allow(clippy::cast_sign_loss)] - pub fn fetch_outputs_by( + pub fn fetch_outputs_by_query( q: OutputBackendQuery, conn: &mut SqliteConnection, ) -> Result, OutputManagerStorageError> { diff --git a/base_layer/wallet/src/storage/sqlite_utilities/mod.rs b/base_layer/wallet/src/storage/sqlite_utilities/mod.rs index 7345323bb0..078ff857cf 100644 --- a/base_layer/wallet/src/storage/sqlite_utilities/mod.rs +++ b/base_layer/wallet/src/storage/sqlite_utilities/mod.rs @@ -74,6 +74,27 @@ pub fn run_migration_and_create_sqlite_connection>( Ok(WalletDbConnection::new(pool, Some(file_lock))) } +pub fn run_migration_and_create_sqlite_memory_connection( + sqlite_pool_size: usize, +) -> Result { + let mut pool = SqliteConnectionPool::new( + String::from(":memory:"), + sqlite_pool_size, + true, + true, + Duration::from_secs(60), + ); + pool.create_pool()?; + let mut connection = pool.get_pooled_connection()?; + + const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); + connection + .run_pending_migrations(MIGRATIONS) + .map_err(|err| WalletStorageError::DatabaseMigrationError(format!("Database migration failed {}", err)))?; + + Ok(WalletDbConnection::new(pool, None)) +} + pub fn acquire_exclusive_file_lock(db_path: &Path) -> Result { let lock_file_path = match db_path.file_name() { None => { diff --git a/base_layer/wallet/src/test_utils.rs b/base_layer/wallet/src/test_utils.rs index 92b366cfe9..c6e372cc2a 100644 --- a/base_layer/wallet/src/test_utils.rs +++ b/base_layer/wallet/src/test_utils.rs @@ -30,6 +30,7 @@ use tempfile::{tempdir, TempDir}; use crate::storage::sqlite_utilities::{ run_migration_and_create_sqlite_connection, + run_migration_and_create_sqlite_memory_connection, wallet_db_connection::WalletDbConnection, }; @@ -58,6 +59,11 @@ pub fn make_wallet_database_connection(path: Option) -> (WalletDbConnect (connection, temp_dir) } +/// A test helper to create a temporary wallet service memory databases +pub fn make_wallet_database_memory_connection() -> WalletDbConnection { + run_migration_and_create_sqlite_memory_connection(16).unwrap() +} + pub fn create_consensus_rules() -> ConsensusManager { ConsensusManager::builder(Network::LocalNet).build().unwrap() } diff --git a/base_layer/wallet/src/transaction_service/error.rs b/base_layer/wallet/src/transaction_service/error.rs index 1743fa8a44..780430e501 100644 --- a/base_layer/wallet/src/transaction_service/error.rs +++ b/base_layer/wallet/src/transaction_service/error.rs @@ -185,6 +185,12 @@ pub enum TransactionServiceError { InvalidKeyId(String), #[error("Invalid key manager data: `{0}`")] KeyManagerServiceError(#[from] KeyManagerServiceError), + #[error("Serialization error: `{0}`")] + SerializationError(String), + #[error("Transaction exceed maximum byte size. Expected < {expected} but got {got}.")] + TransactionTooLarge { got: usize, expected: usize }, + #[error("Pending Transaction was oversized")] + Oversized, } impl From for TransactionServiceError { diff --git a/base_layer/wallet/src/transaction_service/protocols/mod.rs b/base_layer/wallet/src/transaction_service/protocols/mod.rs index 15bdb1dd1c..9c7cfc3ce1 100644 --- a/base_layer/wallet/src/transaction_service/protocols/mod.rs +++ b/base_layer/wallet/src/transaction_service/protocols/mod.rs @@ -20,7 +20,48 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use bincode::serialize_into; +use log::{debug, error}; +use serde::Serialize; +use tari_common_types::transaction::TxId; +use tari_comms::protocol::rpc; + +use crate::transaction_service::error::{TransactionServiceError, TransactionServiceProtocolError}; + pub mod transaction_broadcast_protocol; pub mod transaction_receive_protocol; pub mod transaction_send_protocol; pub mod transaction_validation_protocol; + +const LOG_TARGET: &str = "wallet::transaction_service::protocols"; + +/// Verify that the negotiated transaction is not too large to be broadcast +pub fn check_transaction_size( + transaction: &T, + tx_id: TxId, +) -> Result<(), TransactionServiceProtocolError> { + let mut buf: Vec = Vec::new(); + serialize_into(&mut buf, transaction).map_err(|e| { + TransactionServiceProtocolError::new(tx_id, TransactionServiceError::SerializationError(e.to_string())) + })?; + const SIZE_MARGIN: usize = 1024 * 10; + if buf.len() > rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN) { + let err = TransactionServiceProtocolError::new(tx_id, TransactionServiceError::TransactionTooLarge { + got: buf.len(), + expected: rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN), + }); + error!( + target: LOG_TARGET, + "Transaction '{}' too large, cannot be broadcast ({:?}).", + tx_id, err + ); + Err(err) + } else { + debug!( + target: LOG_TARGET, + "Transaction '{}' size ok, can be broadcast (got: {}, limit: {}).", + tx_id, buf.len(), rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN) + ); + Ok(()) + } +} diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index f74f97bde9..4f4e32efb3 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -47,6 +47,7 @@ use crate::{ transaction_service::{ error::{TransactionServiceError, TransactionServiceProtocolError}, handle::TransactionEvent, + protocols::check_transaction_size, service::TransactionServiceResources, storage::{ database::TransactionBackend, @@ -127,6 +128,10 @@ where ); return Ok(self.tx_id); } + if let Err(e) = check_transaction_size(&completed_tx.transaction, self.tx_id) { + self.cancel_transaction(TxCancellationReason::Oversized).await; + return Err(e); + } loop { tokio::select! { diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs index 38d948a5f7..c0a16a38aa 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs @@ -44,6 +44,7 @@ use crate::{ transaction_service::{ error::{TransactionServiceError, TransactionServiceProtocolError}, handle::TransactionEvent, + protocols::check_transaction_size, service::TransactionServiceResources, storage::{ database::TransactionBackend, @@ -159,6 +160,12 @@ where Utc::now().naive_utc(), ); + // Verify that the negotiated transaction is not too large to be broadcast + if let Err(e) = check_transaction_size(&inbound_transaction, self.id) { + self.cancel_oversized_transaction().await?; + return Err(e); + } + self.resources .db .add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone()) @@ -242,6 +249,12 @@ where }, }; + // Verify that the negotiated transaction is not too large to be broadcast + if let Err(e) = check_transaction_size(&inbound_tx, self.id) { + self.cancel_oversized_transaction().await?; + return Err(e); + } + // Determine the time remaining before this transaction times out let elapsed_time = utc_duration_since(&inbound_tx.timestamp) .map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?; @@ -469,6 +482,32 @@ where "Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id ); + self.cancel_transaction(TxCancellationReason::Timeout).await?; + + info!( + target: LOG_TARGET, + "Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id + ); + + Err(TransactionServiceProtocolError::new( + self.id, + TransactionServiceError::Timeout, + )) + } + + async fn cancel_oversized_transaction(&mut self) -> Result<(), TransactionServiceProtocolError> { + info!( + target: LOG_TARGET, + "Cancelling Transaction Receive Protocol (TxId: {}) due to transaction being oversized", self.id + ); + + self.cancel_transaction(TxCancellationReason::Oversized).await + } + + async fn cancel_transaction( + &mut self, + cancel_reason: TxCancellationReason, + ) -> Result<(), TransactionServiceProtocolError> { self.resources.db.cancel_pending_transaction(self.id).map_err(|e| { warn!( target: LOG_TARGET, @@ -486,10 +525,7 @@ where let _size = self .resources .event_publisher - .send(Arc::new(TransactionEvent::TransactionCancelled( - self.id, - TxCancellationReason::Timeout, - ))) + .send(Arc::new(TransactionEvent::TransactionCancelled(self.id, cancel_reason))) .map_err(|e| { trace!( target: LOG_TARGET, @@ -502,14 +538,6 @@ where ) }); - info!( - target: LOG_TARGET, - "Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id - ); - - Err(TransactionServiceProtocolError::new( - self.id, - TransactionServiceError::Timeout, - )) + Ok(()) } } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 30eed38c44..ab12db4f47 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -63,6 +63,7 @@ use crate::{ config::TransactionRoutingMechanism, error::{TransactionServiceError, TransactionServiceProtocolError}, handle::{TransactionEvent, TransactionSendStatus, TransactionServiceResponse}, + protocols::check_transaction_size, service::{TransactionSendResult, TransactionServiceResources}, storage::{ database::TransactionBackend, @@ -281,28 +282,45 @@ where )); } + // Calculate the size of the transaction - initial send transaction to the peer (always a small message) should + // not be attempted if the final transaction size will be too large to be broadcast + let outbound_tx_check = OutboundTransaction::new( + tx_id, + self.dest_address.clone(), + self.amount, + MicroMinotari::zero(), // This does not matter for the check + sender_protocol.clone(), + TransactionStatus::Pending, // This does not matter for the check + self.message.clone(), + Utc::now().naive_utc(), + true, // This does not matter for the check + ); + // Attempt to send the initial transaction - let SendResult { - direct_send_result, - store_and_forward_send_result, - transaction_status, - } = match self.send_transaction(msg).await { - Ok(val) => val, - Err(e) => { - warn!( - target: LOG_TARGET, - "Problem sending Outbound Transaction TxId: {:?}: {:?}", self.id, e - ); - SendResult { - direct_send_result: false, - store_and_forward_send_result: false, - transaction_status: TransactionStatus::Queued, - } - }, + let mut initial_send = SendResult { + direct_send_result: false, + store_and_forward_send_result: false, + transaction_status: TransactionStatus::Queued, + }; + if let Err(e) = check_transaction_size(&outbound_tx_check, self.id) { + info!( + target: LOG_TARGET, + "Initial Transaction TxId: {:?} will not be sent due to it being oversize ({:?})", self.id, e + ); + } else { + match self.send_transaction(msg).await { + Ok(val) => initial_send = val, + Err(e) => { + warn!( + target: LOG_TARGET, + "Problem sending Outbound Transaction TxId: {:?}: {:?}", self.id, e + ); + }, + } }; // Confirm pending transaction (confirm encumbered outputs) - if transaction_status == TransactionStatus::Pending { + if initial_send.transaction_status == TransactionStatus::Pending { self.resources .output_manager_service .confirm_pending_transaction(self.id) @@ -326,17 +344,21 @@ where self.amount, fee, sender_protocol.clone(), - transaction_status.clone(), + initial_send.transaction_status.clone(), self.message.clone(), Utc::now().naive_utc(), - direct_send_result, + initial_send.direct_send_result, ); self.resources .db - .add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx) + .add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx.clone()) .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; + if let Err(e) = check_transaction_size(&outbound_tx, self.id) { + self.cancel_oversized_transaction().await?; + return Err(e); + } } - if transaction_status == TransactionStatus::Pending { + if initial_send.transaction_status == TransactionStatus::Pending { self.resources .db .increment_send_count(self.id) @@ -350,13 +372,13 @@ where .send(Arc::new(TransactionEvent::TransactionSendResult( self.id, TransactionSendStatus { - direct_send_result, - store_and_forward_send_result, - queued_for_retry: transaction_status == TransactionStatus::Queued, + direct_send_result: initial_send.direct_send_result, + store_and_forward_send_result: initial_send.store_and_forward_send_result, + queued_for_retry: initial_send.transaction_status == TransactionStatus::Queued, }, ))); - if transaction_status == TransactionStatus::Pending { + if initial_send.transaction_status == TransactionStatus::Pending { info!( target: LOG_TARGET, "Pending Outbound Transaction TxId: {:?} added. Waiting for Reply or Cancellation", self.id, @@ -367,7 +389,7 @@ where "Pending Outbound Transaction TxId: {:?} queued. Waiting for wallet to come online", self.id, ); } - Ok(transaction_status) + Ok(initial_send.transaction_status) } #[allow(clippy::too_many_lines)] @@ -391,6 +413,12 @@ where .get_pending_outbound_transaction(tx_id) .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; + // Verify that the negotiated transaction is not too large to be broadcast + if let Err(e) = check_transaction_size(&outbound_tx, self.id) { + self.cancel_oversized_transaction().await?; + return Err(e); + } + if !outbound_tx.sender_protocol.is_collecting_single_signature() { error!( target: LOG_TARGET, @@ -883,6 +911,33 @@ where target: LOG_TARGET, "Cancelling Transaction Send Protocol (TxId: {}) due to timeout after no counterparty response", self.id ); + + self.cancel_transaction(TxCancellationReason::Timeout).await?; + + info!( + target: LOG_TARGET, + "Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id + ); + + Err(TransactionServiceProtocolError::new( + self.id, + TransactionServiceError::Timeout, + )) + } + + async fn cancel_oversized_transaction(&mut self) -> Result<(), TransactionServiceProtocolError> { + info!( + target: LOG_TARGET, + "Cancelling Transaction Send Protocol (TxId: {}) due to transaction being oversized", self.id + ); + + self.cancel_transaction(TxCancellationReason::Oversized).await + } + + async fn cancel_transaction( + &mut self, + cancel_reason: TxCancellationReason, + ) -> Result<(), TransactionServiceProtocolError> { let _ = send_transaction_cancelled_message( self.id, self.dest_address.public_key().clone(), @@ -917,10 +972,7 @@ where let _size = self .resources .event_publisher - .send(Arc::new(TransactionEvent::TransactionCancelled( - self.id, - TxCancellationReason::Timeout, - ))) + .send(Arc::new(TransactionEvent::TransactionCancelled(self.id, cancel_reason))) .map_err(|e| { trace!( target: LOG_TARGET, @@ -933,15 +985,7 @@ where ) }); - info!( - target: LOG_TARGET, - "Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id - ); - - Err(TransactionServiceProtocolError::new( - self.id, - TransactionServiceError::Timeout, - )) + Ok(()) } } diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index f03d0d8934..c187c1cf10 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -107,6 +107,7 @@ use crate::{ TransactionServiceResponse, }, protocols::{ + check_transaction_size, transaction_broadcast_protocol::TransactionBroadcastProtocol, transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage}, transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage}, @@ -782,16 +783,19 @@ where let tx_id = match tx { PendingInbound(inbound_tx) => { let tx_id = inbound_tx.tx_id; + check_transaction_size(&inbound_tx, tx_id)?; self.db.insert_pending_inbound_transaction(tx_id, inbound_tx)?; tx_id }, PendingOutbound(outbound_tx) => { let tx_id = outbound_tx.tx_id; + check_transaction_size(&outbound_tx, tx_id)?; self.db.insert_pending_outbound_transaction(tx_id, outbound_tx)?; tx_id }, Completed(completed_tx) => { let tx_id = completed_tx.tx_id; + check_transaction_size(&completed_tx.transaction, tx_id)?; self.db.insert_completed_transaction(tx_id, completed_tx)?; tx_id }, @@ -822,6 +826,7 @@ where .map(TransactionServiceResponse::UtxoImported), TransactionServiceRequest::SubmitTransactionToSelf(tx_id, tx, fee, amount, message) => self .submit_transaction_to_self(transaction_broadcast_join_handles, tx_id, tx, fee, amount, message) + .await .map(|_| TransactionServiceResponse::TransactionSubmitted), TransactionServiceRequest::SetLowPowerMode => { self.set_power_mode(PowerMode::Low).await?; @@ -1028,7 +1033,8 @@ where None, None, )?, - )?; + ) + .await?; let _result = reply_channel .send(Ok(TransactionServiceResponse::TransactionSent(tx_id))) @@ -1277,7 +1283,8 @@ where None, None, )?, - )?; + ) + .await?; let tx_output = output .to_transaction_output(&self.resources.transaction_key_manager_service) @@ -1465,7 +1472,8 @@ where None, None, )?, - )?; + ) + .await?; Ok(tx_id) } @@ -1726,7 +1734,8 @@ where None, None, )?, - )?; + ) + .await?; info!(target: LOG_TARGET, "Submitted burning transaction - TxId: {}", tx_id); Ok((tx_id, BurntProof { @@ -2860,7 +2869,7 @@ where } /// Submit a completed transaction to the Transaction Manager - fn submit_transaction( + async fn submit_transaction( &mut self, transaction_broadcast_join_handles: &mut FuturesUnordered< JoinHandle>>, @@ -2869,12 +2878,17 @@ where ) -> Result<(), TransactionServiceError> { let tx_id = completed_transaction.tx_id; trace!(target: LOG_TARGET, "Submit transaction ({}) to db.", tx_id); - self.db.insert_completed_transaction(tx_id, completed_transaction)?; + self.db + .insert_completed_transaction(tx_id, completed_transaction.clone())?; trace!( target: LOG_TARGET, "Launch the transaction broadcast protocol for submitted transaction ({}).", tx_id ); + if let Err(e) = check_transaction_size(&completed_transaction.transaction, tx_id) { + self.cancel_transaction(tx_id, TxCancellationReason::Oversized).await; + return Err(e.into()); + } self.complete_send_transaction_protocol( Ok(TransactionSendResult { tx_id, @@ -2885,9 +2899,24 @@ where Ok(()) } + async fn cancel_transaction(&mut self, tx_id: TxId, reason: TxCancellationReason) { + if let Err(e) = self.resources.output_manager_service.cancel_transaction(tx_id).await { + warn!( + target: LOG_TARGET, + "Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", tx_id, e + ); + } + if let Err(e) = self.resources.db.reject_completed_transaction(tx_id, reason) { + warn!( + target: LOG_TARGET, + "Failed to Cancel TxId: {} after failed sending attempt with error {:?}", tx_id, e + ); + } + } + /// Submit a completed coin split transaction to the Transaction Manager. This is different from /// `submit_transaction` in that it will expose less information about the completed transaction. - pub fn submit_transaction_to_self( + pub async fn submit_transaction_to_self( &mut self, transaction_broadcast_join_handles: &mut FuturesUnordered< JoinHandle>>, @@ -2914,7 +2943,8 @@ where None, None, )?, - )?; + ) + .await?; Ok(()) } diff --git a/base_layer/wallet/src/transaction_service/storage/models.rs b/base_layer/wallet/src/transaction_service/storage/models.rs index 77208cdbd0..9830c2a8fa 100644 --- a/base_layer/wallet/src/transaction_service/storage/models.rs +++ b/base_layer/wallet/src/transaction_service/storage/models.rs @@ -330,6 +330,7 @@ pub enum TxCancellationReason { Orphan, // 4 TimeLocked, // 5 InvalidTransaction, // 6 + Oversized, // 7 } impl TryFrom for TxCancellationReason { @@ -344,6 +345,7 @@ impl TryFrom for TxCancellationReason { 4 => Ok(TxCancellationReason::Orphan), 5 => Ok(TxCancellationReason::TimeLocked), 6 => Ok(TxCancellationReason::InvalidTransaction), + 7 => Ok(TxCancellationReason::Oversized), code => Err(TransactionConversionError { code: code as i32 }), } } @@ -361,6 +363,7 @@ impl Display for TxCancellationReason { Orphan => "Orphan", TimeLocked => "TimeLocked", InvalidTransaction => "Invalid Transaction", + Oversized => "Oversized", }; fmt.write_str(response) } diff --git a/base_layer/wallet/tests/support/utils.rs b/base_layer/wallet/tests/support/utils.rs index 704ff911f6..65da7d7baa 100644 --- a/base_layer/wallet/tests/support/utils.rs +++ b/base_layer/wallet/tests/support/utils.rs @@ -52,6 +52,17 @@ pub async fn make_input( .unwrap() } +pub async fn make_fake_input_from_copy( + wallet_output: &mut WalletOutput, + key_manager: &MemoryDbKeyManager, +) -> WalletOutput { + let (spend_key_id, _spend_key_pk, script_key_id, _script_key_pk) = + key_manager.get_next_spend_and_script_key_ids().await.unwrap(); + wallet_output.spending_key_id = spend_key_id; + wallet_output.script_key_id = script_key_id; + wallet_output.clone() +} + pub async fn create_wallet_output_from_sender_data( info: &TransactionSenderMessage, key_manager: &MemoryDbKeyManager, diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 18913bd781..f4a5ade587 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -63,7 +63,12 @@ use minotari_wallet::{ sqlite_db::wallet::WalletSqliteDatabase, sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}, }, - test_utils::{create_consensus_constants, make_wallet_database_connection, random_string}, + test_utils::{ + create_consensus_constants, + make_wallet_database_connection, + make_wallet_database_memory_connection, + random_string, + }, transaction_service::{ config::TransactionServiceConfig, error::TransactionServiceError, @@ -90,7 +95,10 @@ use tari_common_types::{ use tari_comms::{ message::EnvelopeBody, peer_manager::{NodeIdentity, PeerFeatures}, - protocol::rpc::{mock::MockRpcServer, NamedProtocolService}, + protocol::{ + rpc, + rpc::{mock::MockRpcServer, NamedProtocolService}, + }, test_utils::node_identity::build_node_identity, types::CommsDHKE, CommsNode, @@ -161,7 +169,7 @@ use crate::support::{ base_node_service_mock::MockBaseNodeService, comms_and_services::{create_dummy_message, setup_comms_services}, comms_rpc::{connect_rpc_client, BaseNodeWalletRpcMockService, BaseNodeWalletRpcMockState}, - utils::{create_wallet_output_from_sender_data, make_input}, + utils::{create_wallet_output_from_sender_data, make_fake_input_from_copy, make_input}, }; async fn setup_transaction_service>( @@ -530,8 +538,8 @@ async fn manage_single_transaction() { ); let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); - let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let alice_connection = make_wallet_database_memory_connection(); + let bob_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = @@ -693,8 +701,8 @@ async fn large_interactive_transaction() { ); let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); - let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let alice_connection = make_wallet_database_memory_connection(); + let bob_connection = make_wallet_database_memory_connection(); // Alice sets up her Transaction Service let shutdown = Shutdown::new(); @@ -839,6 +847,384 @@ async fn large_interactive_transaction() { ); } +#[allow(clippy::cast_possible_truncation)] +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_spend_dust_to_self_in_oversized_transaction() { + //` cargo test --release --test wallet_integration_tests + //` transaction_service_tests::service::test_spend_dust_to_self_in_oversized_transaction > .\target\output.txt + //` 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + let network = Network::LocalNet; + let consensus_manager = ConsensusManager::builder(network).build().unwrap(); + let factories = CryptoFactories::default(); + let shutdown = Shutdown::new(); + + // Alice's wallet parameters + let alice_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + let bob_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + log::info!( + "manage_single_transaction: Alice: '{}', Bob: '{}'", + alice_node_identity.node_id().short_str(), + bob_node_identity.node_id().short_str(), + ); + let temp_dir = tempdir().unwrap(); + let database_path = temp_dir.path().to_str().unwrap().to_string(); + let alice_connection = make_wallet_database_memory_connection(); + + let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = + setup_transaction_service( + alice_node_identity.clone(), + vec![], + consensus_manager.clone(), + factories.clone(), + alice_connection, + database_path.clone(), + Duration::from_secs(0), + shutdown.to_signal(), + ) + .await; + + // Alice create dust + + let amount_per_output = 10_000 * uT; + // This value was determined by running the test and evaluating the error message, + // e.g. `TransactionTooLarge { got: 3379097, expected: 3135488 }` + let max_number_of_outputs_in_frame = (rpc::RPC_MAX_FRAME_SIZE as f64 / 700.0f64).ceil() as usize; + let number_of_outputs = max_number_of_outputs_in_frame + 100; + let mut uo_reference = make_input( + &mut OsRng, + amount_per_output, + &OutputFeatures::default(), + &alice_key_manager_handle, + ) + .await; + for _ in 0..number_of_outputs { + let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; + + alice_oms.add_output(uo.clone(), None).await.unwrap(); + alice_db + .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) + .unwrap(); + } + + let balance = alice_oms.get_balance().await.unwrap(); + let initial_available_balance = balance.available_balance; + + // Alice try to spend too much dust to self + + let fee_per_gram = MicroMinotari::from(1); + let message = "TAKE MAH _OWN_ MONEYS!".to_string(); + let value = balance.available_balance - amount_per_output * 10; + let alice_address = TariAddress::new(alice_node_identity.public_key().clone(), network); + assert!(alice_ts + .send_transaction( + alice_address, + value, + UtxoSelectionCriteria::default(), + OutputFeatures::default(), + fee_per_gram, + message.clone(), + ) + .await + .is_err()); + let balance = alice_oms.get_balance().await.unwrap(); + // Encumbered outputs are re-instated + assert_eq!(balance.available_balance, initial_available_balance); +} + +#[allow(clippy::cast_possible_truncation)] +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_spend_dust_to_other_in_oversized_transaction() { + //` cargo test --release --test wallet_integration_tests + //` transaction_service_tests::service::test_spend_dust_to_other_in_oversized_transaction > .\target\output.txt + //` 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + let network = Network::LocalNet; + let consensus_manager = ConsensusManager::builder(network).build().unwrap(); + let factories = CryptoFactories::default(); + let shutdown = Shutdown::new(); + + // Alice's wallet parameters + let alice_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + let bob_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + log::info!( + "manage_single_transaction: Alice: '{}', Bob: '{}'", + alice_node_identity.node_id().short_str(), + bob_node_identity.node_id().short_str(), + ); + let temp_dir = tempdir().unwrap(); + let database_path = temp_dir.path().to_str().unwrap().to_string(); + let alice_connection = make_wallet_database_memory_connection(); + + let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = + setup_transaction_service( + alice_node_identity.clone(), + vec![], + consensus_manager.clone(), + factories.clone(), + alice_connection, + database_path.clone(), + Duration::from_secs(0), + shutdown.to_signal(), + ) + .await; + + // Alice create dust + + let amount_per_output = 10_000 * uT; + // This value was determined by running the test and evaluating the error message, + // e.g. `TransactionTooLarge { got: 3205068, expected: 3135488 }` + let max_number_of_outputs_in_frame = (rpc::RPC_MAX_FRAME_SIZE as f64 / 1175.0f64).ceil() as usize; + let number_of_outputs = max_number_of_outputs_in_frame + 100; + let mut uo_reference = make_input( + &mut OsRng, + amount_per_output, + &OutputFeatures::default(), + &alice_key_manager_handle, + ) + .await; + for _ in 0..number_of_outputs { + let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; + + alice_oms.add_output(uo.clone(), None).await.unwrap(); + alice_db + .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) + .unwrap(); + } + + let balance = alice_oms.get_balance().await.unwrap(); + let initial_available_balance = balance.available_balance; + + // Alice try to spend too much dust to Bob + + let fee_per_gram = MicroMinotari::from(1); + let message = "GIVE MAH _OWN_ MONEYS AWAY!".to_string(); + let value = balance.available_balance - amount_per_output * 10; + let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), network); + let tx_id = alice_ts + .send_transaction( + bob_address, + value, + UtxoSelectionCriteria::default(), + OutputFeatures::default(), + fee_per_gram, + message.clone(), + ) + .await + .unwrap(); + println!("tx_id: {}", tx_id); + + let mut count = 0; + loop { + match alice_ts.get_any_transaction(tx_id).await { + Ok(None) => tokio::time::sleep(Duration::from_millis(100)).await, + Ok(Some(WalletTransaction::PendingOutbound(_))) => { + println!("waited {}ms to detect the transaction", count * 100); + break; + }, + _ => { + panic!( + "waited {}ms to detect the transaction, unexpected error/inbound/completed!", + count * 100 + ); + }, + } + count += 1; + if count > 20 * 10 { + panic!("waited {}ms but could not detect the transaction!", count * 100); + } + } + // Encumbered outputs are re-instated + assert_eq!(balance.available_balance, initial_available_balance); +} + +#[allow(clippy::cast_possible_truncation)] +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_spend_dust_happy_path() { + //` cargo test --release --test wallet_integration_tests + //` transaction_service_tests::service::test_spend_dust_happy_path > .\target\output.txt 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + let network = Network::LocalNet; + let consensus_manager = ConsensusManager::builder(network).build().unwrap(); + let factories = CryptoFactories::default(); + let shutdown = Shutdown::new(); + + // Alice's wallet parameters + let alice_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + let bob_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + log::info!( + "manage_single_transaction: Alice: '{}', Bob: '{}'", + alice_node_identity.node_id().short_str(), + bob_node_identity.node_id().short_str(), + ); + let temp_dir = tempdir().unwrap(); + let database_path = temp_dir.path().to_str().unwrap().to_string(); + let alice_connection = make_wallet_database_memory_connection(); + + let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = + setup_transaction_service( + alice_node_identity.clone(), + vec![], + consensus_manager.clone(), + factories.clone(), + alice_connection, + database_path.clone(), + Duration::from_secs(0), + shutdown.to_signal(), + ) + .await; + + // Alice create dust + + let amount_per_output = 10_000 * uT; + let number_of_outputs = 1000; + let fee_per_gram = MicroMinotari::from(1); + let mut uo_reference = make_input( + &mut OsRng, + amount_per_output, + &OutputFeatures::default(), + &alice_key_manager_handle, + ) + .await; + for _ in 0..number_of_outputs { + let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; + + alice_oms.add_output(uo.clone(), None).await.unwrap(); + alice_db + .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) + .unwrap(); + } + + let balance = alice_oms.get_balance().await.unwrap(); + let initial_available_balance = balance.available_balance; + + // Alice try to spend a fair amount of dust to self [should succeed] (we just need to verify that the + // transaction is created and that the available balance is correct) + + let message = "TAKE MAH _OWN_ MONEYS!".to_string(); + let value_self = (number_of_outputs / 3) * amount_per_output; + let alice_address = TariAddress::new(alice_node_identity.public_key().clone(), network); + let tx_id = alice_ts + .send_transaction( + alice_address, + value_self, + UtxoSelectionCriteria::default(), + OutputFeatures::default(), + fee_per_gram, + message.clone(), + ) + .await + .unwrap(); + let mut count = 0; + let mut fees_self = loop { + match alice_ts.get_any_transaction(tx_id).await { + Ok(None) => tokio::time::sleep(Duration::from_millis(100)).await, + Ok(Some(WalletTransaction::Completed(tx))) => { + println!("waited {}ms to detect the transaction", count * 100); + break tx.fee; + }, + _ => { + panic!( + "waited {}ms to detect the transaction, unexpected error/inbound/outboubd!", + count * 100 + ); + }, + } + count += 1; + if count > 20 * 10 { + panic!("waited {}ms but could not detect the transaction!", count * 100); + } + }; + fees_self = (fees_self.0 as f64 / amount_per_output.0 as f64).ceil() as u64 * amount_per_output; + let balance = alice_oms.get_balance().await.unwrap(); + assert_eq!( + balance.available_balance, + initial_available_balance - value_self - fees_self + ); + + // Alice try to spend a fair amount of dust to Bob [should succeed] (We do not need Bob to be present, + // we just need to verify that the transaction is created and that the available balance is correct) + + let message = "GIVE MAH _OWN_ MONEYS AWAY!".to_string(); + let value_bob = (number_of_outputs / 3) * amount_per_output; + let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), network); + let tx_id = alice_ts + .send_transaction( + bob_address, + value_bob, + UtxoSelectionCriteria::default(), + OutputFeatures::default(), + fee_per_gram, + message.clone(), + ) + .await + .unwrap(); + println!("tx_id: {}", tx_id); + + let mut count = 0; + let mut fees_bob = loop { + match alice_ts.get_any_transaction(tx_id).await { + Ok(None) => tokio::time::sleep(Duration::from_millis(100)).await, + Ok(Some(WalletTransaction::PendingOutbound(tx))) => { + println!("waited {}ms to detect the transaction", count * 100); + break tx.fee; + }, + _ => { + panic!( + "waited {}ms to detect the transaction, unexpected error/inbound/completed!", + count * 100 + ); + }, + } + count += 1; + if count > 20 * 10 { + panic!("waited {}ms but could not detect the transaction!", count * 100); + } + }; + fees_bob = (fees_bob.0 as f64 / amount_per_output.0 as f64).ceil() as u64 * amount_per_output; + let balance = alice_oms.get_balance().await.unwrap(); + assert_eq!( + balance.available_balance, + initial_available_balance - value_self - fees_self - value_bob - fees_bob + ); +} + #[tokio::test] async fn single_transaction_to_self() { let network = Network::LocalNet; @@ -866,7 +1252,7 @@ async fn single_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let db_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, key_manager_handle, alice_db) = @@ -950,7 +1336,7 @@ async fn large_coin_split_transaction() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let db_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, key_manager_handle, alice_db) = @@ -1035,7 +1421,7 @@ async fn single_transaction_burn_tari() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let db_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, key_manager_handle, alice_db) = @@ -1183,7 +1569,7 @@ async fn send_one_sided_transaction_to_other() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let db_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, key_manager_handle, alice_db) = @@ -1301,8 +1687,8 @@ async fn recover_one_sided_transaction() { let database_path = temp_dir.path().to_str().unwrap().to_string(); let database_path2 = temp_dir2.path().to_str().unwrap().to_string(); - let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); - let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path2.clone())); + let alice_connection = make_wallet_database_memory_connection(); + let bob_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (mut alice_ts, alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = @@ -1424,7 +1810,7 @@ async fn test_htlc_send_and_claim() { let bob_db_name = format!("{}.sqlite3", random::string(8).as_str()); let bob_db_path = format!("{}/{}", path_string, bob_db_name); - let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let db_connection = make_wallet_database_memory_connection(); let bob_connection = run_migration_and_create_sqlite_connection(&bob_db_path, 16).unwrap(); let shutdown = Shutdown::new(); @@ -1557,7 +1943,7 @@ async fn send_one_sided_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let alice_connection = make_wallet_database_memory_connection(); let shutdown = Shutdown::new(); let (alice_ts, alice_oms, _alice_comms, _alice_connectivity, key_manager_handle, alice_db) = @@ -1648,6 +2034,7 @@ async fn manage_multiple_transactions() { let database_path = temp_dir.path().to_str().unwrap().to_string(); + // TODO: When using a memory type db connection this test fails at `assert_eq!(tx_reply, 3, "Need 3 replies");` let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let (carol_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); @@ -2318,7 +2705,7 @@ async fn discovery_async_return_test() { ); let mut shutdown = Shutdown::new(); - let (carol_connection, _temp_dir1) = make_wallet_database_connection(None); + let carol_connection = make_wallet_database_memory_connection(); let (_carol_ts, _carol_oms, carol_comms, _carol_connectivity, key_manager_handle, _carol_db) = setup_transaction_service( @@ -2333,7 +2720,7 @@ async fn discovery_async_return_test() { ) .await; - let (alice_connection, _temp_dir2) = make_wallet_database_connection(None); + let alice_connection = make_wallet_database_memory_connection(); let (mut alice_ts, mut alice_oms, alice_comms, _alice_connectivity, alice_key_manager_handle, alice_db) = setup_transaction_service( @@ -2488,7 +2875,7 @@ async fn discovery_async_return_test() { #[tokio::test] async fn test_power_mode_updates() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let tx_backend = alice_ts_interface.ts_db; @@ -2638,7 +3025,7 @@ async fn test_power_mode_updates() { async fn test_set_num_confirmations() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut ts_interface = setup_transaction_service_no_comms( factories, @@ -2684,7 +3071,7 @@ async fn test_transaction_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -3034,7 +3421,7 @@ async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; @@ -3094,7 +3481,7 @@ async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { }, }; assert_eq!(tx_id, msg_tx_id); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); // Test sending the Reply to a receiver with Direct and then with SAF and never both let mut bob_ts_interface = setup_transaction_service_no_comms( @@ -3146,7 +3533,7 @@ async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { 0, "Should be no more calls" ); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut bob2_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -3347,7 +3734,7 @@ async fn test_tx_direct_send_behaviour() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let mut alice_event_stream = alice_ts_interface.transaction_service_handle.get_event_stream(); @@ -3616,13 +4003,13 @@ async fn test_tx_direct_send_behaviour() { async fn test_restarting_transaction_protocols() { let network = Network::LocalNet; let factories = CryptoFactories::default(); - let (alice_connection, _temp_dir) = make_wallet_database_connection(None); + let alice_connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), alice_connection, None).await; let alice_backend = alice_ts_interface.ts_db; - let (bob_connection, _temp_dir2) = make_wallet_database_connection(None); + let bob_connection = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms(factories.clone(), bob_connection, None).await; let bob_backend = bob_ts_interface.ts_db; @@ -3845,7 +4232,7 @@ async fn test_transaction_resending() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Setup Alice wallet with no comms stack - let (connection, _tempdir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -3912,7 +4299,7 @@ async fn test_transaction_resending() { } // Setup Bob's wallet with no comms stack - let (connection, _tempdir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms( factories, @@ -4120,7 +4507,7 @@ async fn test_resend_on_startup() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4169,7 +4556,7 @@ async fn test_resend_on_startup() { outbound_tx.send_count = 1; outbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (connection2, _temp_dir2) = make_wallet_database_connection(None); + let connection2 = make_wallet_database_memory_connection(); let mut alice2_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4252,7 +4639,7 @@ async fn test_resend_on_startup() { send_count: 0, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_connection = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4300,7 +4687,7 @@ async fn test_resend_on_startup() { // Now we do it again with the timestamp prior to the cooldown and see that a message is sent inbound_tx.send_count = 1; inbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (bob_connection2, _temp_dir2) = make_wallet_database_connection(None); + let bob_connection2 = make_wallet_database_memory_connection(); let mut bob2_ts_interface = setup_transaction_service_no_comms( factories, @@ -4359,7 +4746,7 @@ async fn test_replying_to_cancelled_tx() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (alice_connection, _tempdir) = make_wallet_database_connection(None); + let alice_connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4426,7 +4813,7 @@ async fn test_replying_to_cancelled_tx() { .unwrap(); // Setup Bob's wallet with no comms stack - let (bob_connection, _tempdir) = make_wallet_database_connection(None); + let bob_connection = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms( factories, @@ -4491,7 +4878,7 @@ async fn test_transaction_timeout_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (alice_connection, _tempdir) = make_wallet_database_connection(None); + let alice_connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4637,7 +5024,7 @@ async fn test_transaction_timeout_cancellation() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_connection = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms( factories.clone(), @@ -4687,7 +5074,7 @@ async fn test_transaction_timeout_cancellation() { let call = bob_ts_interface.outbound_service_mock_state.pop_call().await.unwrap(); let bob_cancelled_message = try_decode_transaction_cancelled_message(call.1.to_vec()).unwrap(); assert_eq!(bob_cancelled_message.tx_id, tx_id.as_u64()); - let (carol_connection, _temp) = make_wallet_database_connection(None); + let carol_connection = make_wallet_database_memory_connection(); // Now to do this for the Receiver let mut carol_ts_interface = setup_transaction_service_no_comms( @@ -4761,7 +5148,7 @@ async fn transaction_service_tx_broadcast() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let mut alice_event_stream = alice_ts_interface.transaction_service_handle.get_event_stream(); @@ -4770,7 +5157,7 @@ async fn transaction_service_tx_broadcast() { .wallet_connectivity_service_mock .set_base_node(alice_ts_interface.base_node_identity.to_peer()); - let (connection2, _temp_dir2) = make_wallet_database_connection(None); + let connection2 = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection2, None).await; let alice_output_value = MicroMinotari(250000); @@ -5127,7 +5514,7 @@ async fn transaction_service_tx_broadcast() { #[tokio::test] async fn broadcast_all_completed_transactions_on_startup() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let db = alice_ts_interface.ts_db.clone(); @@ -5267,7 +5654,7 @@ async fn broadcast_all_completed_transactions_on_startup() { async fn test_update_faux_tx_on_oms_validation() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let alice_address = TariAddress::new( @@ -5441,7 +5828,7 @@ async fn test_update_faux_tx_on_oms_validation() { async fn test_update_coinbase_tx_on_oms_validation() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; let alice_address = TariAddress::new( @@ -5613,7 +6000,7 @@ async fn test_update_coinbase_tx_on_oms_validation() { #[tokio::test] async fn test_get_fee_per_gram_per_block_basic() { let factories = CryptoFactories::default(); - let (connection, _temp_dir) = make_wallet_database_connection(None); + let connection = make_wallet_database_memory_connection(); let mut alice_ts_interface = setup_transaction_service_no_comms(factories, connection, None).await; let stats = vec![base_node_proto::MempoolFeePerGramStat { order: 0, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 37fab7c676..7d9c5492b1 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -5739,7 +5739,7 @@ pub unsafe extern "C" fn wallet_get_utxos( }], }; - match (*wallet).wallet.output_db.fetch_outputs_by(q) { + match (*wallet).wallet.output_db.fetch_outputs_by_query(q) { Ok(outputs) => { ptr::replace(error_ptr, 0); Box::into_raw(Box::new(TariVector::from(outputs))) @@ -5809,7 +5809,7 @@ pub unsafe extern "C" fn wallet_get_all_utxos(wallet: *mut TariWallet, error_ptr sorting: vec![], }; - match (*wallet).wallet.output_db.fetch_outputs_by(q) { + match (*wallet).wallet.output_db.fetch_outputs_by_query(q) { Ok(outputs) => { ptr::replace(error_ptr, 0); Box::into_raw(Box::new(TariVector::from(outputs))) @@ -10425,7 +10425,7 @@ mod test { let unspent_outputs = (*alice_wallet) .wallet .output_db - .fetch_outputs_by(OutputBackendQuery { + .fetch_outputs_by_query(OutputBackendQuery { status: vec![OutputStatus::Unspent], ..Default::default() }) @@ -10437,7 +10437,7 @@ mod test { let new_pending_outputs = (*alice_wallet) .wallet .output_db - .fetch_outputs_by(OutputBackendQuery { + .fetch_outputs_by_query(OutputBackendQuery { status: vec![OutputStatus::EncumberedToBeReceived], ..Default::default() }) @@ -10641,7 +10641,7 @@ mod test { let unspent_outputs = (*alice_wallet) .wallet .output_db - .fetch_outputs_by(OutputBackendQuery { + .fetch_outputs_by_query(OutputBackendQuery { status: vec![OutputStatus::Unspent], ..Default::default() }) @@ -10653,7 +10653,7 @@ mod test { let new_pending_outputs = (*alice_wallet) .wallet .output_db - .fetch_outputs_by(OutputBackendQuery { + .fetch_outputs_by_query(OutputBackendQuery { status: vec![OutputStatus::EncumberedToBeReceived], ..Default::default() })