diff --git a/client/informant/src/lib.rs b/client/informant/src/lib.rs index 3daf29a9f7837..a1f0ba9ae5fac 100644 --- a/client/informant/src/lib.rs +++ b/client/informant/src/lib.rs @@ -23,7 +23,7 @@ use futures::prelude::*; use log::{info, trace, warn}; use parity_util_mem::MallocSizeOf; use sc_client_api::{BlockchainEvents, UsageProvider}; -use sc_network::{network_state::NetworkState, NetworkStatus}; +use sc_network::NetworkStatus; use sp_blockchain::HeaderMetadata; use sp_runtime::traits::{Block as BlockT, Header}; use sp_transaction_pool::TransactionPool; @@ -81,7 +81,7 @@ impl TransactionPoolAndMaybeMallogSizeOf for /// Builds the informant and returns a `Future` that drives the informant. pub fn build( client: Arc, - network_status_sinks: Arc, NetworkState)>>, + network_status_sinks: Arc>>, pool: Arc, format: OutputFormat, ) -> impl futures::Future @@ -96,7 +96,7 @@ where network_status_sinks.push(Duration::from_millis(5000), network_status_sink); let display_notifications = network_status_stream - .for_each(move |(net_status, _)| { + .for_each(move |net_status| { let info = client_1.usage_info(); if let Some(ref usage) = info.usage { trace!(target: "usage", "Usage statistics: {}", usage); diff --git a/client/network/src/network_state.rs b/client/network/src/network_state.rs index 2e24e9c5a9f58..db2b6429304bb 100644 --- a/client/network/src/network_state.rs +++ b/client/network/src/network_state.rs @@ -43,10 +43,6 @@ pub struct NetworkState { pub connected_peers: HashMap, /// List of node that we know of but that we're not connected to. pub not_connected_peers: HashMap, - /// The total number of bytes received. - pub total_bytes_inbound: u64, - /// The total number of bytes sent. - pub total_bytes_outbound: u64, /// State of the peerset manager. pub peerset: serde_json::Value, } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index a3ac8371dc739..7d97719f51dc1 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -28,7 +28,7 @@ //! which is then processed by [`NetworkWorker::poll`]. use crate::{ - ExHashT, NetworkStateInfo, + ExHashT, NetworkStateInfo, NetworkStatus, behaviour::{self, Behaviour, BehaviourOut}, config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig}, DhtEvent, @@ -49,12 +49,8 @@ use libp2p::kad::record; use libp2p::ping::handler::PingFailure; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError}; use log::{error, info, trace, warn}; +use metrics::{Metrics, MetricSources, Histogram, HistogramVec}; use parking_lot::Mutex; -use prometheus_endpoint::{ - register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, - PrometheusError, Registry, U64, - SourcedCounter, MetricSource -}; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; use sp_runtime::{ @@ -80,6 +76,7 @@ use wasm_timer::Instant; pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure}; +mod metrics; mod out_events; #[cfg(test)] mod tests; @@ -365,10 +362,11 @@ impl NetworkWorker { // Initialize the metrics. let metrics = match ¶ms.metrics_registry { Some(registry) => { - // Sourced metrics. - BandwidthCounters::register(registry, bandwidth.clone())?; - // Other (i.e. new) metrics. - Some(Metrics::register(registry)?) + Some(metrics::register(registry, MetricSources { + bandwidth: bandwidth.clone(), + major_syncing: is_major_syncing.clone(), + connected_peers: num_connected.clone(), + })?) } None => None }; @@ -423,6 +421,19 @@ impl NetworkWorker { }) } + /// High-level network status information. + pub fn status(&self) -> NetworkStatus { + NetworkStatus { + sync_state: self.sync_state(), + best_seen_block: self.best_seen_block(), + num_sync_peers: self.num_sync_peers(), + num_connected_peers: self.num_connected_peers(), + num_active_peers: self.num_active_peers(), + total_bytes_inbound: self.total_bytes_inbound(), + total_bytes_outbound: self.total_bytes_outbound(), + } + } + /// Returns the total number of bytes received so far. pub fn total_bytes_inbound(&self) -> u64 { self.service.bandwidth.total_inbound() @@ -562,8 +573,6 @@ impl NetworkWorker { peer_id: Swarm::::local_peer_id(&swarm).to_base58(), listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), - total_bytes_inbound: self.service.bandwidth.total_inbound(), - total_bytes_outbound: self.service.bandwidth.total_outbound(), connected_peers, not_connected_peers, peerset: swarm.user_protocol_mut().peerset_debug_info(), @@ -1204,265 +1213,6 @@ pub struct NetworkWorker { peers_notifications_sinks: Arc>>, } -struct Metrics { - // This list is ordered alphabetically - connections_closed_total: CounterVec, - connections_opened_total: CounterVec, - distinct_peers_connections_closed_total: Counter, - distinct_peers_connections_opened_total: Counter, - import_queue_blocks_submitted: Counter, - import_queue_finality_proofs_submitted: Counter, - import_queue_justifications_submitted: Counter, - incoming_connections_errors_total: CounterVec, - incoming_connections_total: Counter, - is_major_syncing: Gauge, - issued_light_requests: Counter, - kademlia_query_duration: HistogramVec, - kademlia_random_queries_total: CounterVec, - kademlia_records_count: GaugeVec, - kademlia_records_sizes_total: GaugeVec, - kbuckets_num_nodes: GaugeVec, - listeners_local_addresses: Gauge, - listeners_errors_total: Counter, - notifications_sizes: HistogramVec, - notifications_streams_closed_total: CounterVec, - notifications_streams_opened_total: CounterVec, - peers_count: Gauge, - peerset_num_discovered: Gauge, - peerset_num_requested: Gauge, - pending_connections: Gauge, - pending_connections_errors_total: CounterVec, - requests_in_failure_total: CounterVec, - requests_in_success_total: HistogramVec, - requests_out_failure_total: CounterVec, - requests_out_success_total: HistogramVec, - requests_out_started_total: CounterVec, -} - -/// The source for bandwidth metrics. -#[derive(Clone)] -struct BandwidthCounters(Arc); - -impl BandwidthCounters { - fn register(registry: &Registry, sinks: Arc) - -> Result<(), PrometheusError> - { - register(SourcedCounter::new( - &Opts::new( - "sub_libp2p_network_bytes_total", - "Total bandwidth usage" - ).variable_label("direction"), - BandwidthCounters(sinks), - )?, registry)?; - - Ok(()) - } -} - -impl MetricSource for BandwidthCounters { - type N = u64; - - fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { - set(&[&"in"], self.0.total_inbound()); - set(&[&"out"], self.0.total_outbound()); - } -} - -impl Metrics { - fn register(registry: &Registry) -> Result { - Ok(Self { - // This list is ordered alphabetically - connections_closed_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_connections_closed_total", - "Total number of connections closed, by direction and reason" - ), - &["direction", "reason"] - )?, registry)?, - connections_opened_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_connections_opened_total", - "Total number of connections opened by direction" - ), - &["direction"] - )?, registry)?, - distinct_peers_connections_closed_total: register(Counter::new( - "sub_libp2p_distinct_peers_connections_closed_total", - "Total number of connections closed with distinct peers" - )?, registry)?, - distinct_peers_connections_opened_total: register(Counter::new( - "sub_libp2p_distinct_peers_connections_opened_total", - "Total number of connections opened with distinct peers" - )?, registry)?, - import_queue_blocks_submitted: register(Counter::new( - "import_queue_blocks_submitted", - "Number of blocks submitted to the import queue.", - )?, registry)?, - import_queue_finality_proofs_submitted: register(Counter::new( - "import_queue_finality_proofs_submitted", - "Number of finality proofs submitted to the import queue.", - )?, registry)?, - import_queue_justifications_submitted: register(Counter::new( - "import_queue_justifications_submitted", - "Number of justifications submitted to the import queue.", - )?, registry)?, - incoming_connections_errors_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_incoming_connections_handshake_errors_total", - "Total number of incoming connections that have failed during the \ - initial handshake" - ), - &["reason"] - )?, registry)?, - incoming_connections_total: register(Counter::new( - "sub_libp2p_incoming_connections_total", - "Total number of incoming connections on the listening sockets" - )?, registry)?, - is_major_syncing: register(Gauge::new( - "sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.", - )?, registry)?, - issued_light_requests: register(Counter::new( - "issued_light_requests", - "Number of light client requests that our node has issued.", - )?, registry)?, - kademlia_query_duration: register(HistogramVec::new( - HistogramOpts { - common_opts: Opts::new( - "sub_libp2p_kademlia_query_duration", - "Duration of Kademlia queries per query type" - ), - buckets: prometheus_endpoint::exponential_buckets(0.5, 2.0, 10) - .expect("parameters are always valid values; qed"), - }, - &["type"] - )?, registry)?, - kademlia_random_queries_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_kademlia_random_queries_total", - "Number of random Kademlia queries started" - ), - &["protocol"] - )?, registry)?, - kademlia_records_count: register(GaugeVec::new( - Opts::new( - "sub_libp2p_kademlia_records_count", - "Number of records in the Kademlia records store" - ), - &["protocol"] - )?, registry)?, - kademlia_records_sizes_total: register(GaugeVec::new( - Opts::new( - "sub_libp2p_kademlia_records_sizes_total", - "Total size of all the records in the Kademlia records store" - ), - &["protocol"] - )?, registry)?, - kbuckets_num_nodes: register(GaugeVec::new( - Opts::new( - "sub_libp2p_kbuckets_num_nodes", - "Number of nodes in the Kademlia k-buckets" - ), - &["protocol"] - )?, registry)?, - listeners_local_addresses: register(Gauge::new( - "sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on" - )?, registry)?, - listeners_errors_total: register(Counter::new( - "sub_libp2p_listeners_errors_total", - "Total number of non-fatal errors reported by a listener" - )?, registry)?, - notifications_sizes: register(HistogramVec::new( - HistogramOpts { - common_opts: Opts::new( - "sub_libp2p_notifications_sizes", - "Sizes of the notifications send to and received from all nodes" - ), - buckets: prometheus_endpoint::exponential_buckets(64.0, 4.0, 8) - .expect("parameters are always valid values; qed"), - }, - &["direction", "protocol"] - )?, registry)?, - notifications_streams_closed_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_notifications_streams_closed_total", - "Total number of notification substreams that have been closed" - ), - &["protocol"] - )?, registry)?, - notifications_streams_opened_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_notifications_streams_opened_total", - "Total number of notification substreams that have been opened" - ), - &["protocol"] - )?, registry)?, - peers_count: register(Gauge::new( - "sub_libp2p_peers_count", "Number of network gossip peers", - )?, registry)?, - peerset_num_discovered: register(Gauge::new( - "sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager", - )?, registry)?, - peerset_num_requested: register(Gauge::new( - "sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to", - )?, registry)?, - pending_connections: register(Gauge::new( - "sub_libp2p_pending_connections", - "Number of connections in the process of being established", - )?, registry)?, - pending_connections_errors_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_pending_connections_errors_total", - "Total number of pending connection errors" - ), - &["reason"] - )?, registry)?, - requests_in_failure_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_requests_in_failure_total", - "Total number of incoming requests that the node has failed to answer" - ), - &["protocol", "reason"] - )?, registry)?, - requests_in_success_total: register(HistogramVec::new( - HistogramOpts { - common_opts: Opts::new( - "sub_libp2p_requests_in_success_total", - "Total number of requests received and answered" - ), - buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) - .expect("parameters are always valid values; qed"), - }, - &["protocol"] - )?, registry)?, - requests_out_failure_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_requests_out_failure_total", - "Total number of requests that have failed" - ), - &["protocol", "reason"] - )?, registry)?, - requests_out_success_total: register(HistogramVec::new( - HistogramOpts { - common_opts: Opts::new( - "sub_libp2p_requests_out_success_total", - "For successful requests, time between a request's start and finish" - ), - buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) - .expect("parameters are always valid values; qed"), - }, - &["protocol"] - )?, registry)?, - requests_out_started_total: register(CounterVec::new( - Opts::new( - "sub_libp2p_requests_out_started_total", - "Total number of requests emitted" - ), - &["protocol"] - )?, registry)?, - }) - } -} - impl Future for NetworkWorker { type Output = (); @@ -1931,7 +1681,6 @@ impl Future for NetworkWorker { this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { - metrics.is_major_syncing.set(is_major_syncing as u64); for (proto, num_entries) in this.network_service.num_kbuckets_entries() { metrics.kbuckets_num_nodes.with_label_values(&[&proto.as_ref()]).set(num_entries as u64); } @@ -1941,7 +1690,6 @@ impl Future for NetworkWorker { for (proto, num_entries) in this.network_service.kademlia_records_total_size() { metrics.kademlia_records_sizes_total.with_label_values(&[&proto.as_ref()]).set(num_entries as u64); } - metrics.peers_count.set(num_connected_peers as u64); metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64); metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64); metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64); diff --git a/client/network/src/service/metrics.rs b/client/network/src/service/metrics.rs new file mode 100644 index 0000000000000..bbb0ba8056615 --- /dev/null +++ b/client/network/src/service/metrics.rs @@ -0,0 +1,358 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::transport::BandwidthSinks; +use prometheus_endpoint::{ + self as prometheus, + Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, + PrometheusError, Registry, U64, Opts, + SourcedCounter, SourcedGauge, MetricSource, +}; +use std::{ + str, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, +}; + +pub use prometheus_endpoint::{Histogram, HistogramVec}; + +/// Registers all networking metrics with the given registry. +pub fn register(registry: &Registry, sources: MetricSources) -> Result { + BandwidthCounters::register(registry, sources.bandwidth)?; + MajorSyncingGauge::register(registry, sources.major_syncing)?; + NumConnectedGauge::register(registry, sources.connected_peers)?; + Metrics::register(registry) +} + +/// Predefined metric sources that are fed directly into prometheus. +pub struct MetricSources { + pub bandwidth: Arc, + pub major_syncing: Arc, + pub connected_peers: Arc, +} + +/// Dedicated metrics. +pub struct Metrics { + // This list is ordered alphabetically + pub connections_closed_total: CounterVec, + pub connections_opened_total: CounterVec, + pub distinct_peers_connections_closed_total: Counter, + pub distinct_peers_connections_opened_total: Counter, + pub import_queue_blocks_submitted: Counter, + pub import_queue_finality_proofs_submitted: Counter, + pub import_queue_justifications_submitted: Counter, + pub incoming_connections_errors_total: CounterVec, + pub incoming_connections_total: Counter, + pub issued_light_requests: Counter, + pub kademlia_query_duration: HistogramVec, + pub kademlia_random_queries_total: CounterVec, + pub kademlia_records_count: GaugeVec, + pub kademlia_records_sizes_total: GaugeVec, + pub kbuckets_num_nodes: GaugeVec, + pub listeners_local_addresses: Gauge, + pub listeners_errors_total: Counter, + pub notifications_sizes: HistogramVec, + pub notifications_streams_closed_total: CounterVec, + pub notifications_streams_opened_total: CounterVec, + pub peerset_num_discovered: Gauge, + pub peerset_num_requested: Gauge, + pub pending_connections: Gauge, + pub pending_connections_errors_total: CounterVec, + pub requests_in_failure_total: CounterVec, + pub requests_in_success_total: HistogramVec, + pub requests_out_failure_total: CounterVec, + pub requests_out_success_total: HistogramVec, + pub requests_out_started_total: CounterVec, +} + +impl Metrics { + fn register(registry: &Registry) -> Result { + Ok(Self { + // This list is ordered alphabetically + connections_closed_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_connections_closed_total", + "Total number of connections closed, by direction and reason" + ), + &["direction", "reason"] + )?, registry)?, + connections_opened_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_connections_opened_total", + "Total number of connections opened by direction" + ), + &["direction"] + )?, registry)?, + distinct_peers_connections_closed_total: prometheus::register(Counter::new( + "sub_libp2p_distinct_peers_connections_closed_total", + "Total number of connections closed with distinct peers" + )?, registry)?, + distinct_peers_connections_opened_total: prometheus::register(Counter::new( + "sub_libp2p_distinct_peers_connections_opened_total", + "Total number of connections opened with distinct peers" + )?, registry)?, + import_queue_blocks_submitted: prometheus::register(Counter::new( + "import_queue_blocks_submitted", + "Number of blocks submitted to the import queue.", + )?, registry)?, + import_queue_finality_proofs_submitted: prometheus::register(Counter::new( + "import_queue_finality_proofs_submitted", + "Number of finality proofs submitted to the import queue.", + )?, registry)?, + import_queue_justifications_submitted: prometheus::register(Counter::new( + "import_queue_justifications_submitted", + "Number of justifications submitted to the import queue.", + )?, registry)?, + incoming_connections_errors_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_incoming_connections_handshake_errors_total", + "Total number of incoming connections that have failed during the \ + initial handshake" + ), + &["reason"] + )?, registry)?, + incoming_connections_total: prometheus::register(Counter::new( + "sub_libp2p_incoming_connections_total", + "Total number of incoming connections on the listening sockets" + )?, registry)?, + issued_light_requests: prometheus::register(Counter::new( + "issued_light_requests", + "Number of light client requests that our node has issued.", + )?, registry)?, + kademlia_query_duration: prometheus::register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_kademlia_query_duration", + "Duration of Kademlia queries per query type" + ), + buckets: prometheus::exponential_buckets(0.5, 2.0, 10) + .expect("parameters are always valid values; qed"), + }, + &["type"] + )?, registry)?, + kademlia_random_queries_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_kademlia_random_queries_total", + "Number of random Kademlia queries started" + ), + &["protocol"] + )?, registry)?, + kademlia_records_count: prometheus::register(GaugeVec::new( + Opts::new( + "sub_libp2p_kademlia_records_count", + "Number of records in the Kademlia records store" + ), + &["protocol"] + )?, registry)?, + kademlia_records_sizes_total: prometheus::register(GaugeVec::new( + Opts::new( + "sub_libp2p_kademlia_records_sizes_total", + "Total size of all the records in the Kademlia records store" + ), + &["protocol"] + )?, registry)?, + kbuckets_num_nodes: prometheus::register(GaugeVec::new( + Opts::new( + "sub_libp2p_kbuckets_num_nodes", + "Number of nodes in the Kademlia k-buckets" + ), + &["protocol"] + )?, registry)?, + listeners_local_addresses: prometheus::register(Gauge::new( + "sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on" + )?, registry)?, + listeners_errors_total: prometheus::register(Counter::new( + "sub_libp2p_listeners_errors_total", + "Total number of non-fatal errors reported by a listener" + )?, registry)?, + notifications_sizes: prometheus::register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_notifications_sizes", + "Sizes of the notifications send to and received from all nodes" + ), + buckets: prometheus::exponential_buckets(64.0, 4.0, 8) + .expect("parameters are always valid values; qed"), + }, + &["direction", "protocol"] + )?, registry)?, + notifications_streams_closed_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_notifications_streams_closed_total", + "Total number of notification substreams that have been closed" + ), + &["protocol"] + )?, registry)?, + notifications_streams_opened_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_notifications_streams_opened_total", + "Total number of notification substreams that have been opened" + ), + &["protocol"] + )?, registry)?, + peerset_num_discovered: prometheus::register(Gauge::new( + "sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager", + )?, registry)?, + peerset_num_requested: prometheus::register(Gauge::new( + "sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to", + )?, registry)?, + pending_connections: prometheus::register(Gauge::new( + "sub_libp2p_pending_connections", + "Number of connections in the process of being established", + )?, registry)?, + pending_connections_errors_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_pending_connections_errors_total", + "Total number of pending connection errors" + ), + &["reason"] + )?, registry)?, + requests_in_failure_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_in_failure_total", + "Total number of incoming requests that the node has failed to answer" + ), + &["protocol", "reason"] + )?, registry)?, + requests_in_success_total: prometheus::register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_requests_in_success_total", + "Total number of requests received and answered" + ), + buckets: prometheus::exponential_buckets(0.001, 2.0, 16) + .expect("parameters are always valid values; qed"), + }, + &["protocol"] + )?, registry)?, + requests_out_failure_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_out_failure_total", + "Total number of requests that have failed" + ), + &["protocol", "reason"] + )?, registry)?, + requests_out_success_total: prometheus::register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_requests_out_success_total", + "For successful requests, time between a request's start and finish" + ), + buckets: prometheus::exponential_buckets(0.001, 2.0, 16) + .expect("parameters are always valid values; qed"), + }, + &["protocol"] + )?, registry)?, + requests_out_started_total: prometheus::register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_out_started_total", + "Total number of requests emitted" + ), + &["protocol"] + )?, registry)?, + }) + } +} + +/// The bandwidth counter metric. +#[derive(Clone)] +pub struct BandwidthCounters(Arc); + +impl BandwidthCounters { + /// Registers the `BandwidthCounters` metric whose values are + /// obtained from the given sinks. + fn register(registry: &Registry, sinks: Arc) -> Result<(), PrometheusError> { + prometheus::register(SourcedCounter::new( + &Opts::new( + "sub_libp2p_network_bytes_total", + "Total bandwidth usage" + ).variable_label("direction"), + BandwidthCounters(sinks), + )?, registry)?; + + Ok(()) + } +} + +impl MetricSource for BandwidthCounters { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[&"in"], self.0.total_inbound()); + set(&[&"out"], self.0.total_outbound()); + } +} + +/// The "major syncing" metric. +#[derive(Clone)] +pub struct MajorSyncingGauge(Arc); + +impl MajorSyncingGauge { + /// Registers the `MajorSyncGauge` metric whose value is + /// obtained from the given `AtomicBool`. + fn register(registry: &Registry, value: Arc) -> Result<(), PrometheusError> { + prometheus::register(SourcedGauge::new( + &Opts::new( + "sub_libp2p_is_major_syncing", + "Whether the node is performing a major sync or not.", + ), + MajorSyncingGauge(value), + )?, registry)?; + + Ok(()) + } +} + +impl MetricSource for MajorSyncingGauge { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[], self.0.load(Ordering::Relaxed) as u64); + } +} + +/// The connected peers metric. +#[derive(Clone)] +pub struct NumConnectedGauge(Arc); + +impl NumConnectedGauge { + /// Registers the `MajorSyncingGauge` metric whose value is + /// obtained from the given `AtomicUsize`. + fn register(registry: &Registry, value: Arc) -> Result<(), PrometheusError> { + prometheus::register(SourcedGauge::new( + &Opts::new( + "sub_libp2p_peers_count", + "Number of connected peers", + ), + NumConnectedGauge(value), + )?, registry)?; + + Ok(()) + } +} + +impl MetricSource for NumConnectedGauge { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[], self.0.load(Ordering::Relaxed) as u64); + } +} + diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index dfe1fcc415159..099504bb009e6 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -87,8 +87,6 @@ fn api>>(sync: T) -> System { external_addresses: Default::default(), connected_peers: Default::default(), not_connected_peers: Default::default(), - total_bytes_inbound: 0, - total_bytes_outbound: 0, peerset: serde_json::Value::Null, }).unwrap()); }, @@ -282,8 +280,6 @@ fn system_network_state() { external_addresses: Default::default(), connected_peers: Default::default(), not_connected_peers: Default::default(), - total_bytes_inbound: 0, - total_bytes_outbound: 0, peerset: serde_json::Value::Null, } ); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5faf0899aa2e3..f4046ab722ba7 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -17,10 +17,10 @@ // along with this program. If not, see . use crate::{ - NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, + error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, TelemetryConnectionSinks, RpcHandlers, NetworkStatusSinks, start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle, - status_sinks, metrics::MetricsService, + metrics::MetricsService, client::{light, Client, ClientConfig}, config::{Configuration, KeystoreConfig, PrometheusConfig}, }; @@ -472,7 +472,9 @@ pub fn spawn_tasks( transaction_pool, rpc_extensions_builder, remote_blockchain, - network, network_status_sinks, system_rpc_tx, + network, + network_status_sinks, + system_rpc_tx, telemetry_connection_sinks, } = params; @@ -521,15 +523,13 @@ pub fn spawn_tasks( MetricsService::new() }; - // Periodically notify the telemetry. - spawn_handle.spawn("telemetry-periodic-send", telemetry_periodic_send( - client.clone(), transaction_pool.clone(), metrics_service, network_status_sinks.clone() - )); - - // Periodically send the network state to the telemetry. - spawn_handle.spawn( - "telemetry-periodic-network-state", - telemetry_periodic_network_state(network_status_sinks.clone()), + // Periodically updated metrics and telemetry updates. + spawn_handle.spawn("telemetry-periodic-send", + metrics_service.run( + client.clone(), + transaction_pool.clone(), + network_status_sinks.clone() + ) ); // RPC @@ -574,7 +574,7 @@ pub fn spawn_tasks( // Spawn informant task spawn_handle.spawn("informant", sc_informant::build( client.clone(), - network_status_sinks.clone().0, + network_status_sinks.status.clone(), transaction_pool.clone(), config.informant_output_format, )); @@ -606,47 +606,6 @@ async fn transaction_notifications( .await; } -// Periodically notify the telemetry. -async fn telemetry_periodic_send( - client: Arc, - transaction_pool: Arc, - mut metrics_service: MetricsService, - network_status_sinks: NetworkStatusSinks, -) - where - TBl: BlockT, - TCl: ProvideRuntimeApi + UsageProvider, - TExPool: MaintainedTransactionPool::Hash>, -{ - let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); - network_status_sinks.0.push(std::time::Duration::from_millis(5000), state_tx); - state_rx.for_each(move |(net_status, _)| { - let info = client.usage_info(); - metrics_service.tick( - &info, - &transaction_pool.status(), - &net_status, - ); - ready(()) - }).await; -} - -async fn telemetry_periodic_network_state( - network_status_sinks: NetworkStatusSinks, -) { - // Periodically send the network state to the telemetry. - let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); - network_status_sinks.0.push(std::time::Duration::from_secs(30), netstat_tx); - netstat_rx.for_each(move |(_, network_state)| { - telemetry!( - SUBSTRATE_INFO; - "system.network_state"; - "state" => network_state, - ); - ready(()) - }).await; -} - fn build_telemetry( config: &mut Configuration, endpoints: sc_telemetry::TelemetryEndpoints, @@ -887,7 +846,7 @@ pub fn build_network( let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); - let network_status_sinks = NetworkStatusSinks::new(Arc::new(status_sinks::StatusSinks::new())); + let network_status_sinks = NetworkStatusSinks::new(); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index d19b9f5ea247d..fac09beb8bd60 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -126,24 +126,37 @@ impl RpcHandlers { /// Sinks to propagate network status updates. /// For each element, every time the `Interval` fires we push an element on the sender. #[derive(Clone)] -pub struct NetworkStatusSinks( - Arc, NetworkState)>>, -); +pub struct NetworkStatusSinks { + status: Arc>>, + state: Arc>, +} impl NetworkStatusSinks { - fn new( - sinks: Arc, NetworkState)>> - ) -> Self { - Self(sinks) + fn new() -> Self { + Self { + status: Arc::new(status_sinks::StatusSinks::new()), + state: Arc::new(status_sinks::StatusSinks::new()), + } } - /// Returns a receiver that periodically receives a status of the network. - pub fn network_status(&self, interval: Duration) - -> TracingUnboundedReceiver<(NetworkStatus, NetworkState)> { + /// Returns a receiver that periodically yields a [`NetworkStatus`]. + pub fn status_stream(&self, interval: Duration) + -> TracingUnboundedReceiver> + { let (sink, stream) = tracing_unbounded("mpsc_network_status"); - self.0.push(interval, sink); + self.status.push(interval, sink); + stream + } + + /// Returns a receiver that periodically yields a [`NetworkState`]. + pub fn state_stream(&self, interval: Duration) + -> TracingUnboundedReceiver + { + let (sink, stream) = tracing_unbounded("mpsc_network_state"); + self.state.push(interval, sink); stream } + } /// Sinks to propagate telemetry connection established events. @@ -319,20 +332,16 @@ async fn build_network_future< // the network. _ = (&mut network).fuse() => {} - // At a regular interval, we send the state of the network on what is called - // the "status sinks". - ready_sink = status_sinks.0.next().fuse() => { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - total_bytes_inbound: network.total_bytes_inbound(), - total_bytes_outbound: network.total_bytes_outbound(), - }; - let state = network.network_state(); - ready_sink.send((status, state)); + // At a regular interval, we send high-level status as well as + // detailed state information of the network on what are called + // "status sinks". + + status_sink = status_sinks.status.next().fuse() => { + status_sink.send(network.status()); + } + + state_sink = status_sinks.state.next().fuse() => { + state_sink.send(network.network_state()); } } } diff --git a/client/service/src/metrics.rs b/client/service/src/metrics.rs index 90a77667581bf..0af393b53f517 100644 --- a/client/service/src/metrics.rs +++ b/client/service/src/metrics.rs @@ -18,14 +18,19 @@ use std::{convert::TryFrom, time::SystemTime}; -use crate::{NetworkStatus, config::Configuration}; +use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration}; +use futures_timer::Delay; use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec}; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; +use sp_api::ProvideRuntimeApi; use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto}; -use sp_transaction_pool::PoolStatus; +use sp_transaction_pool::{PoolStatus, MaintainedTransactionPool}; use sp_utils::metrics::register_globals; -use sc_client_api::ClientInfo; +use sp_utils::mpsc::TracingUnboundedReceiver; +use sc_client_api::{ClientInfo, UsageProvider}; use sc_network::config::Role; +use std::sync::Arc; +use std::time::Duration; use wasm_timer::Instant; struct PrometheusMetrics { @@ -99,6 +104,9 @@ impl PrometheusMetrics { } } +/// A `MetricsService` periodically sends general client and +/// network state to the telemetry as well as (optionally) +/// a Prometheus endpoint. pub struct MetricsService { metrics: Option, last_update: Instant, @@ -107,6 +115,8 @@ pub struct MetricsService { } impl MetricsService { + /// Creates a `MetricsService` that only sends information + /// to the telemetry. pub fn new() -> Self { MetricsService { metrics: None, @@ -116,6 +126,8 @@ impl MetricsService { } } + /// Creates a `MetricsService` that sends metrics + /// to prometheus alongside the telemetry. pub fn with_prometheus( registry: &Registry, config: &Configuration, @@ -141,60 +153,109 @@ impl MetricsService { }) } - pub fn tick( + /// Returns a never-ending `Future` that performs the + /// metric and telemetry updates with information from + /// the given sources. + pub async fn run( + mut self, + client: Arc, + transactions: Arc, + network: NetworkStatusSinks, + ) where + TBl: Block, + TCl: ProvideRuntimeApi + UsageProvider, + TExPool: MaintainedTransactionPool::Hash>, + { + let mut timer = Delay::new(Duration::from_secs(0)); + let timer_interval = Duration::from_secs(5); + + // Metric and telemetry update interval. + let net_status_interval = timer_interval; + let net_state_interval = Duration::from_secs(30); + + // Source of network information. + let mut net_status_rx = Some(network.status_stream(net_status_interval)); + let mut net_state_rx = Some(network.state_stream(net_state_interval)); + + loop { + // Wait for the next tick of the timer. + (&mut timer).await; + + // Try to get the latest network information. + let mut net_status = None; + let mut net_state = None; + if let Some(rx) = net_status_rx.as_mut() { + match Self::latest(rx) { + Ok(status) => { net_status = status; } + Err(()) => { net_status_rx = None; } + } + } + if let Some(rx) = net_state_rx.as_mut() { + match Self::latest(rx) { + Ok(state) => { net_state = state; } + Err(()) => { net_state_rx = None; } + } + } + + // Update / Send the metrics. + self.update( + &client.usage_info(), + &transactions.status(), + net_status, + net_state, + ); + + // Schedule next tick. + timer.reset(timer_interval); + } + } + + // Try to get the latest value from a receiver, dropping intermediate values. + fn latest(rx: &mut TracingUnboundedReceiver) -> Result, ()> { + let mut value = None; + + while let Ok(next) = rx.try_next() { + match next { + Some(v) => { + value = Some(v) + } + None => { + log::error!("Receiver closed unexpectedly."); + return Err(()) + } + } + } + + Ok(value) + } + + fn update( &mut self, info: &ClientInfo, txpool_status: &PoolStatus, - net_status: &NetworkStatus, + net_status: Option>, + net_state: Option, ) { let now = Instant::now(); let elapsed = (now - self.last_update).as_secs(); + self.last_update = now; let best_number = info.chain.best_number.saturated_into::(); let best_hash = info.chain.best_hash; - let num_peers = net_status.num_connected_peers; let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); - let total_bytes_inbound = net_status.total_bytes_inbound; - let total_bytes_outbound = net_status.total_bytes_outbound; - let best_seen_block = net_status - .best_seen_block - .map(|num: NumberFor| num.unique_saturated_into() as u64); - - let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound; - let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound; - let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) = - if elapsed > 0 { - self.last_total_bytes_inbound = total_bytes_inbound; - self.last_total_bytes_outbound = total_bytes_outbound; - (diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed) - } else { - (diff_bytes_inbound, diff_bytes_outbound) - }; - self.last_update = now; + // Update/send metrics that are always available. telemetry!( SUBSTRATE_INFO; "system.interval"; - "peers" => num_peers, "height" => best_number, "best" => ?best_hash, "txcount" => txpool_status.ready, "finalized_height" => finalized_number, "finalized_hash" => ?info.chain.finalized_hash, - "bandwidth_download" => avg_bytes_per_sec_inbound, - "bandwidth_upload" => avg_bytes_per_sec_outbound, "used_state_cache_size" => info.usage.as_ref() .map(|usage| usage.memory.state_cache.as_bytes()) .unwrap_or(0), - "used_db_cache_size" => info.usage.as_ref() - .map(|usage| usage.memory.database_cache.as_bytes()) - .unwrap_or(0), - "disk_read_per_sec" => info.usage.as_ref() - .map(|usage| usage.io.bytes_read) - .unwrap_or(0), - "disk_write_per_sec" => info.usage.as_ref() - .map(|usage| usage.io.bytes_written) - .unwrap_or(0), ); if let Some(metrics) = self.metrics.as_ref() { @@ -213,10 +274,6 @@ impl MetricsService { metrics.ready_transactions_number.set(txpool_status.ready as u64); - if let Some(best_seen_block) = best_seen_block { - metrics.block_height.with_label_values(&["sync_target"]).set(best_seen_block); - } - if let Some(info) = info.usage.as_ref() { metrics.database_cache.set(info.memory.database_cache.as_bytes() as u64); metrics.state_cache.set(info.memory.state_cache.as_bytes() as u64); @@ -232,5 +289,50 @@ impl MetricsService { ); } } + + // Update/send network status information, if any. + if let Some(net_status) = net_status { + let num_peers = net_status.num_connected_peers; + let total_bytes_inbound = net_status.total_bytes_inbound; + let total_bytes_outbound = net_status.total_bytes_outbound; + + let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound; + let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound; + let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) = + if elapsed > 0 { + self.last_total_bytes_inbound = total_bytes_inbound; + self.last_total_bytes_outbound = total_bytes_outbound; + (diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed) + } else { + (diff_bytes_inbound, diff_bytes_outbound) + }; + + telemetry!( + SUBSTRATE_INFO; + "system.interval"; + "peers" => num_peers, + "bandwidth_download" => avg_bytes_per_sec_inbound, + "bandwidth_upload" => avg_bytes_per_sec_outbound, + ); + + if let Some(metrics) = self.metrics.as_ref() { + let best_seen_block = net_status + .best_seen_block + .map(|num: NumberFor| num.unique_saturated_into() as u64); + + if let Some(best_seen_block) = best_seen_block { + metrics.block_height.with_label_values(&["sync_target"]).set(best_seen_block); + } + } + } + + // Send network state information, if any. + if let Some(net_state) = net_state { + telemetry!( + SUBSTRATE_INFO; + "system.network_state"; + "state" => net_state, + ); + } } }