diff --git a/base_layer/core/src/base_node/metrics.rs b/base_layer/core/src/base_node/metrics.rs index 2129e8fb03..b78b5b8dc7 100644 --- a/base_layer/core/src/base_node/metrics.rs +++ b/base_layer/core/src/base_node/metrics.rs @@ -104,3 +104,15 @@ pub fn rejected_blocks(height: u64, hash: &[u8]) -> IntCounter { METER.with_label_values(&[&height.to_string(), &to_hex(hash)]) } + +pub fn active_sync_peers() -> IntGauge { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_gauge( + "base_node::sync::active_peers", + "Number of active peers syncing from this node", + ) + .unwrap() + }); + + METER.clone() +} diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 891775ebc2..0ebac3f626 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -43,6 +43,7 @@ use tracing::{instrument, span, Instrument, Level}; use crate::{ base_node::{ comms_interface::BlockEvent, + metrics, sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService}, LocalNodeCommsInterface, }, @@ -95,6 +96,7 @@ impl BaseNodeSyncRpcService { let token = Arc::new(peer); lock.push(Arc::downgrade(&token)); + metrics::active_sync_peers().set(lock.len() as i64); Ok(token) } } @@ -237,6 +239,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + metrics::active_sync_peers().dec(); debug!( target: LOG_TARGET, "Block sync round complete for peer `{}`.", peer_node_id, @@ -325,6 +328,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + metrics::active_sync_peers().dec(); debug!( target: LOG_TARGET, "Header sync round complete for peer `{}`.", session_token, @@ -430,6 +434,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ &self, request: Request, ) -> Result, RpcStatus> { + let peer_node_id = request.context().peer_node_id().clone(); let req = request.into_message(); let (tx, rx) = mpsc::channel(100); let db = self.db(); @@ -455,6 +460,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ return Err(RpcStatus::bad_request("start header height is after end header")); } + let session_token = self.try_add_exclusive_session(peer_node_id).await?; task::spawn(async move { while current_height <= end_height { if tx.is_closed() { @@ -524,6 +530,12 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } } + + metrics::active_sync_peers().dec(); + debug!( + target: LOG_TARGET, + "Kernel sync round complete for peer `{}`.", session_token, + ); }); Ok(Streaming::new(rx)) } @@ -531,21 +543,22 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ #[instrument(skip(self), err)] async fn sync_utxos(&self, request: Request) -> Result, RpcStatus> { let req = request.message(); - let peer = request.context().peer_node_id(); + let peer_node_id = request.context().peer_node_id(); debug!( target: LOG_TARGET, "Received sync_utxos request from header {} to {} (start = {}, include_pruned_utxos = {}, \ include_deleted_bitmaps = {})", - peer, + peer_node_id, req.start, req.end_header_hash.to_hex(), req.include_pruned_utxos, req.include_deleted_bitmaps ); + let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?; let (tx, rx) = mpsc::channel(200); let task = SyncUtxosTask::new(self.db()); - task.run(request.into_message(), tx).await?; + task.run(request, tx).await?; Ok(Streaming::new(rx)) } diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index db1e46f94b..22326ae14f 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -23,11 +23,15 @@ use std::{sync::Arc, time::Instant}; use log::*; -use tari_comms::{protocol::rpc::RpcStatus, utils}; +use tari_comms::{ + protocol::rpc::{Request, RpcStatus}, + utils, +}; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; use tokio::{sync::mpsc, task}; use crate::{ + base_node::metrics, blocks::BlockHeader, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, proto, @@ -49,12 +53,14 @@ where B: BlockchainBackend + 'static pub(crate) async fn run( self, - request: SyncUtxosRequest, + request: Request, mut tx: mpsc::Sender>, ) -> Result<(), RpcStatus> { + let peer = request.context().peer_node_id().clone(); + let msg = request.into_message(); let start_header = self .db - .fetch_header_containing_utxo_mmr(request.start) + .fetch_header_containing_utxo_mmr(msg.start) .await .map_err(|err| { error!(target: LOG_TARGET, "{}", err); @@ -67,7 +73,7 @@ where B: BlockchainBackend + 'static let end_header = self .db - .fetch_header_by_block_hash(request.end_header_hash.clone()) + .fetch_header_by_block_hash(msg.end_header_hash.clone()) .await .map_err(RpcStatus::log_internal_error(LOG_TARGET))? .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?; @@ -81,7 +87,7 @@ where B: BlockchainBackend + 'static } let (skip_outputs, prev_utxo_mmr_size) = if start_header.height() == 0 { - (request.start, 0) + (msg.start, 0) } else { let prev_header = self .db @@ -90,15 +96,16 @@ where B: BlockchainBackend + 'static .map_err(RpcStatus::log_internal_error(LOG_TARGET))? .ok_or_else(|| RpcStatus::not_found("Previous start header hash is was not found"))?; - let skip = request.start.checked_sub(prev_header.output_mmr_size) + let skip = msg.start.checked_sub(prev_header.output_mmr_size) // This is a data inconsistency because fetch_header_containing_utxo_mmr returned the header we are basing this on - .ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, request.start)))?; + .ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, msg.start)))?; (skip, prev_header.output_mmr_size) }; - let include_pruned_utxos = request.include_pruned_utxos; - let include_deleted_bitmaps = request.include_deleted_bitmaps; + let include_pruned_utxos = msg.include_pruned_utxos; + let include_deleted_bitmaps = msg.include_deleted_bitmaps; task::spawn(async move { + debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer); if let Err(err) = self .start_streaming( &mut tx, @@ -111,8 +118,11 @@ where B: BlockchainBackend + 'static ) .await { + debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err); let _ = tx.send(Err(err)).await; } + debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer); + metrics::active_sync_peers().dec(); }); Ok(())