Skip to content

Commit

Permalink
feat!: add paging to utxo stream request (#5302)
Browse files Browse the repository at this point in the history
Description
---
Adds paging to the UTXO stream to handle the large number of faucet
UTXOs.

Motivation and Context
---
See: #5299

How Has This Been Tested?
---
unit tests

Fixes: #5299

What process can a PR reviewer use to test or verify this change?
---
See PR #5298 


Breaking Changes
---
Changes the request service interface (the UTXO stream request/response now uses paging).

- [ ] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [x] Other - Please specify: changes the request service interface (see `feat!` in the title)

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
SWvheerden authored Apr 12, 2023
1 parent 1f04beb commit 3540309
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 33 deletions.
35 changes: 25 additions & 10 deletions base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where B: BlockchainBackend + 'static
Ok(())
}

#[allow(clippy::too_many_lines)]
async fn start_streaming(
&self,
tx: &mut mpsc::Sender<Result<SyncUtxosByBlockResponse, RpcStatus>>,
Expand Down Expand Up @@ -141,7 +142,6 @@ where B: BlockchainBackend + 'static
.fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone()))
.await
.rpc_status_internal_error(LOG_TARGET)?;

let utxos = utxos
.into_iter()
.enumerate()
Expand All @@ -159,15 +159,30 @@ where B: BlockchainBackend + 'static
current_header_hash.to_hex(),
);

let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxos,
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
for utxo_chunk in utxos.chunks(2000) {
let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxo_chunk.to_vec(),
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
}
}
if utxos.is_empty() {
// if it's empty, we still need to send an empty vec of outputs.
let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxos,
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
}
}

debug!(
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/base_node_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ async fn test_sync_utxos_by_block() {
let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner();

let responses = convert_mpsc_to_stream(&mut streaming).collect::<Vec<_>>().await;

// dbg!(&block0);
assert_eq!(
vec![
(0, block0.header().hash().to_vec(), 0),
Expand Down
64 changes: 42 additions & 22 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ where
}
}

#[allow(clippy::too_many_lines)]
async fn scan_utxos(
&mut self,
client: &mut BaseNodeWalletRpcClient,
Expand Down Expand Up @@ -457,6 +458,8 @@ where

let mut utxo_next_await_profiling = Vec::new();
let mut scan_for_outputs_profiling = Vec::new();
let mut prev_scanned_block: Option<ScannedBlock> = None;
let mut prev_output = None;
while let Some(response) = {
let start = Instant::now();
let utxo_stream_next = utxo_stream.next().await;
Expand All @@ -478,42 +481,59 @@ where
.into_iter()
.map(|utxo| TransactionOutput::try_from(utxo).map_err(UtxoScannerError::ConversionError))
.collect::<Result<Vec<_>, _>>()?;

let first_output = Some(outputs[0].clone());
total_scanned += outputs.len();

let start = Instant::now();
let found_outputs = self.scan_for_outputs(outputs).await?;
scan_for_outputs_profiling.push(start.elapsed());

let (count, amount) = self
let (mut count, mut amount) = self
.import_utxos_to_transaction_service(found_outputs, current_height, mined_timestamp)
.await?;
let block_hash = current_header_hash.try_into()?;
self.resources.db.save_scanned_block(ScannedBlock {
if let Some(scanned_block) = prev_scanned_block {
if block_hash == scanned_block.header_hash && first_output == prev_output {
count += scanned_block.num_outputs.unwrap_or(0);
amount += scanned_block.amount.unwrap_or_else(|| 0.into())
} else {
self.resources.db.save_scanned_block(scanned_block)?;
self.resources.db.clear_scanned_blocks_before_height(
current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE),
true,
)?;

if current_height % PROGRESS_REPORT_INTERVAL == 0 {
debug!(
target: LOG_TARGET,
"Scanned up to block {} with a current tip_height of {}", current_height, tip_height
);
self.publish_event(UtxoScannerEvent::Progress {
current_height,
tip_height,
});
}

num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
}
}
prev_output = first_output;
prev_scanned_block = Some(ScannedBlock {
header_hash: block_hash,
height: current_height,
num_outputs: Some(count),
amount: Some(amount),
timestamp: Utc::now().naive_utc(),
})?;

self.resources
.db
.clear_scanned_blocks_before_height(current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), true)?;

if current_height % PROGRESS_REPORT_INTERVAL == 0 {
debug!(
target: LOG_TARGET,
"Scanned up to block {} with a current tip_height of {}", current_height, tip_height
);
self.publish_event(UtxoScannerEvent::Progress {
current_height,
tip_height,
});
}

num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
});
}
// We need to update the last one
if let Some(scanned_block) = prev_scanned_block {
self.resources.db.clear_scanned_blocks_before_height(
scanned_block.height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE),
true,
)?;
self.resources.db.save_scanned_block(scanned_block)?;
}
trace!(
target: LOG_TARGET,
Expand Down

0 comments on commit 3540309

Please sign in to comment.