Skip to content

Commit

Permalink
feat: add bulletproof rewind profiling (#3618)
Browse files Browse the repository at this point in the history
Description
---
The following changes were made:
- Added bulletproof rewind profiling measurements.
- Improved wallet recovery resiliency with improved connection logic to the selected base node(s).
- Improved recovering of seed words usability.
- Improved some struct member names to be less confusing.
- Improved recovery UTXO streaming performance by tuning the chunking parameter.

**Results**
The following tests were run using standard Tor connectivity on a laptop with an Intel(R) Core(TM) i7-7820HQ CPU and basic Samsung NVMe storage device.

| Number of outputs               | 52654  | 52284  | 52753  | 52762 | 52762  |
| ------------------------------- | ------ | ------ | ------ | ----- | ------ |
| Chunk size                      | 10     | 100    | 250    | 125   | 125    |
| Stream form base node time (ms) | 197678 | 108757 | 236872 | 96274 | 115798 |
| -  per output (ms)              | 3.754  | 2.080  | 4.490  | 1.825 | 2.195  |
| -  normalized                   | 0.836  | 0.463  | 1      | 0.406 | 0.489  |
| Total processing time (ms)      | 77157  | 72653  | 69005  | 67170 | 68563  |
| -  per output (ms)              | 1.465  | 1.39   | 1.308  | 1.273 | 1.299  |
| -  normalized                   | 1      | 0.949  | 0.893  | 0.869 | 0.887  |
| Bulletproofs rewind time (ms)   | 10877  | 11040  | 10665  | 10854 | 10702  |
| -  per output (ms)              | 0.207  | 0.211  | 0.202  | 0.206 | 0.203  |
| -  normalized                   | 0.981  | 1      | 0.957  | 0.976 | 0.962  |

**Discussion**
- As can be seen, Bulletproof rewind performance was consistently measured at ~0.21ms per output.
- Total processing time, which includes all database processing and Bulletproof rewinding, averaged at about 1.35ms per output.
- Optimum streaming performance was found to be ~2ms per output.
- Output streaming performance was found to be closely related to the chunk size that are requested from the base node. Smaller (10) and larger (250) quantities were the worst performers, while a quantity in the range of 100 to 125 was approximately twice as fast. _For this PR a final chunk size value of 125 was chosen._

Motivation and Context
---
- Further code improvements can be based on informed decisions using the profiling information as basis.

How Has This Been Tested?
---
- Unit testing.
- Cucumber testing.
- System-level testing doing wallet recoveries.
  • Loading branch information
hansieodendaal authored Nov 26, 2021
1 parent b09acd1 commit 5790a9d
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 4,533 deletions.
15 changes: 9 additions & 6 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ pub async fn init_wallet(
);

let node_address = match wallet_db.get_node_address().await? {
None => config.public_address.clone().unwrap_or_else(Multiaddr::empty),
None => match config.public_address.clone() {
Some(val) => val,
None => Multiaddr::empty(),
},
Some(a) => a,
};

Expand Down Expand Up @@ -464,12 +467,12 @@ pub async fn init_wallet(
},
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};

Ok(wallet)
}
Expand Down
13 changes: 9 additions & 4 deletions applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub fn get_seed_from_seed_words(seed_words: Vec<String>) -> Result<CipherSeed, E
/// Recovers wallet funds by connecting to a given base node peer, downloading the transaction outputs stored in the
/// blockchain, and attempting to rewind them. Any outputs that are successfully rewound are then imported into the
/// wallet.
pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfig) -> Result<(), ExitCodes> {
pub async fn wallet_recovery(
wallet: &WalletSqlite,
base_node_config: &PeerConfig,
retry_limit: usize,
) -> Result<(), ExitCodes> {
println!("\nPress Ctrl-C to stop the recovery process\n");
// We dont care about the shutdown signal here, so we just create one
let shutdown = Shutdown::new();
Expand All @@ -105,7 +109,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi

let mut recovery_task = UtxoScannerService::<WalletSqliteDatabase>::builder()
.with_peers(peer_public_keys)
.with_retry_limit(3)
// Do not make this a small number as wallet recovery needs to be resilient
.with_retry_limit(retry_limit)
.build_with_wallet(wallet, shutdown_signal);

let mut event_stream = recovery_task.get_event_receiver();
Expand All @@ -122,8 +127,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi
println!("OK (latency = {:.2?})", latency);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32;
debug!(
Expand Down
6 changes: 5 additions & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ pub fn recovery_mode(config: WalletModeConfig, wallet: WalletSqlite) -> Result<(
println!("{}", CUCUMBER_TEST_MARKER_A);

println!("Starting recovery...");
match handle.block_on(wallet_recovery(&wallet, &base_node_config)) {
match handle.block_on(wallet_recovery(
&wallet,
&base_node_config,
config.global_config.wallet_recovery_retry_limit,
)) {
Ok(_) => println!("Wallet recovered!"),
Err(e) => {
error!(target: LOG_TARGET, "Recovery failed: {}", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use log::*;
use rand::rngs::OsRng;
Expand Down Expand Up @@ -73,6 +73,8 @@ where TBackend: OutputManagerBackend + 'static
&mut self,
outputs: Vec<TransactionOutput>,
) -> Result<Vec<UnblindedOutput>, OutputManagerError> {
let start = Instant::now();
let outputs_length = outputs.len();
let mut rewound_outputs: Vec<UnblindedOutput> = outputs
.into_iter()
.filter_map(|output| {
Expand Down Expand Up @@ -114,6 +116,13 @@ where TBackend: OutputManagerBackend + 'static
},
)
.collect();
let rewind_time = start.elapsed();
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - rewound {} outputs in {} ms",
outputs_length,
rewind_time.as_millis(),
);

for output in rewound_outputs.iter_mut() {
self.update_outputs_script_private_key_and_update_key_manager_index(output)
Expand All @@ -136,10 +145,7 @@ where TBackend: OutputManagerBackend + 'static
trace!(
target: LOG_TARGET,
"Output {} with value {} with {} recovered",
output
.as_transaction_input(&self.factories.commitment)?
.commitment
.to_hex(),
output_hex,
output.value,
output.features,
);
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/utxo_scanner_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub enum UtxoScannerEvent {
},
/// Progress of the recovery process (current_block, current_chain_height)
Progress {
current_block: u64,
current_chain_height: u64,
current_index: u64,
total_index: u64,
},
/// Completed Recovery (Number scanned, Num of Recovered outputs, Value of recovered outputs, Time taken)
Completed {
Expand Down
72 changes: 55 additions & 17 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tari_core::{
},
};
use tari_shutdown::ShutdownSignal;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, time};

use crate::{
error::WalletError,
Expand Down Expand Up @@ -89,8 +89,8 @@ where TBackend: WalletBackend + 'static
) -> Result<(), UtxoScannerError> {
let metadata = self.get_metadata().await?.unwrap_or_default();
self.publish_event(UtxoScannerEvent::Progress {
current_block: final_utxo_pos,
current_chain_height: final_utxo_pos,
current_index: final_utxo_pos,
total_index: final_utxo_pos,
});
self.publish_event(UtxoScannerEvent::Completed {
number_scanned: total_scanned,
Expand All @@ -116,11 +116,20 @@ where TBackend: WalletBackend + 'static
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
peer,
peer: peer.clone(),
num_retries: self.num_retries,
retry_limit: self.retry_limit,
error: e.to_string(),
});
// No use re-dialing a peer that is not responsive for recovery mode
if self.mode == UtxoScannerMode::Recovery {
if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await {
if connection.clone().disconnect().await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer);
}
};
let _ = time::sleep(Duration::from_secs(30));
}

Err(e.into())
},
Expand Down Expand Up @@ -243,14 +252,14 @@ where TBackend: WalletBackend + 'static
);

let end_header_hash = end_header.hash();
let end_header_size = end_header.output_mmr_size;
let output_mmr_size = end_header.output_mmr_size;
let mut num_recovered = 0u64;
let mut total_amount = MicroTari::from(0);
let mut total_scanned = 0;

self.publish_event(UtxoScannerEvent::Progress {
current_block: start_mmr_leaf_index,
current_chain_height: (end_header_size - 1),
current_index: start_mmr_leaf_index,
total_index: (output_mmr_size - 1),
});
let request = SyncUtxosRequest {
start: start_mmr_leaf_index,
Expand All @@ -259,13 +268,28 @@ where TBackend: WalletBackend + 'static
include_deleted_bitmaps: false,
};

let start = Instant::now();
let utxo_stream = client.sync_utxos(request).await?;
// We download in chunks just because rewind_outputs works with multiple outputs (and could parallelized
// rewinding)
let mut utxo_stream = utxo_stream.chunks(10);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - UTXO stream request time {} ms",
start.elapsed().as_millis(),
);

// We download in chunks for improved streaming efficiency
const CHUNK_SIZE: usize = 125;
let mut utxo_stream = utxo_stream.chunks(CHUNK_SIZE);
const COMMIT_EVERY_N: u64 = (1000_i64 / CHUNK_SIZE as i64) as u64;
let mut last_utxo_index = 0u64;
let mut iteration_count = 0u64;
while let Some(response) = utxo_stream.next().await {
let mut utxo_next_await_profiling = Vec::new();
let mut scan_for_outputs_profiling = Vec::new();
while let Some(response) = {
let start = Instant::now();
let utxo_stream_next = utxo_stream.next().await;
utxo_next_await_profiling.push(start.elapsed());
utxo_stream_next
} {
if self.shutdown_signal.is_triggered() {
// if running is set to false, we know its been canceled upstream so lets exit the loop
return Ok(total_scanned as u64);
Expand All @@ -274,14 +298,16 @@ where TBackend: WalletBackend + 'static
last_utxo_index = utxo_index;
total_scanned += outputs.len();
iteration_count += 1;

let start = Instant::now();
let found_outputs = self.scan_for_outputs(outputs).await?;
scan_for_outputs_profiling.push(start.elapsed());

// Reduce the number of db hits by only persisting progress every N iterations
const COMMIT_EVERY_N: u64 = 100;
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= end_header_size - 1 {
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= output_mmr_size - 1 {
self.publish_event(UtxoScannerEvent::Progress {
current_block: last_utxo_index,
current_chain_height: (end_header_size - 1),
current_index: last_utxo_index,
total_index: (output_mmr_size - 1),
});
self.update_scanning_progress_in_db(
last_utxo_index,
Expand All @@ -295,11 +321,23 @@ where TBackend: WalletBackend + 'static
num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
}
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - streamed {} outputs in {} ms",
total_scanned,
utxo_next_await_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - scanned {} outputs in {} ms",
total_scanned,
scan_for_outputs_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
self.update_scanning_progress_in_db(last_utxo_index, total_amount, num_recovered, end_header_hash)
.await?;
self.publish_event(UtxoScannerEvent::Progress {
current_block: (end_header_size - 1),
current_chain_height: (end_header_size - 1),
current_index: (output_mmr_size - 1),
total_index: (output_mmr_size - 1),
});
Ok(total_scanned as u64)
}
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet_ffi/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub async fn recovery_event_monitoring(
);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
unsafe {
(recovery_progress_callback)(RecoveryEvent::Progress as u8, current, total);
Expand Down
5 changes: 5 additions & 0 deletions common/src/configuration/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub struct GlobalConfig {
pub base_node_event_channel_size: usize,
pub output_manager_event_channel_size: usize,
pub wallet_connection_manager_pool_size: usize,
pub wallet_recovery_retry_limit: usize,
pub console_wallet_password: Option<String>,
pub wallet_command_send_wait_stage: String,
pub wallet_command_send_wait_timeout: u64,
Expand Down Expand Up @@ -484,6 +485,9 @@ fn convert_node_config(
let key = "wallet.connection_manager_pool_size";
let wallet_connection_manager_pool_size = optional(cfg.get_int(key))?.unwrap_or(16) as usize;

let key = "wallet.wallet_recovery_retry_limit";
let wallet_recovery_retry_limit = optional(cfg.get_int(key))?.unwrap_or(3) as usize;

let key = "wallet.output_manager_event_channel_size";
let output_manager_event_channel_size = optional(cfg.get_int(key))?.unwrap_or(250) as usize;

Expand Down Expand Up @@ -759,6 +763,7 @@ fn convert_node_config(
transaction_event_channel_size,
base_node_event_channel_size,
wallet_connection_manager_pool_size,
wallet_recovery_retry_limit,
output_manager_event_channel_size,
console_wallet_password,
wallet_command_send_wait_stage,
Expand Down
Loading

0 comments on commit 5790a9d

Please sign in to comment.