Skip to content

Commit

Permalink
fix(wallet): handle not found rpc error in utxo scanning (#4249)
Browse files Browse the repository at this point in the history
Description
---
- handles not found RPC error in UTXO scanning
- always remove scanned blocks that were not found on the chain, even if no previously scanned block was found on chain  

Motivation and Context
---
If a reorg has occurred on previously scanned UTXOs, the scanner will fail. It should handle this error by rescanning at that block height.

Always removing the previous scanned blocks that were not found may fix the UNIQUE key violation that has been seen cucumber tests. However, I was not able to reproduce the failure locally. 

How Has This Been Tested?
---
In cucumber test
Partially manual (scanning works, but reorg case not tested)
  • Loading branch information
sdbondi authored Jul 4, 2022
1 parent 3e5a4bb commit bcd14c7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 43 deletions.
4 changes: 2 additions & 2 deletions base_layer/wallet/src/utxo_scanner_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::{error::WalletStorageError, output_manager_service::error::OutputMana

#[derive(Debug, Error)]
pub enum UtxoScannerError {
#[error("API returned something unexpected.")]
UnexpectedApiResponse,
#[error("Unexpected API response: {details}")]
UnexpectedApiResponse { details: String },
#[error("Wallet storage error: `{0}`")]
WalletStorageError(#[from] WalletStorageError),
#[error("Connectivity error: `{0}`")]
Expand Down
97 changes: 58 additions & 39 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_common_types::{
transaction::{ImportStatus, TxId},
types::HashOutput,
};
use tari_comms::{peer_manager::NodeId, types::CommsPublicKey, PeerConnection};
use tari_comms::{peer_manager::NodeId, traits::OrOptional, types::CommsPublicKey, PeerConnection};
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
Expand Down Expand Up @@ -234,7 +234,7 @@ where TBackend: WalletBackend + 'static
let next_header_hash = next_header.hash();

ScannedBlock {
height: last_scanned_block.height + 1,
height: next_header.height,
num_outputs: last_scanned_block.num_outputs,
amount: last_scanned_block.amount,
header_hash: next_header_hash,
Expand Down Expand Up @@ -314,8 +314,12 @@ where TBackend: WalletBackend + 'static
current_tip_height: u64,
client: &mut BaseNodeWalletRpcClient,
) -> Result<Option<ScannedBlock>, UtxoScannerError> {
// Check for reogs
let scanned_blocks = self.resources.db.get_scanned_blocks().await?;
debug!(
target: LOG_TARGET,
"Found {} cached previously scanned blocks",
scanned_blocks.len()
);

if scanned_blocks.is_empty() {
return Ok(None);
Expand All @@ -325,51 +329,66 @@ where TBackend: WalletBackend + 'static
// Accumulate number of outputs and recovered Tari in the valid blocks
// Assumption: The blocks are ordered and a reorg will occur to the most recent blocks. Once you have found a
// valid block the blocks before it are also valid and don't need to be checked
let mut missing_scanned_blocks = Vec::new();
let mut last_missing_scanned_block = None;
let mut found_scanned_block = None;
let mut num_outputs = 0u64;
let mut amount = MicroTari::from(0);
for sb in scanned_blocks {
if sb.height <= current_tip_height {
if found_scanned_block.is_none() {
let header = BlockHeader::try_from(client.get_header_by_height(sb.height).await?)
.map_err(UtxoScannerError::ConversionError)?;
let header_hash = header.hash();
if header_hash == sb.header_hash {
found_scanned_block = Some(sb.clone());
} else {
missing_scanned_blocks.push(sb.clone());
}
}
if found_scanned_block.is_some() {
num_outputs = num_outputs.saturating_add(sb.num_outputs.unwrap_or(0));
amount = amount
.checked_add(sb.amount.unwrap_or_else(|| MicroTari::from(0)))
.ok_or(UtxoScannerError::OverflowError)?;
// The scanned block has a higher height than the current tip, meaning the previously scanned block was
// reorged out.
if sb.height > current_tip_height {
last_missing_scanned_block = Some(sb);
continue;
}

if found_scanned_block.is_none() {
let header = client.get_header_by_height(sb.height).await.or_optional()?;
let header = header
.map(BlockHeader::try_from)
.transpose()
.map_err(UtxoScannerError::ConversionError)?;

match header {
Some(header) => {
let header_hash = header.hash();
if header_hash == sb.header_hash {
found_scanned_block = Some(sb.clone());
} else {
last_missing_scanned_block = Some(sb.clone());
}
},
None => {
last_missing_scanned_block = Some(sb.clone());
},
}
} else {
missing_scanned_blocks.push(sb.clone());
}
// Sum up the number of outputs recovered starting from the first found block
if found_scanned_block.is_some() {
num_outputs = num_outputs.saturating_add(sb.num_outputs.unwrap_or(0));
amount = amount
.checked_add(sb.amount.unwrap_or_else(|| MicroTari::from(0)))
.ok_or(UtxoScannerError::OverflowError)?;
}
}

if let Some(block) = last_missing_scanned_block {
warn!(
target: LOG_TARGET,
"Reorg detected on base node. Removing scanned blocks from height {}", block.height
);
self.resources
.db
.clear_scanned_blocks_from_and_higher(block.height)
.await?;
}

if let Some(sb) = found_scanned_block {
if !missing_scanned_blocks.is_empty() {
warn!(
target: LOG_TARGET,
"Reorg detected on base node. Last scanned block found at height {} (Header Hash: {})",
sb.height,
sb.header_hash.to_hex()
);
self.resources
.db
.clear_scanned_blocks_from_and_higher(
missing_scanned_blocks
.last()
.expect("cannot fail, the vector is not empty")
.height,
)
.await?;
}
warn!(
target: LOG_TARGET,
"Last scanned block found at height {} (Header Hash: {})",
sb.height,
sb.header_hash.to_hex()
);
Ok(Some(ScannedBlock {
height: sb.height,
num_outputs: Some(num_outputs),
Expand Down
5 changes: 3 additions & 2 deletions base_layer/wallet/tests/utxo_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,9 @@ async fn test_utxo_scanner_recovery() {
final_height,
num_recovered,
value_recovered,
time_taken: _,} = event.unwrap() {
assert_eq!(final_height, NUM_BLOCKS-1);
time_taken: _,
} = event.unwrap() {
assert_eq!(final_height, NUM_BLOCKS - 1);
assert_eq!(num_recovered, total_outputs_to_recover);
assert_eq!(value_recovered, total_amount_to_recover);
break;
Expand Down
19 changes: 19 additions & 0 deletions comms/core/src/protocol/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
connectivity::ConnectivityError,
peer_manager::PeerManagerError,
proto::rpc as rpc_proto,
traits::OrOptional,
PeerConnectionError,
};

Expand Down Expand Up @@ -128,3 +129,21 @@ impl From<HandshakeRejectReason> for rpc_proto::rpc_session_reply::HandshakeReje
}
}
}

impl<T> OrOptional<T> for Result<T, RpcError> {
type Error = RpcError;

fn or_optional(self) -> Result<Option<T>, Self::Error> {
self.map(Some).or_else(|err| {
if let RpcError::RequestFailed(ref status) = err {
if status.is_not_found() {
Ok(None)
} else {
Err(err)
}
} else {
Err(err)
}
})
}
}

0 comments on commit bcd14c7

Please sign in to comment.