Skip to content

Commit

Permalink
randomness #4: RandManager update from randomnet (#12224)
Browse files Browse the repository at this point in the history
* RandManager update from randomnet

* lint

* lint
  • Loading branch information
zjma committed Mar 1, 2024
1 parent 16cfd22 commit 87b5e28
Show file tree
Hide file tree
Showing 20 changed files with 669 additions and 204 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ aptos-consensus-notifications = { workspace = true }
aptos-consensus-types = { workspace = true }
aptos-crypto = { workspace = true }
aptos-crypto-derive = { workspace = true }
aptos-dkg = { workspace = true }
aptos-enum-conversion-derive = { workspace = true }
aptos-event-notifications = { workspace = true }
aptos-executor = { workspace = true }
Expand Down Expand Up @@ -76,6 +77,7 @@ serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
sha3 = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand All @@ -91,6 +93,7 @@ aptos-keygen = { workspace = true }
aptos-mempool = { workspace = true, features = ["fuzzing"] }
aptos-network = { workspace = true, features = ["fuzzing"] }
aptos-safety-rules = { workspace = true, features = ["testing"] }
aptos-vm = { workspace = true, features = ["fuzzing"] }
aptos-vm-validator = { workspace = true }
claims = { workspace = true }
move-core-types = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/block_storage/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ impl BlockStage {
pub const QC_ADDED: &'static str = "qc_added";
pub const QC_AGGREGATED: &'static str = "qc_aggregated";
pub const RAND_ADD_DECISION: &'static str = "rand_add_decision";
pub const RAND_ADD_SHARE: &'static str = "rand_add_share";
pub const RAND_AGG_DECISION: &'static str = "rand_agg_decision";
pub const RAND_ADD_ENOUGH_SHARE: &'static str = "rand_add_enough_share";
pub const RAND_ENTER: &'static str = "rand_enter";
pub const RAND_READY: &'static str = "rand_ready";
pub const ROUND_MANAGER_RECEIVED: &'static str = "round_manager_received";
Expand Down
8 changes: 8 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,3 +959,11 @@ pub static PROPOSED_VTXN_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
)
.unwrap()
});

pub static RAND_QUEUE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_consensus_rand_queue_size",
"Number of randomness-pending blocks."
)
.unwrap()
});
13 changes: 13 additions & 0 deletions consensus/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_logger::Schema;
use aptos_types::block_info::Round;
use serde::Serialize;

#[derive(Schema)]
pub struct LogSchema {
event: LogEvent,
author: Option<Author>,
remote_peer: Option<Author>,
epoch: Option<u64>,
round: Option<Round>,
id: Option<HashValue>,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -40,15 +43,25 @@ pub enum LogEvent {
Timeout,
Vote,
VoteNIL,
// log events related to randomness generation
BroadcastRandShare,
ReceiveProactiveRandShare,
ReceiveReactiveRandShare,
BroadcastAugData,
ReceiveAugData,
BroadcastCertifiedAugData,
ReceiveCertifiedAugData,
}

impl LogSchema {
pub fn new(event: LogEvent) -> Self {
Self {
event,
author: None,
remote_peer: None,
epoch: None,
round: None,
id: None,
}
}
}
2 changes: 1 addition & 1 deletion consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
network_interface::{ConsensusMsg, ConsensusNetworkClient, RPC},
pipeline::commit_reliable_broadcast::CommitMessage,
quorum_store::types::{Batch, BatchMsg, BatchRequest, BatchResponse},
rand::rand_gen::RandGenMessage,
rand::rand_gen::network_messages::RandGenMessage,
};
use anyhow::{anyhow, bail, ensure};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
dag::DAGNetworkMessage,
pipeline,
quorum_store::types::{Batch, BatchMsg, BatchRequest, BatchResponse},
rand::rand_gen::RandGenMessage,
rand::rand_gen::network_messages::RandGenMessage,
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
Expand Down
28 changes: 17 additions & 11 deletions consensus/src/rand/rand_gen/aug_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use crate::rand::rand_gen::{
storage::interface::AugDataStorage,
storage::interface::RandStorage,
types::{
AugData, AugDataId, AugDataSignature, AugmentedData, CertifiedAugData, CertifiedAugDataAck,
RandConfig,
AugData, AugDataId, AugDataSignature, CertifiedAugData, CertifiedAugDataAck, RandConfig,
TAugmentedData,
},
};
use anyhow::ensure;
Expand All @@ -14,16 +14,16 @@ use aptos_logger::error;
use aptos_types::validator_signer::ValidatorSigner;
use std::{collections::HashMap, sync::Arc};

pub struct AugDataStore<D, Storage> {
pub struct AugDataStore<D> {
epoch: u64,
signer: Arc<ValidatorSigner>,
config: RandConfig,
data: HashMap<Author, AugData<D>>,
certified_data: HashMap<Author, CertifiedAugData<D>>,
db: Arc<Storage>,
db: Arc<dyn RandStorage<D>>,
}

impl<D: AugmentedData, Storage: AugDataStorage<D>> AugDataStore<D, Storage> {
impl<D: TAugmentedData> AugDataStore<D> {
fn filter_by_epoch<T>(
epoch: u64,
all_data: impl Iterator<Item = (AugDataId, T)>,
Expand All @@ -44,24 +44,30 @@ impl<D: AugmentedData, Storage: AugDataStorage<D>> AugDataStore<D, Storage> {
epoch: u64,
signer: Arc<ValidatorSigner>,
config: RandConfig,
db: Arc<Storage>,
db: Arc<dyn RandStorage<D>>,
) -> Self {
let all_data = db.get_all_aug_data().unwrap_or_default();
let (to_remove, aug_data) = Self::filter_by_epoch(epoch, all_data.into_iter());
if let Err(e) = db.remove_aug_data(to_remove.into_iter()) {
if let Err(e) = db.remove_aug_data(to_remove) {
error!("[AugDataStore] failed to remove aug data: {:?}", e);
}

let all_certified_data = db.get_all_certified_aug_data().unwrap_or_default();
let (to_remove, certified_data) =
Self::filter_by_epoch(epoch, all_certified_data.into_iter());
if let Err(e) = db.remove_certified_aug_data(to_remove.into_iter()) {
if let Err(e) = db.remove_certified_aug_data(to_remove) {
error!(
"[AugDataStore] failed to remove certified aug data: {:?}",
e
);
}

for (_, certified_data) in &certified_data {
certified_data
.data()
.augment(&config, certified_data.author());
}

Self {
epoch,
signer,
Expand All @@ -79,11 +85,11 @@ impl<D: AugmentedData, Storage: AugDataStorage<D>> AugDataStore<D, Storage> {
}

pub fn get_my_aug_data(&self) -> Option<AugData<D>> {
self.data.get(self.config.author()).cloned()
self.data.get(&self.config.author()).cloned()
}

pub fn get_my_certified_aug_data(&self) -> Option<CertifiedAugData<D>> {
self.certified_data.get(self.config.author()).cloned()
self.certified_data.get(&self.config.author()).cloned()
}

pub fn add_aug_data(&mut self, data: AugData<D>) -> anyhow::Result<AugDataSignature> {
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/rand/rand_gen/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl BlockQueue {
}
}

pub fn queue(&self) -> &BTreeMap<Round, QueueItem> {
&self.queue
}

pub fn push_back(&mut self, item: QueueItem) {
for block in item.blocks() {
observe_block(block.timestamp_usecs(), BlockStage::RAND_ENTER);
Expand Down
20 changes: 9 additions & 11 deletions consensus/src/rand/rand_gen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
#[cfg(test)]
mod test_utils;

mod block_queue;
mod network_messages;
mod rand_store;
mod types;

mod aug_data_store;
mod rand_manager;
mod reliable_broadcast_state;
mod storage;

pub use network_messages::RandGenMessage;
pub mod block_queue;
pub mod network_messages;
pub mod rand_store;
pub mod types;

pub mod aug_data_store;
pub mod rand_manager;
pub mod reliable_broadcast_state;
pub mod storage;
10 changes: 5 additions & 5 deletions consensus/src/rand/rand_gen/network_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
network::TConsensusMsg,
network_interface::ConsensusMsg,
rand::rand_gen::types::{
AugData, AugDataSignature, AugmentedData, CertifiedAugData, CertifiedAugDataAck,
RandConfig, RandShare, RequestShare, Share,
AugData, AugDataSignature, CertifiedAugData, CertifiedAugDataAck, RandConfig, RandShare,
RequestShare, TAugmentedData, TShare,
},
};
use anyhow::bail;
Expand All @@ -30,7 +30,7 @@ pub enum RandMessage<S, D> {
CertifiedAugDataAck(CertifiedAugDataAck),
}

impl<S: Share, D: AugmentedData> RandMessage<S, D> {
impl<S: TShare, D: TAugmentedData> RandMessage<S, D> {
pub fn verify(
&self,
epoch_state: &EpochState,
Expand All @@ -49,9 +49,9 @@ impl<S: Share, D: AugmentedData> RandMessage<S, D> {
}
}

impl<S: Share, D: AugmentedData> RBMessage for RandMessage<S, D> {}
impl<S: TShare, D: TAugmentedData> RBMessage for RandMessage<S, D> {}

impl<S: Share, D: AugmentedData> TConsensusMsg for RandMessage<S, D> {
impl<S: TShare, D: TAugmentedData> TConsensusMsg for RandMessage<S, D> {
fn epoch(&self) -> u64 {
match self {
RandMessage::RequestShare(request) => request.epoch(),
Expand Down
Loading

0 comments on commit 87b5e28

Please sign in to comment.