From af66fe92bcf9caadcd8a594cc7cd4487bf37e4d3 Mon Sep 17 00:00:00 2001 From: Philip Robinson Date: Wed, 10 Jun 2020 12:26:24 +0200 Subject: [PATCH] Remove Liveness and add a Low Power mode to the Wallet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In order to help reduce battery consumption this PR removes the Liveness service from the Wallet which is no longer required due to updates to the connectivity manager. This PR also adds two FFI functions to set Low power and Normal power modes. Currently these modes will change the Timeouts used by the two base node monitoring protocols to poll the base node from 30 seconds to 5 minutes. The mobile client can switch to the low power mode when the app moves to the background when it doesn’t need to be as responsive and it will generate 10 times less network traffic and thus use less battery. --- base_layer/core/tests/wallet.rs | 11 +- .../wallet/src/transaction_service/config.rs | 9 +- .../wallet/src/transaction_service/handle.rs | 24 +++ .../transaction_broadcast_protocol.rs | 20 +- .../transaction_chain_monitoring_protocol.rs | 24 ++- .../wallet/src/transaction_service/service.rs | 60 +++++- base_layer/wallet/src/wallet.rs | 23 +-- .../tests/transaction_service/service.rs | 194 ++++++++++-------- base_layer/wallet_ffi/src/lib.rs | 60 ++++++ base_layer/wallet_ffi/wallet.h | 6 + 10 files changed, 307 insertions(+), 124 deletions(-) diff --git a/base_layer/core/tests/wallet.rs b/base_layer/core/tests/wallet.rs index c618db9531..be24454377 100644 --- a/base_layer/core/tests/wallet.rs +++ b/base_layer/core/tests/wallet.rs @@ -158,9 +158,10 @@ fn wallet_base_node_integration_test() { comms_config: alice_comms_config, factories: factories.clone(), transaction_service_config: Some(TransactionServiceConfig { - mempool_broadcast_timeout: Duration::from_secs(10), - base_node_mined_timeout: Duration::from_secs(1), - ..Default::default() + base_node_monitoring_timeout: Duration::from_secs(1), + direct_send_timeout: Default::default(), + broadcast_send_timeout: Default::default(), + low_power_polling_timeout: Duration::from_secs(10), }), }; let alice_runtime = create_runtime(); @@ -294,7 +295,9 @@ fn wallet_base_node_integration_test() { ); }); } - + runtime + .block_on(alice_wallet.transaction_service.set_low_power_mode()) + .unwrap(); let transaction = transaction.expect("Transaction must be present"); // Setup and start the miner diff --git a/base_layer/wallet/src/transaction_service/config.rs b/base_layer/wallet/src/transaction_service/config.rs index 855eda85e6..aa02a04f48 100644 --- a/base_layer/wallet/src/transaction_service/config.rs +++ b/base_layer/wallet/src/transaction_service/config.rs @@ -24,19 +24,20 @@ use std::time::Duration; #[derive(Clone)] pub struct TransactionServiceConfig { - pub mempool_broadcast_timeout: Duration, - pub base_node_mined_timeout: Duration, + pub base_node_monitoring_timeout: Duration, pub direct_send_timeout: Duration, pub broadcast_send_timeout: Duration, + pub low_power_polling_timeout: Duration, /* This is the timeout period that will be used when the wallet is in + * low_power mode */ } impl Default for TransactionServiceConfig { fn default() -> Self { Self { - mempool_broadcast_timeout: Duration::from_secs(30), - base_node_mined_timeout: Duration::from_secs(30), + base_node_monitoring_timeout: Duration::from_secs(30), direct_send_timeout: Duration::from_secs(20), broadcast_send_timeout: Duration::from_secs(30), + low_power_polling_timeout: Duration::from_secs(300), } } } diff --git a/base_layer/wallet/src/transaction_service/handle.rs b/base_layer/wallet/src/transaction_service/handle.rs index 2b9dbfed15..98aff042fd 100644 --- a/base_layer/wallet/src/transaction_service/handle.rs +++ b/base_layer/wallet/src/transaction_service/handle.rs @@ -49,6 +49,8 @@ pub enum TransactionServiceRequest { CancelTransaction(TxId), ImportUtxo(MicroTari, CommsPublicKey, String), SubmitTransaction((TxId, Transaction, MicroTari, MicroTari, String)), + SetLowPowerMode, + SetNormalPowerMode, #[cfg(feature = "test_harness")] CompletePendingOutboundTransaction(CompletedTransaction), #[cfg(feature = "test_harness")] @@ -78,6 +80,8 @@ impl fmt::Display for TransactionServiceRequest { Self::CancelTransaction(t) => f.write_str(&format!("CancelTransaction ({})", t)), Self::ImportUtxo(v, k, msg) => f.write_str(&format!("ImportUtxo (from {}, {}, {})", k, v, msg)), Self::SubmitTransaction((id, _, _, _, _)) => f.write_str(&format!("SubmitTransaction ({})", id)), + Self::SetLowPowerMode => f.write_str("SetLowPowerMode "), + Self::SetNormalPowerMode => f.write_str("SetNormalPowerMode"), #[cfg(feature = "test_harness")] Self::CompletePendingOutboundTransaction(tx) => { f.write_str(&format!("CompletePendingOutboundTransaction ({})", tx.tx_id)) @@ -108,6 +112,8 @@ pub enum TransactionServiceResponse { BaseNodePublicKeySet, UtxoImported(TxId), TransactionSubmitted, + LowPowerModeSet, + NormalPowerModeSet, #[cfg(feature = "test_harness")] CompletedPendingTransaction, #[cfg(feature = "test_harness")] @@ -346,6 +352,24 @@ impl TransactionServiceHandle { } } + pub async fn set_low_power_mode(&mut self) -> Result<(), TransactionServiceError> { + match self.handle.call(TransactionServiceRequest::SetLowPowerMode).await?? { + TransactionServiceResponse::LowPowerModeSet => Ok(()), + _ => Err(TransactionServiceError::UnexpectedApiResponse), + } + } + + pub async fn set_normal_power_mode(&mut self) -> Result<(), TransactionServiceError> { + match self + .handle + .call(TransactionServiceRequest::SetNormalPowerMode) + .await?? + { + TransactionServiceResponse::NormalPowerModeSet => Ok(()), + _ => Err(TransactionServiceError::UnexpectedApiResponse), + } + } + #[cfg(feature = "test_harness")] pub async fn test_complete_pending_transaction( &mut self, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index e27a0bc7f1..2f1793068f 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -48,7 +48,7 @@ use tari_core::{ }; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; use tari_p2p::tari_message::TariMessageType; -use tokio::time::delay_for; +use tokio::{sync::broadcast, time::delay_for}; const LOG_TARGET: &str = "wallet::transaction_service::protocols::broadcast_protocol"; @@ -63,6 +63,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Option>, base_node_response_receiver: Option>, + timeout_update_receiver: Option>, } impl TransactionBroadcastProtocol @@ -75,6 +76,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Receiver, base_node_response_receiver: Receiver, + timeout_update_receiver: broadcast::Receiver, ) -> Self { Self { @@ -84,6 +86,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key, mempool_response_receiver: Some(mempool_response_receiver), base_node_response_receiver: Some(base_node_response_receiver), + timeout_update_receiver: Some(timeout_update_receiver), } } @@ -99,6 +102,12 @@ where TBackend: TransactionBackend + Clone + 'static .take() .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?; + let mut timeout_update_receiver = self + .timeout_update_receiver + .take() + .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))? + .fuse(); + // This is the main loop of the protocol and following the following steps // 1) Check transaction being monitored is still in the Completed state and needs to be monitored // 2) Send a MempoolRequest::SubmitTransaction to Mempool and a Mined? Request to base node @@ -191,6 +200,15 @@ where TBackend: TransactionBackend + Clone + 'static break; } }, + updated_timeout = timeout_update_receiver.select_next_some() => { + if let Ok(to) = updated_timeout { + self.timeout = to; + info!( + target: LOG_TARGET, + "Broadcast monitoring protocol (Id: {}) timeout updated to {:?}", self.id ,self.timeout + ); + } + }, () = delay => { }, } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs index 9eb4c968fd..7e76dc2bfc 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_chain_monitoring_protocol.rs @@ -51,12 +51,12 @@ use tari_core::{ }; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; use tari_p2p::tari_message::TariMessageType; -use tokio::time::delay_for; - +use tokio::{sync::broadcast, time::delay_for}; const LOG_TARGET: &str = "wallet::transaction_service::protocols::chain_monitoring_protocol"; /// This protocol defines the process of monitoring a mempool and base node to detect when a Broadcast transaction is /// Mined or leaves the mempool in which case it should be cancelled + pub struct TransactionChainMonitoringProtocol where TBackend: TransactionBackend + Clone + 'static { @@ -67,11 +67,13 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Option>, base_node_response_receiver: Option>, + timeout_update_receiver: Option>, } impl TransactionChainMonitoringProtocol where TBackend: TransactionBackend + Clone + 'static { + #[allow(clippy::too_many_arguments)] pub fn new( id: u64, tx_id: TxId, @@ -80,6 +82,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key: CommsPublicKey, mempool_response_receiver: Receiver, base_node_response_receiver: Receiver, + timeout_update_receiver: broadcast::Receiver, ) -> Self { Self { @@ -90,6 +93,7 @@ where TBackend: TransactionBackend + Clone + 'static base_node_public_key, mempool_response_receiver: Some(mempool_response_receiver), base_node_response_receiver: Some(base_node_response_receiver), + timeout_update_receiver: Some(timeout_update_receiver), } } @@ -105,6 +109,12 @@ where TBackend: TransactionBackend + Clone + 'static .take() .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?; + let mut timeout_update_receiver = self + .timeout_update_receiver + .take() + .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))? + .fuse(); + trace!( target: LOG_TARGET, "Starting chain monitoring protocol for TxId: {} with Protocol ID: {}", @@ -220,6 +230,16 @@ where TBackend: TransactionBackend + Clone + 'static } base_node_response_received = true; }, + updated_timeout = timeout_update_receiver.select_next_some() => { + if let Ok(to) = updated_timeout { + self.timeout = to; + info!( + target: LOG_TARGET, + "Chain monitoring protocol (Id: {}) timeout updated to {:?}", self.id, self.timeout + ); + break; + } + }, () = delay => { break; }, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index fa3a27d428..7026fbbb1c 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -50,6 +50,7 @@ use std::{ collections::HashMap, convert::{TryFrom, TryInto}, sync::Arc, + time::Duration, }; use tari_comms::{peer_manager::NodeIdentity, types::CommsPublicKey}; use tari_comms_dht::outbound::OutboundMessageRequester; @@ -67,9 +68,7 @@ use tari_core::{ }; use tari_p2p::domain_message::DomainMessage; use tari_service_framework::{reply_channel, reply_channel::Receiver}; -#[cfg(feature = "test_harness")] -use tokio::sync::broadcast; -use tokio::task::JoinHandle; +use tokio::{sync::broadcast, task::JoinHandle}; const LOG_TARGET: &str = "wallet::transaction_service::service"; @@ -120,6 +119,8 @@ where TBackend: TransactionBackend + Clone + 'static send_transaction_cancellation_senders: HashMap>, finalized_transaction_senders: HashMap>, receiver_transaction_cancellation_senders: HashMap>, + timeout_update_publisher: broadcast::Sender, + power_mode: PowerMode, } #[allow(clippy::too_many_arguments)] @@ -163,6 +164,8 @@ where factories, config: config.clone(), }; + let (timeout_update_publisher, _) = broadcast::channel(20); + TransactionService { config, db, @@ -183,6 +186,8 @@ where send_transaction_cancellation_senders: HashMap::new(), finalized_transaction_senders: HashMap::new(), receiver_transaction_cancellation_senders: HashMap::new(), + timeout_update_publisher, + power_mode: PowerMode::Normal, } } @@ -496,6 +501,14 @@ where self.mine_transaction(tx_id).await?; Ok(TransactionServiceResponse::TransactionMined) }, + TransactionServiceRequest::SetLowPowerMode => { + self.set_power_mode(PowerMode::Low).await?; + Ok(TransactionServiceResponse::LowPowerModeSet) + }, + TransactionServiceRequest::SetNormalPowerMode => { + self.set_power_mode(PowerMode::Normal).await?; + Ok(TransactionServiceResponse::NormalPowerModeSet) + }, } } @@ -952,20 +965,25 @@ where if completed_tx.status != TransactionStatus::Completed || completed_tx.transaction.body.kernels().is_empty() { return Err(TransactionServiceError::InvalidCompletedTransaction); } + let timeout = match self.power_mode { + PowerMode::Normal => self.config.base_node_monitoring_timeout, + PowerMode::Low => self.config.low_power_polling_timeout, + }; match self.base_node_public_key.clone() { None => return Err(TransactionServiceError::NoBaseNodeKeysProvided), Some(pk) => { - let (mempool_response_sender, mempool_response_receiver) = mpsc::channel(100); - let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(100); + let (mempool_response_sender, mempool_response_receiver) = mpsc::channel(500); + let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(500); self.mempool_response_senders.insert(tx_id, mempool_response_sender); self.base_node_response_senders.insert(tx_id, base_node_response_sender); let protocol = TransactionBroadcastProtocol::new( tx_id, self.service_resources.clone(), - self.config.mempool_broadcast_timeout, + timeout, pk, mempool_response_receiver, base_node_response_receiver, + self.timeout_update_publisher.subscribe(), ); let join_handle = tokio::spawn(protocol.execute()); join_handles.push(join_handle); @@ -1067,6 +1085,7 @@ where Err(TransactionServiceProtocolError { id, error }) => { let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + error!( target: LOG_TARGET, "Error completing Transaction Broadcast Protocol (Id: {}): {:?}", id, error @@ -1091,7 +1110,10 @@ where if completed_tx.status != TransactionStatus::Broadcast || completed_tx.transaction.body.kernels().is_empty() { return Err(TransactionServiceError::InvalidCompletedTransaction); } - + let timeout = match self.power_mode { + PowerMode::Normal => self.config.base_node_monitoring_timeout, + PowerMode::Low => self.config.low_power_polling_timeout, + }; match self.base_node_public_key.clone() { None => return Err(TransactionServiceError::NoBaseNodeKeysProvided), Some(pk) => { @@ -1107,10 +1129,11 @@ where protocol_id, completed_tx.tx_id, self.service_resources.clone(), - self.config.base_node_mined_timeout, + timeout, pk, mempool_response_receiver, base_node_response_receiver, + self.timeout_update_publisher.subscribe(), ); let join_handle = tokio::spawn(protocol.execute()); join_handles.push(join_handle); @@ -1130,6 +1153,7 @@ where // Cleanup any registered senders let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + trace!( target: LOG_TARGET, "Transaction chain monitoring Protocol for TxId: {} completed successfully", @@ -1139,6 +1163,7 @@ where Err(TransactionServiceProtocolError { id, error }) => { let _ = self.mempool_response_senders.remove(&id); let _ = self.base_node_response_senders.remove(&id); + error!( target: LOG_TARGET, "Error completing Transaction chain monitoring Protocol (Id: {}): {:?}", id, error @@ -1197,6 +1222,19 @@ where Ok(()) } + async fn set_power_mode(&mut self, mode: PowerMode) -> Result<(), TransactionServiceError> { + self.power_mode = mode; + let timeout = match mode { + PowerMode::Low => self.config.low_power_polling_timeout, + PowerMode::Normal => self.config.base_node_monitoring_timeout, + }; + self.timeout_update_publisher + .send(timeout) + .map_err(|_| TransactionServiceError::ProtocolChannelError)?; + + Ok(()) + } + /// Add a completed transaction to the Transaction Manager to record directly importing a spendable UTXO. pub async fn add_utxo_import_transaction( &mut self, @@ -1512,3 +1550,9 @@ where TBackend: TransactionBackend + Clone + 'static pub factories: CryptoFactories, pub config: TransactionServiceConfig, } + +#[derive(Clone, Copy)] +enum PowerMode { + Low, + Normal, +} diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2df1c77836..e212c6ad3f 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -40,7 +40,7 @@ use crate::{ }; use blake2::Digest; use log::*; -use std::{marker::PhantomData, sync::Arc, time::Duration}; +use std::{marker::PhantomData, sync::Arc}; use tari_comms::{ multiaddr::Multiaddr, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, @@ -62,10 +62,7 @@ use tari_crypto::{ use tari_p2p::{ comms_connector::pubsub_connector, initialization::{initialize_comms, CommsConfig}, - services::{ - comms_outbound::CommsOutboundServiceInitializer, - liveness::{LivenessConfig, LivenessHandle, LivenessInitializer}, - }, + services::comms_outbound::CommsOutboundServiceInitializer, }; use tari_service_framework::StackBuilder; use tokio::runtime::Runtime; @@ -91,7 +88,6 @@ where pub comms: CommsNode, pub dht_service: Dht, pub store_and_forward_requester: StoreAndForwardRequester, - pub liveness_service: LivenessHandle, pub output_manager_service: OutputManagerHandle, pub transaction_service: TransactionServiceHandle, pub contacts_service: ContactsServiceHandle, @@ -137,15 +133,6 @@ where let fut = StackBuilder::new(runtime.handle().clone(), comms.shutdown_signal()) .add_initializer(CommsOutboundServiceInitializer::new(dht.outbound_requester())) - .add_initializer(LivenessInitializer::new( - LivenessConfig { - auto_ping_interval: Some(Duration::from_secs(30)), - useragent: format!("tari/wallet/{}", env!("CARGO_PKG_VERSION")), - ..Default::default() - }, - Arc::clone(&subscription_factory), - dht.dht_requester(), - )) .add_initializer(OutputManagerServiceInitializer::new( OutputManagerServiceConfig::default(), subscription_factory.clone(), @@ -154,7 +141,7 @@ where )) .add_initializer(TransactionServiceInitializer::new( config.transaction_service_config.unwrap_or_default(), - subscription_factory.clone(), + subscription_factory, transaction_backend, comms.node_identity(), factories.clone(), @@ -170,9 +157,6 @@ where let mut transaction_service_handle = handles .get_handle::() .expect("Could not get Transaction Service Handle"); - let liveness_handle = handles - .get_handle::() - .expect("Could not get Liveness Service Handle"); let contacts_handle = handles .get_handle::() .expect("Could not get Contacts Service Handle"); @@ -188,7 +172,6 @@ where comms, dht_service: dht, store_and_forward_requester, - liveness_service: liveness_handle, output_manager_service: output_manager_handle, transaction_service: transaction_service_handle, contacts_service: contacts_handle, diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 03168c546c..a446d79dde 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -81,16 +81,7 @@ use tari_crypto::{ use tari_p2p::{ comms_connector::pubsub_connector, domain_message::DomainMessage, - services::{ - comms_outbound::CommsOutboundServiceInitializer, - liveness::{ - mock::{create_p2p_liveness_mock, LivenessMockState}, - LivenessConfig, - LivenessEventSender, - LivenessHandle, - LivenessInitializer, - }, - }, + services::comms_outbound::CommsOutboundServiceInitializer, }; use tari_service_framework::{reply_channel, StackBuilder}; use tari_test_utils::paths::with_temp_dir; @@ -172,8 +163,8 @@ pub fn setup_transaction_service>, Sender>, Sender>, - LivenessHandle, - LivenessMockState, - LivenessEventSender, ) { let (oms_request_sender, oms_request_receiver) = reply_channel::unbounded(); @@ -228,10 +205,6 @@ pub fn setup_transaction_service_no_comms(al mut alice_tx_finalized, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _, _, _, _) = + let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -996,15 +960,12 @@ fn finalize_tx_with_missing_output(alic mut alice_tx_finalized, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _, _, _, _) = + let (_bob_ts, mut bob_output_manager, _bob_outbound_service, _bob_tx_sender, _bob_tx_ack_sender, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1279,9 +1240,6 @@ fn transaction_mempool_broadcast() { _, mut alice_mempool_response_sender, mut alice_base_node_response_sender, - _, - _, - _, ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), TransactionMemoryDatabase::new(), None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -1289,7 +1247,7 @@ fn transaction_mempool_broadcast() { .block_on(alice_ts.set_base_node_public_key(base_node_identity.public_key().clone())) .unwrap(); - let (_bob_ts, _bob_output_manager, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = + let (_bob_ts, _bob_output_manager, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), TransactionMemoryDatabase::new(), None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1506,7 +1464,7 @@ fn transaction_mempool_broadcast() { event = alice_event_stream.select_next_some() => { if let TransactionEvent::MempoolBroadcastTimedOut(_) = &*event.unwrap(){ broadcast_timeout_count +=1; - if broadcast_timeout_count >= 1 { + if broadcast_timeout_count >= 2 { break; } @@ -1517,7 +1475,7 @@ fn transaction_mempool_broadcast() { }, } } - assert!(broadcast_timeout_count >= 1); + assert!(broadcast_timeout_count >= 2); }); let mempool_response = MempoolProto::MempoolServiceResponse { @@ -1652,6 +1610,83 @@ fn try_decode_base_node_request(bytes: Vec) -> Option= 2); }); + runtime.block_on(alice_ts.set_low_power_mode()).unwrap(); + runtime.block_on(alice_ts.set_normal_power_mode()).unwrap(); + // Test that receiving a base node response with the wrong outputs does not result in a TX being mined let wrong_outputs = vec![completed_tx_outputs[0].clone(), TransactionOutput::default().into()]; @@ -2217,7 +2252,7 @@ fn query_all_completed_transactions_on_startup() { ))) .unwrap(); - let (mut alice_ts, _, _, _, _, _, _, _, _, _, _) = + let (mut alice_ts, _, _, _, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), db, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -2276,9 +2311,6 @@ fn transaction_cancellation_when_not_in_mempool() { _, mut alice_mempool_response_sender, mut alice_base_node_response_sender, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -2286,13 +2318,12 @@ fn transaction_cancellation_when_not_in_mempool() { Some(Duration::from_secs(5)), ); let mut alice_event_stream = alice_ts.get_event_stream_fused(); - let (mut bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = - setup_transaction_service_no_comms( - &mut runtime, - factories.clone(), - TransactionMemoryDatabase::new(), - Some(Duration::from_secs(20)), - ); + let (mut bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); runtime .block_on(bob_ts.set_base_node_public_key(base_node_identity.public_key().clone())) .unwrap(); @@ -2524,7 +2555,7 @@ fn test_transaction_cancellation(backen let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); - let (mut alice_ts, mut alice_output_manager, _alice_outbound_service, mut alice_tx_sender, _, _, _, _, _, _, _) = + let (mut alice_ts, mut alice_output_manager, _alice_outbound_service, mut alice_tx_sender, _, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, Some(Duration::from_secs(20))); let mut alice_event_stream = alice_ts.get_event_stream_fused(); @@ -2675,9 +2706,6 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { _, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -2723,7 +2751,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { assert_eq!(tx_id, msg_tx_id); // Test sending the Reply to a receiver with Direct and then with SAF and never both - let (_bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = setup_transaction_service_no_comms( + let (_bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), TransactionMemoryDatabase::new(), @@ -2759,13 +2787,12 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { runtime.block_on(async { delay_for(Duration::from_secs(5)).await }); } - let (_bob2_ts, _, bob2_outbound_service, mut bob2_tx_sender, _, _, _, _, _, _, _) = - setup_transaction_service_no_comms( - &mut runtime, - factories.clone(), - TransactionMemoryDatabase::new(), - Some(Duration::from_secs(20)), - ); + let (_bob2_ts, _, bob2_outbound_service, mut bob2_tx_sender, _, _, _, _) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); bob2_outbound_service.set_behaviour(MockBehaviour { direct: ResponseType::Failed, broadcast: ResponseType::Queued, @@ -2899,9 +2926,6 @@ fn test_tx_direct_send_behaviour() { _, _, _, - _, - _, - _, ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), @@ -3189,7 +3213,7 @@ fn test_restarting_transaction_protocols() { .unwrap(); // Test that Bob's node restarts the send protocol - let (mut bob_ts, _bob_oms, _bob_outbound_service, _, mut bob_tx_reply, _, _, _, _, _, _) = + let (mut bob_ts, _bob_oms, _bob_outbound_service, _, mut bob_tx_reply, _, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, None); let mut bob_event_stream = bob_ts.get_event_stream_fused(); @@ -3222,7 +3246,7 @@ fn test_restarting_transaction_protocols() { }); // Test Alice's node restarts the receive protocol - let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, mut alice_tx_finalized, _, _, _, _, _) = + let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, mut alice_tx_finalized, _, _) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, None); let mut alice_event_stream = alice_ts.get_event_stream_fused(); diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 2905423b3a..9ae71e5c08 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -4135,6 +4135,61 @@ pub unsafe extern "C" fn wallet_get_seed_words(wallet: *mut TariWallet, error_ou } } +/// Set the power mode of the wallet to Low Power mode which will reduce the amount of network operations the wallet +/// performs to conserve power +/// +/// ## Arguments +/// `wallet` - The TariWallet pointer +/// `error_out` - Pointer to an int which will be modified to an error code should one occur, may not be null. Functions +/// as an out parameter. +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn wallet_set_low_power_mode(wallet: *mut TariWallet, error_out: *mut c_int) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + if wallet.is_null() { + error = LibWalletError::from(InterfaceError::NullError("wallet".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return; + } + + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).transaction_service.set_low_power_mode()) + { + error = LibWalletError::from(WalletError::TransactionServiceError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + } +} + +/// Set the power mode of the wallet to Normal Power mode which will then use the standard level of network traffic +/// +/// ## Arguments +/// `wallet` - The TariWallet pointer +/// `error_out` - Pointer to an int which will be modified to an error code should one occur, may not be null. Functions +/// as an out parameter. +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn wallet_set_normal_power_mode(wallet: *mut TariWallet, error_out: *mut c_int) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + if wallet.is_null() { + error = LibWalletError::from(InterfaceError::NullError("wallet".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return; + } + + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).transaction_service.set_normal_power_mode()) + { + error = LibWalletError::from(WalletError::TransactionServiceError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + } +} + /// Frees memory for a TariWallet /// /// ## Arguments @@ -5017,6 +5072,11 @@ mod test { assert_eq!(split_tx.is_ok(), true); string_destroy(split_msg_str as *mut c_char); + wallet_set_low_power_mode(alice_wallet, error_ptr); + assert_eq!((*error_ptr), 0); + wallet_set_normal_power_mode(alice_wallet, error_ptr); + assert_eq!((*error_ptr), 0); + // Test seed words let seed_words = wallet_get_seed_words(alice_wallet, error_ptr); let seed_word_len = seed_words_get_length(seed_words, error_ptr); diff --git a/base_layer/wallet_ffi/wallet.h b/base_layer/wallet_ffi/wallet.h index 36352619ec..41ab5bff30 100644 --- a/base_layer/wallet_ffi/wallet.h +++ b/base_layer/wallet_ffi/wallet.h @@ -432,6 +432,12 @@ unsigned long long wallet_import_utxo(struct TariWallet *wallet, unsigned long l // This function will tell the wallet to query the set base node to confirm the status of wallet data. unsigned long long wallet_sync_with_base_node(struct TariWallet *wallet, int* error_out); +// Set the power mode of the wallet to Low Power mode which will reduce the amount of network operations the wallet performs to conserve power +void wallet_set_low_power_mode(struct TariWallet *wallet, int* error_out); + +// Set the power mode of the wallet to Normal Power mode which will then use the standard level of network traffic +void wallet_set_normal_power_mode(struct TariWallet *wallet, int* error_out); + // Simulates the completion of a broadcasted TariPendingInboundTransaction bool wallet_test_broadcast_transaction(struct TariWallet *wallet, unsigned long long tx, int* error_out);