Add revalidation check to Output Manager service
Outputs can be marked as invalid by the Output Manager service when they do not appear in the base node's blockchain. Generally this means they were re-orged out or have been spent by another copy of the wallet. However, they can be incorrectly reported as invalid in some cases, for example when the Base Node is not fully synced.

This PR adds an additional check that the Output Manager performs on startup to see whether any invalid outputs have become valid again. If so, the output is changed back into a spendable output.
philipr-za committed Jun 5, 2020
1 parent 5294690 commit 188b656
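
As a rough illustration of the revalidation idea described in the commit message, the sketch below re-checks previously invalidated outputs against the base node's reported UTXO set and restores any that have reappeared. All names here (WalletDb, revalidate_output, chain_utxos, revalidate_invalid_outputs) are simplified stand-ins rather than the wallet crate's actual API; in the commit itself this check is folded into the Output Manager's existing startup query to the base node.

```rust
use std::collections::HashSet;

// Simplified stand-in for the wallet's output store; not the crate's real API.
struct WalletDb {
    invalid: Vec<Vec<u8>>, // hashes of outputs previously marked invalid
    unspent: Vec<Vec<u8>>, // hashes of spendable outputs
}

impl WalletDb {
    // Move an output back from the invalid set to the spendable set.
    fn revalidate_output(&mut self, hash: &[u8]) {
        if let Some(pos) = self.invalid.iter().position(|h| h.as_slice() == hash) {
            let h = self.invalid.remove(pos);
            self.unspent.push(h);
        }
    }
}

// Re-check invalid outputs against the base node's reported UTXO set and
// restore any that have reappeared (e.g. the earlier invalidation came from
// a base node that was not fully synced).
fn revalidate_invalid_outputs(db: &mut WalletDb, chain_utxos: &HashSet<Vec<u8>>) {
    for hash in db.invalid.clone() {
        if chain_utxos.contains(&hash) {
            db.revalidate_output(&hash);
        }
    }
}

fn main() {
    let mut db = WalletDb {
        invalid: vec![vec![1u8; 32]],
        unspent: vec![],
    };
    // Pretend the base node now reports this output in its UTXO set again.
    let chain_utxos: HashSet<Vec<u8>> = [vec![1u8; 32]].into_iter().collect();
    revalidate_invalid_outputs(&mut db, &chain_utxos);
    assert!(db.invalid.is_empty() && db.unspent.len() == 1);
}
```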
Showing 4 changed files with 262 additions and 114 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
@@ -25,12 +25,14 @@ use std::time::Duration;
#[derive(Clone)]
pub struct OutputManagerServiceConfig {
pub base_node_query_timeout: Duration,
pub max_utxo_query_size: usize,
}

impl Default for OutputManagerServiceConfig {
fn default() -> Self {
Self {
base_node_query_timeout: Duration::from_secs(30),
max_utxo_query_size: 5000,
}
}
}
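
The new max_utxo_query_size field caps how many output hashes go into a single FetchUtxos request; the service.rs changes below split the full UTXO set into ceil(n / max_utxo_query_size) batches. Here is a minimal sketch of that batching arithmetic; the standalone query_rounds helper is hypothetical (the commit computes the same value inline with a floating-point ceil).

```rust
// Hypothetical helper illustrating the batching arithmetic used in service.rs.
fn query_rounds(num_utxos: usize, max_utxo_query_size: usize) -> usize {
    if num_utxos == 0 {
        return 0; // nothing to query, so no requests are sent
    }
    // Integer ceiling division: one extra round for any partial batch.
    (num_utxos + max_utxo_query_size - 1) / max_utxo_query_size
}

fn main() {
    // With the default max_utxo_query_size of 5000:
    assert_eq!(query_rounds(4_999, 5_000), 1);
    assert_eq!(query_rounds(5_000, 5_000), 1);
    assert_eq!(query_rounds(12_000, 5_000), 3); // two full batches plus one partial
    println!("batching arithmetic checks out");
}
```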
206 changes: 126 additions & 80 deletions base_layer/wallet/src/output_manager_service/service.rs
@@ -38,7 +38,7 @@ use crate::{
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, SinkExt, Stream, StreamExt};
use log::*;
use rand::{rngs::OsRng, RngCore};
use std::{cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use std::{cmp, cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use tari_broadcast_channel::Publisher;
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{
@@ -195,7 +195,7 @@ where
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}, Trace: {}", origin_public_key, resp, msg.dht_header.message_tag);
Err(resp)
});

@@ -290,7 +290,7 @@ where
.await
.map(|_| OutputManagerResponse::BaseNodePublicKeySet),
OutputManagerRequest::SyncWithBaseNode => self
.query_unspent_outputs_status(utxo_query_timeout_futures)
.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await
.map(OutputManagerResponse::StartedBaseNodeSync),
OutputManagerRequest::GetInvalidOutputs => {
@@ -310,7 +310,7 @@ where
}

/// Handle an incoming basenode response message
pub async fn handle_base_node_response(
async fn handle_base_node_response(
&mut self,
response: BaseNodeProto::BaseNodeServiceResponse,
) -> Result<(), OutputManagerError>
@@ -424,11 +424,11 @@ where
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
) -> Result<(), OutputManagerError>
{
if self.pending_utxo_query_keys.remove(&query_key).is_some() {
error!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Finished queueing new Base Node query timeout");
if let Some(hashes) = self.pending_utxo_query_keys.remove(&query_key) {
warn!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures, Some(hashes))
.await?;

let _ = self
.event_publisher
.send(OutputManagerEvent::BaseNodeSyncRequestTimedOut(query_key))
@@ -447,67 +447,140 @@ where

/// Send queries to the base node to check the status of all unspent outputs. If the outputs are no longer
/// available their status will be updated in the wallet.
pub async fn query_unspent_outputs_status(
async fn query_unspent_outputs_status(
&mut self,
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
specified_outputs: Option<Vec<Vec<u8>>>,
) -> Result<u64, OutputManagerError>
{
match self.base_node_public_key.as_ref() {
None => Err(OutputManagerError::NoBaseNodeKeysProvided),
Some(pk) => {
let unspent_outputs: Vec<DbUnblindedOutput> = self.db.get_unspent_outputs().await?;
let mut output_hashes = Vec::new();
for uo in unspent_outputs.iter() {
let hash = uo.hash.clone();
output_hashes.push(hash.clone());
}
let request_key = OsRng.next_u64();
let mut first_request_key = 0;
let mut unspent_outputs: Vec<Vec<u8>> = if let Some(hashes) = specified_outputs {
hashes
} else {
self.db
.get_unspent_outputs()
.await?
.iter()
.map(|uo| uo.hash.clone())
.collect()
};

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});
// Determine how many rounds of base node request we need to query all the outputs in batches of
// max_utxo_query_size
let rounds =
((unspent_outputs.len() as f32) / (self.config.max_utxo_query_size as f32)).ceil() as usize;

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "About to attempt to send query to base node");
self.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Query sent to Base Node");
self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node", request_key
);
Ok(request_key)
for r in 0..rounds {
let mut output_hashes = Vec::new();
for uo_hash in
unspent_outputs.drain(..cmp::min(self.config.max_utxo_query_size, unspent_outputs.len()))
{
output_hashes.push(uo_hash);
}
let request_key = OsRng.next_u64();
if first_request_key == 0 {
first_request_key = request_key;
}

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};

let send_message_response = self
.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;

// Here we are going to spawn a non-blocking task that will monitor and log the progress of the
// send process.
tokio::spawn(async move {
match send_message_response.resolve_ok().await {
None => trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
),
Some(send_states) => {
if send_states.len() == 1 {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) queued for sending with Message {}",
request_key,
send_states[0].tag,
);
let message_tag = send_states[0].tag;
if send_states.wait_single().await {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) successfully sent to Base Node with \
Message {}",
request_key,
message_tag,
)
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node with \
Message {}",
request_key,
message_tag,
);
}
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
)
}
},
}
});

self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node, part {} of {} requests",
request_key,
r + 1,
rounds
);
}
// We are just going to return the first request key for use by the front end. It is very unlikely that
// a mobile wallet will ever have this query split up
Ok(first_request_key)
},
}
}

/// Add an unblinded output to the unspent outputs list
pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
let output = DbUnblindedOutput::from_unblinded_output(output, &self.factories)?;
Ok(self.db.add_unspent_output(output).await?)
}

pub async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
let balance = self.db.get_balance().await?;
trace!(target: LOG_TARGET, "Balance: {:?}", balance);
Ok(balance)
}

/// Request a spending key to be used to accept a transaction from a sender.
pub async fn get_recipient_spending_key(
async fn get_recipient_spending_key(
&mut self,
tx_id: TxId,
amount: MicroTari,
@@ -528,37 +528,9 @@ where
Ok(key)
}

/// Confirm the reception of an expected transaction output. This will be called by the Transaction Service when it
/// detects the output on the blockchain
pub async fn confirm_received_transaction_output(
&mut self,
tx_id: u64,
received_output: &TransactionOutput,
) -> Result<(), OutputManagerError>
{
let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id).await?;

// Assumption: We are only allowing a single output per receiver in the current transaction protocols.
if pending_transaction.outputs_to_be_received.len() != 1 ||
pending_transaction.outputs_to_be_received[0]
.unblinded_output
.as_transaction_input(&self.factories.commitment, OutputFeatures::default())
.commitment !=
received_output.commitment
{
return Err(OutputManagerError::IncompleteTransaction);
}

self.db
.confirm_pending_transaction_outputs(pending_transaction.tx_id)
.await?;

Ok(())
}

/// Prepare a Sender Transaction Protocol for the amount and fee_per_gram specified. If required a change output
/// will be produced.
pub async fn prepare_transaction_to_send(
async fn prepare_transaction_to_send(
&mut self,
amount: MicroTari,
fee_per_gram: MicroTari,
@@ -634,7 +679,7 @@

/// Confirm that a transaction has finished being negotiated between parties so the short-term encumberance can be
/// made official
pub async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
self.db.confirm_encumbered_outputs(tx_id).await?;

Ok(())
@@ -643,7 +688,7 @@
/// Confirm that a received or sent transaction and its outputs have been detected on the base chain. The inputs and
/// outputs are checked to see that they match what the stored PendingTransaction contains. This will
/// be called by the Transaction Service which monitors the base chain.
pub async fn confirm_transaction(
async fn confirm_transaction(
&mut self,
tx_id: u64,
inputs: &[TransactionInput],
@@ -698,7 +743,7 @@
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
pub async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.db.timeout_pending_transaction_outputs(period).await?)
}

@@ -793,7 +838,8 @@ where
self.base_node_public_key = Some(base_node_public_key);

if startup_query {
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
self.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await?;
}
Ok(())
}
@@ -816,7 +862,7 @@
Ok(self.db.get_invalid_outputs().await?)
}

pub async fn create_coin_split(
async fn create_coin_split(
&mut self,
amount_per_split: MicroTari,
split_count: usize,