This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Revert "refactor executor temporary batch store. Clean up storage aft…
Browse files Browse the repository at this point in the history
…er execution. (#842)"

This reverts commit 842a611.
huitseeker committed Sep 7, 2022
1 parent 35b9bdd commit c7b1b63
Showing 7 changed files with 47 additions and 128 deletions.
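
Note: the diff below undoes the refactor from #842. The executor's temporary batch store goes back to being keyed by BatchDigest alone, and the post-execution cleanup of that store is removed. A minimal sketch of the two layouts, written against the Store, Batch, BatchDigest, CertificateDigest, and TypedStoreError types from this repository's store and types crates; the two helper functions are hypothetical illustrations, not code from the commit.

use store::{rocks::TypedStoreError, Store};
use types::{Batch, BatchDigest, CertificateDigest};

// Layout introduced by #842 (now reverted): batches are keyed per certificate, so the
// executor can delete exactly the entries belonging to one executed certificate.
async fn cleanup_per_certificate(
    store: &Store<(CertificateDigest, BatchDigest), Batch>,
    certificate_id: CertificateDigest,
    batch_digests: Vec<BatchDigest>,
) -> Result<(), TypedStoreError> {
    let keys: Vec<_> = batch_digests
        .into_iter()
        .map(|digest| (certificate_id, digest))
        .collect();
    store.remove_all(keys).await
}

// Layout restored by this revert: batches are keyed by digest only, and nothing is
// removed after execution (cleanup is deferred; see the TODO referencing issue #191).
async fn read_batch(
    store: &Store<BatchDigest, Batch>,
    digest: BatchDigest,
) -> Result<Option<Batch>, TypedStoreError> {
    store.read(digest).await
}
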
38 changes: 7 additions & 31 deletions executor/src/core.rs
@@ -7,17 +7,14 @@ use crate::{
ExecutionState, ExecutorOutput, SerializedTransaction,
};
use consensus::ConsensusOutput;
use fastcrypto::Hash;
use std::{fmt::Debug, sync::Arc};
use store::{rocks::TypedStoreError, Store};
use store::Store;
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::debug;
use types::{
metered_channel, Batch, BatchDigest, CertificateDigest, ReconfigureNotification, SequenceNumber,
};
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};

#[cfg(test)]
#[path = "tests/executor_tests.rs"]
@@ -29,7 +26,7 @@ pub mod executor_tests;
/// not processes twice the same transaction (despite crash-recovery).
pub struct Core<State: ExecutionState> {
/// The temporary storage holding all transactions' data (that may be too big to hold in memory).
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
/// The (global) state to perform execution.
execution_state: Arc<State>,
/// Receive reconfiguration updates.
@@ -57,7 +54,7 @@
/// Spawn a new executor in a dedicated tokio task.
#[must_use]
pub fn spawn(
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
execution_state: Arc<State>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_subscriber: metered_channel::Receiver<ConsensusOutput>,
@@ -92,7 +89,7 @@
self.execute_certificate(&message).await?;

// Cleanup the temporary persistent storage.
self.cleanup_store(&message).await.map_err(SubscriberError::from)?;
// TODO [issue #191]: Security cleanup the store.
},

// Check whether the committee changed.
@@ -107,24 +104,6 @@
}
}

/// Cleans up the temporary batch store for the batches stored
/// for the specified certificate. We are storing the batches per
/// certificate as bathes of same id can be referenced by multiple
/// certificates.
async fn cleanup_store(&self, message: &ConsensusOutput) -> Result<(), TypedStoreError> {
let certificate_id = message.certificate.digest();

let to_delete_keys = message
.certificate
.header
.payload
.iter()
.map(|(digest, _)| (certificate_id, *digest))
.collect::<Vec<_>>();

self.store.remove_all(to_delete_keys).await
}

/// Execute a single certificate.
async fn execute_certificate(&mut self, message: &ConsensusOutput) -> SubscriberResult<()> {
// Skip the certificate if it contains no transactions.
@@ -134,16 +113,14 @@
}

// Execute every batch in the certificate.
let certificate_id = message.certificate.digest();
let total_batches = message.certificate.header.payload.len();
for (index, digest) in message.certificate.header.payload.keys().enumerate() {
// Skip batches that we already executed (after crash-recovery).
if self
.execution_indices
.check_next_batch_index(index as SequenceNumber)
{
self.execute_batch(message, certificate_id, *digest, total_batches)
.await?;
self.execute_batch(message, *digest, total_batches).await?;
}
}
Ok(())
@@ -153,12 +130,11 @@
async fn execute_batch(
&mut self,
consensus_output: &ConsensusOutput,
certificate_id: CertificateDigest,
batch_digest: BatchDigest,
total_batches: usize,
) -> SubscriberResult<()> {
// The store should now hold all transaction data referenced by the input certificate.
let transactions = match self.store.read((certificate_id, batch_digest)).await? {
let transactions = match self.store.read(batch_digest).await? {
Some(x) => x.0,
None => {
// If two certificates contain the exact same batch (eg. by the actions of a Byzantine
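
Note: the removed cleanup_store doc comment explains why #842 keyed batches per certificate: the same batch digest can appear in the payload of more than one certificate. Under the BatchDigest-only layout restored here, such a batch has a single store entry, so deleting it after the first certificate executes would leave later certificates without their payload and hit the None branch of execute_batch. A hedged sketch of that situation; the types come from this repository, the function and assertion are illustrative only.

use store::Store;
use types::{Batch, BatchDigest, CertificateDigest};

async fn shared_batch_scenario(
    store: Store<BatchDigest, Batch>,
    _cert_a: CertificateDigest,
    _cert_b: CertificateDigest,
    digest: BatchDigest,
    batch: Batch,
) {
    // Both certificates reference `digest`, but only one entry exists in the store.
    store.write(digest, batch).await;

    // If the executor removed this entry right after executing the first certificate,
    // reading it while executing the second one would return Ok(None).
    let still_there = store.read(digest).await;
    assert!(matches!(still_there, Ok(Some(_))));
}
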
4 changes: 2 additions & 2 deletions executor/src/lib.rs
@@ -17,7 +17,6 @@ mod metrics;

pub use errors::{ExecutionStateError, SubscriberError, SubscriberResult};
pub use state::ExecutionIndices;
use tracing::info;

use crate::{core::Core, metrics::ExecutorMetrics, subscriber::Subscriber};
use async_trait::async_trait;
@@ -32,6 +31,7 @@ use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::info;
use types::{
metered_channel, Batch, BatchDigest, CertificateDigest, ConsensusStore,
ReconfigureNotification, SequenceNumber,
@@ -94,7 +94,7 @@ pub struct Executor;
impl Executor {
/// Spawn a new client subscriber.
pub async fn spawn<State>(
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
execution_state: Arc<State>,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
38 changes: 13 additions & 25 deletions executor/src/subscriber.rs
@@ -17,7 +17,7 @@ use tokio::{
use tracing::error;
use types::{
bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest,
CertificateDigest, ReconfigureNotification,
ReconfigureNotification,
};

#[cfg(test)]
@@ -29,7 +29,7 @@ pub mod subscriber_tests;
/// forward the certificates to the Executor Core.
pub struct Subscriber {
/// The temporary storage holding all transactions' data (that may be too big to hold in memory).
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
/// Receive reconfiguration updates.
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
/// A channel to receive consensus messages.
@@ -54,7 +54,7 @@ impl Subscriber {
/// Spawn a new subscriber in a new tokio task.
#[must_use]
pub fn spawn(
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
@@ -158,41 +158,32 @@ impl Subscriber {
/// fetched the payload.
async fn wait_on_payload(
back_off_policy: ExponentialBackoff,
store: Store<(CertificateDigest, BatchDigest), Batch>,
store: Store<BatchDigest, Batch>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
deliver: ConsensusOutput,
) -> SubscriberResult<ConsensusOutput> {
let get_block = move || {
let message = deliver.clone();
let certificate_id = message.certificate.digest();
let id = message.certificate.digest();
let tx_get_block = tx_get_block_commands.clone();
let batch_store = store.clone();

async move {
let (sender, receiver) = oneshot::channel();

tx_get_block
.send(BlockCommand::GetBlock {
id: certificate_id,
sender,
})
.send(BlockCommand::GetBlock { id, sender })
.await
.map_err(|err| {
Error::permanent(PayloadRetrieveError(certificate_id, err.to_string()))
})?;
.map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))?;

match receiver.await.map_err(|err| {
Error::permanent(PayloadRetrieveError(certificate_id, err.to_string()))
})? {
match receiver
.await
.map_err(|err| Error::permanent(PayloadRetrieveError(id, err.to_string())))?
{
Ok(block) => {
// we successfully received the payload. Now let's add to store
batch_store
.write_all(
block
.batches
.into_iter()
.map(|b| ((certificate_id, b.id), b.transactions)),
)
.write_all(block.batches.into_iter().map(|b| (b.id, b.transactions)))
.await
.map_err(|err| Error::permanent(SubscriberError::from(err)))?;

@@ -202,10 +193,7 @@ impl Subscriber {
// whatever the error might be at this point we don't
// have many options apart from retrying.
error!("Error while retrieving block via block waiter: {}", err);
Err(Error::transient(PayloadRetrieveError(
certificate_id,
err.to_string(),
)))
Err(Error::transient(PayloadRetrieveError(id, err.to_string())))
}
}
}
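
Note: wait_on_payload above wraps the GetBlock round trip in an exponential back-off retry: block-waiter failures are retried (Error::transient), while channel errors are wrapped in Error::permanent and abort immediately. A standalone sketch of that pattern, assuming the backoff crate with its async support enabled and a tokio runtime; the fetch logic and error type are hypothetical stand-ins for the block-waiter round trip.

use std::sync::atomic::{AtomicU32, Ordering};

use backoff::{future::retry, Error, ExponentialBackoff};

#[derive(Debug)]
struct FetchError(String);

#[tokio::main]
async fn main() -> Result<(), FetchError> {
    let attempts = AtomicU32::new(0);
    let attempts_ref = &attempts;

    let payload = retry(ExponentialBackoff::default(), || async move {
        // The closure is re-invoked on every retry, just as the subscriber rebuilds
        // its GetBlock request each time the back-off policy fires.
        if attempts_ref.fetch_add(1, Ordering::SeqCst) < 2 {
            // A transient error is retried according to the policy; Error::permanent
            // would make `retry` return the error immediately instead.
            Err(Error::transient(FetchError("payload not yet available".into())))
        } else {
            Ok(vec![1u8, 2, 3])
        }
    })
    .await?;

    assert_eq!(payload, vec![1, 2, 3]);
    Ok(())
}
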
74 changes: 15 additions & 59 deletions executor/src/tests/executor_tests.rs
@@ -36,9 +36,9 @@ async fn execute_transactions() {
/* certificates */ 2, /* batches_per_certificate */ 2,
/* transactions_per_batch */ 2,
);
for (certificate, batches) in certificates.clone() {
for (certificate, batches) in certificates {
for (digest, batch) in batches {
store.write((certificate.digest(), digest), batch).await;
store.write(digest, batch).await;
}
let message = ConsensusOutput {
certificate,
@@ -55,9 +55,6 @@
next_transaction_index: 0,
};
assert_eq!(execution_state.get_execution_indices().await, expected);

// Ensure the storage has been cleaned up
assert_storage_cleaned_up(store, certificates).await;
}

#[tokio::test]
@@ -97,7 +94,7 @@ async fn execute_empty_certificate()
);
for (certificate, batches) in certificates {
for (digest, batch) in batches {
store.write((certificate.digest(), digest), batch).await;
store.write(digest, batch).await;
}
let message = ConsensusOutput {
certificate,
@@ -141,9 +138,10 @@ async fn execute_malformed_transactions() {
let tx1 = 10;
let (digest, batch) = test_batch(vec![tx0, tx1]);

store.write(digest, batch).await;

let payload = [(digest, 0)].iter().cloned().collect();
let certificate = test_certificate(payload);
store.write((certificate.digest(), digest), batch).await;

let message = ConsensusOutput {
certificate,
@@ -156,9 +154,9 @@
/* certificates */ 2, /* batches_per_certificate */ 2,
/* transactions_per_batch */ 2,
);
for (certificate, batches) in certificates.clone() {
for (certificate, batches) in certificates {
for (digest, batch) in batches {
store.write((certificate.digest(), digest), batch).await;
store.write(digest, batch).await;
}
let message = ConsensusOutput {
certificate,
@@ -175,9 +173,6 @@
next_transaction_index: 0,
};
assert_eq!(execution_state.get_execution_indices().await, expected);

// Ensure the storage has been cleaned up
assert_storage_cleaned_up(store, certificates).await;
}

#[tokio::test]
@@ -210,12 +205,11 @@ async fn internal_error_execution() {
let (digest_0, batch_0) = test_batch(vec![tx00, tx01]);
let (digest_1, batch_1) = test_batch(vec![tx10, tx11]);

store.write(digest_0, batch_0).await;
store.write(digest_1, batch_1).await;

let payload = [(digest_0, 0), (digest_1, 1)].iter().cloned().collect();
let certificate = test_certificate(payload);
let certificate_id = certificate.digest();

store.write((certificate_id, digest_0), batch_0).await;
store.write((certificate_id, digest_1), batch_1).await;

let message = ConsensusOutput {
certificate,
@@ -231,18 +225,6 @@
next_transaction_index: 1,
};
assert_eq!(execution_state.get_execution_indices().await, expected);

// We don't expect storage to get cleaned up in this case
assert!(store
.read((certificate_id, digest_0))
.await
.unwrap()
.is_some());
assert!(store
.read((certificate_id, digest_1))
.await
.unwrap()
.is_some());
}

#[tokio::test]
@@ -272,7 +254,7 @@ async fn crash_recovery() {
);
for (certificate, batches) in certificates {
for (digest, batch) in batches {
store.write((certificate.digest(), digest), batch).await;
store.write(digest, batch).await;
}
let message = ConsensusOutput {
certificate,
@@ -287,11 +269,11 @@
let tx1 = KILLER_TRANSACTION;
let (digest, batch) = test_batch(vec![tx0, tx1]);

store.write(digest, batch).await;

let payload = [(digest, 0)].iter().cloned().collect();
let certificate = test_certificate(payload);

store.write((certificate.digest(), digest), batch).await;

let message = ConsensusOutput {
certificate,
consensus_index: SequenceNumber::default(),
@@ -325,9 +307,9 @@
/* certificates */ 2, /* batches_per_certificate */ 2,
/* transactions_per_batch */ 2,
);
for (certificate, batches) in certificates.clone() {
for (certificate, batches) in certificates {
for (digest, batch) in batches {
store.write((certificate.digest(), digest), batch).await;
store.write(digest, batch).await;
}
let message = ConsensusOutput {
certificate,
@@ -344,30 +326,4 @@
next_transaction_index: 0,
};
assert_eq!(execution_state.get_execution_indices().await, expected);

// Ensure the storage has been cleaned up
assert_storage_cleaned_up(store, certificates).await;
}

async fn assert_storage_cleaned_up(
store: Store<(CertificateDigest, BatchDigest), Batch>,
certificates: Vec<(Certificate, Vec<(BatchDigest, Batch)>)>,
) {
// Ensure the storage has been cleaned up
for (certificate, batches) in certificates {
let result = store
.read_all(
batches
.into_iter()
.map(|(id, _)| (certificate.digest(), id))
.collect::<Vec<_>>(),
)
.await
.unwrap();

assert!(
result.iter().all(Option::is_none),
"Expected to not found any batches still stored for this certificate"
);
}
}

0 comments on commit c7b1b63
