[Consensus Observer] Add block payload verification.
JoshLind committed Jul 18, 2024
1 parent 3268730 commit da0b809
Showing 8 changed files with 1,275 additions and 285 deletions.
4 changes: 4 additions & 0 deletions consensus/consensus-types/src/common.rs
@@ -165,6 +165,10 @@ impl ProofWithData {
         }
     }
 
+    pub fn empty() -> Self {
+        Self::new(vec![])
+    }
+
     pub fn extend(&mut self, other: ProofWithData) {
         let other_data_status = other.status.lock().as_mut().unwrap().take();
         self.proofs.extend(other.proofs);
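For readers unfamiliar with `ProofWithData`, here is a minimal usage sketch of the new helper; the surrounding function is hypothetical and only the constructor calls come from this commit:

```rust
use aptos_consensus_types::common::ProofWithData;

// Hypothetical usage sketch (not part of this commit): `empty()` is just a
// shorthand for building a proof set that contains no proofs of store.
fn example_empty_proof_set() {
    let empty = ProofWithData::empty();
    assert!(empty.proofs.is_empty()); // equivalent to ProofWithData::new(vec![])
}
```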
161 changes: 149 additions & 12 deletions consensus/src/consensus_observer/network_message.rs
@@ -2,7 +2,12 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::consensus_observer::error::Error;
-use aptos_consensus_types::pipelined_block::PipelinedBlock;
+use aptos_consensus_types::{
+    common::{BatchPayload, ProofWithData},
+    pipelined_block::PipelinedBlock,
+    proof_of_store::{BatchInfo, ProofCache},
+};
+use aptos_crypto::hash::CryptoHash;
 use aptos_types::{
     block_info::{BlockInfo, Round},
     epoch_change::Verifier,
@@ -12,6 +17,7 @@ use aptos_types::{
 };
 use serde::{Deserialize, Serialize};
 use std::{
+    collections::VecDeque,
     fmt::{Display, Formatter},
     sync::Arc,
 };
@@ -46,13 +52,11 @@ impl ConsensusObserverMessage {
     /// Creates and returns a new block payload message using the given block, transactions and limit
     pub fn new_block_payload_message(
         block: BlockInfo,
-        transactions: Vec<SignedTransaction>,
-        limit: Option<u64>,
+        transaction_payload: BlockTransactionPayload,
     ) -> ConsensusObserverDirectSend {
         ConsensusObserverDirectSend::BlockPayload(BlockPayload {
             block,
-            transactions,
-            limit,
+            transaction_payload,
         })
     }
 }
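As a rough illustration of the new signature (not part of this diff, and assuming it sits in the same module as the message types so they are in scope), a sender could bundle a block's transactions, limit, proofs and inline batches into the new payload type before building the message; the helper name and parameters below are made up for the example:

```rust
use aptos_consensus_types::{
    common::ProofWithData,
    proof_of_store::{BatchInfo, ProofOfStore},
};
use aptos_types::{block_info::BlockInfo, transaction::SignedTransaction};

// Hypothetical sender-side sketch: package the block data into the new
// BlockTransactionPayload and wrap it in a consensus observer message.
fn build_payload_message(
    block: BlockInfo,
    transactions: Vec<SignedTransaction>,
    limit: Option<u64>,
    proofs: Vec<ProofOfStore>,
    inline_batches: Vec<BatchInfo>,
) -> ConsensusObserverDirectSend {
    let transaction_payload = BlockTransactionPayload::new(
        transactions,
        limit,
        ProofWithData::new(proofs),
        inline_batches,
    );
    ConsensusObserverMessage::new_block_payload_message(block, transaction_payload)
}
```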
@@ -150,10 +154,11 @@ impl ConsensusObserverDirectSend {
             },
             ConsensusObserverDirectSend::BlockPayload(block_payload) => {
                 format!(
-                    "BlockPayload: {} {} {:?}",
-                    block_payload.block.id(),
-                    block_payload.transactions.len(),
-                    block_payload.limit
+                    "BlockPayload: {}. Number of transactions: {}, limit: {:?}, proofs: {:?}",
+                    block_payload.block,
+                    block_payload.transaction_payload.transactions.len(),
+                    block_payload.transaction_payload.limit,
+                    block_payload.transaction_payload.proof_with_data.proofs,
                 )
             },
         }
@@ -304,10 +309,142 @@ impl CommitDecision {
     }
 }
 
-/// Payload message contains the block, transactions and the limit of the block
+/// The transaction payload of each block
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct BlockPayload {
-    pub block: BlockInfo,
+pub struct BlockTransactionPayload {
     pub transactions: Vec<SignedTransaction>,
     pub limit: Option<u64>,
+    pub proof_with_data: ProofWithData,
+    pub inline_batches: Vec<BatchInfo>,
 }
+
+impl BlockTransactionPayload {
+    pub fn new(
+        transactions: Vec<SignedTransaction>,
+        limit: Option<u64>,
+        proof_with_data: ProofWithData,
+        inline_batches: Vec<BatchInfo>,
+    ) -> Self {
+        Self {
+            transactions,
+            limit,
+            proof_with_data,
+            inline_batches,
+        }
+    }
+
+    #[cfg(test)]
+    /// Returns an empty transaction payload (for testing)
+    pub fn empty() -> Self {
+        Self {
+            transactions: vec![],
+            limit: None,
+            proof_with_data: ProofWithData::empty(),
+            inline_batches: vec![],
+        }
+    }
+}
+
+/// Payload message contains the block and transaction payload
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct BlockPayload {
+    pub block: BlockInfo,
+    pub transaction_payload: BlockTransactionPayload,
+}
+
+impl BlockPayload {
+    pub fn new(block: BlockInfo, transaction_payload: BlockTransactionPayload) -> Self {
+        Self {
+            block,
+            transaction_payload,
+        }
+    }
+
+    /// Verifies the block payload digests and returns an error if the data is invalid
+    pub fn verify_payload_digests(&self) -> Result<(), Error> {
+        // Verify the proof of store digests against the transactions
+        let mut transactions = self
+            .transaction_payload
+            .transactions
+            .iter()
+            .cloned()
+            .collect::<VecDeque<_>>();
+        for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
+            reconstruct_and_verify_batch(&mut transactions, proof_of_store.info())?;
+        }
+
+        // Verify the inline batch digests against the inline batches
+        for batch_info in &self.transaction_payload.inline_batches {
+            reconstruct_and_verify_batch(&mut transactions, batch_info)?;
+        }
+
+        // Verify that there are no transactions remaining
+        if !transactions.is_empty() {
+            return Err(Error::InvalidMessageError(format!(
+                "Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0",
+                transactions.len()
+            )));
+        }
+
+        Ok(()) // All digests match
+    }
+
+    /// Verifies that the block payload proofs are correctly signed according
+    /// to the current epoch state. Returns an error if the data is invalid.
+    pub fn verify_payload_signatures(&self, epoch_state: &EpochState) -> Result<(), Error> {
+        // Create a dummy proof cache to verify the proofs
+        let proof_cache = ProofCache::new(1);
+
+        // Verify each of the proof signatures
+        let validator_verifier = &epoch_state.verifier;
+        for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
+            if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
+                return Err(Error::InvalidMessageError(format!(
+                    "Failed to verify the proof of store for batch: {:?}, Error: {:?}",
+                    proof_of_store.info(),
+                    error
+                )));
+            }
+        }
+
+        Ok(()) // All proofs are correctly signed
+    }
+}
+
+/// Reconstructs and verifies the batch using the
+/// given transactions and the expected batch info.
+fn reconstruct_and_verify_batch(
+    transactions: &mut VecDeque<SignedTransaction>,
+    expected_batch_info: &BatchInfo,
+) -> Result<(), Error> {
+    // Gather the transactions for the batch
+    let mut batch_transactions = vec![];
+    for i in 0..expected_batch_info.num_txns() {
+        let batch_transaction = match transactions.pop_front() {
+            Some(transaction) => transaction,
+            None => {
+                return Err(Error::InvalidMessageError(format!(
+                    "Failed to extract transaction during batch reconstruction! Batch: {:?}, transaction index: {:?}",
+                    expected_batch_info, i
+                )));
+            },
+        };
+        batch_transactions.push(batch_transaction);
+    }
+
+    // Calculate the batch digest
+    let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions);
+    let batch_digest = batch_payload.hash();
+
+    // Verify the reconstructed digest against the expected digest
+    let expected_digest = expected_batch_info.digest();
+    if batch_digest != *expected_digest {
+        return Err(Error::InvalidMessageError(format!(
+            "The reconstructed inline batch digest does not match the expected digest!\
+            Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}",
+            expected_batch_info, expected_digest, batch_digest
+        )));
+    }
+
+    Ok(())
+}
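Finally, a hypothetical receiver-side sketch (also not part of this commit, and assuming it sits alongside the types above so `BlockPayload` and `Error` are in scope): an observer that has deserialized a block payload could run both new checks before accepting it. The helper name below is invented for the example; only the two method calls come from this diff.

```rust
use aptos_types::epoch_state::EpochState;

// Hypothetical verification flow for an incoming payload: check the proof
// signatures against the current epoch state first, then make sure every
// proof and inline batch digest can be reconstructed from the transactions.
fn verify_incoming_payload(
    block_payload: &BlockPayload,
    epoch_state: &EpochState,
) -> Result<(), Error> {
    // Proofs of store must be signed by the current validator set
    block_payload.verify_payload_signatures(epoch_state)?;

    // The transactions must account for every referenced batch, with none left over
    block_payload.verify_payload_digests()
}
```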
