Skip to content

Commit

Permalink
feat: Make all Worker channels metered
Browse files Browse the repository at this point in the history
This is an extension of MystenLabs#682 to the worker
  • Loading branch information
huitseeker committed Aug 8, 2022
1 parent bab4878 commit f6dae11
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 91 deletions.
6 changes: 5 additions & 1 deletion primary/tests/integration_tests_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use types::{
SerializedBatchMessage, Transaction, ValidatorClient,
};
use worker::{
metrics::{Metrics, WorkerEndpointMetrics, WorkerMetrics},
metrics::{Metrics, WorkerChannelMetrics, WorkerEndpointMetrics, WorkerMetrics},
Worker, WorkerMessage,
};

Expand Down Expand Up @@ -131,6 +131,7 @@ async fn test_get_collections() {
let registry = Registry::new();
let metrics = Metrics {
worker_metrics: Some(WorkerMetrics::new(&registry)),
channel_metrics: Some(WorkerChannelMetrics::new(&registry)),
endpoint_metrics: Some(WorkerEndpointMetrics::new(&registry)),
network_metrics: Some(WorkerNetworkMetrics::new(&registry)),
};
Expand Down Expand Up @@ -343,6 +344,7 @@ async fn test_remove_collections() {
let registry = Registry::new();
let metrics = Metrics {
worker_metrics: Some(WorkerMetrics::new(&registry)),
channel_metrics: Some(WorkerChannelMetrics::new(&registry)),
endpoint_metrics: Some(WorkerEndpointMetrics::new(&registry)),
network_metrics: Some(WorkerNetworkMetrics::new(&registry)),
};
Expand Down Expand Up @@ -902,6 +904,7 @@ async fn test_get_collections_with_missing_certificates() {
let registry_1 = Registry::new();
let metrics_1 = Metrics {
worker_metrics: Some(WorkerMetrics::new(&registry_1)),
channel_metrics: Some(WorkerChannelMetrics::new(&registry_1)),
endpoint_metrics: Some(WorkerEndpointMetrics::new(&registry_1)),
network_metrics: Some(WorkerNetworkMetrics::new(&registry_1)),
};
Expand Down Expand Up @@ -944,6 +947,7 @@ async fn test_get_collections_with_missing_certificates() {
let registry_2 = Registry::new();
let metrics_2 = Metrics {
worker_metrics: Some(WorkerMetrics::new(&registry_2)),
channel_metrics: Some(WorkerChannelMetrics::new(&registry_2)),
endpoint_metrics: Some(WorkerEndpointMetrics::new(&registry_2)),
network_metrics: Some(WorkerNetworkMetrics::new(&registry_2)),
};
Expand Down
11 changes: 6 additions & 5 deletions worker/src/batch_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use config::Committee;
#[cfg(feature = "benchmark")]
use std::convert::TryInto;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
watch,
},
sync::watch,
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use types::{error::DagError, Batch, ReconfigureNotification, Transaction};
use types::{
error::DagError,
metered_channel::{Receiver, Sender},
Batch, ReconfigureNotification, Transaction,
};

#[cfg(test)]
#[path = "tests/batch_maker_tests.rs"]
Expand Down
13 changes: 6 additions & 7 deletions worker/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use crypto::PublicKey;
use network::{UnreliableNetwork, WorkerNetwork};
use store::Store;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
watch,
},
sync::{mpsc, watch},
task::JoinHandle,
};
use tracing::{error, trace, warn};
use types::{BatchDigest, ReconfigureNotification, SerializedBatchMessage};
use types::{
metered_channel::Receiver, BatchDigest, ReconfigureNotification, SerializedBatchMessage,
};

#[cfg(test)]
#[path = "tests/helper_tests.rs"]
Expand All @@ -33,7 +32,7 @@ pub struct Helper {
/// Input channel to receive batch requests from workers.
rx_worker_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
/// Input channel to receive batch requests from workers.
rx_client_request: Receiver<(Vec<BatchDigest>, Sender<SerializedBatchMessage>)>,
rx_client_request: Receiver<(Vec<BatchDigest>, mpsc::Sender<SerializedBatchMessage>)>,
/// A network sender to send the batches to the other workers.
network: WorkerNetwork,
}
Expand All @@ -46,7 +45,7 @@ impl Helper {
store: Store<BatchDigest, SerializedBatchMessage>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_worker_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
rx_client_request: Receiver<(Vec<BatchDigest>, Sender<SerializedBatchMessage>)>,
rx_client_request: Receiver<(Vec<BatchDigest>, mpsc::Sender<SerializedBatchMessage>)>,
network: WorkerNetwork,
) -> JoinHandle<()> {
tokio::spawn(async move {
Expand Down
75 changes: 74 additions & 1 deletion worker/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use mysten_network::metrics::MetricsCallbackProvider;
use network::metrics::WorkerNetworkMetrics;
use prometheus::{
default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use std::time::Duration;
use tonic::Code;

#[derive(Clone)]
pub struct Metrics {
pub worker_metrics: Option<WorkerMetrics>,
pub channel_metrics: Option<WorkerChannelMetrics>,
pub endpoint_metrics: Option<WorkerEndpointMetrics>,
pub network_metrics: Option<WorkerNetworkMetrics>,
}
Expand All @@ -21,6 +23,9 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics {
// Essential/core metrics across the worker node
let node_metrics = WorkerMetrics::new(metrics_registry);

// Channel metrics
let channel_metrics = WorkerChannelMetrics::new(metrics_registry);

// Endpoint metrics
let endpoint_metrics = WorkerEndpointMetrics::new(metrics_registry);

Expand All @@ -29,6 +34,7 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics {

Metrics {
worker_metrics: Some(node_metrics),
channel_metrics: Some(channel_metrics),
endpoint_metrics: Some(endpoint_metrics),
network_metrics: Some(network_metrics),
}
Expand Down Expand Up @@ -60,6 +66,73 @@ impl Default for WorkerMetrics {
}
}

#[derive(Clone)]
pub struct WorkerChannelMetrics {
/// occupancy of the channel from various handlers to the `worker::PrimaryConnector`
pub tx_primary: IntGauge,
/// occupancy of the channel from the `worker::PrimaryReceiverHandler` to the `worker::Synchronizer`
pub tx_synchronizer: IntGauge,
/// occupancy of the channel from the `worker::TxReceiverhandler` to the `worker::BatchMaker`
pub tx_batch_maker: IntGauge,
/// occupancy of the channel from the `worker::BatchMaker` to the `worker::QuorumWaiter`
pub tx_quorum_waiter: IntGauge,
/// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Processor`
pub tx_worker_processor: IntGauge,
/// occupancy of the channel from the `worker::QuorumWaiter` to the `worker::Processor`
pub tx_client_processor: IntGauge,
/// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying worker requests)
pub tx_worker_helper: IntGauge,
/// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying client requests)
pub tx_client_helper: IntGauge,
}

impl WorkerChannelMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
tx_primary: register_int_gauge_with_registry!(
"tx_primary",
"occupancy of the channel from various handlers to the `worker::PrimaryConnector`",
registry
).unwrap(),
tx_synchronizer: register_int_gauge_with_registry!(
"tx_synchronizer",
"occupancy of the channel from the `worker::PrimaryReceiverHandler` to the `worker::Synchronizer`",
registry
).unwrap(),
tx_batch_maker: register_int_gauge_with_registry!(
"tx_batch_maker",
"occupancy of the channel from the `worker::TxReceiverhandler` to the `worker::BatchMaker`",
registry
).unwrap(),
tx_quorum_waiter: register_int_gauge_with_registry!(
"tx_quorum_waiter",
"occupancy of the channel from the `worker::BatchMaker` to the `worker::QuorumWaiter`",
registry
).unwrap(),
tx_worker_processor: register_int_gauge_with_registry!(
"tx_worker_processor",
"occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Processor`",
registry
).unwrap(),
tx_client_processor: register_int_gauge_with_registry!(
"tx_client_processor",
"occupancy of the channel from the `worker::QuorumWaiter` to the `worker::Processor`",
registry
).unwrap(),
tx_worker_helper: register_int_gauge_with_registry!(
"tx_worker_helper",
"occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying worker requests)",
registry
).unwrap(),
tx_client_helper: register_int_gauge_with_registry!(
"tx_client_helper",
"occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying client requests)",
registry
).unwrap()
}
}
}

#[derive(Clone)]
pub struct WorkerEndpointMetrics {
/// Counter of requests, route is a label (ie separate timeseries per route)
Expand Down
7 changes: 2 additions & 5 deletions worker/src/primary_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ use config::Committee;
use crypto::PublicKey;
use futures::{stream::FuturesUnordered, StreamExt};
use network::{ReliableNetwork, WorkerToPrimaryNetwork};
use tokio::{
sync::{mpsc::Receiver, watch},
task::JoinHandle,
};
use types::{ReconfigureNotification, WorkerPrimaryMessage};
use tokio::{sync::watch, task::JoinHandle};
use types::{metered_channel::Receiver, ReconfigureNotification, WorkerPrimaryMessage};

/// The maximum number of digests kept in memory waiting to be sent to the primary.
pub const MAX_PENDING_DIGESTS: usize = 10_000;
Expand Down
14 changes: 5 additions & 9 deletions worker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

use config::WorkerId;
use store::Store;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
watch,
},
task::JoinHandle,
};
use tokio::{sync::watch, task::JoinHandle};
use tracing::error;
use types::{
error::DagError, serialized_batch_digest, BatchDigest, ReconfigureNotification,
SerializedBatchMessage, WorkerPrimaryMessage,
error::DagError,
metered_channel::{Receiver, Sender},
serialized_batch_digest, BatchDigest, ReconfigureNotification, SerializedBatchMessage,
WorkerPrimaryMessage,
};

#[cfg(test)]
Expand Down
12 changes: 4 additions & 8 deletions worker/src/quorum_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ use config::{Committee, Stake, WorkerId};
use crypto::PublicKey;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _};
use network::{CancelOnDropHandler, MessageResult, ReliableNetwork, WorkerNetwork};
use tokio::{
sync::{
mpsc::{Receiver, Sender},
watch,
},
task::JoinHandle,
};
use tokio::{sync::watch, task::JoinHandle};
use types::{
error::DagError, Batch, ReconfigureNotification, SerializedBatchMessage, WorkerMessage,
error::DagError,
metered_channel::{Receiver, Sender},
Batch, ReconfigureNotification, SerializedBatchMessage, WorkerMessage,
};

#[cfg(test)]
Expand Down
12 changes: 5 additions & 7 deletions worker/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ use std::{
};
use store::{Store, StoreError};
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
watch,
},
sync::{mpsc, watch},
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error};
use types::{
metered_channel::{Receiver, Sender},
BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage,
WorkerPrimaryError, WorkerPrimaryMessage,
};
Expand Down Expand Up @@ -60,7 +58,7 @@ pub struct Synchronizer {
/// Keeps the digests (of batches) that are waiting to be processed by the primary. Their
/// processing will resume when we get the missing batches in the store or we no longer need them.
/// It also keeps the round number and a time stamp (`u128`) of each request we sent.
pending: HashMap<BatchDigest, (Round, Sender<()>, u128)>,
pending: HashMap<BatchDigest, (Round, mpsc::Sender<()>, u128)>,
/// Send reconfiguration update to other tasks.
tx_reconfigure: watch::Sender<ReconfigureNotification>,
/// Output channel to send out the batch requests.
Expand Down Expand Up @@ -113,7 +111,7 @@ impl Synchronizer {
missing: BatchDigest,
store: Store<BatchDigest, SerializedBatchMessage>,
deliver: BatchDigest,
mut handler: Receiver<()>,
mut handler: mpsc::Receiver<()>,
) -> Result<Option<BatchDigest>, StoreError> {
tokio::select! {
result = store.notify_read(missing) => {
Expand Down Expand Up @@ -164,7 +162,7 @@ impl Synchronizer {

// Add the digest to the waiter.
let deliver = digest;
let (tx_cancel, rx_cancel) = channel(1);
let (tx_cancel, rx_cancel) = mpsc::channel(1);
let fut = Self::waiter(digest, self.store.clone(), deliver, rx_cancel);
waiting.push(fut);
self.pending.insert(digest, (self.round, tx_cancel, now));
Expand Down
9 changes: 4 additions & 5 deletions worker/src/tests/batch_maker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;
use test_utils::{committee, transaction};
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn make_batch() {
let committee = committee(None).clone();
let (_tx_reconfiguration, rx_reconfiguration) =
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let (tx_transaction, rx_transaction) = channel(1);
let (tx_message, mut rx_message) = channel(1);
let (tx_transaction, rx_transaction) = test_utils::test_channel!(1);
let (tx_message, mut rx_message) = test_utils::test_channel!(1);

// Spawn a `BatchMaker` instance.
let _batch_maker_handle = BatchMaker::spawn(
Expand Down Expand Up @@ -40,8 +39,8 @@ async fn batch_timeout() {
let committee = committee(None).clone();
let (_tx_reconfiguration, rx_reconfiguration) =
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let (tx_transaction, rx_transaction) = channel(1);
let (tx_message, mut rx_message) = channel(1);
let (tx_transaction, rx_transaction) = test_utils::test_channel!(1);
let (tx_message, mut rx_message) = test_utils::test_channel!(1);

// Spawn a `BatchMaker` instance.
let _batch_maker_handle = BatchMaker::spawn(
Expand Down
8 changes: 4 additions & 4 deletions worker/src/tests/helper_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use types::BatchDigest;

#[tokio::test]
async fn worker_batch_reply() {
let (tx_worker_request, rx_worker_request) = channel(1);
let (_tx_client_request, rx_client_request) = channel(1);
let (tx_worker_request, rx_worker_request) = test_utils::test_channel!(1);
let (_tx_client_request, rx_client_request) = test_utils::test_channel!(1);
let requestor = keys(None).pop().unwrap().public().clone();
let id = 0;
let committee = committee(None).clone();
Expand Down Expand Up @@ -62,8 +62,8 @@ async fn worker_batch_reply() {

#[tokio::test]
async fn client_batch_reply() {
let (_tx_worker_request, rx_worker_request) = channel(1);
let (tx_client_request, rx_client_request) = channel(1);
let (_tx_worker_request, rx_worker_request) = test_utils::test_channel!(1);
let (tx_client_request, rx_client_request) = test_utils::test_channel!(1);
let id = 0;
let committee = committee(None).clone();
let (_tx_reconfiguration, rx_reconfiguration) =
Expand Down
5 changes: 2 additions & 3 deletions worker/src/tests/processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use crate::worker::WorkerMessage;
use crypto::Hash;
use store::rocks;
use test_utils::{batch, committee, temp_dir};
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn hash_and_store() {
let (tx_batch, rx_batch) = channel(1);
let (tx_digest, mut rx_digest) = channel(1);
let (tx_batch, rx_batch) = test_utils::test_channel!(1);
let (tx_digest, mut rx_digest) = test_utils::test_channel!(1);

let committee = committee(None).clone();
let (_tx_reconfiguration, rx_reconfiguration) =
Expand Down
5 changes: 2 additions & 3 deletions worker/src/tests/quorum_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use super::*;
use crate::worker::WorkerMessage;
use crypto::traits::KeyPair;
use test_utils::{batch, committee, keys, WorkerToWorkerMockServer};
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn wait_for_quorum() {
let (tx_message, rx_message) = channel(1);
let (tx_batch, mut rx_batch) = channel(1);
let (tx_message, rx_message) = test_utils::test_channel!(1);
let (tx_batch, mut rx_batch) = test_utils::test_channel!(1);
let myself = keys(None).pop().unwrap().public().clone();

let committee = committee(None).clone();
Expand Down
Loading

0 comments on commit f6dae11

Please sign in to comment.