Skip to content

Commit

Permalink
Update Output Manager UTXO query to split into multiple queries (#1949)
Browse files Browse the repository at this point in the history
Merge pull request #1949

Update Output Manager UTXO query to split into multiple queries

* pull/1949/head:
  Update Output Manager UTXO query to split into multiple queries
  • Loading branch information
sdbondi committed Jun 5, 2020
2 parents 5294690 + 7ca23d6 commit 7959736
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 84 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use std::time::Duration;
#[derive(Clone)]
pub struct OutputManagerServiceConfig {
pub base_node_query_timeout: Duration,
pub max_utxo_query_size: usize,
}

impl Default for OutputManagerServiceConfig {
fn default() -> Self {
Self {
base_node_query_timeout: Duration::from_secs(30),
max_utxo_query_size: 5000,
}
}
}
174 changes: 124 additions & 50 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, SinkExt, Stream, StreamExt};
use log::*;
use rand::{rngs::OsRng, RngCore};
use std::{cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use std::{cmp, cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use tari_broadcast_channel::Publisher;
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{
Expand Down Expand Up @@ -195,7 +195,7 @@ where
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}, Trace: {}", origin_public_key, resp, msg.dht_header.message_tag);
Err(resp)
});

Expand Down Expand Up @@ -290,7 +290,7 @@ where
.await
.map(|_| OutputManagerResponse::BaseNodePublicKeySet),
OutputManagerRequest::SyncWithBaseNode => self
.query_unspent_outputs_status(utxo_query_timeout_futures)
.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await
.map(OutputManagerResponse::StartedBaseNodeSync),
OutputManagerRequest::GetInvalidOutputs => {
Expand All @@ -310,7 +310,7 @@ where
}

/// Handle an incoming basenode response message
pub async fn handle_base_node_response(
async fn handle_base_node_response(
&mut self,
response: BaseNodeProto::BaseNodeServiceResponse,
) -> Result<(), OutputManagerError>
Expand Down Expand Up @@ -424,11 +424,11 @@ where
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
) -> Result<(), OutputManagerError>
{
if self.pending_utxo_query_keys.remove(&query_key).is_some() {
error!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Finished queueing new Base Node query timeout");
if let Some(hashes) = self.pending_utxo_query_keys.remove(&query_key) {
warn!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures, Some(hashes))
.await?;

let _ = self
.event_publisher
.send(OutputManagerEvent::BaseNodeSyncRequestTimedOut(query_key))
Expand All @@ -447,49 +447,122 @@ where

/// Send queries to the base node to check the status of all unspent outputs. If the outputs are no longer
/// available their status will be updated in the wallet.
pub async fn query_unspent_outputs_status(
async fn query_unspent_outputs_status(
&mut self,
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
specified_outputs: Option<Vec<Vec<u8>>>,
) -> Result<u64, OutputManagerError>
{
match self.base_node_public_key.as_ref() {
None => Err(OutputManagerError::NoBaseNodeKeysProvided),
Some(pk) => {
let unspent_outputs: Vec<DbUnblindedOutput> = self.db.get_unspent_outputs().await?;
let mut output_hashes = Vec::new();
for uo in unspent_outputs.iter() {
let hash = uo.hash.clone();
output_hashes.push(hash.clone());
}
let request_key = OsRng.next_u64();
let mut first_request_key = 0;
let mut unspent_outputs: Vec<Vec<u8>> = if let Some(hashes) = specified_outputs {
hashes
} else {
self.db
.get_unspent_outputs()
.await?
.iter()
.map(|uo| uo.hash.clone())
.collect()
};

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});
// Determine how many rounds of base node request we need to query all the outputs in batches of
// max_utxo_query_size
let rounds =
((unspent_outputs.len() as f32) / (self.config.max_utxo_query_size as f32)).ceil() as usize;

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "About to attempt to send query to base node");
self.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Query sent to Base Node");
self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node", request_key
);
Ok(request_key)
for r in 0..rounds {
let mut output_hashes = Vec::new();
for uo_hash in
unspent_outputs.drain(..cmp::min(self.config.max_utxo_query_size, unspent_outputs.len()))
{
output_hashes.push(uo_hash);
}
let request_key = OsRng.next_u64();
if first_request_key == 0 {
first_request_key = request_key;
}

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};

let send_message_response = self
.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;

// Here we are going to spawn a non-blocking task that will monitor and log the progress of the
// send process.
tokio::spawn(async move {
match send_message_response.resolve_ok().await {
None => trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
),
Some(send_states) => {
if send_states.len() == 1 {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) queued for sending with Message {}",
request_key,
send_states[0].tag,
);
let message_tag = send_states[0].tag;
if send_states.wait_single().await {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) successfully sent to Base Node with \
Message {}",
request_key,
message_tag,
)
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node with \
Message {}",
request_key,
message_tag,
);
}
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
)
}
},
}
});

self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node, part {} of {} requests",
request_key,
r + 1,
rounds
);
}
// We are just going to return the first request key for use by the front end. It is very unlikely that
// a mobile wallet will ever have this query split up
Ok(first_request_key)
},
}
}
Expand All @@ -500,14 +573,14 @@ where
Ok(self.db.add_unspent_output(output).await?)
}

pub async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
let balance = self.db.get_balance().await?;
trace!(target: LOG_TARGET, "Balance: {:?}", balance);
Ok(balance)
}

/// Request a spending key to be used to accept a transaction from a sender.
pub async fn get_recipient_spending_key(
async fn get_recipient_spending_key(
&mut self,
tx_id: TxId,
amount: MicroTari,
Expand Down Expand Up @@ -634,7 +707,7 @@ where

/// Confirm that a transaction has finished being negotiated between parties so the short-term encumberance can be
/// made official
pub async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
self.db.confirm_encumbered_outputs(tx_id).await?;

Ok(())
Expand All @@ -643,7 +716,7 @@ where
/// Confirm that a received or sent transaction and its outputs have been detected on the base chain. The inputs and
/// outputs are checked to see that they match what the stored PendingTransaction contains. This will
/// be called by the Transaction Service which monitors the base chain.
pub async fn confirm_transaction(
async fn confirm_transaction(
&mut self,
tx_id: u64,
inputs: &[TransactionInput],
Expand Down Expand Up @@ -698,7 +771,7 @@ where
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
pub async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.db.timeout_pending_transaction_outputs(period).await?)
}

Expand Down Expand Up @@ -793,7 +866,8 @@ where
self.base_node_public_key = Some(base_node_public_key);

if startup_query {
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
self.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await?;
}
Ok(())
}
Expand All @@ -816,7 +890,7 @@ where
Ok(self.db.get_invalid_outputs().await?)
}

pub async fn create_coin_split(
async fn create_coin_split(
&mut self,
amount_per_split: MicroTari,
split_count: usize,
Expand Down
Loading

0 comments on commit 7959736

Please sign in to comment.