Skip to content

Commit

Permalink
[qs] nits
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Mar 14, 2023
1 parent a02e820 commit 6807fde
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 86 deletions.
11 changes: 8 additions & 3 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::Context;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_types::{
Expand Down Expand Up @@ -160,8 +160,12 @@ impl SignedDigest {
self.epoch
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
if sender == self.signer {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
} else {
bail!("Sender {} mismatch signer {}", sender, self.signer);
}
}

pub fn info(&self) -> &SignedDigestInfo {
Expand All @@ -179,6 +183,7 @@ impl SignedDigest {

#[derive(Debug, PartialEq)]
pub enum SignedDigestError {
WrongAuthor,
WrongInfo,
DuplicatedSignature,
}
Expand Down
14 changes: 3 additions & 11 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,9 @@ impl NetworkSender {

/// Tries to send the given msg to all the participants.
///
/// The future is fulfilled as soon as the message put into the mpsc channel to network
/// internal(to provide back pressure), it does not indicate the message is delivered or sent
/// out. It does not give indication about when the message is delivered to the recipients,
/// as well as there is no indication about the network failures.
/// The future is fulfilled as soon as the message is put into the mpsc channel to network
/// internal (to provide back pressure), it does not indicate the message is delivered or sent
/// out.
async fn broadcast(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());
// Directly send the message to ourself without going through network.
Expand All @@ -186,13 +185,6 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

self.broadcast_without_self(msg).await;
}

/// Tries to send the given msg to all the participants, excluding self.
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
Expand Down
1 change: 0 additions & 1 deletion consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ impl PayloadManager {

let mut tx = coordinator_tx.clone();

// TODO: don't even need to warn on fail?
if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
Expand Down
25 changes: 2 additions & 23 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
counters,
quorum_store_db::QuorumStoreStorage,
types::Fragment,
utils::{BatchBuilder, MempoolProxy, RoundExpirations, Timeouts},
utils::{BatchBuilder, MempoolProxy, RoundExpirations},
},
};
use aptos_config::config::QuorumStoreConfig;
Expand Down Expand Up @@ -48,7 +48,6 @@ pub struct BatchGenerator {
mempool_proxy: MempoolProxy,
batches_in_progress: HashMap<BatchId, Vec<TransactionSummary>>,
batch_round_expirations: RoundExpirations<BatchId>,
batch_time_expirations: Timeouts<BatchId>,
batch_builder: BatchBuilder,
latest_logical_time: LogicalTime,
last_end_batch_time: Instant,
Expand Down Expand Up @@ -90,7 +89,6 @@ impl BatchGenerator {
mempool_proxy: MempoolProxy::new(mempool_tx, mempool_txn_pull_timeout_ms),
batches_in_progress: HashMap::new(),
batch_round_expirations: RoundExpirations::new(),
batch_time_expirations: Timeouts::new(),
batch_builder: BatchBuilder::new(batch_id, max_batch_bytes),
latest_logical_time: LogicalTime::new(epoch, 0),
last_end_batch_time: Instant::now(),
Expand Down Expand Up @@ -138,14 +136,11 @@ impl BatchGenerator {
if !self
.batch_builder
.append_transactions(&pulled_txns, max_count as usize)
|| self.last_end_batch_time.elapsed().as_millis() > self.config.end_batch_ms as u128
{
end_batch = true;
}

if self.last_end_batch_time.elapsed().as_millis() > self.config.end_batch_ms as u128 {
end_batch = true;
}

let serialized_txns = self.batch_builder.take_serialized_txns();

let batch_id = self.batch_builder.batch_id();
Expand Down Expand Up @@ -205,27 +200,13 @@ impl BatchGenerator {
.insert(batch_id, self.batch_builder.take_summaries());
self.batch_round_expirations
.add_item(batch_id, expiry_round);
self.batch_time_expirations
.add(batch_id, self.config.proof_timeout_ms);

self.last_end_batch_time = Instant::now();
return Some(fragment);
}
None
}

fn expire(&mut self) {
for batch_id in self.batch_time_expirations.expire() {
if self.batches_in_progress.remove(&batch_id).is_some() {
debug!(
"QS: timestamp based expiration batch w. id {} from batches_in_progress, new size {}",
batch_id,
self.batches_in_progress.len(),
);
}
}
}

pub async fn start(
mut self,
mut network_sender: NetworkSender,
Expand Down Expand Up @@ -255,8 +236,6 @@ impl BatchGenerator {
self.back_pressure = updated_back_pressure;
},
_ = interval.tick() => monitor!("batch_generator_handle_tick", {
// expire based on timestamp
self.expire();

let now = Instant::now();
// TODO: refactor back_pressure logic into its own function
Expand Down
38 changes: 13 additions & 25 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,30 +433,19 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
self.last_certified_round.load(Ordering::Relaxed)
}

fn get_batch_from_db(&self, digest: &HashValue) -> Result<Vec<SignedTransaction>, Error> {
fn get_batch_from_db(&self, digest: &HashValue) -> Result<PersistedValue, Error> {
counters::GET_BATCH_FROM_DB_COUNT.inc();

match self.db.get_batch(digest) {
Ok(Some(persisted_value)) => {
let payload = persisted_value
.maybe_payload
.expect("Persisted value in QuorumStore DB must have payload");
return Ok(payload);
},
Ok(None) => {
unreachable!("Could not read persisted value (according to BatchReader) from DB")
},
Err(_) => {
// TODO: handle error, e.g. from self or not, log, panic.
Ok(Some(persisted_value)) => Ok(persisted_value),
Ok(None) | Err(_) => {
error!("Could not get batch from db");
Err(Error::CouldNotGetData)
},
}
Err(Error::CouldNotGetData)
}

pub fn get_batch_from_local(
&self,
digest: &HashValue,
) -> Result<Vec<SignedTransaction>, Error> {
pub fn get_batch_from_local(&self, digest: &HashValue) -> Result<PersistedValue, Error> {
if let Some(value) = self.db_cache.get(digest) {
if payload_storage_mode(&value) == StorageMode::PersistedOnly {
assert!(
Expand All @@ -466,10 +455,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
self.get_batch_from_db(digest)
} else {
// Available in memory.
Ok(value
.maybe_payload
.clone()
.expect("BatchReader payload and storage kind mismatch"))
Ok(value.clone())
}
} else {
Err(Error::CouldNotGetData)
Expand All @@ -478,7 +464,8 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
}

pub trait BatchReader: Send + Sync {
fn exists(&self, digest: &HashValue) -> bool;
/// Check if the batch corresponding to the digest exists, return the batch author if true
fn exists(&self, digest: &HashValue) -> Option<PeerId>;

fn get_batch(
&self,
Expand All @@ -487,8 +474,8 @@ pub trait BatchReader: Send + Sync {
}

impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for BatchStore<T> {
fn exists(&self, digest: &HashValue) -> bool {
self.get_batch_from_local(digest).is_ok()
fn exists(&self, digest: &HashValue) -> Option<PeerId> {
self.get_batch_from_local(digest).map(|v| v.author).ok()
}

fn get_batch(
Expand All @@ -498,7 +485,8 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
let (tx, rx) = oneshot::channel();

if let Ok(value) = self.get_batch_from_local(proof.digest()) {
tx.send(Ok(value)).unwrap();
tx.send(Ok(value.maybe_payload.expect("Must have payload")))
.unwrap();
} else {
// Quorum store metrics
counters::MISSED_BATCHES_COUNT.inc();
Expand Down
35 changes: 20 additions & 15 deletions consensus/src/quorum_store/proof_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ impl IncrementalProofState {
Ok(())
}

fn ready(&self, validator_verifier: &ValidatorVerifier, my_peer_id: PeerId) -> bool {
self.aggregated_signature.contains_key(&my_peer_id)
&& validator_verifier
.check_voting_power(self.aggregated_signature.keys())
.is_ok()
fn ready(&self, validator_verifier: &ValidatorVerifier) -> bool {
validator_verifier
.check_voting_power(self.aggregated_signature.keys())
.is_ok()
}

fn take(self, validator_verifier: &ValidatorVerifier) -> ProofOfStore {
Expand Down Expand Up @@ -110,7 +109,19 @@ impl ProofCoordinator {
}
}

fn init_proof(&mut self, signed_digest: &SignedDigest) {
fn init_proof(&mut self, signed_digest: &SignedDigest) -> Result<(), SignedDigestError> {
// Check if the signed digest corresponding to our batch
if signed_digest.info().batch_author != self.peer_id {
return Err(SignedDigestError::WrongAuthor);
}
let batch_author = self
.batch_reader
.exists(&signed_digest.digest())
.ok_or(SignedDigestError::WrongAuthor)?;
if batch_author != signed_digest.info().batch_author {
return Err(SignedDigestError::WrongAuthor);
}

self.timeouts
.add(signed_digest.info().clone(), self.proof_timeout_ms);
self.digest_to_proof.insert(
Expand All @@ -120,6 +131,7 @@ impl ProofCoordinator {
self.digest_to_time
.entry(signed_digest.digest())
.or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64);
Ok(())
}

fn add_signature(
Expand All @@ -128,21 +140,14 @@ impl ProofCoordinator {
validator_verifier: &ValidatorVerifier,
) -> Result<Option<ProofOfStore>, SignedDigestError> {
if !self.digest_to_proof.contains_key(&signed_digest.digest()) {
if signed_digest.info().batch_author == self.peer_id
&& self.batch_reader.exists(&signed_digest.digest())
{
self.init_proof(&signed_digest);
} else {
return Err(SignedDigestError::WrongInfo);
}
self.init_proof(&signed_digest)?;
}
let digest = signed_digest.digest();
let my_id = self.peer_id;

match self.digest_to_proof.entry(signed_digest.digest()) {
Entry::Occupied(mut entry) => {
entry.get_mut().add_signature(signed_digest)?;
if entry.get_mut().ready(validator_verifier, my_id) {
if entry.get_mut().ready(validator_verifier) {
let (_, state) = entry.remove_entry();
let proof = state.take(validator_verifier);
// quorum store measurements
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,12 @@ impl InnerBuilder {
while let Some(rpc_request) = batch_retrieval_rx.next().await {
counters::RECEIVED_BATCH_REQUEST_COUNT.inc();
if let Ok(value) = batch_store.get_batch_from_local(&rpc_request.req.digest()) {
let batch = Batch::new(author, epoch, rpc_request.req.digest(), value);
let batch = Batch::new(
author,
epoch,
rpc_request.req.digest(),
value.maybe_payload.expect("Must have payload"),
);
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
let bytes = rpc_request.protocol.to_bytes(&msg).unwrap();
if let Err(e) = rpc_request
Expand Down
22 changes: 16 additions & 6 deletions consensus/src/quorum_store/tests/proof_coordinator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ use crate::{
use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime, ProofOfStore, SignedDigest};
use aptos_crypto::HashValue;
use aptos_executor_types::Error;
use aptos_types::{transaction::SignedTransaction, validator_verifier::random_validator_verifier};
use aptos_types::{
transaction::SignedTransaction, validator_verifier::random_validator_verifier, PeerId,
};
use std::sync::Arc;
use tokio::sync::{mpsc::channel, oneshot::Receiver};

pub struct MockBatchReader;
pub struct MockBatchReader {
peer: PeerId,
}

impl BatchReader for MockBatchReader {
fn exists(&self, _digest: &HashValue) -> bool {
true
fn exists(&self, _digest: &HashValue) -> Option<PeerId> {
Some(self.peer)
}

fn get_batch(&self, _proof: ProofOfStore) -> Receiver<Result<Vec<SignedTransaction>, Error>> {
Expand All @@ -34,8 +38,14 @@ async fn test_proof_coordinator_basic() {
aptos_logger::Logger::init_for_testing();
let (signers, verifier) = random_validator_verifier(4, None, true);
let (tx, _rx) = channel(100);
let proof_coordinator =
ProofCoordinator::new(100, signers[0].author(), Arc::new(MockBatchReader), tx);
let proof_coordinator = ProofCoordinator::new(
100,
signers[0].author(),
Arc::new(MockBatchReader {
peer: signers[0].author(),
}),
tx,
);
let (proof_coordinator_tx, proof_coordinator_rx) = channel(100);
let (tx, mut rx) = channel(100);
let network_sender = MockQuorumStoreSender::new(tx);
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl UnverifiedEvent {
},
UnverifiedEvent::SignedDigestMsg(sd) => {
if !self_message {
sd.verify(validator)?;
sd.verify(peer_id, validator)?;
}
VerifiedEvent::SignedDigestMsg(sd)
},
Expand Down
2 changes: 2 additions & 0 deletions testsuite/generate-format/tests/staged/consensus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ SignedDigestInfo:
STRUCT:
- batch_author:
TYPENAME: AccountAddress
- batch_id:
TYPENAME: BatchId
- digest:
TYPENAME: HashValue
- expiration:
Expand Down

0 comments on commit 6807fde

Please sign in to comment.