diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index 16c57cf82e573..a3cb4b424ce76 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -29,7 +29,7 @@ use types::{ SerializedBatchMessage, Transaction, ValidatorClient, }; use worker::{ - metrics::{Metrics, WorkerEndpointMetrics, WorkerMetrics}, + metrics::{Metrics, WorkerChannelMetrics, WorkerEndpointMetrics, WorkerMetrics}, Worker, WorkerMessage, }; @@ -131,6 +131,7 @@ async fn test_get_collections() { let registry = Registry::new(); let metrics = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry)), network_metrics: Some(WorkerNetworkMetrics::new(®istry)), }; @@ -343,6 +344,7 @@ async fn test_remove_collections() { let registry = Registry::new(); let metrics = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry)), network_metrics: Some(WorkerNetworkMetrics::new(®istry)), }; @@ -902,6 +904,7 @@ async fn test_get_collections_with_missing_certificates() { let registry_1 = Registry::new(); let metrics_1 = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry_1)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry_1)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry_1)), network_metrics: Some(WorkerNetworkMetrics::new(®istry_1)), }; @@ -944,6 +947,7 @@ async fn test_get_collections_with_missing_certificates() { let registry_2 = Registry::new(); let metrics_2 = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry_2)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry_2)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry_2)), network_metrics: Some(WorkerNetworkMetrics::new(®istry_2)), }; diff --git a/narwhal/worker/src/batch_maker.rs b/narwhal/worker/src/batch_maker.rs index d9c7eb7e32207..fc45ab33b488c 100644 --- a/narwhal/worker/src/batch_maker.rs +++ b/narwhal/worker/src/batch_maker.rs @@ -5,14 +5,15 @@ use config::Committee; #[cfg(feature = "benchmark")] use std::convert::TryInto; use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, + sync::watch, task::JoinHandle, time::{sleep, Duration, Instant}, }; -use types::{error::DagError, Batch, ReconfigureNotification, Transaction}; +use types::{ + error::DagError, + metered_channel::{Receiver, Sender}, + Batch, ReconfigureNotification, Transaction, +}; #[cfg(test)] #[path = "tests/batch_maker_tests.rs"] diff --git a/narwhal/worker/src/helper.rs b/narwhal/worker/src/helper.rs index 824f4e87bcc2b..c42a8bcd2bf05 100644 --- a/narwhal/worker/src/helper.rs +++ b/narwhal/worker/src/helper.rs @@ -7,14 +7,13 @@ use crypto::PublicKey; use network::{UnreliableNetwork, WorkerNetwork}; use store::Store; use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, + sync::{mpsc, watch}, task::JoinHandle, }; use tracing::{error, trace, warn}; -use types::{BatchDigest, ReconfigureNotification, SerializedBatchMessage}; +use types::{ + metered_channel::Receiver, BatchDigest, ReconfigureNotification, SerializedBatchMessage, +}; #[cfg(test)] #[path = "tests/helper_tests.rs"] @@ -33,7 +32,7 @@ pub struct Helper { /// Input channel to receive batch requests from workers. rx_worker_request: Receiver<(Vec, PublicKey)>, /// Input channel to receive batch requests from workers. - rx_client_request: Receiver<(Vec, Sender)>, + rx_client_request: Receiver<(Vec, mpsc::Sender)>, /// A network sender to send the batches to the other workers. network: WorkerNetwork, } @@ -46,7 +45,7 @@ impl Helper { store: Store, rx_reconfigure: watch::Receiver, rx_worker_request: Receiver<(Vec, PublicKey)>, - rx_client_request: Receiver<(Vec, Sender)>, + rx_client_request: Receiver<(Vec, mpsc::Sender)>, network: WorkerNetwork, ) -> JoinHandle<()> { tokio::spawn(async move { diff --git a/narwhal/worker/src/metrics.rs b/narwhal/worker/src/metrics.rs index 0cb9a3393640d..079e7d0935035 100644 --- a/narwhal/worker/src/metrics.rs +++ b/narwhal/worker/src/metrics.rs @@ -4,7 +4,8 @@ use mysten_network::metrics::MetricsCallbackProvider; use network::metrics::WorkerNetworkMetrics; use prometheus::{ default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry, - register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry, + register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec, + IntCounterVec, IntGauge, IntGaugeVec, Registry, }; use std::time::Duration; use tonic::Code; @@ -12,6 +13,7 @@ use tonic::Code; #[derive(Clone)] pub struct Metrics { pub worker_metrics: Option, + pub channel_metrics: Option, pub endpoint_metrics: Option, pub network_metrics: Option, } @@ -21,6 +23,9 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics { // Essential/core metrics across the worker node let node_metrics = WorkerMetrics::new(metrics_registry); + // Channel metrics + let channel_metrics = WorkerChannelMetrics::new(metrics_registry); + // Endpoint metrics let endpoint_metrics = WorkerEndpointMetrics::new(metrics_registry); @@ -29,6 +34,7 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics { Metrics { worker_metrics: Some(node_metrics), + channel_metrics: Some(channel_metrics), endpoint_metrics: Some(endpoint_metrics), network_metrics: Some(network_metrics), } @@ -60,6 +66,73 @@ impl Default for WorkerMetrics { } } +#[derive(Clone)] +pub struct WorkerChannelMetrics { + /// occupancy of the channel from various handlers to the `worker::PrimaryConnector` + pub tx_primary: IntGauge, + /// occupancy of the channel from the `worker::PrimaryReceiverHandler` to the `worker::Synchronizer` + pub tx_synchronizer: IntGauge, + /// occupancy of the channel from the `worker::TxReceiverhandler` to the `worker::BatchMaker` + pub tx_batch_maker: IntGauge, + /// occupancy of the channel from the `worker::BatchMaker` to the `worker::QuorumWaiter` + pub tx_quorum_waiter: IntGauge, + /// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Processor` + pub tx_worker_processor: IntGauge, + /// occupancy of the channel from the `worker::QuorumWaiter` to the `worker::Processor` + pub tx_client_processor: IntGauge, + /// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying worker requests) + pub tx_worker_helper: IntGauge, + /// occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying client requests) + pub tx_client_helper: IntGauge, +} + +impl WorkerChannelMetrics { + pub fn new(registry: &Registry) -> Self { + Self { + tx_primary: register_int_gauge_with_registry!( + "tx_primary", + "occupancy of the channel from various handlers to the `worker::PrimaryConnector`", + registry + ).unwrap(), + tx_synchronizer: register_int_gauge_with_registry!( + "tx_synchronizer", + "occupancy of the channel from the `worker::PrimaryReceiverHandler` to the `worker::Synchronizer`", + registry + ).unwrap(), + tx_batch_maker: register_int_gauge_with_registry!( + "tx_batch_maker", + "occupancy of the channel from the `worker::TxReceiverhandler` to the `worker::BatchMaker`", + registry + ).unwrap(), + tx_quorum_waiter: register_int_gauge_with_registry!( + "tx_quorum_waiter", + "occupancy of the channel from the `worker::BatchMaker` to the `worker::QuorumWaiter`", + registry + ).unwrap(), + tx_worker_processor: register_int_gauge_with_registry!( + "tx_worker_processor", + "occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Processor`", + registry + ).unwrap(), + tx_client_processor: register_int_gauge_with_registry!( + "tx_client_processor", + "occupancy of the channel from the `worker::QuorumWaiter` to the `worker::Processor`", + registry + ).unwrap(), + tx_worker_helper: register_int_gauge_with_registry!( + "tx_worker_helper", + "occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying worker requests)", + registry + ).unwrap(), + tx_client_helper: register_int_gauge_with_registry!( + "tx_client_helper", + "occupancy of the channel from the `worker::WorkerReceiverHandler` to the `worker::Helper` (carrying client requests)", + registry + ).unwrap() + } + } +} + #[derive(Clone)] pub struct WorkerEndpointMetrics { /// Counter of requests, route is a label (ie separate timeseries per route) diff --git a/narwhal/worker/src/primary_connector.rs b/narwhal/worker/src/primary_connector.rs index 656921a055c68..38da2b7a0a7fb 100644 --- a/narwhal/worker/src/primary_connector.rs +++ b/narwhal/worker/src/primary_connector.rs @@ -5,11 +5,8 @@ use config::Committee; use crypto::PublicKey; use futures::{stream::FuturesUnordered, StreamExt}; use network::{ReliableNetwork, WorkerToPrimaryNetwork}; -use tokio::{ - sync::{mpsc::Receiver, watch}, - task::JoinHandle, -}; -use types::{ReconfigureNotification, WorkerPrimaryMessage}; +use tokio::{sync::watch, task::JoinHandle}; +use types::{metered_channel::Receiver, ReconfigureNotification, WorkerPrimaryMessage}; /// The maximum number of digests kept in memory waiting to be sent to the primary. pub const MAX_PENDING_DIGESTS: usize = 10_000; diff --git a/narwhal/worker/src/processor.rs b/narwhal/worker/src/processor.rs index b9ac91ef5ac80..54ed0d86fc839 100644 --- a/narwhal/worker/src/processor.rs +++ b/narwhal/worker/src/processor.rs @@ -4,17 +4,13 @@ use config::WorkerId; use store::Store; -use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, - task::JoinHandle, -}; +use tokio::{sync::watch, task::JoinHandle}; use tracing::error; use types::{ - error::DagError, serialized_batch_digest, BatchDigest, ReconfigureNotification, - SerializedBatchMessage, WorkerPrimaryMessage, + error::DagError, + metered_channel::{Receiver, Sender}, + serialized_batch_digest, BatchDigest, ReconfigureNotification, SerializedBatchMessage, + WorkerPrimaryMessage, }; #[cfg(test)] diff --git a/narwhal/worker/src/quorum_waiter.rs b/narwhal/worker/src/quorum_waiter.rs index ff84e49923b74..fdd027b7a0d3c 100644 --- a/narwhal/worker/src/quorum_waiter.rs +++ b/narwhal/worker/src/quorum_waiter.rs @@ -5,15 +5,11 @@ use config::{Committee, Stake, WorkerId}; use crypto::PublicKey; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _}; use network::{CancelOnDropHandler, MessageResult, ReliableNetwork, WorkerNetwork}; -use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, - task::JoinHandle, -}; +use tokio::{sync::watch, task::JoinHandle}; use types::{ - error::DagError, Batch, ReconfigureNotification, SerializedBatchMessage, WorkerMessage, + error::DagError, + metered_channel::{Receiver, Sender}, + Batch, ReconfigureNotification, SerializedBatchMessage, WorkerMessage, }; #[cfg(test)] diff --git a/narwhal/worker/src/synchronizer.rs b/narwhal/worker/src/synchronizer.rs index 66ad7df50941e..49ae298dfae4f 100644 --- a/narwhal/worker/src/synchronizer.rs +++ b/narwhal/worker/src/synchronizer.rs @@ -14,15 +14,13 @@ use std::{ }; use store::{Store, StoreError}; use tokio::{ - sync::{ - mpsc::{channel, Receiver, Sender}, - watch, - }, + sync::{mpsc, watch}, task::JoinHandle, time::{sleep, Duration, Instant}, }; use tracing::{debug, error}; use types::{ + metered_channel::{Receiver, Sender}, BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage, WorkerPrimaryError, WorkerPrimaryMessage, }; @@ -60,7 +58,7 @@ pub struct Synchronizer { /// Keeps the digests (of batches) that are waiting to be processed by the primary. Their /// processing will resume when we get the missing batches in the store or we no longer need them. /// It also keeps the round number and a time stamp (`u128`) of each request we sent. - pending: HashMap, u128)>, + pending: HashMap, u128)>, /// Send reconfiguration update to other tasks. tx_reconfigure: watch::Sender, /// Output channel to send out the batch requests. @@ -113,7 +111,7 @@ impl Synchronizer { missing: BatchDigest, store: Store, deliver: BatchDigest, - mut handler: Receiver<()>, + mut handler: mpsc::Receiver<()>, ) -> Result, StoreError> { tokio::select! { result = store.notify_read(missing) => { @@ -164,7 +162,7 @@ impl Synchronizer { // Add the digest to the waiter. let deliver = digest; - let (tx_cancel, rx_cancel) = channel(1); + let (tx_cancel, rx_cancel) = mpsc::channel(1); let fut = Self::waiter(digest, self.store.clone(), deliver, rx_cancel); waiting.push(fut); self.pending.insert(digest, (self.round, tx_cancel, now)); diff --git a/narwhal/worker/src/tests/batch_maker_tests.rs b/narwhal/worker/src/tests/batch_maker_tests.rs index 554e859a74bcb..50597aabc0660 100644 --- a/narwhal/worker/src/tests/batch_maker_tests.rs +++ b/narwhal/worker/src/tests/batch_maker_tests.rs @@ -3,15 +3,14 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; use test_utils::{committee, transaction}; -use tokio::sync::mpsc::channel; #[tokio::test] async fn make_batch() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); - let (tx_transaction, rx_transaction) = channel(1); - let (tx_message, mut rx_message) = channel(1); + let (tx_transaction, rx_transaction) = test_utils::test_channel!(1); + let (tx_message, mut rx_message) = test_utils::test_channel!(1); // Spawn a `BatchMaker` instance. let _batch_maker_handle = BatchMaker::spawn( @@ -40,8 +39,8 @@ async fn batch_timeout() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); - let (tx_transaction, rx_transaction) = channel(1); - let (tx_message, mut rx_message) = channel(1); + let (tx_transaction, rx_transaction) = test_utils::test_channel!(1); + let (tx_message, mut rx_message) = test_utils::test_channel!(1); // Spawn a `BatchMaker` instance. let _batch_maker_handle = BatchMaker::spawn( diff --git a/narwhal/worker/src/tests/helper_tests.rs b/narwhal/worker/src/tests/helper_tests.rs index f8017f83b2cac..c403980b248cf 100644 --- a/narwhal/worker/src/tests/helper_tests.rs +++ b/narwhal/worker/src/tests/helper_tests.rs @@ -13,8 +13,8 @@ use types::BatchDigest; #[tokio::test] async fn worker_batch_reply() { - let (tx_worker_request, rx_worker_request) = channel(1); - let (_tx_client_request, rx_client_request) = channel(1); + let (tx_worker_request, rx_worker_request) = test_utils::test_channel!(1); + let (_tx_client_request, rx_client_request) = test_utils::test_channel!(1); let requestor = keys(None).pop().unwrap().public().clone(); let id = 0; let committee = committee(None).clone(); @@ -62,8 +62,8 @@ async fn worker_batch_reply() { #[tokio::test] async fn client_batch_reply() { - let (_tx_worker_request, rx_worker_request) = channel(1); - let (tx_client_request, rx_client_request) = channel(1); + let (_tx_worker_request, rx_worker_request) = test_utils::test_channel!(1); + let (tx_client_request, rx_client_request) = test_utils::test_channel!(1); let id = 0; let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = diff --git a/narwhal/worker/src/tests/processor_tests.rs b/narwhal/worker/src/tests/processor_tests.rs index a9e13b8ec9a9f..4f32a68e8d5a1 100644 --- a/narwhal/worker/src/tests/processor_tests.rs +++ b/narwhal/worker/src/tests/processor_tests.rs @@ -6,12 +6,11 @@ use crate::worker::WorkerMessage; use crypto::Hash; use store::rocks; use test_utils::{batch, committee, temp_dir}; -use tokio::sync::mpsc::channel; #[tokio::test] async fn hash_and_store() { - let (tx_batch, rx_batch) = channel(1); - let (tx_digest, mut rx_digest) = channel(1); + let (tx_batch, rx_batch) = test_utils::test_channel!(1); + let (tx_digest, mut rx_digest) = test_utils::test_channel!(1); let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = diff --git a/narwhal/worker/src/tests/quorum_waiter_tests.rs b/narwhal/worker/src/tests/quorum_waiter_tests.rs index ae6387989ada2..bfc4f6b029367 100644 --- a/narwhal/worker/src/tests/quorum_waiter_tests.rs +++ b/narwhal/worker/src/tests/quorum_waiter_tests.rs @@ -5,12 +5,11 @@ use super::*; use crate::worker::WorkerMessage; use crypto::traits::KeyPair; use test_utils::{batch, committee, keys, WorkerToWorkerMockServer}; -use tokio::sync::mpsc::channel; #[tokio::test] async fn wait_for_quorum() { - let (tx_message, rx_message) = channel(1); - let (tx_batch, mut rx_batch) = channel(1); + let (tx_message, rx_message) = test_utils::test_channel!(1); + let (tx_batch, mut rx_batch) = test_utils::test_channel!(1); let myself = keys(None).pop().unwrap().public().clone(); let committee = committee(None).clone(); diff --git a/narwhal/worker/src/tests/synchronizer_tests.rs b/narwhal/worker/src/tests/synchronizer_tests.rs index 4f57aebc7af00..e85b6a1bbee11 100644 --- a/narwhal/worker/src/tests/synchronizer_tests.rs +++ b/narwhal/worker/src/tests/synchronizer_tests.rs @@ -9,13 +9,13 @@ use test_utils::{ batch, batch_digest, batches, committee, keys, open_batch_store, serialize_batch_message, WorkerToWorkerMockServer, }; -use tokio::{sync::mpsc::channel, time::timeout}; +use tokio::time::timeout; use types::serialized_batch_digest; #[tokio::test] async fn synchronize() { - let (tx_message, rx_message) = channel(1); - let (tx_primary, _) = channel(1); + let (tx_message, rx_message) = test_utils::test_channel!(1); + let (tx_primary, _) = test_utils::test_channel!(1); let mut keys = keys(None); let name = keys.pop().unwrap().public().clone(); @@ -65,8 +65,8 @@ async fn synchronize() { #[tokio::test] async fn test_successful_request_batch() { - let (tx_message, rx_message) = channel(1); - let (tx_primary, mut rx_primary) = channel(1); + let (tx_message, rx_message) = test_utils::test_channel!(1); + let (tx_primary, mut rx_primary) = test_utils::test_channel!(1); let mut keys = keys(None); let name = keys.pop().unwrap().public().clone(); @@ -128,8 +128,8 @@ async fn test_successful_request_batch() { #[tokio::test] async fn test_request_batch_not_found() { - let (tx_message, rx_message) = channel(1); - let (tx_primary, mut rx_primary) = channel(1); + let (tx_message, rx_message) = test_utils::test_channel!(1); + let (tx_primary, mut rx_primary) = test_utils::test_channel!(1); let mut keys = keys(None); let name = keys.pop().unwrap().public().clone(); @@ -190,8 +190,8 @@ async fn test_request_batch_not_found() { #[tokio::test] async fn test_successful_batch_delete() { - let (tx_message, rx_message) = channel(1); - let (tx_primary, mut rx_primary) = channel(1); + let (tx_message, rx_message) = test_utils::test_channel!(1); + let (tx_primary, mut rx_primary) = test_utils::test_channel!(1); let mut keys = keys(None); let name = keys.pop().unwrap().public().clone(); diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index 04c24c22c4140..527cf957795b8 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -39,6 +39,7 @@ async fn handle_clients_transactions() { let registry = Registry::new(); let metrics = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry)), network_metrics: Some(WorkerNetworkMetrics::new(®istry)), }; @@ -118,6 +119,7 @@ async fn handle_client_batch_request() { let registry = Registry::new(); let metrics = Metrics { worker_metrics: Some(WorkerMetrics::new(®istry)), + channel_metrics: Some(WorkerChannelMetrics::new(®istry)), endpoint_metrics: Some(WorkerEndpointMetrics::new(®istry)), network_metrics: Some(WorkerNetworkMetrics::new(®istry)), }; diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index ba7bd3c5af30c..d87bd9c306f67 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -2,8 +2,9 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crate::{ - batch_maker::BatchMaker, helper::Helper, primary_connector::PrimaryConnector, - processor::Processor, quorum_waiter::QuorumWaiter, synchronizer::Synchronizer, + batch_maker::BatchMaker, helper::Helper, metrics::WorkerChannelMetrics, + primary_connector::PrimaryConnector, processor::Processor, quorum_waiter::QuorumWaiter, + synchronizer::Synchronizer, }; use async_trait::async_trait; use bytes::Bytes; @@ -16,19 +17,18 @@ use primary::PrimaryWorkerMessage; use std::{net::Ipv4Addr, pin::Pin, sync::Arc}; use store::Store; use tokio::{ - sync::{ - mpsc::{channel, Sender}, - watch, - }, + sync::{mpsc, watch}, task::JoinHandle, }; use tonic::{Request, Response, Status}; use tracing::info; use types::{ - error::DagError, BatchDigest, BincodeEncodedPayload, ClientBatchRequest, Empty, - PrimaryToWorker, PrimaryToWorkerServer, ReconfigureNotification, SerializedBatchMessage, - Transaction, TransactionProto, Transactions, TransactionsServer, WorkerPrimaryMessage, - WorkerToWorker, WorkerToWorkerServer, + error::DagError, + metered_channel::{channel, Sender}, + BatchDigest, BincodeEncodedPayload, ClientBatchRequest, Empty, PrimaryToWorker, + PrimaryToWorkerServer, ReconfigureNotification, SerializedBatchMessage, Transaction, + TransactionProto, Transactions, TransactionsServer, WorkerPrimaryMessage, WorkerToWorker, + WorkerToWorkerServer, }; #[cfg(test)] @@ -77,9 +77,10 @@ impl Worker { let node_metrics = Arc::new(metrics.worker_metrics.unwrap()); let endpoint_metrics = metrics.endpoint_metrics.unwrap(); let network_metrics = Arc::new(metrics.network_metrics.unwrap()); + let channel_metrics: Arc = Arc::new(metrics.channel_metrics.unwrap()); // Spawn all worker tasks. - let (tx_primary, rx_primary) = channel(CHANNEL_CAPACITY); + let (tx_primary, rx_primary) = channel(CHANNEL_CAPACITY, &channel_metrics.tx_primary); let initial_committee = (*(*(*committee).load()).clone()).clone(); let (tx_reconfigure, rx_reconfigure) = @@ -88,18 +89,21 @@ impl Worker { let client_flow_handles = worker.handle_clients_transactions( &tx_reconfigure, tx_primary.clone(), + channel_metrics.clone(), endpoint_metrics, network_metrics.clone(), ); let worker_flow_handles = worker.handle_workers_messages( &tx_reconfigure, tx_primary.clone(), + channel_metrics.clone(), network_metrics.clone(), ); let primary_flow_handles = worker.handle_primary_messages( tx_reconfigure, tx_primary, node_metrics, + channel_metrics, network_metrics, ); @@ -131,9 +135,11 @@ impl Worker { tx_reconfigure: watch::Sender, tx_primary: Sender, node_metrics: Arc, + channel_metrics: Arc, network_metrics: Arc, ) -> Vec> { - let (tx_synchronizer, rx_synchronizer) = channel(CHANNEL_CAPACITY); + let (tx_synchronizer, rx_synchronizer) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_synchronizer); // Receive incoming messages from our primary. let address = self @@ -182,12 +188,16 @@ impl Worker { &self, tx_reconfigure: &watch::Sender, tx_primary: Sender, + channel_metrics: Arc, endpoint_metrics: WorkerEndpointMetrics, network_metrics: Arc, ) -> Vec> { - let (tx_batch_maker, rx_batch_maker) = channel(CHANNEL_CAPACITY); - let (tx_quorum_waiter, rx_quorum_waiter) = channel(CHANNEL_CAPACITY); - let (tx_processor, rx_processor) = channel(CHANNEL_CAPACITY); + let (tx_batch_maker, rx_batch_maker) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_batch_maker); + let (tx_quorum_waiter, rx_quorum_waiter) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_quorum_waiter); + let (tx_client_processor, rx_client_processor) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_client_processor); // We first receive clients' transactions from the network. let address = self @@ -229,7 +239,7 @@ impl Worker { (*(*(*self.committee).load()).clone()).clone(), tx_reconfigure.subscribe(), /* rx_message */ rx_quorum_waiter, - /* tx_batch */ tx_processor, + /* tx_batch */ tx_client_processor, worker_network, ); @@ -239,7 +249,7 @@ impl Worker { self.id, self.store.clone(), tx_reconfigure.subscribe(), - /* rx_batch */ rx_processor, + /* rx_batch */ rx_client_processor, /* tx_digest */ tx_primary, /* own_batch */ true, ); @@ -262,11 +272,15 @@ impl Worker { &self, tx_reconfigure: &watch::Sender, tx_primary: Sender, + channel_metrics: Arc, network_metrics: Arc, ) -> Vec> { - let (tx_worker_helper, rx_worker_helper) = channel(CHANNEL_CAPACITY); - let (tx_client_helper, rx_client_helper) = channel(CHANNEL_CAPACITY); - let (tx_processor, rx_processor) = channel(CHANNEL_CAPACITY); + let (tx_worker_helper, rx_worker_helper) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_worker_helper); + let (tx_client_helper, rx_client_helper) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_client_helper); + let (tx_worker_processor, rx_worker_processor) = + channel(CHANNEL_CAPACITY, &channel_metrics.tx_worker_processor); // Receive incoming messages from other workers. let address = self @@ -281,7 +295,7 @@ impl Worker { let worker_handle = WorkerReceiverHandler { tx_worker_helper, tx_client_helper, - tx_processor, + tx_processor: tx_worker_processor, } .spawn( address.clone(), @@ -310,7 +324,7 @@ impl Worker { self.id, self.store.clone(), tx_reconfigure.subscribe(), - /* rx_batch */ rx_processor, + /* rx_batch */ rx_worker_processor, /* tx_digest */ tx_primary, /* own_batch */ false, ); @@ -403,7 +417,7 @@ impl Transactions for TxReceiverHandler { #[derive(Clone)] struct WorkerReceiverHandler { tx_worker_helper: Sender<(Vec, PublicKey)>, - tx_client_helper: Sender<(Vec, Sender)>, + tx_client_helper: Sender<(Vec, mpsc::Sender)>, tx_processor: Sender, } @@ -487,7 +501,7 @@ impl WorkerToWorker for WorkerReceiverHandler { // TODO [issue #7]: Do some accounting to prevent bad actors from use all our // resources (in this case allocate a gigantic channel). - let (sender, receiver) = channel(missing.len()); + let (sender, receiver) = mpsc::channel(missing.len()); self.tx_client_helper .send((missing, sender))