Skip to content

Commit

Permalink
[Consensus Observer] Parallelize message processing logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 21, 2024
1 parent 38aedc2 commit 05c8308
Showing 3 changed files with 25 additions and 23 deletions.
5 changes: 4 additions & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -363,7 +363,10 @@ impl ProofOfStore {
}
let result = validator
.verify_multi_signatures(&self.info, &self.multi_signature)
.context("Failed to verify ProofOfStore");
.context(format!(
"Failed to verify ProofOfStore for batch: {:?}",
self.info
));
if result.is_ok() {
cache.insert(self.info.clone(), self.multi_signature.clone());
}
40 changes: 20 additions & 20 deletions consensus/src/consensus_observer/network/observer_message.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,10 @@ use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
transaction::SignedTransaction,
};
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
@@ -36,28 +40,25 @@ impl ConsensusObserverMessage {
blocks: Vec<Arc<PipelinedBlock>>,
ordered_proof: LedgerInfoWithSignatures,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::OrderedBlock(OrderedBlock {
blocks,
ordered_proof,
})
let ordered_block = OrderedBlock::new(blocks, ordered_proof);
ConsensusObserverDirectSend::OrderedBlock(ordered_block)
}

/// Creates and returns a new commit decision message using the given commit decision
pub fn new_commit_decision_message(
commit_proof: LedgerInfoWithSignatures,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::CommitDecision(CommitDecision { commit_proof })
let commit_decision = CommitDecision::new(commit_proof);
ConsensusObserverDirectSend::CommitDecision(commit_decision)
}

/// Creates and returns a new block payload message using the given block, transactions and limit
pub fn new_block_payload_message(
block: BlockInfo,
transaction_payload: BlockTransactionPayload,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
transaction_payload,
})
let block_payload = BlockPayload::new(block, transaction_payload);
ConsensusObserverDirectSend::BlockPayload(block_payload)
}
}

@@ -721,19 +722,18 @@ impl BlockPayload {
// Create a dummy proof cache to verify the proofs
let proof_cache = ProofCache::new(1);

// TODO: parallelize the verification of the proof signatures!

// Verify each of the proof signatures
// Verify each of the proof signatures (in parallel)
let payload_proofs = self.transaction_payload.payload_proofs();
let validator_verifier = &epoch_state.verifier;
for proof_of_store in &self.transaction_payload.payload_proofs() {
if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
return Err(Error::InvalidMessageError(format!(
"Failed to verify the proof of store for batch: {:?}, Error: {:?}",
proof_of_store.info(),
payload_proofs
.par_iter()
.try_for_each(|proof| proof.verify(validator_verifier, &proof_cache))
.map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify the payload proof signatures! Error: {:?}",
error
)));
}
}
))
})?;

Ok(()) // All proofs are correctly signed
}
3 changes: 1 addition & 2 deletions consensus/src/consensus_observer/observer/payload_store.rs
Original file line number Diff line number Diff line change
@@ -213,8 +213,7 @@ impl BlockPayloadStore {
}

/// Verifies the block payload signatures against the given epoch state.
/// If verification is successful, blocks are marked as verified. Each
/// new verified block is
/// If verification is successful, blocks are marked as verified.
pub fn verify_payload_signatures(&mut self, epoch_state: &EpochState) -> Vec<Round> {
// Get the current epoch
let current_epoch = epoch_state.epoch;

0 comments on commit 05c8308

Please sign in to comment.