Skip to content

Commit

Permalink
Batch execution with single execution adapter (MystenLabs#818)
Browse files Browse the repository at this point in the history
Batch execution with single execution adapter
  • Loading branch information
aakoshh authored Aug 19, 2022
1 parent f4cab79 commit 0305170
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 213 deletions.
300 changes: 204 additions & 96 deletions executor/src/core.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl From<Box<bincode::ErrorKind>> for SubscriberError {
}
}

/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// errors caused by a fault in the authority.
pub trait ExecutionStateError: std::error::Error {
/// Whether the error is due to a fault in the authority (eg. internal storage error).
Expand Down
69 changes: 47 additions & 22 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use prometheus::Registry;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use store::Store;
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tokio::{sync::watch, task::JoinHandle};
use tracing::info;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};

// Re-export SingleExecutor as a convenience adapter.
pub use crate::core::SingleExecutor;

/// Default inter-task channel size.
pub const DEFAULT_CHANNEL_SIZE: usize = 1_000;
Expand All @@ -44,12 +44,47 @@ pub type SerializedTransactionDigest = u64;

#[async_trait]
pub trait ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);
}

/// Execution state that gets whole certificates and the corresponding batches
/// for execution. It is responsible for deduplication in case the same certificate
/// is re-delivered after a crash.
#[async_trait]
pub trait BatchExecutionState: ExecutionState {
/// Load the last consensus index from storage.
///
/// It should return the index from which it expects a replay, so one higher than
/// the last certificate index it successfully committed. This is so it has the
/// same semantics as `ExecutionIndices`.
async fn load_next_certificate_index(&self) -> Result<SequenceNumber, Self::Error>;

/// Execute the transactions and atomically persist the consensus index.
///
/// TODO: This function should be allowed to return a new committee to reconfigure the system.
async fn handle_consensus(
&self,
consensus_output: &ConsensusOutput,
transaction_batches: Vec<Vec<SerializedTransaction>>,
) -> Result<(), Self::Error>;
}

/// Execution state that executes a single transaction at a time.
#[async_trait]
pub trait SingleExecutionState: ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The execution outcome to output.
type Outcome;

Expand All @@ -63,22 +98,15 @@ pub trait ExecutionState {
transaction: Self::Transaction,
) -> Result<Self::Outcome, Self::Error>;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);

/// Load the last consensus index from storage.
///
/// It *must* return index that was last passed to `handle_consensus_transaction`.
async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error>;
}

/// The output of the executor.
pub type ExecutorOutput<State> = (
SubscriberResult<<State as ExecutionState>::Outcome>,
SubscriberResult<<State as SingleExecutionState>::Outcome>,
SerializedTransaction,
);

Expand All @@ -92,13 +120,11 @@ impl Executor {
execution_state: Arc<State>,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
State: BatchExecutionState + Send + Sync + 'static,
State::Error: Debug,
{
let metrics = ExecutorMetrics::new(registry);
Expand Down Expand Up @@ -131,7 +157,6 @@ impl Executor {
execution_state,
tx_reconfigure.subscribe(),
/* rx_subscriber */ rx_executor,
tx_output,
);

// Return the handle.
Expand Down
42 changes: 23 additions & 19 deletions executor/src/tests/execution_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError};
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use async_trait::async_trait;
use consensus::ConsensusOutput;

Expand All @@ -14,14 +14,14 @@ use store::{
use thiserror::Error;

/// A malformed transaction.
pub const MALFORMED_TRANSACTION: <TestState as ExecutionState>::Transaction = 400;
pub const MALFORMED_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 400;

/// A special transaction that makes the executor engine crash.
pub const KILLER_TRANSACTION: <TestState as ExecutionState>::Transaction = 500;
pub const KILLER_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 500;

/// A dumb execution state for testing.
pub struct TestState {
store: Store<u64, ExecutionIndices>,
indices_store: Store<u64, ExecutionIndices>,
}

impl std::fmt::Debug for TestState {
Expand All @@ -38,8 +38,18 @@ impl Default for TestState {

#[async_trait]
impl ExecutionState for TestState {
type Transaction = u64;
type Error = TestStateError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for TestState {
type Transaction = u64;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -53,22 +63,16 @@ impl ExecutionState for TestState {
} else if transaction == KILLER_TRANSACTION {
Err(Self::Error::ServerError)
} else {
self.store
self.indices_store
.write(Self::INDICES_ADDRESS, execution_indices)
.await;
Ok(Vec::default())
}
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
let indices = self
.store
.indices_store
.read(Self::INDICES_ADDRESS)
.await
.unwrap()
Expand All @@ -83,21 +87,21 @@ impl TestState {

/// Create a new test state.
pub fn new(store_path: &Path) -> Self {
const STATE_CF: &str = "test_state";
let rocksdb = open_cf(store_path, None, &[STATE_CF]).unwrap();
let map = reopen!(&rocksdb, STATE_CF;<u64, ExecutionIndices>);
const INDICES_CF: &str = "test_state_indices";
let rocksdb = open_cf(store_path, None, &[INDICES_CF]).unwrap();
let indices_map = reopen!(&rocksdb, INDICES_CF;<u64, ExecutionIndices>);
Self {
store: Store::new(map),
indices_store: Store::new(indices_map),
}
}

/// Load the execution indices; ie. the state.
/// Load the execution indices.
pub async fn get_execution_indices(&self) -> ExecutionIndices {
self.load_execution_indices().await.unwrap()
}
}

#[derive(Debug, Error)]
#[derive(Debug, Error, Clone)]
pub enum TestStateError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
40 changes: 17 additions & 23 deletions executor/src/tests/executor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ async fn execute_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed certificates to the mock sequencer and add the transaction data to storage (as if
Expand Down Expand Up @@ -70,20 +69,19 @@ async fn execute_empty_certificate() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed empty certificates to the executor.
let empty_certificates = 2;
for _ in 0..empty_certificates {
for i in 0..empty_certificates {
let message = ConsensusOutput {
certificate: Certificate::default(),
consensus_index: SequenceNumber::default(),
consensus_index: i,
};
tx_executor.send(message).await.unwrap();
}
Expand Down Expand Up @@ -126,12 +124,11 @@ async fn execute_malformed_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a malformed transaction to the mock sequencer
Expand Down Expand Up @@ -188,19 +185,18 @@ async fn internal_error_execution() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_hanlde = Core::<TestState>::spawn(
let _core_hanlde = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a 'killer' transaction to the executor. This is a special test transaction that
// crashes the test executor engine.
let tx00 = 10;
let tx01 = 11;
let tx10 = 12;
let tx00 = 10u64;
let tx01 = 11u64;
let tx10 = 12u64;
let tx11 = KILLER_TRANSACTION;

let (digest_0, batch_0) = test_batch(vec![tx00, tx01]);
Expand Down Expand Up @@ -240,12 +236,11 @@ async fn crash_recovery() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down Expand Up @@ -295,12 +290,11 @@ async fn crash_recovery() {
let (tx_output, mut rx_output) = channel(10);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification);

let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down
1 change: 0 additions & 1 deletion executor/tests/consensus_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ async fn test_internal_consensus_output() {

assert!(result.0.is_ok());

// deserialise transaction
let output_transaction = bincode::deserialize::<String>(&result.1).unwrap();

// we always remove the first transaction and check with the one
Expand Down
22 changes: 13 additions & 9 deletions node/src/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use consensus::ConsensusOutput;
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError};
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use thiserror::Error;

/// A simple/dumb execution engine.
pub struct SimpleExecutionState;

#[async_trait]
impl ExecutionState for SimpleExecutionState {
type Transaction = String;
type Error = SimpleExecutionError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for SimpleExecutionState {
type Transaction = String;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -23,12 +33,6 @@ impl ExecutionState for SimpleExecutionState {
Ok(Vec::default())
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
Ok(ExecutionIndices::default())
}
Expand All @@ -41,7 +45,7 @@ impl Default for SimpleExecutionState {
}

/// A simple/dumb execution error.
#[derive(Debug, Error)]
#[derive(Debug, Error, Clone)]
pub enum SimpleExecutionError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
Loading

0 comments on commit 0305170

Please sign in to comment.