Skip to content

Commit

Permalink
[Consensus Observer] Add block payload verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 17, 2024
1 parent 7023fb3 commit 452435b
Show file tree
Hide file tree
Showing 8 changed files with 1,265 additions and 251 deletions.
4 changes: 4 additions & 0 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ProofWithData {
}
}

pub fn empty() -> Self {
Self::new(vec![])
}

pub fn extend(&mut self, other: ProofWithData) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
self.proofs.extend(other.proofs);
Expand Down
161 changes: 149 additions & 12 deletions consensus/src/consensus_observer/network_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_observer::error::Error;
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_consensus_types::{
common::{BatchPayload, ProofWithData},
pipelined_block::PipelinedBlock,
proof_of_store::{BatchInfo, ProofCache},
};
use aptos_crypto::hash::CryptoHash;
use aptos_types::{
block_info::{BlockInfo, Round},
epoch_change::Verifier,
Expand All @@ -12,6 +17,7 @@ use aptos_types::{
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
fmt::{Display, Formatter},
sync::Arc,
};
Expand Down Expand Up @@ -46,13 +52,11 @@ impl ConsensusObserverMessage {
/// Creates and returns a new block payload message using the given block, transactions and limit
pub fn new_block_payload_message(
block: BlockInfo,
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
transaction_payload: BlockTransactionPayload,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
transactions,
limit,
transaction_payload,
})
}
}
Expand Down Expand Up @@ -150,10 +154,11 @@ impl ConsensusObserverDirectSend {
},
ConsensusObserverDirectSend::BlockPayload(block_payload) => {
format!(
"BlockPayload: {} {} {:?}",
block_payload.block.id(),
block_payload.transactions.len(),
block_payload.limit
"BlockPayload: {}. Number of transactions: {}, limit: {:?}, proofs: {:?}",
block_payload.block,
block_payload.transaction_payload.transactions.len(),
block_payload.transaction_payload.limit,
block_payload.transaction_payload.proof_with_data.proofs,
)
},
}
Expand Down Expand Up @@ -304,10 +309,142 @@ impl CommitDecision {
}
}

/// Payload message contains the block, transactions and the limit of the block
/// The transaction payload of each block
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub struct BlockTransactionPayload {
pub transactions: Vec<SignedTransaction>,
pub limit: Option<u64>,
pub proof_with_data: ProofWithData,
pub inline_batches: Vec<BatchInfo>,
}

impl BlockTransactionPayload {
pub fn new(
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
proof_with_data: ProofWithData,
inline_batches: Vec<BatchInfo>,
) -> Self {
Self {
transactions,
limit,
proof_with_data,
inline_batches,
}
}

#[cfg(test)]
/// Returns an empty transaction payload (for testing)
pub fn empty() -> Self {
Self {
transactions: vec![],
limit: None,
proof_with_data: ProofWithData::empty(),
inline_batches: vec![],
}
}
}

/// Payload message contains the block and transaction payload
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub transaction_payload: BlockTransactionPayload,
}

impl BlockPayload {
pub fn new(block: BlockInfo, transaction_payload: BlockTransactionPayload) -> Self {
Self {
block,
transaction_payload,
}
}

/// Verifies the block payload digests and returns an error if the data is invalid
pub fn verify_payload_digests(&self) -> Result<(), Error> {
// Verify the proof of store digests against the transactions
let mut transactions = self
.transaction_payload
.transactions
.iter()
.cloned()
.collect::<VecDeque<_>>();
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
reconstruct_and_verify_batch(&mut transactions, proof_of_store.info())?;
}

// Verify the inline batch digests against the inline batches
for batch_info in &self.transaction_payload.inline_batches {
reconstruct_and_verify_batch(&mut transactions, batch_info)?;
}

// Verify that there are no transactions remaining
if !transactions.is_empty() {
return Err(Error::InvalidMessageError(format!(
"Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0",
transactions.len()
)));
}

Ok(()) // All digests match
}

/// Verifies that the block payload proofs are correctly signed according
/// to the current epoch state. Returns an error if the data is invalid.
pub fn verify_payload_signatures(&self, epoch_state: &EpochState) -> Result<(), Error> {
// Create a dummy proof cache to verify the proofs
let proof_cache = ProofCache::new(1);

// Verify each of the proof signatures
let validator_verifier = &epoch_state.verifier;
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
return Err(Error::InvalidMessageError(format!(
"Failed to verify the proof of store for batch: {:?}, Error: {:?}",
proof_of_store.info(),
error
)));
}
}

Ok(()) // All proofs are correctly signed
}
}

/// Reconstructs and verifies the batch using the
/// given transactions and the expected batch info.
fn reconstruct_and_verify_batch(
transactions: &mut VecDeque<SignedTransaction>,
expected_batch_info: &BatchInfo,
) -> Result<(), Error> {
// Gather the transactions for the batch
let mut batch_transactions = vec![];
for i in 0..expected_batch_info.num_txns() {
let batch_transaction = match transactions.pop_front() {
Some(transaction) => transaction,
None => {
return Err(Error::InvalidMessageError(format!(
"Failed to extract transaction during batch reconstruction! Batch: {:?}, transaction index: {:?}",
expected_batch_info, i
)));
},
};
batch_transactions.push(batch_transaction);
}

// Calculate the batch digest
let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions);
let batch_digest = batch_payload.hash();

// Verify the reconstructed digest against the expected digest
let expected_digest = expected_batch_info.digest();
if batch_digest != *expected_digest {
return Err(Error::InvalidMessageError(format!(
"The reconstructed inline batch digest does not match the expected digest!\
Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}",
expected_batch_info, expected_digest, batch_digest
)));
}

Ok(())
}
82 changes: 56 additions & 26 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ConsensusObserver {
epoch_state: None,
quorum_store_enabled: false, // Updated on epoch changes
root: Arc::new(Mutex::new(root)),
block_payload_store: BlockPayloadStore::new(),
block_payload_store: BlockPayloadStore::new(consensus_observer_config),
pending_block_store: PendingBlockStore::new(consensus_observer_config),
pending_ordered_blocks: PendingOrderedBlocks::new(consensus_observer_config),
execution_client,
Expand Down Expand Up @@ -286,10 +286,8 @@ impl ConsensusObserver {

// Create the commit callback
Box::new(move |blocks, ledger_info: LedgerInfoWithSignatures| {
// Remove the committed blocks from the payload store
block_payload_store.remove_blocks(blocks);

// Remove the committed blocks from the pending blocks
// Remove the committed blocks from the payload and pending stores
block_payload_store.remove_committed_blocks(blocks);
pending_ordered_blocks.remove_blocks_for_commit(&ledger_info);

// Verify the ledger info is for the same epoch
Expand Down Expand Up @@ -510,14 +508,9 @@ impl ConsensusObserver {

/// Processes the block payload message
async fn process_block_payload_message(&mut self, block_payload: BlockPayload) {
// Get the block round and epoch
let block = block_payload.block;
let block_round = block.round();
let block_epoch = block.epoch();

// Get the block transactions and limit
let transactions = block_payload.transactions;
let limit = block_payload.limit;
// Get the epoch and round for the block
let block_epoch = block_payload.block.epoch();
let block_round = block_payload.block.round();

// Update the metrics for the received block payload
metrics::set_gauge_with_label(
Expand All @@ -526,21 +519,52 @@ impl ConsensusObserver {
block_round,
);

// TODO: verify the block payload!
// Verify the block payload digests
if let Err(error) = block_payload.verify_payload_digests() {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to verify block payload digests! Ignoring block: {:?}. Error: {:?}",
block_payload.block, error
))
);
return;
}

// If the payload is for the current epoch, verify the proof signatures
let epoch_state = self.get_epoch_state();
let verified_payload = if block_epoch == epoch_state.epoch {
// Verify the block proof signatures
if let Err(error) = block_payload.verify_payload_signatures(&epoch_state) {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to verify block payload signatures! Ignoring block: {:?}. Error: {:?}",
block_payload.block, error
))
);
return;
}

true // We have successfully verified the signatures
} else {
false // We can't verify the signatures yet
};

// Update the payload store with the payload
self.block_payload_store
.insert_block_payload(block, transactions, limit);

// Check if there are blocks that were pending payloads
// but are now ready because of the new payload.
if let Some(ordered_block) = self.pending_block_store.remove_ready_block(
block_epoch,
block_round,
&self.block_payload_store,
) {
// Process the ordered block
self.process_ordered_block(ordered_block).await;
.insert_block_payload(block_payload, verified_payload);

// Check if there are blocks that were missing payloads but are
// now ready because of the new payload. Note: this should only
// be done if the payload has been verified correctly.
if verified_payload {
if let Some(ordered_block) = self.pending_block_store.remove_ready_block(
block_epoch,
block_round,
&self.block_payload_store,
) {
// Process the ordered block
self.process_ordered_block(ordered_block).await;
}
}
}

Expand All @@ -553,7 +577,7 @@ impl ConsensusObserver {
commit_decision.round(),
);

// If the commit decision is for the current epoch, verify it
// If the commit decision is for the current epoch, verify and process it
let epoch_state = self.get_epoch_state();
let commit_decision_epoch = commit_decision.epoch();
if commit_decision_epoch == epoch_state.epoch {
Expand Down Expand Up @@ -593,6 +617,8 @@ impl ConsensusObserver {

// Update the root and clear the pending blocks (up to the commit)
*self.root.lock() = commit_decision.commit_proof().clone();
self.block_payload_store
.remove_blocks_for_epoch_round(commit_decision_epoch, commit_decision_round);
self.pending_ordered_blocks
.remove_blocks_for_commit(commit_decision.commit_proof());

Expand Down Expand Up @@ -857,6 +883,10 @@ impl ConsensusObserver {
self.execution_client.end_epoch().await;
self.wait_for_epoch_start().await;

// Verify the block payloads for the new epoch
self.block_payload_store
.verify_payload_signatures(&self.get_epoch_state());

// Verify the pending blocks for the new epoch
let new_epoch_state = self.get_epoch_state();
self.pending_ordered_blocks
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/consensus_observer/ordered_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,8 +767,8 @@ mod test {
validator_signer.public_key(),
100,
);
let validator_verified = ValidatorVerifier::new(vec![validator_consensus_info]);
let epoch_state = EpochState::new(next_epoch, validator_verified);
let validator_verifier = ValidatorVerifier::new(vec![validator_consensus_info]);
let epoch_state = EpochState::new(next_epoch, validator_verifier);

// Verify the pending blocks for the next epoch
pending_ordered_blocks.verify_pending_blocks(&epoch_state);
Expand Down
Loading

0 comments on commit 452435b

Please sign in to comment.