From 04d3589b960b02d6fc6c04a038f66132d37156c4 Mon Sep 17 00:00:00 2001
From: Satya Vusirikala
Date: Tue, 8 Oct 2024 14:44:28 -0700
Subject: [PATCH] Old OSV code

---
 .../consensus-types/src/proof_of_store.rs     |  12 +
 consensus/src/epoch_manager.rs                |   4 +-
 consensus/src/quorum_store/batch_requester.rs |  13 +-
 consensus/src/quorum_store/counters.rs        |  10 +
 .../src/quorum_store/network_listener.rs      |  23 +-
 .../src/quorum_store/proof_coordinator.rs     | 264 +++++++++++++-----
 .../src/quorum_store/quorum_store_builder.rs  |  32 +--
 .../tests/batch_requester_test.rs             |  16 +-
 .../tests/proof_coordinator_test.rs           |  84 +++++-
 9 files changed, 337 insertions(+), 121 deletions(-)

diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs
index 3f06a6cf0f274..9f6c9af18e3aa 100644
--- a/consensus/consensus-types/src/proof_of_store.rs
+++ b/consensus/consensus-types/src/proof_of_store.rs
@@ -234,6 +234,15 @@ impl SignedBatchInfo {
         })
     }
 
+    #[cfg(any(test, feature = "fuzzing"))]
+    pub fn dummy(batch_info: BatchInfo, signer: PeerId) -> Self {
+        Self {
+            info: batch_info,
+            signer,
+            signature: bls12381::Signature::dummy_signature(),
+        }
+    }
+
     pub fn signer(&self) -> PeerId {
         self.signer
     }
@@ -289,6 +298,9 @@ pub enum SignedBatchInfoError {
     NotFound,
     AlreadyCommitted,
     NoTimeStamps,
+    UnableToAggregate,
+    LowVotingPower,
+    InvalidAggregatedSignature,
 }
 
 #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
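Reviewer note on the hunks above: `dummy` produces a structurally valid vote whose BLS signature cannot verify, which is what the optimistic-verification tests at the end of this patch use to simulate a misbehaving signer. A minimal usage sketch (assumes the `test`/`fuzzing` feature is enabled; construction of the `BatchInfo` value is elided):

use aptos_consensus_types::proof_of_store::{BatchInfo, SignedBatchInfo};
use aptos_types::PeerId;

// Build a vote that will fail BLS verification: the info and signer are real,
// but the signature is bls12381::Signature::dummy_signature().
fn unverifiable_vote(info: BatchInfo, signer: PeerId) -> SignedBatchInfo {
    SignedBatchInfo::dummy(info, signer)
}

The three new `SignedBatchInfoError` variants cover the three ways proof formation can now fail: aggregation itself (`UnableToAggregate`), insufficient stake behind the collected votes (`LowVotingPower`), and an aggregate signature that does not verify (`InvalidAggregatedSignature`).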
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index c0e1ba3ac7ecb..253de75a739a1 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -684,16 +684,14 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         let mut quorum_store_builder = if self.quorum_store_enabled {
             info!("Building QuorumStore");
             QuorumStoreBuilder::QuorumStore(InnerBuilder::new(
-                self.epoch(),
+                epoch_state.clone(),
                 self.author,
-                epoch_state.verifier.len() as u64,
                 quorum_store_config,
                 consensus_to_quorum_store_rx,
                 self.quorum_store_to_mempool_sender.clone(),
                 self.config.mempool_txn_pull_timeout_ms,
                 self.storage.aptos_db().clone(),
                 network_sender,
-                epoch_state.verifier.clone(),
                 self.proof_cache.clone(),
                 self.config.safety_rules.backend.clone(),
                 self.quorum_store_storage.clone(),
diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs
index c7dfe6aeff0c1..03ac1302e2baf 100644
--- a/consensus/src/quorum_store/batch_requester.rs
+++ b/consensus/src/quorum_store/batch_requester.rs
@@ -13,7 +13,7 @@ use aptos_consensus_types::proof_of_store::BatchInfo;
 use aptos_crypto::HashValue;
 use aptos_executor_types::*;
 use aptos_logger::prelude::*;
-use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId};
+use aptos_types::{epoch_state::EpochState, transaction::SignedTransaction, PeerId};
 use futures::{stream::FuturesUnordered, StreamExt};
 use rand::Rng;
 use std::{sync::Arc, time::Duration};
@@ -95,19 +95,17 @@ impl BatchRequesterState {
 }
 
 pub(crate) struct BatchRequester<T> {
-    epoch: u64,
     my_peer_id: PeerId,
     request_num_peers: usize,
     retry_limit: usize,
     retry_interval_ms: usize,
     rpc_timeout_ms: usize,
     network_sender: T,
-    validator_verifier: Arc<ValidatorVerifier>,
+    epoch_state: Arc<EpochState>,
 }
 
 impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
     pub(crate) fn new(
-        epoch: u64,
         my_peer_id: PeerId,
         request_num_peers: usize,
         retry_limit: usize,
@@ -117,7 +115,6 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
-        validator_verifier: Arc<ValidatorVerifier>,
+        epoch_state: Arc<EpochState>,
     ) -> Self {
         Self {
-            epoch,
             my_peer_id,
             request_num_peers,
             retry_limit,
@@ -136,12 +133,12 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
         ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
         mut subscriber_rx: oneshot::Receiver<PersistedValue>,
     ) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
-        let validator_verifier = self.validator_verifier.clone();
+        let epoch_state = self.epoch_state.clone();
         let mut request_state = BatchRequesterState::new(responders, ret_tx, self.retry_limit);
         let network_sender = self.network_sender.clone();
         let request_num_peers = self.request_num_peers;
         let my_peer_id = self.my_peer_id;
-        let epoch = self.epoch;
+        let epoch = self.epoch_state.epoch;
         let retry_interval = Duration::from_millis(self.retry_interval_ms as u64);
         let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64);
@@ -177,7 +174,7 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
                         counters::RECEIVED_BATCH_NOT_FOUND_COUNT.inc();
                         if ledger_info.commit_info().epoch() == epoch
                             && ledger_info.commit_info().timestamp_usecs() > expiration
                             && ledger_info.verify_signatures(&validator_verifier).is_ok()
-                            && ledger_info.verify_signatures(&validator_verifier).is_ok()
+                            && ledger_info.verify_signatures(&epoch_state.verifier).is_ok()
                         {
                             counters::RECEIVED_BATCH_EXPIRED_COUNT.inc();
                             debug!("QS: batch request expired, digest:{}", digest);
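With `BatchRequester` holding a single `Arc<EpochState>`, the expired-batch check reads the epoch and the verifier from one snapshot. A condensed restatement of the predicate in the last hunk (hypothetical free function for illustration; the real check lives inline in `request`):

use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures};

// A NotFound response proves expiration only if the attached ledger info is
// from our epoch, commits a timestamp past the batch expiration, and carries
// a quorum of valid signatures under the epoch's verifier.
fn proves_expiration(
    ledger_info: &LedgerInfoWithSignatures,
    epoch_state: &EpochState,
    expiration: u64,
) -> bool {
    ledger_info.commit_info().epoch() == epoch_state.epoch
        && ledger_info.commit_info().timestamp_usecs() > expiration
        && ledger_info.verify_signatures(&epoch_state.verifier).is_ok()
}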
diff --git a/consensus/src/quorum_store/counters.rs b/consensus/src/quorum_store/counters.rs
index 0be7db69d78ed..754cb68cbb888 100644
--- a/consensus/src/quorum_store/counters.rs
+++ b/consensus/src/quorum_store/counters.rs
@@ -852,6 +852,16 @@ pub static BATCH_TO_POS_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
     )
 });
 
+pub static SIGNED_BATCH_INFO_VERIFY_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
+    DurationHistogram::new(
+        register_histogram!(
+            "quorum_store_signed_batch_info_verify_duration",
+            "Histogram of the time spent verifying signed batch info.",
+        )
+        .unwrap(),
+    )
+});
+
 pub static BATCH_SUCCESSFUL_CREATION: Lazy<AverageIntCounter> = Lazy::new(|| {
     register_avg_counter(
         "quorum_store_batch_successful_creation",

diff --git a/consensus/src/quorum_store/network_listener.rs b/consensus/src/quorum_store/network_listener.rs
index 381c50ebde350..af07427079aed 100644
--- a/consensus/src/quorum_store/network_listener.rs
+++ b/consensus/src/quorum_store/network_listener.rs
@@ -11,7 +11,7 @@ use crate::{
 };
 use aptos_channels::aptos_channel;
 use aptos_logger::prelude::*;
-use aptos_types::PeerId;
+use aptos_types::{ledger_info::VerificationStatus, PeerId};
 use futures::StreamExt;
 use tokio::sync::mpsc::Sender;
@@ -57,16 +57,32 @@ impl NetworkListener {
                     counters::QUORUM_STORE_MSG_COUNT
                         .with_label_values(&["NetworkListener::signedbatchinfo"])
                         .inc();
-                    let cmd = ProofCoordinatorCommand::AppendSignature(*signed_batch_infos);
+                    let cmd = ProofCoordinatorCommand::AppendSignature((
+                        *signed_batch_infos,
+                        VerificationStatus::Verified,
+                    ));
                     self.proof_coordinator_tx
                         .send(cmd)
                         .await
                         .expect("Could not send signed_batch_info to proof_coordinator");
                 },
-                VerifiedEvent::BatchMsg(batch_msg) => {
+                VerifiedEvent::UnverifiedSignedBatchInfo(signed_batch_infos) => {
                     counters::QUORUM_STORE_MSG_COUNT
-                        .with_label_values(&["NetworkListener::batchmsg"])
+                        .with_label_values(&["NetworkListener::signedbatchinfo"])
                         .inc();
+                    let cmd = ProofCoordinatorCommand::AppendSignature((
+                        *signed_batch_infos,
+                        VerificationStatus::Unverified,
+                    ));
+                    self.proof_coordinator_tx
+                        .send(cmd)
+                        .await
+                        .expect("Could not send signed_batch_info to proof_coordinator");
+                },
+                VerifiedEvent::BatchMsg(batch_msg) => {
+                    counters::QUORUM_STORE_MSG_COUNT
+                        .with_label_values(&["NetworkListener::batchmsg"])
+                        .inc();
                     let author = batch_msg.author();
                     let batches = batch_msg.take();
                     counters::RECEIVED_BATCH_MSG_COUNT.inc();
@@ -97,7 +113,7 @@ impl NetworkListener {
                 _ => {
                     unreachable!()
                 },
-            };
+            }
         });
     }
 }
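Both signed-batch-info arms above reduce to the same coordinator command, differing only in the attached `VerificationStatus`; verification of the `Unverified` case is deferred to the proof coordinator. A hedged sketch of that send path (assumes the same tokio mpsc sender type the listener holds):

use aptos_consensus_types::proof_of_store::SignedBatchInfoMsg;
use aptos_types::ledger_info::VerificationStatus;
use tokio::sync::mpsc::Sender;

use crate::quorum_store::proof_coordinator::ProofCoordinatorCommand;

// Forward votes to the proof coordinator, tagged with whether the network
// layer has already verified them.
async fn forward(
    tx: &Sender<ProofCoordinatorCommand>,
    msg: SignedBatchInfoMsg,
    status: VerificationStatus,
) {
    tx.send(ProofCoordinatorCommand::AppendSignature((msg, status)))
        .await
        .expect("Could not send signed_batch_info to proof_coordinator");
}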
diff --git a/consensus/src/quorum_store/proof_coordinator.rs b/consensus/src/quorum_store/proof_coordinator.rs
index 16df06d1cb500..ac01c5e73b017 100644
--- a/consensus/src/quorum_store/proof_coordinator.rs
+++ b/consensus/src/quorum_store/proof_coordinator.rs
@@ -12,13 +12,16 @@ use crate::{
 use aptos_consensus_types::proof_of_store::{
     BatchInfo, ProofCache, ProofOfStore, SignedBatchInfo, SignedBatchInfoError, SignedBatchInfoMsg,
 };
-use aptos_crypto::bls12381;
 use aptos_logger::prelude::*;
-use aptos_types::{validator_verifier::ValidatorVerifier, PeerId};
+use aptos_types::{
+    aggregate_signature::PartialSignatures, epoch_state::EpochState,
+    ledger_info::VerificationStatus, validator_verifier::ValidatorVerifier, PeerId,
+};
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use std::{
-    collections::{hash_map::Entry, BTreeMap, HashMap},
+    collections::{hash_map::Entry, HashMap},
     sync::Arc,
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tokio::{
     sync::{mpsc::Receiver, oneshot as TokioOneshot},
@@ -27,15 +30,17 @@
 
 #[derive(Debug)]
 pub(crate) enum ProofCoordinatorCommand {
-    AppendSignature(SignedBatchInfoMsg),
+    // The verification status indicates whether the signed batch info message has already been
+    // verified. If it has not, the proof coordinator is expected to verify it.
+    AppendSignature((SignedBatchInfoMsg, VerificationStatus)),
     CommitNotification(Vec<BatchInfo>),
     Shutdown(TokioOneshot::Sender<()>),
 }
 
 struct IncrementalProofState {
     info: BatchInfo,
-    aggregated_signature: BTreeMap<PeerId, bls12381::Signature>,
-    aggregated_voting_power: u128,
+    unverified_signatures: PartialSignatures,
+    verified_signatures: PartialSignatures,
     self_voted: bool,
     completed: bool,
 }
@@ -44,17 +49,38 @@ impl IncrementalProofState {
     fn new(info: BatchInfo) -> Self {
         Self {
             info,
-            aggregated_signature: BTreeMap::new(),
-            aggregated_voting_power: 0,
+            unverified_signatures: PartialSignatures::empty(),
+            verified_signatures: PartialSignatures::empty(),
             self_voted: false,
             completed: false,
         }
     }
 
+    pub fn all_voters(&self) -> Vec<PeerId> {
+        self.verified_signatures
+            .signatures()
+            .keys()
+            .chain(self.unverified_signatures.signatures().keys())
+            .cloned()
+            .collect()
+    }
+
+    pub fn voter_count(&self) -> u64 {
+        self.verified_signatures.signatures().len() as u64
+            + self.unverified_signatures.signatures().len() as u64
+    }
+
+    pub fn aggregate_voting_power(&self, verifier: &ValidatorVerifier) -> u64 {
+        verifier
+            .check_voting_power(self.all_voters().iter(), true)
+            .unwrap_or(0) as u64
+    }
+
     fn add_signature(
         &mut self,
         signed_batch_info: &SignedBatchInfo,
-        validator_verifier: &ValidatorVerifier,
+        epoch_state: Arc<EpochState>,
+        verification_status: VerificationStatus,
     ) -> Result<(), SignedBatchInfoError> {
         if signed_batch_info.batch_info() != &self.info {
             return Err(SignedBatchInfoError::WrongInfo((
                 signed_batch_info.batch_id().id,
                 self.info.batch_id().id,
             )));
         }
 
-        if self
-            .aggregated_signature
-            .contains_key(&signed_batch_info.signer())
+        match epoch_state
+            .verifier
+            .get_voting_power(&signed_batch_info.signer())
         {
-            return Err(SignedBatchInfoError::DuplicatedSignature);
-        }
-
-        match validator_verifier.get_voting_power(&signed_batch_info.signer()) {
-            Some(voting_power) => {
+            Some(_voting_power) => {
                 let signer = signed_batch_info.signer();
-                if self
-                    .aggregated_signature
-                    .insert(signer, signed_batch_info.signature().clone())
-                    .is_none()
-                {
-                    self.aggregated_voting_power += voting_power as u128;
-                    if signer == self.info.author() {
-                        self.self_voted = true;
-                    }
-                } else {
-                    error!(
-                        "Author already in aggregated_signatures right after rechecking: {}",
-                        signer
-                    );
+                match verification_status {
+                    VerificationStatus::Verified => {
+                        self.verified_signatures
+                            .add_signature(signer, signed_batch_info.signature().clone());
+                        self.unverified_signatures.remove_signature(signer);
+                    },
+                    VerificationStatus::Unverified => {
+                        if !self.verified_signatures.contains_voter(&signer) {
+                            if self.unverified_signatures.contains_voter(&signer) {
+                                warn!(
+                                    "Duplicate unverified signature received from {} on signed batch info",
+                                    signer
+                                );
+                                self.unverified_signatures.remove_signature(signer);
+                            }
+                            self.unverified_signatures
+                                .add_signature(signer, signed_batch_info.signature().clone());
+                        }
+                    },
+                }
+                if signer == self.info.author() {
+                    self.self_voted = true;
                 }
             },
             None => {
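The bookkeeping intent in `add_signature` is that a signer ends up in exactly one bucket, with `Verified` always winning over `Unverified`. A test-only sanity helper one could write against `PartialSignatures` as used above (hypothetical, not part of the patch):

use aptos_types::aggregate_signature::PartialSignatures;

// Voting power must not be double-counted: no signer may appear in both the
// verified and unverified buckets at the same time.
fn buckets_disjoint(verified: &PartialSignatures, unverified: &PartialSignatures) -> bool {
    verified
        .signatures()
        .keys()
        .all(|signer| !unverified.signatures().contains_key(signer))
}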
@@ -101,28 +129,112 @@ impl IncrementalProofState {
         Ok(())
     }
 
-    fn ready(&self, validator_verifier: &ValidatorVerifier) -> bool {
-        if self.aggregated_voting_power >= validator_verifier.quorum_voting_power() {
-            let recheck =
-                validator_verifier.check_voting_power(self.aggregated_signature.keys(), true);
-            if recheck.is_err() {
-                error!("Unexpected discrepancy: aggregated_voting_power is {}, while rechecking we get {:?}", self.aggregated_voting_power, recheck);
-            }
-            recheck.is_ok()
-        } else {
-            false
-        }
+    fn ready(&self, epoch_state: Arc<EpochState>) -> bool {
+        let all_voters = self.all_voters();
+        epoch_state
+            .verifier
+            .check_voting_power(all_voters.iter(), true)
+            .is_ok()
     }
 
-    fn take(&mut self, validator_verifier: &ValidatorVerifier) -> ProofOfStore {
+    fn aggregate_and_verify(
+        &mut self,
+        epoch_state: Arc<EpochState>,
+    ) -> Result<ProofOfStore, SignedBatchInfoError> {
+        if !self.ready(epoch_state.clone()) {
+            return Err(SignedBatchInfoError::LowVotingPower);
+        }
         if self.completed {
             panic!("Cannot call take twice, unexpected issue occurred");
         }
-        self.completed = true;
-        match validator_verifier.aggregate_signatures(self.aggregated_signature.iter()) {
-            Ok(sig) => ProofOfStore::new(self.info.clone(), sig),
-            Err(e) => unreachable!("Cannot aggregate signatures on digest err = {:?}", e),
+
+        let mut all_signatures = self.verified_signatures.clone();
+        for (author, signature) in self.unverified_signatures.signatures() {
+            all_signatures.add_signature(*author, signature.clone());
         }
+
+        let aggregated_sig = epoch_state
+            .verifier
+            .aggregate_signatures(all_signatures.signatures_iter())
+            .map_err(|e| {
+                error!(
+                    "Unable to aggregate signatures in proof coordinator. err = {:?}",
+                    e
+                );
+                SignedBatchInfoError::UnableToAggregate
+            })?;
+
+        let (verified_aggregate_signature, malicious_authors) = match epoch_state
+            .verifier
+            .verify_multi_signatures(&self.info, &aggregated_sig)
+        {
+            Ok(_) => {
+                for (account_address, signature) in self.unverified_signatures.signatures() {
+                    self.verified_signatures
+                        .add_signature(*account_address, signature.clone());
+                }
+                self.unverified_signatures = PartialSignatures::empty();
+                (aggregated_sig, vec![])
+            },
+            Err(_) => {
+                // Question: Should we assign min tasks per thread here for into_par_iter()?
+                // Question: How to add a counter to sum up the time spent across all the threads?
+                let verified = self
+                    .unverified_signatures
+                    .signatures()
+                    .into_par_iter()
+                    .flat_map(|(account_address, signature)| {
+                        if epoch_state
+                            .verifier
+                            .verify(*account_address, &self.info, signature)
+                            .is_ok()
+                        {
+                            return Some((*account_address, signature.clone()));
+                        }
+                        None
+                    })
+                    .collect::<Vec<_>>();
+                for (account_address, signature) in verified {
+                    self.verified_signatures
+                        .add_signature(account_address, signature.clone());
+                    self.unverified_signatures
+                        .remove_signature(account_address);
+                }
+                let malicious_authors = self
+                    .unverified_signatures
+                    .signatures()
+                    .keys()
+                    .cloned()
+                    .collect();
+                self.unverified_signatures = PartialSignatures::empty();
+                let aggregated_sig = epoch_state
+                    .verifier
+                    .aggregate_signatures(self.verified_signatures.signatures_iter())
+                    .map_err(|e| {
+                        error!(
+                            "Unable to aggregate signatures in proof coordinator. err = {:?}",
+                            e
+                        );
+                        SignedBatchInfoError::UnableToAggregate
+                    })?;
+                (aggregated_sig, malicious_authors)
+            },
+        };
+        epoch_state
+            .verifier
+            .add_malicious_authors(malicious_authors);
+        if self.ready(epoch_state) {
+            self.completed = true;
+            Ok(ProofOfStore::new(
+                self.info.clone(),
+                verified_aggregate_signature,
+            ))
+        } else {
+            Err(SignedBatchInfoError::LowVotingPower)
+        }
     }
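`aggregate_and_verify` is the heart of optimistic signature verification: aggregate everything and verify once; only if that fails, verify the unverified signatures individually (in parallel via rayon) and blame whatever is left. The control flow, reduced to its shape (names and signatures here are illustrative, not from the patch):

use rayon::prelude::*;

// Optimistic check: one aggregate verification on the happy path, per-item
// fallback on failure. Returns the indices that failed individual verification.
fn optimistic_verify<T: Sync>(
    items: &[T],
    verify_all: impl Fn(&[T]) -> bool,
    verify_one: impl Fn(&T) -> bool + Sync + Send,
) -> Vec<usize> {
    if verify_all(items) {
        return vec![]; // aggregate verified: everyone is good, no per-item work
    }
    items
        .par_iter()
        .enumerate()
        .filter_map(|(i, item)| if verify_one(item) { None } else { Some(i) })
        .collect()
}

On the fallback path the patch re-aggregates over the surviving signatures and records the rest via `add_malicious_authors`, so future votes from those authors no longer get the optimistic treatment.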
@@ -136,7 +248,7 @@ pub(crate) struct ProofCoordinator {
     proof_timeout_ms: usize,
     batch_info_to_proof: HashMap<BatchInfo, IncrementalProofState>,
     // to record the batch creation time
-    batch_info_to_time: HashMap<BatchInfo, u64>,
+    batch_info_to_time: HashMap<BatchInfo, Instant>,
     timeouts: Timeouts<BatchInfo>,
     batch_reader: Arc<dyn BatchReader>,
     batch_generator_cmd_tx: tokio::sync::mpsc::Sender<BatchGeneratorCommand>,
@@ -191,10 +303,9 @@ impl ProofCoordinator {
             signed_batch_info.batch_info().clone(),
             IncrementalProofState::new(signed_batch_info.batch_info().clone()),
         );
-        #[allow(deprecated)]
         self.batch_info_to_time
             .entry(signed_batch_info.batch_info().clone())
-            .or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64);
+            .or_insert(Instant::now());
         debug!(
             LogSchema::new(LogEvent::ProofOfStoreInit),
             digest = signed_batch_info.digest(),
@@ -206,7 +317,8 @@ impl ProofCoordinator {
     fn add_signature(
         &mut self,
         signed_batch_info: SignedBatchInfo,
-        validator_verifier: &ValidatorVerifier,
+        epoch_state: Arc<EpochState>,
+        verification_status: VerificationStatus,
     ) -> Result<Option<ProofOfStore>, SignedBatchInfoError> {
         if !self
             .batch_info_to_proof
@@ -218,23 +330,25 @@ impl ProofCoordinator {
             .batch_info_to_proof
             .get_mut(signed_batch_info.batch_info())
         {
-            value.add_signature(&signed_batch_info, validator_verifier)?;
-            if !value.completed && value.ready(validator_verifier) {
-                let proof = value.take(validator_verifier);
+            value.add_signature(&signed_batch_info, epoch_state.clone(), verification_status)?;
+            if !value.completed && value.ready(epoch_state.clone()) {
+                let proof = {
+                    let _timer = counters::SIGNED_BATCH_INFO_VERIFY_DURATION.start_timer();
+                    value.aggregate_and_verify(epoch_state)?
+                };
                 // proof validated locally, so adding to cache
                 self.proof_cache
                     .insert(proof.info().clone(), proof.multi_signature().clone());
                 // quorum store measurements
-                #[allow(deprecated)]
-                let duration = chrono::Utc::now().naive_utc().timestamp_micros() as u64
-                    - self
-                        .batch_info_to_time
-                        .remove(signed_batch_info.batch_info())
-                        .ok_or(
-                            // Batch created without recording the time!
-                            SignedBatchInfoError::NoTimeStamps,
-                        )?;
-                counters::BATCH_TO_POS_DURATION.observe_duration(Duration::from_micros(duration));
+                let duration = self
+                    .batch_info_to_time
+                    .remove(signed_batch_info.batch_info())
+                    .ok_or(
+                        // Batch created without recording the time!
+                        SignedBatchInfoError::NoTimeStamps,
+                    )?
+                    .elapsed();
+                counters::BATCH_TO_POS_DURATION.observe_duration(duration);
                 return Ok(Some(proof));
             }
         } else {
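Two measurement changes land above: proof aggregation is now bracketed by the `SIGNED_BATCH_INFO_VERIFY_DURATION` timer guard (presumably recorded when the guard drops, in the usual Prometheus style), and batch-to-proof latency switches from `chrono` wall-clock subtraction to a monotonic `Instant`, which cannot go negative under clock adjustments. The `Instant` pattern in isolation:

use std::time::{Duration, Instant};

// Monotonic elapsed-time measurement, as now used for BATCH_TO_POS_DURATION.
fn time_it<R>(f: impl FnOnce() -> R) -> (R, Duration) {
    let start = Instant::now();
    let result = f();
    (result, start.elapsed())
}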
@@ -243,22 +357,22 @@ impl ProofCoordinator {
         Ok(None)
     }
 
-    fn update_counters_on_expire(state: &IncrementalProofState) {
+    fn update_counters_on_expire(state: &IncrementalProofState, epoch_state: Arc<EpochState>) {
         // Count late votes separately
         if !state.completed && !state.self_voted {
-            counters::BATCH_RECEIVED_LATE_REPLIES_COUNT
-                .inc_by(state.aggregated_signature.len() as u64);
+            counters::BATCH_RECEIVED_LATE_REPLIES_COUNT.inc_by(state.voter_count());
             return;
         }
 
-        counters::BATCH_RECEIVED_REPLIES_COUNT.observe(state.aggregated_signature.len() as f64);
-        counters::BATCH_RECEIVED_REPLIES_VOTING_POWER.observe(state.aggregated_voting_power as f64);
+        counters::BATCH_RECEIVED_REPLIES_COUNT.observe(state.voter_count() as f64);
+        counters::BATCH_RECEIVED_REPLIES_VOTING_POWER
+            .observe(state.aggregate_voting_power(&epoch_state.verifier) as f64);
         if !state.completed {
             counters::BATCH_SUCCESSFUL_CREATION.observe(0.0);
         }
     }
 
-    async fn expire(&mut self) {
+    async fn expire(&mut self, epoch_state: Arc<EpochState>) {
         let mut batch_ids = vec![];
         for signed_batch_info_info in self.timeouts.expire() {
             if let Some(state) = self.batch_info_to_proof.remove(&signed_batch_info_info) {
@@ -280,7 +394,7 @@ impl ProofCoordinator {
                         self_voted = state.self_voted,
                     );
                 }
-                Self::update_counters_on_expire(&state);
+                Self::update_counters_on_expire(&state, epoch_state.clone());
             }
         }
         if self
@@ -333,13 +447,13 @@ impl ProofCoordinator {
                         }
                     }
                 },
-                ProofCoordinatorCommand::AppendSignature(signed_batch_infos) => {
+                ProofCoordinatorCommand::AppendSignature((signed_batch_infos, verification_status)) => {
                     let mut proofs = vec![];
                     for signed_batch_info in signed_batch_infos.take().into_iter() {
                         let peer_id = signed_batch_info.signer();
                         let digest = *signed_batch_info.digest();
                         let batch_id = signed_batch_info.batch_id();
-                        match self.add_signature(signed_batch_info, &validator_verifier) {
+                        match self.add_signature(signed_batch_info, epoch_state.clone(), verification_status.clone()) {
                             Ok(result) => {
                                 if let Some(proof) = result {
                                     debug!(
@@ -371,7 +485,7 @@ impl ProofCoordinator {
                     }
                 }),
                 _ = interval.tick() => {
                    monitor!("proof_coordinator_handle_tick", self.expire(epoch_state.clone()).await);
-                    monitor!("proof_coordinator_handle_tick", self.expire().await);
+                    monitor!("proof_coordinator_handle_tick", self.expire(epoch_state.clone()).await);
                 }
             }
         }
diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs
index 74615b7a733ba..63d5720d95fce 100644
--- a/consensus/src/quorum_store/quorum_store_builder.rs
+++ b/consensus/src/quorum_store/quorum_store_builder.rs
@@ -34,8 +34,7 @@
 use aptos_mempool::QuorumStoreRequest;
 use aptos_secure_storage::{KVStorage, Storage};
 use aptos_storage_interface::DbReader;
 use aptos_types::{
-    account_address::AccountAddress, validator_signer::ValidatorSigner,
-    validator_verifier::ValidatorVerifier,
+    account_address::AccountAddress, epoch_state::EpochState, validator_signer::ValidatorSigner,
 };
 use futures::StreamExt;
 use futures_channel::mpsc::{Receiver, Sender};
@@ -118,9 +117,8 @@
 // TODO: push most things to config
 pub struct InnerBuilder {
-    epoch: u64,
+    epoch_state: Arc<EpochState>,
     author: Author,
-    num_validators: u64,
     config: QuorumStoreConfig,
     consensus_to_quorum_store_receiver: Receiver<GetPayloadCommand>,
     quorum_store_to_mempool_sender: Sender<QuorumStoreRequest>,
@@ -152,9 +150,8 @@ impl InnerBuilder {
     pub(crate) fn new(
-        epoch: u64,
+        epoch_state: Arc<EpochState>,
         author: Author,
-        num_validators: u64,
         config: QuorumStoreConfig,
         consensus_to_quorum_store_receiver: Receiver<GetPayloadCommand>,
         quorum_store_to_mempool_sender: Sender<QuorumStoreRequest>,
@@ -191,16 +188,14 @@ impl InnerBuilder {
         }
 
         Self {
-            epoch,
+            epoch_state,
             author,
-            num_validators,
             config,
             consensus_to_quorum_store_receiver,
             quorum_store_to_mempool_sender,
             mempool_txn_pull_timeout_ms,
             aptos_db,
             network_sender,
-            verifier,
             proof_cache,
             backend,
             coordinator_tx,
@@ -243,17 +238,16 @@ impl InnerBuilder {
         let last_committed_timestamp = latest_ledger_info_with_sigs.commit_info().timestamp_usecs();
         let batch_requester = BatchRequester::new(
-            self.epoch,
             self.author,
             self.config.batch_request_num_peers,
             self.config.batch_request_retry_limit,
             self.config.batch_request_retry_interval_ms,
             self.config.batch_request_rpc_timeout_ms,
             self.network_sender.clone(),
-            self.verifier.clone(),
+            self.epoch_state.clone(),
         );
         let batch_store = Arc::new(BatchStore::new(
-            self.epoch,
+            self.epoch_state.epoch,
             last_committed_timestamp,
             self.quorum_store_storage.clone(),
             self.config.memory_quota,
@@ -297,7 +291,7 @@ impl InnerBuilder {
         let batch_generator_cmd_rx = self.batch_generator_cmd_rx.take().unwrap();
         let back_pressure_rx = self.back_pressure_rx.take().unwrap();
         let batch_generator = BatchGenerator::new(
-            self.epoch,
+            self.epoch_state.epoch,
             self.author,
             self.config.clone(),
             self.quorum_store_storage.clone(),
@@ -346,12 +340,13 @@ impl InnerBuilder {
             self.proof_cache,
             self.broadcast_proofs,
         );
+        let epoch_state = self.epoch_state.clone();
         spawn_named!(
             "proof_coordinator",
             proof_coordinator.start(
                 proof_coordinator_cmd_rx,
                 self.network_sender.clone(),
-                self.verifier.clone(),
+                epoch_state,
             )
         );
@@ -362,7 +357,7 @@ impl InnerBuilder {
             self.config
                 .back_pressure
                 .backlog_per_validator_batch_limit_count
-                * self.num_validators,
+                * self.epoch_state.verifier.len() as u64,
             self.batch_store.clone().unwrap(),
             self.config.allow_batches_without_pos_in_proposal,
             self.config.enable_opt_quorum_store,
@@ -376,9 +371,8 @@ impl InnerBuilder {
             )
         );
 
-        let network_msg_rx = self.quorum_store_msg_rx.take().unwrap();
         let net = NetworkListener::new(
-            network_msg_rx,
+            self.quorum_store_msg_rx.take().unwrap(),
             self.proof_coordinator_cmd_tx.clone(),
             self.remote_batch_coordinator_cmd_tx.clone(),
             self.proof_manager_cmd_tx.clone(),
@@ -386,7 +380,7 @@ impl InnerBuilder {
         spawn_named!("network_listener", net.start());
 
         let batch_store = self.batch_store.clone().unwrap();
-        let epoch = self.epoch;
+        let epoch = self.epoch_state.epoch;
         let (batch_retrieval_tx, mut batch_retrieval_rx) =
             aptos_channel::new::<PeerId, IncomingBatchRetrievalRequest>(
                 QueueStyle::LIFO,
@@ -445,7 +439,7 @@ impl InnerBuilder {
             // TODO: remove after splitting out clean requests
             self.coordinator_tx.clone(),
             consensus_publisher,
-            self.verifier.get_ordered_account_addresses(),
+            self.epoch_state.verifier.get_ordered_account_addresses(),
         )),
         Some(self.quorum_store_msg_tx.clone()),
     )
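The builder now carries a single `Arc<EpochState>` and derives the values that used to be separate constructor arguments. The two derivations used in the hunks above, side by side (trivial sketch):

use std::sync::Arc;

use aptos_types::epoch_state::EpochState;

// One snapshot replaces the old `epoch: u64` and `num_validators: u64` args.
fn derived(epoch_state: &Arc<EpochState>) -> (u64, u64) {
    (epoch_state.epoch, epoch_state.verifier.len() as u64)
}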
diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs
index 33b63849e1940..b8afb38fa09fb 100644
--- a/consensus/src/quorum_store/tests/batch_requester_test.rs
+++ b/consensus/src/quorum_store/tests/batch_requester_test.rs
@@ -16,12 +16,16 @@
 use aptos_crypto::HashValue;
 use aptos_types::{
     aggregate_signature::PartialSignatures,
     block_info::BlockInfo,
+    epoch_state::EpochState,
     ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
     validator_signer::ValidatorSigner,
     validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
 };
 use move_core_types::account_address::AccountAddress;
-use std::time::{Duration, Instant};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 use tokio::sync::oneshot;
 
 #[derive(Clone)]
@@ -80,9 +84,11 @@ async fn test_batch_request_exists() {
     let batch_response = BatchResponse::Batch(batch.clone());
 
     let validator_signer = ValidatorSigner::random(None);
+    let verifier =
+        ValidatorVerifier::new_single(validator_signer.author(), validator_signer.public_key());
+    let epoch_state = Arc::new(EpochState::new(5, verifier));
     let (tx, mut rx) = tokio::sync::oneshot::channel();
     let batch_requester = BatchRequester::new(
-        1,
         AccountAddress::random(),
         1,
         2,
@@ -166,7 +172,7 @@ async fn test_batch_request_not_exists_not_expired() {
     // Batch has not expired yet
     let (ledger_info_with_signatures, validator_verifier) =
         create_ledger_info_with_timestamp(expiration - 1);
-
+    let epoch_state = Arc::new(EpochState::new(5, validator_verifier));
     let batch = Batch::new(
         BatchId::new_for_test(1),
         vec![],
@@ -178,7 +184,6 @@ async fn test_batch_request_not_exists_not_expired() {
     let (tx, mut rx) = tokio::sync::oneshot::channel();
     let batch_response = BatchResponse::NotFound(ledger_info_with_signatures);
     let batch_requester = BatchRequester::new(
-        1,
         AccountAddress::random(),
         1,
         2,
@@ -214,7 +219,7 @@ async fn test_batch_request_not_exists_expired() {
     // Batch has expired according to the ledger info that will be returned
     let (ledger_info_with_signatures, validator_verifier) =
         create_ledger_info_with_timestamp(expiration + 1);
-
+    let epoch_state = Arc::new(EpochState::new(1, validator_verifier));
     let batch = Batch::new(
         BatchId::new_for_test(1),
         vec![],
@@ -226,7 +231,6 @@ async fn test_batch_request_not_exists_expired() {
     let (tx, mut rx) = tokio::sync::oneshot::channel();
     let batch_response = BatchResponse::NotFound(ledger_info_with_signatures);
     let batch_requester = BatchRequester::new(
-        1,
        AccountAddress::random(),
         1,
         2,
diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
index e38fb3fda274f..fa82f7decba57 100644
--- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs
+++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
@@ -14,7 +14,8 @@
 use aptos_crypto::HashValue;
 use aptos_executor_types::ExecutorResult;
 use aptos_types::{
-    transaction::SignedTransaction, validator_verifier::random_validator_verifier, PeerId,
+    epoch_state::EpochState, ledger_info::VerificationStatus, transaction::SignedTransaction,
+    validator_verifier::random_validator_verifier, PeerId,
 };
 use mini_moka::sync::Cache;
 use std::sync::Arc;
@@ -47,6 +48,7 @@ async fn test_proof_coordinator_basic() {
     aptos_logger::Logger::init_for_testing();
     let (signers, verifier) = random_validator_verifier(4, None, true);
+    let epoch_state = Arc::new(EpochState::new(5, verifier));
     let (tx, _rx) = channel(100);
     let proof_cache = Cache::builder().build();
     let proof_coordinator = ProofCoordinator::new(
@@ -74,9 +76,10 @@ async fn test_proof_coordinator_basic() {
     for signer in &signers {
         let signed_batch_info = SignedBatchInfo::new(batch.batch_info().clone(), signer).unwrap();
         assert!(proof_coordinator_tx
-            .send(ProofCoordinatorCommand::AppendSignature(
-                SignedBatchInfoMsg::new(vec![signed_batch_info])
-            ))
+            .send(ProofCoordinatorCommand::AppendSignature((
+                SignedBatchInfoMsg::new(vec![signed_batch_info]),
+                VerificationStatus::Verified,
+            )))
             .await
             .is_ok());
     }
@@ -86,7 +89,78 @@ async fn test_proof_coordinator_basic() {
         msg => panic!("Expected LocalProof but received: {:?}", msg),
     };
     // check normal path
-    assert!(proof_msg.verify(100, &verifier, &proof_cache).is_ok());
+    assert!(proof_msg
+        .verify(100, &epoch_state.verifier, &proof_cache)
+        .is_ok());
     let proofs = proof_msg.take();
     assert_eq!(proofs[0].digest(), digest);
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_proof_coordinator_with_unverified_signatures() {
+    aptos_logger::Logger::init_for_testing();
+    let (signers, verifier) = random_validator_verifier(10, Some(4), true);
+    let epoch_state = Arc::new(EpochState::new(10, verifier));
+    let (tx, _rx) = channel(100);
+    let proof_cache = Cache::builder().build();
+    let proof_coordinator = ProofCoordinator::new(
+        100,
+        signers[0].author(),
+        Arc::new(MockBatchReader {
+            peer: signers[0].author(),
+        }),
+        tx,
+        proof_cache.clone(),
+        true,
+    );
+    let (proof_coordinator_tx, proof_coordinator_rx) = channel(100);
+    let (tx, mut rx) = channel(100);
+    let network_sender = MockQuorumStoreSender::new(tx);
+    tokio::spawn(proof_coordinator.start(
+        proof_coordinator_rx,
+        network_sender,
+        epoch_state.clone(),
+    ));
+
+    let batch_author = signers[0].author();
+    for batch_index in 1..10 {
+        let batch_id = BatchId::new_for_test(batch_index);
+        let payload = create_vec_signed_transactions(100);
+        let batch = Batch::new(batch_id, payload, 1, 20, batch_author, 0);
+        let digest = batch.digest();
+
+        for (signer_index, signer) in signers.iter().enumerate() {
+            if signer_index > 2 {
+                let signed_batch_info = SignedBatchInfo::new(batch.batch_info().clone(), signer)
+                    .expect("Failed to create SignedBatchInfo");
+                assert!(proof_coordinator_tx
+                    .send(ProofCoordinatorCommand::AppendSignature((
+                        SignedBatchInfoMsg::new(vec![signed_batch_info]),
+                        VerificationStatus::Unverified,
+                    )))
+                    .await
+                    .is_ok());
+            } else {
+                let signed_batch_info =
+                    SignedBatchInfo::dummy(batch.batch_info().clone(), signer.author());
+                assert!(proof_coordinator_tx
+                    .send(ProofCoordinatorCommand::AppendSignature((
+                        SignedBatchInfoMsg::new(vec![signed_batch_info]),
+                        VerificationStatus::Unverified,
+                    )))
+                    .await
+                    .is_ok());
+            }
+        }
+
+        let proof_msg = match rx.recv().await.expect("channel dropped") {
+            (ConsensusMsg::ProofOfStoreMsg(proof_msg), _) => *proof_msg,
+            msg => panic!("Expected LocalProof but received: {:?}", msg),
+        };
+
+        let proofs = proof_msg.take();
+        assert_eq!(proofs[0].digest(), digest);
+        assert_eq!(epoch_state.verifier.pessimistic_verify_set().len(), 3);
+    }
+}