Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor UXTO Validation in Output Manager into async protocol #2006

Merged
merged 1 commit into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use tari_wallet::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
protocols::utxo_validation_protocol::UtxoValidationRetry,
storage::sqlite_db::OutputManagerSqliteDatabase,
OutputManagerServiceInitializer,
},
Expand Down Expand Up @@ -225,7 +226,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
let _ = oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.validate_utxos(UtxoValidationRetry::UntilSuccess).await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
Expand Down Expand Up @@ -559,12 +560,18 @@ where
.set_base_node_public_key(base_node_public_key.clone())
.await
.expect("Problem setting local base node public key for transaction service.");
wallet_handles
let mut oms_handle = wallet_handles
.get_handle::<OutputManagerHandle>()
.expect("OutputManagerService is not registered")
.expect("OutputManagerService is not registered");
oms_handle
.set_base_node_public_key(base_node_public_key)
.await
.expect("Problem setting local base node public key for output manager service.");
// Start the Output Manager UTXO Validation
oms_handle
.validate_utxos(UtxoValidationRetry::UntilSuccess)
.await
.expect("Problem starting the Output Manager Service Utxo Valdation process");

//---------------------------------- Base Node State Machine --------------------------------------------//
let outbound_interface = base_node_handles
Expand Down Expand Up @@ -1158,7 +1165,7 @@ async fn register_wallet_services(
))
// Wallet services
.add_initializer(OutputManagerServiceInitializer::new(
OutputManagerServiceConfig::default(),
OutputManagerServiceConfig{ base_node_query_timeout: Duration::from_secs(120), ..Default::default() },
subscription_factory.clone(),
OutputManagerSqliteDatabase::new(wallet_db_conn.clone()),
factories.clone(),
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ test_harness = ["tari_test_utils"]
c_integration = []

[dependencies]
tari_broadcast_channel = "^0.2"
tari_comms = { path = "../../comms", version = "^0.1"}
tari_comms_dht = { path = "../../comms/dht", version = "^0.1"}
tari_crypto = { version = "^0.3" }
Expand Down
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ pub enum OutputManagerError {
NoBaseNodeKeysProvided,
/// An error occured sending an event out on the event stream
EventStreamError,
/// Maximum Attempts Exceeded
MaximumAttemptsExceeded,
/// An error has been experienced in the service
#[error(msg_embedded, non_std, no_from)]
ServiceError(String),
}

#[derive(Debug, Error, PartialEq)]
Expand All @@ -88,6 +93,7 @@ pub enum OutputManagerStorageError {
OutputAlreadySpent,
/// Key Manager not initialized
KeyManagerNotInitialized,

OutOfRangeError(OutOfRangeError),
R2d2Error,
TransactionError(TransactionError),
Expand All @@ -98,3 +104,23 @@ pub enum OutputManagerStorageError {
#[error(msg_embedded, non_std, no_from)]
BlockingTaskSpawnError(String),
}

/// This error type is used to return OutputManagerError from inside a Output Manager Service protocol but also
/// include the ID of the protocol
#[derive(Debug)]
pub struct OutputManagerProtocolError {
pub id: u64,
pub error: OutputManagerError,
}

impl OutputManagerProtocolError {
pub fn new(id: u64, error: OutputManagerError) -> Self {
Self { id, error }
}
}

impl From<OutputManagerProtocolError> for OutputManagerError {
fn from(tspe: OutputManagerProtocolError) -> Self {
tspe.error
}
}
36 changes: 22 additions & 14 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

use crate::output_manager_service::{
error::OutputManagerError,
protocols::utxo_validation_protocol::UtxoValidationRetry,
service::Balance,
storage::database::PendingTransactionOutputs,
};
use futures::{stream::Fuse, StreamExt};
use std::{collections::HashMap, fmt, time::Duration};
use tari_broadcast_channel::Subscriber;
use tari_comms::types::CommsPublicKey;
use tari_core::transactions::{
tari_amount::MicroTari,
Expand All @@ -36,6 +36,7 @@ use tari_core::transactions::{
SenderTransactionProtocol,
};
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;

/// API Request enum
Expand All @@ -55,7 +56,7 @@ pub enum OutputManagerRequest {
GetInvalidOutputs,
GetSeedWords,
SetBaseNodePublicKey(CommsPublicKey),
SyncWithBaseNode,
ValidateUtxos(UtxoValidationRetry),
CreateCoinSplit((MicroTari, usize, MicroTari, Option<u64>)),
}

Expand All @@ -78,7 +79,7 @@ impl fmt::Display for OutputManagerRequest {
Self::GetInvalidOutputs => f.write_str("GetInvalidOutputs"),
Self::GetSeedWords => f.write_str("GetSeedWords"),
Self::SetBaseNodePublicKey(k) => f.write_str(&format!("SetBaseNodePublicKey ({})", k)),
Self::SyncWithBaseNode => f.write_str("SyncWithBaseNode"),
Self::ValidateUtxos(retry) => f.write_str(&format!("ValidateUtxos ({:?})", retry)),
Self::CreateCoinSplit(v) => f.write_str(&format!("CreateCoinSplit ({})", v.0)),
}
}
Expand All @@ -101,35 +102,42 @@ pub enum OutputManagerResponse {
InvalidOutputs(Vec<UnblindedOutput>),
SeedWords(Vec<String>),
BaseNodePublicKeySet,
StartedBaseNodeSync(u64),
UtxoValidationStarted(u64),
Transaction((u64, Transaction, MicroTari, MicroTari)),
}

pub type OutputManagerEventSender = broadcast::Sender<OutputManagerEvent>;
pub type OutputManagerEventReceiver = broadcast::Receiver<OutputManagerEvent>;

/// Events that can be published on the Text Message Service Event Stream
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum OutputManagerEvent {
BaseNodeSyncRequestTimedOut(u64),
ReceiveBaseNodeResponse(u64),
UtxoValidationTimedOut(u64),
UtxoValidationSuccess(u64),
UtxoValidationFailure(u64),
Error(String),
}

#[derive(Clone)]
pub struct OutputManagerHandle {
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
}

impl OutputManagerHandle {
pub fn new(
handle: SenderService<OutputManagerRequest, Result<OutputManagerResponse, OutputManagerError>>,
event_stream: Subscriber<OutputManagerEvent>,
event_stream_sender: OutputManagerEventSender,
) -> Self
{
OutputManagerHandle { handle, event_stream }
OutputManagerHandle {
handle,
event_stream_sender,
}
}

pub fn get_event_stream_fused(&self) -> Fuse<Subscriber<OutputManagerEvent>> {
self.event_stream.clone().fuse()
pub fn get_event_stream_fused(&self) -> Fuse<OutputManagerEventReceiver> {
self.event_stream_sender.subscribe().fuse()
}

pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
Expand Down Expand Up @@ -287,9 +295,9 @@ impl OutputManagerHandle {
}
}

pub async fn sync_with_base_node(&mut self) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::SyncWithBaseNode).await?? {
OutputManagerResponse::StartedBaseNodeSync(request_key) => Ok(request_key),
pub async fn validate_utxos(&mut self, retries: UtxoValidationRetry) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::ValidateUtxos(retries)).await?? {
OutputManagerResponse::UtxoValidationStarted(request_key) => Ok(request_key),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
Expand Down
16 changes: 8 additions & 8 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::output_manager_service::{handle::OutputManagerHandle, service::OutputManagerService};

use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerHandle,
service::OutputManagerService,
storage::database::{OutputManagerBackend, OutputManagerDatabase},
},
transaction_service::handle::TransactionServiceHandle,
};
use futures::{future, Future, Stream, StreamExt};
use log::*;
use std::sync::Arc;
use tari_broadcast_channel::bounded;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_core::{base_node::proto::base_node as BaseNodeProto, transactions::types::CryptoFactories};
use tari_p2p::{
Expand All @@ -48,11 +47,12 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime;
use tokio::{runtime, sync::broadcast};

pub mod config;
pub mod error;
pub mod handle;
pub mod protocols;
#[allow(unused_assignments)]
pub mod service;
pub mod storage;
Expand All @@ -72,7 +72,7 @@ where T: OutputManagerBackend
}

impl<T> OutputManagerServiceInitializer<T>
where T: OutputManagerBackend
where T: OutputManagerBackend + Clone + 'static
{
pub fn new(
config: OutputManagerServiceConfig,
Expand All @@ -98,7 +98,7 @@ where T: OutputManagerBackend
}

impl<T> ServiceInitializer for OutputManagerServiceInitializer<T>
where T: OutputManagerBackend + 'static
where T: OutputManagerBackend + Clone + 'static
{
type Future = impl Future<Output = Result<(), ServiceInitializationError>>;

Expand All @@ -112,9 +112,9 @@ where T: OutputManagerBackend + 'static
let base_node_response_stream = self.base_node_response_stream();

let (sender, receiver) = reply_channel::unbounded();
let (publisher, subscriber) = bounded(100, 7);
let (publisher, _) = broadcast::channel(200);

let oms_handle = OutputManagerHandle::new(sender, subscriber);
let oms_handle = OutputManagerHandle::new(sender, publisher.clone());

// Register handle before waiting for handles to be ready
handles_fut.register(oms_handle);
Expand Down
23 changes: 23 additions & 0 deletions base_layer/wallet/src/output_manager_service/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod utxo_validation_protocol;
Loading