Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Sep 17, 2024
1 parent 02d34f3 commit 32d5633
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
24 changes: 15 additions & 9 deletions consensus/src/pipeline/buffer_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use aptos_types::{
aggregate_signature::PartialSignatures,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithMixedSignatures, LedgerInfoWithSignatures},
ledger_info::{
LedgerInfo, LedgerInfoWithMixedSignatures, LedgerInfoWithSignatures, VerificationStatus,
},
};
use futures::future::BoxFuture;
use itertools::zip_eq;
Expand Down Expand Up @@ -103,7 +105,7 @@ fn generate_executed_item_from_ordered(
order_vote_enabled,
));
for (author, sig) in verified_signatures.signatures() {
partial_commit_proof.add_signature(*author, sig.clone(), true);
partial_commit_proof.add_signature(*author, sig.clone(), VerificationStatus::Verified);
}
BufferItem::Executed(Box::new(ExecutedItem {
executed_blocks,
Expand Down Expand Up @@ -463,7 +465,7 @@ impl BufferItem {
pub fn add_signature_if_matched(
&mut self,
vote: CommitVote,
verified: bool,
verification_status: VerificationStatus,
) -> anyhow::Result<()> {
let target_commit_info = vote.commit_info();
let author = vote.author();
Expand All @@ -487,17 +489,21 @@ impl BufferItem {
},
Self::Executed(executed) => {
if executed.commit_info == *target_commit_info {
executed
.partial_commit_proof
.add_signature(author, signature, verified);
executed.partial_commit_proof.add_signature(
author,
signature,
verification_status,
);
return Ok(());
}
},
Self::Signed(signed) => {
if signed.partial_commit_proof.commit_info() == target_commit_info {
signed
.partial_commit_proof
.add_signature(author, signature, verified);
signed.partial_commit_proof.add_signature(
author,
signature,
verification_status,
);
return Ok(());
}
},
Expand Down
22 changes: 14 additions & 8 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::Protoc
use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{
account_address::AccountAddress, epoch_change::EpochChangeProof, epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
account_address::AccountAddress,
epoch_change::EpochChangeProof,
epoch_state::EpochState,
ledger_info::{LedgerInfoWithSignatures, VerificationStatus},
};
use bytes::Bytes;
use futures::{
Expand Down Expand Up @@ -702,7 +704,7 @@ impl BufferManager {
fn process_commit_message(
&mut self,
commit_msg: IncomingCommitRequest,
verified: bool,
verification_status: VerificationStatus,
) -> Option<HashValue> {
let IncomingCommitRequest {
req,
Expand All @@ -721,7 +723,7 @@ impl BufferManager {
.find_elem_by_key(*self.buffer.head_cursor(), target_block_id);
if current_cursor.is_some() {
let mut item = self.buffer.take(&current_cursor);
let new_item = match item.add_signature_if_matched(vote, verified) {
let new_item = match item.add_signature_if_matched(vote, verification_status) {
Ok(()) => {
let response =
ConsensusMsg::CommitMessage(Box::new(CommitMessage::Ack(())));
Expand Down Expand Up @@ -900,14 +902,18 @@ impl BufferManager {
.verifier
.is_malicious_author(&vote.author())
{
let _ = tx.unbounded_send((commit_msg, false));
let _ = tx.unbounded_send((
commit_msg,
VerificationStatus::Unverified,
));
return;
}
}
}
match commit_msg.req.verify(&epoch_state_clone.verifier) {
Ok(_) => {
let _ = tx.unbounded_send((commit_msg, true));
let _ =
tx.unbounded_send((commit_msg, VerificationStatus::Verified));
},
Err(e) => warn!("Invalid commit message: {}", e),
}
Expand Down Expand Up @@ -967,9 +973,9 @@ impl BufferManager {
// see where `need_backpressure()` is called.
self.highest_committed_round = round
},
Some((rpc_request, verified)) = verified_commit_msg_rx.next() => {
Some((rpc_request, verification_status)) = verified_commit_msg_rx.next() => {
monitor!("buffer_manager_process_commit_message",
if let Some(aggregated_block_id) = self.process_commit_message(rpc_request, verified) {
if let Some(aggregated_block_id) = self.process_commit_message(rpc_request, verification_status) {
self.advance_head(aggregated_block_id).await;
if self.execution_root.is_none() {
self.advance_execution_root();
Expand Down

0 comments on commit 32d5633

Please sign in to comment.