Skip to content

Commit

Permalink
StateComputeResult carries real execution result
Browse files Browse the repository at this point in the history
avoids unnecessary cloning
  • Loading branch information
msmouse committed Oct 15, 2024
1 parent 2a0e7d6 commit 68f3f93
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 348 deletions.
7 changes: 2 additions & 5 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PipelinedBlock {
/// The state_compute_result is calculated for all the pending blocks prior to insertion to
/// the tree. The execution results are not persisted: they're recalculated again for the
/// pending blocks upon restart.
#[derivative(PartialEq = "ignore")]
state_compute_result: StateComputeResult,
randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
Expand All @@ -62,14 +63,12 @@ impl Serialize for PipelinedBlock {
struct SerializedBlock<'a> {
block: &'a Block,
input_transactions: &'a Vec<SignedTransaction>,
state_compute_result: &'a StateComputeResult,
randomness: Option<&'a Randomness>,
}

let serialized = SerializedBlock {
block: &self.block,
input_transactions: &self.input_transactions,
state_compute_result: &self.state_compute_result,
randomness: self.randomness.get(),
};
serialized.serialize(serializer)
Expand All @@ -86,21 +85,19 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
struct SerializedBlock {
block: Block,
input_transactions: Vec<SignedTransaction>,
state_compute_result: StateComputeResult,
randomness: Option<Randomness>,
}

let SerializedBlock {
block,
input_transactions,
state_compute_result,
randomness,
} = SerializedBlock::deserialize(deserializer)?;

let block = PipelinedBlock {
block,
input_transactions,
state_compute_result,
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
Expand Down
24 changes: 11 additions & 13 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use aptos_executor_types::StateComputeResult;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator,
};
use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
Expand Down Expand Up @@ -175,18 +177,14 @@ impl BlockStore {
root_metadata.accu_hash,
);

let result = StateComputeResult::new(
root_metadata.accu_hash,
root_metadata.frozen_root_hashes,
root_metadata.num_leaves, /* num_leaves */
vec![], /* parent_root_hashes */
0, /* parent_num_leaves */
None, /* epoch_state */
vec![], /* compute_status */
vec![], /* txn_infos */
vec![], /* reconfig_events */
None, // block end info
);
let result = StateComputeResult::new_empty(Arc::new(
InMemoryTransactionAccumulator::new(
root_metadata.frozen_root_hashes,
root_metadata.num_leaves,
)
.expect("Failed to recover accumulator."),
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
*root_block,
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use aptos_types::{
block_info::{BlockInfo, Round},
ledger_info::LedgerInfoWithSignatures,
};
use mirai_annotations::{checked_verify_eq, precondition};
use mirai_annotations::precondition;
use std::{
collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -249,7 +249,8 @@ impl BlockTree {
existing_block,
block_id,
block);
checked_verify_eq!(existing_block.compute_result(), block.compute_result());
// FIXME(aldenhu): confirm that it's okay to remove
// checked_verify_eq!(existing_block.compute_result(), block.compute_result());
Ok(existing_block)
} else {
match self.get_linkable_block_mut(&block.parent_id()) {
Expand Down
9 changes: 3 additions & 6 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

pub type PreCommitHook = Box<
dyn 'static
+ FnOnce(&[SignedTransaction], &StateComputeResult) -> BoxFuture<'static, ()>
+ Send,
>;
pub type PreCommitHook =
Box<dyn 'static + FnOnce(&StateComputeResult) -> BoxFuture<'static, ()> + Send>;

#[allow(clippy::unwrap_used)]
pub static SIG_VERIFY_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Expand Down Expand Up @@ -287,7 +284,7 @@ impl ExecutionPipeline {
}
.await;
let pipeline_res = res.map(|(output, execution_duration)| {
let pre_commit_hook_fut = pre_commit_hook(&input_txns, &output);
let pre_commit_hook_fut = pre_commit_hook(&output);
let pre_commit_fut: BoxFuture<'static, ExecutorResult<()>> =
if output.epoch_state().is_some() || !enable_pre_commit {
// hack: it causes issue if pre-commit is finished at an epoch ending, and
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use aptos_consensus_types::{
pipeline::commit_vote::CommitVote,
pipelined_block::PipelinedBlock,
};
use aptos_crypto::{bls12381, HashValue};
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorResult;
use aptos_logger::prelude::*;
use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::ProtocolId};
Expand Down Expand Up @@ -703,7 +703,7 @@ impl BufferManager {
CommitMessage::Vote(CommitVote::new_with_signature(
commit_vote.author(),
commit_vote.ledger_info().clone(),
bls12381::Signature::dummy_signature(),
aptos_crypto::bls12381::Signature::dummy_signature(),
))
});
CommitMessage::Vote(commit_vote)
Expand Down
13 changes: 1 addition & 12 deletions consensus/src/pipeline/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,7 @@ pub fn prepare_executed_blocks_with_ledger_info(
proposals.push(proposal);
}

let compute_result = StateComputeResult::new(
executed_hash,
vec![], // dummy subtree
0,
vec![],
0,
None,
vec![],
vec![],
vec![],
None, // block end info
);
let compute_result = StateComputeResult::new_dummy_with_root_hash(executed_hash);

let li = LedgerInfo::new(
proposals.last().unwrap().block().gen_block_info(
Expand Down
61 changes: 26 additions & 35 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use aptos_logger::prelude::*;
use aptos_metrics_core::IntGauge;
use aptos_types::{
account_address::AccountAddress, block_executor::config::BlockExecutorConfigFromOnchain,
block_metadata_ext::BlockMetadataExt, epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures, randomness::Randomness, transaction::SignedTransaction,
epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, randomness::Randomness,
};
use fail::fail_point;
use futures::{future::BoxFuture, SinkExt, StreamExt};
Expand Down Expand Up @@ -128,44 +127,36 @@ impl ExecutionProxy {
fn pre_commit_hook(
&self,
block: &Block,
metadata: BlockMetadataExt,
payload_manager: Arc<dyn TPayloadManager>,
) -> PreCommitHook {
let mut pre_commit_notifier = self.pre_commit_notifier.clone();
let state_sync_notifier = self.state_sync_notifier.clone();
let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
let validator_txns = block.validator_txns().cloned().unwrap_or_default();
let block_id = block.id();
Box::new(
move |user_txns: &[SignedTransaction], state_compute_result: &StateComputeResult| {
let input_txns = Block::combine_to_input_transactions(
validator_txns,
user_txns.to_vec(),
metadata,
);
let txns = state_compute_result.transactions_to_commit(input_txns, block_id);
let subscribable_events = state_compute_result.subscribable_events().to_vec();
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
if let Err(e) = monitor!(
"notify_state_sync",
state_sync_notifier
.notify_new_commit(txns, subscribable_events)
.await
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
},
)
Box::new(move |state_compute_result: &StateComputeResult| {
let state_compute_result = state_compute_result.clone();
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result.transactions_to_commit();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
"notify_state_sync",
state_sync_notifier
.notify_new_commit(txns, subscribable_events)
.await
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
})
}
}

Expand Down Expand Up @@ -226,7 +217,7 @@ impl StateComputer for ExecutionProxy {
parent_block_id,
transaction_generator,
block_executor_onchain_config,
self.pre_commit_hook(block, metadata, payload_manager),
self.pre_commit_hook(block, payload_manager),
lifetime_guard,
)
.await;
Expand Down
15 changes: 8 additions & 7 deletions consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
transaction::{ExecutionStatus, SignedTransaction, Transaction, TransactionStatus},
transaction::{SignedTransaction, Transaction, TransactionStatus},
validator_txn::ValidatorTransaction,
};
use std::sync::{atomic::AtomicU64, Arc};
Expand Down Expand Up @@ -117,18 +117,19 @@ impl BlockExecutorTrait for DummyBlockExecutor {
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let num_txns = self
let txns = self
.blocks_received
.lock()
.last()
.unwrap()
.transactions
.num_transactions();
.clone()
.into_txns()
.into_iter()
.map(|t| t.into_inner())
.collect();

Ok(StateComputeResult::new_dummy_with_compute_status(vec![
TransactionStatus::Keep(ExecutionStatus::Success);
num_txns
]))
Ok(StateComputeResult::new_dummy_with_input_txns(txns))
}

fn pre_commit_block(
Expand Down
Loading

0 comments on commit 68f3f93

Please sign in to comment.