Skip to content

Commit

Permalink
[Consensus Observer] Parallelize message processing logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 22, 2024
1 parent ef1c8eb commit b772220
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 48 deletions.
5 changes: 4 additions & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ impl ProofOfStore {
}
let result = validator
.verify_multi_signatures(&self.info, &self.multi_signature)
.context("Failed to verify ProofOfStore");
.context(format!(
"Failed to verify ProofOfStore for batch: {:?}",
self.info
));
if result.is_ok() {
cache.insert(self.info.clone(), self.multi_signature.clone());
}
Expand Down
131 changes: 86 additions & 45 deletions consensus/src/consensus_observer/network/observer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
transaction::SignedTransaction,
};
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
slice::Iter,
sync::Arc,
vec::IntoIter,
};

/// Types of messages that can be sent between the consensus publisher and observer
Expand All @@ -36,28 +40,25 @@ impl ConsensusObserverMessage {
blocks: Vec<Arc<PipelinedBlock>>,
ordered_proof: LedgerInfoWithSignatures,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::OrderedBlock(OrderedBlock {
blocks,
ordered_proof,
})
let ordered_block = OrderedBlock::new(blocks, ordered_proof);
ConsensusObserverDirectSend::OrderedBlock(ordered_block)
}

/// Creates and returns a new commit decision message using the given commit decision
pub fn new_commit_decision_message(
commit_proof: LedgerInfoWithSignatures,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::CommitDecision(CommitDecision { commit_proof })
let commit_decision = CommitDecision::new(commit_proof);
ConsensusObserverDirectSend::CommitDecision(commit_decision)
}

/// Creates and returns a new block payload message using the given block, transactions and limit
pub fn new_block_payload_message(
block: BlockInfo,
transaction_payload: BlockTransactionPayload,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
transaction_payload,
})
let block_payload = BlockPayload::new(block, transaction_payload);
ConsensusObserverDirectSend::BlockPayload(block_payload)
}
}

Expand Down Expand Up @@ -674,33 +675,65 @@ impl BlockPayload {
let num_payload_proofs = payload_proofs.len();
let num_inline_batches = inline_batches.len();

// Verify the payload proof digests using the transactions
let mut transactions_iter = transactions.iter();
// Gather the transactions for each payload batch
let mut batches_and_transactions = vec![];
let mut transactions_iter = transactions.into_iter();
for proof_of_store in &payload_proofs {
reconstruct_and_verify_batch(&block_info, &mut transactions_iter, proof_of_store.info(), true).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify payload proof digests! Num transactions: {:?}, \
match reconstruct_batch(
&block_info,
&mut transactions_iter,
proof_of_store.info(),
true,
) {
Ok(Some(batch_transactions)) => {
batches_and_transactions
.push((proof_of_store.info().clone(), batch_transactions));
},
Ok(None) => { /* Nothing needs to be done (the batch was expired) */ },
Err(error) => {
return Err(Error::InvalidMessageError(format!(
"Failed to reconstruct payload proof batch! Num transactions: {:?}, \
num batches: {:?}, num inline batches: {:?}, failed batch: {:?}, Error: {:?}",
num_transactions, num_payload_proofs, num_inline_batches, proof_of_store.info(), error
))
)));
},
)?;
}
}

// Verify the inline batch digests using the transactions
// Gather the transactions for each inline batch
for batch_info in inline_batches.into_iter() {
reconstruct_and_verify_batch(&block_info, &mut transactions_iter, batch_info, false).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify inline batch digests! Num transactions: {:?}, \
match reconstruct_batch(&block_info, &mut transactions_iter, batch_info, false) {
Ok(Some(batch_transactions)) => {
batches_and_transactions.push((batch_info.clone(), batch_transactions));
},
Ok(None) => {
return Err(Error::UnexpectedError(format!(
"Failed to reconstruct inline batch! Batch was unexpectedly skipped: {:?}",
batch_info
)));
},
Err(error) => {
return Err(Error::InvalidMessageError(format!(
"Failed to reconstruct inline batch! Num transactions: {:?}, \
num batches: {:?}, num inline batches: {:?}, failed batch: {:?}, Error: {:?}",
num_transactions, num_payload_proofs, num_inline_batches, batch_info, error
))
)));
},
)?;
}
}

// Verify all the reconstructed batches (in parallel)
batches_and_transactions
.into_par_iter()
.with_min_len(2)
.try_for_each(|(batch_info, transactions)| verify_batch(&batch_info, transactions))
.map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify the payload batches and transactions! Error: {:?}",
error
))
})?;

// Verify that there are no transactions remaining (all transactions should be consumed)
let remaining_transactions = transactions_iter.as_slice();
if !remaining_transactions.is_empty() {
Expand All @@ -721,38 +754,38 @@ impl BlockPayload {
// Create a dummy proof cache to verify the proofs
let proof_cache = ProofCache::new(1);

// TODO: parallelize the verification of the proof signatures!

// Verify each of the proof signatures
// Verify each of the proof signatures (in parallel)
let payload_proofs = self.transaction_payload.payload_proofs();
let validator_verifier = &epoch_state.verifier;
for proof_of_store in &self.transaction_payload.payload_proofs() {
if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
return Err(Error::InvalidMessageError(format!(
"Failed to verify the proof of store for batch: {:?}, Error: {:?}",
proof_of_store.info(),
payload_proofs
.par_iter()
.with_min_len(2)
.try_for_each(|proof| proof.verify(validator_verifier, &proof_cache))
.map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify the payload proof signatures! Error: {:?}",
error
)));
}
}
))
})?;

Ok(()) // All proofs are correctly signed
}
}

/// Reconstructs and verifies the batch using the given transactions
/// and the expected batch info. If `skip_expired_batches` is true
/// then verification will be skipped if the batch is expired.
fn reconstruct_and_verify_batch(
/// Reconstructs the batch using the given transactions and the
/// expected batch info. If `skip_expired_batches` is true
/// then reconstruction will be skipped if the batch is expired.
fn reconstruct_batch(
block_info: &BlockInfo,
transactions_iter: &mut Iter<SignedTransaction>,
transactions_iter: &mut IntoIter<SignedTransaction>,
expected_batch_info: &BatchInfo,
skip_expired_batches: bool,
) -> Result<(), Error> {
// If the batch is expired we should skip verification (as the
) -> Result<Option<Vec<SignedTransaction>>, Error> {
// If the batch is expired we should skip reconstruction (as the
// transactions for the expired batch won't be sent in the payload).
// Note: this should only be required for QS batches (not inline batches).
if skip_expired_batches && block_info.timestamp_usecs() > expected_batch_info.expiration() {
return Ok(());
return Ok(None);
}

// Gather the transactions for the batch
Expand All @@ -767,9 +800,17 @@ fn reconstruct_and_verify_batch(
)));
},
};
batch_transactions.push(batch_transaction.clone());
batch_transactions.push(batch_transaction);
}

Ok(Some(batch_transactions))
}

/// Verifies the batch digest using the given transactions and the expected batch info
fn verify_batch(
expected_batch_info: &BatchInfo,
batch_transactions: Vec<SignedTransaction>,
) -> Result<(), Error> {
// Calculate the batch digest
let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions);
let batch_digest = batch_payload.hash();
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/consensus_observer/observer/payload_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ impl BlockPayloadStore {
}

/// Verifies the block payload signatures against the given epoch state.
/// If verification is successful, blocks are marked as verified. Each
/// new verified block is
/// If verification is successful, blocks are marked as verified.
pub fn verify_payload_signatures(&mut self, epoch_state: &EpochState) -> Vec<Round> {
// Get the current epoch
let current_epoch = epoch_state.epoch;
Expand Down

0 comments on commit b772220

Please sign in to comment.