Skip to content

Commit

Permalink
Old OSV code
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Oct 8, 2024
1 parent ebc1b64 commit 04d3589
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 121 deletions.
12 changes: 12 additions & 0 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,15 @@ impl SignedBatchInfo {
})
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn dummy(batch_info: BatchInfo, signer: PeerId) -> Self {
Self {
info: batch_info,
signer,
signature: bls12381::Signature::dummy_signature(),
}
}

pub fn signer(&self) -> PeerId {
self.signer
}
Expand Down Expand Up @@ -289,6 +298,9 @@ pub enum SignedBatchInfoError {
NotFound,
AlreadyCommitted,
NoTimeStamps,
UnableToAggregate,
LowVotingPower,
InvalidAggregatedSignature,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
Expand Down
4 changes: 1 addition & 3 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,16 +684,14 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let mut quorum_store_builder = if self.quorum_store_enabled {
info!("Building QuorumStore");
QuorumStoreBuilder::QuorumStore(InnerBuilder::new(
self.epoch(),
epoch_state.clone(),
self.author,
epoch_state.verifier.len() as u64,
quorum_store_config,
consensus_to_quorum_store_rx,
self.quorum_store_to_mempool_sender.clone(),
self.config.mempool_txn_pull_timeout_ms,
self.storage.aptos_db().clone(),
network_sender,
epoch_state.verifier.clone(),
self.proof_cache.clone(),
self.config.safety_rules.backend.clone(),
self.quorum_store_storage.clone(),
Expand Down
13 changes: 5 additions & 8 deletions consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use aptos_consensus_types::proof_of_store::BatchInfo;
use aptos_crypto::HashValue;
use aptos_executor_types::*;
use aptos_logger::prelude::*;
use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId};
use aptos_types::{epoch_state::EpochState, transaction::SignedTransaction, PeerId};
use futures::{stream::FuturesUnordered, StreamExt};
use rand::Rng;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -95,19 +95,17 @@ impl BatchRequesterState {
}

pub(crate) struct BatchRequester<T> {
epoch: u64,
my_peer_id: PeerId,
request_num_peers: usize,
retry_limit: usize,
retry_interval_ms: usize,
rpc_timeout_ms: usize,
network_sender: T,
validator_verifier: Arc<ValidatorVerifier>,
epoch_state: Arc<EpochState>,
}

impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
pub(crate) fn new(
epoch: u64,
my_peer_id: PeerId,
request_num_peers: usize,
retry_limit: usize,
Expand All @@ -117,7 +115,6 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
validator_verifier: Arc<ValidatorVerifier>,
) -> Self {
Self {
epoch,
my_peer_id,
request_num_peers,
retry_limit,
Expand All @@ -136,12 +133,12 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
mut subscriber_rx: oneshot::Receiver<PersistedValue>,
) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
let validator_verifier = self.validator_verifier.clone();
let epoch_state = self.epoch_state.clone();
let mut request_state = BatchRequesterState::new(responders, ret_tx, self.retry_limit);
let network_sender = self.network_sender.clone();
let request_num_peers = self.request_num_peers;
let my_peer_id = self.my_peer_id;
let epoch = self.epoch;
let epoch = self.epoch_state.epoch;
let retry_interval = Duration::from_millis(self.retry_interval_ms as u64);
let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64);

Expand Down Expand Up @@ -177,7 +174,7 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
counters::RECEIVED_BATCH_NOT_FOUND_COUNT.inc();
if ledger_info.commit_info().epoch() == epoch
&& ledger_info.commit_info().timestamp_usecs() > expiration
&& ledger_info.verify_signatures(&validator_verifier).is_ok()
&& ledger_info.verify_signatures(&epoch_state.verifier).is_ok()
{
counters::RECEIVED_BATCH_EXPIRED_COUNT.inc();
debug!("QS: batch request expired, digest:{}", digest);
Expand Down
10 changes: 10 additions & 0 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,16 @@ pub static BATCH_TO_POS_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
)
});

pub static SIGNED_BATCH_INFO_VERIFY_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_signed_batch_info_verify_duration",
"Histogram of the time durations for verifying signed batch info.",
)
.unwrap(),
)
});

pub static BATCH_SUCCESSFUL_CREATION: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"quorum_store_batch_successful_creation",
Expand Down
23 changes: 18 additions & 5 deletions consensus/src/quorum_store/network_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use aptos_channels::aptos_channel;
use aptos_logger::prelude::*;
use aptos_types::PeerId;
use aptos_types::{ledger_info::VerificationStatus, PeerId};
use futures::StreamExt;
use tokio::sync::mpsc::Sender;

Expand Down Expand Up @@ -57,16 +57,29 @@ impl NetworkListener {
counters::QUORUM_STORE_MSG_COUNT
.with_label_values(&["NetworkListener::signedbatchinfo"])
.inc();
let cmd = ProofCoordinatorCommand::AppendSignature(*signed_batch_infos);
let cmd = ProofCoordinatorCommand::AppendSignature((
*signed_batch_infos,
VerificationStatus::Verified,
));
self.proof_coordinator_tx
.send(cmd)
.await
.expect("Could not send signed_batch_info to proof_coordinator");
},
VerifiedEvent::BatchMsg(batch_msg) => {
VerifiedEvent::UnverifiedSignedBatchInfo(signed_batch_infos) => {
counters::QUORUM_STORE_MSG_COUNT
.with_label_values(&["NetworkListener::batchmsg"])
.with_label_values(&["NetworkListener::signedbatchinfo"])
.inc();
let cmd = ProofCoordinatorCommand::AppendSignature((
*signed_batch_infos,
VerificationStatus::Unverified,
));
self.proof_coordinator_tx
.send(cmd)
.await
.expect("Could not send signed_batch_info to proof_coordinator");
},
VerifiedEvent::BatchMsg(batch_msg) => {
let author = batch_msg.author();
let batches = batch_msg.take();
counters::RECEIVED_BATCH_MSG_COUNT.inc();
Expand Down Expand Up @@ -97,7 +110,7 @@ impl NetworkListener {
_ => {
unreachable!()
},
};
}
});
}
}
Expand Down
Loading

0 comments on commit 04d3589

Please sign in to comment.