From bcd14c7dcbfc9c2bd63ec896c80d45785cf04714 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 4 Jul 2022 13:32:47 +0200 Subject: [PATCH] fix(wallet): handle not found rpc error in utxo scanning (#4249) Description --- - handles not found RPC error in UTXO scanning - always remove scanned blocks that were not found on the chain, even if no previously scanned block was found on chain Motivation and Context --- If a reorg has occurred on previously scanned UTXOs, the scanner will fail. It should handle this error by rescanning at that block height. Always removing the previous scanned blocks that were not found may fix the UNIQUE key violation that has been seen cucumber tests. However, I was not able to reproduce the failure locally. How Has This Been Tested? --- In cucumber test Partially manual (scanning works, but reorg case not tested) --- .../wallet/src/utxo_scanner_service/error.rs | 4 +- .../utxo_scanner_service/utxo_scanner_task.rs | 97 +++++++++++-------- base_layer/wallet/tests/utxo_scanner.rs | 5 +- comms/core/src/protocol/rpc/error.rs | 19 ++++ 4 files changed, 82 insertions(+), 43 deletions(-) diff --git a/base_layer/wallet/src/utxo_scanner_service/error.rs b/base_layer/wallet/src/utxo_scanner_service/error.rs index 1f1feab775..7c832894bb 100644 --- a/base_layer/wallet/src/utxo_scanner_service/error.rs +++ b/base_layer/wallet/src/utxo_scanner_service/error.rs @@ -30,8 +30,8 @@ use crate::{error::WalletStorageError, output_manager_service::error::OutputMana #[derive(Debug, Error)] pub enum UtxoScannerError { - #[error("API returned something unexpected.")] - UnexpectedApiResponse, + #[error("Unexpected API response: {details}")] + UnexpectedApiResponse { details: String }, #[error("Wallet storage error: `{0}`")] WalletStorageError(#[from] WalletStorageError), #[error("Connectivity error: `{0}`")] diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index abff646c30..4391fceeac 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -32,7 +32,7 @@ use tari_common_types::{ transaction::{ImportStatus, TxId}, types::HashOutput, }; -use tari_comms::{peer_manager::NodeId, types::CommsPublicKey, PeerConnection}; +use tari_comms::{peer_manager::NodeId, traits::OrOptional, types::CommsPublicKey, PeerConnection}; use tari_core::{ base_node::rpc::BaseNodeWalletRpcClient, blocks::BlockHeader, @@ -234,7 +234,7 @@ where TBackend: WalletBackend + 'static let next_header_hash = next_header.hash(); ScannedBlock { - height: last_scanned_block.height + 1, + height: next_header.height, num_outputs: last_scanned_block.num_outputs, amount: last_scanned_block.amount, header_hash: next_header_hash, @@ -314,8 +314,12 @@ where TBackend: WalletBackend + 'static current_tip_height: u64, client: &mut BaseNodeWalletRpcClient, ) -> Result, UtxoScannerError> { - // Check for reogs let scanned_blocks = self.resources.db.get_scanned_blocks().await?; + debug!( + target: LOG_TARGET, + "Found {} cached previously scanned blocks", + scanned_blocks.len() + ); if scanned_blocks.is_empty() { return Ok(None); @@ -325,51 +329,66 @@ where TBackend: WalletBackend + 'static // Accumulate number of outputs and recovered Tari in the valid blocks // Assumption: The blocks are ordered and a reorg will occur to the most recent blocks. Once you have found a // valid block the blocks before it are also valid and don't need to be checked - let mut missing_scanned_blocks = Vec::new(); + let mut last_missing_scanned_block = None; let mut found_scanned_block = None; let mut num_outputs = 0u64; let mut amount = MicroTari::from(0); for sb in scanned_blocks { - if sb.height <= current_tip_height { - if found_scanned_block.is_none() { - let header = BlockHeader::try_from(client.get_header_by_height(sb.height).await?) - .map_err(UtxoScannerError::ConversionError)?; - let header_hash = header.hash(); - if header_hash == sb.header_hash { - found_scanned_block = Some(sb.clone()); - } else { - missing_scanned_blocks.push(sb.clone()); - } - } - if found_scanned_block.is_some() { - num_outputs = num_outputs.saturating_add(sb.num_outputs.unwrap_or(0)); - amount = amount - .checked_add(sb.amount.unwrap_or_else(|| MicroTari::from(0))) - .ok_or(UtxoScannerError::OverflowError)?; + // The scanned block has a higher height than the current tip, meaning the previously scanned block was + // reorged out. + if sb.height > current_tip_height { + last_missing_scanned_block = Some(sb); + continue; + } + + if found_scanned_block.is_none() { + let header = client.get_header_by_height(sb.height).await.or_optional()?; + let header = header + .map(BlockHeader::try_from) + .transpose() + .map_err(UtxoScannerError::ConversionError)?; + + match header { + Some(header) => { + let header_hash = header.hash(); + if header_hash == sb.header_hash { + found_scanned_block = Some(sb.clone()); + } else { + last_missing_scanned_block = Some(sb.clone()); + } + }, + None => { + last_missing_scanned_block = Some(sb.clone()); + }, } - } else { - missing_scanned_blocks.push(sb.clone()); } + // Sum up the number of outputs recovered starting from the first found block + if found_scanned_block.is_some() { + num_outputs = num_outputs.saturating_add(sb.num_outputs.unwrap_or(0)); + amount = amount + .checked_add(sb.amount.unwrap_or_else(|| MicroTari::from(0))) + .ok_or(UtxoScannerError::OverflowError)?; + } + } + + if let Some(block) = last_missing_scanned_block { + warn!( + target: LOG_TARGET, + "Reorg detected on base node. Removing scanned blocks from height {}", block.height + ); + self.resources + .db + .clear_scanned_blocks_from_and_higher(block.height) + .await?; } if let Some(sb) = found_scanned_block { - if !missing_scanned_blocks.is_empty() { - warn!( - target: LOG_TARGET, - "Reorg detected on base node. Last scanned block found at height {} (Header Hash: {})", - sb.height, - sb.header_hash.to_hex() - ); - self.resources - .db - .clear_scanned_blocks_from_and_higher( - missing_scanned_blocks - .last() - .expect("cannot fail, the vector is not empty") - .height, - ) - .await?; - } + warn!( + target: LOG_TARGET, + "Last scanned block found at height {} (Header Hash: {})", + sb.height, + sb.header_hash.to_hex() + ); Ok(Some(ScannedBlock { height: sb.height, num_outputs: Some(num_outputs), diff --git a/base_layer/wallet/tests/utxo_scanner.rs b/base_layer/wallet/tests/utxo_scanner.rs index 8df7e6c22f..9cade6c82c 100644 --- a/base_layer/wallet/tests/utxo_scanner.rs +++ b/base_layer/wallet/tests/utxo_scanner.rs @@ -351,8 +351,9 @@ async fn test_utxo_scanner_recovery() { final_height, num_recovered, value_recovered, - time_taken: _,} = event.unwrap() { - assert_eq!(final_height, NUM_BLOCKS-1); + time_taken: _, + } = event.unwrap() { + assert_eq!(final_height, NUM_BLOCKS - 1); assert_eq!(num_recovered, total_outputs_to_recover); assert_eq!(value_recovered, total_amount_to_recover); break; diff --git a/comms/core/src/protocol/rpc/error.rs b/comms/core/src/protocol/rpc/error.rs index d640586ebe..e5c4af1b3b 100644 --- a/comms/core/src/protocol/rpc/error.rs +++ b/comms/core/src/protocol/rpc/error.rs @@ -30,6 +30,7 @@ use crate::{ connectivity::ConnectivityError, peer_manager::PeerManagerError, proto::rpc as rpc_proto, + traits::OrOptional, PeerConnectionError, }; @@ -128,3 +129,21 @@ impl From for rpc_proto::rpc_session_reply::HandshakeReje } } } + +impl OrOptional for Result { + type Error = RpcError; + + fn or_optional(self) -> Result, Self::Error> { + self.map(Some).or_else(|err| { + if let RpcError::RequestFailed(ref status) = err { + if status.is_not_found() { + Ok(None) + } else { + Err(err) + } + } else { + Err(err) + } + }) + } +}