Skip to content

Commit

Permalink
feat(base-node): add number of active sync peers metric (#3784)
Browse files Browse the repository at this point in the history
Description
---

- Adds `tari_base_node::sync::active_peers` gauge that tracks number of peers syncing from the base node

Motivation and Context
---
Visibility into number of peers syncing from the base node

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Feb 2, 2022
1 parent 3ee1320 commit 3495e85
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
12 changes: 12 additions & 0 deletions base_layer/core/src/base_node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ pub fn rejected_blocks(height: u64, hash: &[u8]) -> IntCounter {

METER.with_label_values(&[&height.to_string(), &to_hex(hash)])
}

pub fn active_sync_peers() -> IntGauge {
static METER: Lazy<IntGauge> = Lazy::new(|| {
tari_metrics::register_int_gauge(
"base_node::sync::active_peers",
"Number of active peers syncing from this node",
)
.unwrap()
});

METER.clone()
}
19 changes: 16 additions & 3 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tracing::{instrument, span, Instrument, Level};
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
LocalNodeCommsInterface,
},
Expand Down Expand Up @@ -95,6 +96,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {

let token = Arc::new(peer);
lock.push(Arc::downgrade(&token));
metrics::active_sync_peers().set(lock.len() as i64);
Ok(token)
}
}
Expand Down Expand Up @@ -237,6 +239,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Block sync round complete for peer `{}`.", peer_node_id,
Expand Down Expand Up @@ -325,6 +328,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Header sync round complete for peer `{}`.", session_token,
Expand Down Expand Up @@ -430,6 +434,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
&self,
request: Request<SyncKernelsRequest>,
) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
let peer_node_id = request.context().peer_node_id().clone();
let req = request.into_message();
let (tx, rx) = mpsc::channel(100);
let db = self.db();
Expand All @@ -455,6 +460,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
return Err(RpcStatus::bad_request("start header height is after end header"));
}

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
task::spawn(async move {
while current_height <= end_height {
if tx.is_closed() {
Expand Down Expand Up @@ -524,28 +530,35 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Kernel sync round complete for peer `{}`.", session_token,
);
});
Ok(Streaming::new(rx))
}

#[instrument(skip(self), err)]
async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
let req = request.message();
let peer = request.context().peer_node_id();
let peer_node_id = request.context().peer_node_id();
debug!(
target: LOG_TARGET,
"Received sync_utxos request from header {} to {} (start = {}, include_pruned_utxos = {}, \
include_deleted_bitmaps = {})",
peer,
peer_node_id,
req.start,
req.end_header_hash.to_hex(),
req.include_pruned_utxos,
req.include_deleted_bitmaps
);

let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let (tx, rx) = mpsc::channel(200);
let task = SyncUtxosTask::new(self.db());
task.run(request.into_message(), tx).await?;
task.run(request, tx).await?;

Ok(Streaming::new(rx))
}
Expand Down
28 changes: 19 additions & 9 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
use std::{sync::Arc, time::Instant};

use log::*;
use tari_comms::{protocol::rpc::RpcStatus, utils};
use tari_comms::{
protocol::rpc::{Request, RpcStatus},
utils,
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tokio::{sync::mpsc, task};

use crate::{
base_node::metrics,
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto,
Expand All @@ -49,12 +53,14 @@ where B: BlockchainBackend + 'static

pub(crate) async fn run(
self,
request: SyncUtxosRequest,
request: Request<SyncUtxosRequest>,
mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let peer = request.context().peer_node_id().clone();
let msg = request.into_message();
let start_header = self
.db
.fetch_header_containing_utxo_mmr(request.start)
.fetch_header_containing_utxo_mmr(msg.start)
.await
.map_err(|err| {
error!(target: LOG_TARGET, "{}", err);
Expand All @@ -67,7 +73,7 @@ where B: BlockchainBackend + 'static

let end_header = self
.db
.fetch_header_by_block_hash(request.end_header_hash.clone())
.fetch_header_by_block_hash(msg.end_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?;
Expand All @@ -81,7 +87,7 @@ where B: BlockchainBackend + 'static
}

let (skip_outputs, prev_utxo_mmr_size) = if start_header.height() == 0 {
(request.start, 0)
(msg.start, 0)
} else {
let prev_header = self
.db
Expand All @@ -90,15 +96,16 @@ where B: BlockchainBackend + 'static
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("Previous start header hash is was not found"))?;

let skip = request.start.checked_sub(prev_header.output_mmr_size)
let skip = msg.start.checked_sub(prev_header.output_mmr_size)
// This is a data inconsistency because fetch_header_containing_utxo_mmr returned the header we are basing this on
.ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, request.start)))?;
.ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, msg.start)))?;
(skip, prev_header.output_mmr_size)
};

let include_pruned_utxos = request.include_pruned_utxos;
let include_deleted_bitmaps = request.include_deleted_bitmaps;
let include_pruned_utxos = msg.include_pruned_utxos;
let include_deleted_bitmaps = msg.include_deleted_bitmaps;
task::spawn(async move {
debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
if let Err(err) = self
.start_streaming(
&mut tx,
Expand All @@ -111,8 +118,11 @@ where B: BlockchainBackend + 'static
)
.await
{
debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
let _ = tx.send(Err(err)).await;
}
debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
metrics::active_sync_peers().dec();
});

Ok(())
Expand Down

0 comments on commit 3495e85

Please sign in to comment.