diff --git a/Cargo.lock b/Cargo.lock index aca5e6cd1..fe02d69f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1485,12 +1485,12 @@ name = "executor" version = "0.1.0" dependencies = [ "async-trait", - "backoff", "bincode", "blake2", "bytes", "config", "consensus", + "crypto", "fastcrypto", "futures", "indexmap", diff --git a/executor/Cargo.toml b/executor/Cargo.toml index 1415e4ff4..242d77a83 100644 --- a/executor/Cargo.toml +++ b/executor/Cargo.toml @@ -23,10 +23,10 @@ tokio-util = { version = "0.7.3", features = ["codec"] } tonic = "0.7.2" tracing = "0.1.36" prometheus = "0.13.1" -backoff = { version = "0.4.0", features = ["tokio"] } storage = { path = "../storage" } itertools = "0.10.3" +crypto = { path = "../crypto" } types = { path = "../types" } mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "352091e92070c2ecfcccad444361a78249ecfe59" } @@ -38,7 +38,6 @@ match_opt = "0.1.2" indexmap = { version = "1.9.1", features = ["serde"] } rand = "0.8.5" tempfile = "3.3.0" -primary = { path = "../primary" } test_utils = { path = "../test_utils" } types = { path = "../types" } telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "352091e92070c2ecfcccad444361a78249ecfe59" } diff --git a/executor/src/batch_loader.rs b/executor/src/batch_loader.rs new file mode 100644 index 000000000..5ed538877 --- /dev/null +++ b/executor/src/batch_loader.rs @@ -0,0 +1,204 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 +use crate::errors::{SubscriberError, SubscriberResult}; + +use config::WorkerId; +use consensus::ConsensusOutput; +use futures::stream::StreamExt; +use multiaddr::Multiaddr; +use std::collections::{HashMap, HashSet}; +use store::Store; +use tokio::{ + sync::{ + mpsc::{channel, Receiver, Sender}, + watch, + }, + task::JoinHandle, +}; +use tracing::{error, warn}; +use types::{ + metered_channel, serialized_batch_digest, BatchDigest, BincodeEncodedPayload, + ClientBatchRequest, ReconfigureNotification, SerializedBatchMessage, WorkerToWorkerClient, +}; + +/// Download transactions data from the consensus workers and notifies the caller when the job is done. +pub struct BatchLoader { + /// The temporary storage holding all transactions' data (that may be too big to hold in memory). + store: Store, + /// Receive reconfiguration updates. + rx_reconfigure: watch::Receiver, + /// Receive consensus outputs for which to download the associated transaction data. + rx_input: metered_channel::Receiver, + /// The network addresses of the consensus workers. + addresses: HashMap, + /// A map of connections with the consensus workers. + connections: HashMap>>, +} + +impl BatchLoader { + /// Spawn a new batch loader in a dedicated tokio task. + #[must_use] + pub fn spawn( + store: Store, + rx_reconfigure: watch::Receiver, + rx_input: metered_channel::Receiver, + addresses: HashMap, + ) -> JoinHandle<()> { + tokio::spawn(async move { + Self { + store, + rx_reconfigure, + rx_input, + addresses, + connections: HashMap::new(), + } + .run() + .await + .expect("Failed to run batch loader") + }) + } + + /// Receive consensus messages for which we need to download the associated transaction data. + async fn run(&mut self) -> SubscriberResult<()> { + loop { + tokio::select! { + // Receive sync requests. 
+ Some(message) = self.rx_input.recv() => { + let certificate = &message.certificate; + + // Send a request for every batch referenced by the certificate. + // TODO: Can we write it better without allocating a HashMap every time? + let mut map = HashMap::with_capacity(certificate.header.payload.len()); + for (digest, worker_id) in certificate.header.payload.iter() { + map.entry(*worker_id).or_insert_with(Vec::new).push(*digest); + } + for (worker_id, digests) in map { + let address = self + .addresses + .get(&worker_id) + .ok_or(SubscriberError::UnexpectedWorkerId(worker_id))?; + + let sender = self.connections.entry(worker_id).or_insert_with(|| { + let (sender, receiver) = channel(primary::CHANNEL_CAPACITY); + SyncConnection::spawn( + address.clone(), + self.store.clone(), + receiver, + ); + sender + }); + + sender + .send(digests) + .await + .expect("Sync connections are kept alive and never die"); + } + } + + // Check whether the committee changed. + result = self.rx_reconfigure.changed() => { + result.expect("Committee channel dropped"); + let message = self.rx_reconfigure.borrow().clone(); + if let ReconfigureNotification::Shutdown = message { + return Ok(()); + } + } + } + } + } +} + +/// Connect (and maintain a connection) with a specific worker. Then download batches from that +/// specific worker. +struct SyncConnection { + /// The address of the worker to connect with. + address: Multiaddr, + /// The temporary storage holding all transactions' data (that may be too big to hold in memory). + store: Store, + /// Receive the batches to download from the worker. + rx_request: Receiver>, + /// Keep a set of requests already made to avoid asking twice for the same batch. + to_request: HashSet, +} + +impl SyncConnection { + /// Spawn a new worker connection in a dedicated tokio task. 
+ pub fn spawn( + address: Multiaddr, + store: Store, + rx_request: Receiver>, + ) { + tokio::spawn(async move { + Self { + address, + store, + rx_request, + to_request: HashSet::new(), + } + .run() + .await; + }); + } + + /// Main loop keeping the connection with a worker alive and receive batches to download. + async fn run(&mut self) { + let config = mysten_network::config::Config::new(); + //TODO don't panic on bad address + let channel = config.connect_lazy(&self.address).unwrap(); + let mut client = WorkerToWorkerClient::new(channel); + + while let Some(digests) = self.rx_request.recv().await { + // Record the new digests as outstanding; the set de-duplicates repeated requests. + for digest in digests { + self.to_request.insert(digest); + } + + let missing = self.to_request.iter().copied().collect(); + // Request the batch from the worker. + let message = ClientBatchRequest(missing); + //TODO wrap this call in the retry + let mut stream = match client + .client_batch_request(BincodeEncodedPayload::try_from(&message).unwrap()) + .await + { + Ok(stream) => stream.into_inner(), + Err(e) => { + warn!( + "Failed to send sync request to worker {}: {e}", + self.address + ); + continue; + } + }; + + // Receive the batch data from the worker. + while let Some(result) = stream.next().await { + match result { + Ok(batch) => { + let batch = batch.payload; + // Store the batch in the temporary store. + // TODO: We can probably avoid re-computing the hash of the batch since we trust the worker. + let res_digest = serialized_batch_digest(&batch); + match res_digest { + Ok(digest) => { + self.store.write(digest, batch.to_vec()).await; + + // Cleanup internal state. 
+ self.to_request.remove(&digest); + } + Err(error) => { + error!("Worker sent invalid serialized batch data: {error}"); + } + } + } + Err(e) => { + warn!( + "Failed to receive batch reply from worker {}: {e}", + self.address + ); + } + } + } + } + } +} diff --git a/executor/src/core.rs b/executor/src/core.rs index 687e69ac3..fed5d7d5e 100644 --- a/executor/src/core.rs +++ b/executor/src/core.rs @@ -14,7 +14,10 @@ use tokio::{ task::JoinHandle, }; use tracing::debug; -use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber}; +use types::{ + metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber, + SerializedBatchMessage, WorkerMessage, +}; #[cfg(test)] #[path = "tests/executor_tests.rs"] @@ -26,7 +29,7 @@ pub mod executor_tests; /// not processes twice the same transaction (despite crash-recovery). pub struct Core { /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, + store: Store, /// The (global) state to perform execution. execution_state: Arc, /// Receive reconfiguration updates. @@ -54,7 +57,7 @@ where /// Spawn a new executor in a dedicated tokio task. #[must_use] pub fn spawn( - store: Store, + store: Store, execution_state: Arc, rx_reconfigure: watch::Receiver, rx_subscriber: metered_channel::Receiver, @@ -134,8 +137,8 @@ where total_batches: usize, ) -> SubscriberResult<()> { // The store should now hold all transaction data referenced by the input certificate. - let transactions = match self.store.read(batch_digest).await? { - Some(x) => x.0, + let batch = match self.store.read(batch_digest).await? { + Some(x) => x, None => { // If two certificates contain the exact same batch (eg. by the actions of a Byzantine // consensus node), some correct client may already have deleted the batch from their @@ -148,6 +151,12 @@ where } }; + // Deserialize the consensus workers' batch message to retrieve a list of transactions. 
+ let transactions = match bincode::deserialize(&batch)? { + WorkerMessage::Batch(Batch(x)) => x, + _ => bail!(SubscriberError::UnexpectedProtocolMessage), + }; + // Execute every transaction in the batch. let total_transactions = transactions.len(); for (index, transaction) in transactions.into_iter().enumerate() { diff --git a/executor/src/errors.rs b/executor/src/errors.rs index 3630ceb15..034f20fb9 100644 --- a/executor/src/errors.rs +++ b/executor/src/errors.rs @@ -4,7 +4,6 @@ use config::WorkerId; use std::fmt::Debug; use store::StoreError; use thiserror::Error; -use types::CertificateDigest; #[macro_export] macro_rules! bail { @@ -50,9 +49,6 @@ pub enum SubscriberError { #[error("Storage failure: {0}")] StoreError(#[from] StoreError), - #[error("Error occurred while retrieving certificate {0} payload: {1}")] - PayloadRetrieveError(CertificateDigest, String), - #[error("Consensus referenced unexpected worker id {0}")] UnexpectedWorkerId(WorkerId), diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 0c4edae7c..464aa97f3 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -1,5 +1,6 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +mod batch_loader; mod core; mod errors; mod state; @@ -16,16 +17,26 @@ mod execution_state; mod metrics; pub use errors::{ExecutionStateError, SubscriberError, SubscriberResult}; +use multiaddr::{Multiaddr, Protocol}; pub use state::ExecutionIndices; +use storage::CertificateStore; -use crate::{core::Core, metrics::ExecutorMetrics, subscriber::Subscriber}; +use crate::{ + batch_loader::BatchLoader, core::Core, metrics::ExecutorMetrics, subscriber::Subscriber, +}; use async_trait::async_trait; +use config::{SharedCommittee, SharedWorkerCache}; use consensus::ConsensusOutput; -use primary::BlockCommand; +use crypto::PublicKey; use prometheus::Registry; use serde::de::DeserializeOwned; -use std::{fmt::Debug, sync::Arc}; -use storage::CertificateStore; +use std::{ + borrow::Cow, + collections::HashMap, + fmt::Debug, + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; use store::Store; use tokio::{ sync::{mpsc::Sender, watch}, @@ -33,8 +44,8 @@ use tokio::{ }; use tracing::info; use types::{ - metered_channel, Batch, BatchDigest, CertificateDigest, ConsensusStore, - ReconfigureNotification, SequenceNumber, + metered_channel, BatchDigest, CertificateDigest, ConsensusStore, ReconfigureNotification, + SequenceNumber, SerializedBatchMessage, }; /// Convenience type representing a serialized transaction. @@ -94,12 +105,14 @@ pub struct Executor; impl Executor { /// Spawn a new client subscriber. 
pub async fn spawn( - store: Store, + name: PublicKey, + committee: SharedCommittee, + worker_cache: SharedWorkerCache, + store: Store, execution_state: Arc, tx_reconfigure: &watch::Sender, rx_consensus: metered_channel::Receiver, tx_output: Sender>, - tx_get_block_commands: metered_channel::Sender, registry: &Registry, restored_consensus_output: Vec, ) -> SubscriberResult>> @@ -110,6 +123,8 @@ impl Executor { { let metrics = ExecutorMetrics::new(registry); + let (tx_batch_loader, rx_batch_loader) = + metered_channel::channel(primary::CHANNEL_CAPACITY, &metrics.tx_batch_loader); let (tx_executor, rx_executor) = metered_channel::channel(primary::CHANNEL_CAPACITY, &metrics.tx_executor); @@ -120,32 +135,77 @@ impl Executor { ); // We expect this will ultimately be needed in the `Core` as well as the `Subscriber`. - let arc_metrics = Arc::new(metrics); // Spawn the subscriber. let subscriber_handle = Subscriber::spawn( store.clone(), - tx_get_block_commands, tx_reconfigure.subscribe(), rx_consensus, + tx_batch_loader, tx_executor, - arc_metrics, restored_consensus_output, ); // Spawn the executor's core. let executor_handle = Core::::spawn( - store, + store.clone(), execution_state, tx_reconfigure.subscribe(), /* rx_subscriber */ rx_executor, tx_output, ); + // Spawn the batch loader. 
+ let mut worker_addresses: HashMap = worker_cache + .load() + .workers + .iter() + .find_map(|v| match_opt::match_opt!(v, (nm, authority) if name == *nm => authority)) + .expect("Could not find own key in worker_cache") + .0 + .iter() + .map(|(id, x)| (*id, x.worker_to_worker.clone())) + .collect(); + //////////////////////////////////////////////////////////////// + // TODO: remove this hack once #706 is fixed + //////////////////////////////////////////////////////////////// + + // retrieve our primary address + let our_primary_to_primary_address = committee + .load() + .primary(&name) + .expect("Out public key is not in the committee!") + .primary_to_primary; + // extract the hostname portion + let our_primary_hostname = our_primary_to_primary_address + .into_iter() + .flat_map(move |x| match x { + p @ Protocol::Ip4(_) | p @ Protocol::Ip6(_) | p @ Protocol::Dns(_) => Some(p), + _ => None, + }) + .next() + .expect("Could not find hostname in our primary address!"); + // Modify the worker addresses that we are about to use : would we talk better using a loopback address? + for worker_address in worker_addresses.values_mut() { + replace_distant_by_localhost(worker_address, &our_primary_hostname); + } + //////////////////////////////////////////////////////////////// + + let batch_loader_handle = BatchLoader::spawn( + store, + tx_reconfigure.subscribe(), + rx_batch_loader, + worker_addresses, + ); + // Return the handle. info!("Consensus subscriber successfully started"); - Ok(vec![subscriber_handle, executor_handle]) + Ok(vec![ + subscriber_handle, + executor_handle, + batch_loader_handle, + ]) } } @@ -188,3 +248,74 @@ where } Ok(restored_consensus_output) } + +fn replace_distant_by_localhost(target: &mut Multiaddr, hostname_pattern: &Protocol) { + // does the hostname match our pattern exactly? 
+ if target.iter().next() == Some(hostname_pattern.clone()) { + if let Some(replacement) = target.replace(0, move |x| match x { + Protocol::Ip4(_) => Some(Protocol::Ip4(Ipv4Addr::LOCALHOST)), + Protocol::Ip6(_) => Some(Protocol::Ip6(Ipv6Addr::LOCALHOST)), + Protocol::Dns(_) => Some(Protocol::Dns(Cow::Owned("localhost".to_owned()))), + _ => None, + }) { + tracing::debug!("Address for worker {} replaced by {}", target, replacement); + *target = replacement; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use multiaddr::{multiaddr, Protocol}; + use std::net::Ipv4Addr; + + #[test] + fn test_replace_distant_by_localhost() { + // IPV4 positive + let non_local: Ipv4Addr = "8.8.8.8".parse().unwrap(); + let mut addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr1, &Protocol::Ip4(non_local)); + assert_eq!(addr1, multiaddr!(Ip4(Ipv4Addr::LOCALHOST), Tcp(10000u16))); + + // IPV4 negative + let other_target: Ipv4Addr = "8.8.8.4".parse().unwrap(); + let addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + let mut addr2 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Ip4(other_target)); + assert_eq!(addr2, addr1); + + // IPV6 positive + let non_local: Ipv6Addr = "2607:f0d0:1002:51::4".parse().unwrap(); + let mut addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr1, &Protocol::Ip6(non_local)); + assert_eq!(addr1, multiaddr!(Ip6(Ipv6Addr::LOCALHOST), Tcp(10000u16))); + + // IPV6 negative + let other_target: Ipv6Addr = "2607:f0d0:1002:50::4".parse().unwrap(); + let addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + let mut addr2 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Ip6(other_target)); + assert_eq!(addr2, addr1); + + // DNS positive + let non_local: Cow = Cow::Owned("google.com".to_owned()); + let mut addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); + + 
replace_distant_by_localhost(&mut addr1, &Protocol::Dns(non_local.clone())); + let localhost: Cow = Cow::Owned("localhost".to_owned()); + assert_eq!(addr1, multiaddr!(Dns(localhost), Tcp(10000u16))); + + // DNS negative + let other_target: Cow = Cow::Owned("apple.com".to_owned()); + let addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); + let mut addr2 = multiaddr!(Dns(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Dns(other_target)); + assert_eq!(addr2, addr1); + } +} diff --git a/executor/src/metrics.rs b/executor/src/metrics.rs index 812c3c890..5c7de5965 100644 --- a/executor/src/metrics.rs +++ b/executor/src/metrics.rs @@ -4,6 +4,8 @@ use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, R #[derive(Clone, Debug)] pub struct ExecutorMetrics { + /// occupancy of the channel from the `Subscriber` to `BatchLoader` + pub tx_batch_loader: IntGauge, /// occupancy of the channel from the `Subscriber` to `Core` pub tx_executor: IntGauge, /// Number of elements in the waiting (ready-to-deliver) list of subscriber @@ -13,6 +15,12 @@ pub struct ExecutorMetrics { impl ExecutorMetrics { pub fn new(registry: &Registry) -> Self { Self { + tx_batch_loader: register_int_gauge_with_registry!( + "tx_batch_loader", + "occupancy of the channel from the `Subscriber` to `BatchLoader`", + registry + ) + .unwrap(), tx_executor: register_int_gauge_with_registry!( "tx_executor", "occupancy of the channel from the `Subscriber` to `Core`", diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs index a61c1f57b..8672693d3 100644 --- a/executor/src/subscriber.rs +++ b/executor/src/subscriber.rs @@ -1,23 +1,13 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::{ - errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError, - SubscriberError::PayloadRetrieveError, -}; -use backoff::{Error, ExponentialBackoff}; +use crate::{errors::SubscriberResult, try_fut_and_permit, SubscriberError}; use consensus::ConsensusOutput; -use fastcrypto::Hash; -use primary::BlockCommand; -use std::{sync::Arc, time::Duration}; +use futures::future::try_join_all; use store::Store; -use tokio::{ - sync::{oneshot, watch}, - task::JoinHandle, -}; -use tracing::error; +use tokio::{sync::watch, task::JoinHandle}; use types::{ - bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest, - ReconfigureNotification, + bounded_future_queue::BoundedFuturesOrdered, metered_channel, BatchDigest, + ReconfigureNotification, SerializedBatchMessage, }; #[cfg(test)] @@ -25,26 +15,20 @@ use types::{ pub mod subscriber_tests; /// The `Subscriber` receives certificates sequenced by the consensus and waits until the -/// downloaded all the transactions references by the certificates; it then +/// `BatchLoader` downloaded all the transactions references by the certificates; it then /// forward the certificates to the Executor Core. pub struct Subscriber { /// The temporary storage holding all transactions' data (that may be too big to hold in memory). - store: Store, + store: Store, /// Receive reconfiguration updates. rx_reconfigure: watch::Receiver, /// A channel to receive consensus messages. rx_consensus: metered_channel::Receiver, + /// A channel to the batch loader to download transaction's data. + tx_batch_loader: metered_channel::Sender, /// A channel to send the complete and ordered list of consensus outputs to the executor. This /// channel is used once all transactions data are downloaded. tx_executor: metered_channel::Sender, - // A channel to send commands to the block waiter to receive - // a certificate's batches (block). 
- tx_get_block_commands: metered_channel::Sender, - // When asking for a certificate's payload we want to retry until we succeed, unless - // some irrecoverable error occurs. For that reason a backoff policy is defined - get_block_retry_policy: ExponentialBackoff, - /// The metrics handler - metrics: Arc, } impl Subscriber { @@ -54,32 +38,20 @@ impl Subscriber { /// Spawn a new subscriber in a new tokio task. #[must_use] pub fn spawn( - store: Store, - tx_get_block_commands: metered_channel::Sender, + store: Store, rx_reconfigure: watch::Receiver, rx_consensus: metered_channel::Receiver, + tx_batch_loader: metered_channel::Sender, tx_executor: metered_channel::Sender, - metrics: Arc, restored_consensus_output: Vec, ) -> JoinHandle<()> { - let get_block_retry_policy = ExponentialBackoff { - initial_interval: Duration::from_millis(500), - randomization_factor: backoff::default::RANDOMIZATION_FACTOR, - multiplier: backoff::default::MULTIPLIER, - max_interval: Duration::from_secs(10), // Maximum backoff is 10 seconds - max_elapsed_time: None, // Never end retrying unless a non recoverable error occurs. - ..Default::default() - }; - tokio::spawn(async move { Self { store, rx_reconfigure, rx_consensus, + tx_batch_loader, tx_executor, - tx_get_block_commands, - get_block_retry_policy, - metrics, } .run(restored_consensus_output) .await @@ -87,29 +59,37 @@ impl Subscriber { }) } + /// Wait for particular data to become available in the storage and then returns. + async fn waiter( + missing: Vec, + store: Store, + deliver: T, + ) -> SubscriberResult { + let waiting: Vec<_> = missing.into_iter().map(|x| store.notify_read(x)).collect(); + try_join_all(waiting) + .await + .map(|_| deliver) + .map_err(SubscriberError::from) + } + /// Main loop connecting to the consensus to listen to sequence messages. 
async fn run( &mut self, restored_consensus_output: Vec, ) -> SubscriberResult<()> { - // It's important to have the futures in ordered fashion as we want - // to guarantee that will deliver to the executor the certificates - // in the same order we received from rx_consensus. So it doesn't - // mater if we somehow managed to fetch the batches from a later - // certificate. Unless the earlier certificate's payload has been - // fetched, no later certificate will be delivered. let mut waiting = BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES); // First handle any consensus output messages that were restored due to a restart. // This needs to happen before we start listening on rx_consensus and receive messages sequenced after these. for message in restored_consensus_output { - let future = Self::wait_on_payload( - self.get_block_retry_policy.clone(), - self.store.clone(), - self.tx_get_block_commands.clone(), - message, - ); + self.tx_batch_loader + .send(message.clone()) + .await + .expect("Failed to send message ot batch loader"); + + let digests = message.certificate.header.payload.keys().cloned().collect(); + let future = Self::waiter(digests, self.store.clone(), message); waiting.push(future).await; } @@ -118,15 +98,18 @@ impl Subscriber { tokio::select! { // Receive the ordered sequence of consensus messages from a consensus node. Some(message) = self.rx_consensus.recv(), if waiting.available_permits() > 0 => { - // Fetch the certificate's payload from the workers. This is done via the - // block_waiter component. If the batches are not available in the workers then - // block_waiter will do its best to sync from the other peers. Once all batches - // are available, we forward the certificate to the Executor Core. 
- let future = Self::wait_on_payload( - self.get_block_retry_policy.clone(), - self.store.clone(), - self.tx_get_block_commands.clone(), - message); + // Send the certificate to the batch loader to download all transactions' data. + self.tx_batch_loader + .send(message.clone()) + .await + .expect("Failed to send message ot batch loader"); + + // Wait for the transaction data to be available in the store. This will happen for sure because + // the primary already successfully processed the certificate. This implies that the primary notified + // its worker to download any missing batch. We may however have to wait for these batches to be available + // on our workers. Once all batches are available, we forward the certificate to the Executor Core. + let digests = message.certificate.header.payload.keys().cloned().collect(); + let future = Self::waiter(digests, self.store.clone(), message); waiting.push(future).await; }, @@ -144,61 +127,6 @@ impl Subscriber { } } } - - self.metrics - .waiting_elements_subscriber - .set(waiting.len() as i64); } } - - /// The wait_on_payload will try to retrieve the certificate's payload - /// from the workers via the block_waiter component and relase the - /// `deliver` once successfully done. Since we want the output to be - /// sequenced we will not quit this method until we have successfully - /// fetched the payload. 
- async fn wait_on_payload( - back_off_policy: ExponentialBackoff, - store: Store, - tx_get_block_commands: metered_channel::Sender, - deliver: ConsensusOutput, - ) -> SubscriberResult { - let get_block = move || { - let message = deliver.clone(); - let id = message.certificate.digest(); - let tx_get_block = tx_get_block_commands.clone(); - let batch_store = store.clone(); - - async move { - let (sender, receiver) = oneshot::channel(); - - tx_get_block - .send(BlockCommand::GetBlock { id, sender }) - .await - .map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))?; - - match receiver - .await - .map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))? - { - Ok(block) => { - // we successfully received the payload. Now let's add to store - batch_store - .write_all(block.batches.into_iter().map(|b| (b.id, b.transactions))) - .await - .map_err(|err| Error::permanent(SubscriberError::from(err)))?; - - Ok(message) - } - Err(err) => { - // whatever the error might be at this point we don't - // have many options apart from retrying. 
- error!("Error while retrieving block via block waiter: {}", err); - Err(Error::transient(PayloadRetrieveError(id, err.to_string()))) - } - } - } - }; - - backoff::future::retry(back_off_policy, get_block).await - } } diff --git a/executor/src/tests/fixtures.rs b/executor/src/tests/fixtures.rs index 2b8dac5ed..8bab44e92 100644 --- a/executor/src/tests/fixtures.rs +++ b/executor/src/tests/fixtures.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use config::WorkerId; -use fastcrypto::Hash; use indexmap::IndexMap; use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Serialize; @@ -11,19 +10,23 @@ use store::{ rocks::{open_cf, DBMap}, Store, }; + use test_utils::committee; -use types::{Batch, BatchDigest, Certificate, Header}; +use types::{ + serialized_batch_digest, Batch, BatchDigest, Certificate, Header, SerializedBatchMessage, + WorkerMessage, +}; /// A test batch containing specific transactions. -pub fn test_batch(transactions: Vec) -> (BatchDigest, Batch) { - let serialised_transactions = transactions +pub fn test_batch(transactions: Vec) -> (BatchDigest, SerializedBatchMessage) { + let batch = transactions .iter() .map(|x| bincode::serialize(x).unwrap()) .collect(); - - let batch = Batch(serialised_transactions); - - (batch.digest(), batch) + let message = WorkerMessage::Batch(Batch(batch)); + let serialized = bincode::serialize(&message).unwrap(); + let digest = serialized_batch_digest(&serialized).unwrap(); + (digest, serialized) } /// A test certificate with a specific payload. @@ -40,12 +43,12 @@ pub fn test_certificate(payload: IndexMap) -> Certificate } /// Make a test storage to hold transaction data. 
-pub fn test_store() -> Store { +pub fn test_store() -> Store { let store_path = tempfile::tempdir().unwrap(); - const TEMP_BATCHES_CF: &str = "temp_batches"; - let rocksdb = open_cf(store_path, None, &[TEMP_BATCHES_CF]).unwrap(); - let temp_batch_map = reopen!(&rocksdb, TEMP_BATCHES_CF;); - Store::new(temp_batch_map) + const BATCHES_CF: &str = "batches"; + let rocksdb = open_cf(store_path, None, &[BATCHES_CF]).unwrap(); + let batch_map = reopen!(&rocksdb, BATCHES_CF;); + Store::new(batch_map) } /// Create a number of test certificates containing transactions of type u64. @@ -53,7 +56,7 @@ pub fn test_u64_certificates( certificates: usize, batches_per_certificate: usize, transactions_per_batch: usize, -) -> Vec<(Certificate, Vec<(BatchDigest, Batch)>)> { +) -> Vec<(Certificate, Vec<(BatchDigest, SerializedBatchMessage)>)> { let mut rng = StdRng::from_seed([0; 32]); (0..certificates) .map(|_| { diff --git a/executor/src/tests/subscriber_tests.rs b/executor/src/tests/subscriber_tests.rs index 646e778f5..335907630 100644 --- a/executor/src/tests/subscriber_tests.rs +++ b/executor/src/tests/subscriber_tests.rs @@ -2,23 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::fixtures::{test_store, test_u64_certificates}; -use primary::GetBlockResponse; -use prometheus::Registry; use test_utils::{committee, test_channel}; -use types::{ - BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber, -}; +use types::{Certificate, SequenceNumber}; /// Spawn a mock consensus core and a test subscriber. 
async fn spawn_subscriber( rx_sequence: metered_channel::Receiver, + tx_batch_loader: metered_channel::Sender, tx_executor: metered_channel::Sender, - tx_get_block_commands: metered_channel::Sender, restored_consensus_output: Vec, ) -> ( - Store, + Store, watch::Sender, - JoinHandle<()>, ) { let committee = committee(None); let message = ReconfigureNotification::NewEpoch(committee); @@ -26,37 +21,40 @@ async fn spawn_subscriber( // Spawn a test subscriber. let store = test_store(); - let executor_metrics = ExecutorMetrics::new(&Registry::new()); - let subscriber_handle = Subscriber::spawn( + let _subscriber_handle = Subscriber::spawn( store.clone(), - tx_get_block_commands, rx_reconfigure, rx_sequence, + tx_batch_loader, tx_executor, - Arc::new(executor_metrics), restored_consensus_output, ); - (store, tx_reconfigure, subscriber_handle) + (store, tx_reconfigure) } #[tokio::test] async fn handle_certificate_with_downloaded_batch() { let (tx_sequence, rx_sequence) = test_channel!(10); + let (tx_batch_loader, mut rx_batch_loader) = test_channel!(10); let (tx_executor, mut rx_executor) = test_channel!(10); - let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let (store, _tx_reconfigure, _) = - spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await; + let (store, _tx_reconfigure) = + spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor, vec![]).await; + // Feed certificates to the mock sequencer and ensure the batch loader receive the command to + // download the corresponding transaction data. 
let total_certificates = 2; let certificates = test_u64_certificates( total_certificates, /* batches_per_certificate */ 2, /* transactions_per_batch */ 2, ); - for (i, (certificate, _)) in certificates.clone().into_iter().enumerate() { + for (i, (certificate, batches)) in certificates.into_iter().enumerate() { + for (digest, batch) in batches { + store.write(digest, batch).await; + } let message = ConsensusOutput { certificate, consensus_index: i as SequenceNumber, @@ -65,226 +63,37 @@ async fn handle_certificate_with_downloaded_batch() { } for i in 0..total_certificates { - let request = rx_get_block_command.recv().await.unwrap(); - - let batches = match request { - BlockCommand::GetBlock { id, sender } => { - let (certificate, batches) = certificates.get(i).unwrap().to_owned(); - - assert_eq!( - certificate.digest(), - id, - "Out of order certificate id has been received" - ); - - // Mimic the block_waiter here and respond with the payload back - let ok = successful_block_response(id, batches.clone()); - - sender.send(ok).unwrap(); - - batches - } - _ => panic!("Unexpected command received"), - }; + let output = rx_batch_loader.recv().await.unwrap(); + assert_eq!(output.consensus_index, i as SequenceNumber); let output = rx_executor.recv().await.unwrap(); assert_eq!(output.consensus_index, i as SequenceNumber); - - // Ensure all the batches have been written in storage - for (batch_id, batch) in batches { - let stored_batch = store - .read(batch_id) - .await - .expect("Error while retrieving batch") - .unwrap(); - assert_eq!(batch, stored_batch); - } } } #[tokio::test] -async fn should_retry_when_failed_to_get_payload() { +async fn handle_empty_certificate() { let (tx_sequence, rx_sequence) = test_channel!(10); + let (tx_batch_loader, mut rx_batch_loader) = test_channel!(10); let (tx_executor, mut rx_executor) = test_channel!(10); - let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. 
- let (store, _tx_reconfigure, _) = - spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await; - - // Create a certificate - let total_certificates = 1; - let certificates = test_u64_certificates( - total_certificates, - /* batches_per_certificate */ 2, - /* transactions_per_batch */ 2, - ); - - let (certificate, batches) = certificates.first().unwrap().to_owned(); - let certificate_id = certificate.digest(); - - let message = ConsensusOutput { - certificate, - consensus_index: 500 as SequenceNumber, - }; - - // send the certificate to download payload - tx_sequence.send(message).await.unwrap(); + let _do_not_drop = spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor, vec![]).await; - // Now assume that the block_wait is responding with error for the - // requested certificate for RETRIES -1 attempts. - // Finally on the last one we reply with a successful result. - const RETRIES: u32 = 3; - for i in 0..RETRIES { - let request = rx_get_block_command.recv().await.unwrap(); - - match request { - BlockCommand::GetBlock { id, sender } => { - assert_eq!(certificate_id, id); - - if i < RETRIES - 1 { - sender - .send(Err(BlockError { - id, - error: BlockErrorKind::BatchTimeout, - })) - .unwrap(); - } else { - // Mimic the block_waiter here and respond with the payload back - let ok = successful_block_response(id, batches.clone()); - - sender.send(ok).unwrap(); - } - } - _ => panic!("Unexpected command received"), - }; - } - - // Now the message will be delivered and should be forwarded to tx_executor - let output = rx_executor.recv().await.unwrap(); - assert_eq!(output.consensus_index, 500 as SequenceNumber); - - // Ensure all the batches have been written in storage - for (batch_id, batch) in batches { - let stored_batch = store - .read(batch_id) - .await - .expect("Error while retrieving batch") - .unwrap(); - assert_eq!(batch, stored_batch); - } -} - -#[tokio::test] -async fn subscriber_should_crash_when_irrecoverable_error() { - let 
(tx_sequence, rx_sequence) = test_channel!(10); - let (tx_executor, _rx_executor) = test_channel!(10); - let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); - - // Spawn a subscriber. - let (_store, _tx_reconfigure, handle) = - spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await; - - // Create a certificate - let total_certificates = 1; - let certificates = test_u64_certificates( - total_certificates, - /* batches_per_certificate */ 2, - /* transactions_per_batch */ 2, - ); - - let (certificate, _batches) = certificates.first().unwrap().to_owned(); - - let message = ConsensusOutput { - certificate, - consensus_index: 500 as SequenceNumber, - }; - - // now close the tx_get_block_command in order to inject an artificial - // error and make any retries stop and propagate the error - rx_get_block_command.close(); - - // send the certificate to download payload - // We expect this to make the subscriber crash - tx_sequence.send(message).await.unwrap(); - - let err = handle - .await - .expect_err("Expected an error, instead a successful response returned"); - assert!(err.is_panic()); -} - -#[tokio::test] -async fn test_subscriber_with_restored_consensus_output() { - let (_tx_sequence, rx_sequence) = test_channel!(10); - let (tx_executor, mut rx_executor) = test_channel!(10); - let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); - - // Create restored consensus output - let total_certificates = 2; - let certificates = test_u64_certificates( - total_certificates, - /* batches_per_certificate */ 2, - /* transactions_per_batch */ 2, - ); - let restored_consensus = certificates - .clone() - .into_iter() - .enumerate() - .map(|(i, (certificate, _))| ConsensusOutput { - certificate, + // Feed certificates to the mock sequencer and ensure the batch loader receive the command to + // download the corresponding transaction data. 
+ for i in 0..2 { + let message = ConsensusOutput { + certificate: Certificate::default(), consensus_index: i as SequenceNumber, - }) - .collect(); - - // Spawn a subscriber. - let (_store, _tx_reconfigure, _handle) = spawn_subscriber( - rx_sequence, - tx_executor, - tx_get_block_command, - restored_consensus, - ) - .await; - - for i in 0..total_certificates { - let request = rx_get_block_command.recv().await.unwrap(); - - let _batches = match request { - BlockCommand::GetBlock { id, sender } => { - let (_certificate, batches) = certificates.get(i).unwrap().to_owned(); - - // Mimic the block_waiter here and respond with the payload back - let ok = successful_block_response(id, batches.clone()); - - sender.send(ok).unwrap(); - - batches - } - _ => panic!("Unexpected command received"), }; + tx_sequence.send(message).await.unwrap(); + } + for i in 0..2 { + let output = rx_batch_loader.recv().await.unwrap(); + assert_eq!(output.consensus_index, i); - // Ensure restored messages are delivered. let output = rx_executor.recv().await.unwrap(); - assert_eq!(output.consensus_index, i as SequenceNumber); + assert_eq!(output.consensus_index, i); } } - -// Helper method to create a successful (OK) get_block response. 
-fn successful_block_response( - id: CertificateDigest, - batches: Vec<(BatchDigest, Batch)>, -) -> BlockResult { - // Mimic the block_waiter here and respond with the payload back - let batch_messages = batches - .iter() - .map(|(batch_id, batch)| BatchMessage { - id: *batch_id, - transactions: batch.clone(), - }) - .collect(); - - Ok(GetBlockResponse { - id, - batches: batch_messages, - }) -} diff --git a/node/src/lib.rs b/node/src/lib.rs index aa7c5ad5f..4c9a6506a 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -15,7 +15,7 @@ use executor::{ }; use fastcrypto::traits::{KeyPair as _, VerifyingKey}; use itertools::Itertools; -use primary::{BlockCommand, NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics}; +use primary::{NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics}; use prometheus::{IntGauge, Registry}; use std::{fmt::Debug, sync::Arc}; use storage::{CertificateStore, CertificateToken}; @@ -30,7 +30,7 @@ use tokio::{ }; use tracing::{debug, info}; use types::{ - metered_channel, Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header, + metered_channel, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header, HeaderDigest, ReconfigureNotification, Round, SequenceNumber, SerializedBatchMessage, }; use worker::{metrics::initialise_metrics, Worker}; @@ -46,7 +46,6 @@ pub struct NodeStorage { pub payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, pub batch_store: Store, pub consensus_store: Arc, - pub temp_batch_store: Store, } impl NodeStorage { @@ -58,7 +57,6 @@ impl NodeStorage { const BATCHES_CF: &'static str = "batches"; const LAST_COMMITTED_CF: &'static str = "last_committed"; const SEQUENCE_CF: &'static str = "sequence"; - const TEMP_BATCH_CF: &'static str = "temp_batches"; /// Open or reopen all the storage of the node. 
pub fn reopen>(store_path: Path) -> Self { @@ -73,7 +71,6 @@ impl NodeStorage { Self::BATCHES_CF, Self::LAST_COMMITTED_CF, Self::SEQUENCE_CF, - Self::TEMP_BATCH_CF, ], ) .expect("Cannot open database"); @@ -86,7 +83,6 @@ impl NodeStorage { batch_map, last_committed_map, sequence_map, - temp_batch_map, ) = reopen!(&rocksdb, Self::HEADERS_CF;, Self::CERTIFICATES_CF;, @@ -94,8 +90,7 @@ impl NodeStorage { Self::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>, Self::BATCHES_CF;, Self::LAST_COMMITTED_CF;, - Self::SEQUENCE_CF;, - Self::TEMP_BATCH_CF; + Self::SEQUENCE_CF; ); let header_store = Store::new(header_map); @@ -103,7 +98,6 @@ impl NodeStorage { let payload_store = Store::new(payload_map); let batch_store = Store::new(batch_map); let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sequence_map)); - let temp_batch_store = Store::new(temp_batch_map); Self { header_store, @@ -111,7 +105,6 @@ impl NodeStorage { payload_store, batch_store, consensus_store, - temp_batch_store, } } } @@ -174,14 +167,6 @@ impl Node { let (tx_consensus, rx_consensus) = metered_channel::channel(Self::CHANNEL_CAPACITY, &committed_certificates_counter); - let tx_get_block_commands_counter = IntGauge::new( - PrimaryChannelMetrics::NAME_GET_BLOCK_COMMANDS, - PrimaryChannelMetrics::DESC_GET_BLOCK_COMMANDS, - ) - .unwrap(); - let (tx_get_block_commands, rx_get_block_commands) = - metered_channel::channel(Self::CHANNEL_CAPACITY, &tx_get_block_commands_counter); - // Compute the public key of this authority. 
let name = keypair.public().clone(); let mut handles = Vec::new(); @@ -196,7 +181,9 @@ impl Node { (Some(Arc::new(dag)), NetworkModel::Asynchronous) } else { let consensus_handles = Self::spawn_consensus( + name.clone(), committee.clone(), + worker_cache.clone(), store, parameters.clone(), execution_state, @@ -204,7 +191,6 @@ impl Node { rx_new_certificates, tx_consensus.clone(), tx_confirmation, - tx_get_block_commands.clone(), registry, ) .await?; @@ -239,8 +225,6 @@ impl Node { store.payload_store.clone(), tx_new_certificates, /* rx_consensus */ rx_consensus, - tx_get_block_commands, - rx_get_block_commands, /* dag */ dag, network_model, tx_reconfigure, @@ -271,7 +255,9 @@ impl Node { /// Spawn the consensus core and the client executing transactions. async fn spawn_consensus( + name: PublicKey, committee: SharedCommittee, + worker_cache: SharedWorkerCache, store: &NodeStorage, parameters: Parameters, execution_state: Arc, @@ -282,7 +268,6 @@ impl Node { SubscriberResult<::Outcome>, SerializedTransaction, )>, - tx_get_block_commands: metered_channel::Sender, registry: &Registry, ) -> SubscriberResult>> where @@ -341,12 +326,14 @@ impl Node { // Spawn the client executing the transactions. It can also synchronize with the // subscriber handler if it missed some transactions. 
let executor_handles = Executor::spawn( - store.temp_batch_store.clone(), + name, + committee, + worker_cache, + store.batch_store.clone(), execution_state, tx_reconfigure, /* rx_consensus */ rx_sequence, /* tx_output */ tx_confirmation, - tx_get_block_commands, registry, restored_consensus_output, ) diff --git a/primary/src/block_synchronizer/handler.rs b/primary/src/block_synchronizer/handler.rs index 79f984417..ad893d73d 100644 --- a/primary/src/block_synchronizer/handler.rs +++ b/primary/src/block_synchronizer/handler.rs @@ -126,6 +126,7 @@ impl BlockSynchronizerHandler { } } + #[instrument(level = "debug", skip_all)] async fn wait_all(&self, certificates: Vec) -> Vec> { let futures: Vec<_> = certificates .into_iter() @@ -135,6 +136,7 @@ impl BlockSynchronizerHandler { join_all(futures).await } + #[instrument(level = "debug", skip_all, err)] async fn wait(&self, block_id: CertificateDigest) -> Result { if let Ok(result) = timeout( self.certificate_deliver_timeout, @@ -160,7 +162,7 @@ impl Handler for BlockSynchronizerHandler { /// * Internal: An internal error caused /// * BlockDeliveryTimeout: Timed out while waiting for the certificate to become available /// after submitting it for processing to core - #[instrument(level="trace", skip_all, fields(num_block_ids = block_ids.len()))] + #[instrument(level="debug", skip_all, fields(num_block_ids = block_ids.len()))] async fn get_and_synchronize_block_headers( &self, block_ids: Vec, @@ -225,7 +227,7 @@ impl Handler for BlockSynchronizerHandler { results } - #[instrument(level="trace", skip_all, fields(num_block_ids = block_ids.len()))] + #[instrument(level="debug", skip_all, fields(num_block_ids = block_ids.len()))] async fn get_block_headers( &self, block_ids: Vec, @@ -262,7 +264,7 @@ impl Handler for BlockSynchronizerHandler { results } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn synchronize_block_payloads( &self, certificates: Vec, diff --git 
a/primary/src/block_synchronizer/mod.rs b/primary/src/block_synchronizer/mod.rs index 7a861cf59..0304c2f1e 100644 --- a/primary/src/block_synchronizer/mod.rs +++ b/primary/src/block_synchronizer/mod.rs @@ -380,7 +380,7 @@ impl BlockSynchronizer { /// logic of waiting and gathering the replies from the primary nodes /// for the payload availability. This future is returning the next State /// to be executed. - #[instrument(level="trace", skip_all, fields(num_certificates = certificates.len()))] + #[instrument(level="debug", skip_all, fields(num_certificates = certificates.len()))] async fn handle_synchronize_block_payload_command<'a>( &mut self, certificates: Vec, @@ -412,9 +412,6 @@ impl BlockSynchronizer { trace!("Certificate payloads need sync"); } - // TODO: add metric here to track the number of certificates - // requested that are missing a payload - let key = RequestID::from_iter(certificates_to_sync.iter()); let message = PrimaryMessage::PayloadAvailabilityRequest { @@ -566,7 +563,7 @@ impl BlockSynchronizer { /// a reply is immediately sent to the consumer via the provided respond_to /// channel. For the ones that haven't been found, are returned back on the /// returned vector. - #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn reply_with_payload_already_in_storage( &self, certificates: Vec, @@ -627,7 +624,7 @@ impl BlockSynchronizer { // Broadcasts a message to all the other primary nodes. // It returns back the primary names to which we have sent the requests. 
- #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn broadcast_batch_request(&mut self, message: PrimaryMessage) -> Vec { // Naively now just broadcast the request to all the primaries @@ -645,7 +642,7 @@ impl BlockSynchronizer { primaries_names } - #[instrument(level="trace", skip_all, fields(request_id = ?request_id))] + #[instrument(level="debug", skip_all, fields(request_id = ?request_id))] async fn handle_synchronize_block_payloads<'a>( &mut self, request_id: RequestID, @@ -688,7 +685,7 @@ impl BlockSynchronizer { /// /// * `primary_peer_name` - The primary from which we are looking to sync the batches. /// * `certificates` - The certificates for which we want to sync their batches. - #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn send_synchronize_payload_requests( &mut self, primary_peer_name: PublicKey, @@ -755,7 +752,7 @@ impl BlockSynchronizer { } } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn handle_payload_availability_response( &mut self, response: PayloadAvailabilityResponse, @@ -778,7 +775,7 @@ impl BlockSynchronizer { } } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "debug", skip_all)] async fn handle_certificates_response(&mut self, response: CertificatesResponse) { let sender = self .map_certificate_responses_senders diff --git a/primary/src/block_waiter.rs b/primary/src/block_waiter.rs index c4699a7fa..5dd858555 100644 --- a/primary/src/block_waiter.rs +++ b/primary/src/block_waiter.rs @@ -434,7 +434,7 @@ impl BlockWaiter Option { if let Some((_, c)) = self.get_certificates(vec![id]).await.first() { return c.to_owned(); @@ -447,7 +447,7 @@ impl BlockWaiter, @@ -605,7 +605,7 @@ impl BlockWaiter Self { Self { @@ -166,7 +157,7 @@ impl PrimaryChannelMetrics { ).unwrap(), tx_get_block_commands: register_int_gauge_with_registry!( "tx_get_block_commands", - "occupancy of the channel from the 
`primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`", + "occupancy of the channel from the `primary::ConsensusAPIGrpc` to the `primary::BlockWaiter`", registry ).unwrap(), tx_batches: register_int_gauge_with_registry!( @@ -251,21 +242,6 @@ impl PrimaryChannelMetrics { registry.register(collector).unwrap(); self.tx_committed_certificates = committed_certificates_counter; } - - pub fn replace_registered_get_block_commands_metric( - &mut self, - registry: &Registry, - collector: Box>, - ) { - let tx_get_block_commands_counter = - IntGauge::new(Self::NAME_GET_BLOCK_COMMANDS, Self::DESC_GET_BLOCK_COMMANDS).unwrap(); - // TODO: Sanity-check by hashing the descs against one another - registry - .unregister(Box::new(tx_get_block_commands_counter.clone())) - .unwrap(); - registry.register(collector).unwrap(); - self.tx_get_block_commands = tx_get_block_commands_counter; - } } #[derive(Clone)] diff --git a/primary/src/primary.rs b/primary/src/primary.rs index 5bb90b73f..8e22857ed 100644 --- a/primary/src/primary.rs +++ b/primary/src/primary.rs @@ -15,8 +15,7 @@ use crate::{ proposer::Proposer, state_handler::StateHandler, synchronizer::Synchronizer, - BlockCommand, BlockRemover, CertificatesResponse, DeleteBatchMessage, - PayloadAvailabilityResponse, + BlockRemover, CertificatesResponse, DeleteBatchMessage, PayloadAvailabilityResponse, }; use anemo::{types::PeerInfo, PeerId}; @@ -76,8 +75,6 @@ impl Primary { payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, tx_consensus: Sender, rx_consensus: Receiver, - tx_get_block_commands: Sender, - rx_get_block_commands: Receiver, dag: Option>, network_model: NetworkModel, tx_reconfigure: watch::Sender, @@ -126,6 +123,10 @@ impl Primary { CHANNEL_CAPACITY, &primary_channel_metrics.tx_helper_requests, ); + let (tx_get_block_commands, rx_get_block_commands) = channel( + CHANNEL_CAPACITY, + &primary_channel_metrics.tx_get_block_commands, + ); let (tx_batches, rx_batches) = channel(CHANNEL_CAPACITY, 
&primary_channel_metrics.tx_batches); let (tx_block_removal_commands, rx_block_removal_commands) = channel( @@ -157,17 +158,10 @@ impl Primary { registry, Box::new(committed_certificates_gauge), ); - let new_certificates_gauge = tx_consensus.gauge().clone(); primary_channel_metrics .replace_registered_new_certificates_metric(registry, Box::new(new_certificates_gauge)); - let tx_get_block_commands_gauge = tx_get_block_commands.gauge().clone(); - primary_channel_metrics.replace_registered_get_block_commands_metric( - registry, - Box::new(tx_get_block_commands_gauge), - ); - let (tx_consensus_round_updates, rx_consensus_round_updates) = watch::channel(0u64); let our_workers = worker_cache diff --git a/primary/tests/epoch_change.rs b/primary/tests/epoch_change.rs index 61e2339f8..9654173d0 100644 --- a/primary/tests/epoch_change.rs +++ b/primary/tests/epoch_change.rs @@ -44,8 +44,6 @@ async fn test_simple_epoch_change() { let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -60,8 +58,6 @@ async fn test_simple_epoch_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -157,8 +153,6 @@ async fn test_partial_committee_change() { epoch_0_tx_channels.push(tx_feedback.clone()); let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -173,8 +167,6 @@ async fn test_partial_committee_change() { store.payload_store.clone(), /* 
tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -242,8 +234,6 @@ async fn test_partial_committee_change() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); epoch_1_tx_channels.push(tx_feedback.clone()); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee_1.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -261,8 +251,6 @@ async fn test_partial_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -328,8 +316,6 @@ async fn test_restart_with_new_committee_change() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); tx_channels.push(tx_feedback.clone()); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -347,8 +333,6 @@ async fn test_restart_with_new_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -414,8 +398,6 @@ async fn test_restart_with_new_committee_change() { let initial_committee = ReconfigureNotification::NewEpoch(new_committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let 
store = NodeStorage::reopen(temp_dir()); @@ -430,8 +412,6 @@ async fn test_restart_with_new_committee_change() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, @@ -503,8 +483,6 @@ async fn test_simple_committee_update() { let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); - let (tx_get_block_commands, rx_get_block_commands) = - test_utils::test_get_block_commands!(1); let store = NodeStorage::reopen(temp_dir()); @@ -519,8 +497,6 @@ async fn test_simple_committee_update() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ None, NetworkModel::Asynchronous, tx_reconfigure, diff --git a/primary/tests/integration_tests_proposer_api.rs b/primary/tests/integration_tests_proposer_api.rs index 2f251128d..615f1a46b 100644 --- a/primary/tests/integration_tests_proposer_api.rs +++ b/primary/tests/integration_tests_proposer_api.rs @@ -77,7 +77,6 @@ async fn test_rounds_errors() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -106,8 +105,6 @@ async fn test_rounds_errors() { /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, /* external_consensus */ - tx_get_block_commands, - rx_get_block_commands, Some(Arc::new( Dag::new(&no_name_committee, rx_new_certificates, consensus_metrics).1, )), @@ -170,7 +167,6 @@ async fn 
test_rounds_return_successful_response() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -189,8 +185,6 @@ async fn test_rounds_return_successful_response() { store_primary.payload_store, /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* external_consensus */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -314,9 +308,6 @@ async fn test_node_read_causal_signed_certificates() { let keypair_1 = k.pop().unwrap(); let name_1 = keypair_1.public().clone(); - let (tx_get_block_commands_1, rx_get_block_commands_1) = - test_utils::test_get_block_commands!(1); - // Spawn Primary 1 that we will be interacting with. 
Primary::spawn( name_1.clone(), @@ -329,8 +320,6 @@ async fn test_node_read_causal_signed_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands_1, - rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -353,9 +342,6 @@ async fn test_node_read_causal_signed_certificates() { let name_2 = keypair_2.public().clone(); let consensus_metrics_2 = Arc::new(ConsensusMetrics::new(&Registry::new())); - let (tx_get_block_commands_2, rx_get_block_commands_2) = - test_utils::test_get_block_commands!(1); - // Spawn Primary 2 Primary::spawn( name_2.clone(), @@ -369,8 +355,6 @@ async fn test_node_read_causal_signed_certificates() { /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, /* external_consensus */ - tx_get_block_commands_2, - rx_get_block_commands_2, Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, )), diff --git a/primary/tests/integration_tests_validator_api.rs b/primary/tests/integration_tests_validator_api.rs index 8bbe2bb5a..3e3618714 100644 --- a/primary/tests/integration_tests_validator_api.rs +++ b/primary/tests/integration_tests_validator_api.rs @@ -105,7 +105,6 @@ async fn test_get_collections() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -122,8 +121,6 @@ async fn test_get_collections() { /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, /* dag */ - tx_get_block_commands, - rx_get_block_commands, 
Some(Arc::new( Dag::new(&committee, rx_new_certificates, consensus_metrics).1, )), @@ -298,7 +295,6 @@ async fn test_remove_collections() { let (tx_feedback, rx_feedback) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands, rx_get_block_commands) = test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -313,8 +309,6 @@ async fn test_remove_collections() { store.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands, - rx_get_block_commands, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -507,9 +501,6 @@ async fn test_read_causal_signed_certificates() { let keypair_1 = k.pop().unwrap(); let name_1 = keypair_1.public().clone(); - let (tx_get_block_commands_1, rx_get_block_commands_1) = - test_utils::test_get_block_commands!(1); - // Spawn Primary 1 that we will be interacting with. 
Primary::spawn( name_1.clone(), @@ -522,8 +513,6 @@ async fn test_read_causal_signed_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands_1, - rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -535,8 +524,6 @@ async fn test_read_causal_signed_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands_2, rx_get_block_commands_2) = - test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -561,8 +548,6 @@ async fn test_read_causal_signed_certificates() { primary_store_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, - tx_get_block_commands_2, - rx_get_block_commands_2, /* external_consensus */ Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, @@ -650,8 +635,6 @@ async fn test_read_causal_unsigned_certificates() { // Make the Dag let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands_1, rx_get_block_commands_1) = - test_utils::test_get_block_commands!(1); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let dag = Arc::new(Dag::new(&committee, rx_new_certificates, consensus_metrics).1); @@ -726,8 +709,6 @@ async fn test_read_causal_unsigned_certificates() { primary_store_1.payload_store.clone(), /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, - tx_get_block_commands_1, - rx_get_block_commands_1, /* dag */ Some(dag.clone()), NetworkModel::Asynchronous, tx_reconfigure, @@ -739,9 +720,6 @@ async fn 
test_read_causal_unsigned_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands_2, rx_get_block_commands_2) = - test_utils::test_get_block_commands!(1); - let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics_2 = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -758,8 +736,6 @@ async fn test_read_causal_unsigned_certificates() { primary_store_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, - tx_get_block_commands_2, - rx_get_block_commands_2, /* external_consensus */ Some(Arc::new( Dag::new(&committee, rx_new_certificates_2, consensus_metrics_2).1, @@ -889,8 +865,6 @@ async fn test_get_collections_with_missing_certificates() { test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_1, rx_feedback_1) = test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands_1, rx_get_block_commands_1) = - test_utils::test_get_block_commands!(1); let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -907,8 +881,6 @@ async fn test_get_collections_with_missing_certificates() { /* tx_consensus */ tx_new_certificates_1, /* rx_consensus */ rx_feedback_1, /* external_consensus */ - tx_get_block_commands_1, - rx_get_block_commands_1, Some(Arc::new( Dag::new(&committee, rx_new_certificates_1, consensus_metrics).1, )), @@ -941,9 +913,6 @@ async fn test_get_collections_with_missing_certificates() { let (tx_new_certificates_2, _) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = 
test_utils::test_committed_certificates_channel!(CHANNEL_CAPACITY); - let (tx_get_block_commands_2, rx_get_block_commands_2) = - test_utils::test_get_block_commands!(1); - let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); @@ -958,8 +927,6 @@ async fn test_get_collections_with_missing_certificates() { store_primary_2.payload_store, /* tx_consensus */ tx_new_certificates_2, /* rx_consensus */ rx_feedback_2, - tx_get_block_commands_2, - rx_get_block_commands_2, /* external_consensus */ None, NetworkModel::Asynchronous, diff --git a/test_utils/src/cluster.rs b/test_utils/src/cluster.rs index 5155273ad..2c2c3ef44 100644 --- a/test_utils/src/cluster.rs +++ b/test_utils/src/cluster.rs @@ -697,24 +697,6 @@ impl AuthorityDetails { .collect() } - /// Creates a new proposer client that connects to the corresponding client. - /// This should be available only if the internal consensus is disabled. If - /// the internal consensus is enabled then a panic will be thrown instead. - pub async fn new_proposer_client(&self) -> ProposerClient { - let internal = self.internal.read().await; - - if internal.primary.internal_consensus_enabled { - panic!("External consensus is disabled, won't create a proposer client"); - } - - let config = mysten_network::config::Config::new(); - let channel = config - .connect_lazy(&internal.primary.parameters.consensus_api_grpc.socket_addr) - .unwrap(); - - ProposerClient::new(channel) - } - /// This method returns a new client to send transactions to the dictated /// worker identified by the `worker_id`. If the worker_id is not found then /// a panic is raised. @@ -738,6 +720,24 @@ impl AuthorityDetails { TransactionsClient::new(channel) } + /// Creates a new proposer client that connects to the corresponding client. + /// This should be available only if the internal consensus is disabled. 
If + /// the internal consensus is enabled then a panic will be thrown instead. + pub async fn new_proposer_client(&self) -> ProposerClient { + let internal = self.internal.read().await; + + if internal.primary.internal_consensus_enabled { + panic!("External consensus is disabled, won't create a proposer client"); + } + + let config = mysten_network::config::Config::new(); + let channel = config + .connect_lazy(&internal.primary.parameters.consensus_api_grpc.socket_addr) + .unwrap(); + + ProposerClient::new(channel) + } + /// Creates a new configuration client that connects to the corresponding client. /// This should be available only if the internal consensus is disabled. If /// the internal consensus is enabled then a panic will be thrown instead. diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 18939126c..c3d8334b5 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -94,20 +94,6 @@ macro_rules! test_new_certificates_channel { }; } -#[macro_export] -macro_rules! test_get_block_commands { - ($e:expr) => { - types::metered_channel::channel( - $e, - &prometheus::IntGauge::new( - primary::PrimaryChannelMetrics::NAME_GET_BLOCK_COMMANDS, - primary::PrimaryChannelMetrics::DESC_GET_BLOCK_COMMANDS, - ) - .unwrap(), - ); - }; -} - //////////////////////////////////////////////////////////////// /// Keys, Committee ////////////////////////////////////////////////////////////////