Skip to content

Commit

Permalink
Do not persist StateComputeResult
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Oct 15, 2024
1 parent 2a0e7d6 commit e07da31
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 268 deletions.
7 changes: 2 additions & 5 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PipelinedBlock {
/// The state_compute_result is calculated for all the pending blocks prior to insertion to
/// the tree. The execution results are not persisted: they're recalculated again for the
/// pending blocks upon restart.
#[derivative(PartialEq = "ignore")]
state_compute_result: StateComputeResult,
randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
Expand All @@ -62,14 +63,12 @@ impl Serialize for PipelinedBlock {
struct SerializedBlock<'a> {
block: &'a Block,
input_transactions: &'a Vec<SignedTransaction>,
state_compute_result: &'a StateComputeResult,
randomness: Option<&'a Randomness>,
}

let serialized = SerializedBlock {
block: &self.block,
input_transactions: &self.input_transactions,
state_compute_result: &self.state_compute_result,
randomness: self.randomness.get(),
};
serialized.serialize(serializer)
Expand All @@ -86,21 +85,19 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
struct SerializedBlock {
block: Block,
input_transactions: Vec<SignedTransaction>,
state_compute_result: StateComputeResult,
randomness: Option<Randomness>,
}

let SerializedBlock {
block,
input_transactions,
state_compute_result,
randomness,
} = SerializedBlock::deserialize(deserializer)?;

let block = PipelinedBlock {
block,
input_transactions,
state_compute_result,
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
Expand Down
24 changes: 11 additions & 13 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use aptos_executor_types::StateComputeResult;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator,
};
use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
Expand Down Expand Up @@ -175,18 +177,14 @@ impl BlockStore {
root_metadata.accu_hash,
);

let result = StateComputeResult::new(
root_metadata.accu_hash,
root_metadata.frozen_root_hashes,
root_metadata.num_leaves, /* num_leaves */
vec![], /* parent_root_hashes */
0, /* parent_num_leaves */
None, /* epoch_state */
vec![], /* compute_status */
vec![], /* txn_infos */
vec![], /* reconfig_events */
None, // block end info
);
let result = StateComputeResult::new_empty(Arc::new(
InMemoryTransactionAccumulator::new(
root_metadata.frozen_root_hashes,
root_metadata.num_leaves,
)
.expect("Failed to recover accumulator."),
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
*root_block,
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use aptos_types::{
block_info::{BlockInfo, Round},
ledger_info::LedgerInfoWithSignatures,
};
use mirai_annotations::{checked_verify_eq, precondition};
use mirai_annotations::precondition;
use std::{
collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -249,7 +249,8 @@ impl BlockTree {
existing_block,
block_id,
block);
checked_verify_eq!(existing_block.compute_result(), block.compute_result());
// FIXME(aldenhu): confirm that it's okay to remove
// checked_verify_eq!(existing_block.compute_result(), block.compute_result());
Ok(existing_block)
} else {
match self.get_linkable_block_mut(&block.parent_id()) {
Expand Down
13 changes: 1 addition & 12 deletions consensus/src/pipeline/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,7 @@ pub fn prepare_executed_blocks_with_ledger_info(
proposals.push(proposal);
}

let compute_result = StateComputeResult::new(
executed_hash,
vec![], // dummy subtree
0,
vec![],
0,
None,
vec![],
vec![],
vec![],
None, // block end info
);
let compute_result = StateComputeResult::new_dummy_with_root_hash(executed_hash);

let li = LedgerInfo::new(
proposals.last().unwrap().block().gen_block_info(
Expand Down
118 changes: 92 additions & 26 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,78 @@ use aptos_types::{
},
};
use itertools::zip_eq;
use std::sync::Arc;
use std::{ops::Deref, sync::Arc};

#[derive(Default, Debug)]
#[derive(Clone, Debug, Default)]
pub struct LedgerUpdateOutput {
inner: Arc<Inner>,
}

impl Deref for LedgerUpdateOutput {
type Target = Inner;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl LedgerUpdateOutput {
pub fn new_empty(transaction_accumulator: Arc<InMemoryTransactionAccumulator>) -> Self {
Self::new_impl(Inner::new_empty(transaction_accumulator))
}

pub fn new_dummy_with_compute_status(statuses: Vec<TransactionStatus>) -> Self {
Self::new_impl(Inner::new_dummy_with_compute_status(statuses))
}

pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self {
Self::new_impl(Inner::new_dummy_with_root_hash(root_hash))
}

pub fn reconfig_suffix(&self) -> Self {
Self::new_impl(Inner::new_empty(self.transaction_accumulator.clone()))
}

pub fn new(
statuses_for_input_txns: Vec<TransactionStatus>,
to_commit: Vec<TransactionToCommit>,
subscribable_events: Vec<ContractEvent>,
transaction_info_hashes: Vec<HashValue>,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
sharded_state_cache: ShardedStateCache,
transaction_accumulator: Arc<InMemoryTransactionAccumulator>,
parent_accumulator: Arc<InMemoryTransactionAccumulator>,
block_end_info: Option<BlockEndInfo>,
) -> Self {
Self::new_impl(Inner {
statuses_for_input_txns,
to_commit,
subscribable_events,
transaction_info_hashes,
state_updates_until_last_checkpoint,
sharded_state_cache,
transaction_accumulator,
parent_accumulator,
block_end_info,
})
}

fn new_impl(inner: Inner) -> Self {
Self {
inner: Arc::new(inner),
}
}

pub fn as_state_compute_result(
&self,
next_epoch_state: Option<EpochState>,
) -> StateComputeResult {
StateComputeResult::new(self.clone(), next_epoch_state)
}
}

#[derive(Default, Debug)]
pub struct Inner {
pub statuses_for_input_txns: Vec<TransactionStatus>,
pub to_commit: Vec<TransactionToCommit>,
pub subscribable_events: Vec<ContractEvent>,
Expand All @@ -31,20 +99,33 @@ pub struct LedgerUpdateOutput {
/// The in-memory Merkle Accumulator representing a blockchain state consistent with the
/// `state_tree`.
pub transaction_accumulator: Arc<InMemoryTransactionAccumulator>,
pub parent_accumulator: Arc<InMemoryTransactionAccumulator>,
pub block_end_info: Option<BlockEndInfo>,
}

impl LedgerUpdateOutput {
impl Inner {
pub fn new_empty(transaction_accumulator: Arc<InMemoryTransactionAccumulator>) -> Self {
Self {
parent_accumulator: transaction_accumulator.clone(),
transaction_accumulator,
..Default::default()
}
}

pub fn reconfig_suffix(&self) -> Self {
pub fn new_dummy_with_compute_status(statuses: Vec<TransactionStatus>) -> Self {
Self {
transaction_accumulator: Arc::clone(&self.transaction_accumulator),
statuses_for_input_txns: statuses,
..Default::default()
}
}

pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self {
let transaction_accumulator = Arc::new(
InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash),
);
Self {
parent_accumulator: transaction_accumulator.clone(),
transaction_accumulator,
..Default::default()
}
}
Expand Down Expand Up @@ -98,31 +179,16 @@ impl LedgerUpdateOutput {
Ok(())
}

pub fn as_state_compute_result(
&self,
parent_accumulator: &Arc<InMemoryTransactionAccumulator>,
next_epoch_state: Option<EpochState>,
) -> StateComputeResult {
let txn_accu = self.txn_accumulator();

StateComputeResult::new(
txn_accu.root_hash(),
txn_accu.frozen_subtree_roots().clone(),
txn_accu.num_leaves(),
parent_accumulator.frozen_subtree_roots().clone(),
parent_accumulator.num_leaves(),
next_epoch_state,
self.statuses_for_input_txns.clone(),
self.transaction_info_hashes.clone(),
self.subscribable_events.clone(),
self.block_end_info.clone(),
)
}

pub fn next_version(&self) -> Version {
self.transaction_accumulator.num_leaves() as Version
}

pub fn last_version(&self) -> Version {
self.next_version()
.checked_sub(1)
.expect("Empty block before genesis.")
}

pub fn first_version(&self) -> Version {
self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version
}
Expand Down
Loading

0 comments on commit e07da31

Please sign in to comment.