Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Revert 818 + increase the batch timeout #859

Merged
merged 2 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crypto/src/tests/bls12377_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use crate::bls12377::{
BLS12377AggregateSignature, BLS12377KeyPair, BLS12377PrivateKey, BLS12377PublicKey,
BLS12377PublicKeyBytes, BLS12377Signature,
};
use fastcrypto::traits::{
AggregateAuthenticator, EncodeDecodeBase64, KeyPair, ToFromBytes, VerifyingKey,
use fastcrypto::{
traits::{AggregateAuthenticator, EncodeDecodeBase64, KeyPair, ToFromBytes, VerifyingKey},
Hash, SignatureService,
};
use fastcrypto::{Hash, SignatureService};

use crate::bls12377::CELO_BLS_PRIVATE_KEY_LENGTH;
use rand::{rngs::StdRng, SeedableRng as _};
Expand Down
300 changes: 96 additions & 204 deletions executor/src/core.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl From<Box<bincode::ErrorKind>> for SubscriberError {
}
}

/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// errors caused by a fault in the authority.
pub trait ExecutionStateError: std::error::Error {
/// Whether the error is due to a fault in the authority (eg. internal storage error).
Expand Down
69 changes: 22 additions & 47 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use prometheus::Registry;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use store::Store;
use tokio::{sync::watch, task::JoinHandle};
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::info;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};

// Re-export SingleExecutor as a convenience adapter.
pub use crate::core::SingleExecutor;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};

/// Convenience type representing a serialized transaction.
pub type SerializedTransaction = Vec<u8>;
Expand All @@ -41,47 +41,12 @@ pub type SerializedTransactionDigest = u64;

#[async_trait]
pub trait ExecutionState {
/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);
}

/// Execution state that gets whole certificates and the corresponding batches
/// for execution. It is responsible for deduplication in case the same certificate
/// is re-delivered after a crash.
#[async_trait]
pub trait BatchExecutionState: ExecutionState {
/// Load the last consensus index from storage.
///
/// It should return the index from which it expects a replay, so one higher than
/// the last certificate index it successfully committed. This is so it has the
/// same semantics as `ExecutionIndices`.
async fn load_next_certificate_index(&self) -> Result<SequenceNumber, Self::Error>;

/// Execute the transactions and atomically persist the consensus index.
///
/// TODO: This function should be allowed to return a new committee to reconfigure the system.
async fn handle_consensus(
&self,
consensus_output: &ConsensusOutput,
transaction_batches: Vec<Vec<SerializedTransaction>>,
) -> Result<(), Self::Error>;
}

/// Execution state that executes a single transaction at a time.
#[async_trait]
pub trait SingleExecutionState: ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// The execution outcome to output.
type Outcome;

Expand All @@ -95,15 +60,22 @@ pub trait SingleExecutionState: ExecutionState {
transaction: Self::Transaction,
) -> Result<Self::Outcome, Self::Error>;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);

/// Load the last consensus index from storage.
///
/// It *must* return index that was last passed to `handle_consensus_transaction`.
async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error>;
}

/// The output of the executor.
pub type ExecutorOutput<State> = (
SubscriberResult<<State as SingleExecutionState>::Outcome>,
SubscriberResult<<State as ExecutionState>::Outcome>,
SerializedTransaction,
);

Expand All @@ -117,11 +89,13 @@ impl Executor {
execution_state: Arc<State>,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: BatchExecutionState + Send + Sync + 'static,
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
State::Error: Debug,
{
let metrics = ExecutorMetrics::new(registry);
Expand Down Expand Up @@ -154,6 +128,7 @@ impl Executor {
execution_state,
tx_reconfigure.subscribe(),
/* rx_subscriber */ rx_executor,
tx_output,
);

// Return the handle.
Expand Down
42 changes: 19 additions & 23 deletions executor/src/tests/execution_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError};
use async_trait::async_trait;
use consensus::ConsensusOutput;

Expand All @@ -14,14 +14,14 @@ use store::{
use thiserror::Error;

/// A malformed transaction.
pub const MALFORMED_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 400;
pub const MALFORMED_TRANSACTION: <TestState as ExecutionState>::Transaction = 400;

/// A special transaction that makes the executor engine crash.
pub const KILLER_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 500;
pub const KILLER_TRANSACTION: <TestState as ExecutionState>::Transaction = 500;

/// A dumb execution state for testing.
pub struct TestState {
indices_store: Store<u64, ExecutionIndices>,
store: Store<u64, ExecutionIndices>,
}

impl std::fmt::Debug for TestState {
Expand All @@ -38,18 +38,8 @@ impl Default for TestState {

#[async_trait]
impl ExecutionState for TestState {
type Error = TestStateError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for TestState {
type Transaction = u64;
type Error = TestStateError;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -63,16 +53,22 @@ impl SingleExecutionState for TestState {
} else if transaction == KILLER_TRANSACTION {
Err(Self::Error::ServerError)
} else {
self.indices_store
self.store
.write(Self::INDICES_ADDRESS, execution_indices)
.await;
Ok(Vec::default())
}
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
let indices = self
.indices_store
.store
.read(Self::INDICES_ADDRESS)
.await
.unwrap()
Expand All @@ -87,21 +83,21 @@ impl TestState {

/// Create a new test state.
pub fn new(store_path: &Path) -> Self {
const INDICES_CF: &str = "test_state_indices";
let rocksdb = open_cf(store_path, None, &[INDICES_CF]).unwrap();
let indices_map = reopen!(&rocksdb, INDICES_CF;<u64, ExecutionIndices>);
const STATE_CF: &str = "test_state";
let rocksdb = open_cf(store_path, None, &[STATE_CF]).unwrap();
let map = reopen!(&rocksdb, STATE_CF;<u64, ExecutionIndices>);
Self {
indices_store: Store::new(indices_map),
store: Store::new(map),
}
}

/// Load the execution indices.
/// Load the execution indices; ie. the state.
pub async fn get_execution_indices(&self) -> ExecutionIndices {
self.load_execution_indices().await.unwrap()
}
}

#[derive(Debug, Error, Clone)]
#[derive(Debug, Error)]
pub enum TestStateError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
40 changes: 23 additions & 17 deletions executor/src/tests/executor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ async fn execute_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed certificates to the mock sequencer and add the transaction data to storage (as if
Expand Down Expand Up @@ -69,19 +70,20 @@ async fn execute_empty_certificate() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed empty certificates to the executor.
let empty_certificates = 2;
for i in 0..empty_certificates {
for _ in 0..empty_certificates {
let message = ConsensusOutput {
certificate: Certificate::default(),
consensus_index: i,
consensus_index: SequenceNumber::default(),
};
tx_executor.send(message).await.unwrap();
}
Expand Down Expand Up @@ -124,11 +126,12 @@ async fn execute_malformed_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a malformed transaction to the mock sequencer
Expand Down Expand Up @@ -185,18 +188,19 @@ async fn internal_error_execution() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_hanlde = Core::spawn(
let _core_hanlde = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a 'killer' transaction to the executor. This is a special test transaction that
// crashes the test executor engine.
let tx00 = 10u64;
let tx01 = 11u64;
let tx10 = 12u64;
let tx00 = 10;
let tx01 = 11;
let tx10 = 12;
let tx11 = KILLER_TRANSACTION;

let (digest_0, batch_0) = test_batch(vec![tx00, tx01]);
Expand Down Expand Up @@ -236,11 +240,12 @@ async fn crash_recovery() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down Expand Up @@ -290,11 +295,12 @@ async fn crash_recovery() {
let (tx_output, mut rx_output) = channel(10);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification);

let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down
1 change: 1 addition & 0 deletions executor/tests/consensus_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn test_internal_consensus_output() {

assert!(result.0.is_ok());

// deserialise transaction
let output_transaction = bincode::deserialize::<String>(&result.1).unwrap();

// we always remove the first transaction and check with the one
Expand Down
Loading