From 83be1fc096ba2fc74f84a8b299232a207b30e6a1 Mon Sep 17 00:00:00 2001 From: Philip Robinson Date: Fri, 19 Jun 2020 12:19:17 +0200 Subject: [PATCH] Refactor UXTO Validation in Output Manager into async protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit UTXO validation in the Output Manager currently has a few elements that are synchronous relative to the main Select! Loop in the service. This meant that when there were a large number of outputs in the wallet that this could bog down the entire service. This PR refactors the UTXO validation process into a fully asynchronous and more modular Protocol pattern that runs in its own task. The API to the process is also updated to allow clients to select the appropriate Retry strategy. The automatic Utxo Validation of Unspent outputs was removed and must now be done explicitly but the client so they have more control over when it happens. The Automatic Invalid UTXO validation is still in place but instead of repeating until success it is reduced to 5 attempts as it is not critical. It was also found that there was one synchronous Mutex being used in the service outside of a `spawn_blocking(…)` context which could cause problems with the Tokio runtime so that has been updated to use an asynchronous Mutex. The Output Manager was also updated to use to the Tokio Broadcast channel and the tari_broadcast_channel dependency has been removed from the Wallet crate. A test has been added to fully test the wallet_ffi callback handler which ingests the new events the OMS is emitting. A bunch of Clippy fixes --- applications/tari_base_node/src/builder.rs | 15 +- base_layer/wallet/Cargo.toml | 1 - .../src/output_manager_service/error.rs | 26 + .../src/output_manager_service/handle.rs | 36 +- .../wallet/src/output_manager_service/mod.rs | 16 +- .../output_manager_service/protocols/mod.rs | 23 + .../protocols/utxo_validation_protocol.rs | 497 +++++++++++++++ .../src/output_manager_service/service.rs | 591 +++++------------- .../storage/database.rs | 49 +- base_layer/wallet/src/storage/database.rs | 2 +- base_layer/wallet/src/testnet_utils.rs | 12 +- .../wallet/src/transaction_service/handle.rs | 4 +- .../transaction_broadcast_protocol.rs | 7 +- .../wallet/src/transaction_service/service.rs | 35 +- .../transaction_service/storage/database.rs | 57 +- .../transaction_service/storage/memory_db.rs | 2 +- base_layer/wallet/src/wallet.rs | 9 +- .../tests/output_manager_service/service.rs | 205 +++--- .../tests/output_manager_service/storage.rs | 6 +- .../tests/transaction_service/service.rs | 9 +- base_layer/wallet_ffi/Cargo.toml | 1 - base_layer/wallet_ffi/src/callback_handler.rs | 336 +++++++++- base_layer/wallet_ffi/src/lib.rs | 7 +- 23 files changed, 1299 insertions(+), 647 deletions(-) create mode 100644 base_layer/wallet/src/output_manager_service/protocols/mod.rs create mode 100644 base_layer/wallet/src/output_manager_service/protocols/utxo_validation_protocol.rs diff --git a/applications/tari_base_node/src/builder.rs b/applications/tari_base_node/src/builder.rs index c84ea82874..b3a48b2ae7 100644 --- a/applications/tari_base_node/src/builder.rs +++ b/applications/tari_base_node/src/builder.rs @@ -104,6 +104,7 @@ use tari_wallet::{ output_manager_service::{ config::OutputManagerServiceConfig, handle::OutputManagerHandle, + protocols::utxo_validation_protocol::UtxoValidationRetry, storage::sqlite_db::OutputManagerSqliteDatabase, OutputManagerServiceInitializer, }, @@ -225,7 +226,7 @@ impl NodeContainer { let mut oms_handle_clone = wallet_output_handle.clone(); tokio::spawn(async move { delay_for(Duration::from_secs(240)).await; - let _ = oms_handle_clone.sync_with_base_node().await; + let _ = oms_handle_clone.validate_utxos(UtxoValidationRetry::UntilSuccess).await; }); }, Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e), @@ -559,12 +560,18 @@ where .set_base_node_public_key(base_node_public_key.clone()) .await .expect("Problem setting local base node public key for transaction service."); - wallet_handles + let mut oms_handle = wallet_handles .get_handle::() - .expect("OutputManagerService is not registered") + .expect("OutputManagerService is not registered"); + oms_handle .set_base_node_public_key(base_node_public_key) .await .expect("Problem setting local base node public key for output manager service."); + // Start the Output Manager UTXO Validation + oms_handle + .validate_utxos(UtxoValidationRetry::UntilSuccess) + .await + .expect("Problem starting the Output Manager Service Utxo Valdation process"); //---------------------------------- Base Node State Machine --------------------------------------------// let outbound_interface = base_node_handles @@ -1158,7 +1165,7 @@ async fn register_wallet_services( )) // Wallet services .add_initializer(OutputManagerServiceInitializer::new( - OutputManagerServiceConfig::default(), + OutputManagerServiceConfig{ base_node_query_timeout: Duration::from_secs(120), ..Default::default() }, subscription_factory.clone(), OutputManagerSqliteDatabase::new(wallet_db_conn.clone()), factories.clone(), diff --git a/base_layer/wallet/Cargo.toml b/base_layer/wallet/Cargo.toml index 28854f4d1a..80392cfddb 100644 --- a/base_layer/wallet/Cargo.toml +++ b/base_layer/wallet/Cargo.toml @@ -11,7 +11,6 @@ test_harness = ["tari_test_utils"] c_integration = [] [dependencies] -tari_broadcast_channel = "^0.2" tari_comms = { path = "../../comms", version = "^0.1"} tari_comms_dht = { path = "../../comms/dht", version = "^0.1"} tari_crypto = { version = "^0.3" } diff --git a/base_layer/wallet/src/output_manager_service/error.rs b/base_layer/wallet/src/output_manager_service/error.rs index dfcf9464da..35437aa3ba 100644 --- a/base_layer/wallet/src/output_manager_service/error.rs +++ b/base_layer/wallet/src/output_manager_service/error.rs @@ -66,6 +66,11 @@ pub enum OutputManagerError { NoBaseNodeKeysProvided, /// An error occured sending an event out on the event stream EventStreamError, + /// Maximum Attempts Exceeded + MaximumAttemptsExceeded, + /// An error has been experienced in the service + #[error(msg_embedded, non_std, no_from)] + ServiceError(String), } #[derive(Debug, Error, PartialEq)] @@ -88,6 +93,7 @@ pub enum OutputManagerStorageError { OutputAlreadySpent, /// Key Manager not initialized KeyManagerNotInitialized, + OutOfRangeError(OutOfRangeError), R2d2Error, TransactionError(TransactionError), @@ -98,3 +104,23 @@ pub enum OutputManagerStorageError { #[error(msg_embedded, non_std, no_from)] BlockingTaskSpawnError(String), } + +/// This error type is used to return OutputManagerError from inside a Output Manager Service protocol but also +/// include the ID of the protocol +#[derive(Debug)] +pub struct OutputManagerProtocolError { + pub id: u64, + pub error: OutputManagerError, +} + +impl OutputManagerProtocolError { + pub fn new(id: u64, error: OutputManagerError) -> Self { + Self { id, error } + } +} + +impl From for OutputManagerError { + fn from(tspe: OutputManagerProtocolError) -> Self { + tspe.error + } +} diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index 0fd3742e23..dc42b491a1 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -22,12 +22,12 @@ use crate::output_manager_service::{ error::OutputManagerError, + protocols::utxo_validation_protocol::UtxoValidationRetry, service::Balance, storage::database::PendingTransactionOutputs, }; use futures::{stream::Fuse, StreamExt}; use std::{collections::HashMap, fmt, time::Duration}; -use tari_broadcast_channel::Subscriber; use tari_comms::types::CommsPublicKey; use tari_core::transactions::{ tari_amount::MicroTari, @@ -36,6 +36,7 @@ use tari_core::transactions::{ SenderTransactionProtocol, }; use tari_service_framework::reply_channel::SenderService; +use tokio::sync::broadcast; use tower::Service; /// API Request enum @@ -55,7 +56,7 @@ pub enum OutputManagerRequest { GetInvalidOutputs, GetSeedWords, SetBaseNodePublicKey(CommsPublicKey), - SyncWithBaseNode, + ValidateUtxos(UtxoValidationRetry), CreateCoinSplit((MicroTari, usize, MicroTari, Option)), } @@ -78,7 +79,7 @@ impl fmt::Display for OutputManagerRequest { Self::GetInvalidOutputs => f.write_str("GetInvalidOutputs"), Self::GetSeedWords => f.write_str("GetSeedWords"), Self::SetBaseNodePublicKey(k) => f.write_str(&format!("SetBaseNodePublicKey ({})", k)), - Self::SyncWithBaseNode => f.write_str("SyncWithBaseNode"), + Self::ValidateUtxos(retry) => f.write_str(&format!("ValidateUtxos ({:?})", retry)), Self::CreateCoinSplit(v) => f.write_str(&format!("CreateCoinSplit ({})", v.0)), } } @@ -101,35 +102,42 @@ pub enum OutputManagerResponse { InvalidOutputs(Vec), SeedWords(Vec), BaseNodePublicKeySet, - StartedBaseNodeSync(u64), + UtxoValidationStarted(u64), Transaction((u64, Transaction, MicroTari, MicroTari)), } +pub type OutputManagerEventSender = broadcast::Sender; +pub type OutputManagerEventReceiver = broadcast::Receiver; + /// Events that can be published on the Text Message Service Event Stream #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum OutputManagerEvent { - BaseNodeSyncRequestTimedOut(u64), - ReceiveBaseNodeResponse(u64), + UtxoValidationTimedOut(u64), + UtxoValidationSuccess(u64), + UtxoValidationFailure(u64), Error(String), } #[derive(Clone)] pub struct OutputManagerHandle { handle: SenderService>, - event_stream: Subscriber, + event_stream_sender: OutputManagerEventSender, } impl OutputManagerHandle { pub fn new( handle: SenderService>, - event_stream: Subscriber, + event_stream_sender: OutputManagerEventSender, ) -> Self { - OutputManagerHandle { handle, event_stream } + OutputManagerHandle { + handle, + event_stream_sender, + } } - pub fn get_event_stream_fused(&self) -> Fuse> { - self.event_stream.clone().fuse() + pub fn get_event_stream_fused(&self) -> Fuse { + self.event_stream_sender.subscribe().fuse() } pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> { @@ -287,9 +295,9 @@ impl OutputManagerHandle { } } - pub async fn sync_with_base_node(&mut self) -> Result { - match self.handle.call(OutputManagerRequest::SyncWithBaseNode).await?? { - OutputManagerResponse::StartedBaseNodeSync(request_key) => Ok(request_key), + pub async fn validate_utxos(&mut self, retries: UtxoValidationRetry) -> Result { + match self.handle.call(OutputManagerRequest::ValidateUtxos(retries)).await?? { + OutputManagerResponse::UtxoValidationStarted(request_key) => Ok(request_key), _ => Err(OutputManagerError::UnexpectedApiResponse), } } diff --git a/base_layer/wallet/src/output_manager_service/mod.rs b/base_layer/wallet/src/output_manager_service/mod.rs index 0ea606d276..74fda1f736 100644 --- a/base_layer/wallet/src/output_manager_service/mod.rs +++ b/base_layer/wallet/src/output_manager_service/mod.rs @@ -20,11 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::output_manager_service::{handle::OutputManagerHandle, service::OutputManagerService}; - use crate::{ output_manager_service::{ config::OutputManagerServiceConfig, + handle::OutputManagerHandle, + service::OutputManagerService, storage::database::{OutputManagerBackend, OutputManagerDatabase}, }, transaction_service::handle::TransactionServiceHandle, @@ -32,7 +32,6 @@ use crate::{ use futures::{future, Future, Stream, StreamExt}; use log::*; use std::sync::Arc; -use tari_broadcast_channel::bounded; use tari_comms_dht::outbound::OutboundMessageRequester; use tari_core::{base_node::proto::base_node as BaseNodeProto, transactions::types::CryptoFactories}; use tari_p2p::{ @@ -48,11 +47,12 @@ use tari_service_framework::{ ServiceInitializer, }; use tari_shutdown::ShutdownSignal; -use tokio::runtime; +use tokio::{runtime, sync::broadcast}; pub mod config; pub mod error; pub mod handle; +pub mod protocols; #[allow(unused_assignments)] pub mod service; pub mod storage; @@ -72,7 +72,7 @@ where T: OutputManagerBackend } impl OutputManagerServiceInitializer -where T: OutputManagerBackend +where T: OutputManagerBackend + Clone + 'static { pub fn new( config: OutputManagerServiceConfig, @@ -98,7 +98,7 @@ where T: OutputManagerBackend } impl ServiceInitializer for OutputManagerServiceInitializer -where T: OutputManagerBackend + 'static +where T: OutputManagerBackend + Clone + 'static { type Future = impl Future>; @@ -112,9 +112,9 @@ where T: OutputManagerBackend + 'static let base_node_response_stream = self.base_node_response_stream(); let (sender, receiver) = reply_channel::unbounded(); - let (publisher, subscriber) = bounded(100, 7); + let (publisher, _) = broadcast::channel(200); - let oms_handle = OutputManagerHandle::new(sender, subscriber); + let oms_handle = OutputManagerHandle::new(sender, publisher.clone()); // Register handle before waiting for handles to be ready handles_fut.register(oms_handle); diff --git a/base_layer/wallet/src/output_manager_service/protocols/mod.rs b/base_layer/wallet/src/output_manager_service/protocols/mod.rs new file mode 100644 index 0000000000..2d93eee97e --- /dev/null +++ b/base_layer/wallet/src/output_manager_service/protocols/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2020. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod utxo_validation_protocol; diff --git a/base_layer/wallet/src/output_manager_service/protocols/utxo_validation_protocol.rs b/base_layer/wallet/src/output_manager_service/protocols/utxo_validation_protocol.rs new file mode 100644 index 0000000000..2c7001cdef --- /dev/null +++ b/base_layer/wallet/src/output_manager_service/protocols/utxo_validation_protocol.rs @@ -0,0 +1,497 @@ +// Copyright 2020. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::output_manager_service::{ + error::{OutputManagerError, OutputManagerProtocolError}, + handle::OutputManagerEvent, + service::OutputManagerResources, + storage::{database::OutputManagerBackend, models::DbUnblindedOutput}, +}; +use futures::{FutureExt, StreamExt}; +use log::*; +use rand::{rngs::OsRng, RngCore}; +use std::{cmp, collections::HashMap, convert::TryFrom, fmt, sync::Arc, time::Duration}; +use tari_comms::types::CommsPublicKey; +use tari_comms_dht::domain_message::OutboundDomainMessage; +use tari_core::{ + base_node::proto::{ + base_node as BaseNodeProto, + base_node::{ + base_node_service_request::Request as BaseNodeRequestProto, + base_node_service_response::Response as BaseNodeResponseProto, + }, + }, + transactions::transaction::TransactionOutput, +}; +use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex}; +use tari_p2p::tari_message::TariMessageType; +use tokio::{sync::broadcast, time::delay_for}; + +const LOG_TARGET: &str = "wallet::output_manager_service::protocols::utxo_validation_protocol"; + +pub struct UtxoValidationProtocol +where TBackend: OutputManagerBackend + Clone + 'static +{ + id: u64, + validation_type: UtxoValidationType, + retry_strategy: UtxoValidationRetry, + resources: OutputManagerResources, + base_node_public_key: CommsPublicKey, + timeout: Duration, + base_node_response_receiver: Option>>, + pending_queries: HashMap>>, +} + +/// This protocol defines the process of submitting our current UTXO set to the Base Node to validate it. +impl UtxoValidationProtocol +where TBackend: OutputManagerBackend + Clone + 'static +{ + pub fn new( + id: u64, + validation_type: UtxoValidationType, + retry_strategy: UtxoValidationRetry, + resources: OutputManagerResources, + base_node_public_key: CommsPublicKey, + timeout: Duration, + base_node_response_receiver: broadcast::Receiver>, + ) -> Self + { + Self { + id, + validation_type, + retry_strategy, + resources, + base_node_public_key, + timeout, + base_node_response_receiver: Some(base_node_response_receiver), + pending_queries: Default::default(), + } + } + + /// The task that defines the execution of the protocol. + pub async fn execute(mut self) -> Result { + let mut base_node_response_receiver = self + .base_node_response_receiver + .take() + .ok_or_else(|| { + OutputManagerProtocolError::new( + self.id, + OutputManagerError::ServiceError("No base node response channel provided".to_string()), + ) + })? + .fuse(); + + trace!( + target: LOG_TARGET, + "Starting UTXO validation protocol (Id: {}) for {}", + self.id, + self.validation_type, + ); + + let outputs_to_query: Vec> = match self.validation_type { + UtxoValidationType::Unspent => self + .resources + .db + .get_unspent_outputs() + .await + .map_err(|e| { + OutputManagerProtocolError::new(self.id, OutputManagerError::OutputManagerStorageError(e)) + })? + .iter() + .map(|uo| uo.hash.clone()) + .collect(), + UtxoValidationType::Invalid => self + .resources + .db + .get_invalid_outputs() + .await + .map_err(|e| { + OutputManagerProtocolError::new(self.id, OutputManagerError::OutputManagerStorageError(e)) + })? + .into_iter() + .map(|uo| uo.hash) + .collect(), + }; + + if outputs_to_query.is_empty() { + trace!( + target: LOG_TARGET, + "UTXO validation protocol (Id: {}) has no outputs to validate", + self.id, + ); + return Ok(self.id); + } + + let total_retries_str = match self.retry_strategy { + UtxoValidationRetry::Limited(n) => format!("{}", n), + UtxoValidationRetry::UntilSuccess => "∞".to_string(), + }; + + let mut retries = 0; + loop { + self.send_queries(outputs_to_query.clone()).await?; + + let mut delay = delay_for(self.timeout).fuse(); + + loop { + futures::select! { + base_node_response = base_node_response_receiver.select_next_some() => { + match base_node_response { + Ok(response) => if self.handle_base_node_response(response).await? { + error!(target: LOG_TARGET, "Response handled with success for {} and pending_queries len: {}", self.id, self.pending_queries.len()); + if self.pending_queries.is_empty() { + let _ = self + .resources + .event_publisher + .send(OutputManagerEvent::UtxoValidationSuccess(self.id)) + .map_err(|e| { + trace!( + target: LOG_TARGET, + "Error sending event, usually because there are no subscribers: {:?}", + e + ); + e + }); + return Ok(self.id); + } + }, + Err(e) => trace!(target: LOG_TARGET, "Error reading broadcast base_node_response: {:?}", e), + } + + }, + () = delay => { + break; + }, + } + } + + info!( + target: LOG_TARGET, + "UTXO Validation protocol (Id: {}) attempt {} out of {} timed out.", + self.id, + retries + 1, + total_retries_str + ); + + let _ = self + .resources + .event_publisher + .send(OutputManagerEvent::UtxoValidationTimedOut(self.id)) + .map_err(|e| { + trace!( + target: LOG_TARGET, + "Error sending event, usually because there are no subscribers: {:?}", + e + ); + e + }); + + retries += 1; + match self.retry_strategy { + UtxoValidationRetry::Limited(n) => { + if retries >= n { + break; + } + }, + UtxoValidationRetry::UntilSuccess => (), + } + + self.pending_queries.clear(); + } + + info!( + target: LOG_TARGET, + "Maximum attempts exceeded for UTXO Validation Protocol (Id: {})", self.id + ); + Err(OutputManagerProtocolError::new( + self.id, + OutputManagerError::MaximumAttemptsExceeded, + )) + } + + async fn send_queries(&mut self, mut outputs_to_query: Vec>) -> Result<(), OutputManagerProtocolError> { + // Determine how many rounds of base node request we need to query all the outputs in batches of + // max_utxo_query_size + let rounds = + ((outputs_to_query.len() as f32) / (self.resources.config.max_utxo_query_size as f32 + 0.1)) as usize + 1; + + for r in 0..rounds { + let mut output_hashes = Vec::new(); + for uo_hash in + outputs_to_query.drain(..cmp::min(self.resources.config.max_utxo_query_size, outputs_to_query.len())) + { + output_hashes.push(uo_hash); + } + + let request_key = if r == 0 { self.id } else { OsRng.next_u64() }; + + let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs { + outputs: output_hashes.clone(), + }); + + let service_request = BaseNodeProto::BaseNodeServiceRequest { + request_key, + request: Some(request), + }; + + let send_message_response = self + .resources + .outbound_message_service + .send_direct( + self.base_node_public_key.clone(), + OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request), + ) + .await + .map_err(|e| OutputManagerProtocolError::new(self.id, OutputManagerError::from(e)))?; + + // Here we are going to spawn a non-blocking task that will monitor and log the progress of the + // send process. + tokio::spawn(async move { + match send_message_response.resolve().await { + Err(e) => trace!( + target: LOG_TARGET, + "Failed to send Output Manager UTXO query ({}) to Base Node: {}", + request_key, + e + ), + Ok(send_states) => { + trace!( + target: LOG_TARGET, + "Output Manager UTXO query ({}) queued for sending with Message {}", + request_key, + send_states[0].tag, + ); + let message_tag = send_states[0].tag; + if send_states.wait_single().await { + trace!( + target: LOG_TARGET, + "Output Manager UTXO query ({}) successfully sent to Base Node with Message {}", + request_key, + message_tag, + ) + } else { + trace!( + target: LOG_TARGET, + "Failed to send Output Manager UTXO query ({}) to Base Node with Message {}", + request_key, + message_tag, + ); + } + }, + } + }); + + self.pending_queries.insert(request_key, output_hashes); + + info!( + target: LOG_TARGET, + "Output Manager {} query (Id: {}) sent to Base Node, part {} of {} requests", + self.validation_type, + request_key, + r + 1, + rounds + ); + } + + Ok(()) + } + + async fn handle_base_node_response( + &mut self, + response: Arc, + ) -> Result + { + let request_key = response.request_key; + + let queried_hashes = if let Some(hashes) = self.pending_queries.remove(&request_key) { + hashes + } else { + trace!( + target: LOG_TARGET, + "Base Node Response (Id: {}) not expected for UTXO Validation protocol {}", + request_key, + self.id + ); + return Ok(false); + }; + + trace!( + target: LOG_TARGET, + "Handling a Base Node Response for {} request (Id: {}) for UTXO Validation protocol {}", + self.validation_type, + request_key, + self.id + ); + + let response: Vec = match (*response).clone().response + { + Some(BaseNodeResponseProto::TransactionOutputs(outputs)) => outputs.outputs, + _ => { + return Err(OutputManagerProtocolError::new( + self.id, + OutputManagerError::InvalidResponseError("Base Node Response of unexpected variant".to_string()), + )); + }, + }; + + match self.validation_type { + UtxoValidationType::Unspent => { + // Construct a HashMap of all the unspent outputs + let unspent_outputs: Vec = + self.resources.db.get_unspent_outputs().await.map_err(|e| { + OutputManagerProtocolError::new(self.id, OutputManagerError::OutputManagerStorageError(e)) + })?; + + // We only want to check outputs that we were expecting and are still valid + let mut output_hashes = HashMap::new(); + for uo in unspent_outputs.iter() { + let hash = uo.hash.clone(); + if queried_hashes.iter().any(|h| &hash == h) { + output_hashes.insert(hash, uo.clone()); + } + } + + // Go through all the returned UTXOs and if they are in the hashmap remove them + for output in response.iter() { + let response_hash = TransactionOutput::try_from(output.clone()) + .map_err(|_| { + OutputManagerProtocolError::new( + self.id, + OutputManagerError::ConversionError( + "Could not convert protobuf TransactionOutput".to_string(), + ), + ) + })? + .hash(); + + let _ = output_hashes.remove(&response_hash); + } + + // If there are any remaining Unspent Outputs we will move them to the invalid collection + for (_k, v) in output_hashes { + // Get the transaction these belonged to so we can display the kernel signature of the transaction + // this output belonged to. + + warn!( + target: LOG_TARGET, + "Output with value {} not returned from Base Node query ({}) and is thus being invalidated", + v.unblinded_output.value, + request_key, + ); + // If the output that is being invalidated has an associated TxId then get the kernel signature of + // the transaction and display for easier debugging + if let Some(tx_id) = self.resources.db.invalidate_output(v).await.map_err(|e| { + OutputManagerProtocolError::new(self.id, OutputManagerError::OutputManagerStorageError(e)) + })? { + if let Ok(transaction) = self + .resources + .transaction_service + .get_completed_transaction(tx_id) + .await + { + info!( + target: LOG_TARGET, + "Invalidated Output is from Transaction (TxId: {}) with message: {} and Kernel \ + Signature: {}", + transaction.tx_id, + transaction.message, + transaction.transaction.body.kernels()[0] + .excess_sig + .get_signature() + .to_hex() + ) + } + } else { + info!( + target: LOG_TARGET, + "Invalidated Output does not have an associated TxId so it is likely a Coinbase output \ + lost to a Re-Org" + ); + } + } + debug!( + target: LOG_TARGET, + "Handled Base Node response (Id: {}) for Unspent Outputs Query {}", request_key, self.id + ); + }, + UtxoValidationType::Invalid => { + let invalid_outputs = self.resources.db.get_invalid_outputs().await.map_err(|e| { + OutputManagerProtocolError::new(self.id, OutputManagerError::OutputManagerStorageError(e)) + })?; + + for output in response.iter() { + let response_hash = TransactionOutput::try_from(output.clone()) + .map_err(|_| { + OutputManagerProtocolError::new( + self.id, + OutputManagerError::ConversionError("Could not convert Transaction Output".to_string()), + ) + })? + .hash(); + + if let Some(output) = invalid_outputs.iter().find(|o| o.hash == response_hash) { + if self + .resources + .db + .revalidate_output(output.unblinded_output.spending_key.clone()) + .await + .is_ok() + { + trace!( + target: LOG_TARGET, + "Output with value {} has been restored to a valid spendable output", + output.unblinded_output.value + ); + } + } + } + + debug!( + target: LOG_TARGET, + "Handled Base Node response (Id: {}) for Invalidated Outputs Query {}", request_key, self.id + ); + }, + } + Ok(true) + } +} + +pub enum UtxoValidationType { + Unspent, + Invalid, +} + +impl fmt::Display for UtxoValidationType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + UtxoValidationType::Unspent => write!(f, "Unspent Outputs Validation"), + UtxoValidationType::Invalid => write!(f, "Invalid Outputs Validation"), + } + } +} + +// 0 means keep retying until success +#[derive(Debug)] +pub enum UtxoValidationRetry { + Limited(u8), + UntilSuccess, +} diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 0e4c311cf8..0c33833453 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -23,8 +23,9 @@ use crate::{ output_manager_service::{ config::OutputManagerServiceConfig, - error::OutputManagerError, - handle::{OutputManagerEvent, OutputManagerRequest, OutputManagerResponse}, + error::{OutputManagerError, OutputManagerProtocolError}, + handle::{OutputManagerEvent, OutputManagerEventSender, OutputManagerRequest, OutputManagerResponse}, + protocols::utxo_validation_protocol::{UtxoValidationProtocol, UtxoValidationRetry, UtxoValidationType}, storage::{ database::{KeyManagerState, OutputManagerBackend, OutputManagerDatabase, PendingTransactionOutputs}, models::DbUnblindedOutput, @@ -33,23 +34,15 @@ use crate::{ }, transaction_service::handle::TransactionServiceHandle, types::{HashDigest, KeyDigest}, - util::futures::StateDelay, }; -use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, SinkExt, Stream, StreamExt}; +use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt}; use log::*; use rand::{rngs::OsRng, RngCore}; -use std::{cmp, cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration}; -use tari_broadcast_channel::Publisher; +use std::{cmp::Ordering, collections::HashMap, fmt, sync::Arc, time::Duration}; use tari_comms::types::CommsPublicKey; -use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundMessageRequester}; +use tari_comms_dht::outbound::OutboundMessageRequester; use tari_core::{ - base_node::proto::{ - base_node as BaseNodeProto, - base_node::{ - base_node_service_request::Request as BaseNodeRequestProto, - base_node_service_response::Response as BaseNodeResponseProto, - }, - }, + base_node::proto::base_node as BaseNodeProto, transactions::{ fee::Fee, tari_amount::MicroTari, @@ -65,16 +58,17 @@ use tari_core::{ SenderTransactionProtocol, }, }; -use tari_crypto::{ - keys::SecretKey as SecretKeyTrait, - tari_utilities::{hash::Hashable, hex::Hex}, -}; +use tari_crypto::keys::SecretKey as SecretKeyTrait; use tari_key_manager::{ key_manager::KeyManager, mnemonic::{from_secret_key, MnemonicLanguage}, }; -use tari_p2p::{domain_message::DomainMessage, tari_message::TariMessageType}; +use tari_p2p::domain_message::DomainMessage; use tari_service_framework::reply_channel; +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinHandle, +}; const LOG_TARGET: &str = "wallet::output_manager_service"; @@ -83,26 +77,19 @@ const LOG_TARGET: &str = "wallet::output_manager_service"; /// outputs. When the outputs are detected on the blockchain the Transaction service will call this Service to confirm /// them to be moved to the spent and unspent output lists respectively. pub struct OutputManagerService -where TBackend: OutputManagerBackend + 'static +where TBackend: OutputManagerBackend + Clone + 'static { - config: OutputManagerServiceConfig, + resources: OutputManagerResources, key_manager: Mutex>, - db: OutputManagerDatabase, - outbound_message_service: OutboundMessageRequester, - transaction_service: TransactionServiceHandle, request_stream: Option>>, base_node_response_stream: Option, - factories: CryptoFactories, - base_node_public_key: Option, - pending_utxo_query_keys: HashMap>>, - pending_revalidation_query_keys: HashMap>>, - event_publisher: Publisher, + base_node_response_publisher: broadcast::Sender>, } impl OutputManagerService where - TBackend: OutputManagerBackend, + TBackend: OutputManagerBackend + Clone + 'static, BNResponseStream: Stream>, { #[allow(clippy::too_many_arguments)] @@ -116,7 +103,7 @@ where >, base_node_response_stream: BNResponseStream, db: OutputManagerDatabase, - event_publisher: Publisher, + event_publisher: OutputManagerEventSender, factories: CryptoFactories, ) -> Result, OutputManagerError> { @@ -138,23 +125,28 @@ where // Pending Transactions. db.clear_short_term_encumberances().await?; - Ok(OutputManagerService { + let resources = OutputManagerResources { config, + db, outbound_message_service, transaction_service, + factories, + base_node_public_key: None, + event_publisher, + }; + + let (base_node_response_publisher, _) = broadcast::channel(50); + + Ok(OutputManagerService { + resources, key_manager: Mutex::new(KeyManager::::from( key_manager_state.master_seed, key_manager_state.branch_seed, key_manager_state.primary_key_index, )), - db, request_stream: Some(request_stream), base_node_response_stream: Some(base_node_response_stream), - factories, - base_node_public_key: None, - pending_utxo_query_keys: HashMap::new(), - pending_revalidation_query_keys: HashMap::new(), - event_publisher, + base_node_response_publisher, }) } @@ -173,7 +165,8 @@ where .fuse(); pin_mut!(base_node_response_stream); - let mut utxo_query_timeout_futures: FuturesUnordered> = FuturesUnordered::new(); + let mut utxo_validation_handles: FuturesUnordered>> = + FuturesUnordered::new(); info!(target: LOG_TARGET, "Output Manager Service started"); loop { @@ -181,7 +174,7 @@ where request_context = request_stream.select_next_some() => { trace!(target: LOG_TARGET, "Handling Service API Request"); let (request, reply_tx) = request_context.split(); - let _ = reply_tx.send(self.handle_request(request, &mut utxo_query_timeout_futures).await.or_else(|resp| { + let _ = reply_tx.send(self.handle_request(request, &mut utxo_validation_handles).await.or_else(|resp| { error!(target: LOG_TARGET, "Error handling request: {:?}", resp); Err(resp) })).or_else(|resp| { @@ -199,19 +192,19 @@ where }); if result.is_err() { - let _ = self.event_publisher + let _ = self.resources.event_publisher .send(OutputManagerEvent::Error( "Error handling Base Node Response message".to_string(), )) - .await; + ; } } - utxo_hash = utxo_query_timeout_futures.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Base Node Sync Timeout"); - let _ = self.handle_utxo_query_timeout(utxo_hash, &mut utxo_query_timeout_futures).await.or_else(|resp| { - error!(target: LOG_TARGET, "Error handling UTXO query timeout : {:?}", resp); - Err(resp) - }); + join_result = utxo_validation_handles.select_next_some() => { + trace!(target: LOG_TARGET, "UTXO Validation protocol has ended with result {:?}", join_result); + match join_result { + Ok(join_result_inner) => self.complete_utxo_validation_protocol(join_result_inner).await, + Err(e) => error!(target: LOG_TARGET, "Error resolving UTXO Validation protocol: {:?}", e), + }; } complete => { info!(target: LOG_TARGET, "Output manager service shutting down"); @@ -228,7 +221,7 @@ where async fn handle_request( &mut self, request: OutputManagerRequest, - utxo_query_timeout_futures: &mut FuturesUnordered>, + utxo_validation_handles: &mut FuturesUnordered>>, ) -> Result { trace!(target: LOG_TARGET, "Handling Service Request: {}", request); @@ -283,15 +276,14 @@ where .collect(); Ok(OutputManagerResponse::UnspentOutputs(outputs)) }, - OutputManagerRequest::GetSeedWords => self.get_seed_words().map(OutputManagerResponse::SeedWords), + OutputManagerRequest::GetSeedWords => self.get_seed_words().await.map(OutputManagerResponse::SeedWords), OutputManagerRequest::SetBaseNodePublicKey(pk) => self - .set_base_node_public_key(pk, utxo_query_timeout_futures) + .set_base_node_public_key(pk, utxo_validation_handles) .await .map(|_| OutputManagerResponse::BaseNodePublicKeySet), - OutputManagerRequest::SyncWithBaseNode => self - .query_unspent_outputs_status(utxo_query_timeout_futures) - .await - .map(OutputManagerResponse::StartedBaseNodeSync), + OutputManagerRequest::ValidateUtxos(retries) => self + .validate_outputs(UtxoValidationType::Unspent, retries, utxo_validation_handles) + .map(OutputManagerResponse::UtxoValidationStarted), OutputManagerRequest::GetInvalidOutputs => { let outputs = self .fetch_invalid_outputs() @@ -314,344 +306,86 @@ where response: BaseNodeProto::BaseNodeServiceResponse, ) -> Result<(), OutputManagerError> { - let request_key = response.request_key; - - let response: Vec = match response.response { - Some(BaseNodeResponseProto::TransactionOutputs(outputs)) => outputs.outputs, - _ => { - return Ok(()); - }, - }; - - let mut unspent_query_handled = false; - let mut invalid_query_handled = false; - - // Check if the received key is in the pending UTXO query list to be handled - if let Some(queried_hashes) = self.pending_utxo_query_keys.remove(&request_key) { - trace!( - target: LOG_TARGET, - "Handling a Base Node Response for a Unspent Outputs request ({})", - request_key - ); - - // Construct a HashMap of all the unspent outputs - let unspent_outputs: Vec = self.db.get_unspent_outputs().await?; - - let mut output_hashes = HashMap::new(); - for uo in unspent_outputs.iter() { - let hash = uo.hash.clone(); - if queried_hashes.iter().any(|h| &hash == h) { - output_hashes.insert(hash.clone(), uo.clone()); - } - } - - // Go through all the returned UTXOs and if they are in the hashmap remove them - for output in response.iter() { - let response_hash = TransactionOutput::try_from(output.clone()) - .map_err(OutputManagerError::ConversionError)? - .hash(); - - let _ = output_hashes.remove(&response_hash); - } - - // If there are any remaining Unspent Outputs we will move them to the invalid collection - for (_k, v) in output_hashes { - // Get the transaction these belonged to so we can display the kernel signature of the transaction - // this output belonged to. - - warn!( - target: LOG_TARGET, - "Output with value {} not returned from Base Node query and is thus being invalidated", - v.unblinded_output.value - ); - // If the output that is being invalidated has an associated TxId then get the kernel signature of - // the transaction and display for easier debugging - if let Some(tx_id) = self.db.invalidate_output(v).await? { - if let Ok(transaction) = self.transaction_service.get_completed_transaction(tx_id).await { - info!( - target: LOG_TARGET, - "Invalidated Output is from Transaction (TxId: {}) with message: {} and Kernel Signature: \ - {}", - transaction.tx_id, - transaction.message, - transaction.transaction.body.kernels()[0] - .excess_sig - .get_signature() - .to_hex() - ) - } - } else { - info!( - target: LOG_TARGET, - "Invalidated Output does not have an associated TxId so it is likely a Coinbase output lost \ - to a Re-Org" - ); - } - } - unspent_query_handled = true; - debug!( - target: LOG_TARGET, - "Handled Base Node response for Unspent Outputs Query {}", request_key - ); - }; - - // Check if the received key is in the Invalid UTXO query list waiting to be handled - if self.pending_revalidation_query_keys.remove(&request_key).is_some() { + // Publish this response to any protocols that are subscribed + if let Err(e) = self.base_node_response_publisher.send(Arc::new(response)) { trace!( target: LOG_TARGET, - "Handling a Base Node Response for a Invalid Outputs request ({})", - request_key - ); - let invalid_outputs = self.db.get_invalid_outputs().await?; - - for output in response.iter() { - let response_hash = TransactionOutput::try_from(output.clone()) - .map_err(OutputManagerError::ConversionError)? - .hash(); - - if let Some(output) = invalid_outputs.iter().find(|o| o.hash == response_hash) { - if self - .db - .revalidate_output(output.unblinded_output.spending_key.clone()) - .await - .is_ok() - { - trace!( - target: LOG_TARGET, - "Output with value {} has been restored to a valid spendable output", - output.unblinded_output.value - ); - } - } - } - invalid_query_handled = true; - debug!( - target: LOG_TARGET, - "Handled Base Node response for Invalid Outputs Query {}", request_key + "Could not publish Base Node Response, no subscribers to receive. (Err {:?})", + e ); } - if unspent_query_handled || invalid_query_handled { - let _ = self - .event_publisher - .send(OutputManagerEvent::ReceiveBaseNodeResponse(request_key)) - .await - .map_err(|e| { - trace!( - target: LOG_TARGET, - "Error sending event, usually because there are no subscribers: {:?}", - e - ); - e - }); - } - - Ok(()) - } - - /// Handle the timeout of a pending UTXO query. - pub async fn handle_utxo_query_timeout( - &mut self, - query_key: u64, - utxo_query_timeout_futures: &mut FuturesUnordered>, - ) -> Result<(), OutputManagerError> - { - if let Some(hashes) = self.pending_utxo_query_keys.remove(&query_key) { - warn!(target: LOG_TARGET, "UTXO Unspent Outputs Query {} timed out", query_key); - self.query_outputs_status(utxo_query_timeout_futures, hashes, UtxoQueryType::UnspentOutputs) - .await?; - } - - if let Some(hashes) = self.pending_revalidation_query_keys.remove(&query_key) { - warn!(target: LOG_TARGET, "UTXO Invalid Outputs Query {} timed out", query_key); - self.query_outputs_status(utxo_query_timeout_futures, hashes, UtxoQueryType::InvalidOutputs) - .await?; - } - - let _ = self - .event_publisher - .send(OutputManagerEvent::BaseNodeSyncRequestTimedOut(query_key)) - .await - .map_err(|e| { - trace!( - target: LOG_TARGET, - "Error sending event, usually because there are no subscribers: {:?}", - e - ); - e - }); Ok(()) } - pub async fn query_unspent_outputs_status( - &mut self, - utxo_query_timeout_futures: &mut FuturesUnordered>, - ) -> Result - { - let unspent_output_hashes = self - .db - .get_unspent_outputs() - .await? - .iter() - .map(|uo| uo.hash.clone()) - .collect(); - - let key = self - .query_outputs_status( - utxo_query_timeout_futures, - unspent_output_hashes, - UtxoQueryType::UnspentOutputs, - ) - .await?; - - Ok(key) - } - - pub async fn query_invalid_outputs_status( + fn validate_outputs( &mut self, - utxo_query_timeout_futures: &mut FuturesUnordered>, + validation_type: UtxoValidationType, + retry_strategy: UtxoValidationRetry, + utxo_validation_handles: &mut FuturesUnordered>>, ) -> Result { - let invalid_output_hashes: Vec<_> = self - .db - .get_invalid_outputs() - .await? - .into_iter() - .map(|uo| uo.hash) - .collect(); - - let key = if !invalid_output_hashes.is_empty() { - self.query_outputs_status( - utxo_query_timeout_futures, - invalid_output_hashes, - UtxoQueryType::InvalidOutputs, - ) - .await? - } else { - 0 - }; - - Ok(key) - } - - /// Send queries to the base node to check the status of all specified outputs. - async fn query_outputs_status( - &mut self, - utxo_query_timeout_futures: &mut FuturesUnordered>, - mut outputs_to_query: Vec>, - query_type: UtxoQueryType, - ) -> Result - { - match self.base_node_public_key.as_ref() { + match self.resources.base_node_public_key.as_ref() { None => Err(OutputManagerError::NoBaseNodeKeysProvided), Some(pk) => { - let mut first_request_key = 0; - - // Determine how many rounds of base node request we need to query all the outputs in batches of - // max_utxo_query_size - let rounds = - ((outputs_to_query.len() as f32) / (self.config.max_utxo_query_size as f32 + 0.1)) as usize + 1; - - for r in 0..rounds { - let mut output_hashes = Vec::new(); - for uo_hash in - outputs_to_query.drain(..cmp::min(self.config.max_utxo_query_size, outputs_to_query.len())) - { - output_hashes.push(uo_hash); - } - let request_key = OsRng.next_u64(); - if first_request_key == 0 { - first_request_key = request_key; - } + let id = OsRng.next_u64(); + + let utxo_validation_protocol = UtxoValidationProtocol::new( + id, + validation_type, + retry_strategy, + self.resources.clone(), + pk.clone(), + self.resources.config.base_node_query_timeout, + self.base_node_response_publisher.subscribe(), + ); - let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs { - outputs: output_hashes.clone(), - }); + let join_handle = tokio::spawn(utxo_validation_protocol.execute()); + utxo_validation_handles.push(join_handle); - let service_request = BaseNodeProto::BaseNodeServiceRequest { - request_key, - request: Some(request), - }; + Ok(id) + }, + } + } - let send_message_response = self - .outbound_message_service - .send_direct( - pk.clone(), - OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request), - ) - .await?; - - // Here we are going to spawn a non-blocking task that will monitor and log the progress of the - // send process. - tokio::spawn(async move { - match send_message_response.resolve().await { - Err(err) => warn!( - target: LOG_TARGET, - "Failed to send Output Manager UTXO query ({}) to Base Node: {}", request_key, err - ), - Ok(send_states) => { - trace!( - target: LOG_TARGET, - "Output Manager UTXO query ({}) queued for sending with Message {}", - request_key, - send_states[0].tag, - ); - let message_tag = send_states[0].tag; - if send_states.wait_single().await { - trace!( - target: LOG_TARGET, - "Output Manager UTXO query ({}) successfully sent to Base Node with Message {}", - request_key, - message_tag, - ) - } else { - trace!( - target: LOG_TARGET, - "Failed to send Output Manager UTXO query ({}) to Base Node with Message {}", - request_key, - message_tag, - ); - } - }, - } + async fn complete_utxo_validation_protocol(&mut self, join_result: Result) { + match join_result { + Ok(id) => { + trace!( + target: LOG_TARGET, + "UTXO Validation Protocol (Id: {}) completed successfully", + id + ); + }, + Err(OutputManagerProtocolError { id, error }) => { + error!( + target: LOG_TARGET, + "Error completing UTXO Validation Protocol (Id: {}): {:?}", id, error + ); + let _ = self + .resources + .event_publisher + .send(OutputManagerEvent::UtxoValidationFailure(id)) + .map_err(|e| { + trace!( + target: LOG_TARGET, + "Error sending event, usually because there are no subscribers: {:?}", + e + ); + e }); - - match query_type { - UtxoQueryType::UnspentOutputs => { - self.pending_utxo_query_keys.insert(request_key, output_hashes); - }, - UtxoQueryType::InvalidOutputs => { - self.pending_revalidation_query_keys.insert(request_key, output_hashes); - }, - } - - let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key); - utxo_query_timeout_futures.push(state_timeout.delay().boxed()); - - debug!( - target: LOG_TARGET, - "Output Manager {} query ({}) sent to Base Node, part {} of {} requests", - query_type, - request_key, - r + 1, - rounds - ); - } - // We are just going to return the first request key for use by the front end. It is very unlikely that - // a mobile wallet will ever have this query split up - Ok(first_request_key) }, } } /// Add an unblinded output to the unspent outputs list pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> { - let output = DbUnblindedOutput::from_unblinded_output(output, &self.factories)?; - Ok(self.db.add_unspent_output(output).await?) + let output = DbUnblindedOutput::from_unblinded_output(output, &self.resources.factories)?; + Ok(self.resources.db.add_unspent_output(output).await?) } async fn get_balance(&self) -> Result { - let balance = self.db.get_balance().await?; + let balance = self.resources.db.get_balance().await?; trace!(target: LOG_TARGET, "Balance: {:?}", balance); Ok(balance) } @@ -665,13 +399,20 @@ where { let mut key = PrivateKey::default(); { - let mut km = acquire_lock!(self.key_manager); + let mut km = self.key_manager.lock().await; key = km.next_key()?.k; } - self.db.increment_key_index().await?; - self.db - .accept_incoming_pending_transaction(tx_id, amount, key.clone(), OutputFeatures::default(), &self.factories) + self.resources.db.increment_key_index().await?; + self.resources + .db + .accept_incoming_pending_transaction( + tx_id, + amount, + key.clone(), + OutputFeatures::default(), + &self.resources.factories, + ) .await?; self.confirm_encumberance(tx_id).await?; @@ -686,20 +427,21 @@ where received_output: &TransactionOutput, ) -> Result<(), OutputManagerError> { - let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id).await?; + let pending_transaction = self.resources.db.fetch_pending_transaction_outputs(tx_id).await?; // Assumption: We are only allowing a single output per receiver in the current transaction protocols. if pending_transaction.outputs_to_be_received.len() != 1 || pending_transaction.outputs_to_be_received[0] .unblinded_output - .as_transaction_input(&self.factories.commitment, OutputFeatures::default()) + .as_transaction_input(&self.resources.factories.commitment, OutputFeatures::default()) .commitment != received_output.commitment { return Err(OutputManagerError::IncompleteTransaction); } - self.db + self.resources + .db .confirm_pending_transaction_outputs(pending_transaction.tx_id) .await?; @@ -735,8 +477,10 @@ where for uo in outputs.iter() { builder.with_input( - uo.unblinded_output - .as_transaction_input(&self.factories.commitment, uo.unblinded_output.clone().features), + uo.unblinded_output.as_transaction_input( + &self.resources.factories.commitment, + uo.unblinded_output.clone().features, + ), uo.unblinded_output.clone(), ); } @@ -748,16 +492,16 @@ where if total > amount + fee_without_change { let mut key = PrivateKey::default(); { - let mut km = acquire_lock!(self.key_manager); + let mut km = self.key_manager.lock().await; key = km.next_key()?.k; } - self.db.increment_key_index().await?; + self.resources.db.increment_key_index().await?; change_key = Some(key.clone()); builder.with_change_secret(key); } let stp = builder - .build::(&self.factories) + .build::(&self.resources.factories) .map_err(|e| OutputManagerError::BuildError(e.message))?; // If a change output was created add it to the pending_outputs list. @@ -769,13 +513,14 @@ where spending_key: key, features: OutputFeatures::default(), }, - &self.factories, + &self.resources.factories, )?); } // The Transaction Protocol built successfully so we will pull the unspent outputs out of the unspent list and // store them until the transaction times out OR is confirmed - self.db + self.resources + .db .encumber_outputs(stp.get_tx_id()?, outputs, change_output) .await?; @@ -785,7 +530,7 @@ where /// Confirm that a transaction has finished being negotiated between parties so the short-term encumberance can be /// made official async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> { - self.db.confirm_encumbered_outputs(tx_id).await?; + self.resources.db.confirm_encumbered_outputs(tx_id).await?; Ok(()) } @@ -800,7 +545,7 @@ where outputs: &[TransactionOutput], ) -> Result<(), OutputManagerError> { - let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id).await?; + let pending_transaction = self.resources.db.fetch_pending_transaction_outputs(tx_id).await?; // Check that outputs to be spent can all be found in the provided transaction inputs let mut inputs_confirmed = true; @@ -808,7 +553,7 @@ where let input_to_check = output_to_spend .unblinded_output .clone() - .as_transaction_input(&self.factories.commitment, OutputFeatures::default()); + .as_transaction_input(&self.resources.factories.commitment, OutputFeatures::default()); inputs_confirmed = inputs_confirmed && inputs.iter().any(|input| input.commitment == input_to_check.commitment); } @@ -819,7 +564,7 @@ where let output_to_check = output_to_receive .unblinded_output .clone() - .as_transaction_input(&self.factories.commitment, OutputFeatures::default()); + .as_transaction_input(&self.resources.factories.commitment, OutputFeatures::default()); outputs_confirmed = outputs_confirmed && outputs .iter() @@ -830,7 +575,8 @@ where return Err(OutputManagerError::IncompleteTransaction); } - self.db + self.resources + .db .confirm_pending_transaction_outputs(pending_transaction.tx_id) .await?; @@ -844,12 +590,12 @@ where "Cancelling pending transaction outputs for TxId: {}", tx_id ); - Ok(self.db.cancel_pending_transaction_outputs(tx_id).await?) + Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?) } /// Go through the pending transaction and if any have existed longer than the specified duration, cancel them async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> { - Ok(self.db.timeout_pending_transaction_outputs(period).await?) + Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?) } /// Select which unspent transaction outputs to use to send a transaction of the specified amount. Use the specified @@ -867,7 +613,7 @@ where let mut fee_without_change = MicroTari::from(0); let mut fee_with_change = MicroTari::from(0); - let uo = self.db.fetch_sorted_unspent_outputs().await?; + let uo = self.resources.db.fetch_sorted_unspent_outputs().await?; // Heuristic for selecting strategy: Default to MaturityThenSmallest, but if amount > // alpha * largest UTXO, use Largest @@ -935,16 +681,21 @@ where async fn set_base_node_public_key( &mut self, base_node_public_key: CommsPublicKey, - utxo_query_timeout_futures: &mut FuturesUnordered>, + utxo_validation_handles: &mut FuturesUnordered>>, ) -> Result<(), OutputManagerError> { - let startup_query = self.base_node_public_key.is_none(); + let startup_query = self.resources.base_node_public_key.is_none(); - self.base_node_public_key = Some(base_node_public_key); + self.resources.base_node_public_key = Some(base_node_public_key); if startup_query { - self.query_unspent_outputs_status(utxo_query_timeout_futures).await?; - self.query_invalid_outputs_status(utxo_query_timeout_futures).await?; + // This validation is not critical so if the Base node is not reachable we will wait until the next restart + // after 5 attempts + self.validate_outputs( + UtxoValidationType::Invalid, + UtxoValidationRetry::Limited(5), + utxo_validation_handles, + )?; } Ok(()) } @@ -952,19 +703,19 @@ where pub async fn fetch_pending_transaction_outputs( &self, ) -> Result, OutputManagerError> { - Ok(self.db.fetch_all_pending_transaction_outputs().await?) + Ok(self.resources.db.fetch_all_pending_transaction_outputs().await?) } pub async fn fetch_spent_outputs(&self) -> Result, OutputManagerError> { - Ok(self.db.fetch_spent_outputs().await?) + Ok(self.resources.db.fetch_spent_outputs().await?) } pub async fn fetch_unspent_outputs(&self) -> Result, OutputManagerError> { - Ok(self.db.fetch_sorted_unspent_outputs().await?) + Ok(self.resources.db.fetch_sorted_unspent_outputs().await?) } pub async fn fetch_invalid_outputs(&self) -> Result, OutputManagerError> { - Ok(self.db.get_invalid_outputs().await?) + Ok(self.resources.db.get_invalid_outputs().await?) } async fn create_coin_split( @@ -1010,8 +761,10 @@ where trace!(target: LOG_TARGET, "Add inputs to coin split transaction."); for uo in inputs.iter() { builder.with_input( - uo.unblinded_output - .as_transaction_input(&self.factories.commitment, uo.unblinded_output.clone().features), + uo.unblinded_output.as_transaction_input( + &self.resources.factories.commitment, + uo.unblinded_output.clone().features, + ), uo.unblinded_output.clone(), ); } @@ -1031,13 +784,13 @@ where let mut spend_key = PrivateKey::default(); { - let mut km = acquire_lock!(self.key_manager); + let mut km = self.key_manager.lock().await; spend_key = km.next_key()?.k; } - self.db.increment_key_index().await?; + self.resources.db.increment_key_index().await?; let utxo = DbUnblindedOutput::from_unblinded_output( UnblindedOutput::new(output_amount, spend_key, None), - &self.factories, + &self.resources.factories, )?; outputs.push(utxo.clone()); builder.with_output(utxo.unblinded_output); @@ -1045,7 +798,7 @@ where trace!(target: LOG_TARGET, "Build coin split transaction."); let factories = CryptoFactories::default(); let mut stp = builder - .build::(&self.factories) + .build::(&self.resources.factories) .map_err(|e| OutputManagerError::BuildError(e.message))?; // The Transaction Protocol built successfully so we will pull the unspent outputs out of the unspent list and // store them until the transaction times out OR is confirmed @@ -1055,7 +808,7 @@ where "Encumber coin split transaction ({}) outputs.", tx_id ); - self.db.encumber_outputs(tx_id, inputs, outputs).await?; + self.resources.db.encumber_outputs(tx_id, inputs, outputs).await?; self.confirm_encumberance(tx_id).await?; trace!(target: LOG_TARGET, "Finalize coin split transaction ({}).", tx_id); stp.finalize(KernelFeatures::empty(), &factories)?; @@ -1064,9 +817,9 @@ where } /// Return the Seed words for the current Master Key set in the Key Manager - pub fn get_seed_words(&self) -> Result, OutputManagerError> { + pub async fn get_seed_words(&self) -> Result, OutputManagerError> { Ok(from_secret_key( - &acquire_lock!(self.key_manager).master_key, + &self.key_manager.lock().await.master_key, &MnemonicLanguage::English, )?) } @@ -1104,16 +857,16 @@ impl fmt::Display for Balance { } } -enum UtxoQueryType { - UnspentOutputs, - InvalidOutputs, -} - -impl fmt::Display for UtxoQueryType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - UtxoQueryType::UnspentOutputs => write!(f, "Unspent Outputs"), - UtxoQueryType::InvalidOutputs => write!(f, "Invalid Outputs"), - } - } +/// This struct is a collection of the common resources that a async task in the service requires. +#[derive(Clone)] +pub struct OutputManagerResources +where TBackend: OutputManagerBackend + Clone + 'static +{ + pub config: OutputManagerServiceConfig, + pub db: OutputManagerDatabase, + pub outbound_message_service: OutboundMessageRequester, + pub transaction_service: TransactionServiceHandle, + pub factories: CryptoFactories, + pub base_node_public_key: Option, + pub event_publisher: OutputManagerEventSender, } diff --git a/base_layer/wallet/src/output_manager_service/storage/database.rs b/base_layer/wallet/src/output_manager_service/storage/database.rs index eaea0f0b45..e44550858a 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database.rs @@ -155,14 +155,15 @@ macro_rules! fetch { /// This structure holds an inner type that implements the `OutputManagerBackend` trait and contains the more complex /// data access logic required by the module built onto the functionality defined by the trait +#[derive(Clone)] pub struct OutputManagerDatabase -where T: OutputManagerBackend + 'static +where T: OutputManagerBackend + Clone + 'static { db: Arc, } impl OutputManagerDatabase -where T: OutputManagerBackend + 'static +where T: OutputManagerBackend + Clone + 'static { pub fn new(db: T) -> Self { Self { db: Arc::new(db) } @@ -177,7 +178,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::KeyManagerState, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -187,7 +188,7 @@ where T: OutputManagerBackend + 'static db_clone.write(WriteOperation::Insert(DbKeyValuePair::KeyManagerState(state))) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -196,7 +197,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.increment_key_index()) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -209,7 +210,7 @@ where T: OutputManagerBackend + 'static ))) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -226,7 +227,7 @@ where T: OutputManagerBackend + 'static }) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; let unspent_outputs = tokio::task::spawn_blocking(move || { db_clone2.fetch(&DbKey::UnspentOutputs)?.ok_or_else(|| { @@ -234,7 +235,7 @@ where T: OutputManagerBackend + 'static }) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; if let DbValue::UnspentOutputs(uo) = unspent_outputs { if let DbValue::AllPendingTransactionOutputs(pto) = pending_txs { let available_balance = uo @@ -280,7 +281,7 @@ where T: OutputManagerBackend + 'static ))) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -293,7 +294,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || fetch!(db_clone, tx_id, PendingTransactionOutputs)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -304,7 +305,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.confirm_transaction(tx_id)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -340,7 +341,7 @@ where T: OutputManagerBackend + 'static ))) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -358,7 +359,7 @@ where T: OutputManagerBackend + 'static db_clone.short_term_encumber_outputs(tx_id, &outputs_to_send, &outputs_to_receive) }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -368,7 +369,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.confirm_encumbered_outputs(tx_id)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -378,7 +379,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.clear_short_term_encumberances()) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -388,7 +389,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.cancel_pending_transaction(tx_id)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -398,7 +399,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.timeout_pending_transactions(period)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -415,7 +416,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::UnspentOutputs, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; uo.sort(); Ok(uo) @@ -434,7 +435,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::SpentOutputs, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(uo) } @@ -455,7 +456,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::AllPendingTransactionOutputs, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(uo) } @@ -472,7 +473,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::UnspentOutputs, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(uo) } @@ -489,7 +490,7 @@ where T: OutputManagerBackend + 'static Err(e) => log_error(DbKey::InvalidOutputs, e), }) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(uo) } @@ -501,7 +502,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.invalidate_unspent_output(&output)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -509,7 +510,7 @@ where T: OutputManagerBackend + 'static let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.revalidate_unspent_output(&spending_key)) .await - .or_else(|err| Err(OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } } diff --git a/base_layer/wallet/src/storage/database.rs b/base_layer/wallet/src/storage/database.rs index d388887110..a43888094d 100644 --- a/base_layer/wallet/src/storage/database.rs +++ b/base_layer/wallet/src/storage/database.rs @@ -96,7 +96,7 @@ where T: WalletBackend + 'static tokio::task::spawn_blocking(move || fetch!(db_clone, pub_key.clone(), Peer)) .await - .or_else(|err| Err(WalletStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } diff --git a/base_layer/wallet/src/testnet_utils.rs b/base_layer/wallet/src/testnet_utils.rs index ab38b670ca..92f9e0b119 100644 --- a/base_layer/wallet/src/testnet_utils.rs +++ b/base_layer/wallet/src/testnet_utils.rs @@ -168,7 +168,7 @@ pub fn get_next_memory_address() -> Multiaddr { pub fn generate_wallet_test_data< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, P: AsRef, >( @@ -691,7 +691,7 @@ pub fn generate_wallet_test_data< pub fn complete_sent_transaction< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, >( wallet: &mut Wallet, @@ -735,7 +735,7 @@ pub fn complete_sent_transaction< pub fn receive_test_transaction< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, >( wallet: &mut Wallet, @@ -765,7 +765,7 @@ pub fn receive_test_transaction< pub fn finalize_received_transaction< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, >( wallet: &mut Wallet, @@ -786,7 +786,7 @@ pub fn finalize_received_transaction< pub fn broadcast_transaction< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, >( wallet: &mut Wallet, @@ -807,7 +807,7 @@ pub fn broadcast_transaction< pub fn mine_transaction< T: WalletBackend, U: TransactionBackend + Clone, - V: OutputManagerBackend, + V: OutputManagerBackend + Clone, W: ContactsBackend, >( wallet: &mut Wallet, diff --git a/base_layer/wallet/src/transaction_service/handle.rs b/base_layer/wallet/src/transaction_service/handle.rs index 98aff042fd..b700be8ad4 100644 --- a/base_layer/wallet/src/transaction_service/handle.rs +++ b/base_layer/wallet/src/transaction_service/handle.rs @@ -108,7 +108,7 @@ pub enum TransactionServiceResponse { PendingInboundTransactions(HashMap), PendingOutboundTransactions(HashMap), CompletedTransactions(HashMap), - CompletedTransaction(CompletedTransaction), + CompletedTransaction(Box), BaseNodePublicKeySet, UtxoImported(TxId), TransactionSubmitted, @@ -290,7 +290,7 @@ impl TransactionServiceHandle { .call(TransactionServiceRequest::GetCompletedTransaction(tx_id)) .await?? { - TransactionServiceResponse::CompletedTransaction(t) => Ok(t), + TransactionServiceResponse::CompletedTransaction(t) => Ok(*t), _ => Err(TransactionServiceError::UnexpectedApiResponse), } } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index 2f1793068f..00aa30f75a 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -257,12 +257,7 @@ where TBackend: TransactionBackend + Clone + 'static error!(target: LOG_TARGET, "Invalid Mempool response variant"); }, MempoolResponse::TxStorage(ts) => { - let completed_tx = match self - .resources - .db - .get_completed_transaction(response.request_key.clone()) - .await - { + let completed_tx = match self.resources.db.get_completed_transaction(response.request_key).await { Ok(tx) => tx, Err(e) => { error!( diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index e6b72cbb70..b657b2c563 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -454,9 +454,11 @@ where self.db.get_cancelled_completed_transactions().await?, )) }, - TransactionServiceRequest::GetCompletedTransaction(tx_id) => Ok( - TransactionServiceResponse::CompletedTransaction(self.db.get_completed_transaction(tx_id).await?), - ), + TransactionServiceRequest::GetCompletedTransaction(tx_id) => { + Ok(TransactionServiceResponse::CompletedTransaction(Box::new( + self.db.get_completed_transaction(tx_id).await?, + ))) + }, TransactionServiceRequest::SetBaseNodePublicKey(public_key) => self .set_base_node_public_key( public_key, @@ -601,12 +603,12 @@ where let _ = self .broadcast_completed_transaction_to_mempool(id, transaction_broadcast_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { error!( target: LOG_TARGET, "Error starting Broadcast Protocol after completed Send Transaction Protocol : {:?}", resp ); - Err(resp) + resp }); trace!( target: LOG_TARGET, @@ -907,45 +909,45 @@ where let _ = self .broadcast_all_completed_transactions_to_mempool(broadcast_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { error!( target: LOG_TARGET, "Error broadcasting all completed transactions: {:?}", resp ); - Err(resp) + resp }); let _ = self .start_chain_monitoring_for_all_broadcast_transactions(chain_monitoring_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { error!( target: LOG_TARGET, "Error querying base_node for all completed transactions: {:?}", resp ); - Err(resp) + resp }); let _ = self .restart_all_send_transaction_protocols(send_transaction_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { error!( target: LOG_TARGET, "Error restarting protocols for all pending outbound transactions: {:?}", resp ); - Err(resp) + resp }); let _ = self .restart_all_receive_transaction_protocols(receive_transaction_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { error!( target: LOG_TARGET, "Error restarting protocols for all pending inbound transactions: {:?}", resp ); - Err(resp) + resp }); } Ok(()) @@ -1066,7 +1068,7 @@ where let _ = self .start_transaction_chain_monitoring_protocol(id, transaction_chain_monitoring_join_handles) .await - .or_else(|resp| { + .map_err(|resp| { match resp { TransactionServiceError::InvalidCompletedTransaction => trace!( target: LOG_TARGET, @@ -1079,7 +1081,7 @@ where resp ), } - Err(resp) + resp }); }, Err(TransactionServiceProtocolError { id, error }) => { @@ -1421,13 +1423,12 @@ where transaction_service::{handle::TransactionServiceHandle, storage::database::InboundTransaction}, }; use futures::stream; - use tari_broadcast_channel::bounded; use tari_core::transactions::{transaction::OutputFeatures, ReceiverTransactionProtocol}; use tari_crypto::keys::SecretKey; let (_sender, receiver) = reply_channel::unbounded(); let (tx, _rx) = mpsc::channel(20); - let (oms_event_publisher, _oms_event_subscriber) = bounded(100, 118); + let (oms_event_publisher, _oms_event_subscriber) = broadcast::channel(100); let (ts_request_sender, _ts_request_receiver) = reply_channel::unbounded(); let (event_publisher, _) = broadcast::channel(100); let ts_handle = TransactionServiceHandle::new(ts_request_sender, event_publisher.clone()); diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index 4e5ada2945..4461714dbc 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -32,7 +32,7 @@ use std::{ }; use tari_comms::types::CommsPublicKey; use tari_core::transactions::{ - tari_amount::{uT, MicroTari}, + tari_amount::MicroTari, transaction::Transaction, types::{BlindingFactor, PrivateKey}, ReceiverTransactionProtocol, @@ -393,7 +393,7 @@ where T: TransactionBackend + 'static ))) }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -412,7 +412,7 @@ where T: TransactionBackend + 'static ))) }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -422,7 +422,7 @@ where T: TransactionBackend + 'static db_clone.write(WriteOperation::Remove(DbKey::PendingOutboundTransaction(tx_id))) }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } @@ -432,7 +432,7 @@ where T: TransactionBackend + 'static let tx_id_clone = tx_id; tokio::task::spawn_blocking(move || db_clone.transaction_exists(tx_id_clone)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -451,7 +451,7 @@ where T: TransactionBackend + 'static ))) }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -490,7 +490,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(*t) } @@ -529,7 +529,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(*t) } @@ -568,7 +568,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(*t) } @@ -609,7 +609,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(t) } @@ -650,7 +650,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(t) } @@ -663,7 +663,7 @@ where T: TransactionBackend + 'static let pub_key = tokio::task::spawn_blocking(move || db_clone.get_pending_transaction_counterparty_pub_key_by_tx_id(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(pub_key) } @@ -702,7 +702,7 @@ where T: TransactionBackend + 'static Err(e) => log_error(key, e), }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(t) } @@ -717,7 +717,7 @@ where T: TransactionBackend + 'static tokio::task::spawn_blocking(move || db_clone.complete_outbound_transaction(tx_id, transaction)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } @@ -732,57 +732,56 @@ where T: TransactionBackend + 'static tokio::task::spawn_blocking(move || db_clone.complete_inbound_transaction(tx_id, transaction)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } - pub async fn cancel_completed_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionStorageError> { + pub async fn cancel_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.cancel_completed_transaction(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } - pub async fn cancel_pending_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionStorageError> { + pub async fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.cancel_pending_transaction(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } - pub async fn mark_direct_send_success(&mut self, tx_id: TxId) -> Result<(), TransactionStorageError> { + pub async fn mark_direct_send_success(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.mark_direct_send_success(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } /// Indicated that the specified completed transaction has been broadcast into the mempool - pub async fn broadcast_completed_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionStorageError> { + pub async fn broadcast_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.broadcast_completed_transaction(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } /// Indicated that the specified completed transaction has been detected as mined on the base layer - pub async fn mine_completed_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionStorageError> { + pub async fn mine_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); tokio::task::spawn_blocking(move || db_clone.mine_completed_transaction(tx_id)) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string()))) + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) .and_then(|inner_result| inner_result) } - #[allow(clippy::erasing_op)] // this is for 0 * uT pub async fn add_utxo_import_transaction( - &mut self, + &self, tx_id: TxId, amount: MicroTari, source_public_key: CommsPublicKey, @@ -795,7 +794,7 @@ where T: TransactionBackend + 'static source_public_key.clone(), comms_public_key.clone(), amount, - 0 * uT, + MicroTari::from(0), Transaction::new(Vec::new(), Vec::new(), Vec::new(), BlindingFactor::default()), TransactionStatus::Imported, message, @@ -810,7 +809,7 @@ where T: TransactionBackend + 'static ))) }) .await - .or_else(|err| Err(TransactionStorageError::BlockingTaskSpawnError(err.to_string())))??; + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } } diff --git a/base_layer/wallet/src/transaction_service/storage/memory_db.rs b/base_layer/wallet/src/transaction_service/storage/memory_db.rs index d9a4df908c..a285fb7d0e 100644 --- a/base_layer/wallet/src/transaction_service/storage/memory_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/memory_db.rs @@ -160,7 +160,7 @@ impl TransactionBackend for TransactionMemoryDatabase { let mut result = HashMap::new(); for (k, v) in db.completed_transactions.iter() { if v.cancelled { - result.insert(k.clone(), v.clone()); + result.insert(*k, v.clone()); } } Some(DbValue::CompletedTransactions(result)) diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 0cf3b56752..403db3d559 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -26,6 +26,7 @@ use crate::{ output_manager_service::{ config::OutputManagerServiceConfig, handle::OutputManagerHandle, + protocols::utxo_validation_protocol::UtxoValidationRetry, storage::database::OutputManagerBackend, OutputManagerServiceInitializer, TxId, @@ -82,7 +83,7 @@ pub struct Wallet where T: WalletBackend + 'static, U: TransactionBackend + Clone + 'static, - V: OutputManagerBackend + 'static, + V: OutputManagerBackend + Clone + 'static, W: ContactsBackend + 'static, { pub comms: CommsNode, @@ -105,7 +106,7 @@ impl Wallet where T: WalletBackend + 'static, U: TransactionBackend + Clone + 'static, - V: OutputManagerBackend + 'static, + V: OutputManagerBackend + Clone + 'static, W: ContactsBackend + 'static, { pub fn new( @@ -284,13 +285,13 @@ where /// Have all the wallet components that need to start a sync process with the set base node to confirm the wallets /// state is accurately reflected on the blockchain - pub fn sync_with_base_node(&mut self) -> Result { + pub fn validate_utxos(&mut self, retries: UtxoValidationRetry) -> Result { self.runtime .block_on(self.store_and_forward_requester.request_saf_messages_from_neighbours())?; let request_key = self .runtime - .block_on(self.output_manager_service.sync_with_base_node())?; + .block_on(self.output_manager_service.validate_utxos(retries))?; Ok(request_key) } diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service/service.rs index 00f84bbb95..0cbde2ab54 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service/service.rs @@ -33,7 +33,6 @@ use futures::{ use prost::Message; use rand::{rngs::OsRng, RngCore}; use std::{thread, time::Duration}; -use tari_broadcast_channel::bounded; use tari_comms::{ message::EnvelopeBody, peer_manager::{NodeIdentity, PeerFeatures}, @@ -65,12 +64,12 @@ use tari_crypto::{ use tari_p2p::domain_message::DomainMessage; use tari_service_framework::reply_channel; use tari_shutdown::Shutdown; -use tari_test_utils::collect_stream; use tari_wallet::{ output_manager_service::{ config::OutputManagerServiceConfig, error::{OutputManagerError, OutputManagerStorageError}, handle::{OutputManagerEvent, OutputManagerHandle}, + protocols::utxo_validation_protocol::UtxoValidationRetry, service::OutputManagerService, storage::{ database::{DbKey, DbKeyValuePair, DbValue, OutputManagerBackend, OutputManagerDatabase, WriteOperation}, @@ -83,9 +82,13 @@ use tari_wallet::{ transaction_service::handle::TransactionServiceHandle, }; use tempdir::TempDir; -use tokio::{runtime::Runtime, sync::broadcast::channel, time::delay_for}; +use tokio::{ + runtime::Runtime, + sync::{broadcast, broadcast::channel}, + time::delay_for, +}; -pub fn setup_output_manager_service( +pub fn setup_output_manager_service( runtime: &mut Runtime, backend: T, ) -> ( @@ -102,7 +105,7 @@ pub fn setup_output_manager_service( let (outbound_message_requester, mock_outbound_service) = create_outbound_service_mock(20); let (oms_request_sender, oms_request_receiver) = reply_channel::unbounded(); let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(20); - let (oms_event_publisher, oms_event_subscriber) = bounded(100, 115); + let (oms_event_publisher, _) = broadcast::channel(200); let (ts_request_sender, _ts_request_receiver) = reply_channel::unbounded(); let (event_publisher, _) = channel(100); @@ -119,11 +122,11 @@ pub fn setup_output_manager_service( oms_request_receiver, base_node_response_receiver, OutputManagerDatabase::new(backend), - oms_event_publisher, + oms_event_publisher.clone(), factories.clone(), )) .unwrap(); - let output_manager_service_handle = OutputManagerHandle::new(oms_request_sender, oms_event_subscriber); + let output_manager_service_handle = OutputManagerHandle::new(oms_request_sender, oms_event_publisher); runtime.spawn(async move { output_manager_service.start().await.unwrap() }); @@ -243,7 +246,7 @@ fn sending_transaction_and_confirmation_sqlite_db() { sending_transaction_and_confirmation(OutputManagerSqliteDatabase::new(connection)); } -fn send_not_enough_funds(backend: T) { +fn send_not_enough_funds(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -286,7 +289,7 @@ fn send_not_enough_funds_sqlite_db() { send_not_enough_funds(OutputManagerSqliteDatabase::new(connection)); } -fn send_no_change(backend: T) { +fn send_no_change(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -362,7 +365,7 @@ fn send_no_change_sqlite_db() { send_no_change(OutputManagerSqliteDatabase::new(connection)); } -fn send_not_enough_for_change(backend: T) { +fn send_not_enough_for_change(backend: T) { let mut runtime = Runtime::new().unwrap(); let (mut oms, _, _shutdown, _, _) = setup_output_manager_service(&mut runtime, backend); @@ -407,7 +410,7 @@ fn send_not_enough_for_change_sqlite_db() { send_not_enough_for_change(OutputManagerSqliteDatabase::new(connection)); } -fn receiving_and_confirmation(backend: T) { +fn receiving_and_confirmation(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -451,7 +454,7 @@ fn receiving_and_confirmation_sqlite_db() { receiving_and_confirmation(OutputManagerSqliteDatabase::new(connection)); } -fn cancel_transaction(backend: T) { +fn cancel_transaction(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -501,7 +504,7 @@ fn cancel_transaction_sqlite_db() { cancel_transaction(OutputManagerSqliteDatabase::new(connection)); } -fn timeout_transaction(backend: T) { +fn timeout_transaction(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -556,7 +559,7 @@ fn timeout_transaction_sqlite_db() { timeout_transaction(OutputManagerSqliteDatabase::new(connection)); } -fn test_get_balance(backend: T) { +fn test_get_balance(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -609,7 +612,7 @@ fn test_get_balance_sqlite_db() { test_get_balance(OutputManagerSqliteDatabase::new(connection)); } -fn test_confirming_received_output(backend: T) { +fn test_confirming_received_output(backend: T) { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); @@ -674,6 +677,8 @@ fn test_startup_utxo_scan() { let (mut oms, outbound_service, _shutdown, mut base_node_response_sender, _) = setup_output_manager_service(&mut runtime, backend); + let mut event_stream = oms.get_event_stream_fused(); + let mut hashes = Vec::new(); let key1 = PrivateKey::random(&mut OsRng); let value1 = 500; @@ -717,26 +722,29 @@ fn test_startup_utxo_scan() { runtime .block_on(oms.set_base_node_public_key(base_node_identity.public_key().clone())) .unwrap(); + runtime + .block_on(oms.validate_utxos(UtxoValidationRetry::UntilSuccess)) + .unwrap(); outbound_service .wait_call_count(3, Duration::from_secs(60)) .expect("call wait 1"); - let (_, _) = outbound_service.pop_call().unwrap(); // Burn the invalid request - let (_, body) = outbound_service.pop_call().unwrap(); let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let bn_request1: BaseNodeProto::BaseNodeServiceRequest = envelope_body .decode_part::(1) .unwrap() .unwrap(); - + let mut hashes_found = 0; match bn_request1.request { None => assert!(false, "Invalid request"), Some(request) => match request { Request::FetchUtxos(hash_outputs) => { for h in hash_outputs.outputs { - assert!(hashes.iter().find(|i| **i == h).is_some(), "Should contain hash"); + if hashes.iter().find(|i| **i == h).is_some() { + hashes_found += 1; + } } }, _ => assert!(false, "invalid request"), @@ -755,36 +763,69 @@ fn test_startup_utxo_scan() { Some(request) => match request { Request::FetchUtxos(hash_outputs) => { for h in hash_outputs.outputs { - assert!(hashes.iter().find(|i| **i == h).is_some(), "Should contain hash2"); + if hashes.iter().find(|i| **i == h).is_some() { + hashes_found += 1; + } } }, _ => assert!(false, "invalid request"), }, } - let result_stream = runtime.block_on(async { - collect_stream!( - oms.get_event_stream_fused().map(|i| (*i).clone()), - take = 3, - timeout = Duration::from_secs(60) - ) - }); + let (_, body) = outbound_service.pop_call().unwrap(); + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let bn_request3: BaseNodeProto::BaseNodeServiceRequest = envelope_body + .decode_part::(1) + .unwrap() + .unwrap(); - assert_eq!( - 3, - result_stream.iter().fold(0, |acc, item| { - if let OutputManagerEvent::BaseNodeSyncRequestTimedOut(_) = item { - acc + 1 - } else { - acc + match bn_request3.request { + None => assert!(false, "Invalid request"), + Some(request) => match request { + Request::FetchUtxos(hash_outputs) => { + for h in hash_outputs.outputs { + if hashes.iter().find(|i| **i == h).is_some() { + hashes_found += 1; + } + } + }, + _ => assert!(false, "invalid request"), + }, + } + + assert_eq!(hashes_found, 4, "Should have found our Unspent UTXO hashes"); + + runtime.block_on(async { + let mut delay = delay_for(Duration::from_secs(60)).fuse(); + let mut timeouts = 0; + loop { + futures::select! { + event = event_stream.select_next_some() => { + match event.unwrap() { + OutputManagerEvent::UtxoValidationTimedOut(_) => { + timeouts+=1; + }, + _ => (), + } + if timeouts >= 2 { + break; + } + }, + () = delay => { + break; + }, } - }) - ); + } + assert_eq!(timeouts, 2); + }); // Test the response to the revalidation call first so as not to confuse the invalidation that happens during the // responses to the Unspent UTXO queries let mut invalid_request_key = 0; let mut unspent_request_key_with_output1 = 0; + let mut unspent_request_key2 = 0; + + outbound_service.wait_call_count(3, Duration::from_secs(60)).unwrap(); for _ in 0..3 { let (_, body) = outbound_service.pop_call().unwrap(); @@ -803,9 +844,10 @@ fn test_startup_utxo_scan() { if request_hashes.iter().find(|i| **i == invalid_hash).is_some() { invalid_request_key = bn_request.request_key; - } - if request_hashes.iter().find(|i| **i == output1_hash).is_some() { + } else if request_hashes.iter().find(|i| **i == output1_hash).is_some() { unspent_request_key_with_output1 = bn_request.request_key; + } else { + unspent_request_key2 = bn_request.request_key; } } assert_ne!(invalid_request_key, 0, "Should have found invalid request key"); @@ -813,6 +855,10 @@ fn test_startup_utxo_scan() { unspent_request_key_with_output1, 0, "Should have found request key for request with output 1 in it" ); + assert_ne!( + unspent_request_key2, 0, + "Should have found request key for second unspent outputs request" + ); let invalid_txs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); assert_eq!(invalid_txs.len(), 1); @@ -832,15 +878,13 @@ fn test_startup_utxo_scan() { ))) .unwrap(); - let mut event_stream = oms.get_event_stream_fused(); - runtime.block_on(async { let mut delay = delay_for(Duration::from_secs(60)).fuse(); let mut acc = 0; loop { futures::select! { event = event_stream.select_next_some() => { - if let OutputManagerEvent::ReceiveBaseNodeResponse(_) = (*event).clone() { + if let OutputManagerEvent::UtxoValidationSuccess(_) = event.unwrap() { acc += 1; if acc >= 1 { break; @@ -858,10 +902,10 @@ fn test_startup_utxo_scan() { let invalid_txs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); assert_eq!(invalid_txs.len(), 0); - let key4 = PrivateKey::random(&mut OsRng); - let value4 = 1000; - let output4 = UnblindedOutput::new(MicroTari::from(value4), key4, None); - runtime.block_on(oms.add_output(output4.clone())).unwrap(); + let key5 = PrivateKey::random(&mut OsRng); + let value5 = 1000; + let output5 = UnblindedOutput::new(MicroTari::from(value5), key5, None); + runtime.block_on(oms.add_output(output5.clone())).unwrap(); let invalid_txs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); assert_eq!(invalid_txs.len(), 0); @@ -894,6 +938,20 @@ fn test_startup_utxo_scan() { )), }; + runtime + .block_on(base_node_response_sender.send(create_dummy_message( + base_node_response, + base_node_identity.public_key(), + ))) + .unwrap(); + + let base_node_response = BaseNodeProto::BaseNodeServiceResponse { + request_key: unspent_request_key2, + response: Some(BaseNodeResponseProto::TransactionOutputs( + BaseNodeProto::TransactionOutputs { outputs: vec![].into() }, + )), + }; + runtime .block_on(base_node_response_sender.send(create_dummy_message( base_node_response, @@ -907,7 +965,7 @@ fn test_startup_utxo_scan() { loop { futures::select! { event = event_stream.select_next_some() => { - if let OutputManagerEvent::ReceiveBaseNodeResponse(_) = (*event).clone() { + if let OutputManagerEvent::UtxoValidationSuccess(_) = event.unwrap() { acc += 1; if acc >= 1 { break; @@ -919,27 +977,24 @@ fn test_startup_utxo_scan() { }, } } - assert!(acc >= 1, "Did not receive enough responses"); + assert!(acc >= 1, "Did not receive enough responses2"); }); let invalid_outputs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); - assert_eq!(invalid_outputs.len(), 1); - let check2 = invalid_outputs[0] == output2; - let check3 = invalid_outputs[0] == output3; - let check4 = invalid_outputs[0] == output4; - - assert!(check2 || check3 || check4, "One of these outputs should be invalid"); + assert_eq!(invalid_outputs.len(), 3); let unspent_outputs = runtime.block_on(oms.get_unspent_outputs()).unwrap(); - assert_eq!(unspent_outputs.len(), 5); + assert_eq!(unspent_outputs.len(), 3); assert!(unspent_outputs.iter().find(|uo| uo == &&output1).is_some()); - if check2 { - assert!(unspent_outputs.iter().find(|uo| uo == &&output3).is_some()) - } else { - assert!(unspent_outputs.iter().find(|uo| uo == &&output2).is_some()) - } + assert!(unspent_outputs.iter().find(|uo| uo == &&output2).is_none()); + assert!(unspent_outputs.iter().find(|uo| uo == &&output3).is_none()); + assert!(unspent_outputs.iter().find(|uo| uo == &&output4).is_none()); - runtime.block_on(oms.sync_with_base_node()).unwrap(); + runtime + .block_on(oms.validate_utxos(UtxoValidationRetry::Limited(1))) + .unwrap(); + + outbound_service.wait_call_count(2, Duration::from_secs(60)).unwrap(); let (_, body) = outbound_service.pop_call().unwrap(); let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); @@ -955,15 +1010,8 @@ fn test_startup_utxo_scan() { .unwrap() .unwrap(); - let (_, body) = outbound_service.pop_call().unwrap(); - let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); - let bn_request3: BaseNodeProto::BaseNodeServiceRequest = envelope_body - .decode_part::(1) - .unwrap() - .unwrap(); - let invalid_txs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); - assert_eq!(invalid_txs.len(), 1); + assert_eq!(invalid_txs.len(), 3); let base_node_response = BaseNodeProto::BaseNodeServiceResponse { request_key: bn_request.request_key.clone(), @@ -992,30 +1040,15 @@ fn test_startup_utxo_scan() { ))) .unwrap(); - let base_node_response3 = BaseNodeProto::BaseNodeServiceResponse { - request_key: bn_request3.request_key.clone(), - response: Some(BaseNodeResponseProto::TransactionOutputs( - BaseNodeProto::TransactionOutputs { outputs: vec![].into() }, - )), - }; - runtime - .block_on(base_node_response_sender.send(create_dummy_message( - base_node_response3, - base_node_identity.public_key(), - ))) - .unwrap(); - - let mut event_stream = oms.get_event_stream_fused(); - runtime.block_on(async { let mut delay = delay_for(Duration::from_secs(30)).fuse(); let mut acc = 0; loop { futures::select! { event = event_stream.select_next_some() => { - if let OutputManagerEvent::ReceiveBaseNodeResponse(r) = (*event).clone() { + if let OutputManagerEvent::UtxoValidationSuccess(_r) = event.unwrap() { acc += 1; - if acc >= 4 { + if acc >= 1 { break; } } @@ -1025,7 +1058,7 @@ fn test_startup_utxo_scan() { }, } } - assert!(acc >= 3, "Did not receive enough responses"); + assert_eq!(acc, 1, "Did not receive enough responses3"); }); let invalid_txs = runtime.block_on(oms.get_invalid_outputs()).unwrap(); diff --git a/base_layer/wallet/tests/output_manager_service/storage.rs b/base_layer/wallet/tests/output_manager_service/storage.rs index b3daf35d50..70674620c7 100644 --- a/base_layer/wallet/tests/output_manager_service/storage.rs +++ b/base_layer/wallet/tests/output_manager_service/storage.rs @@ -45,7 +45,7 @@ use tari_wallet::{ use tempdir::TempDir; use tokio::runtime::Runtime; -pub fn test_db_backend(backend: T) { +pub fn test_db_backend(backend: T) { let mut runtime = Runtime::new().unwrap(); let db = OutputManagerDatabase::new(backend); let factories = CryptoFactories::default(); @@ -349,7 +349,7 @@ pub fn test_output_manager_sqlite_db() { test_db_backend(OutputManagerSqliteDatabase::new(connection)); } -pub fn test_key_manager_crud(backend: T) { +pub fn test_key_manager_crud(backend: T) { let mut runtime = Runtime::new().unwrap(); let db = OutputManagerDatabase::new(backend); @@ -400,7 +400,7 @@ pub fn test_key_manager_crud_sqlite_db() { test_key_manager_crud(OutputManagerSqliteDatabase::new(connection)); } -pub async fn test_short_term_encumberance(backend: T) { +pub async fn test_short_term_encumberance(backend: T) { let factories = CryptoFactories::default(); let db = OutputManagerDatabase::new(backend); diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 5b140e7982..54e8a0b728 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -40,7 +40,6 @@ use std::{ sync::Arc, time::Duration, }; -use tari_broadcast_channel::bounded; use tari_comms::{ message::EnvelopeBody, peer_manager::{NodeIdentity, PeerFeatures}, @@ -120,7 +119,7 @@ use tempdir::TempDir; use tokio::{ runtime, runtime::{Builder, Runtime}, - sync::broadcast::channel, + sync::{broadcast, broadcast::channel}, time::delay_for, }; @@ -202,7 +201,7 @@ pub fn setup_transaction_service_no_comms, transaction_service_event_stream: Fuse, - output_manager_service_event_stream: Fuse>, + output_manager_service_event_stream: Fuse, shutdown_signal: Option, comms_public_key: CommsPublicKey, } @@ -89,7 +91,7 @@ where TBackend: TransactionBackend + 'static pub fn new( db: TransactionDatabase, transaction_service_event_stream: Fuse, - output_manager_service_event_stream: Fuse>, + output_manager_service_event_stream: Fuse, shutdown_signal: ShutdownSignal, comms_public_key: CommsPublicKey, callback_received_transaction: unsafe extern "C" fn(*mut InboundTransaction), @@ -204,17 +206,25 @@ where TBackend: TransactionBackend + 'static Err(e) => error!(target: LOG_TARGET, "Error reading from Transaction Service event broadcast channel"), } }, - msg = self.output_manager_service_event_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Output Manager Service Callback Handler event {:?}", msg); - match (*msg).clone() { - OutputManagerEvent::ReceiveBaseNodeResponse(request_key) => { - self.receive_sync_process_result(request_key, true); + result = self.output_manager_service_event_stream.select_next_some() => { + match result { + Ok(msg) => { + trace!(target: LOG_TARGET, "Output Manager Service Callback Handler event {:?}", msg); + match msg { + OutputManagerEvent::UtxoValidationSuccess(request_key) => { + self.receive_sync_process_result(request_key, true); + }, + OutputManagerEvent::UtxoValidationTimedOut(request_key) => { + self.receive_sync_process_result(request_key, false); + } + OutputManagerEvent::UtxoValidationFailure(request_key) => { + self.receive_sync_process_result(request_key, false); + } + /// Only the above variants are mapped to callbacks + _ => (), + } }, - OutputManagerEvent::BaseNodeSyncRequestTimedOut(request_key) => { - self.receive_sync_process_result(request_key, false); - } - /// Only the above variants are mapped to callbacks - _ => (), + Err(e) => error!(target: LOG_TARGET, "Error reading from Output Manager Service event broadcast channel"), } }, complete => { @@ -378,3 +388,301 @@ where TBackend: TransactionBackend + 'static } } } + +#[cfg(test)] +mod test { + use crate::callback_handler::CallbackHandler; + use chrono::Utc; + use futures::StreamExt; + use rand::rngs::OsRng; + use std::{ + sync::{Arc, Mutex}, + thread, + time::Duration, + }; + use tari_core::transactions::{ + tari_amount::{uT, MicroTari}, + transaction::Transaction, + types::{BlindingFactor, PrivateKey, PublicKey}, + ReceiverTransactionProtocol, + SenderTransactionProtocol, + }; + use tari_crypto::keys::{PublicKey as PublicKeyTrait, SecretKey}; + use tari_shutdown::Shutdown; + use tari_wallet::{ + output_manager_service::handle::OutputManagerEvent, + transaction_service::{ + handle::TransactionEvent, + storage::{ + database::{ + CompletedTransaction, + InboundTransaction, + OutboundTransaction, + TransactionDatabase, + TransactionStatus, + }, + memory_db::TransactionMemoryDatabase, + }, + }, + }; + use tokio::{runtime::Runtime, sync::broadcast}; + + struct CallbackState { + pub received_tx_callback_called: bool, + pub received_tx_reply_callback_called: bool, + pub received_finalized_tx_callback_called: bool, + pub broadcast_tx_callback_called: bool, + pub mined_tx_callback_called: bool, + pub direct_send_callback_called: bool, + pub store_and_forward_send_callback_called: bool, + pub tx_cancellation_callback_called_completed: bool, + pub tx_cancellation_callback_called_inbound: bool, + pub tx_cancellation_callback_called_outbound: bool, + pub base_node_sync_callback_called_true: bool, + pub base_node_sync_callback_called_false: bool, + } + + impl CallbackState { + fn new() -> Self { + Self { + received_tx_callback_called: false, + received_tx_reply_callback_called: false, + received_finalized_tx_callback_called: false, + broadcast_tx_callback_called: false, + mined_tx_callback_called: false, + direct_send_callback_called: false, + store_and_forward_send_callback_called: false, + base_node_sync_callback_called_true: false, + base_node_sync_callback_called_false: false, + tx_cancellation_callback_called_completed: false, + tx_cancellation_callback_called_inbound: false, + tx_cancellation_callback_called_outbound: false, + } + } + } + + lazy_static! { + static ref CALLBACK_STATE: Mutex = { + let c = Mutex::new(CallbackState::new()); + c + }; + } + + unsafe extern "C" fn received_tx_callback(tx: *mut InboundTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.received_tx_callback_called = true; + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn received_tx_reply_callback(tx: *mut CompletedTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.received_tx_reply_callback_called = true; + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn received_tx_finalized_callback(tx: *mut CompletedTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.received_finalized_tx_callback_called = true; + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn broadcast_callback(tx: *mut CompletedTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.broadcast_tx_callback_called = true; + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn mined_callback(tx: *mut CompletedTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.mined_tx_callback_called = true; + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn direct_send_callback(_tx_id: u64, _result: bool) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.direct_send_callback_called = true; + drop(lock); + } + + unsafe extern "C" fn store_and_forward_send_callback(_tx_id: u64, _result: bool) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.store_and_forward_send_callback_called = true; + drop(lock); + } + + unsafe extern "C" fn tx_cancellation_callback(tx: *mut CompletedTransaction) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + match (*tx).tx_id { + 3 => lock.tx_cancellation_callback_called_inbound = true, + 4 => lock.tx_cancellation_callback_called_completed = true, + 5 => lock.tx_cancellation_callback_called_outbound = true, + _ => (), + } + + drop(lock); + Box::from_raw(tx); + } + + unsafe extern "C" fn base_node_sync_process_complete_callback(_tx_id: u64, result: bool) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + + if result { + lock.base_node_sync_callback_called_true = true; + } else { + lock.base_node_sync_callback_called_false = true; + } + drop(lock); + } + + #[test] + fn test_callback_handler() { + let mut runtime = Runtime::new().unwrap(); + + let db = TransactionDatabase::new(TransactionMemoryDatabase::new()); + let rtp = ReceiverTransactionProtocol::new_placeholder(); + let inbound_tx = InboundTransaction::new( + 1u64, + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + 22 * uT, + rtp, + TransactionStatus::Pending, + "1".to_string(), + Utc::now().naive_utc(), + ); + let completed_tx = CompletedTransaction::new( + 2u64, + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + MicroTari::from(100), + MicroTari::from(2000), + Transaction::new(Vec::new(), Vec::new(), Vec::new(), BlindingFactor::default()), + TransactionStatus::Completed, + "2".to_string(), + Utc::now().naive_utc(), + ); + let stp = SenderTransactionProtocol::new_placeholder(); + let outbound_tx = OutboundTransaction::new( + 3u64, + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + 22 * uT, + 23 * uT, + stp, + TransactionStatus::Pending, + "3".to_string(), + Utc::now().naive_utc(), + false, + ); + let inbound_tx_cancelled = InboundTransaction { + tx_id: 4u64, + ..inbound_tx.clone() + }; + let completed_tx_cancelled = CompletedTransaction { + tx_id: 5u64, + ..completed_tx.clone() + }; + + runtime + .block_on(db.add_pending_inbound_transaction(1u64, inbound_tx)) + .unwrap(); + runtime + .block_on(db.insert_completed_transaction(2u64, completed_tx)) + .unwrap(); + runtime + .block_on(db.add_pending_inbound_transaction(4u64, inbound_tx_cancelled)) + .unwrap(); + runtime.block_on(db.cancel_pending_transaction(4u64)).unwrap(); + runtime + .block_on(db.insert_completed_transaction(5u64, completed_tx_cancelled)) + .unwrap(); + runtime.block_on(db.cancel_completed_transaction(5u64)).unwrap(); + runtime + .block_on(db.add_pending_outbound_transaction(3u64, outbound_tx)) + .unwrap(); + runtime.block_on(db.cancel_pending_transaction(3u64)).unwrap(); + + let (tx_sender, tx_receiver) = broadcast::channel(20); + let (oms_sender, oms_receiver) = broadcast::channel(20); + + let shutdown_signal = Shutdown::new(); + let callback_handler = CallbackHandler::new( + db, + tx_receiver.fuse(), + oms_receiver.fuse(), + shutdown_signal.to_signal(), + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + received_tx_callback, + received_tx_reply_callback, + received_tx_finalized_callback, + broadcast_callback, + mined_callback, + direct_send_callback, + store_and_forward_send_callback, + tx_cancellation_callback, + base_node_sync_process_complete_callback, + ); + + runtime.spawn(callback_handler.start()); + + tx_sender + .send(Arc::new(TransactionEvent::ReceivedTransaction(1u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::ReceivedTransactionReply(2u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::ReceivedFinalizedTransaction(2u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionBroadcast(2u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionMined(2u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionDirectSendResult(2u64, true))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionStoreForwardSendResult( + 2u64, true, + ))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionCancelled(3u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionCancelled(4u64))) + .unwrap(); + tx_sender + .send(Arc::new(TransactionEvent::TransactionCancelled(5u64))) + .unwrap(); + + oms_sender + .send(OutputManagerEvent::UtxoValidationSuccess(1u64)) + .unwrap(); + oms_sender + .send(OutputManagerEvent::UtxoValidationTimedOut(1u64)) + .unwrap(); + + thread::sleep(Duration::from_secs(10)); + + let lock = CALLBACK_STATE.lock().unwrap(); + assert!(lock.received_tx_callback_called); + assert!(lock.received_tx_reply_callback_called); + assert!(lock.received_finalized_tx_callback_called); + assert!(lock.broadcast_tx_callback_called); + assert!(lock.mined_tx_callback_called); + assert!(lock.direct_send_callback_called); + assert!(lock.store_and_forward_send_callback_called); + assert!(lock.tx_cancellation_callback_called_inbound); + assert!(lock.tx_cancellation_callback_called_completed); + assert!(lock.tx_cancellation_callback_called_outbound); + assert!(lock.base_node_sync_callback_called_true); + assert!(lock.base_node_sync_callback_called_false); + drop(lock); + } +} diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 2b75a0203b..3766f0db2b 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -154,7 +154,10 @@ use tari_utilities::{hex, hex::Hex, message_format::MessageFormat}; use tari_wallet::{ contacts_service::storage::{database::Contact, sqlite_db::ContactsServiceSqliteDatabase}, error::WalletError, - output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, + output_manager_service::{ + protocols::utxo_validation_protocol::UtxoValidationRetry, + storage::sqlite_db::OutputManagerSqliteDatabase, + }, storage::{ connection_manager::run_migration_and_create_sqlite_connection, database::WalletDatabase, @@ -4138,7 +4141,7 @@ pub unsafe extern "C" fn wallet_sync_with_base_node(wallet: *mut TariWallet, err return 0; } - match (*wallet).sync_with_base_node() { + match (*wallet).validate_utxos(UtxoValidationRetry::Limited(1)) { Ok(request_key) => request_key, Err(e) => { error = LibWalletError::from(e).code;