diff --git a/consensus/src/pipeline/buffer_item.rs b/consensus/src/pipeline/buffer_item.rs index a4e03fb58b7cd..dbe111ca59ca9 100644 --- a/consensus/src/pipeline/buffer_item.rs +++ b/consensus/src/pipeline/buffer_item.rs @@ -19,7 +19,9 @@ use aptos_types::{ aggregate_signature::PartialSignatures, block_info::BlockInfo, epoch_state::EpochState, - ledger_info::{LedgerInfo, LedgerInfoWithMixedSignatures, LedgerInfoWithSignatures}, + ledger_info::{ + LedgerInfo, LedgerInfoWithMixedSignatures, LedgerInfoWithSignatures, VerificationStatus, + }, }; use futures::future::BoxFuture; use itertools::zip_eq; @@ -103,7 +105,7 @@ fn generate_executed_item_from_ordered( order_vote_enabled, )); for (author, sig) in verified_signatures.signatures() { - partial_commit_proof.add_signature(*author, sig.clone(), true); + partial_commit_proof.add_signature(*author, sig.clone(), VerificationStatus::Verified); } BufferItem::Executed(Box::new(ExecutedItem { executed_blocks, @@ -463,7 +465,7 @@ impl BufferItem { pub fn add_signature_if_matched( &mut self, vote: CommitVote, - verified: bool, + verification_status: VerificationStatus, ) -> anyhow::Result<()> { let target_commit_info = vote.commit_info(); let author = vote.author(); @@ -487,17 +489,21 @@ impl BufferItem { }, Self::Executed(executed) => { if executed.commit_info == *target_commit_info { - executed - .partial_commit_proof - .add_signature(author, signature, verified); + executed.partial_commit_proof.add_signature( + author, + signature, + verification_status, + ); return Ok(()); } }, Self::Signed(signed) => { if signed.partial_commit_proof.commit_info() == target_commit_info { - signed - .partial_commit_proof - .add_signature(author, signature, verified); + signed.partial_commit_proof.add_signature( + author, + signature, + verification_status, + ); return Ok(()); } }, diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index 125d761d76699..b84f7ec137e95 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -37,8 +37,10 @@ use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::Protoc use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast}; use aptos_time_service::TimeService; use aptos_types::{ - account_address::AccountAddress, epoch_change::EpochChangeProof, epoch_state::EpochState, - ledger_info::LedgerInfoWithSignatures, + account_address::AccountAddress, + epoch_change::EpochChangeProof, + epoch_state::EpochState, + ledger_info::{LedgerInfoWithSignatures, VerificationStatus}, }; use bytes::Bytes; use futures::{ @@ -702,7 +704,7 @@ impl BufferManager { fn process_commit_message( &mut self, commit_msg: IncomingCommitRequest, - verified: bool, + verification_status: VerificationStatus, ) -> Option { let IncomingCommitRequest { req, @@ -721,7 +723,7 @@ impl BufferManager { .find_elem_by_key(*self.buffer.head_cursor(), target_block_id); if current_cursor.is_some() { let mut item = self.buffer.take(¤t_cursor); - let new_item = match item.add_signature_if_matched(vote, verified) { + let new_item = match item.add_signature_if_matched(vote, verification_status) { Ok(()) => { let response = ConsensusMsg::CommitMessage(Box::new(CommitMessage::Ack(()))); @@ -900,14 +902,18 @@ impl BufferManager { .verifier .is_malicious_author(&vote.author()) { - let _ = tx.unbounded_send((commit_msg, false)); + let _ = tx.unbounded_send(( + commit_msg, + VerificationStatus::Unverified, + )); return; } } } match commit_msg.req.verify(&epoch_state_clone.verifier) { Ok(_) => { - let _ = tx.unbounded_send((commit_msg, true)); + let _ = + tx.unbounded_send((commit_msg, VerificationStatus::Verified)); }, Err(e) => warn!("Invalid commit message: {}", e), } @@ -967,9 +973,9 @@ impl BufferManager { // see where `need_backpressure()` is called. self.highest_committed_round = round }, - Some((rpc_request, verified)) = verified_commit_msg_rx.next() => { + Some((rpc_request, verification_status)) = verified_commit_msg_rx.next() => { monitor!("buffer_manager_process_commit_message", - if let Some(aggregated_block_id) = self.process_commit_message(rpc_request, verified) { + if let Some(aggregated_block_id) = self.process_commit_message(rpc_request, verification_status) { self.advance_head(aggregated_block_id).await; if self.execution_root.is_none() { self.advance_execution_root();