diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 0f5d10a0e..861da146d 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -11,14 +11,12 @@ pub mod bullshark; pub mod consensus; pub mod dag; pub mod metrics; -pub mod subscriber; pub mod tusk; mod utils; -pub use crate::{consensus::Consensus, subscriber::SubscriberHandler}; +pub use crate::consensus::Consensus; use serde::{Deserialize, Serialize}; -use std::ops::RangeInclusive; use types::{Certificate, SequenceNumber}; /// The default channel size used in the consensus and subscriber logic. @@ -32,10 +30,3 @@ pub struct ConsensusOutput { /// The (global) index associated with this certificate. pub consensus_index: SequenceNumber, } - -/// The message sent by the client to sync missing chunks of the output sequence. -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsensusSyncRequest { - /// The sequence numbers of the missing consensus outputs. - pub missing: RangeInclusive, -} diff --git a/consensus/src/subscriber.rs b/consensus/src/subscriber.rs deleted file mode 100644 index 15306df59..000000000 --- a/consensus/src/subscriber.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use crate::{ConsensusOutput, ConsensusSyncRequest}; -use std::sync::Arc; -use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, - task::JoinHandle, -}; -use tracing::{debug, error}; -use types::{Certificate, CertificateDigest, ConsensusStore, ReconfigureNotification, StoreResult}; - -#[cfg(test)] -#[path = "tests/subscriber_tests.rs"] -pub mod subscriber_tests; - -/// Convenience alias indicating the persistent storage holding certificates. -type CertificateStore = store::Store; - -/// Pushes the consensus output to subscriber clients and helps them to remain up to date. -pub struct SubscriberHandler { - // The persistent store holding the consensus state. - consensus_store: Arc, - // The persistent store holding the certificates. - certificate_store: CertificateStore, - /// Receive reconfiguration update. - rx_reconfigure: watch::Receiver, - // Channel to receive the output of consensus. - rx_sequence: Receiver, - /// Channel to receive sync requests from the client. - rx_client: Receiver, - /// Channel to send new consensus outputs to the client. - tx_client: Sender, -} - -impl SubscriberHandler { - /// Spawn a new subscriber handler in a dedicated tokio task. - #[must_use] - pub fn spawn( - consensus_store: Arc, - certificate_store: CertificateStore, - rx_reconfigure: watch::Receiver, - rx_sequence: Receiver, - rx_client: Receiver, - tx_client: Sender, - ) -> JoinHandle<()> { - tokio::spawn(async move { - Self { - consensus_store, - certificate_store, - rx_reconfigure, - rx_sequence, - rx_client, - tx_client, - } - .run() - .await - .expect("Failed to run subscriber") - }) - } - - /// Main loop responsible to update the client with the latest consensus outputs and answer - /// its sync requests. - async fn run(&mut self) -> StoreResult<()> { - loop { - tokio::select! { - // Forward new consensus outputs to the client. - Some(message) = self.rx_sequence.recv() => { - if self.tx_client.send(message).await.is_err() { - debug!("Client connection dropped"); - } - }, - - // Receive client sync requests. - Some(request) = self.rx_client.recv() => self - .synchronize(request) - .await?, - - // Check whether the committee changed. - result = self.rx_reconfigure.changed() => { - result.expect("Committee channel dropped"); - let message = self.rx_reconfigure.borrow().clone(); - if let ReconfigureNotification::Shutdown = message { - return Ok(()); - } - } - } - } - } - - /// Help the subscriber missing chunks of the output sequence to get up to speed. - async fn synchronize(&mut self, request: ConsensusSyncRequest) -> StoreResult<()> { - // Load the digests from the consensus store. - let digests = self - .consensus_store - .read_sequenced_certificates(&request.missing)? - .into_iter() - .take_while(|x| x.is_some()) - .map(|x| x.unwrap()); - - // Load the actual certificates from the certificate store. - let certificates = self.certificate_store.read_all(digests).await?; - - // Transmit each certificate to the subscriber (in the right order). - for (certificate, consensus_index) in certificates.into_iter().zip(request.missing) { - match certificate { - Some(certificate) => { - let message = ConsensusOutput { - certificate, - consensus_index, - }; - if self.tx_client.send(message).await.is_err() { - debug!("Connection dropped by client"); - break; - } - } - None => { - // TODO: We should return an error and exit the task. - error!("Inconsistency between consensus and certificates store"); - break; - } - } - } - Ok(()) - } -} diff --git a/consensus/src/tests/subscriber_tests.rs b/consensus/src/tests/subscriber_tests.rs deleted file mode 100644 index fa1f6e36f..000000000 --- a/consensus/src/tests/subscriber_tests.rs +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use super::*; -use crate::{ - metrics::ConsensusMetrics, - tusk::{tusk_tests::*, Tusk}, - Consensus, ConsensusOutput, ConsensusSyncRequest, SubscriberHandler, -}; -use crypto::{traits::KeyPair, Hash}; -use futures::future::join_all; -use prometheus::Registry; -use std::collections::{BTreeSet, VecDeque}; -use test_utils::{keys, make_consensus_store, mock_committee}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use types::metered_channel; - -/// Make enough certificates to commit a leader. -pub fn commit_certificates() -> VecDeque { - // Make certificates for rounds 1 to 4. - let keys: Vec<_> = keys(None) - .into_iter() - .map(|kp| kp.public().clone()) - .collect(); - let genesis = Certificate::genesis(&mock_committee(&keys[..])) - .iter() - .map(|x| x.digest()) - .collect::>(); - let (mut certificates, next_parents) = - test_utils::make_optimal_certificates(1..=4, &genesis, &keys); - - // Make one certificate with round 5 to trigger the commits. - let (_, certificate) = test_utils::mock_certificate(keys[0].clone(), 5, next_parents); - certificates.push_back(certificate); - certificates -} - -/// Spawn the consensus core and the subscriber handler. Also add to storage enough certificates to -/// commit a leader (as if they were added by the Primary). -pub async fn spawn_node( - rx_waiter: metered_channel::Receiver, - rx_client: Receiver, - tx_client: Sender, -) -> (watch::Sender, Vec>) { - // Make enough certificates to commit a leader. - let certificates = commit_certificates(); - - // Make the committee. - let keys: Vec<_> = keys(None) - .into_iter() - .map(|kp| kp.public().clone()) - .collect(); - - let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); - let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); - - // Create the storages. - let consensus_store_path = test_utils::temp_dir(); - let consensus_store = make_consensus_store(&consensus_store_path); - let certificate_store_path = test_utils::temp_dir(); - let certificate_store = make_certificate_store(&certificate_store_path); - - // Persist the certificates to storage (they may be require by the synchronizer). - let to_store = certificates.into_iter().map(|x| (x.digest(), x)); - certificate_store.write_all(to_store).await.unwrap(); - - // Spawn the consensus engine and sink the primary channel. - let (tx_primary, mut rx_primary) = test_utils::test_channel!(1); - let (tx_output, rx_output) = channel(1); - let gc_depth = 50; - let tusk = Tusk::new(committee.clone(), consensus_store.clone(), gc_depth); - let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let consensus_handle = Consensus::spawn( - committee, - consensus_store.clone(), - certificate_store.clone(), - tx_reconfigure.subscribe(), - rx_waiter, - tx_primary, - tx_output, - tusk, - metrics, - gc_depth, - ); - tokio::spawn(async move { while rx_primary.recv().await.is_some() {} }); - - // Spawn the subscriber handler. - let subscriber_handle = SubscriberHandler::spawn( - consensus_store, - certificate_store, - rx_reconfigure, - /* rx_sequence */ rx_output, - rx_client, - tx_client, - ); - - (tx_reconfigure, vec![consensus_handle, subscriber_handle]) -} - -/// Facility to read consensus outputs out of a stream and return them in the right order. -pub async fn order_stream( - reader: &mut Receiver, - last_known_client_index: u64, - last_known_server_index: u64, -) -> Vec { - let mut next_ordinary_sequence = last_known_server_index + 1; - let mut next_catchup_sequence = last_known_client_index + 1; - let mut buffer = Vec::new(); - let mut sequence = Vec::new(); - loop { - let output = reader.recv().await.unwrap(); - let consensus_index = output.consensus_index; - - if consensus_index == next_ordinary_sequence { - buffer.push(output); - next_ordinary_sequence += 1; - } else if consensus_index == next_catchup_sequence { - sequence.push(output); - next_catchup_sequence += 1; - } else { - panic!("Unexpected consensus index"); - } - - if consensus_index == last_known_server_index { - break; - } - } - - sequence.extend(buffer); - sequence -} - -#[tokio::test] -async fn subscribe() { - let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1); - let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1); - let (_tx_client_to_consensus, rx_client_to_consensus) = channel(1); - - // Make enough certificates to commit a leader. - let mut certificates = commit_certificates(); - - // Spawn the consensus and subscriber handler. - let (_tx_reconfigure, _handles) = spawn_node( - rx_consensus_input, - rx_client_to_consensus, - tx_consensus_to_client, - ) - .await; - - // Feed all certificates to the consensus. Only the last certificate should trigger commits, - while let Some(certificate) = certificates.pop_front() { - tx_consensus_input.send(certificate).await.unwrap(); - } - - // Ensure the first 4 ordered certificates have the expected consensus index. Note that we - // need to feed 5 certificates to consensus to trigger a commit. - for i in 0..=4 { - let output = rx_consensus_to_client.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - } -} - -#[tokio::test] -async fn subscribe_sync() { - let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1); - let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1); - let (tx_client_to_consensus, rx_client_to_consensus) = channel(1); - - // Make enough certificates to commit a leader. - let mut certificates = commit_certificates(); - - // Spawn the consensus and subscriber handler. - let (_tx_reconfigure, _handles) = spawn_node( - rx_consensus_input, - rx_client_to_consensus, - tx_consensus_to_client, - ) - .await; - - // Feed all certificates to the consensus. Only the last certificate should trigger commits, - // so the task should not block. - while let Some(certificate) = certificates.pop_front() { - tx_consensus_input.send(certificate).await.unwrap(); - } - - // Read first 4 certificates. Then pretend we crashed after reading the first certificate and - // try to sync to get up to speed. - for i in 0..=4 { - let output = rx_consensus_to_client.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - } - - let last_known_client_index = 1; - let last_known_server_index = 4; - - let message = ConsensusSyncRequest { - missing: (last_known_client_index + 1..=last_known_server_index), - }; - tx_client_to_consensus.send(message).await.unwrap(); - - // Check that we got the complete sequence of certificates in the right order. - let ok = order_stream( - &mut rx_consensus_to_client, - last_known_client_index, - last_known_server_index, - ) - .await - .into_iter() - .enumerate() - .all(|(i, output)| output.consensus_index == last_known_client_index + 1 + i as u64); - assert!(ok); -} - -#[tokio::test] -async fn restart() { - let (tx_consensus_input, rx_consensus_input) = test_utils::test_channel!(1); - let (tx_consensus_to_client, mut rx_consensus_to_client) = channel(1); - let (_tx_client_to_consensus, rx_client_to_consensus) = channel(1); - - // Make enough certificates to commit a leader. - let mut certificates = commit_certificates(); - - // Spawn the consensus and subscriber handler. - let (tx_reconfigure, handles) = spawn_node( - rx_consensus_input, - rx_client_to_consensus, - tx_consensus_to_client, - ) - .await; - - // Feed all certificates to the consensus. Only the last certificate should trigger commits, - while let Some(certificate) = certificates.pop_front() { - tx_consensus_input.send(certificate).await.unwrap(); - } - - // Ensure the first 4 ordered certificates have the expected consensus index. Note that we - // need to feed 5 certificates to consensus to trigger a commit. - for i in 0..=4 { - let output = rx_consensus_to_client.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - } - - // Send a shutdown message. - let message = ReconfigureNotification::Shutdown; - tx_reconfigure.send(message).unwrap(); - - // Ensure all tasks properly shut down. - join_all(handles).await; -} diff --git a/executor/src/errors.rs b/executor/src/errors.rs index 60f758e53..99e5ec90b 100644 --- a/executor/src/errors.rs +++ b/executor/src/errors.rs @@ -4,7 +4,6 @@ use config::WorkerId; use std::fmt::Debug; use store::StoreError; use thiserror::Error; -use types::SequenceNumber; #[macro_export] macro_rules! bail { @@ -32,12 +31,6 @@ pub enum SubscriberError { #[error("Consensus referenced unexpected worker id {0}")] UnexpectedWorkerId(WorkerId), - #[error("Unexpected consensus index number {0}")] - UnexpectedConsensusIndex(SequenceNumber), - - #[error("The client-consensus connection dropped")] - ConsensusConnectionDropped, - #[error("Connection with the transaction executor dropped")] ExecutorConnectionDropped, diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 5bfa377bf..73c604a24 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -14,17 +14,13 @@ mod fixtures; #[path = "tests/execution_state.rs"] mod execution_state; -#[cfg(test)] -#[path = "tests/sequencer.rs"] -mod sequencer; - pub use errors::{ExecutionStateError, SubscriberError, SubscriberResult}; pub use state::ExecutionIndices; use crate::{batch_loader::BatchLoader, core::Core, subscriber::Subscriber}; use async_trait::async_trait; use config::SharedCommittee; -use consensus::{ConsensusOutput, ConsensusSyncRequest}; +use consensus::ConsensusOutput; use crypto::PublicKey; use serde::de::DeserializeOwned; use std::{fmt::Debug, sync::Arc}; @@ -100,7 +96,6 @@ impl Executor { execution_state: Arc, tx_reconfigure: &watch::Sender, rx_consensus: Receiver, - tx_consensus: Sender, tx_output: Sender>, ) -> SubscriberResult>> where @@ -117,19 +112,13 @@ impl Executor { SubscriberError::OnlyOneConsensusClientPermitted ); - // Load the subscriber state from storage. - let execution_indices = execution_state.load_execution_indices().await?; - let next_consensus_index = execution_indices.next_certificate_index; - // Spawn the subscriber. let subscriber_handle = Subscriber::spawn( store.clone(), tx_reconfigure.subscribe(), rx_consensus, - tx_consensus, tx_batch_loader, tx_executor, - next_consensus_index, ); // Spawn the executor's core. diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs index c1ff6aacd..62a0b8c46 100644 --- a/executor/src/subscriber.rs +++ b/executor/src/subscriber.rs @@ -1,15 +1,11 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{ - bail, - errors::{SubscriberError, SubscriberResult}, -}; -use consensus::{ConsensusOutput, ConsensusSyncRequest}; +use crate::errors::{SubscriberError, SubscriberResult}; +use consensus::ConsensusOutput; use futures::{ future::try_join_all, stream::{FuturesOrdered, StreamExt}, }; -use std::cmp::Ordering; use store::Store; use tokio::{ sync::{ @@ -19,16 +15,15 @@ use tokio::{ task::JoinHandle, }; use tracing::debug; -use types::{BatchDigest, ReconfigureNotification, SequenceNumber, SerializedBatchMessage}; +use types::{BatchDigest, ReconfigureNotification, SerializedBatchMessage}; #[cfg(test)] #[path = "tests/subscriber_tests.rs"] pub mod subscriber_tests; -/// The `Subscriber` receives certificates sequenced by the consensus and execute every -/// transaction it references. We assume that the messages we receives from consensus has -/// already been authenticated (ie. they really come from a trusted consensus node) and -/// integrity-validated (ie. no corrupted messages). +/// The `Subscriber` receives certificates sequenced by the consensus and waits until the +/// `BatchLoader` downloaded all the transactions references by the certificates; it then +/// forward the certificates to the Executor Core. pub struct Subscriber { /// The temporary storage holding all transactions' data (that may be too big to hold in memory). store: Store, @@ -36,15 +31,11 @@ pub struct Subscriber { rx_reconfigure: watch::Receiver, /// A channel to receive consensus messages. rx_consensus: Receiver, - /// A channel to send sync request to consensus for missed messages. - tx_consensus: Sender, /// A channel to the batch loader to download transaction's data. tx_batch_loader: Sender, /// A channel to send the complete and ordered list of consensus outputs to the executor. This /// channel is used once all transactions data are downloaded. tx_executor: Sender, - /// The index of the next expected consensus output. - next_consensus_index: SequenceNumber, } impl Subscriber { @@ -54,20 +45,16 @@ impl Subscriber { store: Store, rx_reconfigure: watch::Receiver, rx_consensus: Receiver, - tx_consensus: Sender, tx_batch_loader: Sender, tx_executor: Sender, - next_consensus_index: SequenceNumber, ) -> JoinHandle<()> { tokio::spawn(async move { Self { store, rx_reconfigure, rx_consensus, - tx_consensus, tx_batch_loader, tx_executor, - next_consensus_index, } .run() .await @@ -75,94 +62,6 @@ impl Subscriber { }) } - /// Synchronize with the consensus in case we missed part of its output sequence. - /// It is safety-critical that we process the consensus' outputs in the complete - /// and right order. This function reads the consensus outputs out of a stream and - /// return them in the right order. - async fn synchronize( - &mut self, - last_known_client_index: SequenceNumber, - last_known_server_index: SequenceNumber, - ) -> SubscriberResult> { - // Send a sync request. - let request = ConsensusSyncRequest { - missing: (last_known_client_index + 1..=last_known_server_index), - }; - self.tx_consensus - .send(request) - .await - .map_err(|_| SubscriberError::ConsensusConnectionDropped)?; - - // Read the replies. - let mut next_ordinary_sequence = last_known_server_index + 1; - let mut next_catchup_sequence = last_known_client_index + 1; - let mut buffer = Vec::new(); - let mut sequence = Vec::new(); - loop { - let output = match self.rx_consensus.recv().await { - Some(x) => x, - None => bail!(SubscriberError::ConsensusConnectionDropped), - }; - let consensus_index = output.consensus_index; - - if consensus_index == next_ordinary_sequence { - buffer.push(output); - next_ordinary_sequence += 1; - } else if consensus_index == next_catchup_sequence { - sequence.push(output); - next_catchup_sequence += 1; - } else { - bail!(SubscriberError::UnexpectedConsensusIndex(consensus_index)); - } - - if consensus_index == last_known_server_index { - break; - } - } - - sequence.extend(buffer); - Ok(sequence) - } - - /// Process a single consensus output message. If we realize we are missing part of the sequence, - /// we first sync every missing output and return them on the right order. - async fn handle_consensus_message( - &mut self, - message: &ConsensusOutput, - ) -> SubscriberResult> { - let consensus_index = message.consensus_index; - - // Check that the latest consensus index is as expected; otherwise synchronize. - let need_to_sync = match self.next_consensus_index.cmp(&consensus_index) { - Ordering::Greater => { - // That is fine, it may happen when the consensus node crashes and recovers. - debug!("Consensus index of authority bigger than expected"); - return Ok(Vec::default()); - } - Ordering::Less => { - debug!("Subscriber is synchronizing missed consensus output messages"); - true - } - Ordering::Equal => false, - }; - - // Send the certificate to the batch loader to download all transactions' data. - self.tx_batch_loader - .send(message.clone()) - .await - .expect("Failed to send message ot batch loader"); - - // Synchronize missing consensus outputs if we need to. - if need_to_sync { - let last_known_client_index = self.next_consensus_index; - let last_known_server_index = message.consensus_index; - self.synchronize(last_known_client_index, last_known_server_index) - .await - } else { - Ok(vec![message.clone()]) - } - } - /// Wait for particular data to become available in the storage and then returns. async fn waiter( missing: Vec, @@ -185,29 +84,28 @@ impl Subscriber { tokio::select! { // Receive the ordered sequence of consensus messages from a consensus node. Some(message) = self.rx_consensus.recv() => { - // Process the consensus message (synchronize missing messages, download transaction data). - let sequence = self.handle_consensus_message(&message).await?; - - // Update the latest consensus index. The state will atomically persist the change when - // executing the transaction. It is important to increment the consensus index before - // deserializing the transaction data because the consensus core will increment its own - // index regardless of deserialization or other application-specific failures. - self.next_consensus_index += sequence.len() as SequenceNumber; - - // Wait for the transaction data to be available in the store. We will then execute the transactions. - for message in sequence { - let digests = message.certificate.header.payload.keys().cloned().collect(); - let future = Self::waiter(digests, self.store.clone(), message); - waiting.push(future); - } + // Send the certificate to the batch loader to download all transactions' data. + self.tx_batch_loader + .send(message.clone()) + .await + .expect("Failed to send message ot batch loader"); + + // Wait for the transaction data to be available in the store. This will happen for sure because + // the primary already successfully processed the certificate. This implies that the primary notified + // its worker to download any missing batch. We may however have to wait for these batch be available + // on our workers. Once all batches are available, we forward the certificate o the Executor Core. + let digests = message.certificate.header.payload.keys().cloned().collect(); + let future = Self::waiter(digests, self.store.clone(), message); + waiting.push(future); }, // Receive here consensus messages for which we have downloaded all transactions data. - Some(message) = waiting.next() => self - .tx_executor - .send(message?) - .await - .map_err(|_| SubscriberError::ExecutorConnectionDropped)?, + Some(message) = waiting.next() => { + if self.tx_executor.send(message?).await.is_err() { + debug!("Executor core is shutting down"); + return Ok(()); + } + }, // Check whether the committee changed. result = self.rx_reconfigure.changed() => { diff --git a/executor/src/tests/sequencer.rs b/executor/src/tests/sequencer.rs deleted file mode 100644 index 0194e51bd..000000000 --- a/executor/src/tests/sequencer.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use consensus::{ConsensusOutput, ConsensusSyncRequest}; - -use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::warn; -use types::{Certificate, SequenceNumber}; - -pub struct MockSequencer { - rx_sequence: Receiver, - rx_client: Receiver, - tx_client: Sender, - consensus_index: SequenceNumber, - sequence: Vec, -} - -impl MockSequencer { - pub fn spawn( - rx_sequence: Receiver, - rx_client: Receiver, - tx_client: Sender, - ) { - tokio::spawn(async move { - Self { - rx_sequence, - rx_client, - tx_client, - consensus_index: SequenceNumber::default(), - sequence: Vec::new(), - } - .run() - .await; - }); - } - - async fn synchronize(&mut self, request: ConsensusSyncRequest) { - for i in request.missing { - let message = self.sequence[i as usize].clone(); - if self.tx_client.send(message).await.is_err() { - warn!("Failed to deliver sequenced message to client"); - break; - } - } - } - - async fn run(&mut self) { - loop { - tokio::select! { - // Update the subscriber every time a message is sequenced. - Some(certificate) = self.rx_sequence.recv() => { - let message = ConsensusOutput { - certificate, - consensus_index: self.consensus_index - }; - - self.consensus_index += 1; - self.sequence.push(message.clone()); - - if self.tx_client.send(message).await.is_err() { - warn!("Failed to deliver sequenced message to client"); - } - }, - - // Receive sync requests form the subscriber. - Some(request) = self.rx_client.recv() => self.synchronize(request).await, - } - } - } -} diff --git a/executor/src/tests/subscriber_tests.rs b/executor/src/tests/subscriber_tests.rs index c972c1212..9ce10658f 100644 --- a/executor/src/tests/subscriber_tests.rs +++ b/executor/src/tests/subscriber_tests.rs @@ -1,44 +1,32 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use super::*; -use crate::{ - fixtures::{test_store, test_u64_certificates}, - sequencer::MockSequencer, -}; +use crate::fixtures::{test_store, test_u64_certificates}; use test_utils::committee; use tokio::sync::mpsc::{channel, Sender}; -use types::Certificate; +use types::{Certificate, SequenceNumber}; /// Spawn a mock consensus core and a test subscriber. -async fn spawn_consensus_and_subscriber( - rx_sequence: Receiver, +async fn spawn_subscriber( + rx_sequence: Receiver, tx_batch_loader: Sender, tx_executor: Sender, ) -> ( Store, watch::Sender, ) { - let (tx_consensus_to_client, rx_consensus_to_client) = channel(10); - let (tx_client_to_consensus, rx_client_to_consensus) = channel(10); - let committee = committee(None); let message = ReconfigureNotification::NewEpoch(committee); let (tx_reconfigure, rx_reconfigure) = watch::channel(message); - // Spawn a mock consensus core. - MockSequencer::spawn(rx_sequence, rx_client_to_consensus, tx_consensus_to_client); - // Spawn a test subscriber. let store = test_store(); - let next_consensus_index = SequenceNumber::default(); - let _subsscriber_handle = Subscriber::spawn( + let _subscriber_handle = Subscriber::spawn( store.clone(), rx_reconfigure, - rx_consensus_to_client, - tx_client_to_consensus, + rx_sequence, tx_batch_loader, tx_executor, - next_consensus_index, ); (store, tx_reconfigure) @@ -52,7 +40,7 @@ async fn handle_certificate_with_downloaded_batch() { // Spawn a subscriber. let (store, _tx_reconfigure) = - spawn_consensus_and_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; + spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; // Feed certificates to the mock sequencer and ensure the batch loader receive the command to // download the corresponding transaction data. @@ -62,11 +50,15 @@ async fn handle_certificate_with_downloaded_batch() { /* batches_per_certificate */ 2, /* transactions_per_batch */ 2, ); - for (certificate, batches) in certificates { + for (i, (certificate, batches)) in certificates.into_iter().enumerate() { for (digest, batch) in batches { store.write(digest, batch).await; } - tx_sequence.send(certificate).await.unwrap(); + let message = ConsensusOutput { + certificate, + consensus_index: i as SequenceNumber, + }; + tx_sequence.send(message).await.unwrap(); } for i in 0..total_certificates { @@ -85,64 +77,18 @@ async fn handle_empty_certificate() { let (tx_executor, mut rx_executor) = channel(10); // Spawn a subscriber. - let _do_not_drop = - spawn_consensus_and_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; + let _do_not_drop = spawn_subscriber(rx_sequence, tx_batch_loader, tx_executor).await; // Feed certificates to the mock sequencer and ensure the batch loader receive the command to // download the corresponding transaction data. - for _ in 0..2 { - tx_sequence.send(Certificate::default()).await.unwrap(); - } for i in 0..2 { - let output = rx_batch_loader.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - - let output = rx_executor.recv().await.unwrap(); - assert_eq!(output.consensus_index, i); - } -} - -#[tokio::test] -async fn synchronize() { - let (tx_sequence, rx_sequence) = channel(10); - let (tx_batch_loader, mut rx_batch_loader) = channel(10); - let (tx_executor, mut rx_executor) = channel(10); - let (tx_consensus_to_client, rx_consensus_to_client) = channel(10); - let (tx_client_to_consensus, rx_client_to_consensus) = channel(10); - - let committee = committee(None); - let message = ReconfigureNotification::NewEpoch(committee); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); - - // Spawn a mock consensus core. - MockSequencer::spawn(rx_sequence, rx_client_to_consensus, tx_consensus_to_client); - - // Send two certificates. - for _ in 0..2 { - tx_sequence.send(Certificate::default()).await.unwrap(); + let message = ConsensusOutput { + certificate: Certificate::default(), + consensus_index: i as SequenceNumber, + }; + tx_sequence.send(message).await.unwrap(); } - tokio::task::yield_now().await; - - // Spawn a subscriber. - let store = test_store(); - let next_consensus_index = SequenceNumber::default(); - let _subscriber_handle = Subscriber::spawn( - store.clone(), - rx_reconfigure, - rx_consensus_to_client, - tx_client_to_consensus, - tx_batch_loader, - tx_executor, - next_consensus_index, - ); - - // Send two extra certificates. The client needs to sync for the first two certificates. - for _ in 0..2 { - tx_sequence.send(Certificate::default()).await.unwrap(); - } - - // Ensure the client synchronizes the first twi certificates. - for i in 0..4 { + for i in 0..2 { let output = rx_batch_loader.recv().await.unwrap(); assert_eq!(output.consensus_index, i); diff --git a/node/src/lib.rs b/node/src/lib.rs index cb0dd5815..6e1103602 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,9 +1,7 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use config::{Parameters, SharedCommittee, WorkerId}; -use consensus::{ - bullshark::Bullshark, dag::Dag, metrics::ConsensusMetrics, Consensus, SubscriberHandler, -}; +use consensus::{bullshark::Bullshark, dag::Dag, metrics::ConsensusMetrics, Consensus}; use crypto::{ traits::{KeyPair as _, VerifyingKey}, KeyPair, PublicKey, @@ -257,8 +255,6 @@ impl Node { State::Error: Debug, { let (tx_sequence, rx_sequence) = channel(Self::CHANNEL_CAPACITY); - let (tx_consensus_to_client, rx_consensus_to_client) = channel(Self::CHANNEL_CAPACITY); - let (tx_client_to_consensus, rx_client_to_consensus) = channel(Self::CHANNEL_CAPACITY); let consensus_metrics = Arc::new(ConsensusMetrics::new(registry)); // Spawn the consensus core who only sequences transactions. @@ -280,18 +276,6 @@ impl Node { parameters.gc_depth, ); - // The subscriber handler receives the ordered sequence from consensus and feed them - // to the executor. The executor has its own state and data store who may crash - // independently of the narwhal node. - let subscriber_handles = SubscriberHandler::spawn( - store.consensus_store.clone(), - store.certificate_store.clone(), - tx_reconfigure.subscribe(), - rx_sequence, - /* rx_client */ rx_client_to_consensus, - /* tx_client */ tx_consensus_to_client, - ); - // Spawn the client executing the transactions. It can also synchronize with the // subscriber handler if it missed some transactions. let executor_handles = Executor::spawn( @@ -300,8 +284,7 @@ impl Node { store.batch_store.clone(), execution_state, tx_reconfigure, - /* rx_consensus */ rx_consensus_to_client, - /* tx_consensus */ tx_client_to_consensus, + /* rx_consensus */ rx_sequence, /* tx_output */ tx_confirmation, ) .await?; @@ -309,7 +292,6 @@ impl Node { Ok(executor_handles .into_iter() .chain(std::iter::once(consensus_handles)) - .chain(std::iter::once(subscriber_handles)) .collect()) }