From babd3d8ee6278652a8fee3b41764787203c17cc1 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Mon, 15 Aug 2022 18:31:07 +0100 Subject: [PATCH] [refactor] use block_waiter instead of batch_loader (#738) This commit is swapping the batch_loader with the block_waiter in the executor. This allow us to use a common solution for fetching batches and take advantage of the existing payload sync capabilities. Also earlier fix code has been removed for swapping external worker addresses with internal ones. --- Cargo.lock | 27 ++- executor/Cargo.toml | 3 + executor/src/batch_loader.rs | 207 ------------------ executor/src/core.rs | 20 +- executor/src/errors.rs | 4 + executor/src/lib.rs | 146 +----------- executor/src/metrics.rs | 8 - executor/src/subscriber.rs | 139 ++++++++---- executor/src/tests/fixtures.rs | 30 ++- executor/src/tests/subscriber_tests.rs | 197 ++++++++++++++--- executor/tests/consensus_integration_tests.rs | 92 ++++++++ node/src/lib.rs | 41 +++- primary/src/block_synchronizer/handler.rs | 8 +- primary/src/block_synchronizer/mod.rs | 17 +- primary/src/block_waiter.rs | 6 +- primary/src/lib.rs | 2 +- primary/src/metrics.rs | 32 ++- primary/src/primary.rs | 17 +- primary/tests/epoch_change.rs | 24 ++ .../tests/integration_tests_proposer_api.rs | 16 ++ .../tests/integration_tests_validator_api.rs | 33 +++ test_utils/src/cluster.rs | 25 ++- test_utils/src/lib.rs | 14 ++ workspace-hack/Cargo.toml | 3 +- 24 files changed, 617 insertions(+), 494 deletions(-) delete mode 100644 executor/src/batch_loader.rs create mode 100644 executor/tests/consensus_integration_tests.rs diff --git a/Cargo.lock b/Cargo.lock index e146b29f9..06299dd60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1360,6 +1360,7 @@ name = "executor" version = "0.1.0" dependencies = [ "async-trait", + "backoff", "bincode", "blake2", "bytes", @@ -1375,6 +1376,7 @@ dependencies = [ "prometheus", "rand 0.8.5", "serde", + "telemetry-subscribers 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=d965a5a795dcdb4d1c7964acf556bc249fdc58aa)", "tempfile", "test_utils", "thiserror", @@ -2303,7 +2305,7 @@ dependencies = [ "serde_yaml", "structopt", "task-group", - "telemetry-subscribers", + "telemetry-subscribers 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=f4aa523d3029bd6a23bead5f04c182f2cfa04c5e)", "test_utils", "thiserror", "tokio", @@ -2797,7 +2799,7 @@ dependencies = [ "rand 0.8.5", "serde", "tap", - "telemetry-subscribers", + "telemetry-subscribers 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=f4aa523d3029bd6a23bead5f04c182f2cfa04c5e)", "tempfile", "test_utils", "thiserror", @@ -3793,6 +3795,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "telemetry-subscribers" +version = "0.1.0" +source = "git+https://github.com/mystenlabs/mysten-infra.git?rev=d965a5a795dcdb4d1c7964acf556bc249fdc58aa#d965a5a795dcdb4d1c7964acf556bc249fdc58aa" +dependencies = [ + "crossterm", + "once_cell", + "opentelemetry", + "opentelemetry-jaeger", + "tokio", + "tracing", + "tracing-appender", + "tracing-bunyan-formatter", + "tracing-chrome", + "tracing-opentelemetry", + "tracing-subscriber 0.3.15", +] + [[package]] name = "telemetry-subscribers" version = "0.1.0" @@ -5085,7 +5105,8 @@ dependencies = [ "synstructure", "tap", "task-group", - "telemetry-subscribers", + "telemetry-subscribers 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=d965a5a795dcdb4d1c7964acf556bc249fdc58aa)", + "telemetry-subscribers 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=f4aa523d3029bd6a23bead5f04c182f2cfa04c5e)", "tempfile", "termcolor", "terminal_size", diff --git a/executor/Cargo.toml b/executor/Cargo.toml index f418f24af..f10f2ff3a 100644 --- a/executor/Cargo.toml +++ b/executor/Cargo.toml @@ -23,6 +23,7 @@ tokio-util = { version = "0.7.3", features = ["codec"] } tonic = "0.7.2" tracing = "0.1.36" prometheus = "0.13.1" +backoff = { version = "0.4.0", features = ["tokio"] } types = { path = "../types" } worker = { path = "../worker" } @@ -36,5 +37,7 @@ match_opt = "0.1.2" indexmap = { version = "1.9.1", features = ["serde"] } rand = "0.8.5" tempfile = "3.3.0" +primary = { path = "../primary" } test_utils = { path = "../test_utils" } types = { path = "../types" } +telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" } diff --git a/executor/src/batch_loader.rs b/executor/src/batch_loader.rs deleted file mode 100644 index 28f63659b..000000000 --- a/executor/src/batch_loader.rs +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use crate::{ - errors::{SubscriberError, SubscriberResult}, - DEFAULT_CHANNEL_SIZE, -}; - -use config::WorkerId; -use consensus::ConsensusOutput; -use futures::stream::StreamExt; -use multiaddr::Multiaddr; -use std::collections::{HashMap, HashSet}; -use store::Store; -use tokio::{ - sync::{ - mpsc::{channel, Receiver, Sender}, - watch, - }, - task::JoinHandle, -}; -use tracing::{error, warn}; -use types::{ - metered_channel, serialized_batch_digest, BatchDigest, BincodeEncodedPayload, - ClientBatchRequest, ReconfigureNotification, SerializedBatchMessage, WorkerToWorkerClient, -}; - -/// Download transactions data from the consensus workers and notifies the called when the job is done. -pub struct BatchLoader { - /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, - /// Receive reconfiguration updates. - rx_reconfigure: watch::Receiver, - /// Receive consensus outputs for which to download the associated transaction data. - rx_input: metered_channel::Receiver, - /// The network addresses of the consensus workers. - addresses: HashMap, - /// A map of connections with the consensus workers. - connections: HashMap>>, -} - -impl BatchLoader { - /// Spawn a new batch loaded in a dedicated tokio task. - #[must_use] - pub fn spawn( - store: Store, - rx_reconfigure: watch::Receiver, - rx_input: metered_channel::Receiver, - addresses: HashMap, - ) -> JoinHandle<()> { - tokio::spawn(async move { - Self { - store, - rx_reconfigure, - rx_input, - addresses, - connections: HashMap::new(), - } - .run() - .await - .expect("Failed to run batch loader") - }) - } - - /// Receive consensus messages for which we need to download the associated transaction data. - async fn run(&mut self) -> SubscriberResult<()> { - loop { - tokio::select! { - // Receive sync requests. - Some(message) = self.rx_input.recv() => { - let certificate = &message.certificate; - - // Send a request for every batch referenced by the certificate. - // TODO: Can we write it better without allocating a HashMap every time? - let mut map = HashMap::with_capacity(certificate.header.payload.len()); - for (digest, worker_id) in certificate.header.payload.iter() { - map.entry(*worker_id).or_insert_with(Vec::new).push(*digest); - } - for (worker_id, digests) in map { - let address = self - .addresses - .get(&worker_id) - .ok_or(SubscriberError::UnexpectedWorkerId(worker_id))?; - - let sender = self.connections.entry(worker_id).or_insert_with(|| { - let (sender, receiver) = channel(DEFAULT_CHANNEL_SIZE); - SyncConnection::spawn( - address.clone(), - self.store.clone(), - receiver, - ); - sender - }); - - sender - .send(digests) - .await - .expect("Sync connections are kept alive and never die"); - } - } - - // Check whether the committee changed. - result = self.rx_reconfigure.changed() => { - result.expect("Committee channel dropped"); - let message = self.rx_reconfigure.borrow().clone(); - if let ReconfigureNotification::Shutdown = message { - return Ok(()); - } - } - } - } - } -} - -/// Connect (and maintain a connection) with a specific worker. Then download batches from that -/// specific worker. -struct SyncConnection { - /// The address of the worker to connect with. - address: Multiaddr, - /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, - /// Receive the batches to download from the worker. - rx_request: Receiver>, - /// Keep a set of requests already made to avoid asking twice for the same batch. - to_request: HashSet, -} - -impl SyncConnection { - /// Spawn a new worker connection in a dedicated tokio task. - pub fn spawn( - address: Multiaddr, - store: Store, - rx_request: Receiver>, - ) { - tokio::spawn(async move { - Self { - address, - store, - rx_request, - to_request: HashSet::new(), - } - .run() - .await; - }); - } - - /// Main loop keeping the connection with a worker alive and receive batches to download. - async fn run(&mut self) { - let config = mysten_network::config::Config::new(); - //TODO don't panic on bad address - let channel = config.connect_lazy(&self.address).unwrap(); - let mut client = WorkerToWorkerClient::new(channel); - - while let Some(digests) = self.rx_request.recv().await { - // Filter digests that we already requested. - for digest in digests { - self.to_request.insert(digest); - } - - let missing = self.to_request.iter().copied().collect(); - // Request the batch from the worker. - let message = ClientBatchRequest(missing); - //TODO wrap this call in the retry - let mut stream = match client - .client_batch_request(BincodeEncodedPayload::try_from(&message).unwrap()) - .await - { - Ok(stream) => stream.into_inner(), - Err(e) => { - warn!( - "Failed to send sync request to worker {}: {e}", - self.address - ); - continue; - } - }; - - // Receive the batch data from the worker. - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let batch = batch.payload; - // Store the batch in the temporary store. - // TODO: We can probably avoid re-computing the hash of the bach since we trust the worker. - let res_digest = serialized_batch_digest(&batch); - match res_digest { - Ok(digest) => { - self.store.write(digest, batch.to_vec()).await; - - // Cleanup internal state. - self.to_request.remove(&digest); - } - Err(error) => { - error!("Worker sent invalid serialized batch data: {error}"); - } - } - } - Err(e) => { - warn!( - "Failed to receive batch reply from worker {}: {e}", - self.address - ); - } - } - } - } - } -} diff --git a/executor/src/core.rs b/executor/src/core.rs index fa3e8c0db..09a1a57ad 100644 --- a/executor/src/core.rs +++ b/executor/src/core.rs @@ -14,11 +14,7 @@ use tokio::{ task::JoinHandle, }; use tracing::debug; -use types::{ - metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber, - SerializedBatchMessage, -}; -use worker::WorkerMessage; +use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber}; #[cfg(test)] #[path = "tests/executor_tests.rs"] @@ -30,7 +26,7 @@ pub mod executor_tests; /// not processes twice the same transaction (despite crash-recovery). pub struct Core { /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, + store: Store, /// The (global) state to perform execution. execution_state: Arc, /// Receive reconfiguration updates. @@ -58,7 +54,7 @@ where /// Spawn a new executor in a dedicated tokio task. #[must_use] pub fn spawn( - store: Store, + store: Store, execution_state: Arc, rx_reconfigure: watch::Receiver, rx_subscriber: metered_channel::Receiver, @@ -138,8 +134,8 @@ where total_batches: usize, ) -> SubscriberResult<()> { // The store should now hold all transaction data referenced by the input certificate. - let batch = match self.store.read(batch_digest).await? { - Some(x) => x, + let transactions = match self.store.read(batch_digest).await? { + Some(x) => x.0, None => { // If two certificates contain the exact same batch (eg. by the actions of a Byzantine // consensus node), some correct client may already have deleted the batch from their @@ -152,12 +148,6 @@ where } }; - // Deserialize the consensus workers' batch message to retrieve a list of transactions. - let transactions = match bincode::deserialize(&batch)? { - WorkerMessage::Batch(Batch(x)) => x, - _ => bail!(SubscriberError::UnexpectedProtocolMessage), - }; - // Execute every transaction in the batch. let total_transactions = transactions.len(); for (index, transaction) in transactions.into_iter().enumerate() { diff --git a/executor/src/errors.rs b/executor/src/errors.rs index 99e5ec90b..1feb670b4 100644 --- a/executor/src/errors.rs +++ b/executor/src/errors.rs @@ -4,6 +4,7 @@ use config::WorkerId; use std::fmt::Debug; use store::StoreError; use thiserror::Error; +use types::CertificateDigest; #[macro_export] macro_rules! bail { @@ -28,6 +29,9 @@ pub enum SubscriberError { #[error("Storage failure: {0}")] StoreError(#[from] StoreError), + #[error("Error occurred while retrieving certificate {0} payload: {1}")] + PayloadRetrieveError(CertificateDigest, String), + #[error("Consensus referenced unexpected worker id {0}")] UnexpectedWorkerId(WorkerId), diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 4b04d7151..b6aa7a354 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -1,6 +1,5 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -mod batch_loader; mod core; mod errors; mod state; @@ -17,32 +16,22 @@ mod execution_state; mod metrics; pub use errors::{ExecutionStateError, SubscriberError, SubscriberResult}; -use multiaddr::{Multiaddr, Protocol}; pub use state::ExecutionIndices; -use crate::{ - batch_loader::BatchLoader, core::Core, metrics::ExecutorMetrics, subscriber::Subscriber, -}; +use crate::{core::Core, metrics::ExecutorMetrics, subscriber::Subscriber}; use async_trait::async_trait; -use config::SharedCommittee; use consensus::ConsensusOutput; -use crypto::PublicKey; +use primary::BlockCommand; use prometheus::Registry; use serde::de::DeserializeOwned; -use std::{ - borrow::Cow, - collections::HashMap, - fmt::Debug, - net::{Ipv4Addr, Ipv6Addr}, - sync::Arc, -}; +use std::{fmt::Debug, sync::Arc}; use store::Store; use tokio::{ sync::{mpsc::Sender, watch}, task::JoinHandle, }; use tracing::info; -use types::{metered_channel, BatchDigest, ReconfigureNotification, SerializedBatchMessage}; +use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification}; /// Default inter-task channel size. pub const DEFAULT_CHANNEL_SIZE: usize = 1_000; @@ -99,13 +88,12 @@ pub struct Executor; impl Executor { /// Spawn a new client subscriber. pub async fn spawn( - name: PublicKey, - committee: SharedCommittee, - store: Store, + store: Store, execution_state: Arc, tx_reconfigure: &watch::Sender, rx_consensus: metered_channel::Receiver, tx_output: Sender>, + tx_get_block_commands: metered_channel::Sender, registry: &Registry, ) -> SubscriberResult)>> where @@ -115,8 +103,6 @@ impl Executor { { let metrics = ExecutorMetrics::new(registry); - let (tx_batch_loader, rx_batch_loader) = - metered_channel::channel(DEFAULT_CHANNEL_SIZE, &metrics.tx_batch_loader); let (tx_executor, rx_executor) = metered_channel::channel(DEFAULT_CHANNEL_SIZE, &metrics.tx_executor); @@ -129,141 +115,27 @@ impl Executor { // Spawn the subscriber. let subscriber_handle = Subscriber::spawn( store.clone(), + tx_get_block_commands, tx_reconfigure.subscribe(), rx_consensus, - tx_batch_loader, tx_executor, ); // Spawn the executor's core. let executor_handle = Core::::spawn( - store.clone(), + store, execution_state, tx_reconfigure.subscribe(), /* rx_subscriber */ rx_executor, tx_output, ); - // Spawn the batch loader. - let mut worker_addresses: HashMap = committee - .load() - .authorities - .iter() - .find_map(|v| match_opt::match_opt!(v, (x, authority) if *x == name => authority)) - .expect("Our public key is not in the committee") - .workers - .iter() - .map(|(id, x)| (*id, x.worker_to_worker.clone())) - .collect(); - //////////////////////////////////////////////////////////////// - // TODO: remove this hack once #706 is fixed - //////////////////////////////////////////////////////////////// - - // retrieve our primary address - let our_primary_to_primary_address = committee - .load() - .primary(&name) - .expect("Out public key is not in the committee!") - .primary_to_primary; - // extract the hostname portion - let our_primary_hostname = our_primary_to_primary_address - .into_iter() - .flat_map(move |x| match x { - p @ Protocol::Ip4(_) | p @ Protocol::Ip6(_) | p @ Protocol::Dns(_) => Some(p), - _ => None, - }) - .next() - .expect("Could not find hostname in our primary address!"); - // Modify the worker addresses that we are about to use : would we talk better using a loopback address? - for worker_address in worker_addresses.values_mut() { - replace_distant_by_localhost(worker_address, &our_primary_hostname); - } - //////////////////////////////////////////////////////////////// - - let batch_loader_handle = BatchLoader::spawn( - store, - tx_reconfigure.subscribe(), - rx_batch_loader, - worker_addresses, - ); - // Return the handle. info!("Consensus subscriber successfully started"); + Ok(vec![ ("executor_subscriber", subscriber_handle), ("executor", executor_handle), - ("executor_batch_loader", batch_loader_handle), ]) } } - -fn replace_distant_by_localhost(target: &mut Multiaddr, hostname_pattern: &Protocol) { - // does the hostname match our pattern exactly? - if target.iter().next() == Some(hostname_pattern.clone()) { - if let Some(replacement) = target.replace(0, move |x| match x { - Protocol::Ip4(_) => Some(Protocol::Ip4(Ipv4Addr::LOCALHOST)), - Protocol::Ip6(_) => Some(Protocol::Ip6(Ipv6Addr::LOCALHOST)), - Protocol::Dns(_) => Some(Protocol::Dns(Cow::Owned("localhost".to_owned()))), - _ => None, - }) { - tracing::debug!("Address for worker {} replaced by {}", target, replacement); - *target = replacement; - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use multiaddr::{multiaddr, Protocol}; - use std::net::Ipv4Addr; - - #[test] - fn test_replace_distant_by_localhost() { - // IPV4 positive - let non_local: Ipv4Addr = "8.8.8.8".parse().unwrap(); - let mut addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr1, &Protocol::Ip4(non_local)); - assert_eq!(addr1, multiaddr!(Ip4(Ipv4Addr::LOCALHOST), Tcp(10000u16))); - - // IPV4 negative - let other_target: Ipv4Addr = "8.8.8.4".parse().unwrap(); - let addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); - let mut addr2 = multiaddr!(Ip4(non_local), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr2, &Protocol::Ip4(other_target)); - assert_eq!(addr2, addr1); - - // IPV6 positive - let non_local: Ipv6Addr = "2607:f0d0:1002:51::4".parse().unwrap(); - let mut addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr1, &Protocol::Ip6(non_local)); - assert_eq!(addr1, multiaddr!(Ip6(Ipv6Addr::LOCALHOST), Tcp(10000u16))); - - // IPV6 negative - let other_target: Ipv6Addr = "2607:f0d0:1002:50::4".parse().unwrap(); - let addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); - let mut addr2 = multiaddr!(Ip6(non_local), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr2, &Protocol::Ip6(other_target)); - assert_eq!(addr2, addr1); - - // DNS positive - let non_local: Cow = Cow::Owned("google.com".to_owned()); - let mut addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr1, &Protocol::Dns(non_local.clone())); - let localhost: Cow = Cow::Owned("localhost".to_owned()); - assert_eq!(addr1, multiaddr!(Dns(localhost), Tcp(10000u16))); - - // DNS negative - let other_target: Cow = Cow::Owned("apple.com".to_owned()); - let addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); - let mut addr2 = multiaddr!(Dns(non_local), Tcp(10000u16)); - - replace_distant_by_localhost(&mut addr2, &Protocol::Dns(other_target)); - assert_eq!(addr2, addr1); - } -} diff --git a/executor/src/metrics.rs b/executor/src/metrics.rs index 3e1fa117b..188b88959 100644 --- a/executor/src/metrics.rs +++ b/executor/src/metrics.rs @@ -4,8 +4,6 @@ use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, R #[derive(Clone, Debug)] pub struct ExecutorMetrics { - /// occupancy of the channel from the `Subscriber` to `BatchLoader` - pub tx_batch_loader: IntGauge, /// occupancy of the channel from the `Subscriber` to `Core` pub tx_executor: IntGauge, } @@ -13,12 +11,6 @@ pub struct ExecutorMetrics { impl ExecutorMetrics { pub fn new(registry: &Registry) -> Self { Self { - tx_batch_loader: register_int_gauge_with_registry!( - "tx_batch_loader", - "occupancy of the channel from the `Subscriber` to `BatchLoader`", - registry - ) - .unwrap(), tx_executor: register_int_gauge_with_registry!( "tx_executor", "occupancy of the channel from the `Subscriber` to `Core`", diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs index a4f3d070b..94a6b4dec 100644 --- a/executor/src/subscriber.rs +++ b/executor/src/subscriber.rs @@ -1,54 +1,72 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::errors::{SubscriberError, SubscriberResult}; +use crate::{errors::SubscriberResult, SubscriberError, SubscriberError::PayloadRetrieveError}; +use backoff::{Error, ExponentialBackoff}; use consensus::ConsensusOutput; -use futures::{ - future::try_join_all, - stream::{FuturesOrdered, StreamExt}, -}; +use crypto::Hash; +use futures::stream::{FuturesOrdered, StreamExt}; +use primary::BlockCommand; +use std::time::Duration; use store::Store; -use tokio::{sync::watch, task::JoinHandle}; -use tracing::debug; -use types::{metered_channel, BatchDigest, ReconfigureNotification, SerializedBatchMessage}; +use tokio::{ + sync::{oneshot, watch}, + task::JoinHandle, +}; +use tracing::{debug, error}; +use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification}; #[cfg(test)] #[path = "tests/subscriber_tests.rs"] pub mod subscriber_tests; /// The `Subscriber` receives certificates sequenced by the consensus and waits until the -/// `BatchLoader` downloaded all the transactions references by the certificates; it then +/// downloaded all the transactions references by the certificates; it then /// forward the certificates to the Executor Core. pub struct Subscriber { /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, + store: Store, /// Receive reconfiguration updates. rx_reconfigure: watch::Receiver, /// A channel to receive consensus messages. rx_consensus: metered_channel::Receiver, - /// A channel to the batch loader to download transaction's data. - tx_batch_loader: metered_channel::Sender, /// A channel to send the complete and ordered list of consensus outputs to the executor. This /// channel is used once all transactions data are downloaded. tx_executor: metered_channel::Sender, + // A channel to send commands to the block waiter to receive + // a certificate's batches (block). + tx_get_block_commands: metered_channel::Sender, + // When asking for a certificate's payload we want to retry until we succeed, unless + // some irrecoverable error occurs. For that reason a backoff policy is defined + get_block_retry_policy: ExponentialBackoff, } impl Subscriber { /// Spawn a new subscriber in a new tokio task. #[must_use] pub fn spawn( - store: Store, + store: Store, + tx_get_block_commands: metered_channel::Sender, rx_reconfigure: watch::Receiver, rx_consensus: metered_channel::Receiver, - tx_batch_loader: metered_channel::Sender, tx_executor: metered_channel::Sender, ) -> JoinHandle<()> { + let get_block_retry_policy = ExponentialBackoff { + initial_interval: Duration::from_millis(500), + randomization_factor: backoff::default::RANDOMIZATION_FACTOR, + multiplier: backoff::default::MULTIPLIER, + max_interval: Duration::from_secs(10), // Maximum backoff is 10 seconds + max_elapsed_time: None, // Never end retrying unless a non recoverable error occurs. + ..Default::default() + }; + tokio::spawn(async move { Self { store, rx_reconfigure, rx_consensus, - tx_batch_loader, tx_executor, + tx_get_block_commands, + get_block_retry_policy, } .run() .await @@ -56,21 +74,14 @@ impl Subscriber { }) } - /// Wait for particular data to become available in the storage and then returns. - async fn waiter( - missing: Vec, - store: Store, - deliver: T, - ) -> SubscriberResult { - let waiting: Vec<_> = missing.into_iter().map(|x| store.notify_read(x)).collect(); - try_join_all(waiting) - .await - .map(|_| deliver) - .map_err(SubscriberError::from) - } - /// Main loop connecting to the consensus to listen to sequence messages. async fn run(&mut self) -> SubscriberResult<()> { + // It's important to have the futures in ordered fashion as we want + // to guarantee that will deliver to the executor the certificates + // in the same order we received from rx_consensus. So it doesn't + // mater if we somehow managed to fetch the batches from a later + // certificate. Unless the earlier certificate's payload has been + // fetched, no later certificate will be delivered. let mut waiting = FuturesOrdered::new(); // Listen to sequenced consensus message and process them. @@ -78,18 +89,15 @@ impl Subscriber { tokio::select! { // Receive the ordered sequence of consensus messages from a consensus node. Some(message) = self.rx_consensus.recv() => { - // Send the certificate to the batch loader to download all transactions' data. - self.tx_batch_loader - .send(message.clone()) - .await - .expect("Failed to send message ot batch loader"); - - // Wait for the transaction data to be available in the store. This will happen for sure because - // the primary already successfully processed the certificate. This implies that the primary notified - // its worker to download any missing batch. We may however have to wait for these batch be available - // on our workers. Once all batches are available, we forward the certificate o the Executor Core. - let digests = message.certificate.header.payload.keys().cloned().collect(); - let future = Self::waiter(digests, self.store.clone(), message); + // Fetch the certificate's payload from the workers. This is done via the + // block_waiter component. If the batches are not available in the workers then + // block_waiter will do its best to sync from the other peers. Once all batches + // are available, we forward the certificate o the Executor Core. + let future = Self::wait_on_payload( + self.get_block_retry_policy.clone(), + self.store.clone(), + self.tx_get_block_commands.clone(), + message); waiting.push(future); }, @@ -112,4 +120,55 @@ impl Subscriber { } } } + + /// The wait_on_payload will try to retrieve the certificate's payload + /// from the workers via the block_waiter component and relase the + /// `deliver` once successfully done. Since we want the output to be + /// sequenced we will not quit this method until we have successfully + /// fetched the payload. + async fn wait_on_payload( + back_off_policy: ExponentialBackoff, + store: Store, + tx_get_block_commands: metered_channel::Sender, + deliver: ConsensusOutput, + ) -> SubscriberResult { + let get_block = move || { + let message = deliver.clone(); + let id = message.certificate.digest(); + let tx_get_block = tx_get_block_commands.clone(); + let batch_store = store.clone(); + + async move { + let (sender, receiver) = oneshot::channel(); + + tx_get_block + .send(BlockCommand::GetBlock { id, sender }) + .await + .map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))?; + + return match receiver + .await + .map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))? + { + Ok(block) => { + // we successfully received the payload. Now let's add to store + batch_store + .write_all(block.batches.into_iter().map(|b| (b.id, b.transactions))) + .await + .map_err(|err| Error::permanent(SubscriberError::from(err)))?; + + Ok(message) + } + Err(err) => { + // whatever the error might be at this point we don't + // have many options apart from retrying. + error!("Error while retrieving block via block waiter: {}", err); + Err(Error::transient(PayloadRetrieveError(id, err.to_string()))) + } + }; + } + }; + + backoff::future::retry(back_off_policy, get_block).await + } } diff --git a/executor/src/tests/fixtures.rs b/executor/src/tests/fixtures.rs index e7758fe17..733fd5cce 100644 --- a/executor/src/tests/fixtures.rs +++ b/executor/src/tests/fixtures.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use config::WorkerId; +use crypto::Hash; use indexmap::IndexMap; use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Serialize; @@ -10,21 +11,18 @@ use store::{ rocks::{open_cf, DBMap}, Store, }; -use types::{ - serialized_batch_digest, Batch, BatchDigest, Certificate, Header, SerializedBatchMessage, -}; -use worker::WorkerMessage; +use types::{Batch, BatchDigest, Certificate, Header}; /// A test batch containing specific transactions. -pub fn test_batch(transactions: Vec) -> (BatchDigest, SerializedBatchMessage) { - let batch = transactions +pub fn test_batch(transactions: Vec) -> (BatchDigest, Batch) { + let serialised_transactions = transactions .iter() .map(|x| bincode::serialize(x).unwrap()) .collect(); - let message = WorkerMessage::Batch(Batch(batch)); - let serialized = bincode::serialize(&message).unwrap(); - let digest = serialized_batch_digest(&serialized).unwrap(); - (digest, serialized) + + let batch = Batch(serialised_transactions); + + (batch.digest(), batch) } /// A test certificate with a specific payload. @@ -39,12 +37,12 @@ pub fn test_certificate(payload: IndexMap) -> Certificate } /// Make a test storage to hold transaction data. -pub fn test_store() -> Store { +pub fn test_store() -> Store { let store_path = tempfile::tempdir().unwrap(); - const BATCHES_CF: &str = "batches"; - let rocksdb = open_cf(store_path, None, &[BATCHES_CF]).unwrap(); - let batch_map = reopen!(&rocksdb, BATCHES_CF;); - Store::new(batch_map) + const TEMP_BATCHES_CF: &str = "temp_batches"; + let rocksdb = open_cf(store_path, None, &[TEMP_BATCHES_CF]).unwrap(); + let temp_batch_map = reopen!(&rocksdb, TEMP_BATCHES_CF;); + Store::new(temp_batch_map) } /// Create a number of test certificates containing transactions of type u64. @@ -52,7 +50,7 @@ pub fn test_u64_certificates( certificates: usize, batches_per_certificate: usize, transactions_per_batch: usize, -) -> Vec<(Certificate, Vec<(BatchDigest, SerializedBatchMessage)>)> { +) -> Vec<(Certificate, Vec<(BatchDigest, Batch)>)> { let mut rng = StdRng::from_seed([0; 32]); (0..certificates) .map(|_| { diff --git a/executor/src/tests/subscriber_tests.rs b/executor/src/tests/subscriber_tests.rs index a4f26f35f..f72bc7d5b 100644 --- a/executor/src/tests/subscriber_tests.rs +++ b/executor/src/tests/subscriber_tests.rs @@ -2,17 +2,21 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::fixtures::{test_store, test_u64_certificates}; +use primary::GetBlockResponse; use test_utils::{committee, test_channel}; -use types::{Certificate, SequenceNumber}; +use types::{ + BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber, +}; /// Spawn a mock consensus core and a test subscriber. async fn spawn_subscriber( rx_sequence: metered_channel::Receiver, - tx_batch_loader: metered_channel::Sender, tx_executor: metered_channel::Sender, + tx_get_block_commands: metered_channel::Sender, ) -> ( - Store, + Store, watch::Sender, + JoinHandle<()>, ) { let committee = committee(None); let message = ReconfigureNotification::NewEpoch(committee); @@ -20,39 +24,34 @@ async fn spawn_subscriber( // Spawn a test subscriber. let store = test_store(); - let _subscriber_handle = Subscriber::spawn( + let subscriber_handle = Subscriber::spawn( store.clone(), + tx_get_block_commands, rx_reconfigure, rx_sequence, - tx_batch_loader, tx_executor, ); - (store, tx_reconfigure) + (store, tx_reconfigure, subscriber_handle) } #[tokio::test] async fn handle_certificate_with_downloaded_batch() { let (tx_sequence, rx_sequence) = test_channel!(10); - let (tx_batch_loader, mut rx_batch_loader) = test_channel!(10); let (tx_executor, mut rx_executor) = test_channel!(10); + let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let (store, _tx_reconfigure) = - spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; + let (store, _tx_reconfigure, _) = + spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await; - // Feed certificates to the mock sequencer and ensure the batch loader receive the command to - // download the corresponding transaction data. let total_certificates = 2; let certificates = test_u64_certificates( total_certificates, /* batches_per_certificate */ 2, /* transactions_per_batch */ 2, ); - for (i, (certificate, batches)) in certificates.into_iter().enumerate() { - for (digest, batch) in batches { - store.write(digest, batch).await; - } + for (i, (certificate, _)) in certificates.clone().into_iter().enumerate() { let message = ConsensusOutput { certificate, consensus_index: i as SequenceNumber, @@ -61,37 +60,171 @@ async fn handle_certificate_with_downloaded_batch() { } for i in 0..total_certificates { - let output = rx_batch_loader.recv().await.unwrap(); - assert_eq!(output.consensus_index, i as SequenceNumber); + let request = rx_get_block_command.recv().await.unwrap(); + + let batches = match request { + BlockCommand::GetBlock { id, sender } => { + let (certificate, batches) = certificates.get(i).unwrap().to_owned(); + + assert_eq!( + certificate.digest(), + id, + "Out of order certificate id has been received" + ); + + // Mimic the block_waiter here and respond with the payload back + let ok = successful_block_response(id, batches.clone()); + + sender.send(ok).unwrap(); + + batches + } + _ => panic!("Unexpected command received"), + }; let output = rx_executor.recv().await.unwrap(); assert_eq!(output.consensus_index, i as SequenceNumber); + + // Ensure all the batches have been written in storage + for (batch_id, batch) in batches { + let stored_batch = store + .read(batch_id) + .await + .expect("Error while retrieving batch") + .unwrap(); + assert_eq!(batch, stored_batch); + } } } #[tokio::test] -async fn handle_empty_certificate() { +async fn should_retry_when_failed_to_get_payload() { let (tx_sequence, rx_sequence) = test_channel!(10); - let (tx_batch_loader, mut rx_batch_loader) = test_channel!(10); let (tx_executor, mut rx_executor) = test_channel!(10); + let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let _do_not_drop = spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; + let (store, _tx_reconfigure, _) = + spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await; - // Feed certificates to the mock sequencer and ensure the batch loader receive the command to - // download the corresponding transaction data. - for i in 0..2 { - let message = ConsensusOutput { - certificate: Certificate::default(), - consensus_index: i as SequenceNumber, + // Create a certificate + let total_certificates = 1; + let certificates = test_u64_certificates( + total_certificates, + /* batches_per_certificate */ 2, + /* transactions_per_batch */ 2, + ); + + let (certificate, batches) = certificates.first().unwrap().to_owned(); + let certificate_id = certificate.digest(); + + let message = ConsensusOutput { + certificate, + consensus_index: 500 as SequenceNumber, + }; + + // send the certificate to download payload + tx_sequence.send(message).await.unwrap(); + + // Now assume that the block_wait is responding with error for the + // requested certificate for RETRIES -1 attempts. + // Finally on the last one we reply with a successful result. + const RETRIES: u32 = 3; + for i in 0..RETRIES { + let request = rx_get_block_command.recv().await.unwrap(); + + match request { + BlockCommand::GetBlock { id, sender } => { + assert_eq!(certificate_id, id); + + if i < RETRIES - 1 { + sender + .send(Err(BlockError { + id, + error: BlockErrorKind::BatchTimeout, + })) + .unwrap(); + } else { + // Mimic the block_waiter here and respond with the payload back + let ok = successful_block_response(id, batches.clone()); + + sender.send(ok).unwrap(); + } + } + _ => panic!("Unexpected command received"), }; - tx_sequence.send(message).await.unwrap(); } - for i in 0..2 { - let output = rx_batch_loader.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - let output = rx_executor.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); + // Now the message will be delivered and should be forwarded to tx_executor + let output = rx_executor.recv().await.unwrap(); + assert_eq!(output.consensus_index, 500 as SequenceNumber); + + // Ensure all the batches have been written in storage + for (batch_id, batch) in batches { + let stored_batch = store + .read(batch_id) + .await + .expect("Error while retrieving batch") + .unwrap(); + assert_eq!(batch, stored_batch); } } + +#[tokio::test] +async fn subscriber_should_crash_when_irrecoverable_error() { + let (tx_sequence, rx_sequence) = test_channel!(10); + let (tx_executor, _rx_executor) = test_channel!(10); + let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); + + // Spawn a subscriber. + let (_store, _tx_reconfigure, handle) = + spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await; + + // Create a certificate + let total_certificates = 1; + let certificates = test_u64_certificates( + total_certificates, + /* batches_per_certificate */ 2, + /* transactions_per_batch */ 2, + ); + + let (certificate, _batches) = certificates.first().unwrap().to_owned(); + + let message = ConsensusOutput { + certificate, + consensus_index: 500 as SequenceNumber, + }; + + // now close the tx_get_block_command in order to inject an artificial + // error and make any retries stop and propagate the error + rx_get_block_command.close(); + + // send the certificate to download payload + // We expect this to make the subscriber crash + tx_sequence.send(message).await.unwrap(); + + let err = handle + .await + .expect_err("Expected an error, instead a successful response returned"); + assert!(err.is_panic()); +} + +// Helper method to create a successful (OK) get_block response. +fn successful_block_response( + id: CertificateDigest, + batches: Vec<(BatchDigest, Batch)>, +) -> BlockResult { + // Mimic the block_waiter here and respond with the payload back + let batch_messages = batches + .iter() + .map(|(batch_id, batch)| BatchMessage { + id: *batch_id, + transactions: batch.clone(), + }) + .collect(); + + Ok(GetBlockResponse { + id, + batches: batch_messages, + }) +} diff --git a/executor/tests/consensus_integration_tests.rs b/executor/tests/consensus_integration_tests.rs new file mode 100644 index 000000000..08b54eadc --- /dev/null +++ b/executor/tests/consensus_integration_tests.rs @@ -0,0 +1,92 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +use bytes::Bytes; +use telemetry_subscribers::TelemetryGuards; +use test_utils::cluster::Cluster; +use types::TransactionProto; + +#[tokio::test] +async fn test_internal_consensus_output() { + // Enabled debug tracing so we can easily observe the + // nodes logs. + let _guard = setup_tracing(); + + let mut cluster = Cluster::new(None, None, true); + + // start the cluster + cluster.start(Some(4), Some(1), None).await; + + // get a client to send transactions + let worker_id = 0; + + let authority = cluster.authority(0); + let mut client = authority.new_transactions_client(&worker_id).await; + + // Subscribe to the transaction confirmation channel + let mut receiver = authority + .primary() + .await + .tx_transaction_confirmation + .subscribe(); + + // Create arbitrary transactions + let mut transactions = Vec::new(); + + const NUM_OF_TRANSACTIONS: u32 = 10; + for i in 0..NUM_OF_TRANSACTIONS { + let tx = string_transaction(i); + + // serialise and send + let tr = bincode::serialize(&tx).unwrap(); + let txn = TransactionProto { + transaction: Bytes::from(tr), + }; + client.submit_transaction(txn).await.unwrap(); + + transactions.push(tx); + } + + // wait for transactions to complete + loop { + let result = receiver.recv().await.unwrap(); + + assert!(result.0.is_ok()); + + // deserialise transaction + let output_transaction = bincode::deserialize::(&result.1).unwrap(); + + // we always remove the first transaction and check with the one + // sequenced. We want the transactions to be sequenced in the + // same order as we post them. + let expected_transaction = transactions.remove(0); + + assert_eq!( + expected_transaction, output_transaction, + "Expected to have received transaction with same id. Ordering is important" + ); + + if transactions.is_empty() { + break; + } + } +} + +fn string_transaction(id: u32) -> String { + format!("test transaction:{id}") +} + +fn setup_tracing() -> TelemetryGuards { + // Setup tracing + let tracing_level = "debug"; + let network_tracing_level = "info"; + + let log_filter = format!("{tracing_level},h2={network_tracing_level},tower={network_tracing_level},hyper={network_tracing_level},tonic::transport={network_tracing_level}"); + + telemetry_subscribers::TelemetryConfig::new("narwhal") + // load env variables + .with_env() + // load special log filter + .with_log_level(&log_filter) + .init() + .0 +} diff --git a/node/src/lib.rs b/node/src/lib.rs index 26529ad0e..bbacfe910 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -12,7 +12,7 @@ use crypto::{ KeyPair, PublicKey, }; use executor::{ExecutionState, Executor, ExecutorOutput, SerializedTransaction, SubscriberResult}; -use primary::{NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics}; +use primary::{BlockCommand, NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics}; use prometheus::{IntGauge, Registry}; use std::{fmt::Debug, sync::Arc}; use store::{ @@ -27,7 +27,7 @@ use tokio::{ }; use tracing::debug; use types::{ - metered_channel, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header, + metered_channel, Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header, HeaderDigest, ReconfigureNotification, Round, SequenceNumber, SerializedBatchMessage, }; use worker::{metrics::initialise_metrics, Worker}; @@ -43,6 +43,7 @@ pub struct NodeStorage { pub payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, pub batch_store: Store, pub consensus_store: Arc, + pub temp_batch_store: Store, } impl NodeStorage { @@ -53,6 +54,7 @@ impl NodeStorage { const BATCHES_CF: &'static str = "batches"; const LAST_COMMITTED_CF: &'static str = "last_committed"; const SEQUENCE_CF: &'static str = "sequence"; + const TEMP_BATCH_CF: &'static str = "temp_batches"; /// Open or reopen all the storage of the node. pub fn reopen>(store_path: Path) -> Self { @@ -66,17 +68,27 @@ impl NodeStorage { Self::BATCHES_CF, Self::LAST_COMMITTED_CF, Self::SEQUENCE_CF, + Self::TEMP_BATCH_CF, ], ) .expect("Cannot open database"); - let (header_map, certificate_map, payload_map, batch_map, last_committed_map, sequence_map) = reopen!(&rocksdb, + let ( + header_map, + certificate_map, + payload_map, + batch_map, + last_committed_map, + sequence_map, + temp_batch_map, + ) = reopen!(&rocksdb, Self::HEADERS_CF;, Self::CERTIFICATES_CF;, Self::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>, Self::BATCHES_CF;, Self::LAST_COMMITTED_CF;, - Self::SEQUENCE_CF; + Self::SEQUENCE_CF;, + Self::TEMP_BATCH_CF; ); let header_store = Store::new(header_map); @@ -84,6 +96,7 @@ impl NodeStorage { let payload_store = Store::new(payload_map); let batch_store = Store::new(batch_map); let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sequence_map)); + let temp_batch_store = Store::new(temp_batch_map); Self { header_store, @@ -91,6 +104,7 @@ impl NodeStorage { payload_store, batch_store, consensus_store, + temp_batch_store, } } } @@ -151,6 +165,14 @@ impl Node { let (tx_consensus, rx_consensus) = metered_channel::channel(Self::CHANNEL_CAPACITY, &committed_certificates_counter); + let tx_get_block_commands_counter = IntGauge::new( + PrimaryChannelMetrics::NAME_GET_BLOCK_COMMANDS, + PrimaryChannelMetrics::DESC_GET_BLOCK_COMMANDS, + ) + .unwrap(); + let (tx_get_block_commands, rx_get_block_commands) = + metered_channel::channel(Self::CHANNEL_CAPACITY, &tx_get_block_commands_counter); + // Compute the public key of this authority. let name = keypair.public().clone(); let mut handles = Vec::new(); @@ -165,7 +187,6 @@ impl Node { (Some(Arc::new(dag)), NetworkModel::Asynchronous) } else { let consensus_handles = Self::spawn_consensus( - name.clone(), committee.clone(), store, parameters.clone(), @@ -174,6 +195,7 @@ impl Node { rx_new_certificates, tx_consensus.clone(), tx_confirmation, + tx_get_block_commands.clone(), registry, ) .await?; @@ -207,6 +229,8 @@ impl Node { store.payload_store.clone(), tx_new_certificates, /* rx_consensus */ rx_consensus, + tx_get_block_commands, + rx_get_block_commands, /* dag */ dag, network_model, tx_reconfigure, @@ -243,7 +267,6 @@ impl Node { /// Spawn the consensus core and the client executing transactions. async fn spawn_consensus( - name: PublicKey, committee: SharedCommittee, store: &NodeStorage, parameters: Parameters, @@ -255,6 +278,7 @@ impl Node { SubscriberResult<::Outcome>, SerializedTransaction, )>, + tx_get_block_commands: metered_channel::Sender, registry: &Registry, ) -> SubscriberResult)>> where @@ -294,13 +318,12 @@ impl Node { // Spawn the client executing the transactions. It can also synchronize with the // subscriber handler if it missed some transactions. let executor_handles = Executor::spawn( - name, - committee, - store.batch_store.clone(), + store.temp_batch_store.clone(), execution_state, tx_reconfigure, /* rx_consensus */ rx_sequence, /* tx_output */ tx_confirmation, + tx_get_block_commands, registry, ) .await?; diff --git a/primary/src/block_synchronizer/handler.rs b/primary/src/block_synchronizer/handler.rs index 613405b0b..79935823e 100644 --- a/primary/src/block_synchronizer/handler.rs +++ b/primary/src/block_synchronizer/handler.rs @@ -126,7 +126,6 @@ impl BlockSynchronizerHandler { } } - #[instrument(level = "debug", skip_all)] async fn wait_all(&self, certificates: Vec) -> Vec> { let futures: Vec<_> = certificates .into_iter() @@ -136,7 +135,6 @@ impl BlockSynchronizerHandler { join_all(futures).await } - #[instrument(level = "debug", skip_all, err)] async fn wait(&self, block_id: CertificateDigest) -> Result { if let Ok(result) = timeout( self.certificate_deliver_timeout, @@ -164,7 +162,7 @@ impl Handler for BlockSynchronizerHandler { /// * Internal: An internal error caused /// * BlockDeliveryTimeout: Timed out while waiting for the certificate to become available /// after submitting it for processing to core - #[instrument(level="debug", skip_all, fields(num_block_ids = block_ids.len()))] + #[instrument(level="trace", skip_all, fields(num_block_ids = block_ids.len()))] async fn get_and_synchronize_block_headers( &self, block_ids: Vec, @@ -229,7 +227,7 @@ impl Handler for BlockSynchronizerHandler { results } - #[instrument(level="debug", skip_all, fields(num_block_ids = block_ids.len()))] + #[instrument(level="trace", skip_all, fields(num_block_ids = block_ids.len()))] async fn get_block_headers( &self, block_ids: Vec, @@ -266,7 +264,7 @@ impl Handler for BlockSynchronizerHandler { results } - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn synchronize_block_payloads( &self, certificates: Vec, diff --git a/primary/src/block_synchronizer/mod.rs b/primary/src/block_synchronizer/mod.rs index fdd4d2082..4dbb8a9ce 100644 --- a/primary/src/block_synchronizer/mod.rs +++ b/primary/src/block_synchronizer/mod.rs @@ -373,7 +373,7 @@ impl BlockSynchronizer { /// logic of waiting and gathering the replies from the primary nodes /// for the payload availability. This future is returning the next State /// to be executed. - #[instrument(level="debug", skip_all, fields(num_certificates = certificates.len()))] + #[instrument(level="trace", skip_all, fields(num_certificates = certificates.len()))] async fn handle_synchronize_block_payload_command<'a>( &mut self, certificates: Vec, @@ -405,6 +405,9 @@ impl BlockSynchronizer { trace!("Certificate payloads need sync"); } + // TODO: add metric here to track the number of certificates + // requested that are missing a payload + let key = RequestID::from_iter(certificates_to_sync.iter()); let message = PrimaryMessage::PayloadAvailabilityRequest { @@ -555,7 +558,7 @@ impl BlockSynchronizer { /// a reply is immediately sent to the consumer via the provided respond_to /// channel. For the ones that haven't been found, are returned back on the /// returned vector. - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn reply_with_payload_already_in_storage( &self, certificates: Vec, @@ -616,7 +619,7 @@ impl BlockSynchronizer { // Broadcasts a message to all the other primary nodes. // It returns back the primary names to which we have sent the requests. - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn broadcast_batch_request(&mut self, message: PrimaryMessage) -> Vec { // Naively now just broadcast the request to all the primaries @@ -634,7 +637,7 @@ impl BlockSynchronizer { primaries_names } - #[instrument(level="debug", skip_all, fields(request_id = ?request_id))] + #[instrument(level="trace", skip_all, fields(request_id = ?request_id))] async fn handle_synchronize_block_payloads<'a>( &mut self, request_id: RequestID, @@ -677,7 +680,7 @@ impl BlockSynchronizer { /// /// * `primary_peer_name` - The primary from which we are looking to sync the batches. /// * `certificates` - The certificates for which we want to sync their batches. - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn send_synchronize_payload_requests( &mut self, primary_peer_name: PublicKey, @@ -743,7 +746,7 @@ impl BlockSynchronizer { } } - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn handle_payload_availability_response( &mut self, response: PayloadAvailabilityResponse, @@ -766,7 +769,7 @@ impl BlockSynchronizer { } } - #[instrument(level = "debug", skip_all)] + #[instrument(level = "trace", skip_all)] async fn handle_certificates_response(&mut self, response: CertificatesResponse) { let sender = self .map_certificate_responses_senders diff --git a/primary/src/block_waiter.rs b/primary/src/block_waiter.rs index 5857b935f..43a97e712 100644 --- a/primary/src/block_waiter.rs +++ b/primary/src/block_waiter.rs @@ -421,7 +421,7 @@ impl BlockWaiter Option { if let Some((_, c)) = self.get_certificates(vec![id]).await.first() { return c.to_owned(); @@ -434,7 +434,7 @@ impl BlockWaiter, @@ -592,7 +592,7 @@ impl BlockWaiter Self { Self { @@ -157,7 +166,7 @@ impl PrimaryChannelMetrics { ).unwrap(), tx_get_block_commands: register_int_gauge_with_registry!( "tx_get_block_commands", - "occupancy of the channel from the `primary::ConsensusAPIGrpc` to the `primary::BlockWaiter`", + "occupancy of the channel from the `primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`", registry ).unwrap(), tx_batches: register_int_gauge_with_registry!( @@ -242,6 +251,21 @@ impl PrimaryChannelMetrics { registry.register(collector).unwrap(); self.tx_committed_certificates = committed_certificates_counter; } + + pub fn replace_registered_get_block_commands_metric( + &mut self, + registry: &Registry, + collector: Box>, + ) { + let tx_get_block_commands_counter = + IntGauge::new(Self::NAME_GET_BLOCK_COMMANDS, Self::DESC_GET_BLOCK_COMMANDS).unwrap(); + // TODO: Sanity-check by hashing the descs against one another + registry + .unregister(Box::new(tx_get_block_commands_counter.clone())) + .unwrap(); + registry.register(collector).unwrap(); + self.tx_get_block_commands = tx_get_block_commands_counter; + } } #[derive(Clone)] diff --git a/primary/src/primary.rs b/primary/src/primary.rs index 4ac03ef64..1ba7e49d6 100644 --- a/primary/src/primary.rs +++ b/primary/src/primary.rs @@ -15,7 +15,8 @@ use crate::{ proposer::Proposer, state_handler::StateHandler, synchronizer::Synchronizer, - BlockRemover, CertificatesResponse, DeleteBatchMessage, PayloadAvailabilityResponse, + BlockCommand, BlockRemover, CertificatesResponse, DeleteBatchMessage, + PayloadAvailabilityResponse, }; use async_trait::async_trait; use config::{Parameters, SharedCommittee, WorkerId, WorkerInfo}; @@ -60,6 +61,7 @@ impl Primary { const INADDR_ANY: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); // Spawns the primary and returns the JoinHandles of its tasks, as well as a metered receiver for the Consensus. + #[allow(clippy::too_many_arguments)] pub fn spawn + Send + 'static>( name: PublicKey, signer: Signatory, @@ -70,6 +72,8 @@ impl Primary { payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, tx_consensus: Sender, rx_consensus: Receiver, + tx_get_block_commands: Sender, + rx_get_block_commands: Receiver, dag: Option>, network_model: NetworkModel, tx_reconfigure: watch::Sender, @@ -117,10 +121,6 @@ impl Primary { CHANNEL_CAPACITY, &primary_channel_metrics.tx_helper_requests, ); - let (tx_get_block_commands, rx_get_block_commands) = channel( - CHANNEL_CAPACITY, - &primary_channel_metrics.tx_get_block_commands, - ); let (tx_batches, rx_batches) = channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_batches); let (tx_block_removal_commands, rx_block_removal_commands) = channel( @@ -152,10 +152,17 @@ impl Primary { registry, Box::new(committed_certificates_gauge), ); + let new_certificates_gauge = tx_consensus.gauge().clone(); primary_channel_metrics .replace_registered_new_certificates_metric(registry, Box::new(new_certificates_gauge)); + let tx_get_block_commands_gauge = tx_get_block_commands.gauge().clone(); + primary_channel_metrics.replace_registered_get_block_commands_metric( + registry, + Box::new(tx_get_block_commands_gauge), + ); + let (tx_consensus_round_updates, rx_consensus_round_updates) = watch::channel(0u64); let our_workers = committee diff --git a/primary/tests/epoch_change.rs b/primary/tests/epoch_change.rs index f0ece3f6a..2eb84566b 100644 --- a/primary/tests/epoch_change.rs +++ b/primary/tests/epoch_change.rs @@ -41,6 +41,8 @@ async fn test_simple_epoch_change() { let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -54,6 +56,8 @@ async fn test_simple_epoch_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -148,6 +152,8 @@ async fn test_partial_committee_change() { epoch_0_tx_channels.push(tx_feedback.clone()); let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -161,6 +167,8 @@ async fn test_partial_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -224,6 +232,8 @@ async fn test_partial_committee_change() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); epoch_1_tx_channels.push(tx_feedback.clone()); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee_1.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -240,6 +250,8 @@ async fn test_partial_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -302,6 +314,8 @@ async fn test_restart_with_new_committee_change() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); tx_channels.push(tx_feedback.clone()); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -318,6 +332,8 @@ async fn test_restart_with_new_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -378,6 +394,8 @@ async fn test_restart_with_new_committee_change() { let initial_committee = ReconfigureNotification::NewEpoch(new_committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); let registry = Registry::new(); @@ -391,6 +409,8 @@ async fn test_restart_with_new_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -459,6 +479,8 @@ async fn test_simple_committee_update() { let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); + let (tx_get_block_commands, rx_get_block_commands) = + test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -472,6 +494,8 @@ async fn test_simple_committee_update() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, diff --git a/primary/tests/integration_tests_proposer_api.rs b/primary/tests/integration_tests_proposer_api.rs index 2f071d570..74006f944 100644 --- a/primary/tests/integration_tests_proposer_api.rs +++ b/primary/tests/integration_tests_proposer_api.rs @@ -73,6 +73,7 @@ async fn test_rounds_errors() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -100,6 +101,8 @@ async fn test_rounds_errors() { /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, /* external_consensus */ + tx_get_block_commands, + rx_get_block_commands, Some(Arc::new( Dag::new(&no_name_committee, rx_new_certificates, consensus_metrics).1, )), @@ -158,6 +161,7 @@ async fn test_rounds_return_successful_response() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -175,6 +179,8 @@ async fn test_rounds_return_successful_response() { store_primary.payload_store, /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* external_consensus */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -307,6 +313,9 @@ async fn test_node_read_causal_signed_certificates() { let keypair_1 = k.pop().unwrap(); let name_1 = keypair_1.public().clone(); + let (tx_get_block_commands_1, rx_get_block_commands_1) = + test_utils::test_get_block_commands!(1); + // Spawn Primary 1 that we will be interacting with. Primary::spawn( name_1.clone(), @@ -318,6 +327,8 @@ async fn test_node_read_causal_signed_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands_1, + rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -340,6 +351,9 @@ async fn test_node_read_causal_signed_certificates() { let name_2 = keypair_2.public().clone(); let consensus_metrics_2 = Arc::new(ConsensusMetrics::new(&Registry::new())); + let (tx_get_block_commands_2, rx_get_block_commands_2) = + test_utils::test_get_block_commands!(1); + // Spawn Primary 2 Primary::spawn( name_2.clone(), @@ -352,6 +366,8 @@ async fn test_node_read_causal_signed_certificates() { /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, /* external_consensus */ + tx_get_block_commands_2, + rx_get_block_commands_2, Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, )), diff --git a/primary/tests/integration_tests_validator_api.rs b/primary/tests/integration_tests_validator_api.rs index a3cb4b424..8421daffc 100644 --- a/primary/tests/integration_tests_validator_api.rs +++ b/primary/tests/integration_tests_validator_api.rs @@ -104,6 +104,7 @@ async fn test_get_collections() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -119,6 +120,8 @@ async fn test_get_collections() { /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, /* dag */ + tx_get_block_commands, + rx_get_block_commands, Some(Arc::new( Dag::new(&committee, rx_new_certificates, consensus_metrics).1, )), @@ -293,6 +296,7 @@ async fn test_remove_collections() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -306,6 +310,8 @@ async fn test_remove_collections() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands, + rx_get_block_commands, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -509,6 +515,9 @@ async fn test_read_causal_signed_certificates() { let keypair_1 = k.pop().unwrap(); let name_1 = keypair_1.public().clone(); + let (tx_get_block_commands_1, rx_get_block_commands_1) = + test_utils::test_get_block_commands!(1); + // Spawn Primary 1 that we will be interacting with. Primary::spawn( name_1.clone(), @@ -520,6 +529,8 @@ async fn test_read_causal_signed_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands_1, + rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -531,6 +542,8 @@ async fn test_read_causal_signed_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands_2, rx_get_block_commands_2) = + test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -554,6 +567,8 @@ async fn test_read_causal_signed_certificates() { primary_store_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, + tx_get_block_commands_2, + rx_get_block_commands_2, /* external_consensus */ Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, @@ -640,6 +655,8 @@ async fn test_read_causal_unsigned_certificates() { // Make the Dag let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands_1, rx_get_block_commands_1) = + test_utils::test_get_block_commands!(1); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let dag = Arc::new(Dag::new(&committee, rx_new_certificates, consensus_metrics).1); @@ -723,6 +740,8 @@ async fn test_read_causal_unsigned_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, + tx_get_block_commands_1, + rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -734,6 +753,9 @@ async fn test_read_causal_unsigned_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands_2, rx_get_block_commands_2) = + test_utils::test_get_block_commands!(1); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics_2 = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -749,6 +771,8 @@ async fn test_read_causal_unsigned_certificates() { primary_store_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, + tx_get_block_commands_2, + rx_get_block_commands_2, /* external_consensus */ Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, @@ -877,6 +901,8 @@ async fn test_get_collections_with_missing_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_1, rx_feedback_1) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands_1, rx_get_block_commands_1) = + test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -892,6 +918,8 @@ async fn test_get_collections_with_missing_certificates() { /* tx_consensus */ tx_new_certificates_1, /* rx_consensus */ rx_feedback_1, /* external_consensus */ + tx_get_block_commands_1, + rx_get_block_commands_1, Some(Arc::new( Dag::new(&committee, rx_new_certificates_1, consensus_metrics).1, )), @@ -923,6 +951,9 @@ async fn test_get_collections_with_missing_certificates() { let (tx_new_certificates_2, _) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); + let (tx_get_block_commands_2, rx_get_block_commands_2) = + test_utils::test_get_block_commands!(1); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -936,6 +967,8 @@ async fn test_get_collections_with_missing_certificates() { store_primary_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, + tx_get_block_commands_2, + rx_get_block_commands_2, /* external_consensus */ None, NetworkModel::Asynchronous, diff --git a/test_utils/src/cluster.rs b/test_utils/src/cluster.rs index 1bca67a5a..ae38efc78 100644 --- a/test_utils/src/cluster.rs +++ b/test_utils/src/cluster.rs @@ -21,7 +21,7 @@ use tokio::{ }; use tonic::transport::Channel; use tracing::info; -use types::{ConfigurationClient, ProposerClient}; +use types::{ConfigurationClient, ProposerClient, TransactionsClient}; #[cfg(test)] #[path = "tests/cluster_tests.rs"] @@ -696,6 +696,29 @@ impl AuthorityDetails { ProposerClient::new(channel) } + /// This method returns a new client to send transactions to the dictated + /// worker identified by the `worker_id`. If the worker_id is not found then + /// a panic is raised. + pub async fn new_transactions_client( + &self, + worker_id: &WorkerId, + ) -> TransactionsClient { + let internal = self.internal.read().await; + + let config = mysten_network::config::Config::new(); + let channel = config + .connect_lazy( + &internal + .workers + .get(worker_id) + .unwrap() + .transactions_address, + ) + .unwrap(); + + TransactionsClient::new(channel) + } + /// Creates a new configuration client that connects to the corresponding client. /// This should be available only if the internal consensus is disabled. If /// the internal consensus is enabled then a panic will be thrown instead. diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index f70088ef0..23d48aaa3 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -89,6 +89,20 @@ macro_rules! test_new_certificates_channel { }; } +#[macro_export] +macro_rules! test_get_block_commands { + ($e:expr) => { + types::metered_channel::channel( + $e, + &prometheus::IntGauge::new( + primary::PrimaryChannelMetrics::NAME_GET_BLOCK_COMMANDS, + primary::PrimaryChannelMetrics::DESC_GET_BLOCK_COMMANDS, + ) + .unwrap(), + ); + }; +} + //////////////////////////////////////////////////////////////// /// Keys, Committee //////////////////////////////////////////////////////////////// diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 9d1489067..7010d34f4 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -265,7 +265,8 @@ subtle-ng = { version = "2", default-features = false } sync_wrapper = { version = "0.1", default-features = false } tap = { version = "1", default-features = false } task-group = { version = "0.2", default-features = false } -telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "f4aa523d3029bd6a23bead5f04c182f2cfa04c5e", features = ["chrome", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-chrome", "tracing-opentelemetry"] } +telemetry-subscribers-2187a2855c8ab378 = { package = "telemetry-subscribers", git = "https://github.com/mystenlabs/mysten-infra.git", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa", features = ["chrome", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-chrome", "tracing-opentelemetry"] } +telemetry-subscribers-a710c70c615361f9 = { package = "telemetry-subscribers", git = "https://github.com/mystenlabs/mysten-infra.git", rev = "f4aa523d3029bd6a23bead5f04c182f2cfa04c5e", features = ["chrome", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-chrome", "tracing-opentelemetry"] } tempfile = { version = "3", default-features = false } termcolor = { version = "1", default-features = false } terminal_size = { version = "0.1", default-features = false }