Skip to content

Commit

Permalink
[Consensus Observer] Add block payload verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 21, 2024
1 parent 0522406 commit 6644003
Show file tree
Hide file tree
Showing 8 changed files with 1,270 additions and 285 deletions.
4 changes: 4 additions & 0 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ProofWithData {
}
}

pub fn empty() -> Self {
Self::new(vec![])
}

pub fn extend(&mut self, other: ProofWithData) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
self.proofs.extend(other.proofs);
Expand Down
158 changes: 146 additions & 12 deletions consensus/src/consensus_observer/network_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_observer::error::Error;
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_consensus_types::{
common::{BatchPayload, ProofWithData},
pipelined_block::PipelinedBlock,
proof_of_store::{BatchInfo, ProofCache},
};
use aptos_crypto::hash::CryptoHash;
use aptos_types::{
block_info::{BlockInfo, Round},
epoch_change::Verifier,
Expand All @@ -13,6 +18,7 @@ use aptos_types::{
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
slice::Iter,
sync::Arc,
};

Expand Down Expand Up @@ -46,13 +52,11 @@ impl ConsensusObserverMessage {
/// Creates and returns a new block payload message using the given block, transactions and limit
pub fn new_block_payload_message(
block: BlockInfo,
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
transaction_payload: BlockTransactionPayload,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
transactions,
limit,
transaction_payload,
})
}
}
Expand Down Expand Up @@ -150,10 +154,11 @@ impl ConsensusObserverDirectSend {
},
ConsensusObserverDirectSend::BlockPayload(block_payload) => {
format!(
"BlockPayload: {} {} {:?}",
block_payload.block.id(),
block_payload.transactions.len(),
block_payload.limit
"BlockPayload: {}. Number of transactions: {}, limit: {:?}, proofs: {:?}",
block_payload.block,
block_payload.transaction_payload.transactions.len(),
block_payload.transaction_payload.limit,
block_payload.transaction_payload.proof_with_data.proofs,
)
},
}
Expand Down Expand Up @@ -304,10 +309,139 @@ impl CommitDecision {
}
}

/// Payload message contains the block, transactions and the limit of the block
/// The transaction payload of each block
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub struct BlockTransactionPayload {
pub transactions: Vec<SignedTransaction>,
pub limit: Option<u64>,
pub proof_with_data: ProofWithData,
pub inline_batches: Vec<BatchInfo>,
}

impl BlockTransactionPayload {
pub fn new(
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
proof_with_data: ProofWithData,
inline_batches: Vec<BatchInfo>,
) -> Self {
Self {
transactions,
limit,
proof_with_data,
inline_batches,
}
}

#[cfg(test)]
/// Returns an empty transaction payload (for testing)
pub fn empty() -> Self {
Self {
transactions: vec![],
limit: None,
proof_with_data: ProofWithData::empty(),
inline_batches: vec![],
}
}
}

/// Payload message contains the block and transaction payload
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub transaction_payload: BlockTransactionPayload,
}

impl BlockPayload {
pub fn new(block: BlockInfo, transaction_payload: BlockTransactionPayload) -> Self {
Self {
block,
transaction_payload,
}
}

/// Verifies the block payload digests and returns an error if the data is invalid
pub fn verify_payload_digests(&self) -> Result<(), Error> {
// Verify the proof of store digests against the transaction
let mut transactions_iter = self.transaction_payload.transactions.iter();
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info())?;
}

// Verify the inline batch digests against the inline batches
for batch_info in &self.transaction_payload.inline_batches {
reconstruct_and_verify_batch(&mut transactions_iter, batch_info)?;
}

// Verify that there are no transactions remaining
if transactions_iter.next().is_some() {
return Err(Error::InvalidMessageError(format!(
"Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0",
transactions_iter.as_slice().len()
)));
}

Ok(()) // All digests match
}

/// Verifies that the block payload proofs are correctly signed according
/// to the current epoch state. Returns an error if the data is invalid.
pub fn verify_payload_signatures(&self, epoch_state: &EpochState) -> Result<(), Error> {
// Create a dummy proof cache to verify the proofs
let proof_cache = ProofCache::new(1);

// TODO: parallelize the verification of the proof signatures!

// Verify each of the proof signatures
let validator_verifier = &epoch_state.verifier;
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
return Err(Error::InvalidMessageError(format!(
"Failed to verify the proof of store for batch: {:?}, Error: {:?}",
proof_of_store.info(),
error
)));
}
}

Ok(()) // All proofs are correctly signed
}
}

/// Reconstructs and verifies the batch using the
/// given transactions and the expected batch info.
fn reconstruct_and_verify_batch(
transactions_iter: &mut Iter<SignedTransaction>,
expected_batch_info: &BatchInfo,
) -> Result<(), Error> {
// Gather the transactions for the batch
let mut batch_transactions = vec![];
for i in 0..expected_batch_info.num_txns() {
let batch_transaction = match transactions_iter.next() {
Some(transaction) => transaction,
None => {
return Err(Error::InvalidMessageError(format!(
"Failed to extract transaction during batch reconstruction! Batch: {:?}, transaction index: {:?}",
expected_batch_info, i
)));
},
};
batch_transactions.push(batch_transaction.clone());
}

// Calculate the batch digest
let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions);
let batch_digest = batch_payload.hash();

// Verify the reconstructed digest against the expected digest
let expected_digest = expected_batch_info.digest();
if batch_digest != *expected_digest {
return Err(Error::InvalidMessageError(format!(
"The reconstructed inline batch digest does not match the expected digest!\
Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}",
expected_batch_info, expected_digest, batch_digest
)));
}

Ok(())
}
Loading

0 comments on commit 6644003

Please sign in to comment.