Skip to content

Commit

Permalink
[consensus] enable round timeout message (#14914)
Browse files Browse the repository at this point in the history
* [consensus] enable round timeout message

* [consensus] counters to track round timeout reason
  • Loading branch information
ibalajiarun authored Oct 16, 2024
1 parent d2eaddb commit 09427b2
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl Default for ConsensusConfig {
enable_pre_commit: true,
max_pending_rounds_in_commit_vote_cache: 100,
optimistic_sig_verification: false,
enable_round_timeout_msg: false,
enable_round_timeout_msg: true,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ aptos-runtimes = { workspace = true }
aptos-safety-rules = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-secure-storage = { workspace = true }
aptos-short-hex-str = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-temppath = { workspace = true }
aptos-time-service = { workspace = true }
Expand Down
20 changes: 20 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,26 @@ pub static TIMEOUT_ROUNDS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

/// Count of the round timeout by reason and by whether the aggregator is the next proposer.
pub static AGGREGATED_ROUND_TIMEOUT_REASON: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_agg_round_timeout_reason",
"Count of round timeouts by reason",
&["reason", "is_next_proposer"],
)
.unwrap()
});

/// Count of the missing authors if any reported in the round timeout reason
pub static AGGREGATED_ROUND_TIMEOUT_REASON_MISSING_AUTHORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_agg_round_timeout_reason_missing_authors",
"Count of missing authors in round timeout reason",
&["author"],
)
.unwrap()
});

/// Count the number of timeouts a node experienced since last restart (close to 0 in happy path).
/// This count is different from `TIMEOUT_ROUNDS_COUNT`, because not every time a node has
/// a timeout there is an ultimate decision to move to the next round (it might take multiple
Expand Down
28 changes: 23 additions & 5 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use aptos_logger::prelude::*;
#[cfg(test)]
use aptos_safety_rules::ConsensusState;
use aptos_safety_rules::TSafetyRules;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::{
block_info::BlockInfo,
epoch_state::EpochState,
Expand Down Expand Up @@ -354,14 +355,34 @@ impl RoundManager {
&mut self,
new_round_event: NewRoundEvent,
) -> anyhow::Result<()> {
let is_current_proposer = self
.proposer_election
.is_valid_proposer(self.proposal_generator.author(), new_round_event.round);

counters::CURRENT_ROUND.set(new_round_event.round as i64);
counters::ROUND_TIMEOUT_MS.set(new_round_event.timeout.as_millis() as i64);
match new_round_event.reason {
NewRoundReason::QCReady => {
counters::QC_ROUNDS_COUNT.inc();
},
NewRoundReason::Timeout(_) => {
NewRoundReason::Timeout(ref reason) => {
counters::TIMEOUT_ROUNDS_COUNT.inc();
counters::AGGREGATED_ROUND_TIMEOUT_REASON
.with_label_values(&[&reason.to_string(), &is_current_proposer.to_string()])
.inc();
if is_current_proposer {
if let RoundTimeoutReason::PayloadUnavailable { missing_authors } = reason {
let ordered_peers =
self.epoch_state.verifier.get_ordered_account_addresses();
for idx in missing_authors.iter_ones() {
if let Some(author) = ordered_peers.get(idx) {
counters::AGGREGATED_ROUND_TIMEOUT_REASON_MISSING_AUTHORS
.with_label_values(&[author.short_str().as_str()])
.inc();
}
}
}
}
},
};
info!(
Expand All @@ -374,10 +395,7 @@ impl RoundManager {
self.proposal_status_tracker
.push(new_round_event.reason.clone());

if self
.proposer_election
.is_valid_proposer(self.proposal_generator.author(), new_round_event.round)
{
if is_current_proposer {
let epoch_state = self.epoch_state.clone();
let network = self.network.clone();
let sync_info = self.block_store.sync_info();
Expand Down
76 changes: 71 additions & 5 deletions consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,14 @@ impl NodeSetup {
}
}

fn config_with_round_timeout_msg_disabled() -> ConsensusConfig {
// Disable RoundTimeoutMsg to unless expliclity enabled.
ConsensusConfig {
enable_round_timeout_msg: false,
..Default::default()
}
}

fn start_replying_to_block_retreival(nodes: Vec<NodeSetup>) -> ReplyingRPCHandle {
let done = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
Expand Down Expand Up @@ -966,7 +974,7 @@ fn sync_info_carried_on_timeout_vote() {
1,
None,
None,
None,
Some(config_with_round_timeout_msg_disabled()),
None,
None,
);
Expand Down Expand Up @@ -1470,7 +1478,7 @@ fn nil_vote_on_timeout() {
1,
None,
None,
None,
Some(config_with_round_timeout_msg_disabled()),
None,
None,
);
Expand Down Expand Up @@ -1553,7 +1561,7 @@ fn vote_resent_on_timeout() {
1,
None,
None,
None,
Some(config_with_round_timeout_msg_disabled()),
None,
None,
);
Expand Down Expand Up @@ -1706,7 +1714,7 @@ fn safety_rules_crash() {
1,
None,
None,
None,
Some(config_with_round_timeout_msg_disabled()),
None,
None,
);
Expand Down Expand Up @@ -1772,7 +1780,7 @@ fn echo_timeout() {
4,
None,
None,
None,
Some(config_with_round_timeout_msg_disabled()),
None,
None,
);
Expand Down Expand Up @@ -1825,6 +1833,64 @@ fn echo_timeout() {
});
}

#[test]
fn echo_round_timeout_msg() {
let runtime = consensus_runtime();
let mut playground = NetworkPlayground::new(runtime.handle().clone());
let mut nodes = NodeSetup::create_nodes(
&mut playground,
runtime.handle().clone(),
4,
None,
None,
None,
None,
None,
);
runtime.spawn(playground.start());
timed_block_on(&runtime, async {
// clear the message queue
for node in &mut nodes {
node.next_proposal().await;
}
// timeout 3 nodes
for node in &mut nodes[1..] {
node.round_manager
.process_local_timeout(1)
.await
.unwrap_err();
}
let node_0 = &mut nodes[0];
// node 0 doesn't timeout and should echo the timeout after 2 timeout message
for i in 0..3 {
let timeout_vote = node_0.next_timeout().await;
let result = node_0
.round_manager
.process_round_timeout_msg(timeout_vote)
.await;
// first and third message should not timeout
if i == 0 || i == 2 {
assert!(result.is_ok());
}
if i == 1 {
// timeout is an Error
assert!(result.is_err());
}
}

let node_1 = &mut nodes[1];
// it receives 4 timeout messages (1 from each) and doesn't echo since it already timeout
for _ in 0..4 {
let timeout_vote = node_1.next_timeout().await;
node_1
.round_manager
.process_round_timeout_msg(timeout_vote)
.await
.unwrap();
}
});
}

#[test]
fn no_next_test() {
let runtime = consensus_runtime();
Expand Down

0 comments on commit 09427b2

Please sign in to comment.