Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add bulletproof rewind profiling #3618

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
@@ -300,7 +300,10 @@ pub async fn init_wallet(
);

let node_address = match wallet_db.get_node_address().await? {
None => config.public_address.clone().unwrap_or_else(Multiaddr::empty),
None => match config.public_address.clone() {
Some(val) => val,
None => Multiaddr::empty(),
},
Some(a) => a,
};

@@ -464,12 +467,12 @@ pub async fn init_wallet(
},
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};

Ok(wallet)
}
13 changes: 9 additions & 4 deletions applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
@@ -79,7 +79,11 @@ pub fn get_seed_from_seed_words(seed_words: Vec<String>) -> Result<CipherSeed, E
/// Recovers wallet funds by connecting to a given base node peer, downloading the transaction outputs stored in the
/// blockchain, and attempting to rewind them. Any outputs that are successfully rewound are then imported into the
/// wallet.
pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfig) -> Result<(), ExitCodes> {
pub async fn wallet_recovery(
wallet: &WalletSqlite,
base_node_config: &PeerConfig,
retry_limit: usize,
) -> Result<(), ExitCodes> {
println!("\nPress Ctrl-C to stop the recovery process\n");
// We dont care about the shutdown signal here, so we just create one
let shutdown = Shutdown::new();
@@ -105,7 +109,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi

let mut recovery_task = UtxoScannerService::<WalletSqliteDatabase>::builder()
.with_peers(peer_public_keys)
.with_retry_limit(3)
// Do not make this a small number as wallet recovery needs to be resilient
.with_retry_limit(retry_limit)
.build_with_wallet(wallet, shutdown_signal);

let mut event_stream = recovery_task.get_event_receiver();
@@ -122,8 +127,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi
println!("OK (latency = {:.2?})", latency);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32;
debug!(
6 changes: 5 additions & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
@@ -285,7 +285,11 @@ pub fn recovery_mode(config: WalletModeConfig, wallet: WalletSqlite) -> Result<(
println!("{}", CUCUMBER_TEST_MARKER_A);

println!("Starting recovery...");
match handle.block_on(wallet_recovery(&wallet, &base_node_config)) {
match handle.block_on(wallet_recovery(
&wallet,
&base_node_config,
config.global_config.wallet_recovery_retry_limit,
)) {
Ok(_) => println!("Wallet recovered!"),
Err(e) => {
error!(target: LOG_TARGET, "Recovery failed: {}", e);
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use log::*;
use rand::rngs::OsRng;
@@ -73,6 +73,8 @@ where TBackend: OutputManagerBackend + 'static
&mut self,
outputs: Vec<TransactionOutput>,
) -> Result<Vec<UnblindedOutput>, OutputManagerError> {
let start = Instant::now();
let outputs_length = outputs.len();
let mut rewound_outputs: Vec<UnblindedOutput> = outputs
.into_iter()
.filter_map(|output| {
@@ -114,6 +116,13 @@ where TBackend: OutputManagerBackend + 'static
},
)
.collect();
let rewind_time = start.elapsed();
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - rewound {} outputs in {} ms",
outputs_length,
rewind_time.as_millis(),
);

for output in rewound_outputs.iter_mut() {
self.update_outputs_script_private_key_and_update_key_manager_index(output)
@@ -136,10 +145,7 @@ where TBackend: OutputManagerBackend + 'static
trace!(
target: LOG_TARGET,
"Output {} with value {} with {} recovered",
output
.as_transaction_input(&self.factories.commitment)?
.commitment
.to_hex(),
output_hex,
output.value,
output.features,
);
4 changes: 2 additions & 2 deletions base_layer/wallet/src/utxo_scanner_service/handle.rs
Original file line number Diff line number Diff line change
@@ -42,8 +42,8 @@ pub enum UtxoScannerEvent {
},
/// Progress of the recovery process (current_block, current_chain_height)
Progress {
current_block: u64,
current_chain_height: u64,
current_index: u64,
total_index: u64,
},
/// Completed Recovery (Number scanned, Num of Recovered outputs, Value of recovered outputs, Time taken)
Completed {
72 changes: 55 additions & 17 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ use tari_core::{
},
};
use tari_shutdown::ShutdownSignal;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, time};

use crate::{
error::WalletError,
@@ -89,8 +89,8 @@ where TBackend: WalletBackend + 'static
) -> Result<(), UtxoScannerError> {
let metadata = self.get_metadata().await?.unwrap_or_default();
self.publish_event(UtxoScannerEvent::Progress {
current_block: final_utxo_pos,
current_chain_height: final_utxo_pos,
current_index: final_utxo_pos,
total_index: final_utxo_pos,
});
self.publish_event(UtxoScannerEvent::Completed {
number_scanned: total_scanned,
@@ -116,11 +116,20 @@ where TBackend: WalletBackend + 'static
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
peer,
peer: peer.clone(),
num_retries: self.num_retries,
retry_limit: self.retry_limit,
error: e.to_string(),
});
// No use re-dialing a peer that is not responsive for recovery mode
if self.mode == UtxoScannerMode::Recovery {
if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await {
if connection.clone().disconnect().await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer);
}
};
let _ = time::sleep(Duration::from_secs(30));
}

Err(e.into())
},
@@ -243,14 +252,14 @@ where TBackend: WalletBackend + 'static
);

let end_header_hash = end_header.hash();
let end_header_size = end_header.output_mmr_size;
let output_mmr_size = end_header.output_mmr_size;
let mut num_recovered = 0u64;
let mut total_amount = MicroTari::from(0);
let mut total_scanned = 0;

self.publish_event(UtxoScannerEvent::Progress {
current_block: start_mmr_leaf_index,
current_chain_height: (end_header_size - 1),
current_index: start_mmr_leaf_index,
total_index: (output_mmr_size - 1),
});
let request = SyncUtxosRequest {
start: start_mmr_leaf_index,
@@ -259,13 +268,28 @@ where TBackend: WalletBackend + 'static
include_deleted_bitmaps: false,
};

let start = Instant::now();
let utxo_stream = client.sync_utxos(request).await?;
// We download in chunks just because rewind_outputs works with multiple outputs (and could parallelized
// rewinding)
let mut utxo_stream = utxo_stream.chunks(10);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - UTXO stream request time {} ms",
start.elapsed().as_millis(),
);

// We download in chunks for improved streaming efficiency
const CHUNK_SIZE: usize = 125;
let mut utxo_stream = utxo_stream.chunks(CHUNK_SIZE);
const COMMIT_EVERY_N: u64 = (1000_i64 / CHUNK_SIZE as i64) as u64;
let mut last_utxo_index = 0u64;
let mut iteration_count = 0u64;
while let Some(response) = utxo_stream.next().await {
let mut utxo_next_await_profiling = Vec::new();
let mut scan_for_outputs_profiling = Vec::new();
while let Some(response) = {
let start = Instant::now();
let utxo_stream_next = utxo_stream.next().await;
utxo_next_await_profiling.push(start.elapsed());
utxo_stream_next
} {
if self.shutdown_signal.is_triggered() {
// if running is set to false, we know its been canceled upstream so lets exit the loop
return Ok(total_scanned as u64);
@@ -274,14 +298,16 @@ where TBackend: WalletBackend + 'static
last_utxo_index = utxo_index;
total_scanned += outputs.len();
iteration_count += 1;

let start = Instant::now();
let found_outputs = self.scan_for_outputs(outputs).await?;
scan_for_outputs_profiling.push(start.elapsed());

// Reduce the number of db hits by only persisting progress every N iterations
const COMMIT_EVERY_N: u64 = 100;
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= end_header_size - 1 {
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= output_mmr_size - 1 {
self.publish_event(UtxoScannerEvent::Progress {
current_block: last_utxo_index,
current_chain_height: (end_header_size - 1),
current_index: last_utxo_index,
total_index: (output_mmr_size - 1),
});
self.update_scanning_progress_in_db(
last_utxo_index,
@@ -295,11 +321,23 @@ where TBackend: WalletBackend + 'static
num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
}
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - streamed {} outputs in {} ms",
total_scanned,
utxo_next_await_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - scanned {} outputs in {} ms",
total_scanned,
scan_for_outputs_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
self.update_scanning_progress_in_db(last_utxo_index, total_amount, num_recovered, end_header_hash)
.await?;
self.publish_event(UtxoScannerEvent::Progress {
current_block: (end_header_size - 1),
current_chain_height: (end_header_size - 1),
current_index: (output_mmr_size - 1),
total_index: (output_mmr_size - 1),
});
Ok(total_scanned as u64)
}
4 changes: 2 additions & 2 deletions base_layer/wallet_ffi/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -87,8 +87,8 @@ pub async fn recovery_event_monitoring(
);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
unsafe {
(recovery_progress_callback)(RecoveryEvent::Progress as u8, current, total);
5 changes: 5 additions & 0 deletions common/src/configuration/global.rs
Original file line number Diff line number Diff line change
@@ -112,6 +112,7 @@ pub struct GlobalConfig {
pub base_node_event_channel_size: usize,
pub output_manager_event_channel_size: usize,
pub wallet_connection_manager_pool_size: usize,
pub wallet_recovery_retry_limit: usize,
pub console_wallet_password: Option<String>,
pub wallet_command_send_wait_stage: String,
pub wallet_command_send_wait_timeout: u64,
@@ -484,6 +485,9 @@ fn convert_node_config(
let key = "wallet.connection_manager_pool_size";
let wallet_connection_manager_pool_size = optional(cfg.get_int(key))?.unwrap_or(16) as usize;

let key = "wallet.wallet_recovery_retry_limit";
let wallet_recovery_retry_limit = optional(cfg.get_int(key))?.unwrap_or(3) as usize;

let key = "wallet.output_manager_event_channel_size";
let output_manager_event_channel_size = optional(cfg.get_int(key))?.unwrap_or(250) as usize;

@@ -759,6 +763,7 @@ fn convert_node_config(
transaction_event_channel_size,
base_node_event_channel_size,
wallet_connection_manager_pool_size,
wallet_recovery_retry_limit,
output_manager_event_channel_size,
console_wallet_password,
wallet_command_send_wait_stage,
Loading