Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: StateComputeResult carries full execution result #14945

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PipelinedBlock {
/// The state_compute_result is calculated for all the pending blocks prior to insertion to
/// the tree. The execution results are not persisted: they're recalculated again for the
/// pending blocks upon restart.
#[derivative(PartialEq = "ignore")]
state_compute_result: StateComputeResult,
Comment on lines +47 to 48
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay? @zekun000
(Implementing Eq on these are confusing in the first place IMO.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably?

randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
Expand Down
24 changes: 11 additions & 13 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use aptos_executor_types::StateComputeResult;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator,
};
use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
Expand Down Expand Up @@ -175,18 +177,14 @@ impl BlockStore {
root_metadata.accu_hash,
);

let result = StateComputeResult::new(
root_metadata.accu_hash,
root_metadata.frozen_root_hashes,
root_metadata.num_leaves, /* num_leaves */
vec![], /* parent_root_hashes */
0, /* parent_num_leaves */
None, /* epoch_state */
vec![], /* compute_status */
vec![], /* txn_infos */
vec![], /* reconfig_events */
None, // block end info
);
let result = StateComputeResult::new_empty(Arc::new(
InMemoryTransactionAccumulator::new(
root_metadata.frozen_root_hashes,
root_metadata.num_leaves,
)
.expect("Failed to recover accumulator."),
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
*root_block,
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use aptos_types::{
block_info::{BlockInfo, Round},
ledger_info::LedgerInfoWithSignatures,
};
use mirai_annotations::{checked_verify_eq, precondition};
use mirai_annotations::precondition;
use std::{
collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -249,7 +249,6 @@ impl BlockTree {
existing_block,
block_id,
block);
checked_verify_eq!(existing_block.compute_result(), block.compute_result());
Ok(existing_block)
} else {
match self.get_linkable_block_mut(&block.parent_id()) {
Expand Down
9 changes: 3 additions & 6 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

pub type PreCommitHook = Box<
dyn 'static
+ FnOnce(&[SignedTransaction], &StateComputeResult) -> BoxFuture<'static, ()>
+ Send,
>;
pub type PreCommitHook =
Box<dyn 'static + FnOnce(&StateComputeResult) -> BoxFuture<'static, ()> + Send>;

#[allow(clippy::unwrap_used)]
pub static SIG_VERIFY_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Expand Down Expand Up @@ -287,7 +284,7 @@ impl ExecutionPipeline {
}
.await;
let pipeline_res = res.map(|(output, execution_duration)| {
let pre_commit_hook_fut = pre_commit_hook(&input_txns, &output);
let pre_commit_hook_fut = pre_commit_hook(&output);
let pre_commit_fut: BoxFuture<'static, ExecutorResult<()>> =
if output.epoch_state().is_some() || !enable_pre_commit {
// hack: it causes issue if pre-commit is finished at an epoch ending, and
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use aptos_consensus_types::{
pipeline::commit_vote::CommitVote,
pipelined_block::PipelinedBlock,
};
use aptos_crypto::{bls12381, HashValue};
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorResult;
use aptos_logger::prelude::*;
use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::ProtocolId};
Expand Down Expand Up @@ -703,7 +703,7 @@ impl BufferManager {
CommitMessage::Vote(CommitVote::new_with_signature(
commit_vote.author(),
commit_vote.ledger_info().clone(),
bls12381::Signature::dummy_signature(),
aptos_crypto::bls12381::Signature::dummy_signature(),
))
});
CommitMessage::Vote(commit_vote)
Expand Down
13 changes: 1 addition & 12 deletions consensus/src/pipeline/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,7 @@ pub fn prepare_executed_blocks_with_ledger_info(
proposals.push(proposal);
}

let compute_result = StateComputeResult::new(
executed_hash,
vec![], // dummy subtree
0,
vec![],
0,
None,
vec![],
vec![],
vec![],
None, // block end info
);
let compute_result = StateComputeResult::new_dummy_with_root_hash(executed_hash);

let li = LedgerInfo::new(
proposals.last().unwrap().block().gen_block_info(
Expand Down
61 changes: 26 additions & 35 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use aptos_logger::prelude::*;
use aptos_metrics_core::IntGauge;
use aptos_types::{
account_address::AccountAddress, block_executor::config::BlockExecutorConfigFromOnchain,
block_metadata_ext::BlockMetadataExt, epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures, randomness::Randomness, transaction::SignedTransaction,
epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, randomness::Randomness,
};
use fail::fail_point;
use futures::{future::BoxFuture, SinkExt, StreamExt};
Expand Down Expand Up @@ -132,44 +131,36 @@ impl ExecutionProxy {
fn pre_commit_hook(
&self,
block: &Block,
metadata: BlockMetadataExt,
payload_manager: Arc<dyn TPayloadManager>,
) -> PreCommitHook {
let mut pre_commit_notifier = self.pre_commit_notifier.clone();
let state_sync_notifier = self.state_sync_notifier.clone();
let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
let validator_txns = block.validator_txns().cloned().unwrap_or_default();
let block_id = block.id();
Box::new(
move |user_txns: &[SignedTransaction], state_compute_result: &StateComputeResult| {
let input_txns = Block::combine_to_input_transactions(
validator_txns,
user_txns.to_vec(),
metadata,
);
let txns = state_compute_result.transactions_to_commit(input_txns, block_id);
let subscribable_events = state_compute_result.subscribable_events().to_vec();
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
if let Err(e) = monitor!(
"notify_state_sync",
state_sync_notifier
.notify_new_commit(txns, subscribable_events)
.await
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
},
)
Box::new(move |state_compute_result: &StateComputeResult| {
let state_compute_result = state_compute_result.clone();
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result.transactions_to_commit();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
"notify_state_sync",
state_sync_notifier
.notify_new_commit(txns, subscribable_events)
.await
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
})
}
}

Expand Down Expand Up @@ -230,7 +221,7 @@ impl StateComputer for ExecutionProxy {
parent_block_id,
transaction_generator,
block_executor_onchain_config,
self.pre_commit_hook(block, metadata, payload_manager),
self.pre_commit_hook(block, payload_manager),
lifetime_guard,
)
.await;
Expand Down
15 changes: 8 additions & 7 deletions consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
transaction::{ExecutionStatus, SignedTransaction, Transaction, TransactionStatus},
transaction::{SignedTransaction, Transaction, TransactionStatus},
validator_txn::ValidatorTransaction,
};
use std::{
Expand Down Expand Up @@ -129,18 +129,19 @@ impl BlockExecutorTrait for DummyBlockExecutor {
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let num_txns = self
let txns = self
.blocks_received
.lock()
.last()
.unwrap()
.transactions
.num_transactions();
.clone()
.into_txns()
.into_iter()
.map(|t| t.into_inner())
.collect();

Ok(StateComputeResult::new_dummy_with_compute_status(vec![
TransactionStatus::Keep(ExecutionStatus::Success);
num_txns
]))
Ok(StateComputeResult::new_dummy_with_input_txns(txns))
}

fn pre_commit_block(
Expand Down
1 change: 1 addition & 0 deletions crates/aptos-drop-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ rust-version = { workspace = true }
[dependencies]
aptos-infallible = { workspace = true }
aptos-metrics-core = { workspace = true }
derive_more = { workspace = true }
once_cell = { workspace = true }
threadpool = { workspace = true }
27 changes: 26 additions & 1 deletion crates/aptos-drop-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::async_concurrent_dropper::AsyncConcurrentDropper;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;
use std::mem::ManuallyDrop;

pub mod async_concurrent_dropper;
pub mod async_drop_queue;
Expand All @@ -11,8 +13,31 @@ mod metrics;
pub static DEFAULT_DROPPER: Lazy<AsyncConcurrentDropper> =
Lazy::new(|| AsyncConcurrentDropper::new("default", 32, 8));

/// Arc<T: ArcAsyncDrop> will be `Send + 'static`, which is requried to be able to drop Arc<T>
/// Arc<T: ArcAsyncDrop> will be `Send + 'static`, which is required to be able to drop Arc<T>
/// in another thread
pub trait ArcAsyncDrop: Send + Sync + 'static {}

impl<T: Send + Sync + 'static> ArcAsyncDrop for T {}

#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Deref, DerefMut)]
#[repr(transparent)]
pub struct DropHelper<T: Send + 'static> {
#[deref]
#[deref_mut]
inner: ManuallyDrop<T>,
}

impl<T: Send + 'static> DropHelper<T> {
pub fn new(inner: T) -> Self {
Self {
inner: ManuallyDrop::new(inner),
}
}
}

impl<T: Send + 'static> Drop for DropHelper<T> {
fn drop(&mut self) {
let inner = unsafe { ManuallyDrop::take(&mut self.inner) };
DEFAULT_DROPPER.schedule_drop(inner);
}
}
2 changes: 2 additions & 0 deletions execution/executor-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-crypto = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-scratchpad = { workspace = true }
aptos-secure-net = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
criterion = { workspace = true }
derive_more = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
Expand Down
Loading
Loading