Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make StateComputeResult an combination of immutable stage outputs #15041

Merged
merged 7 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl PipelinedBlock {
pub fn block_info(&self) -> BlockInfo {
self.block().gen_block_info(
self.compute_result().root_hash(),
self.compute_result().version(),
self.compute_result().last_version_or_0(),
self.compute_result().epoch_state().clone(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl BlockStore {
root_metadata.accu_hash,
);

let result = StateComputeResult::new_empty(Arc::new(
let result = StateComputeResult::new_dummy_with_accumulator(Arc::new(
InMemoryTransactionAccumulator::new(
root_metadata.frozen_root_hashes,
root_metadata.num_leaves,
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/block_storage/block_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ async fn test_insert_vote() {
VoteData::new(
block.block().gen_block_info(
block.compute_result().root_hash(),
block.compute_result().version(),
block.compute_result().last_version_or_0(),
block.compute_result().epoch_state().clone(),
),
block.quorum_cert().certified_block().clone(),
Expand Down Expand Up @@ -319,7 +319,7 @@ async fn test_insert_vote() {
VoteData::new(
block.block().gen_block_info(
block.compute_result().root_hash(),
block.compute_result().version(),
block.compute_result().last_version_or_0(),
block.compute_result().epoch_state().clone(),
),
block.quorum_cert().certified_block().clone(),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlo
.observe(block.block().payload().map_or(0, |payload| payload.size()) as f64);
COMMITTED_BLOCKS_COUNT.inc();
LAST_COMMITTED_ROUND.set(block.round() as i64);
LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);
LAST_COMMITTED_VERSION.set(block.compute_result().last_version_or_0() as i64);

let failed_rounds = block
.block()
Expand Down
17 changes: 8 additions & 9 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use crate::{
use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorError, ExecutorResult,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, warn};
Expand Down Expand Up @@ -213,7 +212,7 @@ impl ExecutionPipeline {
let block_id = block.block_id;
debug!("execute_stage received block {}.", block_id);
let executor = executor.clone();
let state_checkpoint_output = monitor!(
let execution_time = monitor!(
"execute_block",
tokio::task::spawn_blocking(move || {
fail_point!("consensus::compute", |_| {
Expand All @@ -228,7 +227,7 @@ impl ExecutionPipeline {
parent_block_id,
block_executor_onchain_config,
)
.map(|output| (output, start.elapsed()))
.map(|_| start.elapsed())
})
.await
)
Expand All @@ -239,7 +238,7 @@ impl ExecutionPipeline {
input_txns,
block_id,
parent_block_id,
state_checkpoint_output,
execution_time,
pre_commit_hook,
result_tx,
command_creation_time: Instant::now(),
Expand All @@ -260,7 +259,7 @@ impl ExecutionPipeline {
input_txns,
block_id,
parent_block_id,
state_checkpoint_output: execution_result,
execution_time,
pre_commit_hook,
result_tx,
command_creation_time,
Expand All @@ -270,12 +269,12 @@ impl ExecutionPipeline {
counters::APPLY_LEDGER_WAIT_TIME.observe_duration(command_creation_time.elapsed());
debug!("ledger_apply stage received block {}.", block_id);
let res = async {
let (state_checkpoint_output, execution_duration) = execution_result?;
let execution_duration = execution_time?;
let executor = executor.clone();
monitor!(
"ledger_apply",
tokio::task::spawn_blocking(move || {
executor.ledger_update(block_id, parent_block_id, state_checkpoint_output)
executor.ledger_update(block_id, parent_block_id)
})
.await
)
Expand Down Expand Up @@ -389,7 +388,7 @@ struct LedgerApplyCommand {
input_txns: Vec<SignedTransaction>,
block_id: HashValue,
parent_block_id: HashValue,
state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>,
execution_time: ExecutorResult<Duration>,
pre_commit_hook: PreCommitHook,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub fn prepare_executed_blocks_with_ledger_info(
let li = LedgerInfo::new(
proposals.last().unwrap().block().gen_block_info(
compute_result.root_hash(),
compute_result.version(),
compute_result.last_version_or_0(),
compute_result.epoch_state().clone(),
),
consensus_hash,
Expand Down
11 changes: 4 additions & 7 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ impl ExecutionProxy {
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result
.ledger_update_output
.transactions
.clone();
let _timer = counters::OP_COUNTERS.timer("pre_commit_notify");

let txns = state_compute_result.transactions_to_commit().to_vec();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand Down Expand Up @@ -467,7 +466,6 @@ async fn test_commit_sync_race() {
};
use aptos_config::config::transaction_filter_type::Filter;
use aptos_consensus_notifications::Error;
use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput;
use aptos_infallible::Mutex;
use aptos_types::{
aggregate_signature::AggregateSignature,
Expand Down Expand Up @@ -506,15 +504,14 @@ async fn test_commit_sync_race() {
_block: ExecutableBlock,
_parent_block_id: HashValue,
_onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
todo!()
}

fn ledger_update(
&self,
_block_id: HashValue,
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
todo!()
}
Expand Down
8 changes: 3 additions & 5 deletions consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use aptos_consensus_notifications::{ConsensusNotificationSender, Error};
use aptos_consensus_types::{block::Block, block_data::BlockData};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorResult,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorResult,
};
use aptos_infallible::Mutex;
use aptos_types::{
Expand Down Expand Up @@ -118,16 +117,15 @@ impl BlockExecutorTrait for DummyBlockExecutor {
block: ExecutableBlock,
_parent_block_id: HashValue,
_onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
self.blocks_received.lock().push(block);
Ok(StateCheckpointOutput::default())
Ok(())
}

fn ledger_update(
&self,
_block_id: HashValue,
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let txns = self
.blocks_received
Expand Down
36 changes: 23 additions & 13 deletions execution/executor-benchmark/src/ledger_update_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use crate::pipeline::{CommitBlockMessage, LedgerUpdateMessage};
use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor};
use aptos_executor_types::BlockExecutorTrait;
use aptos_types::transaction::Version;
use std::sync::{mpsc, Arc};

pub enum CommitProcessing {
Expand All @@ -18,7 +17,9 @@ pub enum CommitProcessing {
pub struct LedgerUpdateStage<V> {
executor: Arc<BlockExecutor<V>>,
commit_processing: CommitProcessing,
version: Version,
allow_aborts: bool,
allow_discards: bool,
allow_retries: bool,
}

impl<V> LedgerUpdateStage<V>
Expand All @@ -28,52 +29,61 @@ where
pub fn new(
executor: Arc<BlockExecutor<V>>,
commit_processing: CommitProcessing,
version: Version,
allow_aborts: bool,
allow_discards: bool,
allow_retries: bool,
) -> Self {
Self {
executor,
version,
commit_processing,
allow_aborts,
allow_discards,
allow_retries,
}
}

pub fn ledger_update(&mut self, ledger_update_message: LedgerUpdateMessage) {
// let ledger_update_start_time = Instant::now();
let LedgerUpdateMessage {
first_block_start_time,
current_block_start_time,
execution_time,
partition_time,
execution_time,
block_id,
parent_block_id,
state_checkpoint_output,
first_block_start_time,
num_input_txns,
} = ledger_update_message;

let output = self
.executor
.ledger_update(block_id, parent_block_id, state_checkpoint_output)
.ledger_update(block_id, parent_block_id)
.unwrap();

self.version += output.transactions_to_commit_len() as Version;
output.execution_output.check_aborts_discards_retries(
self.allow_aborts,
self.allow_discards,
self.allow_retries,
);
if !self.allow_retries {
assert_eq!(output.num_transactions_to_commit(), num_input_txns + 1);
}

match &self.commit_processing {
CommitProcessing::SendToQueue(commit_sender) => {
let msg = CommitBlockMessage {
block_id,
root_hash: output.root_hash(),
first_block_start_time,
current_block_start_time,
partition_time,
execution_time,
num_txns: output.transactions_to_commit_len(),
output,
};
commit_sender.send(msg).unwrap();
},
CommitProcessing::ExecuteInline => {
let ledger_info_with_sigs = super::transaction_committer::gen_li_with_sigs(
block_id,
output.root_hash(),
self.version,
output.expect_last_version(),
);
self.executor.pre_commit_block(block_id).unwrap();
self.executor.commit_ledger(ledger_info_with_sigs).unwrap();
Expand Down
12 changes: 6 additions & 6 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ pub fn run_benchmark<V>(
((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::<Vec<_>>(), phase)
});

let version = db.reader.expect_synced_version();

let start_version = db.reader.expect_synced_version();
let (pipeline, block_sender) =
Pipeline::new(executor, version, &pipeline_config, Some(num_blocks));
Pipeline::new(executor, start_version, &pipeline_config, Some(num_blocks));

let mut num_accounts_to_load = num_main_signer_accounts;
if let Some(mix) = &transaction_mix {
Expand Down Expand Up @@ -235,7 +234,8 @@ pub fn run_benchmark<V>(
);

if !pipeline_config.skip_commit {
let num_txns = db.reader.expect_synced_version() - version - num_blocks_created as u64;
let num_txns =
db.reader.expect_synced_version() - start_version - num_blocks_created as u64;
overall_measuring.print_end("Overall", num_txns);

if verify_sequence_numbers {
Expand All @@ -259,10 +259,10 @@ fn init_workload<V>(
where
V: TransactionBlockExecutor + 'static,
{
let version = db.reader.expect_synced_version();
let start_version = db.reader.expect_synced_version();
let (pipeline, block_sender) = Pipeline::<V>::new(
BlockExecutor::new(db.clone()),
version,
start_version,
pipeline_config,
None,
);
Expand Down
Loading
Loading