Skip to content

Commit

Permalink
Refactor UXTO Validation in Output Manager into async protocol
Browse files Browse the repository at this point in the history
UTXO validation in the Output Manager currently has a few elements that are synchronous relative to the main Select! Loop in the service. This meant that when there were a large number of outputs in the wallet that this could bog down the entire service.

This PR refactors the UTXO validation process into a fully asynchronous and more modular Protocol pattern that runs in its own task. The API to the process is also updated to allow clients to select the appropriate Retry strategy. The automatic Utxo Validation of Unspent outputs was removed and must now be done explicitly but the client so they have more control over when it happens. The Automatic Invalid UTXO validation is still in place but instead of repeating until success it is reduced to 5 attempts as it is not critical.

It was also found that there was one synchronous Mutex being used in the service outside of a `spawn_blocking(…)` context which could cause problems with the Tokio runtime so that has been updated to use an asynchronous Mutex.

The Output Manager was also updated to use to the Tokio Broadcast channel and the tari_broadcast_channel dependency has been removed from the Wallet crate.

A test has been added to fully test the wallet_ffi callback handler which ingests the new events the OMS is emitting.

A bunch of Clippy fixes
  • Loading branch information
philipr-za committed Jun 19, 2020
1 parent 0bed24e commit 83be1fc
Show file tree
Hide file tree
Showing 23 changed files with 1,299 additions and 647 deletions.
15 changes: 11 additions & 4 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use tari_wallet::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
protocols::utxo_validation_protocol::UtxoValidationRetry,
storage::sqlite_db::OutputManagerSqliteDatabase,
OutputManagerServiceInitializer,
},
Expand Down Expand Up @@ -225,7 +226,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
let _ = oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.validate_utxos(UtxoValidationRetry::UntilSuccess).await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
Expand Down Expand Up @@ -559,12 +560,18 @@ where
.set_base_node_public_key(base_node_public_key.clone())
.await
.expect("Problem setting local base node public key for transaction service.");
wallet_handles
let mut oms_handle = wallet_handles
.get_handle::<OutputManagerHandle>()
.expect("OutputManagerService is not registered")
.expect("OutputManagerService is not registered");
oms_handle
.set_base_node_public_key(base_node_public_key)
.await
.expect("Problem setting local base node public key for output manager service.");
// Start the Output Manager UTXO Validation
oms_handle
.validate_utxos(UtxoValidationRetry::UntilSuccess)
.await
.expect("Problem starting the Output Manager Service Utxo Valdation process");

//---------------------------------- Base Node State Machine --------------------------------------------//
let outbound_interface = base_node_handles
Expand Down Expand Up @@ -1158,7 +1165,7 @@ async fn register_wallet_services(
))
// Wallet services
.add_initializer(OutputManagerServiceInitializer::new(
OutputManagerServiceConfig::default(),
OutputManagerServiceConfig{ base_node_query_timeout: Duration::from_secs(120), ..Default::default() },
subscription_factory.clone(),
OutputManagerSqliteDatabase::new(wallet_db_conn.clone()),
factories.clone(),
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ test_harness = ["tari_test_utils"]
c_integration = []

[dependencies]
tari_broadcast_channel = "^0.2"
tari_comms = { path = "../../comms", version = "^0.1"}
tari_comms_dht = { path = "../../comms/dht", version = "^0.1"}
tari_crypto = { version = "^0.3" }
Expand Down
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ pub enum OutputManagerError {
NoBaseNodeKeysProvided,
/// An error occured sending an event out on the event stream
EventStreamError,
/// Maximum Attempts Exceeded
MaximumAttemptsExceeded,
/// An error has been experienced in the service
#[error(msg_embedded, non_std, no_from)]
ServiceError(String),
}

#[derive(Debug, Error, PartialEq)]
Expand All @@ -88,6 +93,7 @@ pub enum OutputManagerStorageError {
OutputAlreadySpent,
/// Key Manager not initialized
KeyManagerNotInitialized,

OutOfRangeError(OutOfRangeError),
R2d2Error,
TransactionError(TransactionError),
Expand All @@ -98,3 +104,23 @@ pub enum OutputManagerStorageError {
#[error(msg_embedded, non_std, no_from)]
BlockingTaskSpawnError(String),
}

/// This error type is used to return OutputManagerError from inside a Output Manager Service protocol but also
/// include the ID of the protocol
#[derive(Debug)]
pub struct OutputManagerProtocolError {
pub id: u64,
pub error: OutputManagerError,
}

impl OutputManagerProtocolError {
pub fn new(id: u64, error: OutputManagerError) -> Self {
Self { id, error }
}
}

impl From<OutputManagerProtocolError> for OutputManagerError {
fn from(tspe: OutputManagerProtocolError) -> Self {
tspe.error
}
}
36 changes: 22 additions & 14 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

use crate::output_manager_service::{
error::OutputManagerError,
protocols::utxo_validation_protocol::UtxoValidationRetry,
service::Balance,
storage::database::PendingTransactionOutputs,
};
use futures::{stream::Fuse, StreamExt};
use std::{collections::HashMap, fmt, time::Duration};
use tari_broadcast_channel::Subscriber;
use tari_comms::types::CommsPublicKey;
use tari_core::transactions::{
tari_amount::MicroTari,
Expand All @@ -36,6 +36,7 @@ use tari_core::transactions::{
SenderTransactionProtocol,
};
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;

/// API Request enum
Expand All @@ -55,7 +56,7 @@ pub enum OutputManagerRequest {
GetInvalidOutputs,
GetSeedWords,
SetBaseNodePublicKey(CommsPublicKey),
SyncWithBaseNode,
ValidateUtxos(UtxoValidationRetry),
CreateCoinSplit((MicroTari, usize, MicroTari, Option<u64>)),
}

Expand All @@ -78,7 +79,7 @@ impl fmt::Display for OutputManagerRequest {
Self::GetInvalidOutputs => f.write_str("GetInvalidOutputs"),
Self::GetSeedWords => f.write_str("GetSeedWords"),
Self::SetBaseNodePublicKey(k) => f.write_str(&format!("SetBaseNodePublicKey ({})", k)),
Self::SyncWithBaseNode => f.write_str("SyncWithBaseNode"),
Self::ValidateUtxos(retry) => f.write_str(&format!("ValidateUtxos ({:?})", retry)),
Self::CreateCoinSplit(v) => f.write_str(&format!("CreateCoinSplit ({})", v.0)),
}
}
Expand All @@ -101,35 +102,42 @@ pub enum OutputManagerResponse {
InvalidOutputs(Vec<UnblindedOutput>),
SeedWords(Vec<String>),
BaseNodePublicKeySet,
StartedBaseNodeSync(u64),
UtxoValidationStarted(u64),
Transaction((u64, Transaction, MicroTari, MicroTari)),
}

pub type OutputManagerEventSender = broadcast::Sender<OutputManagerEvent>;
pub type OutputManagerEventReceiver = broadcast::Receiver<OutputManagerEvent>;

/// Events that can be published on the Text Message Service Event Stream
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum OutputManagerEvent {
BaseNodeSyncRequestTimedOut(u64),
ReceiveBaseNodeResponse(u64),
UtxoValidationTimedOut(u64),
UtxoValidationSuccess(u64),
UtxoValidationFailure(u64),
Error(String),
}

#[derive(Clone)]
pub struct OutputManagerHandle {
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
}

impl OutputManagerHandle {
pub fn new(
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
) -> Self
{
OutputManagerHandle { handle, event_stream }
OutputManagerHandle {
handle,
event_stream_sender,
}
}

pub fn get_event_stream_fused(&self) -> Fuse<Subscriber<OutputManagerEvent>> {
self.event_stream.clone().fuse()
pub fn get_event_stream_fused(&self) -> Fuse<OutputManagerEventReceiver> {
self.event_stream_sender.subscribe().fuse()
}

pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
Expand Down Expand Up @@ -287,9 +295,9 @@ impl OutputManagerHandle {
}
}

pub async fn sync_with_base_node(&mut self) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::SyncWithBaseNode).await?? {
OutputManagerResponse::StartedBaseNodeSync(request_key) => Ok(request_key),
pub async fn validate_utxos(&mut self, retries: UtxoValidationRetry) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::ValidateUtxos(retries)).await?? {
OutputManagerResponse::UtxoValidationStarted(request_key) => Ok(request_key),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
Expand Down
16 changes: 8 additions & 8 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::output_manager_service::{handle::OutputManagerHandle, service::OutputManagerService};

use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
service::OutputManagerService,
storage::database::{OutputManagerBackend, OutputManagerDatabase},
},
transaction_service::handle::TransactionServiceHandle,
};
use futures::{future, Future, Stream, StreamExt};
use log::*;
use std::sync::Arc;
use tari_broadcast_channel::bounded;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_core::{base_node::proto::base_node as BaseNodeProto, transactions::types::CryptoFactories};
use tari_p2p::{
Expand All @@ -48,11 +47,12 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime;
use tokio::{runtime, sync::broadcast};

pub mod config;
pub mod error;
pub mod handle;
pub mod protocols;
#[allow(unused_assignments)]
pub mod service;
pub mod storage;
Expand All @@ -72,7 +72,7 @@ where T: OutputManagerBackend
}

impl<T> OutputManagerServiceInitializer<T>
where T: OutputManagerBackend
where T: OutputManagerBackend + Clone + 'static
{
pub fn new(
config: OutputManagerServiceConfig,
Expand All @@ -98,7 +98,7 @@ where T: OutputManagerBackend
}

impl<T> ServiceInitializer for OutputManagerServiceInitializer<T>
where T: OutputManagerBackend + 'static
where T: OutputManagerBackend + Clone + 'static
{
type Future = impl Future<Output = Result<(), ServiceInitializationError>>;

Expand All @@ -112,9 +112,9 @@ where T: OutputManagerBackend + 'static
let base_node_response_stream = self.base_node_response_stream();

let (sender, receiver) = reply_channel::unbounded();
let (publisher, subscriber) = bounded(100, 7);
let (publisher, _) = broadcast::channel(200);

let oms_handle = OutputManagerHandle::new(sender, subscriber);
let oms_handle = OutputManagerHandle::new(sender, publisher.clone());

// Register handle before waiting for handles to be ready
handles_fut.register(oms_handle);
Expand Down
23 changes: 23 additions & 0 deletions base_layer/wallet/src/output_manager_service/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod utxo_validation_protocol;
Loading

0 comments on commit 83be1fc

Please sign in to comment.