Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
[metrics] Makes all channels metered (#682)
Browse files Browse the repository at this point in the history
* upgrade chrono to 0.4.20

* fix: fix underflow in debug message

* fix: rename worker_network_concourrent_tasks to worker_network_available_tasks

* feat: add a metered_channel that wraps a tokio::mpsc::{Sender, Receiver} with an IntGauge

* feat: use metered_channel everywhere in the Primary

* fix: move the tx_committed_certificates to be initialized in the node bootstrap

Adjusts the consensus registry on the back end of that.

* fix: add tests for reserve, try_reserve, empty & closed channel behavior
  • Loading branch information
huitseeker committed Aug 12, 2022
1 parent 8f8866b commit c7f8956
Show file tree
Hide file tree
Showing 46 changed files with 1,149 additions and 476 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ use std::{
};
use store::Store;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
watch,
},
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::{info, instrument};
use types::{
Certificate, CertificateDigest, ConsensusStore, ReconfigureNotification, Round, StoreResult,
metered_channel, Certificate, CertificateDigest, ConsensusStore, ReconfigureNotification,
Round, StoreResult,
};

/// The representation of the DAG in memory.
Expand Down Expand Up @@ -189,9 +187,9 @@ pub struct Consensus<ConsensusProtocol> {
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
/// Receives new certificates from the primary. The primary should send us new certificates only
/// if it already sent us its whole history.
rx_primary: Receiver<Certificate>,
rx_primary: metered_channel::Receiver<Certificate>,
/// Outputs the sequence of ordered certificates to the primary (for cleanup and feedback).
tx_primary: Sender<Certificate>,
tx_primary: metered_channel::Sender<Certificate>,
/// Outputs the sequence of ordered certificates to the application layer.
tx_output: Sender<ConsensusOutput>,

Expand All @@ -216,8 +214,8 @@ where
store: Arc<ConsensusStore>,
cert_store: Store<CertificateDigest, Certificate>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_primary: Receiver<Certificate>,
tx_primary: Sender<Certificate>,
rx_primary: metered_channel::Receiver<Certificate>,
tx_primary: metered_channel::Sender<Certificate>,
tx_output: Sender<ConsensusOutput>,
protocol: Protocol,
metrics: Arc<ConsensusMetrics>,
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::{
task::JoinHandle,
};
use tracing::instrument;
use types::{Certificate, CertificateDigest, Round};
use types::{metered_channel, Certificate, CertificateDigest, Round};

use crate::{metrics::ConsensusMetrics, DEFAULT_CHANNEL_SIZE};

Expand All @@ -36,7 +36,7 @@ pub mod dag_tests;
struct InnerDag {
/// Receives new certificates from the primary. The primary should send us new certificates only
/// if it already sent us its whole history.
rx_primary: Receiver<Certificate>,
rx_primary: metered_channel::Receiver<Certificate>,

/// Receives new commands for the Dag.
rx_commands: Receiver<DagCommand>,
Expand Down Expand Up @@ -101,7 +101,7 @@ enum DagCommand {
impl InnerDag {
fn new(
committee: &Committee,
rx_primary: Receiver<Certificate>,
rx_primary: metered_channel::Receiver<Certificate>,
rx_commands: Receiver<DagCommand>,
dag: NodeDag<Certificate>,
vertices: RwLock<BTreeMap<(PublicKey, Round), CertificateDigest>>,
Expand Down Expand Up @@ -339,7 +339,7 @@ impl InnerDag {
impl Dag {
pub fn new(
committee: &Committee,
rx_primary: Receiver<Certificate>,
rx_primary: metered_channel::Receiver<Certificate>,
metrics: Arc<ConsensusMetrics>,
) -> (JoinHandle<()>, Self) {
let (tx_commands, rx_commands) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_SIZE);
Expand Down
24 changes: 12 additions & 12 deletions consensus/src/tests/bullshark_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ async fn commit_one() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -133,8 +133,8 @@ async fn dead_node() {
let (mut certificates, _) = test_utils::make_optimal_certificates(1..=9, &genesis, &keys);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -243,8 +243,8 @@ async fn not_enough_support() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -327,8 +327,8 @@ async fn missing_leader() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -389,8 +389,8 @@ async fn epoch_change() {
let mut committee = mock_committee(&keys[..]);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
Expand Down Expand Up @@ -474,8 +474,8 @@ async fn restart_with_new_committee() {
// Run for a few epochs.
for epoch in 0..5 {
// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
Expand Down
17 changes: 8 additions & 9 deletions consensus/src/tests/dag_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crypto::{traits::KeyPair, Hash};
use dag::node_dag::NodeDagError;
use std::sync::Arc;
use test_utils::make_optimal_certificates;
use tokio::sync::mpsc::channel;
use types::Certificate;

use crate::metrics::ConsensusMetrics;
Expand All @@ -33,7 +32,7 @@ async fn inner_dag_insert_one() {
let (mut certificates, _next_parents) = make_optimal_certificates(1..=4, &genesis, &keys);

// set up a Dag
let (tx_cert, rx_cert) = channel(1);
let (tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
Dag::new(&committee, rx_cert, metrics);

Expand All @@ -58,7 +57,7 @@ async fn test_dag_read_notify() {
let (mut certificates, _next_parents) = make_optimal_certificates(1..=4, &genesis, &keys);
let certs = certificates.clone().into_iter().map(|c| (c.digest(), c));
// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let arc = Arc::new(Dag::new(&committee, rx_cert, metrics));
let cloned = arc.clone();
Expand Down Expand Up @@ -93,7 +92,7 @@ async fn test_dag_new_has_genesis_and_its_not_live() {
.collect::<BTreeSet<_>>();

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_, dag) = Dag::new(&committee, rx_cert, metrics);

Expand Down Expand Up @@ -144,7 +143,7 @@ async fn test_dag_compresses_empty_blocks() {
.collect::<BTreeSet<_>>();

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_, dag) = Dag::new(&committee, rx_cert, metrics);

Expand Down Expand Up @@ -212,7 +211,7 @@ async fn test_dag_rounds_after_compression() {
.collect::<BTreeSet<_>>();

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_, dag) = Dag::new(&committee, rx_cert, metrics);

Expand Down Expand Up @@ -262,7 +261,7 @@ async fn dag_mutation_failures() {
let (certificates, _next_parents) = make_optimal_certificates(1..=4, &genesis, &keys);

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_handle, dag) = Dag::new(&committee, rx_cert, metrics);
let mut certs_to_insert = certificates.clone();
Expand Down Expand Up @@ -331,7 +330,7 @@ async fn dag_insert_one_and_rounds_node_read() {
let (certificates, _next_parents) = make_optimal_certificates(1..=4, &genesis, &keys);

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_handle, dag) = Dag::new(&committee, rx_cert, metrics);
let mut certs_to_insert = certificates.clone();
Expand Down Expand Up @@ -379,7 +378,7 @@ async fn dag_insert_and_remove_reads() {
let (mut certificates, _next_parents) = make_optimal_certificates(1..=4, &genesis, &keys);

// set up a Dag
let (_tx_cert, rx_cert) = channel(1);
let (_tx_cert, rx_cert) = test_utils::test_channel!(1);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let (_handle, dag) = Dag::new(&committee, rx_cert, metrics);

Expand Down
11 changes: 6 additions & 5 deletions consensus/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use prometheus::Registry;
use std::collections::{BTreeSet, VecDeque};
use test_utils::{keys, make_consensus_store, mock_committee};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use types::metered_channel;

/// Make enough certificates to commit a leader.
pub fn commit_certificates() -> VecDeque<Certificate> {
Expand All @@ -36,7 +37,7 @@ pub fn commit_certificates() -> VecDeque<Certificate> {
/// Spawn the consensus core and the subscriber handler. Also add to storage enough certificates to
/// commit a leader (as if they were added by the Primary).
pub async fn spawn_node(
rx_waiter: Receiver<Certificate>,
rx_waiter: metered_channel::Receiver<Certificate>,
rx_client: Receiver<ConsensusSyncRequest>,
tx_client: Sender<ConsensusOutput>,
) -> (watch::Sender<ReconfigureNotification>, Vec<JoinHandle<()>>) {
Expand Down Expand Up @@ -64,7 +65,7 @@ pub async fn spawn_node(
certificate_store.write_all(to_store).await.unwrap();

// Spawn the consensus engine and sink the primary channel.
let (tx_primary, mut rx_primary) = channel(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, rx_output) = channel(1);
let gc_depth = 50;
let tusk = Tusk::new(committee.clone(), consensus_store.clone(), gc_depth);
Expand Down Expand Up @@ -131,7 +132,7 @@ pub async fn order_stream(

#[tokio::test]
async fn subscribe() {
let (tx_consensus_input, rx_consensus_input) = channel(1);
let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1);
let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1);
let (_tx_client_to_consensus, rx_client_to_consensus) = channel(1);

Expand Down Expand Up @@ -161,7 +162,7 @@ async fn subscribe() {

#[tokio::test]
async fn subscribe_sync() {
let (tx_consensus_input, rx_consensus_input) = channel(1);
let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1);
let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1);
let (tx_client_to_consensus, rx_client_to_consensus) = channel(1);

Expand Down Expand Up @@ -212,7 +213,7 @@ async fn subscribe_sync() {

#[tokio::test]
async fn restart() {
let (tx_consensus_input, rx_consensus_input) = channel(1);
let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1);
let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1);
let (_tx_client_to_consensus, rx_client_to_consensus) = channel(1);

Expand Down
24 changes: 12 additions & 12 deletions consensus/src/tests/tusk_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ async fn commit_one() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -132,8 +132,8 @@ async fn dead_node() {
let (mut certificates, _) = test_utils::make_optimal_certificates(1..=9, &genesis, &keys);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -240,8 +240,8 @@ async fn not_enough_support() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -322,8 +322,8 @@ async fn missing_leader() {
certificates.push_back(certificate);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
Expand Down Expand Up @@ -383,8 +383,8 @@ async fn epoch_change() {
let mut committee = mock_committee(&keys[..]);

// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
Expand Down Expand Up @@ -462,8 +462,8 @@ async fn restart_with_new_committee() {
// Run for a few epochs.
for epoch in 0..5 {
// Spawn the consensus engine and sink the primary channel.
let (tx_waiter, rx_waiter) = channel(1);
let (tx_primary, mut rx_primary) = channel(1);
let (tx_waiter, rx_waiter) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
Expand Down
6 changes: 3 additions & 3 deletions network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl PrimaryNetworkMetrics {
Self {
network_available_tasks: register_int_gauge_vec_with_registry!(
"primary_network_available_tasks",
"The number of available tasks to run in the network connector",
"The number of available tasks to run in the primary2primary network connector",
&["module", "network", "address"],
registry
)
Expand Down Expand Up @@ -49,8 +49,8 @@ impl WorkerNetworkMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
network_available_tasks: register_int_gauge_vec_with_registry!(
"worker_network_concurrent_tasks",
"The number of available tasks to run in the network connector",
"worker_network_available_tasks",
"The number of available tasks to run in the worker2worker network connector",
&["module", "network", "address"],
registry
)
Expand Down
Loading

0 comments on commit c7f8956

Please sign in to comment.