Skip to content

Commit

Permalink
Update Output Manager UTXO query to split into multiple queries
Browse files Browse the repository at this point in the history
Previously when the output manager service would send its UTXO validity query to its Base Node it would send a single query with all the UTXO hashes in it. The Base Node then responds with a single response message with all the valid UTXO’s that match a hash in the queries list. In large wallets this caused a problem where the response message could become large enough to be rejected by the frame size limits in the comms stack.

In order to mitigate this issue the Output Manager service will now send queries consisting of up to a maximum number of outputs (specified in the config) and will send multiple queries until all the outputs have been queried so that any one response does not hit the frame size limit.
  • Loading branch information
philipr-za committed Jun 5, 2020
1 parent 5294690 commit 0d2c07c
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 86 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use std::time::Duration;
#[derive(Clone)]
pub struct OutputManagerServiceConfig {
pub base_node_query_timeout: Duration,
pub max_utxo_query_size: usize,
}

impl Default for OutputManagerServiceConfig {
fn default() -> Self {
Self {
base_node_query_timeout: Duration::from_secs(30),
max_utxo_query_size: 5000,
}
}
}
178 changes: 126 additions & 52 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, SinkExt, Stream, StreamExt};
use log::*;
use rand::{rngs::OsRng, RngCore};
use std::{cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use std::{cmp, cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use tari_broadcast_channel::Publisher;
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{
Expand Down Expand Up @@ -195,7 +195,7 @@ where
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}, Trace: {}", origin_public_key, resp, msg.dht_header.message_tag);
Err(resp)
});

Expand Down Expand Up @@ -290,7 +290,7 @@ where
.await
.map(|_| OutputManagerResponse::BaseNodePublicKeySet),
OutputManagerRequest::SyncWithBaseNode => self
.query_unspent_outputs_status(utxo_query_timeout_futures)
.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await
.map(OutputManagerResponse::StartedBaseNodeSync),
OutputManagerRequest::GetInvalidOutputs => {
Expand All @@ -310,7 +310,7 @@ where
}

/// Handle an incoming basenode response message
pub async fn handle_base_node_response(
async fn handle_base_node_response(
&mut self,
response: BaseNodeProto::BaseNodeServiceResponse,
) -> Result<(), OutputManagerError>
Expand Down Expand Up @@ -424,11 +424,11 @@ where
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
) -> Result<(), OutputManagerError>
{
if self.pending_utxo_query_keys.remove(&query_key).is_some() {
error!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Finished queueing new Base Node query timeout");
if let Some(hashes) = self.pending_utxo_query_keys.remove(&query_key) {
warn!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures, Some(hashes))
.await?;

let _ = self
.event_publisher
.send(OutputManagerEvent::BaseNodeSyncRequestTimedOut(query_key))
Expand All @@ -447,67 +447,140 @@ where

/// Send queries to the base node to check the status of all unspent outputs. If the outputs are no longer
/// available their status will be updated in the wallet.
pub async fn query_unspent_outputs_status(
async fn query_unspent_outputs_status(
&mut self,
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
specified_outputs: Option<Vec<Vec<u8>>>,
) -> Result<u64, OutputManagerError>
{
match self.base_node_public_key.as_ref() {
None => Err(OutputManagerError::NoBaseNodeKeysProvided),
Some(pk) => {
let unspent_outputs: Vec<DbUnblindedOutput> = self.db.get_unspent_outputs().await?;
let mut output_hashes = Vec::new();
for uo in unspent_outputs.iter() {
let hash = uo.hash.clone();
output_hashes.push(hash.clone());
}
let request_key = OsRng.next_u64();
let mut first_request_key = 0;
let mut unspent_outputs: Vec<Vec<u8>> = if let Some(hashes) = specified_outputs {
hashes
} else {
self.db
.get_unspent_outputs()
.await?
.iter()
.map(|uo| uo.hash.clone())
.collect()
};

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});
// Determine how many rounds of base node request we need to query all the outputs in batches of
// max_utxo_query_size
let rounds =
((unspent_outputs.len() as f32) / (self.config.max_utxo_query_size as f32)).ceil() as usize;

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "About to attempt to send query to base node");
self.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Query sent to Base Node");
self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node", request_key
);
Ok(request_key)
for r in 0..rounds {
let mut output_hashes = Vec::new();
for uo_hash in
unspent_outputs.drain(..cmp::min(self.config.max_utxo_query_size, unspent_outputs.len()))
{
output_hashes.push(uo_hash);
}
let request_key = OsRng.next_u64();
if first_request_key == 0 {
first_request_key = request_key;
}

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};

let send_message_response = self
.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;

// Here we are going to spawn a non-blocking task that will monitor and log the progress of the
// send process.
tokio::spawn(async move {
match send_message_response.resolve_ok().await {
None => trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
),
Some(send_states) => {
if send_states.len() == 1 {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) queued for sending with Message {}",
request_key,
send_states[0].tag,
);
let message_tag = send_states[0].tag;
if send_states.wait_single().await {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) successfully sent to Base Node with \
Message {}",
request_key,
message_tag,
)
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node with \
Message {}",
request_key,
message_tag,
);
}
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
request_key
)
}
},
}
});

self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node, part {} of {} requests",
request_key,
r + 1,
rounds
);
}
// We are just going to return the first request key for use by the front end. It is very unlikely that
// a mobile wallet will ever have this query split up
Ok(first_request_key)
},
}
}

/// Add an unblinded output to the unspent outputs list
pub async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
async fn add_output(&mut self, output: UnblindedOutput) -> Result<(), OutputManagerError> {
let output = DbUnblindedOutput::from_unblinded_output(output, &self.factories)?;
Ok(self.db.add_unspent_output(output).await?)
}

pub async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
async fn get_balance(&self) -> Result<Balance, OutputManagerError> {
let balance = self.db.get_balance().await?;
trace!(target: LOG_TARGET, "Balance: {:?}", balance);
Ok(balance)
}

/// Request a spending key to be used to accept a transaction from a sender.
pub async fn get_recipient_spending_key(
async fn get_recipient_spending_key(
&mut self,
tx_id: TxId,
amount: MicroTari,
Expand Down Expand Up @@ -558,7 +631,7 @@ where

/// Prepare a Sender Transaction Protocol for the amount and fee_per_gram specified. If required a change output
/// will be produced.
pub async fn prepare_transaction_to_send(
async fn prepare_transaction_to_send(
&mut self,
amount: MicroTari,
fee_per_gram: MicroTari,
Expand Down Expand Up @@ -634,7 +707,7 @@ where

/// Confirm that a transaction has finished being negotiated between parties so the short-term encumberance can be
/// made official
pub async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
async fn confirm_encumberance(&mut self, tx_id: u64) -> Result<(), OutputManagerError> {
self.db.confirm_encumbered_outputs(tx_id).await?;

Ok(())
Expand All @@ -643,7 +716,7 @@ where
/// Confirm that a received or sent transaction and its outputs have been detected on the base chain. The inputs and
/// outputs are checked to see that they match what the stored PendingTransaction contains. This will
/// be called by the Transaction Service which monitors the base chain.
pub async fn confirm_transaction(
async fn confirm_transaction(
&mut self,
tx_id: u64,
inputs: &[TransactionInput],
Expand Down Expand Up @@ -698,7 +771,7 @@ where
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
pub async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.db.timeout_pending_transaction_outputs(period).await?)
}

Expand Down Expand Up @@ -793,7 +866,8 @@ where
self.base_node_public_key = Some(base_node_public_key);

if startup_query {
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
self.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await?;
}
Ok(())
}
Expand All @@ -816,7 +890,7 @@ where
Ok(self.db.get_invalid_outputs().await?)
}

pub async fn create_coin_split(
async fn create_coin_split(
&mut self,
amount_per_split: MicroTari,
split_count: usize,
Expand Down
Loading

0 comments on commit 0d2c07c

Please sign in to comment.