Skip to content

Commit

Permalink
Refactor UXTO Validation in Output Manager into async protocol (#2006)
Browse files Browse the repository at this point in the history
Merge pull request #2006

Refactor UXTO Validation in Output Manager into async protocol
CjS77 committed Jun 23, 2020

Verified

This commit was signed with the committer’s verified signature. The key has expired.
CjS77 Cayle Sharrock
2 parents 4cc98f2 + e9c1573 commit f9966e9
Showing 23 changed files with 1,299 additions and 647 deletions.
15 changes: 11 additions & 4 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
@@ -104,6 +104,7 @@ use tari_wallet::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
protocols::utxo_validation_protocol::UtxoValidationRetry,
storage::sqlite_db::OutputManagerSqliteDatabase,
OutputManagerServiceInitializer,
},
@@ -225,7 +226,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
let _ = oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.validate_utxos(UtxoValidationRetry::UntilSuccess).await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
@@ -559,12 +560,18 @@ where
.set_base_node_public_key(base_node_public_key.clone())
.await
.expect("Problem setting local base node public key for transaction service.");
wallet_handles
let mut oms_handle = wallet_handles
.get_handle::<OutputManagerHandle>()
.expect("OutputManagerService is not registered")
.expect("OutputManagerService is not registered");
oms_handle
.set_base_node_public_key(base_node_public_key)
.await
.expect("Problem setting local base node public key for output manager service.");
// Start the Output Manager UTXO Validation
oms_handle
.validate_utxos(UtxoValidationRetry::UntilSuccess)
.await
.expect("Problem starting the Output Manager Service Utxo Valdation process");

//---------------------------------- Base Node State Machine --------------------------------------------//
let outbound_interface = base_node_handles
@@ -1158,7 +1165,7 @@ async fn register_wallet_services(
))
// Wallet services
.add_initializer(OutputManagerServiceInitializer::new(
OutputManagerServiceConfig::default(),
OutputManagerServiceConfig{ base_node_query_timeout: Duration::from_secs(120), ..Default::default() },
subscription_factory.clone(),
OutputManagerSqliteDatabase::new(wallet_db_conn.clone()),
factories.clone(),
1 change: 0 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ test_harness = ["tari_test_utils"]
c_integration = []

[dependencies]
tari_broadcast_channel = "^0.2"
tari_comms = { path = "../../comms", version = "^0.1"}
tari_comms_dht = { path = "../../comms/dht", version = "^0.1"}
tari_crypto = { version = "^0.3" }
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
@@ -66,6 +66,11 @@ pub enum OutputManagerError {
NoBaseNodeKeysProvided,
/// An error occured sending an event out on the event stream
EventStreamError,
/// Maximum Attempts Exceeded
MaximumAttemptsExceeded,
/// An error has been experienced in the service
#[error(msg_embedded, non_std, no_from)]
ServiceError(String),
}

#[derive(Debug, Error, PartialEq)]
@@ -88,6 +93,7 @@ pub enum OutputManagerStorageError {
OutputAlreadySpent,
/// Key Manager not initialized
KeyManagerNotInitialized,

OutOfRangeError(OutOfRangeError),
R2d2Error,
TransactionError(TransactionError),
@@ -98,3 +104,23 @@ pub enum OutputManagerStorageError {
#[error(msg_embedded, non_std, no_from)]
BlockingTaskSpawnError(String),
}

/// This error type is used to return OutputManagerError from inside a Output Manager Service protocol but also
/// include the ID of the protocol
#[derive(Debug)]
pub struct OutputManagerProtocolError {
pub id: u64,
pub error: OutputManagerError,
}

impl OutputManagerProtocolError {
pub fn new(id: u64, error: OutputManagerError) -> Self {
Self { id, error }
}
}

impl From<OutputManagerProtocolError> for OutputManagerError {
fn from(tspe: OutputManagerProtocolError) -> Self {
tspe.error
}
}
36 changes: 22 additions & 14 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
@@ -22,12 +22,12 @@

use crate::output_manager_service::{
error::OutputManagerError,
protocols::utxo_validation_protocol::UtxoValidationRetry,
service::Balance,
storage::database::PendingTransactionOutputs,
};
use futures::{stream::Fuse, StreamExt};
use std::{collections::HashMap, fmt, time::Duration};
use tari_broadcast_channel::Subscriber;
use tari_comms::types::CommsPublicKey;
use tari_core::transactions::{
tari_amount::MicroTari,
@@ -36,6 +36,7 @@ use tari_core::transactions::{
SenderTransactionProtocol,
};
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;

/// API Request enum
@@ -55,7 +56,7 @@ pub enum OutputManagerRequest {
GetInvalidOutputs,
GetSeedWords,
SetBaseNodePublicKey(CommsPublicKey),
SyncWithBaseNode,
ValidateUtxos(UtxoValidationRetry),
CreateCoinSplit((MicroTari, usize, MicroTari, Option<u64>)),
}

@@ -78,7 +79,7 @@ impl fmt::Display for OutputManagerRequest {
Self::GetInvalidOutputs => f.write_str("GetInvalidOutputs"),
Self::GetSeedWords => f.write_str("GetSeedWords"),
Self::SetBaseNodePublicKey(k) => f.write_str(&format!("SetBaseNodePublicKey ({})", k)),
Self::SyncWithBaseNode => f.write_str("SyncWithBaseNode"),
Self::ValidateUtxos(retry) => f.write_str(&format!("ValidateUtxos ({:?})", retry)),
Self::CreateCoinSplit(v) => f.write_str(&format!("CreateCoinSplit ({})", v.0)),
}
}
@@ -101,35 +102,42 @@ pub enum OutputManagerResponse {
InvalidOutputs(Vec<UnblindedOutput>),
SeedWords(Vec<String>),
BaseNodePublicKeySet,
StartedBaseNodeSync(u64),
UtxoValidationStarted(u64),
Transaction((u64, Transaction, MicroTari, MicroTari)),
}

pub type OutputManagerEventSender = broadcast::Sender<OutputManagerEvent>;
pub type OutputManagerEventReceiver = broadcast::Receiver<OutputManagerEvent>;

/// Events that can be published on the Text Message Service Event Stream
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum OutputManagerEvent {
BaseNodeSyncRequestTimedOut(u64),
ReceiveBaseNodeResponse(u64),
UtxoValidationTimedOut(u64),
UtxoValidationSuccess(u64),
UtxoValidationFailure(u64),
Error(String),
}

#[derive(Clone)]
pub struct OutputManagerHandle {
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
}

impl OutputManagerHandle {
pub fn new(
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
) -> Self
{
OutputManagerHandle { handle, event_stream }
OutputManagerHandle {
handle,
event_stream_sender,
}
}

pub fn get_event_stream_fused(&self) -> Fuse<Subscriber<OutputManagerEvent>> {
self.event_stream.clone().fuse()
pub fn get_event_stream_fused(&self) -> Fuse<OutputManagerEventReceiver> {
self.event_stream_sender.subscribe().fuse()
}

pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
@@ -287,9 +295,9 @@ impl OutputManagerHandle {
}
}

pub async fn sync_with_base_node(&mut self) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::SyncWithBaseNode).await?? {
OutputManagerResponse::StartedBaseNodeSync(request_key) => Ok(request_key),
pub async fn validate_utxos(&mut self, retries: UtxoValidationRetry) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::ValidateUtxos(retries)).await?? {
OutputManagerResponse::UtxoValidationStarted(request_key) => Ok(request_key),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
16 changes: 8 additions & 8 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
@@ -20,19 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::output_manager_service::{handle::OutputManagerHandle, service::OutputManagerService};

use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
service::OutputManagerService,
storage::database::{OutputManagerBackend, OutputManagerDatabase},
},
transaction_service::handle::TransactionServiceHandle,
};
use futures::{future, Future, Stream, StreamExt};
use log::*;
use std::sync::Arc;
use tari_broadcast_channel::bounded;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_core::{base_node::proto::base_node as BaseNodeProto, transactions::types::CryptoFactories};
use tari_p2p::{
@@ -48,11 +47,12 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime;
use tokio::{runtime, sync::broadcast};

pub mod config;
pub mod error;
pub mod handle;
pub mod protocols;
#[allow(unused_assignments)]
pub mod service;
pub mod storage;
@@ -72,7 +72,7 @@ where T: OutputManagerBackend
}

impl<T> OutputManagerServiceInitializer<T>
where T: OutputManagerBackend
where T: OutputManagerBackend + Clone + 'static
{
pub fn new(
config: OutputManagerServiceConfig,
@@ -98,7 +98,7 @@ where T: OutputManagerBackend
}

impl<T> ServiceInitializer for OutputManagerServiceInitializer<T>
where T: OutputManagerBackend + 'static
where T: OutputManagerBackend + Clone + 'static
{
type Future = impl Future<Output = Result<(), ServiceInitializationError>>;

@@ -112,9 +112,9 @@ where T: OutputManagerBackend + 'static
let base_node_response_stream = self.base_node_response_stream();

let (sender, receiver) = reply_channel::unbounded();
let (publisher, subscriber) = bounded(100, 7);
let (publisher, _) = broadcast::channel(200);

let oms_handle = OutputManagerHandle::new(sender, subscriber);
let oms_handle = OutputManagerHandle::new(sender, publisher.clone());

// Register handle before waiting for handles to be ready
handles_fut.register(oms_handle);
23 changes: 23 additions & 0 deletions base_layer/wallet/src/output_manager_service/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod utxo_validation_protocol;
Loading

0 comments on commit f9966e9

Please sign in to comment.