wait for all futs to clear from ExecutionPipeline before dropping lif… #14224

Merged · 1 commit · Aug 6, 2024
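The fix shown in the diffs below awaits every per-block execution future before executing `drop(lifetime_guard)`, so the guard cannot be released while an executor call is still in flight. Below is a generic, self-contained sketch of that ordering; `LifetimeGuard` and the in-flight counter are made-up stand-ins, since the real guard type does not appear in this diff.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Stand-in for the real guard: on drop it only checks that no executor
// call is still in flight.
struct LifetimeGuard {
    in_flight: Arc<AtomicUsize>,
}

impl Drop for LifetimeGuard {
    fn drop(&mut self) {
        assert_eq!(self.in_flight.load(Ordering::SeqCst), 0);
    }
}

#[tokio::main]
async fn main() {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let lifetime_guard = LifetimeGuard {
        in_flight: in_flight.clone(),
    };

    // Spawn a few stand-ins for the per-block execution futures.
    let futs: Vec<_> = (0..4)
        .map(|_| {
            let in_flight = in_flight.clone();
            in_flight.fetch_add(1, Ordering::SeqCst);
            tokio::spawn(async move {
                tokio::task::yield_now().await; // pretend to do executor work
                in_flight.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    // Wait for all futs so the guard is dropped only after all calls are over.
    for fut in futs {
        fut.await.unwrap();
    }
    drop(lifetime_guard);
}
```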
19 changes: 10 additions & 9 deletions consensus/src/execution_pipeline.rs
@@ -16,7 +16,7 @@ use aptos_executor_types::{
     ExecutorResult,
 };
 use aptos_experimental_runtimes::thread_manager::optimal_min_len;
-use aptos_logger::{debug, warn};
+use aptos_logger::{debug, error};
 use aptos_types::{
     block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock},
     block_metadata_ext::BlockMetadataExt,
@@ -116,9 +116,9 @@ impl ExecutionPipeline {
             debug!("prepare_block received block {}.", block.id());
             let input_txns = block_preparer.prepare_block(&block).await;
             if let Err(e) = input_txns {
-                result_tx
-                    .send(Err(e))
-                    .unwrap_or_else(|value| process_failed_to_send_result(value, block.id()));
+                result_tx.send(Err(e)).unwrap_or_else(|value| {
+                    process_failed_to_send_result(value, block.id(), "prepare")
+                });
                 return;
             }
             let validator_txns = block.validator_txns().cloned().unwrap_or_default();
@@ -242,9 +242,9 @@ impl ExecutionPipeline {
             let pipe_line_res = res.map(|(output, execution_duration)| {
                 PipelineExecutionResult::new(input_txns, output, execution_duration)
             });
-            result_tx
-                .send(pipe_line_res)
-                .unwrap_or_else(|value| process_failed_to_send_result(value, block_id));
+            result_tx.send(pipe_line_res).unwrap_or_else(|value| {
+                process_failed_to_send_result(value, block_id, "ledger_apply")
+            });
         }
         debug!("ledger_apply stage quitting.");
     }
@@ -279,11 +279,12 @@ struct LedgerApplyCommand {
 fn process_failed_to_send_result(
     value: Result<PipelineExecutionResult, ExecutorError>,
     block_id: HashValue,
+    from_stage: &str,
 ) {
-    warn!(
+    error!(
         block_id = block_id,
         is_err = value.is_err(),
-        "Failed to send back execution result",
+        "Failed to send back execution result from {from_stage} stage",
     );
     if let Err(e) = value {
         // receive channel discarding error, log for debugging.
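The `send(..).unwrap_or_else(..)` pattern above works because the channel hands the value back when the receiver side is gone. Here is a standalone sketch of that behaviour, assuming `result_tx` is a `tokio::sync::oneshot::Sender` (the sender's type is not visible in this diff).

```rust
use tokio::sync::oneshot;

// Simplified stand-in for process_failed_to_send_result: it just prints instead
// of emitting an error! log keyed by block id.
fn log_dropped_result<T: std::fmt::Debug>(value: T, from_stage: &str) {
    eprintln!("receiver gone, dropping result from {from_stage} stage: {value:?}");
}

fn main() {
    let (result_tx, result_rx) = oneshot::channel::<u64>();
    drop(result_rx); // simulate the consumer having gone away

    // oneshot::Sender::send returns Err(value) when nobody is listening,
    // so unwrap_or_else can route the rejected value into the logger.
    result_tx
        .send(42)
        .unwrap_or_else(|value| log_dropped_result(value, "ledger_apply"));
}
```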
23 changes: 15 additions & 8 deletions consensus/src/pipeline/execution_schedule_phase.rs
@@ -11,7 +11,7 @@ use crate::{
 };
 use aptos_consensus_types::pipelined_block::PipelinedBlock;
 use aptos_crypto::HashValue;
-use aptos_executor_types::ExecutorError;
+use aptos_executor_types::{ExecutorError, ExecutorResult};
 use aptos_logger::debug;
 use async_trait::async_trait;
 use futures::TryFutureExt;
@@ -92,15 +92,22 @@ impl StatelessPipeline for ExecutionSchedulePhase {
         // ExecutionWait phase is never kicked off.
         let fut = tokio::task::spawn(async move {
             let mut results = vec![];
-            for (block, fut) in itertools::zip_eq(ordered_blocks, futs) {
+            // wait for all futs so that lifetime_guard is guaranteed to be dropped only
+            // after all executor calls are over
+            for (block, fut) in itertools::zip_eq(&ordered_blocks, futs) {

Review thread on this line:

Contributor: How about putting these futs in a FuturesOrdered and polling them all together?

Contributor (Author): Doesn't matter, they are already spawned and running in parallel?

Contributor: They run in parallel, but I notice there is a future here where we do some post-processing with some `.await`, and it is possible that this future gains other work later. On the other hand, polling one by one keeps the logs sequential; otherwise we risk reading unordered logs when debugging. Let's leave it as-is then.

Contributor (Author): Also not sure the things in the post-processing want to be sequential. For example, does the mempool tolerate seeing the notifications out of order?

debug!("try to receive compute result for block {}", block.id());
let PipelineExecutionResult {
input_txns,
result,
execution_time,
} = fut.await?;
results.push(block.set_execution_result(input_txns, result, execution_time));
results.push(fut.await)
}
let results = itertools::zip_eq(ordered_blocks, results)
.map(|(block, res)| {
let PipelineExecutionResult {
input_txns,
result,
execution_time,
} = res?;
Ok(block.set_execution_result(input_txns, result, execution_time))
})
.collect::<ExecutorResult<Vec<_>>>()?;
drop(lifetime_guard);
Ok(results)
})
Expand Down
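For reference, here is a minimal sketch of the `FuturesOrdered` alternative raised in the review thread above but not adopted: the futures in the set are polled together, yet `next()` yields their results in the order they were pushed, even if they complete out of order. The block names and closures are stand-ins, not the PR's code.

```rust
use futures::stream::{FuturesOrdered, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for ordered_blocks and their already-spawned execution futures.
    let blocks = ["block-a", "block-b", "block-c"];

    let mut futs: FuturesOrdered<_> = blocks
        .iter()
        .map(|id| async move {
            // Stand-in for awaiting the spawned future plus any post-processing.
            format!("result for {id}")
        })
        .collect();

    // All futures are polled concurrently, but results come back in push order,
    // preserving block order for the downstream handling.
    let mut results = Vec::new();
    while let Some(res) = futs.next().await {
        results.push(res);
    }
    assert_eq!(results.len(), blocks.len());
}
```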