Makes StateComputeResult a combination of immutable stage outputs

msmouse committed Oct 26, 2024
1 parent 5010494 commit 1a0bb2d
Showing 49 changed files with 1,234 additions and 1,209 deletions.
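The accessor renames that run through the diff below (version() → last_version_or_0(), new_empty → new_dummy_with_accumulator, direct ledger_update_output field access → transactions_to_commit()) hint at the new shape of the type: a thin combination of the immutable outputs produced by the execution and ledger-update stages, with derived values computed on demand instead of stored as copies. A minimal self-contained sketch of that idea follows; every type and field is a simplified stand-in rather than the actual aptos-core definition, and only the accessor names visible in the diff are taken from the commit.

    use std::sync::Arc;

    // Simplified stand-ins for the per-stage outputs; the real aptos-core types are richer.
    struct ExecutionOutput {
        transactions: Vec<String>,
    }

    struct LedgerUpdateOutput {
        num_leaves: u64,
        root_hash: [u8; 32],
    }

    // Sketch: the result as a combination of immutable stage outputs, exposing
    // derived values through accessors rather than storing extra copies.
    struct StateComputeResult {
        execution_output: Arc<ExecutionOutput>,
        ledger_update_output: Arc<LedgerUpdateOutput>,
    }

    impl StateComputeResult {
        fn transactions_to_commit(&self) -> &[String] {
            &self.execution_output.transactions
        }

        fn root_hash(&self) -> [u8; 32] {
            self.ledger_update_output.root_hash
        }

        // Assumption based on the name: versions are 0-based, so the last version
        // is num_leaves - 1, and an empty accumulator reports 0.
        fn last_version_or_0(&self) -> u64 {
            self.ledger_update_output.num_leaves.saturating_sub(1)
        }
    }

    fn main() {
        let result = StateComputeResult {
            execution_output: Arc::new(ExecutionOutput {
                transactions: vec!["txn".to_string()],
            }),
            ledger_update_output: Arc::new(LedgerUpdateOutput {
                num_leaves: 10,
                root_hash: [0u8; 32],
            }),
        };
        assert_eq!(result.last_version_or_0(), 9);
        assert_eq!(result.transactions_to_commit().len(), 1);
        let _ = result.root_hash();
    }
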
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion consensus/consensus-types/src/pipelined_block.rs
@@ -288,7 +288,7 @@ impl PipelinedBlock {
     pub fn block_info(&self) -> BlockInfo {
         self.block().gen_block_info(
             self.compute_result().root_hash(),
-            self.compute_result().version(),
+            self.compute_result().last_version_or_0(),
             self.compute_result().epoch_state().clone(),
         )
     }

2 changes: 1 addition & 1 deletion consensus/src/block_storage/block_store.rs
@@ -177,7 +177,7 @@ impl BlockStore {
             root_metadata.accu_hash,
         );

-        let result = StateComputeResult::new_empty(Arc::new(
+        let result = StateComputeResult::new_dummy_with_accumulator(Arc::new(
             InMemoryTransactionAccumulator::new(
                 root_metadata.frozen_root_hashes,
                 root_metadata.num_leaves,

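The recovery path above now builds its placeholder result directly around the transaction accumulator restored from storage, and the rename from new_empty to new_dummy_with_accumulator makes that placeholder nature explicit. A rough sketch of what such a constructor might look like; only the names InMemoryTransactionAccumulator and new_dummy_with_accumulator come from the diff, while the fields and bodies are invented for illustration.

    use std::sync::Arc;

    // Invented, simplified accumulator restored from persisted frozen subtree roots.
    struct InMemoryTransactionAccumulator {
        frozen_subtree_roots: Vec<[u8; 32]>,
        num_leaves: u64,
    }

    impl InMemoryTransactionAccumulator {
        fn new(frozen_subtree_roots: Vec<[u8; 32]>, num_leaves: u64) -> Self {
            Self { frozen_subtree_roots, num_leaves }
        }
    }

    // Sketch of a "dummy" result: everything except the restored accumulator is defaulted.
    struct StateComputeResult {
        transaction_accumulator: Arc<InMemoryTransactionAccumulator>,
    }

    impl StateComputeResult {
        fn new_dummy_with_accumulator(
            transaction_accumulator: Arc<InMemoryTransactionAccumulator>,
        ) -> Self {
            Self { transaction_accumulator }
        }
    }

    fn main() {
        let acc = InMemoryTransactionAccumulator::new(vec![[0u8; 32]], 4);
        let result = StateComputeResult::new_dummy_with_accumulator(Arc::new(acc));
        assert_eq!(result.transaction_accumulator.num_leaves, 4);
        assert_eq!(result.transaction_accumulator.frozen_subtree_roots.len(), 1);
    }
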
4 changes: 2 additions & 2 deletions consensus/src/block_storage/block_store_test.rs
@@ -290,7 +290,7 @@ async fn test_insert_vote() {
         VoteData::new(
             block.block().gen_block_info(
                 block.compute_result().root_hash(),
-                block.compute_result().version(),
+                block.compute_result().last_version_or_0(),
                 block.compute_result().epoch_state().clone(),
             ),
             block.quorum_cert().certified_block().clone(),
@@ -319,7 +319,7 @@
         VoteData::new(
             block.block().gen_block_info(
                 block.compute_result().root_hash(),
-                block.compute_result().version(),
+                block.compute_result().last_version_or_0(),
                 block.compute_result().epoch_state().clone(),
             ),
             block.quorum_cert().certified_block().clone(),

2 changes: 1 addition & 1 deletion consensus/src/counters.rs
@@ -1249,7 +1249,7 @@ pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlo
             .observe(block.block().payload().map_or(0, |payload| payload.size()) as f64);
         COMMITTED_BLOCKS_COUNT.inc();
         LAST_COMMITTED_ROUND.set(block.round() as i64);
-        LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);
+        LAST_COMMITTED_VERSION.set(block.compute_result().last_version_or_0() as i64);

         let failed_rounds = block
             .block()

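Assuming versions are 0-based transaction indices and the accumulator's leaf count equals the number of committed transactions, the last committed version is one less than num_leaves; the renamed accessor therefore reports the actual last version (and 0 for an empty ledger) rather than the raw leaf count. A tiny illustration of that assumed relationship, with a hypothetical free-standing helper:

    // Hypothetical helper mirroring the assumed semantics of last_version_or_0().
    fn last_version_or_0(num_leaves: u64) -> u64 {
        num_leaves.saturating_sub(1)
    }

    fn main() {
        assert_eq!(last_version_or_0(0), 0); // empty ledger
        assert_eq!(last_version_or_0(1), 0); // genesis only: version 0
        assert_eq!(last_version_or_0(100), 99); // 100 txns -> last version 99
    }
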
2 changes: 1 addition & 1 deletion consensus/src/pipeline/tests/test_utils.rs
@@ -101,7 +101,7 @@ pub fn prepare_executed_blocks_with_ledger_info(
     let li = LedgerInfo::new(
         proposals.last().unwrap().block().gen_block_info(
             compute_result.root_hash(),
-            compute_result.version(),
+            compute_result.last_version_or_0(),
             compute_result.epoch_state().clone(),
         ),
         consensus_hash,

7 changes: 3 additions & 4 deletions consensus/src/state_computer.rs
@@ -144,10 +144,9 @@ impl ExecutionProxy {
         Box::pin(async move {
             pre_commit_notifier
                 .send(Box::pin(async move {
-                    let txns = state_compute_result
-                        .ledger_update_output
-                        .transactions
-                        .clone();
+                    let _timer = counters::OP_COUNTERS.timer("pre_commit_notify");
+
+                    let txns = state_compute_result.transactions_to_commit().to_vec();
                     let subscribable_events =
                         state_compute_result.subscribable_events().to_vec();
                     if let Err(e) = monitor!(

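The pre-commit notification now copies the transactions through the transactions_to_commit() accessor instead of reaching into ledger_update_output, and the added _timer binding presumably holds an RAII-style guard that records elapsed time when it is dropped at the end of the closure. A generic sketch of that guard pattern; ScopeTimer is illustrative and not the aptos counters API.

    use std::time::Instant;

    // Illustrative RAII timer: starts on construction and reports on drop, so binding
    // it to `_timer` times the remainder of the enclosing scope.
    struct ScopeTimer {
        label: &'static str,
        start: Instant,
    }

    impl ScopeTimer {
        fn new(label: &'static str) -> Self {
            Self { label, start: Instant::now() }
        }
    }

    impl Drop for ScopeTimer {
        fn drop(&mut self) {
            println!("{}: {:?}", self.label, self.start.elapsed());
        }
    }

    fn main() {
        let _timer = ScopeTimer::new("pre_commit_notify");
        // ... copy out the transactions to commit, collect subscribable events, etc.
        let txns: Vec<String> = vec!["txn".to_string()];
        let _to_commit = txns.to_vec();
    } // _timer is dropped here and the elapsed time is reported
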
35 changes: 23 additions & 12 deletions execution/executor-benchmark/src/ledger_update_stage.rs
@@ -5,7 +5,6 @@
 use crate::pipeline::{CommitBlockMessage, LedgerUpdateMessage};
 use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor};
 use aptos_executor_types::BlockExecutorTrait;
-use aptos_types::transaction::Version;
 use std::sync::{mpsc, Arc};

 pub enum CommitProcessing {
@@ -18,7 +17,9 @@ pub enum CommitProcessing {
 pub struct LedgerUpdateStage<V> {
     executor: Arc<BlockExecutor<V>>,
     commit_processing: CommitProcessing,
-    version: Version,
+    allow_aborts: bool,
+    allow_discards: bool,
+    allow_retries: bool,
 }

 impl<V> LedgerUpdateStage<V>
@@ -28,52 +29,62 @@ where
     pub fn new(
         executor: Arc<BlockExecutor<V>>,
         commit_processing: CommitProcessing,
-        version: Version,
+        allow_aborts: bool,
+        allow_discards: bool,
+        allow_retries: bool,
     ) -> Self {
         Self {
             executor,
-            version,
             commit_processing,
+            allow_aborts,
+            allow_discards,
+            allow_retries,
         }
     }

     pub fn ledger_update(&mut self, ledger_update_message: LedgerUpdateMessage) {
-        // let ledger_update_start_time = Instant::now();
         let LedgerUpdateMessage {
-            first_block_start_time,
             current_block_start_time,
-            execution_time,
             partition_time,
+            execution_time,
             block_id,
             parent_block_id,
+            num_input_txns,
             state_checkpoint_output,
+            first_block_start_time,
         } = ledger_update_message;

         let output = self
             .executor
-            .ledger_update(block_id, parent_block_id, state_checkpoint_output)
+            .ledger_update(block_id, parent_block_id, state_checkpoint_output.clone())
             .unwrap();
-
-        self.version += output.transactions_to_commit_len() as Version;
+        output.execution_output.check_aborts_discards_retries(
+            self.allow_aborts,
+            self.allow_discards,
+            self.allow_retries,
+        );
+        if !self.allow_retries {
+            assert_eq!(output.num_transactions_to_commit(), num_input_txns + 1);
+        }

         match &self.commit_processing {
             CommitProcessing::SendToQueue(commit_sender) => {
                 let msg = CommitBlockMessage {
                     block_id,
                     root_hash: output.root_hash(),
                     first_block_start_time,
                     current_block_start_time,
                     partition_time,
                     execution_time,
-                    num_txns: output.transactions_to_commit_len(),
+                    output,
                 };
                 commit_sender.send(msg).unwrap();
             },
             CommitProcessing::ExecuteInline => {
                 let ledger_info_with_sigs = super::transaction_committer::gen_li_with_sigs(
                     block_id,
                     output.root_hash(),
-                    self.version,
+                    output.expect_last_version(),
                 );
                 self.executor.pre_commit_block(block_id).unwrap();
                 self.executor.commit_ledger(ledger_info_with_sigs).unwrap();

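The benchmark stage no longer carries a mutable version counter: the committed version is read from the block's own output via expect_last_version(), and the new assertion checks that each block commits num_input_txns + 1 transactions, the extra one presumably being the per-block system transaction appended by execution. A self-contained sketch of that derivation, with invented field names:

    // Invented, simplified stand-in for a per-block ledger-update output.
    struct LedgerUpdateOutput {
        first_version: u64,
        num_txns_to_commit: u64,
    }

    impl LedgerUpdateOutput {
        // Panics on an empty block, mirroring the "expect_" naming in the diff.
        fn expect_last_version(&self) -> u64 {
            assert!(self.num_txns_to_commit > 0, "empty block has no last version");
            self.first_version + self.num_txns_to_commit - 1
        }
    }

    fn main() {
        // A block of 3 user transactions plus 1 assumed per-block system transaction,
        // starting at version 100.
        let num_input_txns = 3u64;
        let output = LedgerUpdateOutput {
            first_version: 100,
            num_txns_to_commit: num_input_txns + 1,
        };
        assert_eq!(output.expect_last_version(), 103);
    }
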
12 changes: 6 additions & 6 deletions execution/executor-benchmark/src/lib.rs
@@ -165,10 +165,9 @@ pub fn run_benchmark<V>(
         ((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::<Vec<_>>(), phase)
     });

-    let version = db.reader.expect_synced_version();
-
+    let start_version = db.reader.expect_synced_version();
     let (pipeline, block_sender) =
-        Pipeline::new(executor, version, &pipeline_config, Some(num_blocks));
+        Pipeline::new(executor, start_version, &pipeline_config, Some(num_blocks));

     let mut num_accounts_to_load = num_main_signer_accounts;
     if let Some(mix) = &transaction_mix {
@@ -235,7 +234,8 @@
     );

     if !pipeline_config.skip_commit {
-        let num_txns = db.reader.expect_synced_version() - version - num_blocks_created as u64;
+        let num_txns =
+            db.reader.expect_synced_version() - start_version - num_blocks_created as u64;
         overall_measuring.print_end("Overall", num_txns);

         if verify_sequence_numbers {
@@ -259,10 +259,10 @@ fn init_workload<V>(
 where
     V: TransactionBlockExecutor + 'static,
 {
-    let version = db.reader.expect_synced_version();
+    let start_version = db.reader.expect_synced_version();
     let (pipeline, block_sender) = Pipeline::<V>::new(
         BlockExecutor::new(db.clone()),
-        version,
+        start_version,
         pipeline_config,
         None,
     );

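With the start_version rename, the throughput accounting at the end of the run reads naturally: committed user transactions are the growth in synced version minus one (assumed) system transaction per created block, consistent with the num_input_txns + 1 assertion in the ledger-update stage. A tiny worked example with invented numbers:

    fn main() {
        // Invented numbers for illustration only.
        let start_version: u64 = 1_000;
        let num_blocks_created: u64 = 10;
        let user_txns_per_block: u64 = 500;

        // Each block advances the version by its user transactions plus one system transaction.
        let synced_version_after_run =
            start_version + num_blocks_created * (user_txns_per_block + 1);

        // Mirrors the benchmark's accounting: only user transactions count toward throughput.
        let num_txns = synced_version_after_run - start_version - num_blocks_created;
        assert_eq!(num_txns, 5_000);
    }
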