diff --git a/Cargo.lock b/Cargo.lock index b49089f976ecc..0db9e0ff0acb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1313,6 +1313,7 @@ dependencies = [ "async-trait", "bcs 0.1.4", "bytes", + "fail 0.5.1", "futures", "futures-channel", "futures-util", diff --git a/config/src/config/identity_config.rs b/config/src/config/identity_config.rs index 0d2142574d834..9fcfd3aec0422 100644 --- a/config/src/config/identity_config.rs +++ b/config/src/config/identity_config.rs @@ -2,14 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{config::SecureBackend, keys::ConfigKey}; +use anyhow::anyhow; use aptos_crypto::{ bls12381, ed25519::Ed25519PrivateKey, x25519::{self, PRIVATE_KEY_SIZE}, ValidCryptoMaterial, }; -use aptos_types::account_address::{ - from_identity_public_key, AccountAddress, AccountAddress as PeerId, +use aptos_types::{ + account_address::{from_identity_public_key, AccountAddress, AccountAddress as PeerId}, + dkg::{real_dkg::maybe_dk_from_bls_sk, DKGTrait, DefaultDKG}, }; use serde::{Deserialize, Serialize}; use std::{ @@ -43,6 +45,21 @@ impl IdentityBlob { let mut file = File::open(path)?; Ok(file.write_all(serde_yaml::to_string(self)?.as_bytes())?) } + + pub fn try_into_dkg_dealer_private_key( + self, + ) -> Option<::DealerPrivateKey> { + self.consensus_private_key + } + + pub fn try_into_dkg_new_validator_decrypt_key( + self, + ) -> anyhow::Result<::NewValidatorDecryptKey> { + let consensus_sk = self.consensus_private_key.as_ref().ok_or_else(|| { + anyhow!("try_into_dkg_new_validator_decrypt_key failed with missing consensus key") + })?; + maybe_dk_from_bls_sk(consensus_sk) + } } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index 64b54c51cd623..63032c74c0ffb 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -10,6 +10,7 @@ use crate::{ persistent_liveness_storage::StorageWriteProxy, pipeline::execution_client::ExecutionProxyClient, quorum_store::quorum_store_db::QuorumStoreDB, + rand::rand_gen::storage::db::RandDb, state_computer::ExecutionProxy, transaction_filter::TransactionFilter, txn_notifier::MempoolNotifier, @@ -66,13 +67,16 @@ pub fn start_consensus( aptos_channels::new_unbounded(&counters::PENDING_SELF_MESSAGES); let consensus_network_client = ConsensusNetworkClient::new(network_client); let bounded_executor = BoundedExecutor::new(8, runtime.handle().clone()); + let rand_storage = Arc::new(RandDb::new(node_config.storage.dir())); let execution_client = Arc::new(ExecutionProxyClient::new( + node_config.consensus.clone(), Arc::new(execution_proxy), node_config.validator_network.as_ref().unwrap().peer_id(), self_sender.clone(), consensus_network_client.clone(), bounded_executor.clone(), + rand_storage.clone(), )); let epoch_mgr = EpochManager::new( @@ -89,6 +93,7 @@ pub fn start_consensus( bounded_executor, aptos_time_service::TimeService::real(), vtxn_pool, + rand_storage, ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index b36aa0ed03db6..5cca509e41496 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -29,7 +29,7 @@ use crate::{ monitor, network::{ IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest, - IncomingRpcRequest, NetworkReceivers, NetworkSender, + IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender, }, 
network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_client::{ @@ -43,22 +43,31 @@ use crate::{ quorum_store_coordinator::CoordinatorCommand, quorum_store_db::QuorumStoreStorage, }, + rand::rand_gen::{ + storage::interface::RandStorage, + types::{AugmentedData, RandConfig}, + }, recovery_manager::RecoveryManager, round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent}, util::time_service::TimeService, }; -use anyhow::{bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context}; use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{ ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig, QcAggregatorType, - SecureBackend, + SafetyRulesConfig, SecureBackend, }; use aptos_consensus_types::{ common::{Author, Round}, delayed_qc_msg::DelayedQcMsg, epoch_retrieval::EpochRetrievalRequest, }; +use aptos_crypto::bls12381; +use aptos_dkg::{ + pvss::{traits::Transcript, Player}, + weighted_vuf::traits::WeightedVUF, +}; use aptos_event_notifications::ReconfigNotificationListener; use aptos_global_constants::CONSENSUS_KEY; use aptos_infallible::{duration_since_epoch, Mutex}; @@ -69,12 +78,14 @@ use aptos_safety_rules::SafetyRulesManager; use aptos_secure_storage::{KVStorage, Storage}; use aptos_types::{ account_address::AccountAddress, + dkg::{real_dkg::maybe_dk_from_bls_sk, DKGState, DKGTrait, DefaultDKG}, epoch_change::EpochChangeProof, epoch_state::EpochState, on_chain_config::{ - Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider, + FeatureFlag, Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig, OnChainExecutionConfig, ProposerElectionType, ValidatorSet, }, + randomness::{RandKeys, WvufPP, WVUF}, validator_signer::ValidatorSigner, }; use aptos_validator_transaction_pool::VTxnPoolState; @@ -88,6 +99,7 @@ use futures::{ SinkExt, StreamExt, }; use itertools::Itertools; +use rand::{prelude::StdRng, thread_rng, SeedableRng}; use std::{ cmp::Ordering, collections::HashMap, @@ -128,6 +140,8 @@ pub struct EpochManager { safety_rules_manager: SafetyRulesManager, vtxn_pool: VTxnPoolState, reconfig_events: ReconfigNotificationListener
<P>,
+    // channels to rand manager
+    rand_manager_msg_tx: Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>>,
     // channels to round manager
     round_manager_tx: Option<
         aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
@@ -151,6 +165,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
     dag_shutdown_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
     dag_config: DagConsensusConfig,
     payload_manager: Arc<PayloadManager>,
+    rand_storage: Arc<dyn RandStorage<AugmentedData>>,
 }

 impl<P: OnChainConfigProvider> EpochManager
<P> {
@@ -168,6 +183,7 @@ impl<P: OnChainConfigProvider> EpochManager
<P>
{ bounded_executor: BoundedExecutor, aptos_time_service: aptos_time_service::TimeService, vtxn_pool: VTxnPoolState, + rand_storage: Arc>, ) -> Self { let author = node_config.validator_network.as_ref().unwrap().peer_id(); let config = node_config.consensus.clone(); @@ -191,6 +207,7 @@ impl EpochManager
<P>
{ safety_rules_manager, vtxn_pool, reconfig_events, + rand_manager_msg_tx: None, round_manager_tx: None, round_manager_close_tx: None, buffered_proposal_tx: None, @@ -207,6 +224,7 @@ impl EpochManager
<P>
{ aptos_time_service, dag_config, payload_manager: Arc::new(PayloadManager::DirectMempool), + rand_storage, } } @@ -574,6 +592,9 @@ impl EpochManager
<P>
{ } self.dag_shutdown_tx = None; + // Shutdown the previous rand manager + self.rand_manager_msg_tx = None; + // Shutdown the previous buffer manager, to release the SafetyRule client self.execution_client.end_epoch().await; @@ -704,7 +725,9 @@ impl EpochManager
<P>
{ network_sender: Arc, payload_client: Arc, payload_manager: Arc, + rand_config: Option, features: Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; info!( @@ -750,8 +773,11 @@ impl EpochManager
<P>
{ epoch_state.clone(), safety_rules_container.clone(), payload_manager.clone(), + &onchain_consensus_config, &onchain_execution_config, &features, + rand_config, + rand_msg_rx, ) .await; @@ -814,7 +840,7 @@ impl EpochManager
<P>
{ onchain_consensus_config, buffered_proposal_tx, self.config.clone(), - features, + features.clone(), true, ); @@ -850,6 +876,98 @@ impl EpochManager
<P>
{ ) } + fn try_get_rand_config_for_new_epoch( + &self, + new_epoch_state: &EpochState, + features: &Features, + maybe_dkg_state: anyhow::Result, + consensus_config: &OnChainConsensusConfig, + ) -> Result { + if !consensus_config.is_vtxn_enabled() { + return Err(NoRandomnessReason::VTxnDisabled); + } + if !features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG) { + return Err(NoRandomnessReason::FeatureDisabled); + } + let new_epoch = new_epoch_state.epoch; + + let dkg_state = maybe_dkg_state.map_err(NoRandomnessReason::DKGStateResourceMissing)?; + let dkg_session = dkg_state + .last_completed + .ok_or_else(|| NoRandomnessReason::DKGCompletedSessionResourceMissing)?; + if dkg_session.metadata.dealer_epoch + 1 != new_epoch_state.epoch { + return Err(NoRandomnessReason::CompletedSessionTooOld); + } + let dkg_pub_params = DefaultDKG::new_public_params(&dkg_session.metadata); + let my_index = new_epoch_state + .verifier + .address_to_validator_index() + .get(&self.author) + .copied() + .ok_or_else(|| NoRandomnessReason::NotInValidatorSet)?; + + let dkg_decrypt_key = load_dkg_decrypt_key(&self.config.safety_rules) + .ok_or_else(|| NoRandomnessReason::DKGDecryptKeyUnavailable)?; + let transcript = bcs::from_bytes::<::Transcript>( + dkg_session.transcript.as_slice(), + ) + .map_err(NoRandomnessReason::TranscriptDeserializationError)?; + + let vuf_pp = WvufPP::from(&dkg_pub_params.pvss_config.pp); + + // No need to verify the transcript. + + // keys for randomness generation + let (sk, pk) = DefaultDKG::decrypt_secret_share_from_transcript( + &dkg_pub_params, + &transcript, + my_index as u64, + &dkg_decrypt_key, + ) + .map_err(NoRandomnessReason::SecretShareDecryptionFailed)?; + + let pk_shares = (0..new_epoch_state.verifier.len()) + .map(|id| { + transcript.get_public_key_share(&dkg_pub_params.pvss_config.wconfig, &Player { id }) + }) + .collect::>(); + + // Recover existing augmented key pair or generate a new one + let (ask, apk) = if let Some((_, key_pair)) = self + .rand_storage + .get_key_pair_bytes() + .map_err(NoRandomnessReason::RandDbNotAvailable)? + .filter(|(epoch, _)| *epoch == new_epoch) + { + bcs::from_bytes(&key_pair).map_err(NoRandomnessReason::KeyPairDeserializationError)? + } else { + let mut rng = + StdRng::from_rng(thread_rng()).map_err(NoRandomnessReason::RngCreationError)?; + let augmented_key_pair = WVUF::augment_key_pair(&vuf_pp, sk, pk, &mut rng); + self.rand_storage + .save_key_pair_bytes( + new_epoch, + bcs::to_bytes(&augmented_key_pair) + .map_err(NoRandomnessReason::KeyPairSerializationError)?, + ) + .map_err(NoRandomnessReason::KeyPairPersistError)?; + augmented_key_pair + }; + + let keys = RandKeys::new(ask, apk, pk_shares, new_epoch_state.verifier.len()); + + let rand_config = RandConfig::new( + self.author, + new_epoch, + new_epoch_state.verifier.clone(), + vuf_pp, + keys, + dkg_pub_params.pvss_config.wconfig.clone(), + ); + + Ok(rand_config) + } + async fn start_new_epoch(&mut self, payload: OnChainConfigPayload
<P>
) { let validator_set: ValidatorSet = payload .get() @@ -859,9 +977,12 @@ impl EpochManager
<P>
{ verifier: (&validator_set).into(), }); + self.epoch_state = Some(epoch_state.clone()); + let onchain_consensus_config: anyhow::Result = payload.get(); let onchain_execution_config: anyhow::Result = payload.get(); let features = payload.get::(); + let dkg_state = payload.get::(); if let Err(error) = &onchain_consensus_config { error!("Failed to read on-chain consensus config {}", error); @@ -882,10 +1003,30 @@ impl EpochManager
<P>
{ .unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing()); let features = features.unwrap_or_default(); + let rand_config = self.try_get_rand_config_for_new_epoch( + &epoch_state, + &features, + dkg_state, + &consensus_config, + ); + info!( + "[Randomness] start_new_epoch: epoch={}, rand_config={:?}, ", + epoch_state.epoch, rand_config + ); // The sk inside has `SlientDebug`. + let rand_config = rand_config.ok(); + let (network_sender, payload_client, payload_manager) = self .initialize_shared_component(&epoch_state, &consensus_config) .await; + let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::( + QueueStyle::FIFO, + 100, + None, + ); + + self.rand_manager_msg_tx = Some(rand_msg_tx); + if consensus_config.is_dag_enabled() { self.start_new_epoch_with_dag( epoch_state, @@ -894,7 +1035,9 @@ impl EpochManager
<P>
{ network_sender, payload_client, payload_manager, - features, + rand_config, + &features, + rand_msg_rx, ) .await } else { @@ -905,7 +1048,9 @@ impl EpochManager
<P>
{ network_sender, payload_client, payload_manager, - features, + rand_config, + &features, + rand_msg_rx, ) .await } @@ -945,7 +1090,9 @@ impl EpochManager
<P>
{ network_sender: NetworkSender, payload_client: Arc, payload_manager: Arc, - features: Features, + rand_config: Option, + features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { @@ -958,7 +1105,9 @@ impl EpochManager
<P>
{ Arc::new(network_sender), payload_client, payload_manager, - features, + rand_config, + features.clone(), + rand_msg_rx, ) .await }, @@ -983,12 +1132,15 @@ impl EpochManager
<P>
{ network_sender: NetworkSender, payload_client: Arc, payload_manager: Arc, - features: Features, + rand_config: Option, + features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; - - let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); - let commit_signer = Arc::new(DagCommitSigner::new(signer)); + let consensus_key = new_consensus_key_from_storage(&self.config.safety_rules.backend) + .expect("unable to get private key"); + let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); + let commit_signer = Arc::new(DagCommitSigner::new(signer.clone())); assert!( onchain_consensus_config.decoupled_execution(), @@ -1000,8 +1152,11 @@ impl EpochManager
<P>
{ epoch_state.clone(), commit_signer, payload_manager.clone(), + &onchain_consensus_config, &on_chain_execution_config, - &features, + features, + rand_config, + rand_msg_rx, ) .await; @@ -1019,7 +1174,6 @@ impl EpochManager
<P>
{ self.storage.aptos_db(), )); - let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); let network_sender_arc = Arc::new(network_sender); let bootstrapper = DagBootstrapper::new( @@ -1040,7 +1194,7 @@ impl EpochManager
<P>
{ onchain_consensus_config.quorum_store_enabled(), onchain_consensus_config.effective_validator_txn_config(), self.bounded_executor.clone(), - features, + features.clone(), ); let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None); @@ -1323,7 +1477,13 @@ impl EpochManager
<P>
{ IncomingRpcRequest::CommitRequest(request) => { self.execution_client.send_commit_msg(peer_id, request) }, - IncomingRpcRequest::RandGenRequest(_) => Ok(()), + IncomingRpcRequest::RandGenRequest(request) => { + if let Some(tx) = &self.rand_manager_msg_tx { + tx.push(peer_id, request) + } else { + bail!("Rand manager not started"); + } + }, } } @@ -1394,15 +1554,69 @@ impl EpochManager
<P>
{ } } -#[allow(dead_code)] -fn new_signer_from_storage(author: Author, backend: &SecureBackend) -> Arc { +fn new_consensus_key_from_storage(backend: &SecureBackend) -> anyhow::Result { let storage: Storage = backend.into(); - if let Err(error) = storage.available() { - panic!("Storage is not available: {:?}", error); - } - let private_key = storage + storage + .available() + .map_err(|e| anyhow!("Storage is not available: {e}"))?; + storage .get(CONSENSUS_KEY) .map(|v| v.value) - .expect("Unable to get private key"); - Arc::new(ValidatorSigner::new(author, private_key)) + .map_err(|e| anyhow!("storage get and map err: {e}")) +} + +fn load_dkg_decrypt_key_from_identity_blob( + config: &SafetyRulesConfig, +) -> anyhow::Result<::NewValidatorDecryptKey> { + let identity_blob = config.initial_safety_rules_config.identity_blob()?; + identity_blob.try_into_dkg_new_validator_decrypt_key() +} + +fn load_dkg_decrypt_key_from_secure_storage( + config: &SafetyRulesConfig, +) -> anyhow::Result<::NewValidatorDecryptKey> { + let consensus_key = new_consensus_key_from_storage(&config.backend)?; + maybe_dk_from_bls_sk(&consensus_key) +} + +fn load_dkg_decrypt_key( + config: &SafetyRulesConfig, +) -> Option<::NewValidatorDecryptKey> { + match load_dkg_decrypt_key_from_secure_storage(config) { + Ok(dk) => { + return Some(dk); + }, + Err(e) => { + warn!("{e}"); + }, + } + + match load_dkg_decrypt_key_from_identity_blob(config) { + Ok(dk) => { + return Some(dk); + }, + Err(e) => { + warn!("{e}"); + }, + } + + None +} + +#[derive(Debug)] +enum NoRandomnessReason { + VTxnDisabled, + FeatureDisabled, + DKGStateResourceMissing(anyhow::Error), + DKGCompletedSessionResourceMissing, + CompletedSessionTooOld, + NotInValidatorSet, + DKGDecryptKeyUnavailable, + TranscriptDeserializationError(bcs::Error), + SecretShareDecryptionFailed(anyhow::Error), + RngCreationError(rand::Error), + RandDbNotAvailable(anyhow::Error), + KeyPairDeserializationError(bcs::Error), + KeyPairSerializationError(bcs::Error), + KeyPairPersistError(anyhow::Error), } diff --git a/consensus/src/pipeline/errors.rs b/consensus/src/pipeline/errors.rs index cdb8396bb00a7..746228438eceb 100644 --- a/consensus/src/pipeline/errors.rs +++ b/consensus/src/pipeline/errors.rs @@ -15,4 +15,6 @@ pub enum Error { VerificationError, #[error("Reset host dropped")] ResetDropped, + #[error("Rand Reset host dropped")] + RandResetDropped, } diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs index fdeacae3dae03..fcc81ab66441a 100644 --- a/consensus/src/pipeline/execution_client.rs +++ b/consensus/src/pipeline/execution_client.rs @@ -5,7 +5,7 @@ use crate::{ counters, error::StateSyncError, - network::{IncomingCommitRequest, NetworkSender}, + network::{IncomingCommitRequest, IncomingRandGenRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_manager::PayloadManager, pipeline::{ @@ -14,6 +14,11 @@ use crate::{ errors::Error, signing_phase::CommitSignerProvider, }, + rand::rand_gen::{ + rand_manager::RandManager, + storage::interface::RandStorage, + types::{AugmentedData, RandConfig, Share}, + }, state_computer::ExecutionProxy, state_replication::{StateComputer, StateComputerCommitCallBackType}, transaction_deduper::create_transaction_deduper, @@ -22,15 +27,18 @@ use crate::{ use anyhow::Result; use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_config::config::ConsensusConfig; use 
aptos_consensus_types::{common::Author, pipelined_block::PipelinedBlock}; use aptos_executor_types::ExecutorResult; use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_network::{application::interface::NetworkClient, protocols::network::Event}; +use aptos_safety_rules::safety_rules_manager::load_consensus_key_from_secure_storage; use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - on_chain_config::{FeatureFlag, Features, OnChainExecutionConfig}, + on_chain_config::{FeatureFlag, Features, OnChainConsensusConfig, OnChainExecutionConfig}, + validator_signer::ValidatorSigner, }; use fail::fail_point; use futures::{ @@ -49,8 +57,11 @@ pub trait TExecutionClient: Send + Sync { epoch_state: Arc, commit_signer_provider: Arc, payload_manager: Arc, + onchain_consensus_config: &OnChainConsensusConfig, onchain_execution_config: &OnChainExecutionConfig, features: &Features, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ); /// This is needed for some DAG tests. Clean this up as a TODO. @@ -80,7 +91,8 @@ pub trait TExecutionClient: Send + Sync { struct BufferManagerHandle { pub execute_tx: Option>, pub commit_tx: Option>, - pub reset_tx: Option>, + pub reset_tx_to_buffer_manager: Option>, + pub reset_tx_to_rand_manager: Option>, } impl BufferManagerHandle { @@ -88,7 +100,8 @@ impl BufferManagerHandle { Self { execute_tx: None, commit_tx: None, - reset_tx: None, + reset_tx_to_buffer_manager: None, + reset_tx_to_rand_manager: None, } } @@ -96,23 +109,31 @@ impl BufferManagerHandle { &mut self, execute_tx: UnboundedSender, commit_tx: aptos_channel::Sender, - reset_tx: UnboundedSender, + reset_tx_to_buffer_manager: UnboundedSender, + reset_tx_to_rand_manager: Option>, ) { self.execute_tx = Some(execute_tx); self.commit_tx = Some(commit_tx); - self.reset_tx = Some(reset_tx); + self.reset_tx_to_buffer_manager = Some(reset_tx_to_buffer_manager); + self.reset_tx_to_rand_manager = reset_tx_to_rand_manager; } - pub fn reset(&mut self) -> Option> { - let reset_tx = self.reset_tx.take(); + pub fn reset( + &mut self, + ) -> ( + Option>, + Option>, + ) { + let reset_tx_to_rand_manager = self.reset_tx_to_rand_manager.take(); + let reset_tx_to_buffer_manager = self.reset_tx_to_buffer_manager.take(); self.execute_tx = None; self.commit_tx = None; - self.reset_tx = None; - reset_tx + (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) } } pub struct ExecutionProxyClient { + consensus_config: ConsensusConfig, execution_proxy: Arc, author: Author, self_sender: aptos_channels::UnboundedSender>, @@ -120,23 +141,28 @@ pub struct ExecutionProxyClient { bounded_executor: BoundedExecutor, // channels to buffer manager handle: Arc>, + rand_storage: Arc>, } impl ExecutionProxyClient { pub fn new( + consensus_config: ConsensusConfig, execution_proxy: Arc, author: Author, self_sender: aptos_channels::UnboundedSender>, network_sender: ConsensusNetworkClient>, bounded_executor: BoundedExecutor, + rand_storage: Arc>, ) -> Self { Self { + consensus_config, execution_proxy, author, self_sender, network_sender, bounded_executor, handle: Arc::new(RwLock::new(BufferManagerHandle::new())), + rand_storage, } } @@ -144,6 +170,8 @@ impl ExecutionProxyClient { &self, commit_signer_provider: Arc, epoch_state: Arc, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ) { let network_sender = NetworkSender::new( self.author, @@ -152,8 +180,7 @@ impl ExecutionProxyClient { epoch_state.verifier.clone(), ); - let (block_tx, block_rx) = unbounded::(); - let (reset_tx, 
reset_rx) = unbounded::(); + let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::(); let (commit_msg_tx, commit_msg_rx) = aptos_channel::new::( @@ -162,7 +189,51 @@ impl ExecutionProxyClient { Some(&counters::BUFFER_MANAGER_MSGS), ); - self.handle.write().init(block_tx, commit_msg_tx, reset_tx); + let (execution_ready_block_tx, execution_ready_block_rx, maybe_reset_tx_to_rand_manager) = + if let Some(rand_config) = rand_config { + let (ordered_block_tx, ordered_block_rx) = unbounded::(); + let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::(); + + let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::(); + let consensus_key = + load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules) + .expect("Failed in loading consensus key for ExecutionProxyClient."); + let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); + + let rand_manager = RandManager::::new( + self.author, + epoch_state.clone(), + signer, + rand_config, + rand_ready_block_tx, + Arc::new(network_sender.clone()), + self.rand_storage.clone(), + self.bounded_executor.clone(), + ); + + tokio::spawn(rand_manager.start( + ordered_block_rx, + rand_msg_rx, + reset_rand_manager_rx, + self.bounded_executor.clone(), + )); + + ( + ordered_block_tx, + rand_ready_block_rx, + Some(reset_tx_to_rand_manager), + ) + } else { + let (ordered_block_tx, ordered_block_rx) = unbounded(); + (ordered_block_tx, ordered_block_rx, None) + }; + + self.handle.write().init( + execution_ready_block_tx, + commit_msg_tx, + reset_buffer_manager_tx, + maybe_reset_tx_to_rand_manager, + ); let ( execution_schedule_phase, @@ -177,8 +248,8 @@ impl ExecutionProxyClient { network_sender, commit_msg_rx, self.execution_proxy.clone(), - block_rx, - reset_rx, + execution_ready_block_rx, + reset_buffer_manager_rx, epoch_state, self.bounded_executor.clone(), ); @@ -198,10 +269,18 @@ impl TExecutionClient for ExecutionProxyClient { epoch_state: Arc, commit_signer_provider: Arc, payload_manager: Arc, + onchain_consensus_config: &OnChainConsensusConfig, onchain_execution_config: &OnChainExecutionConfig, features: &Features, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ) { - self.spawn_decoupled_execution(commit_signer_provider, epoch_state.clone()); + let maybe_rand_msg_tx = self.spawn_decoupled_execution( + commit_signer_provider, + epoch_state.clone(), + rand_config, + rand_msg_rx, + ); let transaction_shuffler = create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); @@ -209,14 +288,18 @@ impl TExecutionClient for ExecutionProxyClient { onchain_execution_config.block_executor_onchain_config(); let transaction_deduper = create_transaction_deduper(onchain_execution_config.transaction_deduper_type()); + let randomness_enabled = onchain_consensus_config.is_vtxn_enabled() + && features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG); self.execution_proxy.new_epoch( &epoch_state, payload_manager, transaction_shuffler, block_executor_onchain_config, transaction_deduper, - features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG), + randomness_enabled, ); + + maybe_rand_msg_tx } fn get_execution_channel(&self) -> Option> { @@ -280,9 +363,27 @@ impl TExecutionClient for ExecutionProxyClient { Err(anyhow::anyhow!("Injected error in sync_to").into()) }); - let reset_tx = self.handle.read().reset_tx.clone(); + let (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) = { + let handle = self.handle.read(); + ( + handle.reset_tx_to_rand_manager.clone(), + 
handle.reset_tx_to_buffer_manager.clone(), + ) + }; - if let Some(mut reset_tx) = reset_tx { + if let Some(mut reset_tx) = reset_tx_to_rand_manager { + let (ack_tx, ack_rx) = oneshot::channel::(); + reset_tx + .send(ResetRequest { + tx: ack_tx, + signal: ResetSignal::TargetRound(target.commit_info().round()), + }) + .await + .map_err(|_| Error::RandResetDropped)?; + ack_rx.await.map_err(|_| Error::RandResetDropped)?; + } + + if let Some(mut reset_tx) = reset_tx_to_buffer_manager { // reset execution phase and commit phase let (tx, rx) = oneshot::channel::(); reset_tx @@ -302,8 +403,25 @@ impl TExecutionClient for ExecutionProxyClient { } async fn end_epoch(&self) { - let reset_tx = self.handle.write().reset(); - if let Some(mut tx) = reset_tx { + let (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) = { + let mut handle = self.handle.write(); + handle.reset() + }; + + if let Some(mut tx) = reset_tx_to_rand_manager { + let (ack_tx, ack_rx) = oneshot::channel(); + tx.send(ResetRequest { + tx: ack_tx, + signal: ResetSignal::Stop, + }) + .await + .expect("[EpochManager] Fail to drop rand manager"); + ack_rx + .await + .expect("[EpochManager] Fail to drop rand manager"); + } + + if let Some(mut tx) = reset_tx_to_buffer_manager { let (ack_tx, ack_rx) = oneshot::channel(); tx.send(ResetRequest { tx: ack_tx, @@ -328,8 +446,11 @@ impl TExecutionClient for DummyExecutionClient { _epoch_state: Arc, _commit_signer_provider: Arc, _payload_manager: Arc, + _onchain_consensus_config: &OnChainConsensusConfig, _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, + _rand_config: Option, + _rand_msg_rx: aptos_channel::Receiver, ) { } diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs index 82d30670b84a8..579812c75207b 100644 --- a/consensus/src/test_utils/mock_execution_client.rs +++ b/consensus/src/test_utils/mock_execution_client.rs @@ -4,16 +4,18 @@ use crate::{ error::StateSyncError, - network::IncomingCommitRequest, + network::{IncomingCommitRequest, IncomingRandGenRequest}, payload_manager::PayloadManager, pipeline::{ buffer_manager::OrderedBlocks, execution_client::TExecutionClient, signing_phase::CommitSignerProvider, }, + rand::rand_gen::types::RandConfig, state_replication::StateComputerCommitCallBackType, test_utils::mock_storage::MockStorage, }; use anyhow::{format_err, Result}; +use aptos_channels::aptos_channel; use aptos_consensus_types::{common::Payload, pipelined_block::PipelinedBlock}; use aptos_crypto::HashValue; use aptos_executor_types::ExecutorResult; @@ -22,7 +24,7 @@ use aptos_logger::prelude::*; use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - on_chain_config::{Features, OnChainExecutionConfig}, + on_chain_config::{Features, OnChainConsensusConfig, OnChainExecutionConfig}, transaction::SignedTransaction, }; use futures::{channel::mpsc, SinkExt}; @@ -92,8 +94,11 @@ impl TExecutionClient for MockExecutionClient { _epoch_state: Arc, _commit_signer_provider: Arc, _payload_manager: Arc, + _onchain_consensus_config: &OnChainConsensusConfig, _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, + _rand_config: Option, + _rand_msg_rx: aptos_channel::Receiver, ) { } diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index b47bc28997db2..3456cd3b58357 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -11,6 +11,7 @@ use crate::{ payload_manager::PayloadManager, 
pipeline::buffer_manager::OrderedBlocks, quorum_store::quorum_store_db::MockQuorumStoreDB, + rand::rand_gen::storage::in_memory::InMemRandDb, test_utils::{mock_execution_client::MockExecutionClient, MockStorage}, util::time_service::ClockTimeService, }; @@ -162,6 +163,7 @@ impl SMRNode { bounded_executor, aptos_time_service::TimeService::real(), vtxn_pool, + Arc::new(InMemRandDb::new()), ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/dkg/Cargo.toml b/dkg/Cargo.toml index dc338ebe86ca1..5ecd984e3bdb0 100644 --- a/dkg/Cargo.toml +++ b/dkg/Cargo.toml @@ -36,6 +36,7 @@ aptos-validator-transaction-pool = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } +fail = { workspace = true } futures = { workspace = true } futures-channel = { workspace = true } futures-util = { workspace = true } diff --git a/dkg/src/agg_trx_producer.rs b/dkg/src/agg_trx_producer.rs index 7d76caab5c294..b42ed929234f6 100644 --- a/dkg/src/agg_trx_producer.rs +++ b/dkg/src/agg_trx_producer.rs @@ -4,11 +4,13 @@ use crate::{ transcript_aggregation::TranscriptAggregationState, types::DKGTranscriptRequest, DKGMessage, }; use aptos_channels::aptos_channel::Sender; +use aptos_logger::info; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_types::{dkg::DKGTrait, epoch_state::EpochState}; use futures::future::AbortHandle; use futures_util::future::Abortable; -use std::sync::Arc; +use move_core_types::account_address::AccountAddress; +use std::{sync::Arc, time::Duration}; use tokio_retry::strategy::ExponentialBackoff; /// A sub-process of the whole DKG process. @@ -18,6 +20,8 @@ use tokio_retry::strategy::ExponentialBackoff; pub trait TAggTranscriptProducer: Send + Sync { fn start_produce( &self, + start_time: Duration, + my_addr: AccountAddress, epoch_state: Arc, dkg_config: S::PublicParams, agg_trx_tx: Option>, @@ -40,17 +44,38 @@ impl AggTranscriptProducer { impl TAggTranscriptProducer for AggTranscriptProducer { fn start_produce( &self, + start_time: Duration, + my_addr: AccountAddress, epoch_state: Arc, params: DKG::PublicParams, agg_trx_tx: Option>, ) -> AbortHandle { + let epoch = epoch_state.epoch; let rb = self.reliable_broadcast.clone(); let req = DKGTranscriptRequest::new(epoch_state.epoch); - let agg_state = Arc::new(TranscriptAggregationState::::new(params, epoch_state)); + let agg_state = Arc::new(TranscriptAggregationState::::new( + start_time, + my_addr, + params, + epoch_state, + )); let task = async move { let agg_trx = rb.broadcast(req, agg_state).await; - if let Some(tx) = agg_trx_tx { - let _ = tx.push((), agg_trx); // If the `DKGManager` was dropped, this send will fail by design. + info!( + epoch = epoch, + my_addr = my_addr, + "[DKG] aggregated transcript locally" + ); + if let Err(e) = agg_trx_tx + .expect("[DKG] agg_trx_tx should be available") + .push((), agg_trx) + { + // If the `DKGManager` was dropped, this send will fail by design. 
+ info!( + epoch = epoch, + my_addr = my_addr, + "[DKG] Failed to send aggregated transcript to DKGManager, maybe DKGManager stopped and channel dropped: {:?}", e + ); } }; let (abort_handle, abort_registration) = AbortHandle::new_pair(); @@ -66,6 +91,8 @@ pub struct DummyAggTranscriptProducer {} impl TAggTranscriptProducer for DummyAggTranscriptProducer { fn start_produce( &self, + _start_time: Duration, + _my_addr: AccountAddress, _epoch_state: Arc, _dkg_config: DKG::PublicParams, _agg_trx_tx: Option>, diff --git a/dkg/src/counters.rs b/dkg/src/counters.rs index 7c490550bed79..da128591e19b3 100644 --- a/dkg/src/counters.rs +++ b/dkg/src/counters.rs @@ -1,6 +1,6 @@ // Copyright © Aptos Foundation -use aptos_metrics_core::{register_int_gauge, IntGauge}; +use aptos_metrics_core::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge}; use once_cell::sync::Lazy; /// Count of the pending messages sent to itself in the channel @@ -11,3 +11,12 @@ pub static PENDING_SELF_MESSAGES: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub static DKG_STAGE_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + "aptos_dkg_session_stage_seconds", + "How long it takes to reach different DKG stages", + &["dealer", "stage"] + ) + .unwrap() +}); diff --git a/dkg/src/dkg_manager/mod.rs b/dkg/src/dkg_manager/mod.rs index c0193bc23ddb0..eae970cf83464 100644 --- a/dkg/src/dkg_manager/mod.rs +++ b/dkg/src/dkg_manager/mod.rs @@ -1,9 +1,13 @@ // Copyright © Aptos Foundation -use crate::{agg_trx_producer::TAggTranscriptProducer, network::IncomingRpcRequest, DKGMessage}; +use crate::{ + agg_trx_producer::TAggTranscriptProducer, counters::DKG_STAGE_SECONDS, + network::IncomingRpcRequest, DKGMessage, +}; use anyhow::{anyhow, bail, ensure, Result}; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_crypto::Uniform; -use aptos_logger::error; +use aptos_infallible::duration_since_epoch; +use aptos_logger::{debug, error, info}; use aptos_types::{ dkg::{ DKGSessionMetadata, DKGSessionState, DKGStartEvent, DKGTrait, DKGTranscript, @@ -13,62 +17,35 @@ use aptos_types::{ validator_txn::{Topic, ValidatorTransaction}, }; use aptos_validator_transaction_pool::{TxnGuard, VTxnPoolState}; +use fail::fail_point; use futures_channel::oneshot; use futures_util::{future::AbortHandle, FutureExt, StreamExt}; use move_core_types::account_address::AccountAddress; use rand::{prelude::StdRng, thread_rng, SeedableRng}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -#[allow(dead_code)] #[derive(Clone, Debug)] -enum InnerState { +enum InnerState { NotStarted, InProgress { - start_time_us: u64, - public_params: DKG::PublicParams, + start_time: Duration, my_transcript: DKGTranscript, abort_handle: AbortHandle, }, Finished { vtxn_guard: TxnGuard, - start_time_us: u64, + start_time: Duration, my_transcript: DKGTranscript, - pull_confirmed: bool, + proposed: bool, }, } -impl InnerState { - fn variant_name(&self) -> &str { - match self { - InnerState::NotStarted => "NotStarted", - InnerState::InProgress { .. } => "InProgress", - InnerState::Finished { .. } => "Finished", - } - } - - #[cfg(test)] - pub fn my_node_cloned(&self) -> DKGTranscript { - match self { - InnerState::NotStarted => panic!("my_node unavailable"), - InnerState::InProgress { - my_transcript: my_node, - .. - } - | InnerState::Finished { - my_transcript: my_node, - .. 
- } => my_node.clone(), - } - } -} - -impl Default for InnerState { +impl Default for InnerState { fn default() -> Self { Self::NotStarted } } -#[allow(dead_code)] pub struct DKGManager { dealer_sk: Arc, my_index: usize, @@ -85,7 +62,26 @@ pub struct DKGManager { // Control states. stopped: bool, - state: InnerState, + state: InnerState, +} + +impl InnerState { + fn variant_name(&self) -> &str { + match self { + InnerState::NotStarted => "NotStarted", + InnerState::InProgress { .. } => "InProgress", + InnerState::Finished { .. } => "Finished", + } + } + + #[cfg(test)] + pub fn my_node_cloned(&self) -> DKGTranscript { + match self { + InnerState::NotStarted => panic!("my_node unavailable"), + InnerState::InProgress { my_transcript, .. } + | InnerState::Finished { my_transcript, .. } => my_transcript.clone(), + } + } } impl DKGManager { @@ -101,8 +97,8 @@ impl DKGManager { aptos_channel::new(QueueStyle::KLAST, 1, None); Self { dealer_sk, - my_index, my_addr, + my_index, epoch_state, vtxn_pool, agg_trx_tx: None, @@ -117,60 +113,128 @@ impl DKGManager { pub async fn run( mut self, in_progress_session: Option, - dkg_start_event_rx: oneshot::Receiver, + mut dkg_start_event_rx: aptos_channel::Receiver<(), DKGStartEvent>, mut rpc_msg_rx: aptos_channel::Receiver< AccountAddress, (AccountAddress, IncomingRpcRequest), >, close_rx: oneshot::Receiver>, ) { + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr.to_hex().as_str(), + "[DKG] DKGManager started." + ); + let mut interval = tokio::time::interval(Duration::from_millis(5000)); + + let (agg_trx_tx, mut agg_trx_rx) = aptos_channel::new(QueueStyle::KLAST, 1, None); + self.agg_trx_tx = Some(agg_trx_tx); + if let Some(session_state) = in_progress_session { let DKGSessionState { - metadata, start_time_us, + metadata, .. } = session_state; - self.setup_deal_broadcast(start_time_us, &metadata) - .await - .expect("setup_deal_broadcast() should be infallible"); - } - let (agg_trx_tx, mut agg_trx_rx) = aptos_channel::new(QueueStyle::KLAST, 1, None); - self.agg_trx_tx = Some(agg_trx_tx); + if metadata.dealer_epoch == self.epoch_state.epoch { + info!( + epoch = self.epoch_state.epoch, + "Found unfinished and current DKG session. Continuing it." + ); + if let Err(e) = self.setup_deal_broadcast(start_time_us, &metadata).await { + error!(epoch = self.epoch_state.epoch, "dkg resumption failed: {e}"); + } + } else { + info!( + cur_epoch = self.epoch_state.epoch, + dealer_epoch = metadata.dealer_epoch, + "Found unfinished but stale DKG session. Ignoring it." + ); + } + } - let mut dkg_start_event_rx = dkg_start_event_rx.into_stream(); let mut close_rx = close_rx.into_stream(); while !self.stopped { let handling_result = tokio::select! 
{ dkg_start_event = dkg_start_event_rx.select_next_some() => { - self.process_dkg_start_event(dkg_start_event.ok()).await + self.process_dkg_start_event(dkg_start_event) + .await + .map_err(|e|anyhow!("[DKG] process_dkg_start_event failed: {e}")) }, (_sender, msg) = rpc_msg_rx.select_next_some() => { - self.process_peer_rpc_msg(msg).await + self.process_peer_rpc_msg(msg) + .await + .map_err(|e|anyhow!("[DKG] process_peer_rpc_msg failed: {e}")) }, - agg_node = agg_trx_rx.select_next_some() => { - self.process_aggregated_transcript(agg_node).await + agg_transcript = agg_trx_rx.select_next_some() => { + self.process_aggregated_transcript(agg_transcript) + .await + .map_err(|e|anyhow!("[DKG] process_aggregated_transcript failed: {e}")) + }, dkg_txn = self.pull_notification_rx.select_next_some() => { - self.process_dkg_txn_pulled_notification(dkg_txn).await + self.process_dkg_txn_pulled_notification(dkg_txn) + .await + .map_err(|e|anyhow!("[DKG] process_dkg_txn_pulled_notification failed: {e}")) }, close_req = close_rx.select_next_some() => { self.process_close_cmd(close_req.ok()) - } + }, + _ = interval.tick().fuse() => { + self.observe() + }, }; if let Err(e) = handling_result { - error!("{}", e); + error!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr.to_hex().as_str(), + "[DKG] DKGManager handling error: {e}" + ); } } + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr.to_hex().as_str(), + "[DKG] DKGManager finished." + ); + } + + fn observe(&self) -> Result<()> { + debug!("[DKG] dkg_manager_state={:?}", self.state); + Ok(()) } /// On a CLOSE command from epoch manager, do clean-up. fn process_close_cmd(&mut self, ack_tx: Option>) -> Result<()> { self.stopped = true; - if let InnerState::InProgress { abort_handle, .. } = &self.state { - abort_handle.abort(); + match std::mem::take(&mut self.state) { + InnerState::NotStarted => {}, + InnerState::InProgress { abort_handle, .. } => { + abort_handle.abort(); + }, + InnerState::Finished { + vtxn_guard, + start_time, + .. + } => { + let epoch_change_time = duration_since_epoch(); + let secs_since_dkg_start = + epoch_change_time.as_secs_f64() - start_time.as_secs_f64(); + DKG_STAGE_SECONDS + .with_label_values(&[self.my_addr.to_hex().as_str(), "epoch_change"]) + .observe(secs_since_dkg_start); + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + secs_since_dkg_start = secs_since_dkg_start, + "[DKG] txn executed and entering new epoch.", + ); + + drop(vtxn_guard); + }, } if let Some(tx) = ack_tx { @@ -185,13 +249,33 @@ impl DKGManager { &mut self, _txn: Arc, ) -> Result<()> { - if let InnerState::Finished { pull_confirmed, .. } = &mut self.state { - if !*pull_confirmed { - // TODO(zjma): metric DKG_AGG_NODE_PROPOSED - } - *pull_confirmed = true; + match &mut self.state { + InnerState::Finished { + start_time, + proposed, + .. + } => { + if !*proposed { + *proposed = true; + let proposed_time = duration_since_epoch(); + let secs_since_dkg_start = + proposed_time.as_secs_f64() - start_time.as_secs_f64(); + DKG_STAGE_SECONDS + .with_label_values(&[self.my_addr.to_hex().as_str(), "proposed"]) + .observe(secs_since_dkg_start); + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + secs_since_dkg_start = secs_since_dkg_start, + "[DKG] aggregated transcript proposed by consensus.", + ); + } + Ok(()) + }, + _ => { + bail!("[DKG] pull notification only expected in finished state"); + }, } - Ok(()) } /// Calculate DKG config. Deal a transcript. Start broadcasting the transcript. 
@@ -204,49 +288,69 @@ impl DKGManager { start_time_us: u64, dkg_session_metadata: &DKGSessionMetadata, ) -> Result<()> { - self.state = match &self.state { - InnerState::NotStarted => { - let public_params = DKG::new_public_params(dkg_session_metadata); - let mut rng = if cfg!(feature = "smoke-test") { - StdRng::from_seed(self.my_addr.into_bytes()) - } else { - StdRng::from_rng(thread_rng()).unwrap() - }; - let input_secret = DKG::InputSecret::generate(&mut rng); - - let trx = DKG::generate_transcript( - &mut rng, - &public_params, - &input_secret, - self.my_index as u64, - &self.dealer_sk, - ); + ensure!( + matches!(&self.state, InnerState::NotStarted), + "transcript already dealt" + ); + let dkg_start_time = Duration::from_micros(start_time_us); + let deal_start = duration_since_epoch(); + let secs_since_dkg_start = deal_start.as_secs_f64() - dkg_start_time.as_secs_f64(); + DKG_STAGE_SECONDS + .with_label_values(&[self.my_addr.to_hex().as_str(), "deal_start"]) + .observe(secs_since_dkg_start); + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + secs_since_dkg_start = secs_since_dkg_start, + "[DKG] Deal transcript started.", + ); + let public_params = DKG::new_public_params(dkg_session_metadata); + let mut rng = if cfg!(feature = "smoke-test") { + StdRng::from_seed(self.my_addr.into_bytes()) + } else { + StdRng::from_rng(thread_rng()).unwrap() + }; + let input_secret = DKG::InputSecret::generate(&mut rng); - let dkg_transcript = DKGTranscript::new( - self.epoch_state.epoch, - self.my_addr, - bcs::to_bytes(&trx).map_err(|e| { - anyhow!("setup_deal_broadcast failed with trx serialization error: {e}") - })?, - ); + let trx = DKG::generate_transcript( + &mut rng, + &public_params, + &input_secret, + self.my_index as u64, + &self.dealer_sk, + ); - // TODO(zjma): DKG_NODE_READY metric + let my_transcript = DKGTranscript::new( + self.epoch_state.epoch, + self.my_addr, + bcs::to_bytes(&trx).map_err(|e| anyhow!("transcript serialization error: {e}"))?, + ); - let abort_handle = self.agg_trx_producer.start_produce( - self.epoch_state.clone(), - public_params.clone(), - self.agg_trx_tx.clone(), - ); + let deal_finish = duration_since_epoch(); + let secs_since_dkg_start = deal_finish.as_secs_f64() - dkg_start_time.as_secs_f64(); + DKG_STAGE_SECONDS + .with_label_values(&[self.my_addr.to_hex().as_str(), "deal_finish"]) + .observe(secs_since_dkg_start); + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + secs_since_dkg_start = secs_since_dkg_start, + "[DKG] Deal transcript finished.", + ); - // Switch to the next stage. - InnerState::InProgress { - start_time_us, - public_params, - my_transcript: dkg_transcript, - abort_handle, - } - }, - _ => unreachable!(), // `setup_deal_broadcast` is called only when DKG state is `NotStarted`. + let abort_handle = self.agg_trx_producer.start_produce( + dkg_start_time, + self.my_addr, + self.epoch_state.clone(), + public_params.clone(), + self.agg_trx_tx.clone(), + ); + + // Switch to the next stage. + self.state = InnerState::InProgress { + start_time: dkg_start_time, + my_transcript, + abort_handle, }; Ok(()) @@ -254,49 +358,75 @@ impl DKGManager { /// On a locally aggregated transcript, put it into the validator txn pool and update inner states. async fn process_aggregated_transcript(&mut self, agg_trx: DKG::Transcript) -> Result<()> { + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + "[DKG] Processing locally aggregated transcript." 
+ ); self.state = match std::mem::take(&mut self.state) { InnerState::InProgress { - start_time_us, - my_transcript: my_node, + start_time, + my_transcript, .. } => { - // TODO(zjma): metric DKG_AGG_NODE_READY + let agg_transcript_ready_time = duration_since_epoch(); + let secs_since_dkg_start = + agg_transcript_ready_time.as_secs_f64() - start_time.as_secs_f64(); + DKG_STAGE_SECONDS + .with_label_values(&[self.my_addr.to_hex().as_str(), "agg_transcript_ready"]) + .observe(secs_since_dkg_start); + let txn = ValidatorTransaction::DKGResult(DKGTranscript { metadata: DKGTranscriptMetadata { epoch: self.epoch_state.epoch, author: self.my_addr, }, - transcript_bytes: bcs::to_bytes(&agg_trx).map_err(|e|anyhow!("process_aggregated_transcript failed with trx serialization error: {e}"))?, + transcript_bytes: bcs::to_bytes(&agg_trx) + .map_err(|e| anyhow!("transcript serialization error: {e}"))?, }); let vtxn_guard = self.vtxn_pool.put( Topic::DKG, Arc::new(txn), Some(self.pull_notification_tx.clone()), ); + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + "[DKG] aggregated transcript put into vtxn pool." + ); InnerState::Finished { vtxn_guard, - start_time_us, - my_transcript: my_node, - pull_confirmed: false, + start_time, + my_transcript, + proposed: false, } }, - _ => bail!("process agg trx failed with invalid inner state"), + _ => bail!("[DKG] aggregated transcript only expected during DKG"), }; Ok(()) } - /// On a DKG start event, execute DKG. - async fn process_dkg_start_event(&mut self, maybe_event: Option) -> Result<()> { - if let Some(event) = maybe_event { - let DKGStartEvent { - session_metadata, - start_time_us, - } = event; - ensure!(self.epoch_state.epoch == session_metadata.dealer_epoch); - self.setup_deal_broadcast(start_time_us, &session_metadata) - .await?; - } - Ok(()) + async fn process_dkg_start_event(&mut self, event: DKGStartEvent) -> Result<()> { + info!( + epoch = self.epoch_state.epoch, + my_addr = self.my_addr, + "[DKG] Processing DKGStart event." + ); + fail_point!("dkg::process_dkg_start_event"); + let DKGStartEvent { + session_metadata, + start_time_us, + } = event; + ensure!( + matches!(&self.state, InnerState::NotStarted), + "[DKG] dkg already started" + ); + ensure!( + self.epoch_state.epoch == session_metadata.dealer_epoch, + "[DKG] event not for current epoch" + ); + self.setup_deal_broadcast(start_time_us, &session_metadata) + .await } /// Process an RPC request from DKG peers. @@ -306,24 +436,17 @@ impl DKGManager { mut response_sender, .. } = req; - ensure!(msg.epoch() == self.epoch_state.epoch); + ensure!( + msg.epoch() == self.epoch_state.epoch, + "[DKG] msg not for current epoch" + ); let response = match (&self.state, &msg) { - ( - InnerState::Finished { - my_transcript: my_node, - .. - }, - DKGMessage::NodeRequest(_), - ) - | ( - InnerState::InProgress { - my_transcript: my_node, - .. - }, - DKGMessage::NodeRequest(_), - ) => Ok(DKGMessage::NodeResponse(my_node.clone())), + (InnerState::Finished { my_transcript, .. }, DKGMessage::TranscriptRequest(_)) + | (InnerState::InProgress { my_transcript, .. 
}, DKGMessage::TranscriptRequest(_)) => { + Ok(DKGMessage::TranscriptResponse(my_transcript.clone())) + }, _ => Err(anyhow!( - "msg {:?} unexpected in state {:?}", + "[DKG] msg {:?} unexpected in state {:?}", msg.name(), self.state.variant_name() )), diff --git a/dkg/src/dkg_manager/tests.rs b/dkg/src/dkg_manager/tests.rs index 8e48a6227df41..42ac02466185c 100644 --- a/dkg/src/dkg_manager/tests.rs +++ b/dkg/src/dkg_manager/tests.rs @@ -80,21 +80,26 @@ async fn test_dkg_state_transition() { // In state `NotStarted`, DKGManager should accept `DKGStartEvent`: // it should record start time, compute its own node, and enter state `InProgress`. - let handle_result = dkg_manager - .process_dkg_start_event(Some(DKGStartEvent { - session_metadata: DKGSessionMetadata { - dealer_epoch: 999, - dealer_validator_set: validator_consensus_info_move_structs.clone(), - target_validator_set: validator_consensus_info_move_structs.clone(), - }, - start_time_us: 1700000000000000, - })) - .await; + let start_time_1 = Duration::from_secs(1700000000); + let event = DKGStartEvent { + session_metadata: DKGSessionMetadata { + dealer_epoch: 999, + dealer_validator_set: validator_consensus_info_move_structs.clone(), + target_validator_set: validator_consensus_info_move_structs.clone(), + }, + start_time_us: start_time_1.as_micros() as u64, + }; + let handle_result = dkg_manager.process_dkg_start_event(event.clone()).await; assert!(handle_result.is_ok()); assert!( - matches!(&dkg_manager.state, InnerState::InProgress { start_time_us, my_transcript, .. } if *start_time_us == 1700000000000000 && my_transcript.metadata == DKGTranscriptMetadata{ epoch: 999, author: addrs[0]}) + matches!(&dkg_manager.state, InnerState::InProgress { start_time, my_transcript, .. } if *start_time == start_time_1 && my_transcript.metadata == DKGTranscriptMetadata{ epoch: 999, author: addrs[0]}) ); + // 2nd `DKGStartEvent` should be rejected. + let handle_result = dkg_manager.process_dkg_start_event(event).await; + println!("{:?}", handle_result); + assert!(handle_result.is_err()); + // In state `InProgress`, DKGManager should respond to `DKGNodeRequest` with its own node. let rpc_node_request = new_rpc_node_request(999, addrs[3], rpc_response_collector.clone()); let handle_result = dkg_manager.process_peer_rpc_msg(rpc_node_request).await; @@ -104,7 +109,9 @@ async fn test_dkg_state_transition() { .map(anyhow::Result::unwrap) .collect::>(); assert_eq!( - vec![DKGMessage::NodeResponse(dkg_manager.state.my_node_cloned())], + vec![DKGMessage::TranscriptResponse( + dkg_manager.state.my_node_cloned() + )], last_responses ); assert!(matches!(&dkg_manager.state, InnerState::InProgress { .. })); @@ -143,7 +150,9 @@ async fn test_dkg_state_transition() { .map(anyhow::Result::unwrap) .collect::>(); assert_eq!( - vec![DKGMessage::NodeResponse(dkg_manager.state.my_node_cloned())], + vec![DKGMessage::TranscriptResponse( + dkg_manager.state.my_node_cloned() + )], last_responses ); assert!(matches!(&dkg_manager.state, InnerState::Finished { .. 
})); @@ -156,7 +165,7 @@ fn new_rpc_node_request( response_collector: Arc>>>, ) -> IncomingRpcRequest { IncomingRpcRequest { - msg: DKGMessage::NodeRequest(DKGTranscriptRequest::new(epoch)), + msg: DKGMessage::TranscriptRequest(DKGTranscriptRequest::new(epoch)), sender, response_sender: Box::new(DummyRpcResponseSender::new(response_collector)), } diff --git a/dkg/src/epoch_manager.rs b/dkg/src/epoch_manager.rs index f5b210820ed2e..71634d7a8aae7 100644 --- a/dkg/src/epoch_manager.rs +++ b/dkg/src/epoch_manager.rs @@ -22,7 +22,8 @@ use aptos_types::{ dkg::{DKGStartEvent, DKGState, DKGTrait, DefaultDKG}, epoch_state::EpochState, on_chain_config::{ - FeatureFlag, Features, OnChainConfigPayload, OnChainConfigProvider, ValidatorSet, + FeatureFlag, Features, OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig, + ValidatorSet, }, }; use aptos_validator_transaction_pool::VTxnPoolState; @@ -45,7 +46,7 @@ pub struct EpochManager { dkg_rpc_msg_tx: Option>, dkg_manager_close_tx: Option>>, - dkg_start_event_tx: Option>, + dkg_start_event_tx: Option>, vtxn_pool: VTxnPoolState, // Network utils @@ -93,13 +94,13 @@ impl EpochManager
<P>
{ } fn on_dkg_start_notification(&mut self, notification: EventNotification) -> Result<()> { - if let Some(tx) = self.dkg_start_event_tx.take() { + if let Some(tx) = self.dkg_start_event_tx.as_ref() { let EventNotification { subscribed_events, .. } = notification; for event in subscribed_events { if let Ok(dkg_start_event) = DKGStartEvent::try_from(&event) { - let _ = tx.send(dkg_start_event); + let _ = tx.push((), dkg_start_event); return Ok(()); } else { debug!("[DKG] on_dkg_start_notification: failed in converting a contract event to a dkg start event!"); @@ -157,11 +158,16 @@ impl EpochManager
<P>
{ .copied(); let features = payload.get::().unwrap_or_default(); + let onchain_consensus_config: anyhow::Result = payload.get(); + if let Err(error) = &onchain_consensus_config { + error!("Failed to read on-chain consensus config {}", error); + } + let consensus_config = onchain_consensus_config.unwrap_or_default(); - if let (true, Some(my_index)) = ( - features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG), - my_index, - ) { + // Check both validator txn and DKG features are enabled + let randomness_enabled = consensus_config.is_vtxn_enabled() + && features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG); + if let (true, Some(my_index)) = (randomness_enabled, my_index) { let DKGState { in_progress: in_progress_session, .. @@ -178,7 +184,8 @@ impl EpochManager
<P>
{ ); let agg_trx_producer = AggTranscriptProducer::new(rb); - let (dkg_start_event_tx, dkg_start_event_rx) = oneshot::channel(); + let (dkg_start_event_tx, dkg_start_event_rx) = + aptos_channel::new(QueueStyle::KLAST, 1, None); self.dkg_start_event_tx = Some(dkg_start_event_tx); let (dkg_rpc_msg_tx, dkg_rpc_msg_rx) = aptos_channel::new::< diff --git a/dkg/src/lib.rs b/dkg/src/lib.rs index 9e62b71c8e35c..4b694eb980ce0 100644 --- a/dkg/src/lib.rs +++ b/dkg/src/lib.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +mod agg_trx_producer; mod counters; mod dkg_manager; pub mod epoch_manager; @@ -49,5 +50,3 @@ pub fn start_dkg_runtime( runtime.spawn(dkg_epoch_manager.start(network_receiver)); runtime } - -pub mod agg_trx_producer; diff --git a/dkg/src/transcript_aggregation/mod.rs b/dkg/src/transcript_aggregation/mod.rs index f0d896b2dfbf7..62d47817e17f4 100644 --- a/dkg/src/transcript_aggregation/mod.rs +++ b/dkg/src/transcript_aggregation/mod.rs @@ -1,16 +1,18 @@ // Copyright © Aptos Foundation -use crate::{types::DKGTranscriptRequest, DKGMessage}; -use anyhow::ensure; +use crate::{counters::DKG_STAGE_SECONDS, types::DKGTranscriptRequest, DKGMessage}; +use anyhow::{anyhow, ensure}; use aptos_consensus_types::common::Author; -use aptos_infallible::Mutex; +use aptos_infallible::{duration_since_epoch, Mutex}; +use aptos_logger::info; use aptos_reliable_broadcast::BroadcastStatus; use aptos_types::{ dkg::{DKGTrait, DKGTranscript}, epoch_state::EpochState, + validator_verifier::VerifyError, }; use move_core_types::account_address::AccountAddress; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, sync::Arc, time::Duration}; pub struct TranscriptAggregator { pub contributors: HashSet, @@ -27,15 +29,26 @@ impl Default for TranscriptAggregator { } pub struct TranscriptAggregationState { + start_time: Duration, + my_addr: AccountAddress, + valid_peer_transcript_seen: bool, trx_aggregator: Mutex>, dkg_pub_params: DKG::PublicParams, epoch_state: Arc, } impl TranscriptAggregationState { - pub fn new(dkg_pub_params: DKG::PublicParams, epoch_state: Arc) -> Self { + pub fn new( + start_time: Duration, + my_addr: AccountAddress, + dkg_pub_params: DKG::PublicParams, + epoch_state: Arc, + ) -> Self { //TODO(zjma): take DKG threshold as a parameter. Self { + start_time, + my_addr, + valid_peer_transcript_seen: false, trx_aggregator: Mutex::new(TranscriptAggregator::default()), dkg_pub_params, epoch_state, @@ -59,33 +72,79 @@ impl BroadcastStatus for Arc Some(*x), + Err(VerifyError::TooLittleVotingPower { voting_power, .. 
}) => Some(*voting_power), + _ => None, + }; + let maybe_aggregated = power_check_result .ok() - .map(|_aggregated_voting_power| trx_aggregator.trx.clone().unwrap()); + .map(|_| trx_aggregator.trx.clone().unwrap()); + info!( + epoch = self.epoch_state.epoch, + peer = sender, + is_self = is_self, + peer_power = peer_power, + new_total_power = new_total_power, + threshold = threshold, + threshold_exceeded = maybe_aggregated.is_some(), + "[DKG] added transcript from validator {}, {} out of {} aggregated.", + self.epoch_state + .verifier + .address_to_validator_index() + .get(&sender) + .unwrap(), + new_total_power.unwrap_or(0), + threshold + ); Ok(maybe_aggregated) } } diff --git a/dkg/src/transcript_aggregation/tests.rs b/dkg/src/transcript_aggregation/tests.rs index eeb2e34dcfb50..96f6c0308ddfa 100644 --- a/dkg/src/transcript_aggregation/tests.rs +++ b/dkg/src/transcript_aggregation/tests.rs @@ -2,6 +2,7 @@ use crate::transcript_aggregation::TranscriptAggregationState; use aptos_crypto::{bls12381::bls12381_keys, Uniform}; +use aptos_infallible::duration_since_epoch; use aptos_reliable_broadcast::BroadcastStatus; use aptos_types::{ dkg::{ @@ -23,6 +24,7 @@ fn test_transcript_aggregation_state() { let addrs: Vec = (0..num_validators) .map(|_| AccountAddress::random()) .collect(); + let vfn_addr = AccountAddress::random(); let private_keys: Vec = (0..num_validators) .map(|_| bls12381_keys::PrivateKey::generate_for_testing()) .collect(); @@ -46,6 +48,8 @@ fn test_transcript_aggregation_state() { }); let epoch_state = Arc::new(EpochState { epoch, verifier }); let trx_agg_state = Arc::new(TranscriptAggregationState::::new( + duration_since_epoch(), + addrs[0], pub_params, epoch_state, )); @@ -73,6 +77,16 @@ fn test_transcript_aggregation_state() { }); assert!(result.is_err()); + // Node authored by non-active-validator should be rejected. + let result = trx_agg_state.add(vfn_addr, DKGTranscript { + metadata: DKGTranscriptMetadata { + epoch: 999, + author: vfn_addr, + }, + transcript_bytes: good_trx_bytes.clone(), + }); + assert!(result.is_err()); + // Node with invalid transcript should be rejected. let mut bad_trx_bytes = good_trx_bytes.clone(); bad_trx_bytes[0] = 0xAB; diff --git a/dkg/src/types.rs b/dkg/src/types.rs index 29172e48e05ad..928b659027278 100644 --- a/dkg/src/types.rs +++ b/dkg/src/types.rs @@ -24,22 +24,22 @@ impl DKGTranscriptRequest { /// The DKG network message. #[derive(Clone, Serialize, Deserialize, Debug, EnumConversion, PartialEq)] pub enum DKGMessage { - NodeRequest(DKGTranscriptRequest), - NodeResponse(DKGTranscript), + TranscriptRequest(DKGTranscriptRequest), + TranscriptResponse(DKGTranscript), } impl DKGMessage { pub fn epoch(&self) -> u64 { match self { - DKGMessage::NodeRequest(request) => request.dealer_epoch, - DKGMessage::NodeResponse(response) => response.metadata.epoch, + DKGMessage::TranscriptRequest(request) => request.dealer_epoch, + DKGMessage::TranscriptResponse(response) => response.metadata.epoch, } } pub fn name(&self) -> &str { match self { - DKGMessage::NodeRequest(_) => "DKGTranscriptRequest", - DKGMessage::NodeResponse(_) => "DKGTranscriptResponse", + DKGMessage::TranscriptRequest(_) => "DKGTranscriptRequest", + DKGMessage::TranscriptResponse(_) => "DKGTranscriptResponse", } } }
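The `DKGManager` rework above folds `start_time` into `InnerState` and drives stage changes with `std::mem::take`, rejecting events that arrive in the wrong stage. Below is a minimal, self-contained sketch of that pattern; `Manager`, `start`, and `finish` are hypothetical stand-ins, not the aptos-core API.

```rust
use std::time::{Duration, Instant};

// Placeholder stages; the real states also carry a transcript, a vtxn guard,
// and an abort handle for the broadcast task.
#[derive(Default)]
enum InnerState {
    #[default]
    NotStarted,
    InProgress { start_time: Instant },
    Finished { start_time: Instant, proposed: bool },
}

struct Manager {
    state: InnerState,
}

impl Manager {
    // Mirrors `setup_deal_broadcast`: only legal from `NotStarted`.
    fn start(&mut self) -> Result<(), String> {
        if !matches!(self.state, InnerState::NotStarted) {
            return Err("transcript already dealt".into());
        }
        self.state = InnerState::InProgress {
            start_time: Instant::now(),
        };
        Ok(())
    }

    // Mirrors `process_aggregated_transcript`: take the current state out,
    // decide the next one, and reject the event in any other stage.
    fn finish(&mut self) -> Result<Duration, String> {
        match std::mem::take(&mut self.state) {
            InnerState::InProgress { start_time } => {
                let elapsed = start_time.elapsed();
                self.state = InnerState::Finished {
                    start_time,
                    proposed: false,
                };
                Ok(elapsed)
            },
            other => {
                self.state = other; // restore the untouched state
                Err("aggregated transcript only expected during DKG".into())
            },
        }
    }
}

fn main() {
    let mut m = Manager {
        state: InnerState::default(),
    };
    m.start().unwrap();
    println!("time from deal to aggregation: {:?}", m.finish().unwrap());
}
```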
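`load_dkg_decrypt_key` above tries secure storage first and falls back to the identity blob, logging a warning for each failed source and returning `None` if both fail. A small sketch of that try-in-order fallback, with hypothetical loader functions standing in for the real secure-storage and identity-blob readers:

```rust
// Hypothetical loaders; the real ones derive the decrypt key from the
// consensus BLS secret key found in secure storage or the identity blob.
fn load_from_secure_storage() -> Result<String, String> {
    Err("storage is not available".into())
}

fn load_from_identity_blob() -> Result<String, String> {
    Ok("decrypt-key-from-identity-blob".into())
}

fn load_decrypt_key() -> Option<String> {
    let loaders: [(&str, fn() -> Result<String, String>); 2] = [
        ("secure_storage", load_from_secure_storage),
        ("identity_blob", load_from_identity_blob),
    ];
    for (name, loader) in loaders {
        match loader() {
            Ok(key) => return Some(key),
            // The real code logs with `warn!`; plain stderr keeps this self-contained.
            Err(e) => eprintln!("loading DKG decrypt key from {name} failed: {e}"),
        }
    }
    None
}

fn main() {
    assert_eq!(
        load_decrypt_key().as_deref(),
        Some("decrypt-key-from-identity-blob")
    );
}
```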
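The transcript-aggregation change deduplicates contributions per dealer, sums voting power, and only hands back an aggregated transcript once the verifier's power threshold is met. A self-contained sketch of that threshold check, using placeholder types rather than the aptos-core `TranscriptAggregationState`:

```rust
use std::collections::HashSet;

// Placeholder aggregation state; the real one verifies each transcript
// against the DKG public parameters before aggregating it.
struct Aggregation {
    contributors: HashSet<&'static str>,
    total_power: u64,
    threshold: u64,
    transcript: Vec<&'static str>, // stand-in for the aggregated transcript
}

impl Aggregation {
    fn add(&mut self, dealer: &'static str, power: u64) -> Option<Vec<&'static str>> {
        if !self.contributors.insert(dealer) {
            return None; // duplicate contribution from this dealer, ignore
        }
        self.transcript.push(dealer);
        self.total_power += power;
        // Only report completion once enough voting power has contributed.
        (self.total_power >= self.threshold).then(|| self.transcript.clone())
    }
}

fn main() {
    let mut agg = Aggregation {
        contributors: HashSet::new(),
        total_power: 0,
        threshold: 3,
        transcript: Vec::new(),
    };
    assert!(agg.add("alice", 1).is_none());
    assert!(agg.add("alice", 1).is_none()); // deduplicated
    assert!(agg.add("bob", 2).is_some()); // threshold reached
}
```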