diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs index b1d4b38644..1ec424eb90 100644 --- a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -91,6 +91,7 @@ where B: BlockchainBackend + 'static Ok(()) } + #[allow(clippy::too_many_lines)] async fn start_streaming( &self, tx: &mut mpsc::Sender>, @@ -141,7 +142,6 @@ where B: BlockchainBackend + 'static .fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone())) .await .rpc_status_internal_error(LOG_TARGET)?; - let utxos = utxos .into_iter() .enumerate() @@ -159,15 +159,30 @@ where B: BlockchainBackend + 'static current_header_hash.to_hex(), ); - let utxo_block_response = SyncUtxosByBlockResponse { - outputs: utxos, - height: current_header.height, - header_hash: current_header_hash.to_vec(), - mined_timestamp: current_header.timestamp.as_u64(), - }; - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send(Ok(utxo_block_response)).await.is_err() { - break; + for utxo_chunk in utxos.chunks(2000) { + let utxo_block_response = SyncUtxosByBlockResponse { + outputs: utxo_chunk.to_vec(), + height: current_header.height, + header_hash: current_header_hash.to_vec(), + mined_timestamp: current_header.timestamp.as_u64(), + }; + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send(Ok(utxo_block_response)).await.is_err() { + break; + } + } + if utxos.is_empty() { + // if its empty, we need to send an empty vec of outputs. + let utxo_block_response = SyncUtxosByBlockResponse { + outputs: utxos, + height: current_header.height, + header_hash: current_header_hash.to_vec(), + mined_timestamp: current_header.timestamp.as_u64(), + }; + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send(Ok(utxo_block_response)).await.is_err() { + break; + } } debug!( diff --git a/base_layer/core/tests/base_node_rpc.rs b/base_layer/core/tests/base_node_rpc.rs index 9188a4ca0a..70768f7b08 100644 --- a/base_layer/core/tests/base_node_rpc.rs +++ b/base_layer/core/tests/base_node_rpc.rs @@ -389,7 +389,7 @@ async fn test_sync_utxos_by_block() { let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner(); let responses = convert_mpsc_to_stream(&mut streaming).collect::>().await; - + // dbg!(&block0); assert_eq!( vec![ (0, block0.header().hash().to_vec(), 0), diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index d60e7d3f37..7db42c4f45 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -428,6 +428,7 @@ where } } + #[allow(clippy::too_many_lines)] async fn scan_utxos( &mut self, client: &mut BaseNodeWalletRpcClient, @@ -457,6 +458,8 @@ where let mut utxo_next_await_profiling = Vec::new(); let mut scan_for_outputs_profiling = Vec::new(); + let mut prev_scanned_block: Option = None; + let mut prev_output = None; while let Some(response) = { let start = Instant::now(); let utxo_stream_next = utxo_stream.next().await; @@ -478,42 +481,59 @@ where .into_iter() .map(|utxo| TransactionOutput::try_from(utxo).map_err(UtxoScannerError::ConversionError)) .collect::, _>>()?; - + let first_output = Some(outputs[0].clone()); total_scanned += outputs.len(); let start = Instant::now(); let found_outputs = self.scan_for_outputs(outputs).await?; scan_for_outputs_profiling.push(start.elapsed()); - let (count, amount) = self + let (mut count, mut amount) = self .import_utxos_to_transaction_service(found_outputs, current_height, mined_timestamp) .await?; let block_hash = current_header_hash.try_into()?; - self.resources.db.save_scanned_block(ScannedBlock { + if let Some(scanned_block) = prev_scanned_block { + if block_hash == scanned_block.header_hash && first_output == prev_output { + count += scanned_block.num_outputs.unwrap_or(0); + amount += scanned_block.amount.unwrap_or_else(|| 0.into()) + } else { + self.resources.db.save_scanned_block(scanned_block)?; + self.resources.db.clear_scanned_blocks_before_height( + current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), + true, + )?; + + if current_height % PROGRESS_REPORT_INTERVAL == 0 { + debug!( + target: LOG_TARGET, + "Scanned up to block {} with a current tip_height of {}", current_height, tip_height + ); + self.publish_event(UtxoScannerEvent::Progress { + current_height, + tip_height, + }); + } + + num_recovered = num_recovered.saturating_add(count); + total_amount += amount; + } + } + prev_output = first_output; + prev_scanned_block = Some(ScannedBlock { header_hash: block_hash, height: current_height, num_outputs: Some(count), amount: Some(amount), timestamp: Utc::now().naive_utc(), - })?; - - self.resources - .db - .clear_scanned_blocks_before_height(current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), true)?; - - if current_height % PROGRESS_REPORT_INTERVAL == 0 { - debug!( - target: LOG_TARGET, - "Scanned up to block {} with a current tip_height of {}", current_height, tip_height - ); - self.publish_event(UtxoScannerEvent::Progress { - current_height, - tip_height, - }); - } - - num_recovered = num_recovered.saturating_add(count); - total_amount += amount; + }); + } + // We need to update the last one + if let Some(scanned_block) = prev_scanned_block { + self.resources.db.clear_scanned_blocks_before_height( + scanned_block.height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), + true, + )?; + self.resources.db.save_scanned_block(scanned_block)?; } trace!( target: LOG_TARGET,