feat: compile out the metrics (#5944)
Description
---

## Do not merge until testing is complete

We made the metrics feature in the applications a compiler option, but core was still including it all by default.
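
For context, the pattern applied throughout core in this change is roughly the sketch below (illustrative names only, not the actual `tari_core` or `tari_metrics` items): each metrics import and call site is guarded with `#[cfg(feature = "metrics")]`, so the calls compile out entirely when the feature is disabled.

```rust
// A minimal, self-contained sketch of the gating pattern (hypothetical names,
// not the real tari_core/tari_metrics items).

#[cfg(feature = "metrics")]
mod metrics {
    use std::sync::atomic::{AtomicI64, Ordering};

    // Stand-in for a Prometheus-style gauge provided by the metrics crate.
    static TIP_HEIGHT: AtomicI64 = AtomicI64::new(0);

    pub fn set_tip_height(height: i64) {
        TIP_HEIGHT.store(height, Ordering::Relaxed);
    }
}

fn on_block_added(height: u64) {
    // Present only when built with `--features metrics`; otherwise this block
    // and the module above are removed at compile time.
    #[cfg(feature = "metrics")]
    {
        #[allow(clippy::cast_possible_wrap)]
        metrics::set_tip_height(height as i64);
    }

    println!("added block at height {height}");
}

fn main() {
    on_block_added(42);
}
```

With the dependency marked `optional = true` and wired to the `metrics` feature in Cargo.toml (as in the diff below), a default build pulls in neither the metrics code paths nor the crate.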

I had wanted to do something more elegant, such as swapping in a mock for the metrics system, which would hopefully have required fewer scattered changes. However, callers currently take the concrete types returned by the metrics system and operate on them directly, so a drop-in mock isn't straightforward. This is still possible to phase out, but it would take more time because all the existing metrics modules would need to be refactored.
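
For illustration only, the mock approach mentioned above could look roughly like this (a hypothetical facade, not the real `tari_metrics` API): the facade keeps the same signatures either way and hands back no-op handles when metrics are disabled, so call sites need no `#[cfg]` attributes. The catch is that every type the real metrics system returns would need a no-op counterpart, which is the larger refactor deferred here.

```rust
// Hypothetical no-op facade sketch; `IntGauge` and `tip_height` are
// illustrative stand-ins, not the actual tari_metrics API.

#[cfg(feature = "metrics")]
mod metrics_facade {
    use std::sync::atomic::{AtomicI64, Ordering};

    // "Real" implementation: a working gauge (simplified stand-in).
    pub struct IntGauge(AtomicI64);

    impl IntGauge {
        pub fn set(&self, v: i64) {
            self.0.store(v, Ordering::Relaxed);
        }
    }

    pub fn tip_height() -> IntGauge {
        IntGauge(AtomicI64::new(0))
    }
}

#[cfg(not(feature = "metrics"))]
mod metrics_facade {
    // Mock implementation: identical signatures, every operation is a no-op.
    pub struct IntGauge;

    impl IntGauge {
        pub fn set(&self, _v: i64) {}
    }

    pub fn tip_height() -> IntGauge {
        IntGauge
    }
}

fn main() {
    // Call sites compile unchanged whether or not the feature is enabled.
    metrics_facade::tip_height().set(42);
}
```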

Motivation and Context
---
Metrics should be opt-in, and not on by default.

How Has This Been Tested?
---
The mobile team is testing it now.

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify
brianp authored Nov 13, 2023
1 parent 41e6602 commit fa2fb27
Showing 22 changed files with 147 additions and 42 deletions.
3 changes: 2 additions & 1 deletion base_layer/core/Cargo.toml
@@ -16,6 +16,7 @@ mempool_proto = []
base_node = ["tari_mmr", "transactions", "mempool_proto", "base_node_proto", "monero", "randomx-rs"]
base_node_proto = []
benches = ["base_node"]
metrics = ["tari_metrics"]

[dependencies]
tari_common = { path = "../../common" }
@@ -24,7 +25,7 @@ tari_comms = { path = "../../comms/core" }
tari_comms_dht = { path = "../../comms/dht" }
tari_comms_rpc_macros = { path = "../../comms/rpc_macros" }
tari_crypto = { version = "0.19", features = ["borsh"] }
tari_metrics = { path = "../../infrastructure/metrics" }
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
tari_mmr = { path = "../../base_layer/mmr", optional = true, features = ["native_bitmap"] }
tari_p2p = { path = "../../base_layer/p2p" }
tari_script = { path = "../../infrastructure/tari_script" }
62 changes: 42 additions & 20 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
@@ -20,10 +20,11 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{
cmp::max,
collections::HashSet,
convert::{TryFrom, TryInto},
sync::Arc,
time::{Duration, Instant},
};
@@ -35,17 +36,16 @@ use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_utilities::hex::Hex;
use tokio::sync::RwLock;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
metrics,
base_node::comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
@@ -619,6 +619,7 @@ where B: BlockchainBackend + 'static
.build();
return Ok(block);
}
#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
@@ -628,6 +629,7 @@ where B: BlockchainBackend + 'static
let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

let mut builder = BlockBuilder::new(header.version)
@@ -673,6 +675,7 @@ where B: BlockchainBackend + 'static
not_found.len()
);

#[cfg(feature = "metrics")]
metrics::compact_block_full_misses(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
@@ -710,6 +713,7 @@ where B: BlockchainBackend + 'static
e,
);

#[cfg(feature = "metrics")]
metrics::compact_block_mmr_mismatch(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
@@ -834,8 +838,11 @@ where B: BlockchainBackend + 'static
},

Err(e @ ChainStorageError::ValidationError { .. }) => {
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
#[cfg(feature = "metrics")]
{
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
}
warn!(
target: LOG_TARGET,
"Peer {} sent an invalid block: {}",
@@ -856,14 +863,20 @@ where B: BlockchainBackend + 'static
}
},
// SECURITY: This indicates an issue in the transaction validator.
None => metrics::rejected_local_blocks(block.header.height, &block_hash).inc(),
None => {
#[cfg(feature = "metrics")]
metrics::rejected_local_blocks(block.header.height, &block_hash).inc();
debug!(target: LOG_TARGET, "There may have been an issue in the transaction validator");
},
}
self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
Err(e.into())
},

Err(e) => {
#[cfg(feature = "metrics")]
metrics::rejected_blocks(block.header.height, &block.hash()).inc();

self.publish_block_event(BlockEvent::AddBlockErrored { block });
Err(e.into())
},
@@ -936,6 +949,7 @@ where B: BlockchainBackend + 'static

async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
fn update_target_difficulty(block: &ChainBlock) {
#[cfg(feature = "metrics")]
match block.header().pow_algo() {
PowAlgorithm::Sha3x => {
metrics::target_difficulty_sha()
@@ -950,25 +964,33 @@ where B: BlockchainBackend + 'static

match block_add_result {
BlockAddResult::Ok(ref block) => {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
update_target_difficulty(block);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));

#[cfg(feature = "metrics")]
{
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
},
#[allow(unused_variables)] // `removed` variable is used if metrics are compiled
BlockAddResult::ChainReorg { added, removed } => {
#[cfg(feature = "metrics")]
if let Some(fork_height) = added.last().map(|b| b.height()) {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, added.len(), removed.len()).inc();

let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
for block in added {
update_target_difficulty(block);
}
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
},
BlockAddResult::OrphanBlock => {
#[cfg(feature = "metrics")]
metrics::orphaned_blocks().inc();
},
_ => {},
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/mod.rs
@@ -39,7 +39,7 @@ pub mod chain_metadata_service;
pub mod comms_interface;
#[cfg(feature = "base_node")]
pub use comms_interface::LocalNodeCommsInterface;
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;

#[cfg(feature = "base_node")]
@@ -24,10 +24,11 @@ use std::time::Instant;

use log::*;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, HorizonStateSync, StateEvent, StateInfo, StatusInfo},
sync::{BlockSynchronizer, SyncPeer},
BaseNodeStateMachine,
@@ -63,6 +64,7 @@ impl BlockSync {
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
#[cfg(feature = "metrics")]
let tip_height_metric = metrics::tip_height();
synchronizer.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
@@ -81,6 +83,7 @@ impl BlockSync {
BlockAddResult::Ok(block),
));

#[cfg(feature = "metrics")]
tip_height_metric.set(local_height as i64);
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
@@ -26,10 +26,11 @@ use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
@@ -146,6 +147,7 @@ impl HeaderSyncState {

let local_nci = shared.local_node_interface.clone();
synchronizer.on_rewind(move |removed| {
#[cfg(feature = "metrics")]
if let Some(fork_height) = removed.last().map(|b| b.height().saturating_sub(1)) {
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, 0, removed.len()).inc();
7 changes: 6 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/service.rs
@@ -40,10 +40,11 @@ use tokio::{
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
metrics,
sync::{
header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
@@ -99,6 +100,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {

let token = Arc::new(peer);
lock.push(Arc::downgrade(&token));
#[cfg(feature = "metrics")]
metrics::active_sync_peers().set(lock.len() as i64);
Ok(token)
}
@@ -256,6 +258,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
@@ -355,6 +358,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
@@ -572,6 +576,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
4 changes: 3 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
@@ -31,8 +31,9 @@ use tari_comms::{
use tari_utilities::hex::Hex;
use tokio::{sync::mpsc, task};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::metrics,
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::{SyncUtxosRequest, SyncUtxosResponse},
@@ -106,6 +107,7 @@ where B: BlockchainBackend + 'static
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
});

2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/mod.rs
@@ -42,7 +42,7 @@ mod rpc;
pub use rpc::create_mempool_rpc_service;
#[cfg(feature = "base_node")]
pub use rpc::{MempoolRpcClient, MempoolRpcServer, MempoolRpcService, MempoolService};
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;
#[cfg(feature = "base_node")]
mod shrink_hashmap;
5 changes: 4 additions & 1 deletion base_layer/core/src/mempool/service/inbound_handlers.rs
@@ -26,11 +26,12 @@ use log::*;
use tari_comms::peer_manager::NodeId;
use tari_utilities::hex::Hex;

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored},
chain_storage::BlockAddResult,
mempool::{
metrics,
service::{MempoolRequest, MempoolResponse, MempoolServiceError, OutboundMempoolServiceInterface},
Mempool,
TxStorageResponse,
@@ -135,6 +136,7 @@ impl MempoolInboundHandlers {
}
match self.mempool.insert(tx.clone()).await {
Ok(tx_storage) => {
#[cfg(feature = "metrics")]
if tx_storage.is_stored() {
metrics::inbound_transactions(source_peer.as_ref()).inc();
} else {
@@ -164,6 +166,7 @@ impl MempoolInboundHandlers {

#[allow(clippy::cast_possible_wrap)]
async fn update_pool_size_metrics(&self) {
#[cfg(feature = "metrics")]
if let Ok(stats) = self.mempool.stats().await {
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
metrics::reorg_pool_size().set(stats.reorg_txs as i64);
7 changes: 6 additions & 1 deletion base_layer/core/src/mempool/sync_protocol/mod.rs
@@ -96,10 +96,12 @@ use tokio::{
time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
mempool::{proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
};
@@ -544,6 +546,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
#[cfg(feature = "metrics")]
{
let stats = self.mempool.stats().await?;
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
Expand Down Expand Up @@ -580,6 +583,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

let stored_result = self.mempool.insert(txn).await?;
if stored_result.is_stored() {
#[cfg(feature = "metrics")]
metrics::inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
@@ -588,6 +592,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
self.peer_node_id.short_str()
);
} else {
#[cfg(feature = "metrics")]
metrics::rejected_inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
4 changes: 2 additions & 2 deletions comms/core/Cargo.toml
@@ -11,7 +11,7 @@ edition = "2018"

[dependencies]
tari_crypto = { version = "0.19" }
tari_metrics = { path = "../../infrastructure/metrics" }
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
tari_storage = { path = "../../infrastructure/storage" }
tari_shutdown = { path = "../../infrastructure/shutdown" }
tari_utilities = { version = "0.6" }
@@ -63,5 +63,5 @@ tari_common = { path = "../../common", features = ["build"] }

[features]
c_integration = []
metrics = []
metrics = ["tari_metrics"]
rpc = ["tower/make", "tower/util"]