Skip to content

Commit

Permalink
[Consensus Observer] Add message verification to ordered blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 16, 2024
1 parent 235402e commit c4c22c4
Show file tree
Hide file tree
Showing 6 changed files with 841 additions and 250 deletions.
3 changes: 3 additions & 0 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub struct ConsensusObserverConfig {

/// Interval (in milliseconds) to garbage collect peer state
pub garbage_collection_interval_ms: u64,
/// Maximum number of pending blocks to keep in memory
pub max_num_pending_blocks: u64,
/// Maximum timeout (in milliseconds) for active subscriptions
pub max_subscription_timeout_ms: u64,
/// Maximum timeout (in milliseconds) we'll wait for the synced version to
Expand All @@ -50,6 +52,7 @@ impl Default for ConsensusObserverConfig {
max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs
network_request_timeout_ms: 10_000, // 10 seconds
garbage_collection_interval_ms: 60_000, // 60 seconds
max_num_pending_blocks: 100, // 100 blocks
max_subscription_timeout_ms: 30_000, // 30 seconds
max_synced_version_timeout_ms: 60_000, // 60 seconds
peer_optimality_check_interval_ms: 60_000, // 60 seconds
Expand Down
166 changes: 150 additions & 16 deletions consensus/src/consensus_observer/network_message.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::{
pipeline::commit_decision::CommitDecision, pipelined_block::PipelinedBlock,
};
use crate::consensus_observer::error::Error;
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_types::{
block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures, transaction::SignedTransaction,
block_info::{BlockInfo, Round},
epoch_change::Verifier,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
transaction::SignedTransaction,
};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -35,9 +38,9 @@ impl ConsensusObserverMessage {

/// Creates and returns a new commit decision message using the given commit decision
pub fn new_commit_decision_message(
commit_decision: CommitDecision,
commit_proof: LedgerInfoWithSignatures,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::CommitDecision(commit_decision)
ConsensusObserverDirectSend::CommitDecision(CommitDecision { commit_proof })
}

/// Creates and returns a new block payload message using the given block, transactions and limit
Expand Down Expand Up @@ -140,16 +143,10 @@ impl ConsensusObserverDirectSend {
pub fn get_content(&self) -> String {
match self {
ConsensusObserverDirectSend::OrderedBlock(ordered_block) => {
format!(
"OrderedBlock: {}",
ordered_block.ordered_proof.commit_info()
)
format!("OrderedBlock: {}", ordered_block.proof_block_info())
},
ConsensusObserverDirectSend::CommitDecision(commit_decision) => {
format!(
"CommitDecision: {}",
commit_decision.ledger_info().commit_info()
)
format!("CommitDecision: {}", commit_decision.proof_block_info())
},
ConsensusObserverDirectSend::BlockPayload(block_payload) => {
format!(
Expand All @@ -166,8 +163,145 @@ impl ConsensusObserverDirectSend {
/// OrderedBlock message contains the ordered blocks and the proof of the ordering
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct OrderedBlock {
pub blocks: Vec<Arc<PipelinedBlock>>,
pub ordered_proof: LedgerInfoWithSignatures,
blocks: Vec<Arc<PipelinedBlock>>,
ordered_proof: LedgerInfoWithSignatures,
}

impl OrderedBlock {
pub fn new(blocks: Vec<Arc<PipelinedBlock>>, ordered_proof: LedgerInfoWithSignatures) -> Self {
Self {
blocks,
ordered_proof,
}
}

/// Returns a reference to the ordered blocks
pub fn blocks(&self) -> &Vec<Arc<PipelinedBlock>> {
&self.blocks
}

/// Returns a copy of the first ordered block
pub fn first_block(&self) -> Arc<PipelinedBlock> {
self.blocks
.first()
.cloned()
.expect("At least one block is expected!")
}

/// Returns a copy of the last ordered block
pub fn last_block(&self) -> Arc<PipelinedBlock> {
self.blocks
.last()
.cloned()
.expect("At least one block is expected!")
}

/// Returns a reference to the ordered proof
pub fn ordered_proof(&self) -> &LedgerInfoWithSignatures {
&self.ordered_proof
}

/// Returns a reference to the ordered proof block info
pub fn proof_block_info(&self) -> &BlockInfo {
self.ordered_proof.commit_info()
}

/// Verifies the ordered blocks and returns an error if the data is invalid.
/// Note: this does not check the ordered proof.
pub fn verify_ordered_blocks(&self) -> Result<(), Error> {
// Verify that we have at least one ordered block
if self.blocks.is_empty() {
return Err(Error::InvalidMessageError(
"Received empty ordered block!".to_string(),
));
}

// Verify the last block ID matches the ordered proof block ID
if self.last_block().id() != self.proof_block_info().id() {
return Err(Error::InvalidMessageError(
format!(
"Last ordered block ID does not match the ordered proof ID! Number of blocks: {:?}, Last ordered block ID: {:?}, Ordered proof ID: {:?}",
self.blocks.len(),
self.last_block().id(),
self.proof_block_info().id()
)
));
}

// Verify the blocks are correctly chained together (from the last block to the first)
let mut expected_parent_id = None;
for block in self.blocks.iter().rev() {
if let Some(expected_parent_id) = expected_parent_id {
if block.id() != expected_parent_id {
return Err(Error::InvalidMessageError(
format!(
"Block parent ID does not match the expected parent ID! Block ID: {:?}, Expected parent ID: {:?}",
block.id(),
expected_parent_id
)
));
}
}

expected_parent_id = Some(block.parent_id());
}

Ok(())
}

/// Verifies the ordered proof and returns an error if the proof is invalid
pub fn verify_ordered_proof(&self, epoch_state: &EpochState) -> Result<(), Error> {
epoch_state.verify(&self.ordered_proof).map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify ordered proof ledger info: {:?}, Error: {:?}",
self.proof_block_info(),
error
))
})
}
}

/// CommitDecision message contains the commit decision proof
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CommitDecision {
commit_proof: LedgerInfoWithSignatures,
}

impl CommitDecision {
pub fn new(commit_proof: LedgerInfoWithSignatures) -> Self {
Self { commit_proof }
}

/// Returns a reference to the commit proof
pub fn commit_proof(&self) -> &LedgerInfoWithSignatures {
&self.commit_proof
}

/// Returns the epoch of the commit proof
pub fn epoch(&self) -> u64 {
self.commit_proof.ledger_info().epoch()
}

/// Returns a reference to the commit proof block info
pub fn proof_block_info(&self) -> &BlockInfo {
self.commit_proof.commit_info()
}

/// Returns the round of the commit proof
pub fn round(&self) -> Round {
self.commit_proof.ledger_info().round()
}

/// Verifies the commit proof and returns an error if the proof is invalid
pub fn verify_commit_proof(&self, epoch_state: &EpochState) -> Result<(), Error> {
epoch_state.verify(&self.commit_proof).map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify commit proof ledger info: {:?}, Error: {:?}",
self.proof_block_info(),
error
))
})
}
}

/// Payload message contains the block, transactions and the limit of the block
Expand Down
Loading

0 comments on commit c4c22c4

Please sign in to comment.