Skip to content

Commit

Permalink
wait for all futs to clear from ExecutionPipeline before dropping lif… (
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Aug 6, 2024
1 parent 9c21cf5 commit ef343b7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
19 changes: 10 additions & 9 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use aptos_executor_types::{
ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, warn};
use aptos_logger::{debug, error};
use aptos_types::{
block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock},
block_metadata_ext::BlockMetadataExt,
Expand Down Expand Up @@ -116,9 +116,9 @@ impl ExecutionPipeline {
debug!("prepare_block received block {}.", block.id());
let input_txns = block_preparer.prepare_block(&block).await;
if let Err(e) = input_txns {
result_tx
.send(Err(e))
.unwrap_or_else(|value| process_failed_to_send_result(value, block.id()));
result_tx.send(Err(e)).unwrap_or_else(|value| {
process_failed_to_send_result(value, block.id(), "prepare")
});
return;
}
let validator_txns = block.validator_txns().cloned().unwrap_or_default();
Expand Down Expand Up @@ -242,9 +242,9 @@ impl ExecutionPipeline {
let pipe_line_res = res.map(|(output, execution_duration)| {
PipelineExecutionResult::new(input_txns, output, execution_duration)
});
result_tx
.send(pipe_line_res)
.unwrap_or_else(|value| process_failed_to_send_result(value, block_id));
result_tx.send(pipe_line_res).unwrap_or_else(|value| {
process_failed_to_send_result(value, block_id, "ledger_apply")
});
}
debug!("ledger_apply stage quitting.");
}
Expand Down Expand Up @@ -279,11 +279,12 @@ struct LedgerApplyCommand {
fn process_failed_to_send_result(
value: Result<PipelineExecutionResult, ExecutorError>,
block_id: HashValue,
from_stage: &str,
) {
warn!(
error!(
block_id = block_id,
is_err = value.is_err(),
"Failed to send back execution result",
"Failed to send back execution result from {from_stage} stage",
);
if let Err(e) = value {
// receive channel discarding error, log for debugging.
Expand Down
23 changes: 15 additions & 8 deletions consensus/src/pipeline/execution_schedule_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorError;
use aptos_executor_types::{ExecutorError, ExecutorResult};
use aptos_logger::debug;
use async_trait::async_trait;
use futures::TryFutureExt;
Expand Down Expand Up @@ -92,15 +92,22 @@ impl StatelessPipeline for ExecutionSchedulePhase {
// ExecutionWait phase is never kicked off.
let fut = tokio::task::spawn(async move {
let mut results = vec![];
for (block, fut) in itertools::zip_eq(ordered_blocks, futs) {
// wait for all futs so that lifetime_guard is guaranteed to be dropped only
// after all executor calls are over
for (block, fut) in itertools::zip_eq(&ordered_blocks, futs) {
debug!("try to receive compute result for block {}", block.id());
let PipelineExecutionResult {
input_txns,
result,
execution_time,
} = fut.await?;
results.push(block.set_execution_result(input_txns, result, execution_time));
results.push(fut.await)
}
let results = itertools::zip_eq(ordered_blocks, results)
.map(|(block, res)| {
let PipelineExecutionResult {
input_txns,
result,
execution_time,
} = res?;
Ok(block.set_execution_result(input_txns, result, execution_time))
})
.collect::<ExecutorResult<Vec<_>>>()?;
drop(lifetime_guard);
Ok(results)
})
Expand Down

0 comments on commit ef343b7

Please sign in to comment.