diff --git a/Cargo.lock b/Cargo.lock index ea7d7063d8d5d..0c30fdba46840 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,6 +881,7 @@ dependencies = [ "aptos-consensus-types", "aptos-crypto", "aptos-crypto-derive", + "aptos-dkg", "aptos-enum-conversion-derive", "aptos-event-notifications", "aptos-executor", @@ -937,6 +938,7 @@ dependencies = [ "serde_bytes", "serde_json", "serde_yaml 0.8.26", + "sha3 0.9.1", "strum_macros 0.24.3", "tempfile", "thiserror", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 275ebe0ba527f..3b3c689e51a50 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -23,6 +23,7 @@ aptos-consensus-notifications = { workspace = true } aptos-consensus-types = { workspace = true } aptos-crypto = { workspace = true } aptos-crypto-derive = { workspace = true } +aptos-dkg = { workspace = true } aptos-enum-conversion-derive = { workspace = true } aptos-event-notifications = { workspace = true } aptos-executor = { workspace = true } @@ -75,6 +76,7 @@ serde = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } +sha3 = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -90,6 +92,7 @@ aptos-keygen = { workspace = true } aptos-mempool = { workspace = true, features = ["fuzzing"] } aptos-network = { workspace = true, features = ["fuzzing"] } aptos-safety-rules = { workspace = true, features = ["testing"] } +aptos-vm = { workspace = true, features = ["fuzzing"] } aptos-vm-validator = { workspace = true } claims = { workspace = true } move-core-types = { workspace = true } diff --git a/consensus/src/block_storage/tracing.rs b/consensus/src/block_storage/tracing.rs index 1a08f4db118a9..302fc7e336877 100644 --- a/consensus/src/block_storage/tracing.rs +++ b/consensus/src/block_storage/tracing.rs @@ -19,8 +19,7 @@ impl BlockStage { pub const QC_ADDED: &'static str = "qc_added"; pub const QC_AGGREGATED: &'static str = "qc_aggregated"; pub const RAND_ADD_DECISION: &'static str = "rand_add_decision"; - pub const RAND_ADD_SHARE: &'static str = "rand_add_share"; - pub const RAND_AGG_DECISION: &'static str = "rand_agg_decision"; + pub const RAND_ADD_ENOUGH_SHARE: &'static str = "rand_add_enough_share"; pub const RAND_ENTER: &'static str = "rand_enter"; pub const RAND_READY: &'static str = "rand_ready"; pub const ROUND_MANAGER_RECEIVED: &'static str = "round_manager_received"; diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 836f8da887887..df50af9c91b38 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -868,6 +868,15 @@ pub static MAX_TXNS_FROM_BLOCK_TO_EXECUTE: Lazy = Lazy::new(|| { .unwrap() }); +/// Count of the number of `DKG` validator transactions received while the feature is disabled. +pub static UNEXPECTED_DKG_VTXN_COUNT: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_consensus_unexpected_dkg_vtxn_count", + "Count of the number of `DKG` validator transactions received while the feature is disabled." + ) + .unwrap() +}); + /// Update various counters for committed blocks pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc]) { for block in blocks_to_commit { @@ -938,3 +947,11 @@ pub static PROPOSED_VTXN_BYTES: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub static RAND_QUEUE_SIZE: Lazy = Lazy::new(|| { + register_int_gauge!( + "aptos_consensus_rand_queue_size", + "Number of randomness-pending blocks." + ) + .unwrap() +}); diff --git a/consensus/src/logging.rs b/consensus/src/logging.rs index 4040b0ef5a23d..0e04adb3343d6 100644 --- a/consensus/src/logging.rs +++ b/consensus/src/logging.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_consensus_types::common::Author; +use aptos_crypto::HashValue; use aptos_logger::Schema; use aptos_types::block_info::Round; use serde::Serialize; @@ -10,9 +11,11 @@ use serde::Serialize; #[derive(Schema)] pub struct LogSchema { event: LogEvent, + author: Option, remote_peer: Option, epoch: Option, round: Option, + id: Option, } #[derive(Serialize)] @@ -40,15 +43,25 @@ pub enum LogEvent { Timeout, Vote, VoteNIL, + // log events related to randomness generation + BroadcastRandShare, + ReceiveProactiveRandShare, + ReceiveReactiveRandShare, + BroadcastAugData, + ReceiveAugData, + BroadcastCertifiedAugData, + ReceiveCertifiedAugData, } impl LogSchema { pub fn new(event: LogEvent) -> Self { Self { event, + author: None, remote_peer: None, epoch: None, round: None, + id: None, } } } diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 052e085b071cb..5da56774b06a7 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -14,7 +14,7 @@ use crate::{ network_interface::{ConsensusMsg, ConsensusNetworkClient, RPC}, pipeline::commit_reliable_broadcast::CommitMessage, quorum_store::types::{Batch, BatchMsg, BatchRequest, BatchResponse}, - rand::rand_gen::RandGenMessage, + rand::rand_gen::network_messages::RandGenMessage, }; use anyhow::{anyhow, bail, ensure}; use aptos_channels::{self, aptos_channel, message_queues::QueueStyle}; diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index 3b91bfcf09679..16f7d06d3587d 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -8,7 +8,7 @@ use crate::{ dag::DAGNetworkMessage, pipeline, quorum_store::types::{Batch, BatchMsg, BatchRequest, BatchResponse}, - rand::rand_gen::RandGenMessage, + rand::rand_gen::network_messages::RandGenMessage, }; use aptos_config::network_id::{NetworkId, PeerNetworkId}; use aptos_consensus_types::{ diff --git a/consensus/src/rand/rand_gen/aug_data_store.rs b/consensus/src/rand/rand_gen/aug_data_store.rs index 7d5211ea6cb01..5723a60ee7231 100644 --- a/consensus/src/rand/rand_gen/aug_data_store.rs +++ b/consensus/src/rand/rand_gen/aug_data_store.rs @@ -2,10 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::rand::rand_gen::{ - storage::interface::AugDataStorage, + storage::interface::RandStorage, types::{ - AugData, AugDataId, AugDataSignature, AugmentedData, CertifiedAugData, CertifiedAugDataAck, - RandConfig, + AugData, AugDataId, AugDataSignature, CertifiedAugData, CertifiedAugDataAck, RandConfig, + TAugmentedData, }, }; use anyhow::ensure; @@ -14,16 +14,16 @@ use aptos_logger::error; use aptos_types::validator_signer::ValidatorSigner; use std::{collections::HashMap, sync::Arc}; -pub struct AugDataStore { +pub struct AugDataStore { epoch: u64, signer: Arc, config: RandConfig, data: HashMap>, certified_data: HashMap>, - db: Arc, + db: Arc>, } -impl> AugDataStore { +impl AugDataStore { fn filter_by_epoch( epoch: u64, all_data: impl Iterator, @@ -44,24 +44,30 @@ impl> AugDataStore { epoch: u64, signer: Arc, config: RandConfig, - db: Arc, + db: Arc>, ) -> Self { let all_data = db.get_all_aug_data().unwrap_or_default(); let (to_remove, aug_data) = Self::filter_by_epoch(epoch, all_data.into_iter()); - if let Err(e) = db.remove_aug_data(to_remove.into_iter()) { + if let Err(e) = db.remove_aug_data(to_remove) { error!("[AugDataStore] failed to remove aug data: {:?}", e); } let all_certified_data = db.get_all_certified_aug_data().unwrap_or_default(); let (to_remove, certified_data) = Self::filter_by_epoch(epoch, all_certified_data.into_iter()); - if let Err(e) = db.remove_certified_aug_data(to_remove.into_iter()) { + if let Err(e) = db.remove_certified_aug_data(to_remove) { error!( "[AugDataStore] failed to remove certified aug data: {:?}", e ); } + for (_, certified_data) in &certified_data { + certified_data + .data() + .augment(&config, certified_data.author()); + } + Self { epoch, signer, @@ -79,11 +85,11 @@ impl> AugDataStore { } pub fn get_my_aug_data(&self) -> Option> { - self.data.get(self.config.author()).cloned() + self.data.get(&self.config.author()).cloned() } pub fn get_my_certified_aug_data(&self) -> Option> { - self.certified_data.get(self.config.author()).cloned() + self.certified_data.get(&self.config.author()).cloned() } pub fn add_aug_data(&mut self, data: AugData) -> anyhow::Result { diff --git a/consensus/src/rand/rand_gen/block_queue.rs b/consensus/src/rand/rand_gen/block_queue.rs index 79dad70de7061..0c5b316829e6b 100644 --- a/consensus/src/rand/rand_gen/block_queue.rs +++ b/consensus/src/rand/rand_gen/block_queue.rs @@ -94,6 +94,10 @@ impl BlockQueue { } } + pub fn queue(&self) -> &BTreeMap { + &self.queue + } + pub fn push_back(&mut self, item: QueueItem) { for block in item.blocks() { observe_block(block.timestamp_usecs(), BlockStage::RAND_ENTER); diff --git a/consensus/src/rand/rand_gen/mod.rs b/consensus/src/rand/rand_gen/mod.rs index 49cfd4cc4d0a1..0127c320dc217 100644 --- a/consensus/src/rand/rand_gen/mod.rs +++ b/consensus/src/rand/rand_gen/mod.rs @@ -4,14 +4,12 @@ #[cfg(test)] mod test_utils; -mod block_queue; -mod network_messages; -mod rand_store; -mod types; - -mod aug_data_store; -mod rand_manager; -mod reliable_broadcast_state; -mod storage; - -pub use network_messages::RandGenMessage; +pub mod block_queue; +pub mod network_messages; +pub mod rand_store; +pub mod types; + +pub mod aug_data_store; +pub mod rand_manager; +pub mod reliable_broadcast_state; +pub mod storage; diff --git a/consensus/src/rand/rand_gen/network_messages.rs b/consensus/src/rand/rand_gen/network_messages.rs index 157cc2060989c..4477d0b49c955 100644 --- a/consensus/src/rand/rand_gen/network_messages.rs +++ b/consensus/src/rand/rand_gen/network_messages.rs @@ -5,8 +5,8 @@ use crate::{ network::TConsensusMsg, network_interface::ConsensusMsg, rand::rand_gen::types::{ - AugData, AugDataSignature, AugmentedData, CertifiedAugData, CertifiedAugDataAck, - RandConfig, RandShare, RequestShare, Share, + AugData, AugDataSignature, CertifiedAugData, CertifiedAugDataAck, RandConfig, RandShare, + RequestShare, TAugmentedData, TShare, }, }; use anyhow::bail; @@ -30,7 +30,7 @@ pub enum RandMessage { CertifiedAugDataAck(CertifiedAugDataAck), } -impl RandMessage { +impl RandMessage { pub fn verify( &self, epoch_state: &EpochState, @@ -49,9 +49,9 @@ impl RandMessage { } } -impl RBMessage for RandMessage {} +impl RBMessage for RandMessage {} -impl TConsensusMsg for RandMessage { +impl TConsensusMsg for RandMessage { fn epoch(&self) -> u64 { match self { RandMessage::RequestShare(request) => request.epoch(), diff --git a/consensus/src/rand/rand_gen/rand_manager.rs b/consensus/src/rand/rand_gen/rand_manager.rs index c43c975d830f5..f53d2a2cf3e1d 100644 --- a/consensus/src/rand/rand_gen/rand_manager.rs +++ b/consensus/src/rand/rand_gen/rand_manager.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + counters::RAND_QUEUE_SIZE, + logging::{LogEvent, LogSchema}, network::{IncomingRandGenRequest, NetworkSender, TConsensusMsg}, pipeline::buffer_manager::{OrderedBlocks, ResetAck, ResetRequest, ResetSignal}, rand::rand_gen::{ @@ -12,11 +14,12 @@ use crate::{ reliable_broadcast_state::{ AugDataCertBuilder, CertifiedAugDataAckState, ShareAggregateState, }, - storage::interface::AugDataStorage, - types::{AugmentedData, CertifiedAugData, RandConfig, RequestShare, Share}, + storage::interface::RandStorage, + types::{RandConfig, RequestShare, TAugmentedData, TShare}, }, }; use aptos_bounded_executor::BoundedExecutor; +use aptos_channels::aptos_channel; use aptos_consensus_types::common::Author; use aptos_infallible::Mutex; use aptos_logger::{error, info, spawn_named, warn}; @@ -29,16 +32,21 @@ use aptos_types::{ validator_signer::ValidatorSigner, }; use bytes::Bytes; -use futures::future::{AbortHandle, Abortable}; -use futures_channel::oneshot; +use futures::{ + future::{AbortHandle, Abortable}, + FutureExt, StreamExt, +}; +use futures_channel::{ + mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use std::{sync::Arc, time::Duration}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_retry::strategy::ExponentialBackoff; pub type Sender = UnboundedSender; pub type Receiver = UnboundedReceiver; -pub struct RandManager { +pub struct RandManager { author: Author, epoch_state: Arc, stop: bool, @@ -52,11 +60,11 @@ pub struct RandManager { outgoing_blocks: Sender, // local state rand_store: Arc>>, - aug_data_store: AugDataStore, + aug_data_store: AugDataStore, block_queue: BlockQueue, } -impl> RandManager { +impl RandManager { pub fn new( author: Author, epoch_state: Arc, @@ -64,7 +72,7 @@ impl> RandManager, network_sender: Arc, - db: Arc, + db: Arc>, bounded_executor: BoundedExecutor, ) -> Self { let rb_backoff_policy = ExponentialBackoff::from_millis(2) @@ -78,7 +86,7 @@ impl> RandManager> RandManager = blocks.ordered_blocks.iter().map(|b| b.round()).collect(); + info!(rounds = rounds, "Processing incoming blocks."); let broadcast_handles: Vec<_> = blocks .ordered_blocks .iter() @@ -117,19 +127,30 @@ impl> RandManager DropGuard { let self_share = S::generate(&self.config, metadata.clone()); + info!(LogSchema::new(LogEvent::BroadcastRandShare) + .epoch(self.epoch_state.epoch) + .author(self.author) + .round(metadata.round())); let mut rand_store = self.rand_store.lock(); - rand_store.add_rand_metadata(metadata.clone()); + rand_store.update_highest_known_round(metadata.round()); rand_store .add_share(self_share.clone()) .expect("Add self share should succeed"); + rand_store.add_rand_metadata(metadata.clone()); self.network_sender .broadcast_without_self(RandMessage::::Share(self_share).into_network_message()); self.spawn_aggregate_shares_task(metadata) } fn process_ready_blocks(&mut self, ready_blocks: Vec) { + let rounds: Vec = ready_blocks + .iter() + .flat_map(|b| b.ordered_blocks.iter().map(|b3| b3.round())) + .collect(); + info!(rounds = rounds, "Processing rand-ready blocks."); + for blocks in ready_blocks { - let _ = self.outgoing_blocks.send(blocks); + let _ = self.outgoing_blocks.unbounded_send(blocks); } } @@ -140,12 +161,18 @@ impl> RandManager round, }; self.block_queue = BlockQueue::new(); - self.rand_store.lock().reset(target_round); + self.rand_store + .lock() + .update_highest_known_round(target_round); self.stop = matches!(signal, ResetSignal::Stop); let _ = tx.send(ResetAck::default()); } fn process_randomness(&mut self, randomness: Randomness) { + info!( + metadata = randomness.metadata().metadata_to_sign, + "Processing decisioned randomness." + ); if let Some(block) = self.block_queue.item_mut(randomness.round()) { block.set_randomness(randomness.round(), randomness); } @@ -163,12 +190,12 @@ impl> RandManager, - mut incoming_rpc_request: Receiver, + mut incoming_rpc_request: aptos_channel::Receiver, verified_msg_tx: UnboundedSender>, rand_config: RandConfig, bounded_executor: BoundedExecutor, ) { - while let Some(rand_gen_msg) = incoming_rpc_request.recv().await { + while let Some(rand_gen_msg) = incoming_rpc_request.next().await { let tx = verified_msg_tx.clone(); let epoch_state_clone = epoch_state.clone(); let config_clone = rand_config.clone(); @@ -180,7 +207,7 @@ impl> RandManager> RandManager CertifiedAugData { - if let Some(certified_data) = self.aug_data_store.get_my_certified_aug_data() { - info!("[RandManager] Already have certified aug data"); - return certified_data; - } + async fn broadcast_aug_data(&mut self) -> DropGuard { let data = self .aug_data_store .get_my_aug_data() @@ -251,25 +274,31 @@ impl> RandManager) -> DropGuard { - let rb = self.reliable_broadcast.clone(); + let rb2 = self.reliable_broadcast.clone(); let validators = self.epoch_state.verifier.get_ordered_account_addresses(); - // Add it synchronously to be able to sign without a race that we get to sign before the broadcast reaches aug store. - self.aug_data_store - .add_certified_aug_data(certified_data.clone()) - .expect("Add self aug data should succeed"); - let task = async move { - let ack_state = Arc::new(CertifiedAugDataAckState::new(validators.into_iter())); + let maybe_existing_certified_data = self.aug_data_store.get_my_certified_aug_data(); + let phase1 = async move { + if let Some(certified_data) = maybe_existing_certified_data { + info!("[RandManager] Already have certified aug data"); + return certified_data; + } + info!("[RandManager] Start broadcasting aug data"); + info!(LogSchema::new(LogEvent::BroadcastAugData) + .author(*data.author()) + .epoch(data.epoch())); + let certified_data = rb.broadcast(data, aug_ack).await; + info!("[RandManager] Finish broadcasting aug data"); + certified_data + }; + let ack_state = Arc::new(CertifiedAugDataAckState::new(validators.into_iter())); + let task = phase1.then(|certified_data| async move { + info!(LogSchema::new(LogEvent::BroadcastCertifiedAugData) + .author(*certified_data.author()) + .epoch(certified_data.epoch())); info!("[RandManager] Start broadcasting certified aug data"); - rb.broadcast(certified_data, ack_state).await; + rb2.broadcast(certified_data, ack_state).await; info!("[RandManager] Finish broadcasting certified aug data"); - }; + }); let (abort_handle, abort_registration) = AbortHandle::new_pair(); tokio::spawn(Abortable::new(task, abort_registration)); DropGuard::new(abort_handle) @@ -278,12 +307,12 @@ impl> RandManager, - incoming_rpc_request: Receiver, + incoming_rpc_request: aptos_channel::Receiver, mut reset_rx: Receiver, bounded_executor: BoundedExecutor, ) { info!("RandManager started"); - let (verified_msg_tx, mut verified_msg_rx) = tokio::sync::mpsc::unbounded_channel(); + let (verified_msg_tx, mut verified_msg_rx) = unbounded(); let epoch_state = self.epoch_state.clone(); let rand_config = self.config.clone(); spawn_named!( @@ -297,22 +326,21 @@ impl> RandManager { + Some(blocks) = incoming_blocks.next() => { self.process_incoming_blocks(blocks); } - Some(reset) = reset_rx.recv() => { - while incoming_blocks.try_recv().is_ok() {} + Some(reset) = reset_rx.next() => { + while matches!(incoming_blocks.try_next(), Ok(Some(_))) {} self.process_reset(reset); } - Some(randomness) = self.decision_rx.recv() => { + Some(randomness) = self.decision_rx.next() => { self.process_randomness(randomness); } - Some(request) = verified_msg_rx.recv() => { + Some(request) = verified_msg_rx.next() => { let RpcRequest { req: rand_gen_msg, protocol, @@ -337,17 +365,31 @@ impl> RandManager { + info!(LogSchema::new(LogEvent::ReceiveProactiveRandShare) + .author(self.author) + .epoch(share.epoch()) + .round(share.metadata().round()) + .remote_peer(*share.author())); + if let Err(e) = self.rand_store.lock().add_share(share) { warn!("[RandManager] Failed to add share: {}", e); } } RandMessage::AugData(aug_data) => { + info!(LogSchema::new(LogEvent::ReceiveAugData) + .author(self.author) + .epoch(aug_data.epoch()) + .remote_peer(*aug_data.author())); match self.aug_data_store.add_aug_data(aug_data) { Ok(sig) => self.process_response(protocol, response_sender, RandMessage::AugDataSignature(sig)), Err(e) => error!("[RandManager] Failed to add aug data: {}", e), } } RandMessage::CertifiedAugData(certified_aug_data) => { + info!(LogSchema::new(LogEvent::ReceiveCertifiedAugData) + .author(self.author) + .epoch(certified_aug_data.epoch()) + .remote_peer(*certified_aug_data.author())); match self.aug_data_store.add_certified_aug_data(certified_aug_data) { Ok(ack) => self.process_response(protocol, response_sender, RandMessage::CertifiedAugDataAck(ack)), Err(e) => error!("[RandManager] Failed to add certified aug data: {}", e), @@ -356,6 +398,10 @@ impl> RandManager unreachable!("[RandManager] Unexpected message type after verification"), } } + _ = interval.tick().fuse() => { + self.observe_queue(); + }, + } let maybe_ready_blocks = self.block_queue.dequeue_rand_ready_prefix(); if !maybe_ready_blocks.is_empty() { @@ -364,4 +410,9 @@ impl> RandManager { total_weight: u64, } -impl ShareAggregator { +impl ShareAggregator { pub fn new(author: Author) -> Self { Self { author, @@ -32,9 +32,7 @@ impl ShareAggregator { } pub fn add_share(&mut self, weight: u64, share: RandShare) { - let timestamp = share.metadata().timestamp; if self.shares.insert(*share.author(), share).is_none() { - observe_block(timestamp, BlockStage::RAND_ADD_SHARE); self.total_weight += weight; } } @@ -45,15 +43,17 @@ impl ShareAggregator { rand_metadata: RandMetadata, decision_tx: Sender, ) -> Either> { - if self.total_weight < rand_config.threshold_weight() { + if self.total_weight < rand_config.threshold() { return Either::Left(self); } + // timestamp records the time when the block is created + observe_block(rand_metadata.timestamp, BlockStage::RAND_ADD_ENOUGH_SHARE); let rand_config = rand_config.clone(); let self_share = self .get_self_share() .expect("Aggregated item should have self share"); tokio::task::spawn_blocking(move || { - decision_tx.send(S::aggregate( + decision_tx.unbounded_send(S::aggregate( self.shares.values(), &rand_config, rand_metadata, @@ -92,7 +92,7 @@ enum RandItem { }, } -impl RandItem { +impl RandItem { fn new(author: Author) -> Self { Self::PendingMetadata(ShareAggregator::new(author)) } @@ -198,7 +198,7 @@ pub struct RandStore { decision_tx: Sender, } -impl RandStore { +impl RandStore { pub fn new( epoch: u64, author: Author, @@ -215,12 +215,11 @@ impl RandStore { } } - pub fn reset(&mut self, target_round: u64) { - self.highest_known_round = std::cmp::max(self.highest_known_round, target_round); + pub fn update_highest_known_round(&mut self, round: u64) { + self.highest_known_round = std::cmp::max(self.highest_known_round, round); } pub fn add_rand_metadata(&mut self, rand_metadata: RandMetadata) { - self.highest_known_round = std::cmp::max(self.highest_known_round, rand_metadata.round()); let rand_item = self .rand_map .entry(rand_metadata.round()) @@ -283,30 +282,141 @@ mod tests { types::{MockShare, RandConfig}, }; use aptos_consensus_types::common::Author; - use aptos_types::randomness::RandMetadata; - use std::{collections::HashMap, str::FromStr}; - use tokio::sync::mpsc::unbounded_channel; + use aptos_crypto::{bls12381, HashValue, Uniform}; + use aptos_dkg::{ + pvss::{traits::Transcript, Player, WeightedConfig}, + weighted_vuf::traits::WeightedVUF, + }; + use aptos_types::{ + dkg::{real_dkg::maybe_dk_from_bls_sk, DKGSessionMetadata, DKGTrait, DefaultDKG}, + randomness::{RandKeys, RandMetadata, WvufPP, WVUF}, + validator_verifier::{ + ValidatorConsensusInfo, ValidatorConsensusInfoMoveStruct, ValidatorVerifier, + }, + }; + use futures::StreamExt; + use futures_channel::mpsc::unbounded; + use rand::thread_rng; + use std::str::FromStr; + + /// Captures important data items across the whole DKG-WVUF flow. + struct TestContext { + authors: Vec, + dealer_epoch: u64, + target_epoch: u64, + rand_config: RandConfig, + } + + impl TestContext { + fn new(weights: Vec, my_index: usize) -> Self { + let dealer_epoch = 0; + let target_epoch = 1; + let num_validators = weights.len(); + let mut rng = thread_rng(); + let authors: Vec<_> = (0..num_validators) + .map(|i| Author::from_str(&format!("{:x}", i)).unwrap()) + .collect(); + let private_keys: Vec = (0..num_validators) + .map(|_| bls12381::PrivateKey::generate_for_testing()) + .collect(); + let public_keys: Vec = + private_keys.iter().map(bls12381::PublicKey::from).collect(); + let dkg_decrypt_keys: Vec<::NewValidatorDecryptKey> = + private_keys + .iter() + .map(|sk| maybe_dk_from_bls_sk(sk).unwrap()) + .collect(); + let consensus_infos: Vec = (0..num_validators) + .map(|idx| { + ValidatorConsensusInfo::new( + authors[idx], + public_keys[idx].clone(), + weights[idx], + ) + }) + .collect(); + let consensus_info_move_structs = consensus_infos + .clone() + .into_iter() + .map(ValidatorConsensusInfoMoveStruct::from) + .collect::>(); + let verifier = ValidatorVerifier::new(consensus_infos.clone()); + let dkg_session_metadata = DKGSessionMetadata { + dealer_epoch: 999, + dealer_validator_set: consensus_info_move_structs.clone(), + target_validator_set: consensus_info_move_structs.clone(), + }; + let dkg_pub_params = DefaultDKG::new_public_params(&dkg_session_metadata); + let input_secret = ::InputSecret::generate_for_testing(); + let transcript = DefaultDKG::generate_transcript( + &mut rng, + &dkg_pub_params, + &input_secret, + 0, + &private_keys[0], + ); + let (sk, pk) = DefaultDKG::decrypt_secret_share_from_transcript( + &dkg_pub_params, + &transcript, + my_index as u64, + &dkg_decrypt_keys[my_index], + ) + .unwrap(); + + let pk_shares = (0..num_validators) + .map(|id| { + transcript + .get_public_key_share(&dkg_pub_params.pvss_config.wconfig, &Player { id }) + }) + .collect::>(); + let vuf_pub_params = WvufPP::from(&dkg_pub_params.pvss_config.pp); + + let (ask, apk) = WVUF::augment_key_pair(&vuf_pub_params, sk, pk, &mut rng); + + let rand_keys = RandKeys::new(ask, apk, pk_shares, num_validators); + let weights: Vec = weights.into_iter().map(|x| x as usize).collect(); + let half_total_weights = weights.clone().into_iter().sum::() / 2; + let weighted_config = WeightedConfig::new(half_total_weights, weights).unwrap(); + let rand_config = RandConfig::new( + authors[my_index], + target_epoch, + verifier, + vuf_pub_params, + rand_keys, + weighted_config, + ); + + Self { + authors, + dealer_epoch, + target_epoch, + rand_config, + } + } + } #[test] fn test_share_aggregator() { - let mut aggr = ShareAggregator::new(Author::ONE); - let weights = HashMap::from([(Author::ONE, 1), (Author::TWO, 2), (Author::ZERO, 3)]); - let shares = vec![ - create_share_for_round(1, Author::ONE), - create_share_for_round(2, Author::TWO), - create_share_for_round(1, Author::ZERO), - ]; - for share in shares.iter() { - aggr.add_share(*weights.get(share.author()).unwrap(), share.clone()); - // double add should be no op to the total weight - aggr.add_share(*weights.get(share.author()).unwrap(), share.clone()); - } + let ctxt = TestContext::new(vec![1, 2, 3], 0); + let mut aggr = ShareAggregator::new(ctxt.authors[0]); + aggr.add_share( + 1, + create_share_for_round(ctxt.target_epoch, 1, ctxt.authors[0]), + ); + aggr.add_share( + 2, + create_share_for_round(ctxt.target_epoch, 2, ctxt.authors[1]), + ); + aggr.add_share( + 3, + create_share_for_round(ctxt.target_epoch, 1, ctxt.authors[2]), + ); assert_eq!(aggr.shares.len(), 3); assert_eq!(aggr.total_weight, 6); // retain the shares with the same metadata aggr.retain( - &RandConfig::new(1, Author::ZERO, weights), - &RandMetadata::new_for_testing(1), + &ctxt.rand_config, + &RandMetadata::new(ctxt.target_epoch, 1, HashValue::zero(), 1700000000), ); assert_eq!(aggr.shares.len(), 2); assert_eq!(aggr.total_weight, 4); @@ -314,42 +424,48 @@ mod tests { #[tokio::test] async fn test_rand_item() { - let weights = HashMap::from([(Author::ONE, 1), (Author::TWO, 2), (Author::ZERO, 3)]); - let config = RandConfig::new(1, Author::ZERO, weights); - let (tx, _rx) = unbounded_channel(); + let ctxt = TestContext::new(vec![1, 2, 3], 1); + let (tx, _rx) = unbounded(); let shares = vec![ - create_share_for_round(2, Author::ONE), - create_share_for_round(1, Author::TWO), - create_share_for_round(1, Author::ZERO), + create_share_for_round(ctxt.target_epoch, 2, ctxt.authors[0]), + create_share_for_round(ctxt.target_epoch, 1, ctxt.authors[1]), + create_share_for_round(ctxt.target_epoch, 1, ctxt.authors[2]), ]; - let mut item = RandItem::::new(Author::TWO); + let mut item = RandItem::::new(ctxt.authors[1]); for share in shares.iter() { - item.add_share(share.clone(), &config).unwrap(); + item.add_share(share.clone(), &ctxt.rand_config).unwrap(); } assert_eq!(item.total_weights().unwrap(), 6); - item.add_metadata(&config, RandMetadata::new_for_testing(1)); + item.add_metadata( + &ctxt.rand_config, + RandMetadata::new(ctxt.target_epoch, 1, HashValue::zero(), 1700000000), + ); assert_eq!(item.total_weights().unwrap(), 5); - item.try_aggregate(&config, tx); + item.try_aggregate(&ctxt.rand_config, tx); assert!(item.has_decision()); - let mut item = RandItem::::new(Author::ONE); - item.add_metadata(&config, RandMetadata::new_for_testing(2)); + let mut item = RandItem::::new(ctxt.authors[0]); + item.add_metadata( + &ctxt.rand_config, + RandMetadata::new(ctxt.target_epoch, 2, HashValue::zero(), 1700000000), + ); for share in shares[1..].iter() { - item.add_share(share.clone(), &config).unwrap_err(); + item.add_share(share.clone(), &ctxt.rand_config) + .unwrap_err(); } } #[tokio::test] async fn test_rand_store() { - let authors: Vec<_> = (0..7) - .map(|i| Author::from_str(&format!("{:x}", i)).unwrap()) - .collect(); - let weights: HashMap = authors.iter().map(|addr| (*addr, 1)).collect(); - let authors: Vec = weights.keys().cloned().collect(); - let config = RandConfig::new(1, Author::ZERO, weights); - let (decision_tx, mut decision_rx) = unbounded_channel(); - let mut rand_store = RandStore::new(1, authors[1], config, decision_tx); + let ctxt = TestContext::new(vec![100; 7], 0); + let (decision_tx, mut decision_rx) = unbounded(); + let mut rand_store = RandStore::new( + ctxt.target_epoch, + ctxt.authors[1], + ctxt.rand_config.clone(), + decision_tx, + ); let rounds = vec![vec![1], vec![2, 3], vec![5, 8, 13]]; let blocks_1 = QueueItem::new(create_ordered_blocks(rounds[0].clone()), None); @@ -358,29 +474,30 @@ mod tests { let metadata_2 = blocks_2.all_rand_metadata(); // shares come before metadata - for share in authors[0..5] + for share in ctxt.authors[0..5] .iter() .map(|author| create_share(metadata_1[0].clone(), *author)) { rand_store.add_share(share).unwrap(); } - assert!(decision_rx.try_recv().is_err()); + assert!(decision_rx.try_next().is_err()); for metadata in blocks_1.all_rand_metadata() { rand_store.add_rand_metadata(metadata); } - assert!(decision_rx.recv().await.is_some()); + assert!(decision_rx.next().await.is_some()); + // metadata come after shares for metadata in blocks_2.all_rand_metadata() { rand_store.add_rand_metadata(metadata); } - assert!(decision_rx.try_recv().is_err()); + assert!(decision_rx.try_next().is_err()); - for share in authors[1..6] + for share in ctxt.authors[1..6] .iter() .map(|author| create_share(metadata_2[0].clone(), *author)) { rand_store.add_share(share).unwrap(); } - assert!(decision_rx.recv().await.is_some()); + assert!(decision_rx.next().await.is_some()); } } diff --git a/consensus/src/rand/rand_gen/reliable_broadcast_state.rs b/consensus/src/rand/rand_gen/reliable_broadcast_state.rs index 43999890a0e05..e993ee36d17bb 100644 --- a/consensus/src/rand/rand_gen/reliable_broadcast_state.rs +++ b/consensus/src/rand/rand_gen/reliable_broadcast_state.rs @@ -5,8 +5,8 @@ use crate::rand::rand_gen::{ network_messages::RandMessage, rand_store::RandStore, types::{ - AugData, AugDataSignature, AugmentedData, CertifiedAugData, CertifiedAugDataAck, - RandConfig, RandShare, RequestShare, Share, + AugData, AugDataSignature, CertifiedAugData, CertifiedAugDataAck, RandConfig, RandShare, + RequestShare, TAugmentedData, TShare, }, }; use anyhow::ensure; @@ -34,7 +34,7 @@ impl AugDataCertBuilder { } } -impl BroadcastStatus, RandMessage> +impl BroadcastStatus, RandMessage> for Arc> { type Aggregated = CertifiedAugData; @@ -45,7 +45,7 @@ impl BroadcastStatus, RandMessage< ack.verify(peer, &self.epoch_state.verifier, &self.aug_data)?; let mut parital_signatures_guard = self.partial_signatures.lock(); parital_signatures_guard.add_signature(peer, ack.into_signature()); - Ok(self + let qc_aug_data = self .epoch_state .verifier .check_voting_power(parital_signatures_guard.signatures().keys(), true) @@ -57,7 +57,8 @@ impl BroadcastStatus, RandMessage< .aggregate_signatures(&parital_signatures_guard) .expect("Signature aggregation should succeed"); CertifiedAugData::new(self.aug_data.clone(), aggregated_signature) - })) + }); + Ok(qc_aug_data) } } @@ -73,7 +74,7 @@ impl CertifiedAugDataAckState { } } -impl BroadcastStatus, RandMessage> +impl BroadcastStatus, RandMessage> for Arc { type Aggregated = (); @@ -116,7 +117,7 @@ impl ShareAggregateState { } } -impl BroadcastStatus, RandMessage> +impl BroadcastStatus, RandMessage> for Arc> { type Aggregated = (); diff --git a/consensus/src/rand/rand_gen/storage/db.rs b/consensus/src/rand/rand_gen/storage/db.rs index 3a968000c4064..0700d6f92563a 100644 --- a/consensus/src/rand/rand_gen/storage/db.rs +++ b/consensus/src/rand/rand_gen/storage/db.rs @@ -5,12 +5,13 @@ use crate::{ error::DbError, rand::rand_gen::{ storage::{ - interface::AugDataStorage, + interface::RandStorage, schema::{ - AugDataSchema, CertifiedAugDataSchema, AUG_DATA_CF_NAME, CERTIFIED_AUG_DATA_CF_NAME, + AugDataSchema, CertifiedAugDataSchema, KeyPairSchema, AUG_DATA_CF_NAME, + CERTIFIED_AUG_DATA_CF_NAME, KEY_PAIR_CF_NAME, }, }, - types::{AugData, AugDataId, AugmentedData, CertifiedAugData}, + types::{AugData, AugDataId, CertifiedAugData, TAugmentedData}, }, }; use anyhow::Result; @@ -26,7 +27,11 @@ pub const RAND_DB_NAME: &str = "rand_db"; impl RandDb { pub(crate) fn new + Clone>(db_root_path: P) -> Self { - let column_families = vec![AUG_DATA_CF_NAME, CERTIFIED_AUG_DATA_CF_NAME]; + let column_families = vec![ + KEY_PAIR_CF_NAME, + AUG_DATA_CF_NAME, + CERTIFIED_AUG_DATA_CF_NAME, + ]; let path = db_root_path.as_ref().join(RAND_DB_NAME); let instant = Instant::now(); @@ -77,34 +82,40 @@ impl RandDb { } } -impl AugDataStorage for RandDb { - fn save_aug_data(&self, aug_data: &AugData) -> anyhow::Result<()> { +impl RandStorage for RandDb { + fn save_key_pair_bytes(&self, epoch: u64, key_pair: Vec) -> Result<()> { + Ok(self.put::(&(), &(epoch, key_pair))?) + } + + fn save_aug_data(&self, aug_data: &AugData) -> Result<()> { Ok(self.put::>(&aug_data.id(), aug_data)?) } - fn save_certified_aug_data( - &self, - certified_aug_data: &CertifiedAugData, - ) -> anyhow::Result<()> { + fn save_certified_aug_data(&self, certified_aug_data: &CertifiedAugData) -> Result<()> { Ok(self.put::>(&certified_aug_data.id(), certified_aug_data)?) } - fn get_all_aug_data(&self) -> anyhow::Result)>> { + fn get_key_pair_bytes(&self) -> Result)>> { + Ok(self.get_all::()?.pop().map(|(_, v)| v)) + } + + fn get_all_aug_data(&self) -> Result)>> { Ok(self.get_all::>()?) } - fn get_all_certified_aug_data(&self) -> anyhow::Result)>> { + fn get_all_certified_aug_data(&self) -> Result)>> { Ok(self.get_all::>()?) } - fn remove_aug_data(&self, aug_data: impl Iterator>) -> anyhow::Result<()> { - Ok(self.delete::>(aug_data.map(|d| d.id()))?) + fn remove_aug_data(&self, aug_data: Vec>) -> Result<()> { + Ok(self.delete::>(aug_data.into_iter().map(|d| d.id()))?) } fn remove_certified_aug_data( &self, - certified_aug_data: impl Iterator>, - ) -> anyhow::Result<()> { - Ok(self.delete::>(certified_aug_data.map(|d| d.id()))?) + certified_aug_data: Vec>, + ) -> Result<()> { + Ok(self + .delete::>(certified_aug_data.into_iter().map(|d| d.id()))?) } } diff --git a/consensus/src/rand/rand_gen/storage/in_memory.rs b/consensus/src/rand/rand_gen/storage/in_memory.rs index d669563bb2c65..cf5046f5d1a53 100644 --- a/consensus/src/rand/rand_gen/storage/in_memory.rs +++ b/consensus/src/rand/rand_gen/storage/in_memory.rs @@ -2,13 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::rand::rand_gen::{ - storage::interface::AugDataStorage, - types::{AugData, AugDataId, AugmentedData, CertifiedAugData}, + storage::interface::RandStorage, + types::{AugData, AugDataId, CertifiedAugData, TAugmentedData}, }; use aptos_infallible::RwLock; use std::collections::HashMap; pub struct InMemRandDb { + key_pair: RwLock)>>, aug_data: RwLock>>, certified_aug_data: RwLock>>, } @@ -16,13 +17,19 @@ pub struct InMemRandDb { impl InMemRandDb { pub fn new() -> Self { Self { + key_pair: RwLock::new(None), aug_data: RwLock::new(HashMap::new()), certified_aug_data: RwLock::new(HashMap::new()), } } } -impl AugDataStorage for InMemRandDb { +impl RandStorage for InMemRandDb { + fn save_key_pair_bytes(&self, epoch: u64, key_pair: Vec) -> anyhow::Result<()> { + self.key_pair.write().replace((epoch, key_pair)); + Ok(()) + } + fn save_aug_data(&self, aug_data: &AugData) -> anyhow::Result<()> { self.aug_data .write() @@ -40,6 +47,10 @@ impl AugDataStorage for InMemRandDb { Ok(()) } + fn get_key_pair_bytes(&self) -> anyhow::Result)>> { + Ok(self.key_pair.read().clone()) + } + fn get_all_aug_data(&self) -> anyhow::Result)>> { Ok(self.aug_data.read().clone().into_iter().collect()) } @@ -48,7 +59,7 @@ impl AugDataStorage for InMemRandDb { Ok(self.certified_aug_data.read().clone().into_iter().collect()) } - fn remove_aug_data(&self, aug_data: impl Iterator>) -> anyhow::Result<()> { + fn remove_aug_data(&self, aug_data: Vec>) -> anyhow::Result<()> { for data in aug_data { self.aug_data.write().remove(&data.id()); } @@ -57,7 +68,7 @@ impl AugDataStorage for InMemRandDb { fn remove_certified_aug_data( &self, - certified_aug_data: impl Iterator>, + certified_aug_data: Vec>, ) -> anyhow::Result<()> { for data in certified_aug_data { self.certified_aug_data.write().remove(&data.id()); diff --git a/consensus/src/rand/rand_gen/storage/interface.rs b/consensus/src/rand/rand_gen/storage/interface.rs index d9bdd26354749..80a391f78285e 100644 --- a/consensus/src/rand/rand_gen/storage/interface.rs +++ b/consensus/src/rand/rand_gen/storage/interface.rs @@ -3,19 +3,21 @@ use crate::rand::rand_gen::types::{AugData, AugDataId, CertifiedAugData}; -pub trait AugDataStorage: 'static { +pub trait RandStorage: Send + Sync + 'static { + fn save_key_pair_bytes(&self, epoch: u64, key_pair: Vec) -> anyhow::Result<()>; fn save_aug_data(&self, aug_data: &AugData) -> anyhow::Result<()>; fn save_certified_aug_data( &self, certified_aug_data: &CertifiedAugData, ) -> anyhow::Result<()>; + fn get_key_pair_bytes(&self) -> anyhow::Result)>>; fn get_all_aug_data(&self) -> anyhow::Result)>>; fn get_all_certified_aug_data(&self) -> anyhow::Result)>>; - fn remove_aug_data(&self, aug_data: impl Iterator>) -> anyhow::Result<()>; + fn remove_aug_data(&self, aug_data: Vec>) -> anyhow::Result<()>; fn remove_certified_aug_data( &self, - certified_aug_data: impl Iterator>, + certified_aug_data: Vec>, ) -> anyhow::Result<()>; } diff --git a/consensus/src/rand/rand_gen/storage/schema.rs b/consensus/src/rand/rand_gen/storage/schema.rs index ef57242db7568..37b7c5b2a7e0e 100644 --- a/consensus/src/rand/rand_gen/storage/schema.rs +++ b/consensus/src/rand/rand_gen/storage/schema.rs @@ -1,25 +1,50 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::rand::rand_gen::types::{AugData, AugDataId, AugmentedData, CertifiedAugData}; +use crate::rand::rand_gen::types::{AugData, AugDataId, CertifiedAugData, TAugmentedData}; use aptos_schemadb::{ + define_schema, schema::{KeyCodec, Schema, ValueCodec}, ColumnFamilyName, }; use std::marker::PhantomData; +pub(crate) const KEY_PAIR_CF_NAME: ColumnFamilyName = "key_pair"; + +define_schema!(KeyPairSchema, (), (u64, Vec), KEY_PAIR_CF_NAME); + +impl KeyCodec for () { + fn encode_key(&self) -> anyhow::Result> { + Ok(bcs::to_bytes(self)?) + } + + fn decode_key(data: &[u8]) -> anyhow::Result { + Ok(bcs::from_bytes(data)?) + } +} + +impl ValueCodec for (u64, Vec) { + fn encode_value(&self) -> anyhow::Result> { + Ok(bcs::to_bytes(self)?) + } + + fn decode_value(data: &[u8]) -> anyhow::Result { + Ok(bcs::from_bytes(data)?) + } +} + pub(crate) const AUG_DATA_CF_NAME: ColumnFamilyName = "aug_data"; #[derive(Debug)] pub struct AugDataSchema(PhantomData); -impl Schema for AugDataSchema { +impl Schema for AugDataSchema { type Key = AugDataId; type Value = AugData; const COLUMN_FAMILY_NAME: ColumnFamilyName = AUG_DATA_CF_NAME; } -impl KeyCodec> for AugDataId { +impl KeyCodec> for AugDataId { fn encode_key(&self) -> anyhow::Result> { Ok(bcs::to_bytes(self)?) } @@ -29,7 +54,7 @@ impl KeyCodec> for AugDataId { } } -impl ValueCodec> for AugData { +impl ValueCodec> for AugData { fn encode_value(&self) -> anyhow::Result> { Ok(bcs::to_bytes(&self)?) } @@ -43,14 +68,14 @@ pub(crate) const CERTIFIED_AUG_DATA_CF_NAME: ColumnFamilyName = "certified_aug_d #[derive(Debug)] pub struct CertifiedAugDataSchema(PhantomData); -impl Schema for CertifiedAugDataSchema { +impl Schema for CertifiedAugDataSchema { type Key = AugDataId; type Value = CertifiedAugData; const COLUMN_FAMILY_NAME: ColumnFamilyName = CERTIFIED_AUG_DATA_CF_NAME; } -impl KeyCodec> for AugDataId { +impl KeyCodec> for AugDataId { fn encode_key(&self) -> anyhow::Result> { Ok(bcs::to_bytes(self)?) } @@ -60,7 +85,7 @@ impl KeyCodec> for AugDataId { } } -impl ValueCodec> for CertifiedAugData { +impl ValueCodec> for CertifiedAugData { fn encode_value(&self) -> anyhow::Result> { Ok(bcs::to_bytes(&self)?) } diff --git a/consensus/src/rand/rand_gen/test_utils.rs b/consensus/src/rand/rand_gen/test_utils.rs index 06d0b3eea063c..0c2977941c823 100644 --- a/consensus/src/rand/rand_gen/test_utils.rs +++ b/consensus/src/rand/rand_gen/test_utils.rs @@ -51,8 +51,16 @@ pub fn create_ordered_blocks(rounds: Vec) -> OrderedBlocks { } } -pub(super) fn create_share_for_round(round: Round, author: Author) -> RandShare { - RandShare::::new(author, RandMetadata::new_for_testing(round), MockShare) +pub(super) fn create_share_for_round( + epoch: u64, + round: Round, + author: Author, +) -> RandShare { + RandShare::::new( + author, + RandMetadata::new(epoch, round, HashValue::zero(), 1700000000), + MockShare, + ) } pub(super) fn create_share(rand_metadata: RandMetadata, author: Author) -> RandShare { diff --git a/consensus/src/rand/rand_gen/types.rs b/consensus/src/rand/rand_gen/types.rs index f3d4009ce66f3..b48922c92f6ef 100644 --- a/consensus/src/rand/rand_gen/types.rs +++ b/consensus/src/rand/rand_gen/types.rs @@ -1,17 +1,28 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::ensure; +use anyhow::{bail, ensure}; use aptos_consensus_types::common::{Author, Round}; use aptos_crypto::bls12381::Signature; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; +use aptos_dkg::{ + pvss::{Player, WeightedConfig}, + weighted_vuf::traits::WeightedVUF, +}; +use aptos_logger::debug; +use aptos_runtimes::spawn_rayon_thread_pool; use aptos_types::{ aggregate_signature::AggregateSignature, - randomness::{RandMetadata, Randomness}, + randomness::{ + Delta, PKShare, ProofShare, RandKeys, RandMetadata, Randomness, WvufPP, APK, WVUF, + }, validator_verifier::ValidatorVerifier, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{collections::HashMap, fmt::Debug}; +use sha3::{Digest, Sha3_256}; +use std::{fmt::Debug, sync::Arc}; + +const NUM_THREADS_FOR_WVUF_DERIVATION: usize = 8; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub(super) struct MockShare; @@ -19,7 +30,129 @@ pub(super) struct MockShare; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub(super) struct MockAugData; -impl Share for MockShare { +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Share { + share: ProofShare, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct AugmentedData { + delta: Delta, +} + +impl TShare for Share { + fn verify( + &self, + rand_config: &RandConfig, + rand_metadata: &RandMetadata, + author: &Author, + ) -> anyhow::Result<()> { + let index = *rand_config + .validator + .address_to_validator_index() + .get(author) + .unwrap(); + let maybe_apk = &rand_config.keys.certified_apks[index]; + if let Some(apk) = maybe_apk.get() { + WVUF::verify_share( + &rand_config.vuf_pp, + apk, + rand_metadata.to_bytes().as_slice(), + &self.share, + )?; + } else { + bail!( + "[RandShare] No augmented public key for validator id {}, {}", + index, + author + ); + } + Ok(()) + } + + fn generate(rand_config: &RandConfig, rand_metadata: RandMetadata) -> RandShare + where + Self: Sized, + { + let share = Share { + share: WVUF::create_share(&rand_config.keys.ask, rand_metadata.to_bytes().as_slice()), + }; + RandShare::new(rand_config.author(), rand_metadata, share) + } + + fn aggregate<'a>( + shares: impl Iterator>, + rand_config: &RandConfig, + rand_metadata: RandMetadata, + ) -> Randomness + where + Self: Sized, + { + let timer = std::time::Instant::now(); + let mut apks_and_proofs = vec![]; + for share in shares { + let id = *rand_config + .validator + .address_to_validator_index() + .get(share.author()) + .unwrap(); + let apk = rand_config.get_certified_apk(share.author()).unwrap(); // needs to have apk to verify the share + apks_and_proofs.push((Player { id }, apk.clone(), share.share().share)); + } + + let proof = WVUF::aggregate_shares(&rand_config.wconfig, &apks_and_proofs); + let pool = + spawn_rayon_thread_pool("wvuf".to_string(), Some(NUM_THREADS_FOR_WVUF_DERIVATION)); + let eval = WVUF::derive_eval( + &rand_config.wconfig, + &rand_config.vuf_pp, + rand_metadata.to_bytes().as_slice(), + &rand_config.get_all_certified_apk(), + &proof, + &pool, + ) + .expect("All APK should exist"); + debug!( + "WVUF derivation time: {} ms, number of threads: {}", + timer.elapsed().as_millis(), + NUM_THREADS_FOR_WVUF_DERIVATION + ); + let eval_bytes = bcs::to_bytes(&eval).unwrap(); + let rand_bytes = Sha3_256::digest(eval_bytes.as_slice()).to_vec(); + Randomness::new(rand_metadata.clone(), rand_bytes) + } +} + +impl TAugmentedData for AugmentedData { + fn generate(rand_config: &RandConfig) -> AugData + where + Self: Sized, + { + let delta = rand_config.get_my_delta().clone(); + rand_config + .add_certified_delta(&rand_config.author(), delta.clone()) + .expect("Add self delta should succeed"); + let data = AugmentedData { + delta: delta.clone(), + }; + AugData::new(rand_config.epoch(), rand_config.author(), data) + } + + fn augment(&self, rand_config: &RandConfig, author: &Author) { + let AugmentedData { delta } = self; + rand_config + .add_certified_delta(author, delta.clone()) + .expect("Add delta should succeed") + } + + fn verify(&self, rand_config: &RandConfig, author: &Author) -> anyhow::Result<()> { + rand_config + .derive_apk(author, self.delta.clone()) + .map(|_| ()) + } +} + +impl TShare for MockShare { fn verify( &self, _rand_config: &RandConfig, @@ -33,7 +166,7 @@ impl Share for MockShare { where Self: Sized, { - RandShare::new(*rand_config.author(), rand_metadata, Self) + RandShare::new(rand_config.author(), rand_metadata, Self) } fn aggregate<'a>( @@ -48,12 +181,12 @@ impl Share for MockShare { } } -impl AugmentedData for MockAugData { +impl TAugmentedData for MockAugData { fn generate(rand_config: &RandConfig) -> AugData where Self: Sized, { - AugData::new(rand_config.epoch(), *rand_config.author(), Self) + AugData::new(rand_config.epoch(), rand_config.author(), Self) } fn augment(&self, _rand_config: &RandConfig, _author: &Author) {} @@ -63,7 +196,7 @@ impl AugmentedData for MockAugData { } } -pub trait Share: +pub trait TShare: Clone + Debug + PartialEq + Send + Sync + Serialize + DeserializeOwned + 'static { fn verify( @@ -86,7 +219,7 @@ pub trait Share: Self: Sized; } -pub trait AugmentedData: +pub trait TAugmentedData: Clone + Debug + PartialEq + Send + Sync + Serialize + DeserializeOwned + 'static { fn generate(rand_config: &RandConfig) -> AugData @@ -112,7 +245,7 @@ pub struct RandShare { share: S, } -impl RandShare { +impl RandShare { pub fn new(author: Author, metadata: RandMetadata, share: S) -> Self { Self { author, @@ -125,6 +258,10 @@ impl RandShare { &self.author } + pub fn share(&self) -> &S { + &self.share + } + pub fn metadata(&self) -> &RandMetadata { &self.metadata } @@ -200,7 +337,7 @@ pub struct AugData { data: D, } -impl AugData { +impl AugData { pub fn new(epoch: u64, author: Author, data: D) -> Self { Self { epoch, @@ -246,7 +383,7 @@ impl AugDataSignature { self.epoch } - pub fn verify( + pub fn verify( &self, author: Author, verifier: &ValidatorVerifier, @@ -266,7 +403,7 @@ pub struct CertifiedAugData { signatures: AggregateSignature, } -impl CertifiedAugData { +impl CertifiedAugData { pub fn new(aug_data: AugData, signatures: AggregateSignature) -> Self { Self { aug_data, @@ -313,20 +450,43 @@ impl CertifiedAugDataAck { #[derive(Clone)] pub struct RandConfig { - epoch: u64, - author: Author, - threshold: u64, - weights: HashMap, + pub author: Author, + pub epoch: u64, + pub validator: ValidatorVerifier, + // public parameters of the weighted VUF + pub vuf_pp: WvufPP, + // key shares for weighted VUF + pub keys: Arc, + // weighted config for weighted VUF + pub wconfig: WeightedConfig, +} + +impl Debug for RandConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RandConfig {{ epoch: {}, author: {}, wconfig: {:?} }}", + self.epoch, self.author, self.wconfig + ) + } } impl RandConfig { - pub fn new(epoch: u64, author: Author, weights: HashMap) -> Self { - let sum = weights.values().sum::(); + pub fn new( + author: Author, + epoch: u64, + validator: ValidatorVerifier, + vuf_pp: WvufPP, + keys: RandKeys, + wconfig: WeightedConfig, + ) -> Self { Self { - epoch, author, - weights, - threshold: sum * 2 / 3 + 1, + epoch, + validator, + vuf_pp, + keys: Arc::new(keys), + wconfig, } } @@ -334,18 +494,64 @@ impl RandConfig { self.epoch } - pub fn author(&self) -> &Author { - &self.author + pub fn author(&self) -> Author { + self.author } - pub fn get_peer_weight(&self, author: &Author) -> u64 { + pub fn get_id(&self, peer: &Author) -> usize { *self - .weights - .get(author) - .expect("Author should exist after verify") + .validator + .address_to_validator_index() + .get(peer) + .unwrap() + } + + pub fn get_certified_apk(&self, peer: &Author) -> Option<&APK> { + let index = self.get_id(peer); + self.keys.certified_apks[index].get() + } + + pub fn get_all_certified_apk(&self) -> Vec> { + self.keys + .certified_apks + .iter() + .map(|cell| cell.get().cloned()) + .collect() + } + + pub fn add_certified_apk(&self, peer: &Author, apk: APK) -> anyhow::Result<()> { + let index = self.get_id(peer); + self.keys.add_certified_apk(index, apk) + } + + fn derive_apk(&self, peer: &Author, delta: Delta) -> anyhow::Result { + let apk = WVUF::augment_pubkey(&self.vuf_pp, self.get_pk_share(peer).clone(), delta)?; + Ok(apk) + } + + pub fn add_certified_delta(&self, peer: &Author, delta: Delta) -> anyhow::Result<()> { + let apk = self.derive_apk(peer, delta)?; + self.add_certified_apk(peer, apk)?; + Ok(()) + } + + pub fn get_my_delta(&self) -> &Delta { + WVUF::get_public_delta(&self.keys.apk) + } + + pub fn get_pk_share(&self, peer: &Author) -> &PKShare { + let index = self.get_id(peer); + &self.keys.pk_shares[index] + } + + pub fn get_peer_weight(&self, peer: &Author) -> u64 { + let player = Player { + id: self.get_id(peer), + }; + self.wconfig.get_player_weight(&player) as u64 } - pub fn threshold_weight(&self) -> u64 { - self.threshold + pub fn threshold(&self) -> u64 { + self.wconfig.get_threshold_weight() as u64 } }