Skip to content

Commit

Permalink
Remove Liveness and add a Low Power mode to the Wallet (#1964)
Browse files Browse the repository at this point in the history
Merge pull request #1964

Remove Liveness and add a Low Power mode to the Wallet

* pull/1964/head:
  Remove Liveness and add a Low Power mode to the Wallet
  • Loading branch information
sdbondi committed Jun 11, 2020
2 parents c856c51 + af66fe9 commit 9809f4f
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 124 deletions.
11 changes: 7 additions & 4 deletions base_layer/core/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ fn wallet_base_node_integration_test() {
comms_config: alice_comms_config,
factories: factories.clone(),
transaction_service_config: Some(TransactionServiceConfig {
mempool_broadcast_timeout: Duration::from_secs(10),
base_node_mined_timeout: Duration::from_secs(1),
..Default::default()
base_node_monitoring_timeout: Duration::from_secs(1),
direct_send_timeout: Default::default(),
broadcast_send_timeout: Default::default(),
low_power_polling_timeout: Duration::from_secs(10),
}),
};
let alice_runtime = create_runtime();
Expand Down Expand Up @@ -294,7 +295,9 @@ fn wallet_base_node_integration_test() {
);
});
}

runtime
.block_on(alice_wallet.transaction_service.set_low_power_mode())
.unwrap();
let transaction = transaction.expect("Transaction must be present");

// Setup and start the miner
Expand Down
9 changes: 5 additions & 4 deletions base_layer/wallet/src/transaction_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ use std::time::Duration;

#[derive(Clone)]
pub struct TransactionServiceConfig {
pub mempool_broadcast_timeout: Duration,
pub base_node_mined_timeout: Duration,
pub base_node_monitoring_timeout: Duration,
pub direct_send_timeout: Duration,
pub broadcast_send_timeout: Duration,
pub low_power_polling_timeout: Duration, /* This is the timeout period that will be used when the wallet is in
* low_power mode */
}

impl Default for TransactionServiceConfig {
fn default() -> Self {
Self {
mempool_broadcast_timeout: Duration::from_secs(30),
base_node_mined_timeout: Duration::from_secs(30),
base_node_monitoring_timeout: Duration::from_secs(30),
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(30),
low_power_polling_timeout: Duration::from_secs(300),
}
}
}
24 changes: 24 additions & 0 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub enum TransactionServiceRequest {
CancelTransaction(TxId),
ImportUtxo(MicroTari, CommsPublicKey, String),
SubmitTransaction((TxId, Transaction, MicroTari, MicroTari, String)),
SetLowPowerMode,
SetNormalPowerMode,
#[cfg(feature = "test_harness")]
CompletePendingOutboundTransaction(CompletedTransaction),
#[cfg(feature = "test_harness")]
Expand Down Expand Up @@ -78,6 +80,8 @@ impl fmt::Display for TransactionServiceRequest {
Self::CancelTransaction(t) => f.write_str(&format!("CancelTransaction ({})", t)),
Self::ImportUtxo(v, k, msg) => f.write_str(&format!("ImportUtxo (from {}, {}, {})", k, v, msg)),
Self::SubmitTransaction((id, _, _, _, _)) => f.write_str(&format!("SubmitTransaction ({})", id)),
Self::SetLowPowerMode => f.write_str("SetLowPowerMode "),
Self::SetNormalPowerMode => f.write_str("SetNormalPowerMode"),
#[cfg(feature = "test_harness")]
Self::CompletePendingOutboundTransaction(tx) => {
f.write_str(&format!("CompletePendingOutboundTransaction ({})", tx.tx_id))
Expand Down Expand Up @@ -108,6 +112,8 @@ pub enum TransactionServiceResponse {
BaseNodePublicKeySet,
UtxoImported(TxId),
TransactionSubmitted,
LowPowerModeSet,
NormalPowerModeSet,
#[cfg(feature = "test_harness")]
CompletedPendingTransaction,
#[cfg(feature = "test_harness")]
Expand Down Expand Up @@ -346,6 +352,24 @@ impl TransactionServiceHandle {
}
}

pub async fn set_low_power_mode(&mut self) -> Result<(), TransactionServiceError> {
match self.handle.call(TransactionServiceRequest::SetLowPowerMode).await?? {
TransactionServiceResponse::LowPowerModeSet => Ok(()),
_ => Err(TransactionServiceError::UnexpectedApiResponse),
}
}

pub async fn set_normal_power_mode(&mut self) -> Result<(), TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::SetNormalPowerMode)
.await??
{
TransactionServiceResponse::NormalPowerModeSet => Ok(()),
_ => Err(TransactionServiceError::UnexpectedApiResponse),
}
}

#[cfg(feature = "test_harness")]
pub async fn test_complete_pending_transaction(
&mut self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tari_core::{
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tari_p2p::tari_message::TariMessageType;
use tokio::time::delay_for;
use tokio::{sync::broadcast, time::delay_for};

const LOG_TARGET: &str = "wallet::transaction_service::protocols::broadcast_protocol";

Expand All @@ -63,6 +63,7 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key: CommsPublicKey,
mempool_response_receiver: Option<Receiver<MempoolServiceResponse>>,
base_node_response_receiver: Option<Receiver<BaseNodeProto::BaseNodeServiceResponse>>,
timeout_update_receiver: Option<broadcast::Receiver<Duration>>,
}

impl<TBackend> TransactionBroadcastProtocol<TBackend>
Expand All @@ -75,6 +76,7 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key: CommsPublicKey,
mempool_response_receiver: Receiver<MempoolServiceResponse>,
base_node_response_receiver: Receiver<BaseNodeProto::BaseNodeServiceResponse>,
timeout_update_receiver: broadcast::Receiver<Duration>,
) -> Self
{
Self {
Expand All @@ -84,6 +86,7 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key,
mempool_response_receiver: Some(mempool_response_receiver),
base_node_response_receiver: Some(base_node_response_receiver),
timeout_update_receiver: Some(timeout_update_receiver),
}
}

Expand All @@ -99,6 +102,12 @@ where TBackend: TransactionBackend + Clone + 'static
.take()
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?;

let mut timeout_update_receiver = self
.timeout_update_receiver
.take()
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

// This is the main loop of the protocol and following the following steps
// 1) Check transaction being monitored is still in the Completed state and needs to be monitored
// 2) Send a MempoolRequest::SubmitTransaction to Mempool and a Mined? Request to base node
Expand Down Expand Up @@ -191,6 +200,15 @@ where TBackend: TransactionBackend + Clone + 'static
break;
}
},
updated_timeout = timeout_update_receiver.select_next_some() => {
if let Ok(to) = updated_timeout {
self.timeout = to;
info!(
target: LOG_TARGET,
"Broadcast monitoring protocol (Id: {}) timeout updated to {:?}", self.id ,self.timeout
);
}
},
() = delay => {
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ use tari_core::{
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tari_p2p::tari_message::TariMessageType;
use tokio::time::delay_for;

use tokio::{sync::broadcast, time::delay_for};
const LOG_TARGET: &str = "wallet::transaction_service::protocols::chain_monitoring_protocol";

/// This protocol defines the process of monitoring a mempool and base node to detect when a Broadcast transaction is
/// Mined or leaves the mempool in which case it should be cancelled
pub struct TransactionChainMonitoringProtocol<TBackend>
where TBackend: TransactionBackend + Clone + 'static
{
Expand All @@ -67,11 +67,13 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key: CommsPublicKey,
mempool_response_receiver: Option<Receiver<MempoolServiceResponse>>,
base_node_response_receiver: Option<Receiver<BaseNodeProto::BaseNodeServiceResponse>>,
timeout_update_receiver: Option<broadcast::Receiver<Duration>>,
}

impl<TBackend> TransactionChainMonitoringProtocol<TBackend>
where TBackend: TransactionBackend + Clone + 'static
{
#[allow(clippy::too_many_arguments)]
pub fn new(
id: u64,
tx_id: TxId,
Expand All @@ -80,6 +82,7 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key: CommsPublicKey,
mempool_response_receiver: Receiver<MempoolServiceResponse>,
base_node_response_receiver: Receiver<BaseNodeProto::BaseNodeServiceResponse>,
timeout_update_receiver: broadcast::Receiver<Duration>,
) -> Self
{
Self {
Expand All @@ -90,6 +93,7 @@ where TBackend: TransactionBackend + Clone + 'static
base_node_public_key,
mempool_response_receiver: Some(mempool_response_receiver),
base_node_response_receiver: Some(base_node_response_receiver),
timeout_update_receiver: Some(timeout_update_receiver),
}
}

Expand All @@ -105,6 +109,12 @@ where TBackend: TransactionBackend + Clone + 'static
.take()
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?;

let mut timeout_update_receiver = self
.timeout_update_receiver
.take()
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

trace!(
target: LOG_TARGET,
"Starting chain monitoring protocol for TxId: {} with Protocol ID: {}",
Expand Down Expand Up @@ -220,6 +230,16 @@ where TBackend: TransactionBackend + Clone + 'static
}
base_node_response_received = true;
},
updated_timeout = timeout_update_receiver.select_next_some() => {
if let Ok(to) = updated_timeout {
self.timeout = to;
info!(
target: LOG_TARGET,
"Chain monitoring protocol (Id: {}) timeout updated to {:?}", self.id, self.timeout
);
break;
}
},
() = delay => {
break;
},
Expand Down
Loading

0 comments on commit 9809f4f

Please sign in to comment.