Skip to content

Commit

Permalink
remove StateCheckpointOutput from executor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Oct 23, 2024
1 parent ce19f39 commit 5a00bcc
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 42 deletions.
17 changes: 8 additions & 9 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use crate::{
use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorError, ExecutorResult,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, warn};
Expand Down Expand Up @@ -213,7 +212,7 @@ impl ExecutionPipeline {
let block_id = block.block_id;
debug!("execute_stage received block {}.", block_id);
let executor = executor.clone();
let state_checkpoint_output = monitor!(
let execution_time = monitor!(
"execute_block",
tokio::task::spawn_blocking(move || {
fail_point!("consensus::compute", |_| {
Expand All @@ -228,7 +227,7 @@ impl ExecutionPipeline {
parent_block_id,
block_executor_onchain_config,
)
.map(|output| (output, start.elapsed()))
.map(|_| start.elapsed())
})
.await
)
Expand All @@ -239,7 +238,7 @@ impl ExecutionPipeline {
input_txns,
block_id,
parent_block_id,
state_checkpoint_output,
execution_time,
pre_commit_hook,
result_tx,
command_creation_time: Instant::now(),
Expand All @@ -260,7 +259,7 @@ impl ExecutionPipeline {
input_txns,
block_id,
parent_block_id,
state_checkpoint_output: execution_result,
execution_time,
pre_commit_hook,
result_tx,
command_creation_time,
Expand All @@ -270,12 +269,12 @@ impl ExecutionPipeline {
counters::APPLY_LEDGER_WAIT_TIME.observe_duration(command_creation_time.elapsed());
debug!("ledger_apply stage received block {}.", block_id);
let res = async {
let (state_checkpoint_output, execution_duration) = execution_result?;
let execution_duration = execution_time?;
let executor = executor.clone();
monitor!(
"ledger_apply",
tokio::task::spawn_blocking(move || {
executor.ledger_update(block_id, parent_block_id, state_checkpoint_output)
executor.ledger_update(block_id, parent_block_id)
})
.await
)
Expand Down Expand Up @@ -389,7 +388,7 @@ struct LedgerApplyCommand {
input_txns: Vec<SignedTransaction>,
block_id: HashValue,
parent_block_id: HashValue,
state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>,
execution_time: ExecutorResult<Duration>,
pre_commit_hook: PreCommitHook,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
Expand Down
4 changes: 1 addition & 3 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ async fn test_commit_sync_race() {
};
use aptos_config::config::transaction_filter_type::Filter;
use aptos_consensus_notifications::Error;
use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput;
use aptos_infallible::Mutex;
use aptos_types::{
aggregate_signature::AggregateSignature,
Expand Down Expand Up @@ -505,15 +504,14 @@ async fn test_commit_sync_race() {
_block: ExecutableBlock,
_parent_block_id: HashValue,
_onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
todo!()
}

fn ledger_update(
&self,
_block_id: HashValue,
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
todo!()
}
Expand Down
8 changes: 3 additions & 5 deletions consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use aptos_consensus_notifications::{ConsensusNotificationSender, Error};
use aptos_consensus_types::{block::Block, block_data::BlockData};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorResult,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorResult,
};
use aptos_infallible::Mutex;
use aptos_types::{
Expand Down Expand Up @@ -118,16 +117,15 @@ impl BlockExecutorTrait for DummyBlockExecutor {
block: ExecutableBlock,
_parent_block_id: HashValue,
_onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
self.blocks_received.lock().push(block);
Ok(StateCheckpointOutput::default())
Ok(())
}

fn ledger_update(
&self,
_block_id: HashValue,
_parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let txns = self
.blocks_received
Expand Down
3 changes: 1 addition & 2 deletions execution/executor-benchmark/src/ledger_update_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ where
block_id,
parent_block_id,
num_input_txns,
state_checkpoint_output,
} = ledger_update_message;

let output = self
.executor
.ledger_update(block_id, parent_block_id, state_checkpoint_output.clone())
.ledger_update(block_id, parent_block_id)
.unwrap();
output.execution_output.check_aborts_discards_retries(
self.allow_aborts,
Expand Down
6 changes: 1 addition & 5 deletions execution/executor-benchmark/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use crate::{
use aptos_block_partitioner::v2::config::PartitionerV2Config;
use aptos_crypto::HashValue;
use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor};
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait,
};
use aptos_executor_types::{state_compute_result::StateComputeResult, BlockExecutorTrait};
use aptos_logger::info;
use aptos_types::{
block_executor::partitioner::ExecutableBlock,
Expand Down Expand Up @@ -270,7 +267,6 @@ pub struct LedgerUpdateMessage {
pub block_id: HashValue,
pub parent_block_id: HashValue,
pub num_input_txns: usize,
pub state_checkpoint_output: StateCheckpointOutput,
}

/// Message from execution stage to commit stage.
Expand Down
4 changes: 1 addition & 3 deletions execution/executor-benchmark/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ where
self.num_blocks_processed, block_id
);
let num_input_txns = executable_block.transactions.num_transactions();
let state_checkpoint_output = self
.executor
self.executor
.execute_and_state_checkpoint(
executable_block,
self.parent_block_id,
Expand All @@ -77,7 +76,6 @@ where
block_id,
parent_block_id: self.parent_block_id,
num_input_txns,
state_checkpoint_output,
};
self.ledger_update_sender.send(msg).unwrap();
self.parent_block_id = block_id;
Expand Down
9 changes: 3 additions & 6 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0
#![forbid(unsafe_code)]

use crate::state_checkpoint_output::StateCheckpointOutput;
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_scratchpad::{ProofRead, SparseMerkleTree};
Expand Down Expand Up @@ -135,9 +134,8 @@ pub trait BlockExecutorTrait: Send + Sync {
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateComputeResult> {
let block_id = block.block_id;
let state_checkpoint_output =
self.execute_and_state_checkpoint(block, parent_block_id, onchain_config)?;
self.ledger_update(block_id, parent_block_id, state_checkpoint_output)
self.execute_and_state_checkpoint(block, parent_block_id, onchain_config)?;
self.ledger_update(block_id, parent_block_id)
}

/// Executes a block and returns the state checkpoint output.
Expand All @@ -146,13 +144,12 @@ pub trait BlockExecutorTrait: Send + Sync {
block: ExecutableBlock,
parent_block_id: HashValue,
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput>;
) -> ExecutorResult<()>;

fn ledger_update(
&self,
block_id: HashValue,
parent_block_id: HashValue,
state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult>;

#[cfg(any(test, feature = "fuzzing"))]
Expand Down
16 changes: 7 additions & 9 deletions execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_executor_types::{
execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
execution_output::ExecutionOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorError, ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::RwLock;
Expand Down Expand Up @@ -120,7 +120,7 @@ where
block: ExecutableBlock,
parent_block_id: HashValue,
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
let _guard = CONCURRENCY_GAUGE.concurrency_with(&["block", "execute_and_state_checkpoint"]);

self.maybe_initialize()?;
Expand All @@ -135,7 +135,6 @@ where
&self,
block_id: HashValue,
parent_block_id: HashValue,
state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let _guard = CONCURRENCY_GAUGE.concurrency_with(&["block", "ledger_update"]);

Expand All @@ -144,7 +143,7 @@ where
.read()
.as_ref()
.expect("BlockExecutor is not reset")
.ledger_update(block_id, parent_block_id, state_checkpoint_output)
.ledger_update(block_id, parent_block_id)
}

fn pre_commit_block(&self, block_id: HashValue) -> ExecutorResult<()> {
Expand Down Expand Up @@ -207,7 +206,7 @@ where
block: ExecutableBlock,
parent_block_id: HashValue,
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateCheckpointOutput> {
) -> ExecutorResult<()> {
let _timer = EXECUTE_BLOCK.start_timer();
let ExecutableBlock {
block_id,
Expand Down Expand Up @@ -287,19 +286,18 @@ where
(execution_output, state_checkpoint_output)
};
let output = PartialStateComputeResult::new(execution_output);
output.set_state_checkpoint_output(state_checkpoint_output.clone());
output.set_state_checkpoint_output(state_checkpoint_output);

let _ = self
.block_tree
.add_block(parent_block_id, block_id, output)?;
Ok(state_checkpoint_output)
Ok(())
}

fn ledger_update(
&self,
block_id: HashValue,
parent_block_id: HashValue,
_state_checkpoint_output: StateCheckpointOutput,
) -> ExecutorResult<StateComputeResult> {
let _timer = UPDATE_LEDGER.start_timer();
info!(
Expand Down

0 comments on commit 5a00bcc

Please sign in to comment.