From c8b737e246183e36e97c6821649b3920d4c2a32e Mon Sep 17 00:00:00 2001 From: Philip Robinson Date: Tue, 7 Sep 2021 12:20:32 +0200 Subject: [PATCH] fix: disable P2P transaction negotiation while recovery is in progress Some weird behaviour was observed when a wallet would be busy with recovery and then receive transaction negotiation messages, either directly or via SAF. The Recovery process is updating the Key Manager Indices and looking for commitments on the blockchain so to allow transaction negotiation during this time is dangerous as it might put duplicate commitments into the db and reuse spending keys. This PR checks for the db key/value used to indicate Recovery progress before handling a transaction negotiation p2p message and if it is there the message is ignored with a log. --- base_layer/wallet/src/test_utils.rs | 25 +- .../wallet/src/transaction_service/error.rs | 5 + .../wallet/src/transaction_service/mod.rs | 57 +-- .../wallet/src/transaction_service/service.rs | 117 ++++--- base_layer/wallet/src/wallet.rs | 1 + .../tests/transaction_service/service.rs | 328 +++++++----------- base_layer/wallet/tests/wallet/mod.rs | 21 +- base_layer/wallet_ffi/src/callback_handler.rs | 7 +- 8 files changed, 268 insertions(+), 293 deletions(-) diff --git a/base_layer/wallet/src/test_utils.rs b/base_layer/wallet/src/test_utils.rs index 5cf0758919..32da53b26c 100644 --- a/base_layer/wallet/src/test_utils.rs +++ b/base_layer/wallet/src/test_utils.rs @@ -20,12 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{ - contacts_service::storage::sqlite_db::ContactsServiceSqliteDatabase, - output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, - storage::{sqlite_db::WalletSqliteDatabase, sqlite_utilities::run_migration_and_create_sqlite_connection}, - transaction_service::storage::sqlite_db::TransactionServiceSqliteDatabase, -}; +use crate::storage::sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}; use core::iter; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use std::path::Path; @@ -39,15 +34,7 @@ pub fn random_string(len: usize) -> String { } /// A test helper to create a temporary wallet service databases -pub fn make_wallet_databases( - path: Option, -) -> ( - WalletSqliteDatabase, - TransactionServiceSqliteDatabase, - OutputManagerSqliteDatabase, - ContactsServiceSqliteDatabase, - Option, -) { +pub fn make_wallet_database_connection(path: Option) -> (WalletDbConnection, Option) { let (path_string, temp_dir): (String, Option) = if let Some(p) = path { (p, None) } else { @@ -61,11 +48,5 @@ pub fn make_wallet_databases( let connection = run_migration_and_create_sqlite_connection(&db_path.to_str().expect("Should be able to make path")).unwrap(); - ( - WalletSqliteDatabase::new(connection.clone(), None).expect("Should be able to create wallet database"), - TransactionServiceSqliteDatabase::new(connection.clone(), None), - OutputManagerSqliteDatabase::new(connection.clone(), None), - ContactsServiceSqliteDatabase::new(connection), - temp_dir, - ) + (connection, temp_dir) } diff --git a/base_layer/wallet/src/transaction_service/error.rs b/base_layer/wallet/src/transaction_service/error.rs index 05e9e4af2b..6a2a8af144 100644 --- a/base_layer/wallet/src/transaction_service/error.rs +++ b/base_layer/wallet/src/transaction_service/error.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ + error::WalletStorageError, output_manager_service::{error::OutputManagerError, TxId}, transaction_service::storage::database::DbKey, }; @@ -100,6 +101,8 @@ pub enum TransactionServiceError { TransportChannelError(#[from] TransportChannelError), #[error("Transaction storage error: `{0}`")] TransactionStorageError(#[from] TransactionStorageError), + #[error("Wallet storage error: `{0}`")] + WalletStorageError(#[from] WalletStorageError), #[error("Invalid message error: `{0}`")] InvalidMessageError(String), #[error("Transaction error: `{0}`")] @@ -140,6 +143,8 @@ pub enum TransactionServiceError { ByteArrayError(#[from] tari_crypto::tari_utilities::ByteArrayError), #[error("Transaction Service Error: `{0}`")] ServiceError(String), + #[error("Wallet Recovery in progress so Transaction Service Messaging Requests ignored")] + WalletRecoveryInProgress, } #[derive(Debug, Error)] diff --git a/base_layer/wallet/src/transaction_service/mod.rs b/base_layer/wallet/src/transaction_service/mod.rs index 541d898770..48b3484dca 100644 --- a/base_layer/wallet/src/transaction_service/mod.rs +++ b/base_layer/wallet/src/transaction_service/mod.rs @@ -22,6 +22,16 @@ use std::sync::Arc; +use crate::{ + output_manager_service::handle::OutputManagerHandle, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::{ + config::TransactionServiceConfig, + handle::TransactionServiceHandle, + service::TransactionService, + storage::database::{TransactionBackend, TransactionDatabase}, + }, +}; use futures::{Stream, StreamExt}; use log::*; use tokio::sync::broadcast; @@ -46,16 +56,6 @@ use tari_service_framework::{ ServiceInitializerContext, }; -use crate::{ - output_manager_service::handle::OutputManagerHandle, - transaction_service::{ - config::TransactionServiceConfig, - handle::TransactionServiceHandle, - service::TransactionService, - storage::database::{TransactionBackend, TransactionDatabase}, - }, -}; - pub mod config; pub mod error; pub mod handle; @@ -67,18 +67,23 @@ pub mod tasks; const LOG_TARGET: &str = "wallet::transaction_service"; const SUBSCRIPTION_LABEL: &str = "Transaction Service"; -pub struct TransactionServiceInitializer -where T: TransactionBackend +pub struct TransactionServiceInitializer +where + T: TransactionBackend, + W: WalletBackend, { config: TransactionServiceConfig, subscription_factory: Arc, - backend: Option, + tx_backend: Option, node_identity: Arc, factories: CryptoFactories, + wallet_database: Option>, } -impl TransactionServiceInitializer -where T: TransactionBackend +impl TransactionServiceInitializer +where + T: TransactionBackend, + W: WalletBackend, { pub fn new( config: TransactionServiceConfig, @@ -86,13 +91,15 @@ where T: TransactionBackend backend: T, node_identity: Arc, factories: CryptoFactories, + wallet_database: WalletDatabase, ) -> Self { Self { config, subscription_factory, - backend: Some(backend), + tx_backend: Some(backend), node_identity, factories, + wallet_database: Some(wallet_database), } } @@ -164,8 +171,10 @@ where T: TransactionBackend } #[async_trait] -impl ServiceInitializer for TransactionServiceInitializer -where T: TransactionBackend + 'static +impl ServiceInitializer for TransactionServiceInitializer +where + T: TransactionBackend + 'static, + W: WalletBackend + 'static, { async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> { let (sender, receiver) = reply_channel::unbounded(); @@ -182,11 +191,16 @@ where T: TransactionBackend + 'static // Register handle before waiting for handles to be ready context.register_handle(transaction_handle); - let backend = self - .backend + let tx_backend = self + .tx_backend .take() .expect("Cannot start Transaction Service without providing a backend"); + let wallet_database = self + .wallet_database + .take() + .expect("Cannot start Transaction Service without providing a wallet database"); + let node_identity = self.node_identity.clone(); let factories = self.factories.clone(); let config = self.config.clone(); @@ -198,7 +212,8 @@ where T: TransactionBackend + 'static let result = TransactionService::new( config, - TransactionDatabase::new(backend), + TransactionDatabase::new(tx_backend), + wallet_database, receiver, transaction_stream, transaction_reply_stream, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 12aa052382..c5fbb892d2 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -20,21 +20,44 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{ - collections::{HashMap, HashSet}, - convert::TryInto, - sync::Arc, - time::{Duration, Instant}, +use crate::{ + output_manager_service::{handle::OutputManagerHandle, TxId}, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::{ + config::TransactionServiceConfig, + error::{TransactionServiceError, TransactionServiceProtocolError}, + handle::{TransactionEvent, TransactionEventSender, TransactionServiceRequest, TransactionServiceResponse}, + protocols::{ + transaction_broadcast_protocol::TransactionBroadcastProtocol, + transaction_coinbase_monitoring_protocol::TransactionCoinbaseMonitoringProtocol, + transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage}, + transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage}, + transaction_validation_protocol::TransactionValidationProtocol, + }, + storage::{ + database::{TransactionBackend, TransactionDatabase}, + models::{CompletedTransaction, TransactionDirection, TransactionStatus}, + }, + tasks::{ + send_finalized_transaction::send_finalized_transaction_message, + send_transaction_cancelled::send_transaction_cancelled_message, + send_transaction_reply::send_transaction_reply, + }, + }, + types::{HashDigest, ValidationRetryStrategy}, + utxo_scanner_service::utxo_scanning::RECOVERY_KEY, }; - use chrono::{NaiveDateTime, Utc}; use digest::Digest; use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt}; use log::*; use rand::{rngs::OsRng, RngCore}; -use tari_crypto::{keys::DiffieHellmanSharedSecret, script, tari_utilities::ByteArray}; -use tokio::{sync::broadcast, task::JoinHandle}; - +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, + sync::Arc, + time::{Duration, Instant}, +}; use tari_common_types::types::PrivateKey; use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeIdentity, types::CommsPublicKey}; use tari_comms_dht::outbound::OutboundMessageRequester; @@ -54,35 +77,13 @@ use tari_core::{ ReceiverTransactionProtocol, }, }; +use tari_crypto::{keys::DiffieHellmanSharedSecret, script, tari_utilities::ByteArray}; use tari_p2p::domain_message::DomainMessage; use tari_service_framework::{reply_channel, reply_channel::Receiver}; use tari_shutdown::ShutdownSignal; -use tokio::sync::{mpsc, mpsc::Sender, oneshot}; - -use crate::{ - output_manager_service::{handle::OutputManagerHandle, TxId}, - transaction_service::{ - config::TransactionServiceConfig, - error::{TransactionServiceError, TransactionServiceProtocolError}, - handle::{TransactionEvent, TransactionEventSender, TransactionServiceRequest, TransactionServiceResponse}, - protocols::{ - transaction_broadcast_protocol::TransactionBroadcastProtocol, - transaction_coinbase_monitoring_protocol::TransactionCoinbaseMonitoringProtocol, - transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage}, - transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage}, - transaction_validation_protocol::TransactionValidationProtocol, - }, - storage::{ - database::{TransactionBackend, TransactionDatabase}, - models::{CompletedTransaction, TransactionDirection, TransactionStatus}, - }, - tasks::{ - send_finalized_transaction::send_finalized_transaction_message, - send_transaction_cancelled::send_transaction_cancelled_message, - send_transaction_reply::send_transaction_reply, - }, - }, - types::{HashDigest, ValidationRetryStrategy}, +use tokio::{ + sync::{broadcast, mpsc, mpsc::Sender, oneshot}, + task::JoinHandle, }; const LOG_TARGET: &str = "wallet::transaction_service::service"; @@ -107,7 +108,10 @@ pub struct TransactionService< BNResponseStream, TBackend, TTxCancelledStream, -> where TBackend: TransactionBackend + 'static + WBackend, +> where + TBackend: TransactionBackend + 'static, + WBackend: WalletBackend + 'static, { config: TransactionServiceConfig, db: TransactionDatabase, @@ -134,11 +138,20 @@ pub struct TransactionService< timeout_update_publisher: broadcast::Sender, base_node_update_publisher: broadcast::Sender, power_mode: PowerMode, + wallet_db: WalletDatabase, } #[allow(clippy::too_many_arguments)] -impl - TransactionService +impl + TransactionService< + TTxStream, + TTxReplyStream, + TTxFinalizedStream, + BNResponseStream, + TBackend, + TTxCancelledStream, + WBackend, + > where TTxStream: Stream>, TTxReplyStream: Stream>, @@ -146,10 +159,12 @@ where BNResponseStream: Stream>, TTxCancelledStream: Stream>, TBackend: TransactionBackend + 'static, + WBackend: WalletBackend + 'static, { pub fn new( config: TransactionServiceConfig, db: TransactionDatabase, + wallet_db: WalletDatabase, request_stream: Receiver< TransactionServiceRequest, Result, @@ -208,6 +223,7 @@ where timeout_update_publisher, base_node_update_publisher, power_mode: PowerMode::Normal, + wallet_db, } } @@ -316,7 +332,7 @@ where msg.dht_header.message_tag); } Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: {}", + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {} for NodeID: {}, Trace: {}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_tag); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling \ Transaction Sender message: {:?}", e).to_string()))); @@ -346,7 +362,7 @@ where msg.dht_header.message_tag); }, Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} \ + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {} \ for NodeId: {}, Trace: {}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_tag); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling \ @@ -384,7 +400,7 @@ where msg.dht_header.message_tag); }, Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} \ + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {} \ for NodeID: {}, Trace: {}", e , self.node_identity.node_id().short_str(), msg.dht_header.message_tag.as_value()); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction \ @@ -879,6 +895,9 @@ where source_pubkey: CommsPublicKey, recipient_reply: proto::RecipientSignedMessage, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let recipient_reply: RecipientSignedMessage = recipient_reply .try_into() .map_err(TransactionServiceError::InvalidMessageError)?; @@ -1181,6 +1200,9 @@ where traced_message_tag: u64, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let sender_message: TransactionSenderMessage = sender_message .try_into() .map_err(TransactionServiceError::InvalidMessageError)?; @@ -1289,6 +1311,9 @@ where finalized_transaction: proto::TransactionFinalizedMessage, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let tx_id = finalized_transaction.tx_id; let transaction: Transaction = finalized_transaction .transaction @@ -2025,6 +2050,16 @@ where Ok(()) } + + /// Check if a Recovery Status is currently stored in the databse, this indicates that a wallet recovery is in + /// progress + async fn check_recovery_status(&self) -> Result<(), TransactionServiceError> { + let value = self.wallet_db.get_client_key_value(RECOVERY_KEY.to_owned()).await?; + match value { + None => Ok(()), + Some(_) => Err(TransactionServiceError::WalletRecoveryInProgress), + } + } } /// This struct is a collection of the common resources that a protocol in the service requires. diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index f29b78cdfc..72733c459b 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -187,6 +187,7 @@ where transaction_backend, node_identity.clone(), factories.clone(), + wallet_database.clone(), )) .add_initializer(ContactsServiceInitializer::new(contacts_backend)) .add_initializer(BaseNodeServiceInitializer::new( diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 6e1265460d..a7db549994 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -115,17 +115,18 @@ use tari_wallet::{ handle::OutputManagerHandle, service::OutputManagerService, storage::{ - database::{OutputManagerBackend, OutputManagerDatabase}, + database::OutputManagerDatabase, models::KnownOneSidedPaymentScript, sqlite_db::OutputManagerSqliteDatabase, }, OutputManagerServiceInitializer, }, storage::{ - database::{WalletBackend, WalletDatabase}, - sqlite_utilities::run_migration_and_create_sqlite_connection, + database::WalletDatabase, + sqlite_db::WalletSqliteDatabase, + sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}, }, - test_utils::make_wallet_databases, + test_utils::make_wallet_database_connection, transaction_service::{ config::TransactionServiceConfig, error::TransactionServiceError, @@ -158,19 +159,12 @@ fn create_runtime() -> Runtime { } #[allow(clippy::too_many_arguments)] -pub fn setup_transaction_service< - W: WalletBackend + 'static, - T: TransactionBackend + 'static, - K: OutputManagerBackend + 'static, - P: AsRef, ->( +pub fn setup_transaction_service>( runtime: &mut Runtime, node_identity: Arc, peers: Vec>, factories: CryptoFactories, - wallet_backend: W, - tx_backend: T, - oms_backend: K, + db_connection: WalletDbConnection, database_path: P, discovery_request_timeout: Duration, shutdown_signal: ShutdownSignal, @@ -187,11 +181,14 @@ pub fn setup_transaction_service< shutdown_signal.clone(), )); - let db = WalletDatabase::new(wallet_backend); + let db = WalletDatabase::new(WalletSqliteDatabase::new(db_connection.clone(), None).unwrap()); let metadata = ChainMetadata::new(std::u64::MAX, Vec::new(), 0, 0, 0); runtime.block_on(db.set_chain_metadata(metadata)).unwrap(); + let ts_backend = TransactionServiceSqliteDatabase::new(db_connection.clone(), None); + let oms_backend = OutputManagerSqliteDatabase::new(db_connection, None); + let fut = StackBuilder::new(shutdown_signal) .add_initializer(RegisterHandle::new(dht)) .add_initializer(RegisterHandle::new(comms.connectivity())) @@ -211,9 +208,10 @@ pub fn setup_transaction_service< ..Default::default() }, subscription_factory, - tx_backend, + ts_backend, comms.node_identity(), factories, + db.clone(), )) .add_initializer(BaseNodeServiceInitializer::new(BaseNodeServiceConfig::default(), db)) .add_initializer(WalletConnectivityInitializer::new(BaseNodeServiceConfig::default())) @@ -230,11 +228,10 @@ pub fn setup_transaction_service< /// This utility function creates a Transaction service without using the Service Framework Stack and exposes all the /// streams for testing purposes. #[allow(clippy::type_complexity)] -pub fn setup_transaction_service_no_comms( +pub fn setup_transaction_service_no_comms( runtime: &mut Runtime, factories: CryptoFactories, - tx_backend: T, - oms_backend: K, + db_connection: WalletDbConnection, config: Option, ) -> ( TransactionServiceHandle, @@ -251,18 +248,14 @@ pub fn setup_transaction_service_no_comms, BaseNodeWalletRpcMockState, ) { - setup_transaction_service_no_comms_and_oms_backend(runtime, factories, tx_backend, oms_backend, config) + setup_transaction_service_no_comms_and_oms_backend(runtime, factories, db_connection, config) } #[allow(clippy::type_complexity)] -pub fn setup_transaction_service_no_comms_and_oms_backend< - T: TransactionBackend + 'static, - S: OutputManagerBackend + 'static, ->( +pub fn setup_transaction_service_no_comms_and_oms_backend( runtime: &mut Runtime, factories: CryptoFactories, - tx_backend: T, - oms_backend: S, + db_connection: WalletDbConnection, config: Option, ) -> ( TransactionServiceHandle, @@ -336,12 +329,18 @@ pub fn setup_transaction_service_no_comms_and_oms_backend< mock_base_node_service.set_default_base_node_state(); runtime.spawn(mock_base_node_service.run()); + let wallet_db = WalletDatabase::new( + WalletSqliteDatabase::new(db_connection.clone(), None).expect("Should be able to create wallet database"), + ); + let ts_db = TransactionDatabase::new(TransactionServiceSqliteDatabase::new(db_connection.clone(), None)); + let oms_db = OutputManagerDatabase::new(OutputManagerSqliteDatabase::new(db_connection, None)); + let output_manager_service = runtime .block_on(OutputManagerService::new( OutputManagerServiceConfig::default(), ts_handle.clone(), oms_request_receiver, - OutputManagerDatabase::new(oms_backend), + oms_db, oms_event_publisher.clone(), factories.clone(), constants, @@ -369,7 +368,8 @@ pub fn setup_transaction_service_no_comms_and_oms_backend< let ts_service = TransactionService::new( test_config, - TransactionDatabase::new(tx_backend), + ts_db, + wallet_db, ts_request_receiver, tx_receiver, tx_ack_receiver, @@ -490,10 +490,8 @@ fn manage_single_transaction() { ); let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -501,9 +499,7 @@ fn manage_single_transaction() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path.clone(), Duration::from_secs(0), shutdown.to_signal(), @@ -521,9 +517,7 @@ fn manage_single_transaction() { bob_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -648,8 +642,7 @@ fn single_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -657,9 +650,7 @@ fn single_transaction_to_self() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + db_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -740,8 +731,7 @@ fn send_one_sided_transaction_to_other() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -749,9 +739,7 @@ fn send_one_sided_transaction_to_other() { alice_node_identity, vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + db_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -861,10 +849,8 @@ fn recover_one_sided_transaction() { let database_path = temp_dir.path().to_str().unwrap().to_string(); let database_path2 = temp_dir2.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path2.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path2.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, alice_oms, _alice_comms) = setup_transaction_service( @@ -872,9 +858,7 @@ fn recover_one_sided_transaction() { alice_node_identity, vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -885,9 +869,7 @@ fn recover_one_sided_transaction() { bob_node_identity.clone(), vec![], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path2, Duration::from_secs(0), shutdown.to_signal(), @@ -976,8 +958,7 @@ fn send_one_sided_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, alice_oms, _alice_comms) = setup_transaction_service( @@ -985,9 +966,7 @@ fn send_one_sided_transaction_to_self() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -1061,12 +1040,9 @@ fn manage_multiple_transactions() { let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (carol_wallet_backend, carol_backend, carol_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (carol_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let mut shutdown = Shutdown::new(); @@ -1075,9 +1051,7 @@ fn manage_multiple_transactions() { alice_node_identity.clone(), vec![bob_node_identity.clone(), carol_node_identity.clone()], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path.clone(), Duration::from_secs(60), shutdown.to_signal(), @@ -1092,9 +1066,7 @@ fn manage_multiple_transactions() { bob_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path.clone(), Duration::from_secs(1), shutdown.to_signal(), @@ -1107,9 +1079,7 @@ fn manage_multiple_transactions() { carol_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - carol_wallet_backend, - carol_backend, - carol_oms_backend, + carol_connection, database_path, Duration::from_secs(1), shutdown.to_signal(), @@ -1299,8 +1269,6 @@ fn test_accepting_unknown_tx_id_and_malformed_reply() { let alice_db_name = format!("{}.sqlite3", random::string(8).as_str()); let alice_db_path = format!("{}/{}", path_string, alice_db_name); let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection_alice, None); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); @@ -1318,7 +1286,7 @@ fn test_accepting_unknown_tx_id_and_malformed_reply() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); @@ -1415,11 +1383,6 @@ fn finalize_tx_with_incorrect_pubkey() { let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); let connection_bob = run_migration_and_create_sqlite_connection(&bob_db_path).unwrap(); - let alice_oms_backend = OutputManagerSqliteDatabase::new(connection_alice.clone(), None); - let bob_oms_backend = OutputManagerSqliteDatabase::new(connection_bob.clone(), None); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice, None); - let bob_backend = TransactionServiceSqliteDatabase::new(connection_bob, None); - let ( mut alice_ts, _alice_output_manager, @@ -1434,7 +1397,7 @@ fn finalize_tx_with_incorrect_pubkey() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, alice_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); let bob_node_identity = @@ -1453,7 +1416,7 @@ fn finalize_tx_with_incorrect_pubkey() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_bob, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1545,10 +1508,6 @@ fn finalize_tx_with_missing_output() { let bob_db_path = format!("{}/{}", path_string, bob_db_name); let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); let connection_bob = run_migration_and_create_sqlite_connection(&bob_db_path).unwrap(); - let alice_oms_backend = OutputManagerSqliteDatabase::new(connection_alice.clone(), None); - let bob_oms_backend = OutputManagerSqliteDatabase::new(connection_bob.clone(), None); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice, None); - let bob_backend = TransactionServiceSqliteDatabase::new(connection_bob, None); let ( mut alice_ts, @@ -1564,7 +1523,7 @@ fn finalize_tx_with_missing_output() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, alice_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); let bob_node_identity = @@ -1583,7 +1542,7 @@ fn finalize_tx_with_missing_output() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_bob, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1710,31 +1669,27 @@ fn discovery_async_return_test() { ); let mut shutdown = Shutdown::new(); - let (carol_wallet_backend, carol_db, carol_oms_db, _, _temp_dir1) = make_wallet_databases(None); + let (carol_connection, _temp_dir1) = make_wallet_database_connection(None); let (_carol_ts, _carol_oms, carol_comms) = setup_transaction_service( &mut runtime, carol_node_identity.clone(), vec![], factories.clone(), - carol_wallet_backend, - carol_db, - carol_oms_db, + carol_connection, db_folder.join("carol"), Duration::from_secs(1), shutdown.to_signal(), ); - let (alice_wallet_backend, alice_db, alice_oms_db, _, _temp_dir2) = make_wallet_databases(None); + let (alice_connection, _temp_dir2) = make_wallet_database_connection(None); let (mut alice_ts, mut alice_oms, alice_comms) = setup_transaction_service( &mut runtime, alice_node_identity, vec![carol_node_identity.clone()], factories.clone(), - alice_wallet_backend, - alice_db, - alice_oms_db, + alice_connection, db_folder.join("alice"), Duration::from_secs(20), shutdown.to_signal(), @@ -1855,7 +1810,8 @@ fn discovery_async_return_test() { fn test_power_mode_updates() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_wallet_backend, tx_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); + let tx_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -1936,7 +1892,7 @@ fn test_power_mode_updates() { _, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -1969,19 +1925,12 @@ fn test_set_num_confirmations() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let db_name = format!("{}.sqlite3", random::string(8).as_str()); - let temp_dir = tempdir().unwrap(); - let db_folder = temp_dir.path().to_str().unwrap().to_string(); - let connection = run_migration_and_create_sqlite_connection(&format!("{}/{}", db_folder, db_name)).unwrap(); - - let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (mut ts, _, _, _, _, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories, - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2012,13 +1961,7 @@ fn test_transaction_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let db_name = format!("{}.sqlite3", random::string(8).as_str()); - let temp_dir = tempdir().unwrap(); - let db_folder = temp_dir.path().to_str().unwrap().to_string(); - let connection = run_migration_and_create_sqlite_connection(&format!("{}/{}", db_folder, db_name)).unwrap(); - - let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2037,8 +1980,7 @@ fn test_transaction_cancellation() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2330,7 +2272,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_wallet_backend, tx_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2346,7 +2288,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let alice_total_available = 250000 * uT; let (_utxo, uo) = make_input(&mut OsRng, alice_total_available, &factories.commitment); @@ -2384,15 +2326,14 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { }, }; assert_eq!(tx_id, msg_tx_id); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); // Test sending the Reply to a receiver with Direct and then with SAF and never both let (_bob_ts, _, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2427,14 +2368,13 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { runtime.block_on(async { sleep(Duration::from_secs(5)).await }); assert_eq!(bob_outbound_service.call_count(), 0, "Should be no more calls"); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (_bob2_ts, _, bob2_outbound_service, _, mut bob2_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2570,7 +2510,7 @@ fn test_tx_direct_send_behaviour() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2586,7 +2526,7 @@ fn test_tx_direct_send_behaviour() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); let (_utxo, uo) = make_input(&mut OsRng, 1000000 * uT, &factories.commitment); @@ -2776,8 +2716,11 @@ tokio::pin!(delay); fn test_restarting_transaction_protocols() { let mut runtime = Runtime::new().unwrap(); let factories = CryptoFactories::default(); - let (_wallet_backend, alice_backend, alice_oms_backend, _, _temp_dir) = make_wallet_databases(None); - let (_, bob_backend, bob_oms_backend, _, _temp_dir2) = make_wallet_databases(None); + let (alice_connection, _temp_dir) = make_wallet_database_connection(None); + let alice_backend = TransactionServiceSqliteDatabase::new(alice_connection.clone(), None); + + let (bob_connection, _temp_dir2) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); let base_node_identity = Arc::new(NodeIdentity::random( &mut OsRng, @@ -2894,7 +2837,7 @@ fn test_restarting_transaction_protocols() { // Test that Bob's node restarts the send protocol let (mut bob_ts, _bob_oms, _bob_outbound_service, _, _, mut bob_tx_reply, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_connection, None); let mut bob_event_stream = bob_ts.get_event_stream(); runtime @@ -2929,7 +2872,7 @@ fn test_restarting_transaction_protocols() { // Test Alice's node restarts the receive protocol let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, _, mut alice_tx_finalized, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories, alice_backend, alice_oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, alice_connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime @@ -2976,7 +2919,7 @@ fn test_coinbase_transactions_rejection_same_height() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2992,7 +2935,7 @@ fn test_coinbase_transactions_rejection_same_height() { _mock_rpc_server, _server_node_identity, _rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let block_height_a = 10; let block_height_b = block_height_a + 1; @@ -3074,7 +3017,7 @@ fn test_coinbase_monitoring_stuck_in_mempool() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3090,7 +3033,7 @@ fn test_coinbase_monitoring_stuck_in_mempool() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3245,7 +3188,7 @@ fn test_coinbase_monitoring_with_base_node_change_and_mined() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3261,7 +3204,7 @@ fn test_coinbase_monitoring_with_base_node_change_and_mined() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3449,7 +3392,7 @@ fn test_coinbase_monitoring_mined_not_synced() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3465,7 +3408,7 @@ fn test_coinbase_monitoring_mined_not_synced() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3620,10 +3563,10 @@ fn test_coinbase_monitoring_mined_not_synced() { fn test_coinbase_transaction_reused_for_same_height() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (mut tx_service, mut output_service, _, _, _, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let blockheight1 = 10; let fees1 = 2000 * uT; @@ -3706,7 +3649,7 @@ fn test_transaction_resending() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Setup Alice wallet with no comms stack - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3725,8 +3668,7 @@ fn test_transaction_resending() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3767,7 +3709,7 @@ fn test_transaction_resending() { } // Setup Bob's wallet with no comms stack - let (_, bob_backend, bob_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (connection, _tempdir) = make_wallet_database_connection(None); let ( _bob_ts, @@ -3786,8 +3728,7 @@ fn test_transaction_resending() { ) = setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend, - bob_oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3952,7 +3893,8 @@ fn test_resend_on_startup() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, alice_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); + let alice_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); alice_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( tx_id, @@ -3964,8 +3906,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3991,7 +3932,8 @@ fn test_resend_on_startup() { outbound_tx.send_count = 1; outbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (_, alice_backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); + let alice_backend2 = TransactionServiceSqliteDatabase::new(connection2.clone(), None); alice_backend2 .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( @@ -4004,8 +3946,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend2, - oms_backend2, + connection2, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4056,7 +3997,8 @@ fn test_resend_on_startup() { send_count: 0, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, bob_backend, bob_oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); bob_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( @@ -4069,8 +4011,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4093,8 +4034,8 @@ fn test_resend_on_startup() { // Now we do it again with the timestamp prior to the cooldown and see that a message is sent inbound_tx.send_count = 1; inbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (_, bob_backend2, bob_oms_backend2, _, _temp_dir2) = make_wallet_databases(None); - + let (bob_connection2, _temp_dir2) = make_wallet_database_connection(None); + let bob_backend2 = TransactionServiceSqliteDatabase::new(bob_connection2.clone(), None); bob_backend2 .write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( tx_id, @@ -4106,8 +4047,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend2, - bob_oms_backend2, + bob_connection2, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4144,7 +4084,7 @@ fn test_replying_to_cancelled_tx() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (alice_connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4163,8 +4103,7 @@ fn test_replying_to_cancelled_tx() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + alice_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4203,7 +4142,7 @@ fn test_replying_to_cancelled_tx() { runtime.block_on(alice_ts.cancel_transaction(tx_id)).unwrap(); // Setup Bob's wallet with no comms stack - let (_, bob_backend, bob_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (bob_connection, _tempdir) = make_wallet_database_connection(None); let ( _bob_ts, @@ -4222,8 +4161,7 @@ fn test_replying_to_cancelled_tx() { ) = setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4276,7 +4214,7 @@ fn test_transaction_timeout_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (alice_connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4295,8 +4233,7 @@ fn test_transaction_timeout_cancellation() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + alice_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4404,8 +4341,8 @@ fn test_transaction_timeout_cancellation() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, bob_backend, bob_oms_backend, _, _temp_dir) = make_wallet_databases(None); - + let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); bob_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( tx_id, @@ -4417,8 +4354,7 @@ fn test_transaction_timeout_cancellation() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4445,15 +4381,14 @@ fn test_transaction_timeout_cancellation() { let call = bob_outbound_service.pop_call().unwrap(); let bob_cancelled_message = try_decode_transaction_cancelled_message(call.1.to_vec()).unwrap(); assert_eq!(bob_cancelled_message.tx_id, tx_id); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (carol_connection, _temp_dir) = make_wallet_database_connection(None); // Now to do this for the Receiver let (carol_ts, _, carol_outbound_service, _, mut carol_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories, - backend, - oms_backend, + carol_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4520,7 +4455,7 @@ fn transaction_service_tx_broadcast() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4536,16 +4471,16 @@ fn transaction_service_tx_broadcast() { _mock_rpc_server, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) .unwrap(); - let (_, backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); let (_bob_ts, _bob_output_manager, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend2, oms_backend2, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection2, None); let alice_output_value = MicroTari(250000); @@ -4826,8 +4761,8 @@ fn transaction_service_tx_broadcast() { fn broadcast_all_completed_transactions_on_startup() { let mut runtime = Runtime::new().unwrap(); let factories = CryptoFactories::default(); - let (_, db, oms_db, _, _temp_dir) = make_wallet_databases(None); - + let (connection, _temp_dir) = make_wallet_database_connection(None); + let db = TransactionServiceSqliteDatabase::new(connection.clone(), None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) .with_signature(&Signature::default()) @@ -4893,7 +4828,7 @@ fn broadcast_all_completed_transactions_on_startup() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, db, oms_db, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); rpc_service_state.set_transaction_query_response(TxQueryResponse { location: TxLocation::Mined, @@ -4960,7 +4895,7 @@ fn transaction_service_tx_broadcast_with_base_node_change() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4976,16 +4911,16 @@ fn transaction_service_tx_broadcast_with_base_node_change() { _mock_rpc_server, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) .unwrap(); - let (_, backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); let (_bob_ts, _bob_output_manager, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend2, oms_backend2, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection2, None); let alice_output_value = MicroTari(250000); @@ -5171,7 +5106,6 @@ fn only_start_one_tx_broadcast_protocol_at_a_time() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -5215,7 +5149,7 @@ fn only_start_one_tx_broadcast_protocol_at_a_time() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -5239,7 +5173,6 @@ fn dont_broadcast_invalid_transactions() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -5283,7 +5216,7 @@ fn dont_broadcast_invalid_transactions() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -5306,9 +5239,8 @@ fn start_validation_protocol_then_broadcast_protocol_change_base_node() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let tx_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); - let db = TransactionDatabase::new(tx_backend.clone()); + let db = TransactionDatabase::new(tx_backend); runtime.block_on(add_transaction_to_database( 1, @@ -5370,7 +5302,7 @@ fn start_validation_protocol_then_broadcast_protocol_change_base_node() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); rpc_service_state.set_transaction_query_response(TxQueryResponse { location: TxLocation::Mined, diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs index 671d3db23c..3fc3e06086 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet/mod.rs @@ -57,8 +57,9 @@ use tari_p2p::{initialization::CommsConfig, transport::TransportType, Network, D use tari_shutdown::{Shutdown, ShutdownSignal}; use tari_test_utils::random; use tari_wallet::{ - contacts_service::storage::database::Contact, + contacts_service::storage::{database::Contact, sqlite_db::ContactsServiceSqliteDatabase}, error::{WalletError, WalletStorageError}, + output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, storage::{ database::{DbKeyValuePair, WalletBackend, WalletDatabase, WriteOperation}, sqlite_db::WalletSqliteDatabase, @@ -68,8 +69,12 @@ use tari_wallet::{ run_migration_and_create_sqlite_connection, }, }, - test_utils::make_wallet_databases, - transaction_service::{config::TransactionServiceConfig, handle::TransactionEvent}, + test_utils::make_wallet_database_connection, + transaction_service::{ + config::TransactionServiceConfig, + handle::TransactionEvent, + storage::sqlite_db::TransactionServiceSqliteDatabase, + }, Wallet, WalletConfig, WalletSqlite, @@ -679,7 +684,7 @@ async fn test_import_utxo() { PeerFeatures::COMMUNICATION_NODE, ); let temp_dir = tempdir().unwrap(); - let (wallet_backend, tx_backend, oms_backend, contacts_backend, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let comms_config = CommsConfig { network: Network::Weatherwax, node_identity: Arc::new(alice_identity.clone()), @@ -717,10 +722,10 @@ async fn test_import_utxo() { ); let mut alice_wallet = Wallet::start( config, - WalletDatabase::new(wallet_backend), - tx_backend, - oms_backend, - contacts_backend, + WalletDatabase::new(WalletSqliteDatabase::new(connection.clone(), None).unwrap()), + TransactionServiceSqliteDatabase::new(connection.clone(), None), + OutputManagerSqliteDatabase::new(connection.clone(), None), + ContactsServiceSqliteDatabase::new(connection), shutdown.to_signal(), None, ) diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index c5af01c7b6..af90c0bfaf 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -598,7 +598,7 @@ mod test { use tari_shutdown::Shutdown; use tari_wallet::{ output_manager_service::{handle::OutputManagerEvent, TxoValidationType}, - test_utils::make_wallet_databases, + test_utils::make_wallet_database_connection, transaction_service::{ handle::TransactionEvent, storage::{ @@ -610,6 +610,7 @@ mod test { TransactionDirection, TransactionStatus, }, + sqlite_db::TransactionServiceSqliteDatabase, }, }, }; @@ -770,8 +771,8 @@ mod test { fn test_callback_handler() { let runtime = Runtime::new().unwrap(); - let (_wallet_backend, backend, _oms_backend, _, _tempdir) = make_wallet_databases(None); - let db = TransactionDatabase::new(backend); + let (connection, _tempdir) = make_wallet_database_connection(None); + let db = TransactionDatabase::new(TransactionServiceSqliteDatabase::new(connection, None)); let rtp = ReceiverTransactionProtocol::new_placeholder(); let inbound_tx = InboundTransaction::new( 1u64,