diff --git a/base_layer/core/tests/wallet.rs b/base_layer/core/tests/wallet.rs index c618db9531..be24454377 100644 --- a/base_layer/core/tests/wallet.rs +++ b/base_layer/core/tests/wallet.rs @@ -158,9 +158,10 @@ fn wallet_base_node_integration_test() { comms_config: alice_comms_config, factories: factories.clone(), transaction_service_config: Some(TransactionServiceConfig { - mempool_broadcast_timeout: Duration::from_secs(10), - base_node_mined_timeout: Duration::from_secs(1), - ..Default::default() + base_node_monitoring_timeout: Duration::from_secs(1), + direct_send_timeout: Default::default(), + broadcast_send_timeout: Default::default(), + low_power_polling_timeout: Duration::from_secs(10), }), }; let alice_runtime = create_runtime(); @@ -294,7 +295,9 @@ fn wallet_base_node_integration_test() { ); }); } - + runtime + .block_on(alice_wallet.transaction_service.set_low_power_mode()) + .unwrap(); let transaction = transaction.expect("Transaction must be present"); // Setup and start the miner diff --git a/base_layer/wallet/src/transaction_service/config.rs b/base_layer/wallet/src/transaction_service/config.rs index 855eda85e6..aa02a04f48 100644 --- a/base_layer/wallet/src/transaction_service/config.rs +++ b/base_layer/wallet/src/transaction_service/config.rs @@ -24,19 +24,20 @@ use std::time::Duration; #[derive(Clone)] pub struct TransactionServiceConfig { - pub mempool_broadcast_timeout: Duration, - pub base_node_mined_timeout: Duration, + pub base_node_monitoring_timeout: Duration, pub direct_send_timeout: Duration, pub broadcast_send_timeout: Duration, + pub low_power_polling_timeout: Duration, /* This is the timeout period that will be used when the wallet is in + * low_power mode */ } impl Default for TransactionServiceConfig { fn default() -> Self { Self { - mempool_broadcast_timeout: Duration::from_secs(30), - base_node_mined_timeout: Duration::from_secs(30), + base_node_monitoring_timeout: Duration::from_secs(30), direct_send_timeout: Duration::from_secs(20), broadcast_send_timeout: Duration::from_secs(30), + low_power_polling_timeout: Duration::from_secs(300), } } } diff --git a/base_layer/wallet/src/transaction_service/handle.rs b/base_layer/wallet/src/transaction_service/handle.rs index 2b9dbfed15..98aff042fd 100644 --- a/base_layer/wallet/src/transaction_service/handle.rs +++ b/base_layer/wallet/src/transaction_service/handle.rs @@ -49,6 +49,8 @@ pub enum TransactionServiceRequest { CancelTransaction(TxId), ImportUtxo(MicroTari, CommsPublicKey, String), SubmitTransaction((TxId, Transaction, MicroTari, MicroTari, String)), + SetLowPowerMode, + SetNormalPowerMode, #[cfg(feature = "test_harness")] CompletePendingOutboundTransaction(CompletedTransaction), #[cfg(feature = "test_harness")] @@ -78,6 +80,8 @@ impl fmt::Display for TransactionServiceRequest { Self::CancelTransaction(t) => f.write_str(&format!("CancelTransaction ({})", t)), Self::ImportUtxo(v, k, msg) => f.write_str(&format!("ImportUtxo (from {}, {}, {})", k, v, msg)), Self::SubmitTransaction((id, _, _, _, _)) => f.write_str(&format!("SubmitTransaction ({})", id)), + Self::SetLowPowerMode => f.write_str("SetLowPowerMode "), + Self::SetNormalPowerMode => f.write_str("SetNormalPowerMode"), #[cfg(feature = "test_harness")] Self::CompletePendingOutboundTransaction(tx) => { f.write_str(&format!("CompletePendingOutboundTransaction ({})", tx.tx_id)) @@ -108,6 +112,8 @@ pub enum TransactionServiceResponse { BaseNodePublicKeySet, UtxoImported(TxId), TransactionSubmitted, + LowPowerModeSet, + NormalPowerModeSet, #[cfg(feature = "test_harness")] CompletedPendingTransaction, #[cfg(feature = "test_harness")] @@ -346,6 +352,24 @@ impl TransactionServiceHandle { } } + pub async fn set_low_power_mode(&mut self) -> Result<(), TransactionServiceError> { + match self.handle.call(TransactionServiceRequest::SetLowPowerMode).await?? { + TransactionServiceResponse::LowPowerModeSet => Ok(()), + _ => Err(TransactionServiceError::UnexpectedApiResponse), + } + } + + pub async fn set_normal_power_mode(&mut self) -> Result<(), TransactionServiceError> { + match self + .handle + .call(TransactionServiceRequest::SetNormalPowerMode) + .await?? + { + TransactionServiceResponse::NormalPowerModeSet => Ok(()), + _ => Err(TransactionServiceError::UnexpectedApiResponse), + } + } + #[cfg(feature = "test_harness")] pub async fn test_complete_pending_transaction( &mut self, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index e27a0bc7f1..2f1793068f 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -48,7 +48,7 @@ use tari_core::{ }; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; use tari_p2p::tari_message::TariMessageType; -use tokio::time::delay_for; +use tokio::{sync::broadcast, time::delay_for}; const LOG_TARGET: &str = "wallet::transaction_service::protocols::broadcast_protocol"; @@ -63,6 +63,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Option>, base_node_response_receiver: Option>, + timeout_update_receiver: Option>, } impl TransactionBroadcastProtocol @@ -75,6 +76,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Receiver, base_node_response_receiver: Receiver, + timeout_update_receiver: broadcast::Receiver, ) -> Self { Self { @@ -84,6 +86,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key, mempool_response_receiver: Some(mempool_response_receiver), base_node_response_receiver: Some(base_node_response_receiver), + timeout_update_receiver: Some(timeout_update_receiver), } } @@ -99,6 +102,12 @@ where TBackend: TransactionBackend + Clone + 'static .take() .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?; + let mut timeout_update_receiver = self + .timeout_update_receiver + .take() + .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))? + .fuse(); + // This is the main loop of the protocol and following the following steps // 1) Check transaction being monitored is still in the Completed state and needs to be monitored // 2) Send a MempoolRequest::SubmitTransaction to Mempool and a Mined? Request to base node @@ -191,6 +200,15 @@ where TBackend: TransactionBackend + Clone + 'static break; } }, + updated_timeout = timeout_update_receiver.select_next_some() => { + if let Ok(to) = updated_timeout { + self.timeout = to; + info!( + target: LOG_TARGET, + "Broadcast monitoring protocol (Id: {}) timeout updated to {:?}", self.id ,self.timeout + ); + } + }, () = delay => { }, } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs index 9eb4c968fd..7e76dc2bfc 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs @@ -51,12 +51,12 @@ use tari_core::{ }; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; use tari_p2p::tari_message::TariMessageType; -use tokio::time::delay_for; - +use tokio::{sync::broadcast, time::delay_for}; const LOG_TARGET: &str = "wallet::transaction_service::protocols::chain_monitoring_protocol"; /// This protocol defines the process of monitoring a mempool and base node to detect when a Broadcast transaction is /// Mined or leaves the mempool in which case it should be cancelled + pub struct TransactionChainMonitoringProtocol where TBackend: TransactionBackend + Clone + 'static { @@ -67,11 +67,13 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Option>, base_node_response_receiver: Option>, + timeout_update_receiver: Option>, } impl TransactionChainMonitoringProtocol where TBackend: TransactionBackend + Clone + 'static { + #[allow(clippy::too_many_arguments)] pub fn new( id: u64, tx_id: TxId, @@ -80,6 +82,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Receiver, base_node_response_receiver: Receiver, + timeout_update_receiver: broadcast::Receiver, ) -> Self { Self { @@ -90,6 +93,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key, mempool_response_receiver: Some(mempool_response_receiver), base_node_response_receiver: Some(base_node_response_receiver), + timeout_update_receiver: Some(timeout_update_receiver), } } @@ -105,6 +109,12 @@ where TBackend: TransactionBackend + Clone + 'static .take() .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?; + let mut timeout_update_receiver = self + .timeout_update_receiver + .take() + .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))? + .fuse(); + trace!( target: LOG_TARGET, "Starting chain monitoring protocol for TxId: {} with Protocol ID: {}", @@ -220,6 +230,16 @@ where TBackend: TransactionBackend + Clone + 'static } base_node_response_received = true; }, + updated_timeout = timeout_update_receiver.select_next_some() => { + if let Ok(to) = updated_timeout { + self.timeout = to; + info!( + target: LOG_TARGET, + "Chain monitoring protocol (Id: {}) timeout updated to {:?}", self.id, self.timeout + ); + break; + } + }, () = delay => { break; }, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index fa3a27d428..7026fbbb1c 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -50,6 +50,7 @@ use std::{ collections::HashMap, convert::{TryFrom, TryInto}, sync::Arc, + time::Duration, }; use tari_comms::{peer_manager::NodeIdentity, types::CommsPublicKey}; use tari_comms_dht::outbound::OutboundMessageRequester; @@ -67,9 +68,7 @@ use tari_core::{ }; use tari_p2p::domain_message::DomainMessage; use tari_service_framework::{reply_channel, reply_channel::Receiver}; -#[cfg(feature = "test_harness")] -use tokio::sync::broadcast; -use tokio::task::JoinHandle; +use tokio::{sync::broadcast, task::JoinHandle}; const LOG_TARGET: &str = "wallet::transaction_service::service"; @@ -120,6 +119,8 @@ where TBackend: TransactionBackend + Clone + 'static send_transaction_cancellation_senders: HashMap>, finalized_transaction_senders: HashMap>, receiver_transaction_cancellation_senders: HashMap>, + timeout_update_publisher: broadcast::Sender, + power_mode: PowerMode, } #[allow(clippy::too_many_arguments)] @@ -163,6 +164,8 @@ where factories, config: config.clone(), }; + let (timeout_update_publisher, _) = broadcast::channel(20); + TransactionService { config, db, @@ -183,6 +186,8 @@ where send_transaction_cancellation_senders: HashMap::new(), finalized_transaction_senders: HashMap::new(), receiver_transaction_cancellation_senders: HashMap::new(), + timeout_update_publisher, + power_mode: PowerMode::Normal, } } @@ -496,6 +501,14 @@ where self.mine_transaction(tx_id).await?; Ok(TransactionServiceResponse::TransactionMined) }, + TransactionServiceRequest::SetLowPowerMode => { + self.set_power_mode(PowerMode::Low).await?; + Ok(TransactionServiceResponse::LowPowerModeSet) + }, + TransactionServiceRequest::SetNormalPowerMode => { + self.set_power_mode(PowerMode::Normal).await?; + Ok(TransactionServiceResponse::NormalPowerModeSet) + }, } } @@ -952,20 +965,25 @@ where if completed_tx.status != TransactionStatus::Completed || completed_tx.transaction.body.kernels().is_empty() { return Err(TransactionServiceError::InvalidCompletedTransaction); } + let timeout = match self.power_mode { + PowerMode::Normal => self.config.base_node_monitoring_timeout, + PowerMode::Low => self.config.low_power_polling_timeout, + }; match self.base_node_public_key.clone() { None => return Err(TransactionServiceError::NoBaseNodeKeysProvided), Some(pk) => { - let (mempool_response_sender, mempool_response_receiver) = mpsc::channel(100); - let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(100); + let (mempool_response_sender, mempool_response_receiver) = mpsc::channel(500); + let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(500); self.mempool_response_senders.insert(tx_id, mempool_response_sender); self.base_node_response_senders.insert(tx_id, base_node_response_sender); let protocol = TransactionBroadcastProtocol::new( tx_id, self.service_resources.clone(), - self.config.mempool_broadcast_timeout, + timeout, pk, mempool_response_receiver, base_node_response_receiver, + self.timeout_update_publisher.subscribe(), ); let join_handle = tokio::spawn(protocol.execute()); join_handles.push(join_handle); @@ -1067,6 +1085,7 @@ where Err(TransactionServiceProtocolError { id, error }) => { let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + error!( target: LOG_TARGET, "Error completing Transaction Broadcast Protocol (Id: {}): {:?}", id, error @@ -1091,7 +1110,10 @@ where if completed_tx.status != TransactionStatus::Broadcast || completed_tx.transaction.body.kernels().is_empty() { return Err(TransactionServiceError::InvalidCompletedTransaction); } - + let timeout = match self.power_mode { + PowerMode::Normal => self.config.base_node_monitoring_timeout, + PowerMode::Low => self.config.low_power_polling_timeout, + }; match self.base_node_public_key.clone() { None => return Err(TransactionServiceError::NoBaseNodeKeysProvided), Some(pk) => { @@ -1107,10 +1129,11 @@ where protocol_id, completed_tx.tx_id, self.service_resources.clone(), - self.config.base_node_mined_timeout, + timeout, pk, mempool_response_receiver, base_node_response_receiver, + self.timeout_update_publisher.subscribe(), ); let join_handle = tokio::spawn(protocol.execute()); join_handles.push(join_handle); @@ -1130,6 +1153,7 @@ where // Cleanup any registered senders let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + trace!( target: LOG_TARGET, "Transaction chain monitoring Protocol for TxId: {} completed successfully", @@ -1139,6 +1163,7 @@ where Err(TransactionServiceProtocolError { id, error }) => { let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + error!( target: LOG_TARGET, "Error completing Transaction chain monitoring Protocol (Id: {}): {:?}", id, error @@ -1197,6 +1222,19 @@ where Ok(()) } + async fn set_power_mode(&mut self, mode: PowerMode) -> Result<(), TransactionServiceError> { + self.power_mode = mode; + let timeout = match mode { + PowerMode::Low => self.config.low_power_polling_timeout, + PowerMode::Normal => self.config.base_node_monitoring_timeout, + }; + self.timeout_update_publisher + .send(timeout) + .map_err(|_| TransactionServiceError::ProtocolChannelError)?; + + Ok(()) + } + /// Add a completed transaction to the Transaction Manager to record directly importing a spendable UTXO. pub async fn add_utxo_import_transaction( &mut self, @@ -1512,3 +1550,9 @@ where TBackend: TransactionBackend + Clone + 'static pub factories: CryptoFactories, pub config: TransactionServiceConfig, } + +#[derive(Clone, Copy)] +enum PowerMode { + Low, + Normal, +} diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2df1c77836..e212c6ad3f 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -40,7 +40,7 @@ use crate::{ }; use blake2::Digest; use log::*; -use std::{marker::PhantomData, sync::Arc, time::Duration}; +use std::{marker::PhantomData, sync::Arc}; use tari_comms::{ multiaddr::Multiaddr, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, @@ -62,10 +62,7 @@ use tari_crypto::{ use tari_p2p::{ comms_connector::pubsub_connector, initialization::{initialize_comms, CommsConfig}, - services::{ - comms_outbound::CommsOutboundServiceInitializer, - liveness::{LivenessConfig, LivenessHandle, LivenessInitializer}, - }, + services::comms_outbound::CommsOutboundServiceInitializer, }; use tari_service_framework::StackBuilder; use tokio::runtime::Runtime; @@ -91,7 +88,6 @@ where pub comms: CommsNode, pub dht_service: Dht, pub store_and_forward_requester: StoreAndForwardRequester, - pub liveness_service: LivenessHandle, pub output_manager_service: OutputManagerHandle, pub transaction_service: TransactionServiceHandle, pub contacts_service: ContactsServiceHandle, @@ -137,15 +133,6 @@ where let fut = StackBuilder::new(runtime.handle().clone(), comms.shutdown_signal()) .add_initializer(CommsOutboundServiceInitializer::new(dht.outbound_requester())) - .add_initializer(LivenessInitializer::new( - LivenessConfig { - auto_ping_interval: Some(Duration::from_secs(30)), - useragent: format!("tari/wallet/{}", env!("CARGO_PKG_VERSION")), - ..Default::default() - }, - Arc::clone(&subscription_factory), - dht.dht_requester(), - )) .add_initializer(OutputManagerServiceInitializer::new( OutputManagerServiceConfig::default(), subscription_factory.clone(), @@ -154,7 +141,7 @@ where )) .add_initializer(TransactionServiceInitializer::new( config.transaction_service_config.unwrap_or_default(), - subscription_factory.clone(), + subscription_factory, transaction_backend, comms.node_identity(), factories.clone(), @@ -170,9 +157,6 @@ where let mut transaction_service_handle = handles .get_handle::() .expect("Could not get Transaction Service Handle"); - let liveness_handle = handles - .get_handle::() - .expect("Could not get Liveness Service Handle"); let contacts_handle = handles .get_handle::() .expect("Could not get Contacts Service Handle"); @@ -188,7 +172,6 @@ where comms, dht_service: dht, store_and_forward_requester, - liveness_service: liveness_handle, output_manager_service: output_manager_handle, transaction_service: transaction_service_handle, contacts_service: contacts_handle, diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 03168c546c..a446d79dde 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -81,16 +81,7 @@ use tari_crypto::{ use tari_p2p::{ comms_connector::pubsub_connector, domain_message::DomainMessage, - services::{ - comms_outbound::CommsOutboundServiceInitializer, - liveness::{ - mock::{create_p2p_liveness_mock, LivenessMockState}, - LivenessConfig, - LivenessEventSender, - LivenessHandle, - LivenessInitializer, - }, - }, + services::comms_outbound::CommsOutboundServiceInitializer, }; use tari_service_framework::{reply_channel, StackBuilder}; use tari_test_utils::paths::with_temp_dir; @@ -172,8 +163,8 @@ pub fn setup_transaction_service>, Sender>, Sender>, - LivenessHandle, - LivenessMockState, - LivenessEventSender, ) { let (oms_request_sender, oms_request_receiver) = reply_channel::unbounded(); @@ -228,10 +205,6 @@ pub fn setup_transaction_service_no_comms(al mut alice_tx_finalized, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _, _, _, _) = + let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -996,15 +960,12 @@ fn finalize_tx_with_missing_output(alic mut alice_tx_finalized, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _, _, _, _) = + let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1279,9 +1240,6 @@ fn transaction_mempool_broadcast() { _, mut alice_mempool_response_sender, mut alice_base_node_response_sender, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), TransactionMemoryDatabase::new(), None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -1289,7 +1247,7 @@ fn transaction_mempool_broadcast() { .block_on(alice_ts.set_base_node_public_key(base_node_identity.public_key().clone())) .unwrap(); - let (_bob_ts, _bob_output_manager, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = + let (_bob_ts, _bob_output_manager, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), TransactionMemoryDatabase::new(), None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1506,7 +1464,7 @@ fn transaction_mempool_broadcast() { event = alice_event_stream.select_next_some() => { if let TransactionEvent::MempoolBroadcastTimedOut(_) = &*event.unwrap(){ broadcast_timeout_count +=1; - if broadcast_timeout_count >= 1 { + if broadcast_timeout_count >= 2 { break; } @@ -1517,7 +1475,7 @@ fn transaction_mempool_broadcast() { }, } } - assert!(broadcast_timeout_count >= 1); + assert!(broadcast_timeout_count >= 2); }); let mempool_response = MempoolProto::MempoolServiceResponse { @@ -1652,6 +1610,83 @@ fn try_decode_base_node_request(bytes: Vec) -> Option= 2); }); + runtime.block_on(alice_ts.set_low_power_mode()).unwrap(); + runtime.block_on(alice_ts.set_normal_power_mode()).unwrap(); + // Test that receiving a base node response with the wrong outputs does not result in a TX being mined let wrong_outputs = vec![completed_tx_outputs[0].clone(), TransactionOutput::default().into()]; @@ -2217,7 +2252,7 @@ fn query_all_completed_transactions_on_startup() { ))) .unwrap(); - let (mut alice_ts, _, _, _, _, _, _, _, _, _, _) = + let (mut alice_ts, _, _, _, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), db, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -2276,9 +2311,6 @@ fn transaction_cancellation_when_not_in_mempool() { _, mut alice_mempool_response_sender, mut alice_base_node_response_sender, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -2286,13 +2318,12 @@ fn transaction_cancellation_when_not_in_mempool() { Some(Duration::from_secs(5)), ); let mut alice_event_stream = alice_ts.get_event_stream_fused(); - let (mut bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = - setup_transaction_service_no_comms( - &mut runtime, - factories.clone(), - TransactionMemoryDatabase::new(), - Some(Duration::from_secs(20)), - ); + let (mut bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); runtime .block_on(bob_ts.set_base_node_public_key(base_node_identity.public_key().clone())) .unwrap(); @@ -2524,7 +2555,7 @@ fn test_transaction_cancellation(backen let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (mut alice_ts, mut alice_output_manager, _alice_outbound_service, mut alice_tx_sender, _, _, _, _, _, _, _) = + let (mut alice_ts, mut alice_output_manager, _alice_outbound_service, mut alice_tx_sender, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, Some(Duration::from_secs(20))); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -2675,9 +2706,6 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { _, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -2723,7 +2751,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { assert_eq!(tx_id, msg_tx_id); // Test sending the Reply to a receiver with Direct and then with SAF and never both - let (_bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = setup_transaction_service_no_comms( + let (_bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), TransactionMemoryDatabase::new(), @@ -2759,13 +2787,12 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { runtime.block_on(async { delay_for(Duration::from_secs(5)).await }); } - let (_bob2_ts, _, bob2_outbound_service, mut bob2_tx_sender, _, _, _, _, _, _, _) = - setup_transaction_service_no_comms( - &mut runtime, - factories.clone(), - TransactionMemoryDatabase::new(), - Some(Duration::from_secs(20)), - ); + let (_bob2_ts, _, bob2_outbound_service, mut bob2_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); bob2_outbound_service.set_behaviour(MockBehaviour { direct: ResponseType::Failed, broadcast: ResponseType::Queued, @@ -2899,9 +2926,6 @@ fn test_tx_direct_send_behaviour() { _, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -3189,7 +3213,7 @@ fn test_restarting_transaction_protocols() { .unwrap(); // Test that Bob's node restarts the send protocol - let (mut bob_ts, _bob_oms, _bob_outbound_service, _, mut bob_tx_reply, _, _, _, _, _, _) = + let (mut bob_ts, _bob_oms, _bob_outbound_service, _, mut bob_tx_reply, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let mut bob_event_stream = bob_ts.get_event_stream_fused(); @@ -3222,7 +3246,7 @@ fn test_restarting_transaction_protocols() { }); // Test Alice's node restarts the receive protocol - let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, mut alice_tx_finalized, _, _, _, _, _) = + let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, mut alice_tx_finalized, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 2905423b3a..9ae71e5c08 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -4135,6 +4135,61 @@ pub unsafe extern "C" fn wallet_get_seed_words(wallet: *mut TariWallet, error_ou } } +/// Set the power mode of the wallet to Low Power mode which will reduce the amount of network operations the wallet +/// performs to conserve power +/// +/// ## Arguments +/// `wallet` - The TariWallet pointer +/// `error_out` - Pointer to an int which will be modified to an error code should one occur, may not be null. Functions +/// as an out parameter. +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn wallet_set_low_power_mode(wallet: *mut TariWallet, error_out: *mut c_int) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + if wallet.is_null() { + error = LibWalletError::from(InterfaceError::NullError("wallet".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return; + } + + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).transaction_service.set_low_power_mode()) + { + error = LibWalletError::from(WalletError::TransactionServiceError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + } +} + +/// Set the power mode of the wallet to Normal Power mode which will then use the standard level of network traffic +/// +/// ## Arguments +/// `wallet` - The TariWallet pointer +/// `error_out` - Pointer to an int which will be modified to an error code should one occur, may not be null. Functions +/// as an out parameter. +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn wallet_set_normal_power_mode(wallet: *mut TariWallet, error_out: *mut c_int) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + if wallet.is_null() { + error = LibWalletError::from(InterfaceError::NullError("wallet".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return; + } + + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).transaction_service.set_normal_power_mode()) + { + error = LibWalletError::from(WalletError::TransactionServiceError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + } +} + /// Frees memory for a TariWallet /// /// ## Arguments @@ -5017,6 +5072,11 @@ mod test { assert_eq!(split_tx.is_ok(), true); string_destroy(split_msg_str as *mut c_char); + wallet_set_low_power_mode(alice_wallet, error_ptr); + assert_eq!((*error_ptr), 0); + wallet_set_normal_power_mode(alice_wallet, error_ptr); + assert_eq!((*error_ptr), 0); + // Test seed words let seed_words = wallet_get_seed_words(alice_wallet, error_ptr); let seed_word_len = seed_words_get_length(seed_words, error_ptr); diff --git a/base_layer/wallet_ffi/wallet.h b/base_layer/wallet_ffi/wallet.h index 36352619ec..41ab5bff30 100644 --- a/base_layer/wallet_ffi/wallet.h +++ b/base_layer/wallet_ffi/wallet.h @@ -432,6 +432,12 @@ unsigned long long wallet_import_utxo(struct TariWallet *wallet, unsigned long l // This function will tell the wallet to query the set base node to confirm the status of wallet data. unsigned long long wallet_sync_with_base_node(struct TariWallet *wallet, int* error_out); +// Set the power mode of the wallet to Low Power mode which will reduce the amount of network operations the wallet performs to conserve power +void wallet_set_low_power_mode(struct TariWallet *wallet, int* error_out); + +// Set the power mode of the wallet to Normal Power mode which will then use the standard level of network traffic +void wallet_set_normal_power_mode(struct TariWallet *wallet, int* error_out); + // Simulates the completion of a broadcasted TariPendingInboundTransaction bool wallet_test_broadcast_transaction(struct TariWallet *wallet, unsigned long long tx, int* error_out);