Skip to content

Commit

Permalink
[consensus] Downgrade error!() from channel closing in pipeline execution (#14216) (#14220)
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored Aug 6, 2024
1 parent f1acac9 commit 4b757b7
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 35 deletions.
43 changes: 43 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use crate::{
quorum_store,
};
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorError;
use aptos_logger::prelude::{error, warn};
use aptos_metrics_core::{
exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_counter,
register_gauge, register_gauge_vec, register_histogram, register_histogram_vec,
Expand Down Expand Up @@ -1012,6 +1015,46 @@ pub static BUFFER_MANAGER_RECEIVED_EXECUTOR_ERROR_COUNT: Lazy<IntCounterVec> = L
.unwrap()
});

/// Number of executor errors that the pipeline discarded, broken down by the
/// `error_type` label.
///
/// The counter vector is registered on first access; a registration failure
/// panics via `unwrap()`.
pub static PIPELINE_DISCARDED_EXECUTOR_ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "aptos_consensus_pipeline_discarded_executor_error_count",
        "Count of the executor errors pipeline discarded",
        &["error_type"],
    )
    .unwrap()
});

/// Records an executor error on `counter` and logs it.
///
/// The error is classified into one of three `error_type` label values:
///
/// * `"CouldNotGetData"` - counted and logged at `warn` level.
/// * `"BlockNotFound"` - counted and logged at `warn` level.
/// * `"UnexpectedError"` - any other variant; counted and logged at `error`
///   level together with the error's `Debug` output.
///
/// # Arguments
///
/// * `e` - the executor error to report (consumed).
/// * `counter` - the counter vector whose `error_type` series is incremented.
/// * `block_id` - id of the block the error is reported for.
pub fn log_executor_error_occurred(
    e: ExecutorError,
    counter: &Lazy<IntCounterVec>,
    block_id: HashValue,
) {
    // Every branch bumps the counter before it logs; only the label differs.
    let record = |error_type: &str| counter.with_label_values(&[error_type]).inc();

    match e {
        // The id carried by the error shadows the `block_id` argument here, so
        // the log line reports the id taken from the error itself.
        ExecutorError::BlockNotFound(block_id) => {
            record("BlockNotFound");
            warn!(
                block_id = block_id,
                "Execution error BlockNotFound {}", block_id
            );
        },
        ExecutorError::CouldNotGetData => {
            record("CouldNotGetData");
            warn!(
                block_id = block_id,
                "Execution error - CouldNotGetData {}", block_id
            );
        },
        // Everything else is unexpected and is logged at error level.
        unexpected => {
            record("UnexpectedError");
            error!(
                block_id = block_id,
                "Execution error {:?} for {}", unexpected, block_id
            );
        },
    }
}

const PROPSER_ELECTION_DURATION_BUCKETS: [f64; 17] = [
0.001, 0.002, 0.003, 0.004, 0.006, 0.008, 0.01, 0.012, 0.014, 0.0175, 0.02, 0.025, 0.05, 0.25,
0.5, 1.0, 2.0,
Expand Down
42 changes: 27 additions & 15 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
block_preparer::BlockPreparer,
counters::{self, log_executor_error_occurred},
monitor,
state_computer::{PipelineExecutionResult, StateComputeResultFut},
};
Expand All @@ -15,7 +16,7 @@ use aptos_executor_types::{
ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, error};
use aptos_logger::{debug, warn};
use aptos_types::{
block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock},
block_metadata_ext::BlockMetadataExt,
Expand Down Expand Up @@ -115,14 +116,9 @@ impl ExecutionPipeline {
debug!("prepare_block received block {}.", block.id());
let input_txns = block_preparer.prepare_block(&block).await;
if let Err(e) = input_txns {
result_tx.send(Err(e)).unwrap_or_else(|err| {
error!(
block_id = block.id(),
"Failed to send back execution result for block {}: {:?}.",
block.id(),
err,
);
});
result_tx
.send(Err(e))
.unwrap_or_else(|value| process_failed_to_send_result(value, block.id()));
return;
}
let validator_txns = block.validator_txns().cloned().unwrap_or_default();
Expand Down Expand Up @@ -246,12 +242,9 @@ impl ExecutionPipeline {
let pipe_line_res = res.map(|(output, execution_duration)| {
PipelineExecutionResult::new(input_txns, output, execution_duration)
});
result_tx.send(pipe_line_res).unwrap_or_else(|err| {
error!(
block_id = block_id,
"Failed to send back execution result for block {}: {:?}", block_id, err,
);
});
result_tx
.send(pipe_line_res)
.unwrap_or_else(|value| process_failed_to_send_result(value, block_id));
}
debug!("ledger_apply stage quitting.");
}
Expand Down Expand Up @@ -282,3 +275,22 @@ struct LedgerApplyCommand {
state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
}

/// Handles an execution result that could not be sent back to its requester.
///
/// Always emits a warning tagged with `block_id` and whether the undelivered
/// result was an error. If it was an error, the error is also reported through
/// `log_executor_error_occurred` against
/// `counters::PIPELINE_DISCARDED_EXECUTOR_ERROR_COUNT`.
///
/// # Arguments
///
/// * `value` - the result that failed to be sent (consumed).
/// * `block_id` - id of the block the result belongs to.
fn process_failed_to_send_result(
    value: Result<PipelineExecutionResult, ExecutorError>,
    block_id: HashValue,
) {
    let is_err = value.is_err();
    warn!(
        block_id = block_id,
        is_err = is_err,
        "Failed to send back execution result",
    );

    // Nobody will receive this error any more, so log it here for debugging
    // instead of dropping it silently.
    if let Some(discarded) = value.err() {
        log_executor_error_occurred(
            discarded,
            &counters::PIPELINE_DISCARDED_EXECUTOR_ERROR_COUNT,
            block_id,
        );
    }
}
27 changes: 7 additions & 20 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
consensus_observer::{
network_message::ConsensusObserverMessage, publisher::ConsensusPublisher,
},
counters, monitor,
counters::{self, log_executor_error_occurred},
monitor,
network::{IncomingCommitRequest, NetworkSender},
network_interface::ConsensusMsg,
pipeline::{
Expand All @@ -26,7 +27,6 @@ use aptos_bounded_executor::BoundedExecutor;
use aptos_config::config::ConsensusObserverConfig;
use aptos_consensus_types::{common::Author, pipelined_block::PipelinedBlock};
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorError;
use aptos_logger::prelude::*;
use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::ProtocolId};
use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast};
Expand Down Expand Up @@ -500,25 +500,12 @@ impl BufferManager {

let executed_blocks = match inner {
Ok(result) => result,
Err(ExecutorError::CouldNotGetData) => {
counters::BUFFER_MANAGER_RECEIVED_EXECUTOR_ERROR_COUNT
.with_label_values(&["CouldNotGetData"])
.inc();
warn!("Execution error - CouldNotGetData {}", block_id);
return;
},
Err(ExecutorError::BlockNotFound(block_id)) => {
counters::BUFFER_MANAGER_RECEIVED_EXECUTOR_ERROR_COUNT
.with_label_values(&["BlockNotFound"])
.inc();
warn!("Execution error BlockNotFound {}", block_id);
return;
},
Err(e) => {
counters::BUFFER_MANAGER_RECEIVED_EXECUTOR_ERROR_COUNT
.with_label_values(&["UnexpectedError"])
.inc();
error!("Execution error {:?} for {}", e, block_id);
log_executor_error_occurred(
e,
&counters::BUFFER_MANAGER_RECEIVED_EXECUTOR_ERROR_COUNT,
block_id,
);
return;
},
};
Expand Down

0 comments on commit 4b757b7

Please sign in to comment.