Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(base-node): add number of active sync peers metric #3784

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions base_layer/core/src/base_node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ pub fn rejected_blocks(height: u64, hash: &[u8]) -> IntCounter {

METER.with_label_values(&[&height.to_string(), &to_hex(hash)])
}

pub fn active_sync_peers() -> IntGauge {
static METER: Lazy<IntGauge> = Lazy::new(|| {
tari_metrics::register_int_gauge(
"base_node::sync::active_peers",
"Number of active peers syncing from this node",
)
.unwrap()
});

METER.clone()
}
19 changes: 16 additions & 3 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tracing::{instrument, span, Instrument, Level};
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
LocalNodeCommsInterface,
},
Expand Down Expand Up @@ -95,6 +96,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {

let token = Arc::new(peer);
lock.push(Arc::downgrade(&token));
metrics::active_sync_peers().set(lock.len() as i64);
Ok(token)
}
}
Expand Down Expand Up @@ -237,6 +239,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Block sync round complete for peer `{}`.", peer_node_id,
Expand Down Expand Up @@ -325,6 +328,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Header sync round complete for peer `{}`.", session_token,
Expand Down Expand Up @@ -430,6 +434,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
&self,
request: Request<SyncKernelsRequest>,
) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
let peer_node_id = request.context().peer_node_id().clone();
let req = request.into_message();
let (tx, rx) = mpsc::channel(100);
let db = self.db();
Expand All @@ -455,6 +460,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
return Err(RpcStatus::bad_request("start header height is after end header"));
}

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
task::spawn(async move {
while current_height <= end_height {
if tx.is_closed() {
Expand Down Expand Up @@ -524,28 +530,35 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}
}

metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Kernel sync round complete for peer `{}`.", session_token,
);
});
Ok(Streaming::new(rx))
}

#[instrument(skip(self), err)]
async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
let req = request.message();
let peer = request.context().peer_node_id();
let peer_node_id = request.context().peer_node_id();
debug!(
target: LOG_TARGET,
"Received sync_utxos request from header {} to {} (start = {}, include_pruned_utxos = {}, \
include_deleted_bitmaps = {})",
peer,
peer_node_id,
req.start,
req.end_header_hash.to_hex(),
req.include_pruned_utxos,
req.include_deleted_bitmaps
);

let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let (tx, rx) = mpsc::channel(200);
let task = SyncUtxosTask::new(self.db());
task.run(request.into_message(), tx).await?;
task.run(request, tx).await?;

Ok(Streaming::new(rx))
}
Expand Down
28 changes: 19 additions & 9 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
use std::{sync::Arc, time::Instant};

use log::*;
use tari_comms::{protocol::rpc::RpcStatus, utils};
use tari_comms::{
protocol::rpc::{Request, RpcStatus},
utils,
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tokio::{sync::mpsc, task};

use crate::{
base_node::metrics,
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto,
Expand All @@ -49,12 +53,14 @@ where B: BlockchainBackend + 'static

pub(crate) async fn run(
self,
request: SyncUtxosRequest,
request: Request<SyncUtxosRequest>,
mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let peer = request.context().peer_node_id().clone();
let msg = request.into_message();
let start_header = self
.db
.fetch_header_containing_utxo_mmr(request.start)
.fetch_header_containing_utxo_mmr(msg.start)
.await
.map_err(|err| {
error!(target: LOG_TARGET, "{}", err);
Expand All @@ -67,7 +73,7 @@ where B: BlockchainBackend + 'static

let end_header = self
.db
.fetch_header_by_block_hash(request.end_header_hash.clone())
.fetch_header_by_block_hash(msg.end_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?;
Expand All @@ -81,7 +87,7 @@ where B: BlockchainBackend + 'static
}

let (skip_outputs, prev_utxo_mmr_size) = if start_header.height() == 0 {
(request.start, 0)
(msg.start, 0)
} else {
let prev_header = self
.db
Expand All @@ -90,15 +96,16 @@ where B: BlockchainBackend + 'static
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("Previous start header hash is was not found"))?;

let skip = request.start.checked_sub(prev_header.output_mmr_size)
let skip = msg.start.checked_sub(prev_header.output_mmr_size)
// This is a data inconsistency because fetch_header_containing_utxo_mmr returned the header we are basing this on
.ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, request.start)))?;
.ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, msg.start)))?;
(skip, prev_header.output_mmr_size)
};

let include_pruned_utxos = request.include_pruned_utxos;
let include_deleted_bitmaps = request.include_deleted_bitmaps;
let include_pruned_utxos = msg.include_pruned_utxos;
let include_deleted_bitmaps = msg.include_deleted_bitmaps;
task::spawn(async move {
debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
if let Err(err) = self
.start_streaming(
&mut tx,
Expand All @@ -111,8 +118,11 @@ where B: BlockchainBackend + 'static
)
.await
{
debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
let _ = tx.send(Err(err)).await;
}
debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
metrics::active_sync_peers().dec();
});

Ok(())
Expand Down