From c4c22c4f9a72f017d503822237d169c7daa9eff3 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Mon, 1 Jul 2024 18:43:23 -0400 Subject: [PATCH] [Consensus Observer] Add message verification to ordered blocks. --- .../src/config/consensus_observer_config.rs | 3 + .../src/consensus_observer/network_message.rs | 166 +++- consensus/src/consensus_observer/observer.rs | 153 ++-- .../src/consensus_observer/pending_blocks.rs | 752 ++++++++++++++---- consensus/src/consensus_observer/publisher.rs | 8 +- consensus/src/pipeline/buffer_manager.rs | 9 +- 6 files changed, 841 insertions(+), 250 deletions(-) diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs index 091b22a8105f2..b1ec36a2fe3ce 100644 --- a/config/src/config/consensus_observer_config.rs +++ b/config/src/config/consensus_observer_config.rs @@ -30,6 +30,8 @@ pub struct ConsensusObserverConfig { /// Interval (in milliseconds) to garbage collect peer state pub garbage_collection_interval_ms: u64, + /// Maximum number of pending blocks to keep in memory + pub max_num_pending_blocks: u64, /// Maximum timeout (in milliseconds) for active subscriptions pub max_subscription_timeout_ms: u64, /// Maximum timeout (in milliseconds) we'll wait for the synced version to @@ -50,6 +52,7 @@ impl Default for ConsensusObserverConfig { max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs network_request_timeout_ms: 10_000, // 10 seconds garbage_collection_interval_ms: 60_000, // 60 seconds + max_num_pending_blocks: 100, // 100 blocks max_subscription_timeout_ms: 30_000, // 30 seconds max_synced_version_timeout_ms: 60_000, // 60 seconds peer_optimality_check_interval_ms: 60_000, // 60 seconds diff --git a/consensus/src/consensus_observer/network_message.rs b/consensus/src/consensus_observer/network_message.rs index f50d09d8576d4..7490c56cb06ac 100644 --- a/consensus/src/consensus_observer/network_message.rs +++ b/consensus/src/consensus_observer/network_message.rs @@ -1,11 +1,14 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_consensus_types::{ - pipeline::commit_decision::CommitDecision, pipelined_block::PipelinedBlock, -}; +use crate::consensus_observer::error::Error; +use aptos_consensus_types::pipelined_block::PipelinedBlock; use aptos_types::{ - block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures, transaction::SignedTransaction, + block_info::{BlockInfo, Round}, + epoch_change::Verifier, + epoch_state::EpochState, + ledger_info::LedgerInfoWithSignatures, + transaction::SignedTransaction, }; use serde::{Deserialize, Serialize}; use std::{ @@ -35,9 +38,9 @@ impl ConsensusObserverMessage { /// Creates and returns a new commit decision message using the given commit decision pub fn new_commit_decision_message( - commit_decision: CommitDecision, + commit_proof: LedgerInfoWithSignatures, ) -> ConsensusObserverDirectSend { - ConsensusObserverDirectSend::CommitDecision(commit_decision) + ConsensusObserverDirectSend::CommitDecision(CommitDecision { commit_proof }) } /// Creates and returns a new block payload message using the given block, transactions and limit @@ -140,16 +143,10 @@ impl ConsensusObserverDirectSend { pub fn get_content(&self) -> String { match self { ConsensusObserverDirectSend::OrderedBlock(ordered_block) => { - format!( - "OrderedBlock: {}", - ordered_block.ordered_proof.commit_info() - ) + format!("OrderedBlock: {}", ordered_block.proof_block_info()) }, ConsensusObserverDirectSend::CommitDecision(commit_decision) => { - format!( - "CommitDecision: {}", - commit_decision.ledger_info().commit_info() - ) + format!("CommitDecision: {}", commit_decision.proof_block_info()) }, ConsensusObserverDirectSend::BlockPayload(block_payload) => { format!( @@ -166,8 +163,145 @@ impl ConsensusObserverDirectSend { /// OrderedBlock message contains the ordered blocks and the proof of the ordering #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct OrderedBlock { - pub blocks: Vec>, - pub ordered_proof: LedgerInfoWithSignatures, + blocks: Vec>, + ordered_proof: LedgerInfoWithSignatures, +} + +impl OrderedBlock { + pub fn new(blocks: Vec>, ordered_proof: LedgerInfoWithSignatures) -> Self { + Self { + blocks, + ordered_proof, + } + } + + /// Returns a reference to the ordered blocks + pub fn blocks(&self) -> &Vec> { + &self.blocks + } + + /// Returns a copy of the first ordered block + pub fn first_block(&self) -> Arc { + self.blocks + .first() + .cloned() + .expect("At least one block is expected!") + } + + /// Returns a copy of the last ordered block + pub fn last_block(&self) -> Arc { + self.blocks + .last() + .cloned() + .expect("At least one block is expected!") + } + + /// Returns a reference to the ordered proof + pub fn ordered_proof(&self) -> &LedgerInfoWithSignatures { + &self.ordered_proof + } + + /// Returns a reference to the ordered proof block info + pub fn proof_block_info(&self) -> &BlockInfo { + self.ordered_proof.commit_info() + } + + /// Verifies the ordered blocks and returns an error if the data is invalid. + /// Note: this does not check the ordered proof. + pub fn verify_ordered_blocks(&self) -> Result<(), Error> { + // Verify that we have at least one ordered block + if self.blocks.is_empty() { + return Err(Error::InvalidMessageError( + "Received empty ordered block!".to_string(), + )); + } + + // Verify the last block ID matches the ordered proof block ID + if self.last_block().id() != self.proof_block_info().id() { + return Err(Error::InvalidMessageError( + format!( + "Last ordered block ID does not match the ordered proof ID! Number of blocks: {:?}, Last ordered block ID: {:?}, Ordered proof ID: {:?}", + self.blocks.len(), + self.last_block().id(), + self.proof_block_info().id() + ) + )); + } + + // Verify the blocks are correctly chained together (from the last block to the first) + let mut expected_parent_id = None; + for block in self.blocks.iter().rev() { + if let Some(expected_parent_id) = expected_parent_id { + if block.id() != expected_parent_id { + return Err(Error::InvalidMessageError( + format!( + "Block parent ID does not match the expected parent ID! Block ID: {:?}, Expected parent ID: {:?}", + block.id(), + expected_parent_id + ) + )); + } + } + + expected_parent_id = Some(block.parent_id()); + } + + Ok(()) + } + + /// Verifies the ordered proof and returns an error if the proof is invalid + pub fn verify_ordered_proof(&self, epoch_state: &EpochState) -> Result<(), Error> { + epoch_state.verify(&self.ordered_proof).map_err(|error| { + Error::InvalidMessageError(format!( + "Failed to verify ordered proof ledger info: {:?}, Error: {:?}", + self.proof_block_info(), + error + )) + }) + } +} + +/// CommitDecision message contains the commit decision proof +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct CommitDecision { + commit_proof: LedgerInfoWithSignatures, +} + +impl CommitDecision { + pub fn new(commit_proof: LedgerInfoWithSignatures) -> Self { + Self { commit_proof } + } + + /// Returns a reference to the commit proof + pub fn commit_proof(&self) -> &LedgerInfoWithSignatures { + &self.commit_proof + } + + /// Returns the epoch of the commit proof + pub fn epoch(&self) -> u64 { + self.commit_proof.ledger_info().epoch() + } + + /// Returns a reference to the commit proof block info + pub fn proof_block_info(&self) -> &BlockInfo { + self.commit_proof.commit_info() + } + + /// Returns the round of the commit proof + pub fn round(&self) -> Round { + self.commit_proof.ledger_info().round() + } + + /// Verifies the commit proof and returns an error if the proof is invalid + pub fn verify_commit_proof(&self, epoch_state: &EpochState) -> Result<(), Error> { + epoch_state.verify(&self.commit_proof).map_err(|error| { + Error::InvalidMessageError(format!( + "Failed to verify commit proof ledger info: {:?}, Error: {:?}", + self.proof_block_info(), + error + )) + }) + } } /// Payload message contains the block, transactions and the limit of the block diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs index 2273c056e5a49..0bc584b38dd0b 100644 --- a/consensus/src/consensus_observer/observer.rs +++ b/consensus/src/consensus_observer/observer.rs @@ -9,7 +9,7 @@ use crate::{ network_client::ConsensusObserverClient, network_events::{ConsensusObserverNetworkEvents, NetworkMessage, ResponseSender}, network_message::{ - BlockPayload, ConsensusObserverDirectSend, ConsensusObserverMessage, + BlockPayload, CommitDecision, ConsensusObserverDirectSend, ConsensusObserverMessage, ConsensusObserverRequest, ConsensusObserverResponse, OrderedBlock, }, payload_store::BlockPayloadStore, @@ -27,9 +27,7 @@ use crate::{ }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; -use aptos_consensus_types::{ - pipeline::commit_decision::CommitDecision, pipelined_block::PipelinedBlock, -}; +use aptos_consensus_types::pipeline; use aptos_crypto::{bls12381, Genesis}; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_infallible::Mutex; @@ -43,7 +41,6 @@ use aptos_storage_interface::DbReader; use aptos_time_service::TimeService; use aptos_types::{ block_info::{BlockInfo, Round}, - epoch_change::Verifier, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, on_chain_config::{ @@ -122,7 +119,7 @@ impl ConsensusObserver { consensus_observer_client, epoch_state: None, root: Arc::new(Mutex::new(root)), - pending_ordered_blocks: PendingOrderedBlocks::new(), + pending_ordered_blocks: PendingOrderedBlocks::new(consensus_observer_config), execution_client, block_payload_store: BlockPayloadStore::new(), sync_handle: None, @@ -356,14 +353,14 @@ impl ConsensusObserver { } /// Finalizes the ordered block by sending it to the execution pipeline - async fn finalize_ordered_block( - &mut self, - blocks: &[Arc], - ordered_proof: LedgerInfoWithSignatures, - ) { + async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) { if let Err(error) = self .execution_client - .finalize_order(blocks, ordered_proof, self.create_commit_callback()) + .finalize_order( + ordered_block.blocks(), + ordered_block.ordered_proof().clone(), + self.create_commit_callback(), + ) .await { error!( @@ -376,11 +373,13 @@ impl ConsensusObserver { } /// Forwards the commit decision to the execution pipeline - fn forward_commit_decision(&self, decision: CommitDecision) { + fn forward_commit_decision(&self, commit_decision: CommitDecision) { // Create a dummy RPC message let (response_sender, _response_receiver) = oneshot::channel(); let commit_request = IncomingCommitRequest { - req: CommitMessage::Decision(decision), + req: CommitMessage::Decision(pipeline::commit_decision::CommitDecision::new( + commit_decision.commit_proof().clone(), + )), protocol: ProtocolId::ConsensusDirectSendCompressed, response_sender, }; @@ -454,14 +453,15 @@ impl ConsensusObserver { /// Processes the commit decision fn process_commit_decision(&mut self, commit_decision: CommitDecision) { // If the commit decision is for the current epoch, verify it + let epoch_state = self.get_epoch_state(); let commit_decision_epoch = commit_decision.epoch(); - if commit_decision_epoch == self.get_epoch_state().epoch { + if commit_decision_epoch == epoch_state.epoch { // Verify the commit decision - if let Err(error) = self.verify_commit_decision(&commit_decision) { + if let Err(error) = commit_decision.verify_commit_proof(&epoch_state) { error!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Failed to verify commit decision! Ignoring: {:?}, Error: {:?}", - commit_decision.ledger_info().commit_info(), + commit_decision.proof_block_info(), error )) ); @@ -486,13 +486,14 @@ impl ConsensusObserver { info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Started syncing to {}!", - commit_decision.ledger_info().commit_info() + commit_decision.proof_block_info() )) ); - // Update the root and clear the pending blocks - *self.root.lock() = commit_decision.ledger_info().clone(); - self.pending_ordered_blocks.clear_all_pending_blocks(); + // Update the root and clear the pending blocks (up to the commit) + *self.root.lock() = commit_decision.commit_proof().clone(); + self.pending_ordered_blocks + .remove_blocks_for_commit(commit_decision.commit_proof()); // Start the state sync process let abort_handle = sync_to_commit_decision( @@ -506,22 +507,26 @@ impl ConsensusObserver { } } - /// Processes the commit decision for the pending block and returns iff - /// the commit decision was successfully processed. + /// Processes the commit decision for the pending block and returns true iff + /// the commit decision was successfully processed. Note: this function + /// assumes the commit decision has already been verified. fn process_commit_decision_for_pending_block(&self, commit_decision: &CommitDecision) -> bool { + // Get the pending block for the commit decision let pending_block = self .pending_ordered_blocks - .get_pending_block(commit_decision.round()); + .get_verified_pending_block(commit_decision.epoch(), commit_decision.round()); + + // Process the pending block if let Some(pending_block) = pending_block { // If the payload exists, add the commit decision to the pending blocks if self .block_payload_store - .all_payloads_exist(&pending_block.blocks) + .all_payloads_exist(pending_block.blocks()) { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Adding decision to pending block: {}", - commit_decision.ledger_info().commit_info() + commit_decision.proof_block_info() )) ); self.pending_ordered_blocks @@ -532,7 +537,7 @@ impl ConsensusObserver { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Forwarding commit decision to the execution pipeline: {}", - commit_decision.ledger_info().commit_info() + commit_decision.proof_block_info() )) ); self.forward_commit_decision(commit_decision.clone()); @@ -591,7 +596,7 @@ impl ConsensusObserver { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Received ordered block: {}, from peer: {}!", - ordered_block.ordered_proof.commit_info(), + ordered_block.proof_block_info(), peer_network_id )) ); @@ -601,7 +606,7 @@ impl ConsensusObserver { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Received commit decision: {}, from peer: {}!", - commit_decision.ledger_info().commit_info(), + commit_decision.proof_block_info(), peer_network_id )) ); @@ -621,48 +626,62 @@ impl ConsensusObserver { /// Processes the ordered block async fn process_ordered_block(&mut self, ordered_block: OrderedBlock) { - // Unpack the ordered block - let OrderedBlock { - blocks, - ordered_proof, - } = ordered_block.clone(); - - // Verify that we have at least one ordered block - if blocks.is_empty() { - warn!( + // Verify the ordered blocks before processing + if let Err(error) = ordered_block.verify_ordered_blocks() { + error!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Received empty ordered block! Ignoring: {:?}", - ordered_proof.commit_info() + "Failed to verify ordered blocks! Ignoring: {:?}, Error: {:?}", + ordered_block.proof_block_info(), + error )) ); return; - } + }; - // TODO: verify the ordered block! + // If the ordered block is for the current epoch, verify the proof + let epoch_state = self.get_epoch_state(); + let verified_ordered_proof = + if ordered_block.proof_block_info().epoch() == epoch_state.epoch { + // Verify the ordered proof + if let Err(error) = ordered_block.verify_ordered_proof(&epoch_state) { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to verify ordered proof! Ignoring: {:?}, Error: {:?}", + ordered_block.proof_block_info(), + error + )) + ); + return; + } + + true // We have successfully verified the proof + } else { + false // We can't verify the proof yet + }; // If the block is a child of our last block, we can insert it - if self.get_last_block().id() == blocks.first().unwrap().parent_id() { + if self.get_last_block().id() == ordered_block.first_block().parent_id() { // Insert the ordered block into the pending blocks self.pending_ordered_blocks - .insert_ordered_block(ordered_block); + .insert_ordered_block(ordered_block.clone(), verified_ordered_proof); - // If we are not in sync mode, forward the blocks to the execution pipeline - if self.sync_handle.is_none() { + // If we verified the proof, and we're not in sync mode, finalize the ordered blocks + if verified_ordered_proof && self.sync_handle.is_none() { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Forwarding blocks to the execution pipeline: {}", - ordered_proof.commit_info() + ordered_block.proof_block_info() )) ); // Finalize the ordered block - self.finalize_ordered_block(&blocks, ordered_proof).await; + self.finalize_ordered_block(ordered_block).await; } } else { warn!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Parent block is missing! Ignoring: {:?}", - ordered_proof.commit_info() + ordered_block.proof_block_info() )) ); } @@ -721,26 +740,27 @@ impl ConsensusObserver { } // If the epoch has changed, end the current epoch and start the new one - if epoch > self.get_epoch_state().epoch { + let current_epoch_state = self.get_epoch_state(); + if epoch > current_epoch_state.epoch { + // Wait for the next epoch to start self.execution_client.end_epoch().await; self.wait_for_epoch_start().await; + + // Verify the pending blocks for the new epoch + self.pending_ordered_blocks + .verify_pending_blocks(¤t_epoch_state); } // Reset and drop the sync handle self.sync_handle = None; // Process all the pending blocks. These were all buffered during the state sync process. - for (_, (ordered_block, commit_decision)) in - self.pending_ordered_blocks.get_all_pending_blocks() + for (_, (ordered_block, commit_decision)) in self + .pending_ordered_blocks + .get_all_verified_pending_blocks() { - // Unpack the ordered block - let OrderedBlock { - blocks, - ordered_proof, - } = ordered_block; - // Finalize the ordered block - self.finalize_ordered_block(&blocks, ordered_proof).await; + self.finalize_ordered_block(ordered_block).await; // If a commit decision is available, forward it to the execution pipeline if let Some(commit_decision) = commit_decision { @@ -868,19 +888,6 @@ impl ConsensusObserver { ); } - /// Verifies the commit decision and returns an error if the decision is invalid - fn verify_commit_decision(&self, commit_decision: &CommitDecision) -> Result<(), Error> { - self.get_epoch_state() - .verify(commit_decision.ledger_info()) - .map_err(|error| { - Error::InvalidMessageError(format!( - "Failed to verify the commit decision ledger info: {:?}, Error: {:?}", - commit_decision.ledger_info().commit_info(), - error - )) - }) - } - /// Waits for a new epoch to start async fn wait_for_epoch_start(&mut self) { // Extract the epoch state and on-chain configs @@ -1149,7 +1156,7 @@ fn sync_to_commit_decision( // Sync to the commit decision if let Err(error) = execution_client .clone() - .sync_to(commit_decision.ledger_info().clone()) + .sync_to(commit_decision.commit_proof().clone()) .await { warn!( diff --git a/consensus/src/consensus_observer/pending_blocks.rs b/consensus/src/consensus_observer/pending_blocks.rs index a5a8eba1b8847..e2b05c4e8d5af 100644 --- a/consensus/src/consensus_observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/pending_blocks.rs @@ -3,100 +3,202 @@ use crate::consensus_observer::{ logging::{LogEntry, LogSchema}, - network_message::OrderedBlock, + network_message::{CommitDecision, OrderedBlock}, }; -use aptos_consensus_types::{common::Round, pipeline::commit_decision::CommitDecision}; +use aptos_config::config::ConsensusObserverConfig; +use aptos_consensus_types::common::Round; use aptos_infallible::Mutex; -use aptos_logger::debug; -use aptos_types::{block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures}; +use aptos_logger::{debug, error, warn}; +use aptos_types::{ + block_info::BlockInfo, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, +}; use std::{collections::BTreeMap, sync::Arc}; /// A simple struct to store the block payloads of ordered and committed blocks #[derive(Clone)] pub struct PendingOrderedBlocks { - // Pending ordered blocks (indexed by consensus round) - pending_ordered_blocks: Arc)>>>, + // The configuration of the consensus observer + consensus_observer_config: ConsensusObserverConfig, + + // Verified and unverified pending ordered blocks. The key is the epoch and + // round of the last block in the ordered block. Each entry contains the + // block, if the block was verified, and the commit decision (if any). + pending_blocks: + Arc)>>>, } impl PendingOrderedBlocks { - pub fn new() -> Self { + pub fn new(consensus_observer_config: ConsensusObserverConfig) -> Self { Self { - pending_ordered_blocks: Arc::new(Mutex::new(BTreeMap::new())), + consensus_observer_config, + pending_blocks: Arc::new(Mutex::new(BTreeMap::new())), } } - /// Clears all pending blocks - pub fn clear_all_pending_blocks(&self) { - self.pending_ordered_blocks.lock().clear(); - } - - /// Returns a copy of the pending ordered blocks map - pub fn get_all_pending_blocks( + /// Returns a copy of the verified pending blocks + pub fn get_all_verified_pending_blocks( &self, - ) -> BTreeMap)> { - self.pending_ordered_blocks.lock().clone() + ) -> BTreeMap<(u64, Round), (OrderedBlock, Option)> { + let mut verified_pending_blocks = BTreeMap::new(); + for (key, (ordered_block, verified_ordered_proof, commit_decision)) in + self.pending_blocks.lock().iter() + { + if *verified_ordered_proof { + verified_pending_blocks + .insert(*key, (ordered_block.clone(), commit_decision.clone())); + } + } + verified_pending_blocks } - /// Returns the pending ordered block (if any) + /// Returns the last pending ordered block (if any). We take into + /// account verified and unverified pending blocks (to ensure we're + /// able to buffer blocks across epoch boundaries). pub fn get_last_pending_block(&self) -> Option { - let pending_ordered_blocks = self.pending_ordered_blocks.lock(); - if let Some((_, (ordered_block, _))) = pending_ordered_blocks.last_key_value() { - Some(ordered_block.blocks.last().unwrap().block_info()) - } else { - None // No pending blocks were found - } + self.pending_blocks + .lock() + .last_key_value() + .map(|(_, (ordered_block, _, _))| ordered_block.last_block().block_info()) } - /// Returns the pending ordered block (if any) - pub fn get_pending_block(&self, round: Round) -> Option { - let pending_ordered_blocks = self.pending_ordered_blocks.lock(); - pending_ordered_blocks - .get(&round) - .map(|(ordered_block, _)| ordered_block.clone()) + /// Returns the verified pending ordered block (if any) + pub fn get_verified_pending_block(&self, epoch: u64, round: Round) -> Option { + self.pending_blocks.lock().get(&(epoch, round)).and_then( + |(ordered_block, verified_ordered_proof, _)| { + if *verified_ordered_proof { + Some(ordered_block.clone()) + } else { + None + } + }, + ) } - /// Inserts the given ordered block into the pending blocks - pub fn insert_ordered_block(&self, ordered_block: OrderedBlock) { + /// Inserts the given ordered block into the pending blocks. This function + /// assumes the block has already been checked to extend the current pending blocks. + pub fn insert_ordered_block(&self, ordered_block: OrderedBlock, verified_ordered_proof: bool) { + // Verify that the number of pending blocks doesn't exceed the maximum + let max_num_pending_blocks = self.consensus_observer_config.max_num_pending_blocks as usize; + if self.pending_blocks.lock().len() >= max_num_pending_blocks { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Exceeded the maximum number of pending blocks: {:?}. Block verification: {:?}, block: {:?}.", + max_num_pending_blocks, + verified_ordered_proof, + ordered_block.proof_block_info() + )) + ); + return; // Drop the block if we've exceeded the maximum + } + + // Otherwise, we can add the block to the pending blocks debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Adding ordered block to the pending blocks: {}", - ordered_block.ordered_proof.commit_info() + "Adding ordered block to the pending blocks: {}. Verified ordered proof: {:?}", + verified_ordered_proof, + ordered_block.proof_block_info() )) ); - // Insert the ordered block into the pending ordered blocks - let last_block_round = ordered_block.blocks.last().unwrap().round(); - self.pending_ordered_blocks - .lock() - .insert(last_block_round, (ordered_block, None)); + // Get the epoch and round of the last ordered block + let last_block = ordered_block.last_block(); + let last_block_epoch = last_block.epoch(); + let last_block_round = last_block.round(); + + // Insert the pending block + self.pending_blocks.lock().insert( + (last_block_epoch, last_block_round), + (ordered_block, verified_ordered_proof, None), + ); } - /// Removes the pending blocks for the given commit ledger info. - /// This will remove all blocks up to (and including) the commit - /// round of the committed ledger info. + /// Removes the pending blocks for the given commit ledger info. This will + /// remove all blocks up to (and including) the epoch and round of the + /// commit. Note: this function must remove both verified and unverified + /// blocks (to support state sync commits). pub fn remove_blocks_for_commit(&self, commit_ledger_info: &LedgerInfoWithSignatures) { - // Determine the round to split off - let split_off_round = commit_ledger_info.commit_info().round() + 1; + // Determine the epoch and round to split off + let split_off_epoch = commit_ledger_info.ledger_info().epoch(); + let split_off_round = commit_ledger_info.commit_info().round().saturating_add(1); - // Remove the pending blocks before the split off round - let mut pending_ordered_blocks = self.pending_ordered_blocks.lock(); - *pending_ordered_blocks = pending_ordered_blocks.split_off(&split_off_round); + // Remove the blocks from the pending ordered blocks + let mut pending_blocks = self.pending_blocks.lock(); + *pending_blocks = pending_blocks.split_off(&(split_off_epoch, split_off_round)); } - /// Updates the commit decision of the pending ordered block (if found) + /// Updates the commit decision of the pending ordered block (if found). + /// This can only be done for verified pending blocks. pub fn update_commit_decision(&self, commit_decision: &CommitDecision) { - let mut pending_ordered_blocks = self.pending_ordered_blocks.lock(); - if let Some((_, existing_commit_decision)) = - pending_ordered_blocks.get_mut(&commit_decision.round()) + // Get the epoch and round of the commit decision + let commit_decision_epoch = commit_decision.epoch(); + let commit_decision_round = commit_decision.round(); + + // Update the commit decision for the verified pending blocks + let mut pending_blocks = self.pending_blocks.lock(); + if let Some((_, verified_ordered_proof, existing_commit_decision)) = + pending_blocks.get_mut(&(commit_decision_epoch, commit_decision_round)) { - *existing_commit_decision = Some(commit_decision.clone()); + if *verified_ordered_proof { + *existing_commit_decision = Some(commit_decision.clone()); + } else { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Attempting to update commit decision for unverified block! Epoch: {:?}, Round: {:?}", + commit_decision_epoch, + commit_decision_round + )) + ); + } } } -} -impl Default for PendingOrderedBlocks { - fn default() -> Self { - Self::new() + /// Verifies the pending blocks against the given epoch state. + /// If verification is successful, blocks are marked as verified. + pub fn verify_pending_blocks(&self, epoch_state: &EpochState) { + // Get the current epoch + let current_epoch = epoch_state.epoch; + + // Go through all the pending blocks and verify them + let mut failed_verification_round = None; + for ((epoch, round), (ordered_block, verified_ordered_proof, _)) in + self.pending_blocks.lock().iter_mut() + { + // Check if we can return early (BtreeMaps are sorted by key) + if *epoch > current_epoch { + return; + } + + // If the block is not verified, attempt to verify it + if *epoch == current_epoch && !(*verified_ordered_proof) { + match ordered_block.verify_ordered_proof(epoch_state) { + Ok(_) => { + // Mark the block as verified + *verified_ordered_proof = true; + }, + Err(error) => { + // Log the verification failure + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to verify ordered block: {}. Error: {:?}", + ordered_block.last_block().block_info(), + error + )) + ); + + // Note the failure and break early + failed_verification_round = Some(*round); + break; + }, + } + } + } + + // If verification failed, remove all blocks after (and including) the failure + if let Some(failed_round) = failed_verification_round { + self.pending_blocks + .lock() + .split_off(&(current_epoch, failed_round)); + } } } @@ -111,144 +213,336 @@ mod test { }; use aptos_crypto::HashValue; use aptos_types::{ - aggregate_signature::AggregateSignature, ledger_info::LedgerInfo, transaction::Version, + aggregate_signature::AggregateSignature, + ledger_info::LedgerInfo, + transaction::Version, + validator_signer::ValidatorSigner, + validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier}, }; #[test] - pub fn test_clear_pending_blocks() { + pub fn test_get_last_pending_block() { // Create new pending ordered blocks - let pending_ordered_blocks = PendingOrderedBlocks::new(); - - // Insert several pending blocks - let num_pending_blocks = 10; - let pending_blocks = - create_and_add_pending_blocks(&pending_ordered_blocks, num_pending_blocks); - - // Verify the pending blocks were all inserted - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - assert_eq!(all_pending_blocks.len(), num_pending_blocks); - - // Verify the pending blocks were inserted by round - for pending_block in pending_blocks { - // Get the round of the last block in the pending block - let round = pending_block.blocks.last().unwrap().round(); - - // Verify the pending block exists for the round - assert_eq!( - pending_block, - pending_ordered_blocks.get_pending_block(round).unwrap() - ); + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); + + // Verify that we have no last pending block + assert!(pending_ordered_blocks.get_last_pending_block().is_none()); + + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 50; + let verified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Verify the last pending block is the verified block with the highest round + let last_verified_block = verified_blocks.last().unwrap(); + let last_verified_block_info = last_verified_block.last_block().block_info(); + assert_eq!( + last_verified_block_info, + pending_ordered_blocks.get_last_pending_block().unwrap() + ); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 50; + let unverified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); + + // Verify the last pending block is the unverified block with the highest round + let last_unverified_block = unverified_blocks.last().unwrap(); + let last_unverified_block_info = last_unverified_block.last_block().block_info(); + assert_eq!( + last_unverified_block_info, + pending_ordered_blocks.get_last_pending_block().unwrap() + ); + + // Clear the unverified pending blocks + pending_ordered_blocks + .pending_blocks + .lock() + .retain(|_, (_, verified_ordered_proof, _)| *verified_ordered_proof); + + // Verify the last pending block is the verified block with the highest round + assert_eq!( + last_verified_block_info, + pending_ordered_blocks.get_last_pending_block().unwrap() + ); + } + + #[test] + pub fn test_get_verified_pending_block() { + // Create new pending ordered blocks + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); + + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 10; + let verified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Ensure the verified pending blocks were all inserted + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!(all_verified_blocks.len(), num_verified_blocks); + + // Verify the pending blocks can be retrieved + for verified_block in &verified_blocks { + let block_info = verified_block.last_block().block_info(); + let pending_block = pending_ordered_blocks + .get_verified_pending_block(block_info.epoch(), block_info.round()) + .unwrap(); + assert_eq!(verified_block.clone(), pending_block); } - // Clear all pending blocks - pending_ordered_blocks.clear_all_pending_blocks(); + // Verify that a non-existent block cannot be retrieved + let non_existent_block = verified_blocks.last().unwrap(); + let non_existent_block_info = non_existent_block.last_block().block_info(); + let pending_block = pending_ordered_blocks.get_verified_pending_block( + non_existent_block_info.epoch(), + non_existent_block_info.round() + 1, // Request a round that doesn't exist + ); + assert!(pending_block.is_none()); - // Verify all blocks were removed - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - assert_eq!(all_pending_blocks.len(), 0); + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 20; + let unverified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); + + // Ensure the unverified pending blocks cannot be retrieved + for unverified_block in &unverified_blocks { + let block_info = unverified_block.last_block().block_info(); + let pending_block = pending_ordered_blocks + .get_verified_pending_block(block_info.epoch(), block_info.round()); + assert!(pending_block.is_none()); + } } #[test] - pub fn test_get_last_pending_block() { + pub fn test_insert_ordered_block_limit() { + // Create a consensus observer config with a maximum of 10 pending blocks + let max_num_pending_blocks = 10; + let consensus_observer_config = ConsensusObserverConfig { + max_num_pending_blocks: max_num_pending_blocks as u64, + ..ConsensusObserverConfig::default() + }; + // Create new pending ordered blocks - let pending_ordered_blocks = PendingOrderedBlocks::new(); + let pending_ordered_blocks = PendingOrderedBlocks::new(consensus_observer_config); - // Insert several pending blocks - let num_pending_blocks = 100; - let pending_blocks = - create_and_add_pending_blocks(&pending_ordered_blocks, num_pending_blocks); + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = max_num_pending_blocks * 2; // Insert more than the maximum + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); - // Verify the last pending block is the one with the highest round - let last_pending_block = pending_blocks.last().unwrap(); - let last_pending_block_info = last_pending_block.blocks.last().unwrap().block_info(); - assert_eq!( - last_pending_block_info, - pending_ordered_blocks.get_last_pending_block().unwrap() + // Verify the verified pending blocks were inserted up to the maximum + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!(all_verified_blocks.len(), max_num_pending_blocks); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = max_num_pending_blocks - 1; // Insert less than the maximum + let unverified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, ); + + // Verify the unverified pending blocks were not inserted + for unverified_block in &unverified_blocks { + let block_info = unverified_block.last_block().block_info(); + let pending_block = pending_ordered_blocks + .get_verified_pending_block(block_info.epoch(), block_info.round()); + assert!(pending_block.is_none()); + } + + // Verify the pending blocks don't exceed the maximum + let num_pending_blocks = get_num_pending_blocks(&pending_ordered_blocks); + assert_eq!(num_pending_blocks, max_num_pending_blocks); } #[test] pub fn test_remove_blocks_for_commit() { // Create new pending ordered blocks - let pending_ordered_blocks = PendingOrderedBlocks::new(); + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); + + // Insert several verified blocks for the current epoch + let current_epoch = 10; + let num_verified_blocks = 10; + let verified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 20; + let unverified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); + + // Insert additional unverified blocks for a future epoch + let future_epoch = next_epoch + 1; + let num_future_blocks = 30; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_future_blocks, + future_epoch, + false, + ); - // Insert several pending blocks - let num_pending_blocks = 10; - let pending_blocks = - create_and_add_pending_blocks(&pending_ordered_blocks, num_pending_blocks); + // Create a commit decision for the first pending verified block + let first_verified_block = verified_blocks.first().unwrap(); + let first_verified_block_info = first_verified_block.last_block().block_info(); + let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( + LedgerInfo::new(first_verified_block_info.clone(), HashValue::random()), + AggregateSignature::empty(), + )); - // Create a commit decision for the first pending block - let first_pending_block = pending_blocks.first().unwrap(); - let first_pending_block_info = first_pending_block.blocks.last().unwrap().block_info(); + // Remove the pending blocks for the commit decision + pending_ordered_blocks.remove_blocks_for_commit(commit_decision.commit_proof()); + + // Verify the first verified block was removed + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!(all_verified_blocks.len(), num_verified_blocks - 1); + assert!(!all_verified_blocks.contains_key(&( + first_verified_block_info.epoch(), + first_verified_block_info.round() + ))); + + // Create a commit decision for the last pending verified block + let last_verified_block = verified_blocks.last().unwrap(); + let last_verified_block_info = last_verified_block.last_block().block_info(); let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( - LedgerInfo::new(first_pending_block_info.clone(), HashValue::random()), + LedgerInfo::new(last_verified_block_info.clone(), HashValue::random()), AggregateSignature::empty(), )); // Remove the pending blocks for the commit decision - pending_ordered_blocks.remove_blocks_for_commit(commit_decision.ledger_info()); + pending_ordered_blocks.remove_blocks_for_commit(commit_decision.commit_proof()); - // Verify the first pending block was removed - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - assert_eq!(all_pending_blocks.len(), num_pending_blocks - 1); - assert!(!all_pending_blocks.contains_key(&first_pending_block_info.round())); + // Verify all verified pending blocks were removed + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert!(all_verified_blocks.is_empty()); - // Create a commit decision for the last pending block - let last_pending_block = pending_blocks.last().unwrap(); - let last_pending_block_info = last_pending_block.blocks.last().unwrap().block_info(); + // Verify the unverified pending blocks were not removed + let num_pending_blocks = get_num_pending_blocks(&pending_ordered_blocks); + assert_eq!( + num_pending_blocks, + num_unverified_blocks + num_future_blocks + ); + + // Create a commit decision for the last pending unverified block (next epoch) + let last_unverified_block = unverified_blocks.last().unwrap(); + let last_unverified_block_info = last_unverified_block.last_block().block_info(); let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( - LedgerInfo::new(last_pending_block_info.clone(), HashValue::random()), + LedgerInfo::new(last_unverified_block_info.clone(), HashValue::random()), AggregateSignature::empty(), )); // Remove the pending blocks for the commit decision - pending_ordered_blocks.remove_blocks_for_commit(commit_decision.ledger_info()); - - // Verify all the pending blocks were removed - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - assert!(all_pending_blocks.is_empty()); + pending_ordered_blocks.remove_blocks_for_commit(commit_decision.commit_proof()); + + // Verify the unverified pending blocks were removed (next epoch) + let num_pending_blocks = get_num_pending_blocks(&pending_ordered_blocks); + assert_eq!(num_pending_blocks, num_future_blocks); + + // Verify the last unverified block was removed (next epoch) + let pending_blocks = pending_ordered_blocks.pending_blocks.lock(); + assert!(!pending_blocks.contains_key(&( + last_unverified_block_info.epoch(), + last_unverified_block_info.round() + ))); } #[test] pub fn test_update_commit_decision() { // Create new pending ordered blocks - let pending_ordered_blocks = PendingOrderedBlocks::new(); + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); - // Insert several pending blocks - let num_pending_blocks = 10; - let pending_blocks = - create_and_add_pending_blocks(&pending_ordered_blocks, num_pending_blocks); + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 10; + let verified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 30; + let unverified_blocks = create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); - // Verify the pending blocks were all inserted - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - assert_eq!(all_pending_blocks.len(), num_pending_blocks); + // Ensure the verified pending blocks were all inserted + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!(all_verified_blocks.len(), num_verified_blocks); // Verify the pending blocks don't have any commit decisions - for (_, (_, commit_decision)) in all_pending_blocks.iter() { + for (_, (_, commit_decision)) in all_verified_blocks.iter() { assert!(commit_decision.is_none()); } - // Create a commit decision for the first pending block - let first_pending_block = pending_blocks.first().unwrap(); - let first_pending_block_info = first_pending_block.blocks.last().unwrap().block_info(); + // Verify the unverified pending blocks were all inserted + let num_pending_blocks = get_num_pending_blocks(&pending_ordered_blocks); + assert_eq!( + num_pending_blocks, + num_verified_blocks + num_unverified_blocks + ); + + // Create a commit decision for the first verified block + let first_verified_block = verified_blocks.first().unwrap(); + let first_verified_block_info = first_verified_block.last_block().block_info(); let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( - LedgerInfo::new(first_pending_block_info.clone(), HashValue::random()), + LedgerInfo::new(first_verified_block_info.clone(), HashValue::random()), AggregateSignature::empty(), )); - // Update the commit decision for the first pending block + // Update the commit decision for the first verified block pending_ordered_blocks.update_commit_decision(&commit_decision); // Verify the commit decision was updated verify_commit_decision( &pending_ordered_blocks, - &first_pending_block_info, + &first_verified_block_info, commit_decision, ); // Create a commit decision for the last pending block - let last_pending_block = pending_blocks.last().unwrap(); - let last_pending_block_info = last_pending_block.blocks.last().unwrap().block_info(); + let last_pending_block = verified_blocks.last().unwrap(); + let last_pending_block_info = last_pending_block.last_block().block_info(); let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( LedgerInfo::new(last_pending_block_info.clone(), HashValue::random()), AggregateSignature::empty(), @@ -265,23 +559,168 @@ mod test { ); // Verify the commit decisions for the remaining blocks are still missing - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); for i in 1..9 { - let (_, commit_decision) = all_pending_blocks.get(&(i as u64)).unwrap(); + let (_, commit_decision) = all_verified_blocks.get(&(current_epoch, i as u64)).unwrap(); assert!(commit_decision.is_none()); } + + // Create a commit decision for the last unverified pending block + let last_unverified_block = unverified_blocks.last().unwrap(); + let last_unverified_block_info = last_unverified_block.last_block().block_info(); + let commit_decision = CommitDecision::new(LedgerInfoWithSignatures::new( + LedgerInfo::new(last_unverified_block_info.clone(), HashValue::random()), + AggregateSignature::empty(), + )); + + // Update the commit decision for the last unverified pending block + pending_ordered_blocks.update_commit_decision(&commit_decision); + + // Verify the commit decision was not updated + let pending_blocks = pending_ordered_blocks.pending_blocks.lock(); + let (_, _, commit_decision) = pending_blocks + .get(&(next_epoch, last_unverified_block_info.round())) + .unwrap(); + assert!(commit_decision.is_none()); + } + + #[test] + fn test_verify_pending_blocks() { + // Create new pending ordered blocks + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); + + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 5; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 10; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); + + // Insert additional unverified blocks for a future epoch + let future_epoch = next_epoch + 1; + let num_future_blocks = 30; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_future_blocks, + future_epoch, + false, + ); + + // Create an epoch state for the next epoch (with an empty verifier) + let epoch_state = Arc::new(EpochState::new(next_epoch, ValidatorVerifier::new(vec![]))); + + // Verify the pending blocks for the next epoch + pending_ordered_blocks.verify_pending_blocks(&epoch_state); + + // Ensure the verified pending blocks were all inserted + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!( + all_verified_blocks.len(), + num_verified_blocks + num_unverified_blocks + ); + + // Create an epoch state for the future epoch (with an empty verifier) + let epoch_state = EpochState::new(future_epoch, ValidatorVerifier::new(vec![])); + + // Verify the pending blocks for a future epoch + pending_ordered_blocks.verify_pending_blocks(&epoch_state); + + // Ensure the verified pending blocks were all inserted + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!( + all_verified_blocks.len(), + num_verified_blocks + num_unverified_blocks + num_future_blocks + ); + + // Ensure there are no longer any unverified pending blocks + assert_eq!( + all_verified_blocks.len(), + get_num_pending_blocks(&pending_ordered_blocks), + ); + } + + #[test] + fn test_verify_pending_blocks_failure() { + // Create new pending ordered blocks + let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default()); + + // Insert several verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 5; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_verified_blocks, + current_epoch, + true, + ); + + // Insert several unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 10; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_unverified_blocks, + next_epoch, + false, + ); + + // Insert additional unverified blocks for a future epoch + let future_epoch = next_epoch + 1; + let num_future_blocks = 30; + create_and_add_pending_blocks( + &pending_ordered_blocks, + num_future_blocks, + future_epoch, + false, + ); + + // Create an epoch state for the next epoch (with a non-empty verifier) + let validator_signer = ValidatorSigner::random(None); + let validator_consensus_info = ValidatorConsensusInfo::new( + validator_signer.author(), + validator_signer.public_key(), + 100, + ); + let validator_verified = ValidatorVerifier::new(vec![validator_consensus_info]); + let epoch_state = EpochState::new(next_epoch, validator_verified); + + // Verify the pending blocks for the next epoch + pending_ordered_blocks.verify_pending_blocks(&epoch_state); + + // Ensure the unverified pending blocks were not inserted + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + assert_eq!(all_verified_blocks.len(), num_verified_blocks); + + // Ensure the unverified pending blocks were all removed + let num_pending_blocks = get_num_pending_blocks(&pending_ordered_blocks); + assert_eq!(num_pending_blocks, num_verified_blocks); } /// Creates and adds the specified number of pending blocks to the pending ordered blocks fn create_and_add_pending_blocks( pending_ordered_blocks: &PendingOrderedBlocks, num_pending_blocks: usize, + epoch: u64, + verified_ordered_proof: bool, ) -> Vec { let mut pending_blocks = vec![]; for i in 0..num_pending_blocks { // Create a new block info let block_info = BlockInfo::new( - i as u64, + epoch, i as aptos_types::block_info::Round, HashValue::random(), HashValue::random(), @@ -303,17 +742,12 @@ mod test { // Create an ordered block let blocks = vec![pipelined_block]; - let ordered_proof = LedgerInfoWithSignatures::new( - LedgerInfo::new(BlockInfo::empty(), HashValue::random()), - AggregateSignature::empty(), - ); - let ordered_block = OrderedBlock { - blocks, - ordered_proof, - }; + let ordered_proof = create_ledger_info(epoch, i as Round); + let ordered_block = OrderedBlock::new(blocks, ordered_proof); // Insert the ordered block into the pending ordered blocks - pending_ordered_blocks.insert_ordered_block(ordered_block.clone()); + pending_ordered_blocks + .insert_ordered_block(ordered_block.clone(), verified_ordered_proof); // Add the ordered block to the pending blocks pending_blocks.push(ordered_block); @@ -322,6 +756,22 @@ mod test { pending_blocks } + /// Creates and returns a new ledger info with the specified epoch and round + fn create_ledger_info(epoch: u64, round: Round) -> LedgerInfoWithSignatures { + LedgerInfoWithSignatures::new( + LedgerInfo::new( + BlockInfo::random_with_epoch(epoch, round), + HashValue::random(), + ), + AggregateSignature::empty(), + ) + } + + /// Returns the number of pending blocks (both verified and unverified) + fn get_num_pending_blocks(pending_ordered_blocks: &PendingOrderedBlocks) -> usize { + pending_ordered_blocks.pending_blocks.lock().len() + } + /// Verifies the commit decision for the specified block info fn verify_commit_decision( pending_ordered_blocks: &PendingOrderedBlocks, @@ -329,8 +779,10 @@ mod test { commit_decision: CommitDecision, ) { // Get the commit decision for the block - let all_pending_blocks = pending_ordered_blocks.get_all_pending_blocks(); - let (_, updated_commit_decision) = all_pending_blocks.get(&block_info.round()).unwrap(); + let all_verified_blocks = pending_ordered_blocks.get_all_verified_pending_blocks(); + let (_, updated_commit_decision) = all_verified_blocks + .get(&(block_info.epoch(), block_info.round())) + .unwrap(); // Verify the commit decision is expected assert_eq!( diff --git a/consensus/src/consensus_observer/publisher.rs b/consensus/src/consensus_observer/publisher.rs index c56a817245fba..46e31fa6f65ae 100644 --- a/consensus/src/consensus_observer/publisher.rs +++ b/consensus/src/consensus_observer/publisher.rs @@ -315,7 +315,6 @@ fn spawn_message_serializer_and_sender( mod test { use super::*; use aptos_config::network_id::NetworkId; - use aptos_consensus_types::pipeline::commit_decision::CommitDecision; use aptos_crypto::HashValue; use aptos_network::{ application::{metadata::ConnectionState, storage::PeersAndMetadata}, @@ -516,12 +515,11 @@ mod test { process_unsubscription_for_peer(&consensus_publisher, &peer_network_id_1); // Publish another message to the active subscribers - let commit_decision_message = ConsensusObserverMessage::new_commit_decision_message( - CommitDecision::new(LedgerInfoWithSignatures::new( + let commit_decision_message = + ConsensusObserverMessage::new_commit_decision_message(LedgerInfoWithSignatures::new( LedgerInfo::new(BlockInfo::empty(), HashValue::zero()), AggregateSignature::empty(), - )), - ); + )); consensus_publisher .publish_message(commit_decision_message.clone()) .await; diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index 68e1ac8c19bd0..8e3907ab2d14a 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -24,9 +24,7 @@ use crate::{ }; use aptos_bounded_executor::BoundedExecutor; use aptos_config::config::ConsensusObserverConfig; -use aptos_consensus_types::{ - common::Author, pipeline::commit_decision::CommitDecision, pipelined_block::PipelinedBlock, -}; +use aptos_consensus_types::{common::Author, pipelined_block::PipelinedBlock}; use aptos_crypto::HashValue; use aptos_executor_types::ExecutorError; use aptos_logger::prelude::*; @@ -393,9 +391,8 @@ impl BufferManager { self.reset().await; } if let Some(consensus_publisher) = &self.consensus_publisher { - let message = ConsensusObserverMessage::new_commit_decision_message( - CommitDecision::new(commit_proof.clone()), - ); + let message = + ConsensusObserverMessage::new_commit_decision_message(commit_proof.clone()); consensus_publisher.publish_message(message).await; } self.persisting_phase_tx