From 925fd7da334c8555de4b1d03cb9eefce59c966e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Mon, 29 Aug 2022 12:08:20 -0400 Subject: [PATCH] revert: 030517045 - Batch execution with single execution adapter (#818) --- crypto/src/tests/bls12377_tests.rs | 6 +- executor/src/core.rs | 300 ++++++------------ executor/src/errors.rs | 2 +- executor/src/lib.rs | 69 ++-- executor/src/tests/execution_state.rs | 42 ++- executor/src/tests/executor_tests.rs | 40 ++- executor/tests/consensus_integration_tests.rs | 1 + node/src/execution_state.rs | 22 +- node/src/lib.rs | 24 +- node/src/main.rs | 9 +- node/src/restarter.rs | 9 +- node/tests/reconfigure.rs | 33 +- primary/src/aggregators.rs | 3 +- primary/src/core.rs | 3 +- primary/src/primary.rs | 3 +- primary/src/proposer.rs | 3 +- primary/src/state_handler.rs | 3 +- primary/src/tests/payload_receiver_tests.rs | 3 +- .../tests/integration_tests_validator_api.rs | 3 +- storage/src/certificate_store.rs | 29 +- test_utils/src/cluster.rs | 12 +- test_utils/src/lib.rs | 3 +- types/src/primary.rs | 3 +- worker/src/synchronizer.rs | 8 +- worker/src/tests/synchronizer_tests.rs | 3 +- 25 files changed, 247 insertions(+), 389 deletions(-) diff --git a/crypto/src/tests/bls12377_tests.rs b/crypto/src/tests/bls12377_tests.rs index d861248a9..14f305d43 100644 --- a/crypto/src/tests/bls12377_tests.rs +++ b/crypto/src/tests/bls12377_tests.rs @@ -4,10 +4,10 @@ use crate::bls12377::{ BLS12377AggregateSignature, BLS12377KeyPair, BLS12377PrivateKey, BLS12377PublicKey, BLS12377PublicKeyBytes, BLS12377Signature, }; -use fastcrypto::traits::{ - AggregateAuthenticator, EncodeDecodeBase64, KeyPair, ToFromBytes, VerifyingKey, +use fastcrypto::{ + traits::{AggregateAuthenticator, EncodeDecodeBase64, KeyPair, ToFromBytes, VerifyingKey}, + Hash, SignatureService, }; -use fastcrypto::{Hash, SignatureService}; use crate::bls12377::CELO_BLS_PRIVATE_KEY_LENGTH; use rand::{rngs::StdRng, SeedableRng as _}; diff --git a/executor/src/core.rs b/executor/src/core.rs index 8a34b8f46..09a1a57ad 100644 --- a/executor/src/core.rs +++ b/executor/src/core.rs @@ -4,12 +4,9 @@ use crate::{ bail, errors::{SubscriberError, SubscriberResult}, state::ExecutionIndices, - BatchExecutionState, ExecutionState, ExecutorOutput, SerializedTransaction, - SingleExecutionState, + ExecutionState, ExecutorOutput, SerializedTransaction, }; -use async_trait::async_trait; use consensus::ConsensusOutput; -use futures::lock::Mutex; use std::{fmt::Debug, sync::Arc}; use store::Store; use tokio::{ @@ -36,6 +33,10 @@ pub struct Core { rx_reconfigure: watch::Receiver, /// Receive ordered consensus output to execute. rx_subscriber: metered_channel::Receiver, + /// Outputs executed transactions. + tx_output: Sender>, + /// The indices ensuring we do not execute twice the same transaction. + execution_indices: ExecutionIndices, } impl Drop for Core { @@ -46,7 +47,8 @@ impl Drop for Core { impl Core where - State: BatchExecutionState + Send + Sync + 'static, + State: ExecutionState + Send + Sync + 'static, + State::Outcome: Send + 'static, State::Error: Debug, { /// Spawn a new executor in a dedicated tokio task. @@ -56,13 +58,20 @@ where execution_state: Arc, rx_reconfigure: watch::Receiver, rx_subscriber: metered_channel::Receiver, + tx_output: Sender>, ) -> JoinHandle<()> { tokio::spawn(async move { + let execution_indices = execution_state + .load_execution_indices() + .await + .expect("Failed to load execution indices from store"); Self { store, execution_state, rx_reconfigure, rx_subscriber, + tx_output, + execution_indices, } .run() .await @@ -72,14 +81,6 @@ where /// Main loop listening to new certificates and execute them. async fn run(&mut self) -> SubscriberResult<()> { - let _next_certificate_index = self - .execution_state - .load_next_certificate_index() - .await - .expect("Failed to load execution indices from store"); - - // TODO: Replay certificates from the store. - loop { tokio::select! { // Execute all transactions associated with the consensus output message. @@ -105,41 +106,35 @@ where /// Execute a single certificate. async fn execute_certificate(&mut self, message: &ConsensusOutput) -> SubscriberResult<()> { - // Collect all transactions in all the batches. - let mut batches = Vec::new(); - - for batch_digest in message.certificate.header.payload.keys() { - batches.push(self.collect_batch(batch_digest).await?); + // Skip the certificate if it contains no transactions. + if message.certificate.header.payload.is_empty() { + self.execution_indices.skip_certificate(); + return Ok(()); } - let result = self - .execution_state - .handle_consensus(message, batches) - .await - .map_err(SubscriberError::from); - - match result { - Ok(()) => Ok(()), - Err(error @ SubscriberError::ClientExecutionError(_)) => { - // We may want to log the errors that are the user's fault (i.e., that are neither - // our fault or the fault of consensus) for debug purposes. It is safe to continue - // by ignoring those transactions since all honest subscribers will do the same. - debug!("{error}"); - Ok(()) - } - Err(error) => { - bail!(error) + // Execute every batch in the certificate. + let total_batches = message.certificate.header.payload.len(); + for (index, digest) in message.certificate.header.payload.keys().enumerate() { + // Skip batches that we already executed (after crash-recovery). + if self + .execution_indices + .check_next_batch_index(index as SequenceNumber) + { + self.execute_batch(message, *digest, total_batches).await?; } } + Ok(()) } - /// Collect all transactions in a batch - async fn collect_batch( + /// Execute a single batch of transactions. + async fn execute_batch( &mut self, - batch_digest: &BatchDigest, - ) -> SubscriberResult> { + consensus_output: &ConsensusOutput, + batch_digest: BatchDigest, + total_batches: usize, + ) -> SubscriberResult<()> { // The store should now hold all transaction data referenced by the input certificate. - let transactions = match self.store.read(*batch_digest).await? { + let transactions = match self.store.read(batch_digest).await? { Some(x) => x.0, None => { // If two certificates contain the exact same batch (eg. by the actions of a Byzantine @@ -148,129 +143,56 @@ where // the second batch since there is no point in executing twice the same transactions // (as the second execution attempt will always fail). debug!("Duplicate batch {batch_digest}"); - return Ok(Vec::new()); + self.execution_indices.skip_batch(total_batches); + return Ok(()); } }; - Ok(transactions) - } -} - -/// Executor that feeds transactions one by one to the execution state. -pub struct SingleExecutor -where - State: SingleExecutionState, -{ - /// The (global) state to perform execution. - execution_state: Arc, - /// The indices ensuring we do not execute twice the same transaction. - execution_indices: Mutex, - /// Outputs executed transactions. - tx_output: Sender>, -} - -#[async_trait] -impl ExecutionState for SingleExecutor -where - State: SingleExecutionState + Sync + Send + 'static, - State::Outcome: Sync + Send + 'static, - State::Error: Sync + Send + 'static, -{ - type Error = State::Error; - - fn ask_consensus_write_lock(&self) -> bool { - self.execution_state.ask_consensus_write_lock() - } - - fn release_consensus_write_lock(&self) { - self.execution_state.release_consensus_write_lock() - } -} - -#[async_trait] -impl BatchExecutionState for SingleExecutor -where - State: SingleExecutionState + Sync + Send + 'static, - State::Outcome: Sync + Send + 'static, - State::Error: Clone + Sync + Send + 'static, -{ - async fn load_next_certificate_index(&self) -> Result { - let indices = self.execution_state.load_execution_indices().await?; - let mut execution_indices = self.execution_indices.lock().await; - *execution_indices = indices; - Ok(execution_indices.next_certificate_index) - } - - async fn handle_consensus( - &self, - consensus_output: &ConsensusOutput, - transaction_batches: Vec>, - ) -> Result<(), Self::Error> { - let mut execution_indices = self.execution_indices.lock().await; - - if transaction_batches.is_empty() { - execution_indices.skip_certificate(); - } else { - // Execute every batch in the certificate. - let total_batches = transaction_batches.len(); - for (index, batch) in transaction_batches.into_iter().enumerate() { - // Skip batches that we already executed (after crash-recovery). - if execution_indices.check_next_batch_index(index as SequenceNumber) { - self.execute_batch( - &mut execution_indices, + // Execute every transaction in the batch. + let total_transactions = transactions.len(); + for (index, transaction) in transactions.into_iter().enumerate() { + // Skip transactions that we already executed (after crash-recovery). + if self + .execution_indices + .check_next_transaction_index(index as SequenceNumber) + { + // Execute the transaction + let result = self + .execute_transaction( consensus_output, - batch, + transaction.clone(), + total_transactions, total_batches, ) - .await?; - } - } - } - Ok(()) - } -} + .await; -impl SingleExecutor -where - State: SingleExecutionState, - State::Error: Clone, - State::Outcome: Sync + Send + 'static, -{ - pub fn new(execution_state: Arc, tx_output: Sender>) -> Arc { - Arc::new(Self { - execution_state, - execution_indices: Mutex::new(ExecutionIndices::default()), - tx_output, - }) - } + let (bail, result) = match result { + outcome @ Ok(..) => (None, outcome), - /// Execute a single batch of transactions. - async fn execute_batch( - &self, - execution_indices: &mut ExecutionIndices, - consensus_output: &ConsensusOutput, - transactions: Vec, - total_batches: usize, - ) -> Result<(), State::Error> { - if transactions.is_empty() { - execution_indices.skip_batch(total_batches); - return Ok(()); - } + // We may want to log the errors that are the user's fault (i.e., that are neither + // our fault or the fault of consensus) for debug purposes. It is safe to continue + // by ignoring those transactions since all honest subscribers will do the same. + Err(error @ SubscriberError::ClientExecutionError(_)) => { + debug!("{error}"); + (None, Err(error)) + } - // Execute every transaction in the batch. - let total_transactions = transactions.len(); - for (index, transaction) in transactions.into_iter().enumerate() { - // Skip transactions that we already executed (after crash-recovery). - if execution_indices.check_next_transaction_index(index as SequenceNumber) { - // Execute the transaction - self.execute_transaction( - execution_indices, - consensus_output, - transaction, - total_batches, - total_transactions, - ) - .await?; + // We must take special care to errors that are our fault, such as storage errors. + // We may be the only authority experiencing it, and thus cannot continue to process + // transactions until the problem is fixed. + Err(error) => (Some(error.clone()), Err(error)), + }; + + // Output the result (eg. to notify the end-user); + let output = (result, transaction); + if self.tx_output.send(output).await.is_err() { + debug!("No users listening for transaction execution"); + } + + // Bail if a fatal error occurred. + if let Some(e) = bail { + bail!(e); + } } } Ok(()) @@ -278,67 +200,37 @@ where /// Execute a single transaction. async fn execute_transaction( - &self, - execution_indices: &mut ExecutionIndices, + &mut self, consensus_output: &ConsensusOutput, serialized: SerializedTransaction, - total_batches: usize, total_transactions: usize, - ) -> Result<(), State::Error> { + total_batches: usize, + ) -> SubscriberResult<::Outcome> { // Compute the next expected indices. Those will be persisted upon transaction execution // and are only used for crash-recovery. - execution_indices.next(total_batches, total_transactions); + self.execution_indices + .next(total_batches, total_transactions); // The consensus simply orders bytes, so we first need to deserialize the transaction. // If the deserialization fail it is safe to ignore the transaction since all correct // clients will do the same. Remember that a bad authority or client may input random // bytes to the consensus. - let (result, outcome) = match bincode::deserialize::(&serialized) { - Err(e) => { - let error = SubscriberError::ClientExecutionError(format!( - "Failed to deserialize transaction: {e}" - )); - // There is always a chance that the fault lies with our deserialization. - debug!("{error}"); - (Ok(()), Err(error)) - } - Ok(transaction) => { - // Execute the transaction. Note that the executor will need to choose whether to discard - // transactions from previous epochs by itself. - let result = self - .execution_state - .handle_consensus_transaction( - consensus_output, - execution_indices.clone(), - transaction, - ) - .await; - - match result { - Ok(outcome) => (Ok(()), Ok(outcome)), - Err(error) => match SubscriberError::from(error.clone()) { - // We may want to log the errors that are the user's fault (i.e., that are neither - // our fault or the fault of consensus) for debug purposes. It is safe to continue - // by ignoring those transactions since all honest subscribers will do the same. - non_fatal @ SubscriberError::ClientExecutionError(_) => { - debug!("{non_fatal}"); - (Ok(()), Err(non_fatal)) - } - // We must take special care to errors that are our fault, such as storage errors. - // We may be the only authority experiencing it, and thus cannot continue to process - // transactions until the problem is fixed. - fatal => (Err(error), Err(fatal)), - }, - } - } + let transaction: State::Transaction = match bincode::deserialize(&serialized) { + Ok(x) => x, + Err(e) => bail!(SubscriberError::ClientExecutionError(format!( + "Failed to deserialize transaction: {e}" + ))), }; - // Output the result (eg. to notify the end-user); - let output = (outcome, serialized); - if self.tx_output.send(output).await.is_err() { - debug!("No users listening for transaction execution"); - } - - result + // Execute the transaction. Note that the executor will need to choose whether to discard + // transactions from previous epochs by itself. + self.execution_state + .handle_consensus_transaction( + consensus_output, + self.execution_indices.clone(), + transaction, + ) + .await + .map_err(SubscriberError::from) } } diff --git a/executor/src/errors.rs b/executor/src/errors.rs index 699b94a9d..3630ceb15 100644 --- a/executor/src/errors.rs +++ b/executor/src/errors.rs @@ -81,7 +81,7 @@ impl From> for SubscriberError { } } -/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii) +/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii) /// errors caused by a fault in the authority. pub trait ExecutionStateError: std::error::Error { /// Whether the error is due to a fault in the authority (eg. internal storage error). diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 158b7a481..e087c97b6 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -26,12 +26,12 @@ use prometheus::Registry; use serde::de::DeserializeOwned; use std::{fmt::Debug, sync::Arc}; use store::Store; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{ + sync::{mpsc::Sender, watch}, + task::JoinHandle, +}; use tracing::info; -use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber}; - -// Re-export SingleExecutor as a convenience adapter. -pub use crate::core::SingleExecutor; +use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification}; /// Convenience type representing a serialized transaction. pub type SerializedTransaction = Vec; @@ -41,47 +41,12 @@ pub type SerializedTransactionDigest = u64; #[async_trait] pub trait ExecutionState { - /// The error type to return in case something went wrong during execution. - type Error: ExecutionStateError; - - /// Simple guardrail ensuring there is a single instance using the state - /// to call `handle_consensus_transaction`. Many instances may read the state, - /// or use it for other purposes. - fn ask_consensus_write_lock(&self) -> bool; - - /// Tell the state that the caller instance is no longer using calling - //// `handle_consensus_transaction`. - fn release_consensus_write_lock(&self); -} - -/// Execution state that gets whole certificates and the corresponding batches -/// for execution. It is responsible for deduplication in case the same certificate -/// is re-delivered after a crash. -#[async_trait] -pub trait BatchExecutionState: ExecutionState { - /// Load the last consensus index from storage. - /// - /// It should return the index from which it expects a replay, so one higher than - /// the last certificate index it successfully committed. This is so it has the - /// same semantics as `ExecutionIndices`. - async fn load_next_certificate_index(&self) -> Result; - - /// Execute the transactions and atomically persist the consensus index. - /// - /// TODO: This function should be allowed to return a new committee to reconfigure the system. - async fn handle_consensus( - &self, - consensus_output: &ConsensusOutput, - transaction_batches: Vec>, - ) -> Result<(), Self::Error>; -} - -/// Execution state that executes a single transaction at a time. -#[async_trait] -pub trait SingleExecutionState: ExecutionState { /// The type of the transaction to process. type Transaction: DeserializeOwned + Send + Debug; + /// The error type to return in case something went wrong during execution. + type Error: ExecutionStateError; + /// The execution outcome to output. type Outcome; @@ -95,15 +60,22 @@ pub trait SingleExecutionState: ExecutionState { transaction: Self::Transaction, ) -> Result; + /// Simple guardrail ensuring there is a single instance using the state + /// to call `handle_consensus_transaction`. Many instances may read the state, + /// or use it for other purposes. + fn ask_consensus_write_lock(&self) -> bool; + + /// Tell the state that the caller instance is no longer using calling + //// `handle_consensus_transaction`. + fn release_consensus_write_lock(&self); + /// Load the last consensus index from storage. - /// - /// It *must* return index that was last passed to `handle_consensus_transaction`. async fn load_execution_indices(&self) -> Result; } /// The output of the executor. pub type ExecutorOutput = ( - SubscriberResult<::Outcome>, + SubscriberResult<::Outcome>, SerializedTransaction, ); @@ -117,11 +89,13 @@ impl Executor { execution_state: Arc, tx_reconfigure: &watch::Sender, rx_consensus: metered_channel::Receiver, + tx_output: Sender>, tx_get_block_commands: metered_channel::Sender, registry: &Registry, ) -> SubscriberResult>> where - State: BatchExecutionState + Send + Sync + 'static, + State: ExecutionState + Send + Sync + 'static, + State::Outcome: Send + 'static, State::Error: Debug, { let metrics = ExecutorMetrics::new(registry); @@ -154,6 +128,7 @@ impl Executor { execution_state, tx_reconfigure.subscribe(), /* rx_subscriber */ rx_executor, + tx_output, ); // Return the handle. diff --git a/executor/src/tests/execution_state.rs b/executor/src/tests/execution_state.rs index 72db79d56..1fac845ef 100644 --- a/executor/src/tests/execution_state.rs +++ b/executor/src/tests/execution_state.rs @@ -1,6 +1,6 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState}; +use crate::{ExecutionIndices, ExecutionState, ExecutionStateError}; use async_trait::async_trait; use consensus::ConsensusOutput; @@ -14,14 +14,14 @@ use store::{ use thiserror::Error; /// A malformed transaction. -pub const MALFORMED_TRANSACTION: ::Transaction = 400; +pub const MALFORMED_TRANSACTION: ::Transaction = 400; /// A special transaction that makes the executor engine crash. -pub const KILLER_TRANSACTION: ::Transaction = 500; +pub const KILLER_TRANSACTION: ::Transaction = 500; /// A dumb execution state for testing. pub struct TestState { - indices_store: Store, + store: Store, } impl std::fmt::Debug for TestState { @@ -38,18 +38,8 @@ impl Default for TestState { #[async_trait] impl ExecutionState for TestState { - type Error = TestStateError; - - fn ask_consensus_write_lock(&self) -> bool { - true - } - - fn release_consensus_write_lock(&self) {} -} - -#[async_trait] -impl SingleExecutionState for TestState { type Transaction = u64; + type Error = TestStateError; type Outcome = Vec; async fn handle_consensus_transaction( @@ -63,16 +53,22 @@ impl SingleExecutionState for TestState { } else if transaction == KILLER_TRANSACTION { Err(Self::Error::ServerError) } else { - self.indices_store + self.store .write(Self::INDICES_ADDRESS, execution_indices) .await; Ok(Vec::default()) } } + fn ask_consensus_write_lock(&self) -> bool { + true + } + + fn release_consensus_write_lock(&self) {} + async fn load_execution_indices(&self) -> Result { let indices = self - .indices_store + .store .read(Self::INDICES_ADDRESS) .await .unwrap() @@ -87,21 +83,21 @@ impl TestState { /// Create a new test state. pub fn new(store_path: &Path) -> Self { - const INDICES_CF: &str = "test_state_indices"; - let rocksdb = open_cf(store_path, None, &[INDICES_CF]).unwrap(); - let indices_map = reopen!(&rocksdb, INDICES_CF;); + const STATE_CF: &str = "test_state"; + let rocksdb = open_cf(store_path, None, &[STATE_CF]).unwrap(); + let map = reopen!(&rocksdb, STATE_CF;); Self { - indices_store: Store::new(indices_map), + store: Store::new(map), } } - /// Load the execution indices. + /// Load the execution indices; ie. the state. pub async fn get_execution_indices(&self) -> ExecutionIndices { self.load_execution_indices().await.unwrap() } } -#[derive(Debug, Error, Clone)] +#[derive(Debug, Error)] pub enum TestStateError { #[error("Something went wrong in the authority")] ServerError, diff --git a/executor/src/tests/executor_tests.rs b/executor/src/tests/executor_tests.rs index f89e4cb6f..c8c60515d 100644 --- a/executor/src/tests/executor_tests.rs +++ b/executor/src/tests/executor_tests.rs @@ -23,11 +23,12 @@ async fn execute_transactions() { // Spawn the executor. let store = test_store(); let execution_state = Arc::new(TestState::default()); - let _core_handle = Core::spawn( + let _core_handle = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed certificates to the mock sequencer and add the transaction data to storage (as if @@ -69,19 +70,20 @@ async fn execute_empty_certificate() { // Spawn the executor. let store = test_store(); let execution_state = Arc::new(TestState::default()); - let _core_handle = Core::spawn( + let _core_handle = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed empty certificates to the executor. let empty_certificates = 2; - for i in 0..empty_certificates { + for _ in 0..empty_certificates { let message = ConsensusOutput { certificate: Certificate::default(), - consensus_index: i, + consensus_index: SequenceNumber::default(), }; tx_executor.send(message).await.unwrap(); } @@ -124,11 +126,12 @@ async fn execute_malformed_transactions() { // Spawn the executor. let store = test_store(); let execution_state = Arc::new(TestState::default()); - let _core_handle = Core::spawn( + let _core_handle = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed a malformed transaction to the mock sequencer @@ -185,18 +188,19 @@ async fn internal_error_execution() { // Spawn the executor. let store = test_store(); let execution_state = Arc::new(TestState::default()); - let _core_hanlde = Core::spawn( + let _core_hanlde = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed a 'killer' transaction to the executor. This is a special test transaction that // crashes the test executor engine. - let tx00 = 10u64; - let tx01 = 11u64; - let tx10 = 12u64; + let tx00 = 10; + let tx01 = 11; + let tx10 = 12; let tx11 = KILLER_TRANSACTION; let (digest_0, batch_0) = test_batch(vec![tx00, tx01]); @@ -236,11 +240,12 @@ async fn crash_recovery() { // Spawn the executor. let store = test_store(); let execution_state = Arc::new(TestState::default()); - let _core_handle = Core::spawn( + let _core_handle = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed two certificates with good transactions to the executor. @@ -290,11 +295,12 @@ async fn crash_recovery() { let (tx_output, mut rx_output) = channel(10); let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification); - let _core_handle = Core::spawn( + let _core_handle = Core::::spawn( store.clone(), - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), rx_reconfigure, /* rx_subscriber */ rx_executor, + tx_output, ); // Feed two certificates with good transactions to the executor. diff --git a/executor/tests/consensus_integration_tests.rs b/executor/tests/consensus_integration_tests.rs index 35c113d61..b57981f88 100644 --- a/executor/tests/consensus_integration_tests.rs +++ b/executor/tests/consensus_integration_tests.rs @@ -52,6 +52,7 @@ async fn test_internal_consensus_output() { assert!(result.0.is_ok()); + // deserialise transaction let output_transaction = bincode::deserialize::(&result.1).unwrap(); // we always remove the first transaction and check with the one diff --git a/node/src/execution_state.rs b/node/src/execution_state.rs index 1131f66a5..ebf2a3459 100644 --- a/node/src/execution_state.rs +++ b/node/src/execution_state.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; use consensus::ConsensusOutput; -use executor::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState}; +use executor::{ExecutionIndices, ExecutionState, ExecutionStateError}; use thiserror::Error; /// A simple/dumb execution engine. @@ -10,18 +10,8 @@ pub struct SimpleExecutionState; #[async_trait] impl ExecutionState for SimpleExecutionState { - type Error = SimpleExecutionError; - - fn ask_consensus_write_lock(&self) -> bool { - true - } - - fn release_consensus_write_lock(&self) {} -} - -#[async_trait] -impl SingleExecutionState for SimpleExecutionState { type Transaction = String; + type Error = SimpleExecutionError; type Outcome = Vec; async fn handle_consensus_transaction( @@ -33,6 +23,12 @@ impl SingleExecutionState for SimpleExecutionState { Ok(Vec::default()) } + fn ask_consensus_write_lock(&self) -> bool { + true + } + + fn release_consensus_write_lock(&self) {} + async fn load_execution_indices(&self) -> Result { Ok(ExecutionIndices::default()) } @@ -45,7 +41,7 @@ impl Default for SimpleExecutionState { } /// A simple/dumb execution error. -#[derive(Debug, Error, Clone)] +#[derive(Debug, Error)] pub enum SimpleExecutionError { #[error("Something went wrong in the authority")] ServerError, diff --git a/node/src/lib.rs b/node/src/lib.rs index 564e2efc3..799dd321f 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -7,9 +7,8 @@ use consensus::{ metrics::{ChannelMetrics, ConsensusMetrics}, Consensus, }; -use crypto::KeyPair; -use crypto::PublicKey; -use executor::{BatchExecutionState, Executor, SubscriberResult}; +use crypto::{KeyPair, PublicKey}; +use executor::{ExecutionState, Executor, ExecutorOutput, SerializedTransaction, SubscriberResult}; use fastcrypto::traits::{KeyPair as _, VerifyingKey}; use primary::{BlockCommand, NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics}; use prometheus::{IntGauge, Registry}; @@ -20,7 +19,10 @@ use store::{ rocks::{open_cf, DBMap}, Store, }; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{ + sync::{mpsc::Sender, watch}, + task::JoinHandle, +}; use tracing::debug; use types::{ metered_channel, Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header, @@ -136,11 +138,14 @@ impl Node { internal_consensus: bool, // The state used by the client to execute transactions. execution_state: Arc, + // A channel to output transactions execution confirmations. + tx_confirmation: Sender>, // A prometheus exporter Registry to use for the metrics registry: &Registry, ) -> SubscriberResult>> where - State: BatchExecutionState + Send + Sync + 'static, + State: ExecutionState + Send + Sync + 'static, + State::Outcome: Send + 'static, State::Error: Debug, { let initial_committee = ReconfigureNotification::NewEpoch((**committee.load()).clone()); @@ -193,6 +198,7 @@ impl Node { &tx_reconfigure, rx_new_certificates, tx_consensus.clone(), + tx_confirmation, tx_get_block_commands.clone(), registry, ) @@ -267,12 +273,17 @@ impl Node { tx_reconfigure: &watch::Sender, rx_new_certificates: metered_channel::Receiver, tx_feedback: metered_channel::Sender, + tx_confirmation: Sender<( + SubscriberResult<::Outcome>, + SerializedTransaction, + )>, tx_get_block_commands: metered_channel::Sender, registry: &Registry, ) -> SubscriberResult>> where PublicKey: VerifyingKey, - State: BatchExecutionState + Send + Sync + 'static, + State: ExecutionState + Send + Sync + 'static, + State::Outcome: Send + 'static, State::Error: Debug, { let consensus_metrics = Arc::new(ConsensusMetrics::new(registry)); @@ -307,6 +318,7 @@ impl Node { execution_state, tx_reconfigure, /* rx_consensus */ rx_sequence, + /* tx_output */ tx_confirmation, tx_get_block_commands, registry, ) diff --git a/node/src/main.rs b/node/src/main.rs index 0027a9229..73a7a6e1c 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -12,7 +12,7 @@ use arc_swap::ArcSwap; use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand}; use config::{Committee, Import, Parameters, WorkerCache, WorkerId}; use crypto::KeyPair; -use executor::{SerializedTransaction, SingleExecutor, SubscriberResult}; +use executor::{SerializedTransaction, SubscriberResult}; use eyre::Context; use fastcrypto::{generate_production_keypair, traits::KeyPair as _}; use futures::future::join_all; @@ -174,11 +174,8 @@ async fn run(matches: &ArgMatches<'_>) -> Result<(), eyre::Report> { &store, parameters.clone(), /* consensus */ !sub_matches.is_present("consensus-disabled"), - /* execution_state */ - SingleExecutor::new( - Arc::new(SimpleExecutionState::default()), - tx_transaction_confirmation, - ), + /* execution_state */ Arc::new(SimpleExecutionState::default()), + tx_transaction_confirmation, ®istry, ) .await? diff --git a/node/src/restarter.rs b/node/src/restarter.rs index 3ceeaffa7..a513b98cf 100644 --- a/node/src/restarter.rs +++ b/node/src/restarter.rs @@ -4,13 +4,13 @@ use crate::{Node, NodeStorage}; use arc_swap::ArcSwap; use config::{Committee, Parameters, SharedWorkerCache}; use crypto::KeyPair; -use executor::BatchExecutionState; +use executor::{ExecutionState, ExecutorOutput}; use fastcrypto::traits::KeyPair as _; use futures::future::join_all; use network::{PrimaryToWorkerNetwork, ReliableNetwork, UnreliableNetwork, WorkerToPrimaryNetwork}; use prometheus::Registry; use std::{fmt::Debug, path::PathBuf, sync::Arc}; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{Receiver, Sender}; use types::{PrimaryWorkerMessage, ReconfigureNotification, WorkerPrimaryMessage}; // Module to start a node (primary, workers and default consensus), keep it running, and restarting it @@ -26,9 +26,11 @@ impl NodeRestarter { execution_state: Arc, parameters: Parameters, mut rx_reconfigure: Receiver<(KeyPair, Committee)>, + tx_output: Sender>, registry: &Registry, ) where - State: BatchExecutionState + Send + Sync + 'static, + State: ExecutionState + Send + Sync + 'static, + State::Outcome: Send + 'static, State::Error: Debug, { let mut keypair = keypair; @@ -57,6 +59,7 @@ impl NodeRestarter { parameters.clone(), /* consensus */ true, execution_state.clone(), + tx_output.clone(), registry, ) .await diff --git a/node/tests/reconfigure.rs b/node/tests/reconfigure.rs index 9622acc6a..5c27e137e 100644 --- a/node/tests/reconfigure.rs +++ b/node/tests/reconfigure.rs @@ -4,11 +4,8 @@ use arc_swap::ArcSwap; use bytes::Bytes; use config::{Committee, Parameters, SharedWorkerCache}; use consensus::ConsensusOutput; -use crypto::KeyPair; -use crypto::PublicKey; -use executor::{ - ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState, SingleExecutor, -}; +use crypto::{KeyPair, PublicKey}; +use executor::{ExecutionIndices, ExecutionState, ExecutionStateError}; use fastcrypto::traits::KeyPair as _; use futures::future::join_all; use network::{PrimaryToWorkerNetwork, ReliableNetwork, UnreliableNetwork, WorkerToPrimaryNetwork}; @@ -49,18 +46,8 @@ impl SimpleExecutionState { #[async_trait::async_trait] impl ExecutionState for SimpleExecutionState { - type Error = SimpleExecutionError; - - fn ask_consensus_write_lock(&self) -> bool { - true - } - - fn release_consensus_write_lock(&self) {} -} - -#[async_trait::async_trait] -impl SingleExecutionState for SimpleExecutionState { type Transaction = u64; + type Error = SimpleExecutionError; type Outcome = u64; async fn handle_consensus_transaction( @@ -97,13 +84,19 @@ impl SingleExecutionState for SimpleExecutionState { Ok(epoch) } + fn ask_consensus_write_lock(&self) -> bool { + true + } + + fn release_consensus_write_lock(&self) {} + async fn load_execution_indices(&self) -> Result { Ok(ExecutionIndices::default()) } } /// A simple/dumb execution error. -#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum SimpleExecutionError { #[error("Something went wrong in the authority")] ServerError, @@ -200,9 +193,10 @@ async fn restart() { &committee, worker_cache, /* base_store_path */ test_utils::temp_dir(), - SingleExecutor::new(execution_state, tx_output), + execution_state, parameters, rx_node_reconfigure, + tx_output, &Registry::new(), ) .await; @@ -322,7 +316,8 @@ async fn epoch_change() { &store, parameters.clone(), /* consensus */ true, - SingleExecutor::new(execution_state.clone(), tx_output), + execution_state.clone(), + tx_output, &Registry::new(), ) .await diff --git a/primary/src/aggregators.rs b/primary/src/aggregators.rs index cc230e4d0..6a7ccbd74 100644 --- a/primary/src/aggregators.rs +++ b/primary/src/aggregators.rs @@ -3,8 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use config::{Committee, Stake}; -use crypto::PublicKey; -use crypto::Signature; +use crypto::{PublicKey, Signature}; use fastcrypto::traits::EncodeDecodeBase64; use std::collections::HashSet; use types::{ diff --git a/primary/src/core.rs b/primary/src/core.rs index 54d204ac7..6e9715b9d 100644 --- a/primary/src/core.rs +++ b/primary/src/core.rs @@ -9,8 +9,7 @@ use crate::{ }; use async_recursion::async_recursion; use config::{Committee, Epoch, SharedWorkerCache}; -use crypto::PublicKey; -use crypto::Signature; +use crypto::{PublicKey, Signature}; use fastcrypto::{Hash as _, SignatureService}; use network::{CancelOnDropHandler, MessageResult, PrimaryNetwork, ReliableNetwork}; use std::{ diff --git a/primary/src/primary.rs b/primary/src/primary.rs index 7cb04f6d3..b7f4b83c9 100644 --- a/primary/src/primary.rs +++ b/primary/src/primary.rs @@ -21,8 +21,7 @@ use crate::{ use async_trait::async_trait; use config::{Parameters, SharedCommittee, SharedWorkerCache, WorkerId, WorkerInfo}; use consensus::dag::Dag; -use crypto::PublicKey; -use crypto::Signature; +use crypto::{PublicKey, Signature}; use fastcrypto::{ traits::{EncodeDecodeBase64, Signer}, SignatureService, diff --git a/primary/src/proposer.rs b/primary/src/proposer.rs index 125afdf67..145bdf8ec 100644 --- a/primary/src/proposer.rs +++ b/primary/src/proposer.rs @@ -3,8 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{metrics::PrimaryMetrics, NetworkModel}; use config::{Committee, Epoch, WorkerId}; -use crypto::PublicKey; -use crypto::Signature; +use crypto::{PublicKey, Signature}; use fastcrypto::{Digest, Hash as _, SignatureService}; use std::{cmp::Ordering, sync::Arc}; use tokio::{ diff --git a/primary/src/state_handler.rs b/primary/src/state_handler.rs index dac7c0a30..353063ad7 100644 --- a/primary/src/state_handler.rs +++ b/primary/src/state_handler.rs @@ -5,8 +5,7 @@ use crate::primary::PrimaryWorkerMessage; use config::{SharedCommittee, SharedWorkerCache, WorkerCache, WorkerIndex}; use crypto::PublicKey; use network::{PrimaryToWorkerNetwork, UnreliableNetwork}; -use std::collections::BTreeMap; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use tap::TapOptional; use tokio::{sync::watch, task::JoinHandle}; use tracing::{info, warn}; diff --git a/primary/src/tests/payload_receiver_tests.rs b/primary/src/tests/payload_receiver_tests.rs index 75f313b42..814a7458b 100644 --- a/primary/src/tests/payload_receiver_tests.rs +++ b/primary/src/tests/payload_receiver_tests.rs @@ -1,7 +1,6 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::common::create_db_stores; -use crate::payload_receiver::PayloadReceiver; +use crate::{common::create_db_stores, payload_receiver::PayloadReceiver}; use types::BatchDigest; #[tokio::test] diff --git a/primary/tests/integration_tests_validator_api.rs b/primary/tests/integration_tests_validator_api.rs index 8dacb6977..8bbe2bb5a 100644 --- a/primary/tests/integration_tests_validator_api.rs +++ b/primary/tests/integration_tests_validator_api.rs @@ -3,8 +3,7 @@ use arc_swap::ArcSwap; // SPDX-License-Identifier: Apache-2.0 use config::{Parameters, WorkerId}; use consensus::{dag::Dag, metrics::ConsensusMetrics}; -use crypto::KeyPair; -use crypto::PublicKey; +use crypto::{KeyPair, PublicKey}; use fastcrypto::{traits::KeyPair as _, Hash}; use indexmap::IndexMap; use network::metrics::WorkerNetworkMetrics; diff --git a/storage/src/certificate_store.rs b/storage/src/certificate_store.rs index f75757345..ac25bf606 100644 --- a/storage/src/certificate_store.rs +++ b/storage/src/certificate_store.rs @@ -2,13 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use dashmap::DashMap; use fastcrypto::Hash; -use std::sync::Arc; -use std::{collections::VecDeque, iter}; -use store::rocks::DBMap; -use store::rocks::TypedStoreError::RocksDBError; -use store::Map; -use tokio::sync::oneshot; -use tokio::sync::oneshot::Sender; +use std::{collections::VecDeque, iter, sync::Arc}; +use store::{ + rocks::{DBMap, TypedStoreError::RocksDBError}, + Map, +}; +use tokio::sync::{oneshot, oneshot::Sender}; use tracing::warn; use types::{Certificate, CertificateDigest, Round, StoreResult}; @@ -318,13 +317,15 @@ mod test { use crate::certificate_store::{CertificateStore, CertificateToken}; use fastcrypto::Hash; use futures::future::join_all; - use std::collections::{BTreeSet, HashSet}; - use std::time::Instant; - use store::reopen; - use store::rocks::open_cf; - use store::rocks::DBMap; - use test_utils::temp_dir; - use test_utils::{certificate, committee, fixture_headers_round}; + use std::{ + collections::{BTreeSet, HashSet}, + time::Instant, + }; + use store::{ + reopen, + rocks::{open_cf, DBMap}, + }; + use test_utils::{certificate, committee, fixture_headers_round, temp_dir}; use types::{Certificate, CertificateDigest, Round}; fn new_store(path: std::path::PathBuf) -> CertificateStore { diff --git a/test_utils/src/cluster.rs b/test_utils/src/cluster.rs index d904e7c4f..8b2cbf342 100644 --- a/test_utils/src/cluster.rs +++ b/test_utils/src/cluster.rs @@ -3,9 +3,8 @@ use crate::{keys, pure_committee_from_keys, shared_worker_cache_from_keys, temp_dir}; use arc_swap::ArcSwap; use config::{Committee, Parameters, SharedCommittee, SharedWorkerCache, WorkerId}; -use crypto::KeyPair; -use crypto::PublicKey; -use executor::{SerializedTransaction, SingleExecutor, SubscriberResult}; +use crypto::{KeyPair, PublicKey}; +use executor::{SerializedTransaction, SubscriberResult}; use fastcrypto::traits::KeyPair as _; use itertools::Itertools; use multiaddr::Multiaddr; @@ -353,11 +352,8 @@ impl PrimaryNodeDetails { &primary_store, self.parameters.clone(), /* consensus */ self.internal_consensus_enabled, - /* execution_state */ - SingleExecutor::new( - Arc::new(SimpleExecutionState::default()), - tx_transaction_confirmation, - ), + /* execution_state */ Arc::new(SimpleExecutionState::default()), + tx_transaction_confirmation, ®istry, ) .await diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index ab374be75..6601b855c 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -5,8 +5,7 @@ use config::{ utils::get_available_port, Authority, Committee, Epoch, PrimaryAddresses, SharedWorkerCache, WorkerCache, WorkerId, WorkerIndex, WorkerInfo, }; -use crypto::PublicKey; -use crypto::{KeyPair, Signature}; +use crypto::{KeyPair, PublicKey, Signature}; use fastcrypto::{ traits::{KeyPair as _, Signer as _}, Digest, Hash as _, diff --git a/types/src/primary.rs b/types/src/primary.rs index 9060fb6c6..c7b310b66 100644 --- a/types/src/primary.rs +++ b/types/src/primary.rs @@ -8,8 +8,7 @@ use crate::{ use blake2::{digest::Update, VarBlake2b}; use bytes::Bytes; use config::{Committee, Epoch, SharedWorkerCache, WorkerId, WorkerInfo}; -use crypto::PublicKey; -use crypto::Signature; +use crypto::{PublicKey, Signature}; use dag::node_dag::Affiliated; use derive_builder::Builder; use fastcrypto::{ diff --git a/worker/src/synchronizer.rs b/worker/src/synchronizer.rs index 874475f43..22c686617 100644 --- a/worker/src/synchronizer.rs +++ b/worker/src/synchronizer.rs @@ -7,23 +7,21 @@ use crypto::PublicKey; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _}; use network::{LuckyNetwork, UnreliableNetwork, WorkerNetwork}; use primary::PrimaryWorkerMessage; -use std::collections::HashSet; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use store::{Store, StoreError}; -use tap::TapFallible; -use tap::TapOptional; +use tap::{TapFallible, TapOptional}; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, time::{sleep, Duration, Instant}, }; use tracing::{debug, error, trace, warn}; -use types::error::DagError; use types::{ + error::DagError, metered_channel::{Receiver, Sender}, BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage, WorkerPrimaryError, WorkerPrimaryMessage, diff --git a/worker/src/tests/synchronizer_tests.rs b/worker/src/tests/synchronizer_tests.rs index c2025f19e..6866922f7 100644 --- a/worker/src/tests/synchronizer_tests.rs +++ b/worker/src/tests/synchronizer_tests.rs @@ -5,9 +5,8 @@ use super::*; use arc_swap::ArcSwap; use fastcrypto::traits::KeyPair; use prometheus::Registry; -use test_utils::committee; use test_utils::{ - batch, batch_digest, batches, keys, open_batch_store, pure_committee_from_keys, + batch, batch_digest, batches, committee, keys, open_batch_store, pure_committee_from_keys, resolve_name_committee_and_worker_cache, serialize_batch_message, shared_worker_cache_from_keys, WorkerToWorkerMockServer, };