Skip to content

Commit

Permalink
Update Output Manager UTXO query to split into multiple queries
Browse files Browse the repository at this point in the history
Previously when the output manager service would send its UTXO validity query to its Base Node it would send a single query with all the UTXO hashes in it. The Base Node then responds with a single response message with all the valid UTXO’s that match a hash in the queries list. In large wallets this caused a problem where the response message could become large enough to be rejected by the frame size limits in the comms stack.

In order to mitigate this issue the Output Manager service will now send queries consisting of up to a maximum number of outputs (specified in the config) and will send multiple queries until all the outputs have been queried so that any one response does not hit the frame size limit.
  • Loading branch information
philipr-za committed Jun 4, 2020
1 parent 997b14d commit b59a1ce
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 76 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use std::time::Duration;
#[derive(Clone)]
pub struct OutputManagerServiceConfig {
pub base_node_query_timeout: Duration,
pub max_utxo_query_size: usize,
}

impl Default for OutputManagerServiceConfig {
fn default() -> Self {
Self {
base_node_query_timeout: Duration::from_secs(30),
max_utxo_query_size: 5000,
}
}
}
159 changes: 117 additions & 42 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, SinkExt, Stream, StreamExt};
use log::*;
use rand::{rngs::OsRng, RngCore};
use std::{cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use std::{cmp, cmp::Ordering, collections::HashMap, convert::TryFrom, fmt, sync::Mutex, time::Duration};
use tari_broadcast_channel::Publisher;
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{
Expand Down Expand Up @@ -195,7 +195,7 @@ where
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}, Trace: {}", origin_public_key, resp, msg.dht_header.message_tag);
Err(resp)
});

Expand Down Expand Up @@ -290,7 +290,7 @@ where
.await
.map(|_| OutputManagerResponse::BaseNodePublicKeySet),
OutputManagerRequest::SyncWithBaseNode => self
.query_unspent_outputs_status(utxo_query_timeout_futures)
.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await
.map(OutputManagerResponse::StartedBaseNodeSync),
OutputManagerRequest::GetInvalidOutputs => {
Expand Down Expand Up @@ -424,11 +424,11 @@ where
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
) -> Result<(), OutputManagerError>
{
if self.pending_utxo_query_keys.remove(&query_key).is_some() {
error!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Finished queueing new Base Node query timeout");
if let Some(hashes) = self.pending_utxo_query_keys.remove(&query_key) {
warn!(target: LOG_TARGET, "UTXO Query {} timed out", query_key);
self.query_unspent_outputs_status(utxo_query_timeout_futures, Some(hashes))
.await?;

let _ = self
.event_publisher
.send(OutputManagerEvent::BaseNodeSyncRequestTimedOut(query_key))
Expand All @@ -450,46 +450,120 @@ where
pub async fn query_unspent_outputs_status(
&mut self,
utxo_query_timeout_futures: &mut FuturesUnordered<BoxFuture<'static, u64>>,
specified_outputs: Option<Vec<Vec<u8>>>,
) -> Result<u64, OutputManagerError>
{
match self.base_node_public_key.as_ref() {
None => Err(OutputManagerError::NoBaseNodeKeysProvided),
Some(pk) => {
let unspent_outputs: Vec<DbUnblindedOutput> = self.db.get_unspent_outputs().await?;
let mut output_hashes = Vec::new();
for uo in unspent_outputs.iter() {
let hash = uo.hash.clone();
output_hashes.push(hash.clone());
}
let request_key = OsRng.next_u64();
let mut first_request_key = 0;
let mut unspent_outputs: Vec<Vec<u8>> = if let Some(hashes) = specified_outputs {
hashes
} else {
self.db
.get_unspent_outputs()
.await?
.iter()
.map(|uo| uo.hash.clone())
.collect()
};

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});
// Determine how many rounds of base node request we need to query all the outputs in batches of
// max_utxo_query_size
let rounds =
((unspent_outputs.len() as f32) / (self.config.max_utxo_query_size as f32 + 0.1)) as usize + 1;

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "About to attempt to send query to base node");
self.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;
// TODO Remove this once this bug is fixed
trace!(target: LOG_TARGET, "Query sent to Base Node");
self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node", request_key
);
Ok(request_key)
for r in 0..rounds {
let mut output_hashes = Vec::new();
for uo_hash in
unspent_outputs.drain(..cmp::min(self.config.max_utxo_query_size, unspent_outputs.len()))
{
output_hashes.push(uo_hash);
}
let request_key = OsRng.next_u64();
if first_request_key == 0 {
first_request_key = request_key;
}

let request = BaseNodeRequestProto::FetchUtxos(BaseNodeProto::HashOutputs {
outputs: output_hashes.clone(),
});

let service_request = BaseNodeProto::BaseNodeServiceRequest {
request_key,
request: Some(request),
};

let send_message_response = self
.outbound_message_service
.send_direct(
pk.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
)
.await?;

// Here we are going to spawn a non-blocking task that will monitor and log the progress of the
// send process.
let owned_request_key = request_key;
tokio::spawn(async move {
match send_message_response.resolve_ok().await {
None => trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
owned_request_key
),
Some(send_states) => {
if send_states.len() == 1 {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) queued for sending with Message {}",
owned_request_key,
send_states[0].tag,
);
let message_tag = send_states[0].tag;
if send_states.wait_single().await {
trace!(
target: LOG_TARGET,
"Output Manager UTXO Sync query ({}) successfully sent to Base Node with \
Message {}",
owned_request_key,
message_tag,
)
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node with \
Message {}",
owned_request_key,
message_tag,
);
}
} else {
trace!(
target: LOG_TARGET,
"Failed to send Output Manager UTXO Sync query ({}) to Base Node",
owned_request_key
)
}
},
}
});

self.pending_utxo_query_keys.insert(request_key, output_hashes);
let state_timeout = StateDelay::new(self.config.base_node_query_timeout, request_key);
utxo_query_timeout_futures.push(state_timeout.delay().boxed());
debug!(
target: LOG_TARGET,
"Output Manager Sync query ({}) sent to Base Node, part {} of {} requests",
request_key,
r + 1,
rounds
);
}
// We are just going to return the first request key for use by the front end. It is very unlikely that
// a mobile wallet will ever have this query split up
Ok(first_request_key)
},
}
}
Expand Down Expand Up @@ -793,7 +867,8 @@ where
self.base_node_public_key = Some(base_node_public_key);

if startup_query {
self.query_unspent_outputs_status(utxo_query_timeout_futures).await?;
self.query_unspent_outputs_status(utxo_query_timeout_futures, None)
.await?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit b59a1ce

Please sign in to comment.