diff --git a/config/src/config/identity_config.rs b/config/src/config/identity_config.rs index 0d2142574d8340..9fcfd3aec04227 100644 --- a/config/src/config/identity_config.rs +++ b/config/src/config/identity_config.rs @@ -2,14 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{config::SecureBackend, keys::ConfigKey}; +use anyhow::anyhow; use aptos_crypto::{ bls12381, ed25519::Ed25519PrivateKey, x25519::{self, PRIVATE_KEY_SIZE}, ValidCryptoMaterial, }; -use aptos_types::account_address::{ - from_identity_public_key, AccountAddress, AccountAddress as PeerId, +use aptos_types::{ + account_address::{from_identity_public_key, AccountAddress, AccountAddress as PeerId}, + dkg::{real_dkg::maybe_dk_from_bls_sk, DKGTrait, DefaultDKG}, }; use serde::{Deserialize, Serialize}; use std::{ @@ -43,6 +45,21 @@ impl IdentityBlob { let mut file = File::open(path)?; Ok(file.write_all(serde_yaml::to_string(self)?.as_bytes())?) } + + pub fn try_into_dkg_dealer_private_key( + self, + ) -> Option<::DealerPrivateKey> { + self.consensus_private_key + } + + pub fn try_into_dkg_new_validator_decrypt_key( + self, + ) -> anyhow::Result<::NewValidatorDecryptKey> { + let consensus_sk = self.consensus_private_key.as_ref().ok_or_else(|| { + anyhow!("try_into_dkg_new_validator_decrypt_key failed with missing consensus key") + })?; + maybe_dk_from_bls_sk(consensus_sk) + } } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index a60f38c5656f52..610525f37757fd 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -10,6 +10,7 @@ use crate::{ persistent_liveness_storage::StorageWriteProxy, pipeline::execution_client::ExecutionProxyClient, quorum_store::quorum_store_db::QuorumStoreDB, + rand::rand_gen::storage::db::RandDb, state_computer::ExecutionProxy, transaction_filter::TransactionFilter, txn_notifier::MempoolNotifier, @@ -66,13 +67,16 @@ pub fn start_consensus( aptos_channels::new_unbounded(&counters::PENDING_SELF_MESSAGES); let consensus_network_client = ConsensusNetworkClient::new(network_client); let bounded_executor = BoundedExecutor::new(32, runtime.handle().clone()); + let rand_storage = Arc::new(RandDb::new(node_config.storage.dir())); let execution_client = Arc::new(ExecutionProxyClient::new( + node_config.consensus.clone(), Arc::new(execution_proxy), node_config.validator_network.as_ref().unwrap().peer_id(), self_sender.clone(), consensus_network_client.clone(), bounded_executor.clone(), + rand_storage.clone(), )); let epoch_mgr = EpochManager::new( @@ -89,6 +93,7 @@ pub fn start_consensus( bounded_executor, aptos_time_service::TimeService::real(), vtxn_pool, + rand_storage, ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 88b1bfd23c6ba6..4838b756b9cc94 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -29,7 +29,7 @@ use crate::{ monitor, network::{ IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest, - IncomingRpcRequest, NetworkReceivers, NetworkSender, + IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender, }, network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_client::{ @@ -43,16 +43,20 @@ use crate::{ quorum_store_coordinator::CoordinatorCommand, quorum_store_db::QuorumStoreStorage, }, + rand::rand_gen::{ + storage::interface::RandStorage, + types::{AugmentedData, RandConfig}, + }, recovery_manager::RecoveryManager, round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent}, util::time_service::TimeService, }; -use anyhow::{bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context}; use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{ ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig, QcAggregatorType, - SecureBackend, + SafetyRulesConfig, SecureBackend, }; use aptos_consensus_types::{ common::{Author, Round}, @@ -60,6 +64,11 @@ use aptos_consensus_types::{ epoch_retrieval::EpochRetrievalRequest, proof_of_store::{BatchInfo, ProofCache}, }; +use aptos_crypto::bls12381; +use aptos_dkg::{ + pvss::{traits::Transcript, Player}, + weighted_vuf::traits::WeightedVUF, +}; use aptos_event_notifications::ReconfigNotificationListener; use aptos_global_constants::CONSENSUS_KEY; use aptos_infallible::{duration_since_epoch, Mutex}; @@ -70,12 +79,14 @@ use aptos_safety_rules::SafetyRulesManager; use aptos_secure_storage::{KVStorage, Storage}; use aptos_types::{ account_address::AccountAddress, + dkg::{real_dkg::maybe_dk_from_bls_sk, DKGState, DKGTrait, DefaultDKG}, epoch_change::EpochChangeProof, epoch_state::EpochState, on_chain_config::{ - Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider, + FeatureFlag, Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig, OnChainExecutionConfig, ProposerElectionType, ValidatorSet, }, + randomness::{RandKeys, WvufPP, WVUF}, validator_signer::ValidatorSigner, }; use aptos_validator_transaction_pool::VTxnPoolState; @@ -90,6 +101,7 @@ use futures::{ }; use itertools::Itertools; use mini_moka::sync::Cache; +use rand::{prelude::StdRng, thread_rng, SeedableRng}; use std::{ cmp::Ordering, collections::HashMap, @@ -130,6 +142,8 @@ pub struct EpochManager { safety_rules_manager: SafetyRulesManager, vtxn_pool: VTxnPoolState, reconfig_events: ReconfigNotificationListener

, + // channels to rand manager + rand_manager_msg_tx: Option>, // channels to round manager round_manager_tx: Option< aptos_channel::Sender<(Author, Discriminant), (Author, VerifiedEvent)>, @@ -154,6 +168,7 @@ pub struct EpochManager { dag_config: DagConsensusConfig, payload_manager: Arc, proof_cache: ProofCache, + rand_storage: Arc>, } impl EpochManager

{ @@ -171,6 +186,7 @@ impl EpochManager

{ bounded_executor: BoundedExecutor, aptos_time_service: aptos_time_service::TimeService, vtxn_pool: VTxnPoolState, + rand_storage: Arc>, ) -> Self { let author = node_config.validator_network.as_ref().unwrap().peer_id(); let config = node_config.consensus.clone(); @@ -194,6 +210,7 @@ impl EpochManager

{ safety_rules_manager, vtxn_pool, reconfig_events, + rand_manager_msg_tx: None, round_manager_tx: None, round_manager_close_tx: None, buffered_proposal_tx: None, @@ -215,6 +232,7 @@ impl EpochManager

{ .initial_capacity(1_000) .time_to_live(Duration::from_secs(10)) .build(), + rand_storage, } } @@ -582,6 +600,9 @@ impl EpochManager

{ } self.dag_shutdown_tx = None; + // Shutdown the previous rand manager + self.rand_manager_msg_tx = None; + // Shutdown the previous buffer manager, to release the SafetyRule client self.execution_client.end_epoch().await; @@ -713,7 +734,9 @@ impl EpochManager

{ network_sender: Arc, payload_client: Arc, payload_manager: Arc, + rand_config: Option, features: Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; info!( @@ -759,8 +782,11 @@ impl EpochManager

{ epoch_state.clone(), safety_rules_container.clone(), payload_manager.clone(), + &onchain_consensus_config, &onchain_execution_config, &features, + rand_config, + rand_msg_rx, ) .await; @@ -823,7 +849,7 @@ impl EpochManager

{ onchain_consensus_config, buffered_proposal_tx, self.config.clone(), - features, + features.clone(), true, ); @@ -859,6 +885,98 @@ impl EpochManager

{ ) } + fn try_get_rand_config_for_new_epoch( + &self, + new_epoch_state: &EpochState, + features: &Features, + maybe_dkg_state: anyhow::Result, + consensus_config: &OnChainConsensusConfig, + ) -> Result { + if !consensus_config.is_vtxn_enabled() { + return Err(NoRandomnessReason::VTxnDisabled); + } + if !features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG) { + return Err(NoRandomnessReason::FeatureDisabled); + } + let new_epoch = new_epoch_state.epoch; + + let dkg_state = maybe_dkg_state.map_err(NoRandomnessReason::DKGStateResourceMissing)?; + let dkg_session = dkg_state + .last_completed + .ok_or_else(|| NoRandomnessReason::DKGCompletedSessionResourceMissing)?; + if dkg_session.metadata.dealer_epoch + 1 != new_epoch_state.epoch { + return Err(NoRandomnessReason::CompletedSessionTooOld); + } + let dkg_pub_params = DefaultDKG::new_public_params(&dkg_session.metadata); + let my_index = new_epoch_state + .verifier + .address_to_validator_index() + .get(&self.author) + .copied() + .ok_or_else(|| NoRandomnessReason::NotInValidatorSet)?; + + let dkg_decrypt_key = load_dkg_decrypt_key(&self.config.safety_rules) + .ok_or_else(|| NoRandomnessReason::DKGDecryptKeyUnavailable)?; + let transcript = bcs::from_bytes::<::Transcript>( + dkg_session.transcript.as_slice(), + ) + .map_err(NoRandomnessReason::TranscriptDeserializationError)?; + + let vuf_pp = WvufPP::from(&dkg_pub_params.pvss_config.pp); + + // No need to verify the transcript. + + // keys for randomness generation + let (sk, pk) = DefaultDKG::decrypt_secret_share_from_transcript( + &dkg_pub_params, + &transcript, + my_index as u64, + &dkg_decrypt_key, + ) + .map_err(NoRandomnessReason::SecretShareDecryptionFailed)?; + + let pk_shares = (0..new_epoch_state.verifier.len()) + .map(|id| { + transcript.get_public_key_share(&dkg_pub_params.pvss_config.wconfig, &Player { id }) + }) + .collect::>(); + + // Recover existing augmented key pair or generate a new one + let (ask, apk) = if let Some((_, key_pair)) = self + .rand_storage + .get_key_pair_bytes() + .map_err(NoRandomnessReason::RandDbNotAvailable)? + .filter(|(epoch, _)| *epoch == new_epoch) + { + bcs::from_bytes(&key_pair).map_err(NoRandomnessReason::KeyPairDeserializationError)? + } else { + let mut rng = + StdRng::from_rng(thread_rng()).map_err(NoRandomnessReason::RngCreationError)?; + let augmented_key_pair = WVUF::augment_key_pair(&vuf_pp, sk, pk, &mut rng); + self.rand_storage + .save_key_pair_bytes( + new_epoch, + bcs::to_bytes(&augmented_key_pair) + .map_err(NoRandomnessReason::KeyPairSerializationError)?, + ) + .map_err(NoRandomnessReason::KeyPairPersistError)?; + augmented_key_pair + }; + + let keys = RandKeys::new(ask, apk, pk_shares, new_epoch_state.verifier.len()); + + let rand_config = RandConfig::new( + self.author, + new_epoch, + new_epoch_state.verifier.clone(), + vuf_pp, + keys, + dkg_pub_params.pvss_config.wconfig.clone(), + ); + + Ok(rand_config) + } + async fn start_new_epoch(&mut self, payload: OnChainConfigPayload

) { let validator_set: ValidatorSet = payload .get() @@ -868,9 +986,12 @@ impl EpochManager

{ verifier: (&validator_set).into(), }); + self.epoch_state = Some(epoch_state.clone()); + let onchain_consensus_config: anyhow::Result = payload.get(); let onchain_execution_config: anyhow::Result = payload.get(); let features = payload.get::(); + let dkg_state = payload.get::(); if let Err(error) = &onchain_consensus_config { error!("Failed to read on-chain consensus config {}", error); @@ -891,10 +1012,30 @@ impl EpochManager

{ .unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing()); let features = features.unwrap_or_default(); + let rand_config = self.try_get_rand_config_for_new_epoch( + &epoch_state, + &features, + dkg_state, + &consensus_config, + ); + info!( + "[Randomness] start_new_epoch: epoch={}, rand_config={:?}, ", + epoch_state.epoch, rand_config + ); // The sk inside has `SlientDebug`. + let rand_config = rand_config.ok(); + let (network_sender, payload_client, payload_manager) = self .initialize_shared_component(&epoch_state, &consensus_config) .await; + let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::( + QueueStyle::FIFO, + 100, + None, + ); + + self.rand_manager_msg_tx = Some(rand_msg_tx); + if consensus_config.is_dag_enabled() { self.start_new_epoch_with_dag( epoch_state, @@ -903,7 +1044,9 @@ impl EpochManager

{ network_sender, payload_client, payload_manager, - features, + rand_config, + &features, + rand_msg_rx, ) .await } else { @@ -914,7 +1057,9 @@ impl EpochManager

{ network_sender, payload_client, payload_manager, - features, + rand_config, + &features, + rand_msg_rx, ) .await } @@ -954,7 +1099,9 @@ impl EpochManager

{ network_sender: NetworkSender, payload_client: Arc, payload_manager: Arc, - features: Features, + rand_config: Option, + features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { @@ -967,7 +1114,9 @@ impl EpochManager

{ Arc::new(network_sender), payload_client, payload_manager, - features, + rand_config, + features.clone(), + rand_msg_rx, ) .await }, @@ -992,12 +1141,15 @@ impl EpochManager

{ network_sender: NetworkSender, payload_client: Arc, payload_manager: Arc, - features: Features, + rand_config: Option, + features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; - - let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); - let commit_signer = Arc::new(DagCommitSigner::new(signer)); + let consensus_key = new_consensus_key_from_storage(&self.config.safety_rules.backend) + .expect("unable to get private key"); + let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); + let commit_signer = Arc::new(DagCommitSigner::new(signer.clone())); assert!( onchain_consensus_config.decoupled_execution(), @@ -1009,8 +1161,11 @@ impl EpochManager

{ epoch_state.clone(), commit_signer, payload_manager.clone(), + &onchain_consensus_config, &on_chain_execution_config, - &features, + features, + rand_config, + rand_msg_rx, ) .await; @@ -1028,7 +1183,6 @@ impl EpochManager

{ self.storage.aptos_db(), )); - let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); let network_sender_arc = Arc::new(network_sender); let bootstrapper = DagBootstrapper::new( @@ -1049,7 +1203,7 @@ impl EpochManager

{ onchain_consensus_config.quorum_store_enabled(), onchain_consensus_config.effective_validator_txn_config(), self.bounded_executor.clone(), - features, + features.clone(), ); let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None); @@ -1337,7 +1491,13 @@ impl EpochManager

{ IncomingRpcRequest::CommitRequest(request) => { self.execution_client.send_commit_msg(peer_id, request) }, - IncomingRpcRequest::RandGenRequest(_) => Ok(()), + IncomingRpcRequest::RandGenRequest(request) => { + if let Some(tx) = &self.rand_manager_msg_tx { + tx.push(peer_id, request) + } else { + bail!("Rand manager not started"); + } + }, } } @@ -1408,15 +1568,69 @@ impl EpochManager

{ } } -#[allow(dead_code)] -fn new_signer_from_storage(author: Author, backend: &SecureBackend) -> Arc { +fn new_consensus_key_from_storage(backend: &SecureBackend) -> anyhow::Result { let storage: Storage = backend.into(); - if let Err(error) = storage.available() { - panic!("Storage is not available: {:?}", error); - } - let private_key = storage + storage + .available() + .map_err(|e| anyhow!("Storage is not available: {e}"))?; + storage .get(CONSENSUS_KEY) .map(|v| v.value) - .expect("Unable to get private key"); - Arc::new(ValidatorSigner::new(author, private_key)) + .map_err(|e| anyhow!("storage get and map err: {e}")) +} + +fn load_dkg_decrypt_key_from_identity_blob( + config: &SafetyRulesConfig, +) -> anyhow::Result<::NewValidatorDecryptKey> { + let identity_blob = config.initial_safety_rules_config.identity_blob()?; + identity_blob.try_into_dkg_new_validator_decrypt_key() +} + +fn load_dkg_decrypt_key_from_secure_storage( + config: &SafetyRulesConfig, +) -> anyhow::Result<::NewValidatorDecryptKey> { + let consensus_key = new_consensus_key_from_storage(&config.backend)?; + maybe_dk_from_bls_sk(&consensus_key) +} + +fn load_dkg_decrypt_key( + config: &SafetyRulesConfig, +) -> Option<::NewValidatorDecryptKey> { + match load_dkg_decrypt_key_from_secure_storage(config) { + Ok(dk) => { + return Some(dk); + }, + Err(e) => { + warn!("{e}"); + }, + } + + match load_dkg_decrypt_key_from_identity_blob(config) { + Ok(dk) => { + return Some(dk); + }, + Err(e) => { + warn!("{e}"); + }, + } + + None +} + +#[derive(Debug)] +enum NoRandomnessReason { + VTxnDisabled, + FeatureDisabled, + DKGStateResourceMissing(anyhow::Error), + DKGCompletedSessionResourceMissing, + CompletedSessionTooOld, + NotInValidatorSet, + DKGDecryptKeyUnavailable, + TranscriptDeserializationError(bcs::Error), + SecretShareDecryptionFailed(anyhow::Error), + RngCreationError(rand::Error), + RandDbNotAvailable(anyhow::Error), + KeyPairDeserializationError(bcs::Error), + KeyPairSerializationError(bcs::Error), + KeyPairPersistError(anyhow::Error), } diff --git a/consensus/src/pipeline/errors.rs b/consensus/src/pipeline/errors.rs index cdb8396bb00a7e..746228438ecebe 100644 --- a/consensus/src/pipeline/errors.rs +++ b/consensus/src/pipeline/errors.rs @@ -15,4 +15,6 @@ pub enum Error { VerificationError, #[error("Reset host dropped")] ResetDropped, + #[error("Rand Reset host dropped")] + RandResetDropped, } diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs index fdeacae3dae036..fcc81ab66441a9 100644 --- a/consensus/src/pipeline/execution_client.rs +++ b/consensus/src/pipeline/execution_client.rs @@ -5,7 +5,7 @@ use crate::{ counters, error::StateSyncError, - network::{IncomingCommitRequest, NetworkSender}, + network::{IncomingCommitRequest, IncomingRandGenRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_manager::PayloadManager, pipeline::{ @@ -14,6 +14,11 @@ use crate::{ errors::Error, signing_phase::CommitSignerProvider, }, + rand::rand_gen::{ + rand_manager::RandManager, + storage::interface::RandStorage, + types::{AugmentedData, RandConfig, Share}, + }, state_computer::ExecutionProxy, state_replication::{StateComputer, StateComputerCommitCallBackType}, transaction_deduper::create_transaction_deduper, @@ -22,15 +27,18 @@ use crate::{ use anyhow::Result; use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_config::config::ConsensusConfig; use aptos_consensus_types::{common::Author, pipelined_block::PipelinedBlock}; use aptos_executor_types::ExecutorResult; use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_network::{application::interface::NetworkClient, protocols::network::Event}; +use aptos_safety_rules::safety_rules_manager::load_consensus_key_from_secure_storage; use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - on_chain_config::{FeatureFlag, Features, OnChainExecutionConfig}, + on_chain_config::{FeatureFlag, Features, OnChainConsensusConfig, OnChainExecutionConfig}, + validator_signer::ValidatorSigner, }; use fail::fail_point; use futures::{ @@ -49,8 +57,11 @@ pub trait TExecutionClient: Send + Sync { epoch_state: Arc, commit_signer_provider: Arc, payload_manager: Arc, + onchain_consensus_config: &OnChainConsensusConfig, onchain_execution_config: &OnChainExecutionConfig, features: &Features, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ); /// This is needed for some DAG tests. Clean this up as a TODO. @@ -80,7 +91,8 @@ pub trait TExecutionClient: Send + Sync { struct BufferManagerHandle { pub execute_tx: Option>, pub commit_tx: Option>, - pub reset_tx: Option>, + pub reset_tx_to_buffer_manager: Option>, + pub reset_tx_to_rand_manager: Option>, } impl BufferManagerHandle { @@ -88,7 +100,8 @@ impl BufferManagerHandle { Self { execute_tx: None, commit_tx: None, - reset_tx: None, + reset_tx_to_buffer_manager: None, + reset_tx_to_rand_manager: None, } } @@ -96,23 +109,31 @@ impl BufferManagerHandle { &mut self, execute_tx: UnboundedSender, commit_tx: aptos_channel::Sender, - reset_tx: UnboundedSender, + reset_tx_to_buffer_manager: UnboundedSender, + reset_tx_to_rand_manager: Option>, ) { self.execute_tx = Some(execute_tx); self.commit_tx = Some(commit_tx); - self.reset_tx = Some(reset_tx); + self.reset_tx_to_buffer_manager = Some(reset_tx_to_buffer_manager); + self.reset_tx_to_rand_manager = reset_tx_to_rand_manager; } - pub fn reset(&mut self) -> Option> { - let reset_tx = self.reset_tx.take(); + pub fn reset( + &mut self, + ) -> ( + Option>, + Option>, + ) { + let reset_tx_to_rand_manager = self.reset_tx_to_rand_manager.take(); + let reset_tx_to_buffer_manager = self.reset_tx_to_buffer_manager.take(); self.execute_tx = None; self.commit_tx = None; - self.reset_tx = None; - reset_tx + (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) } } pub struct ExecutionProxyClient { + consensus_config: ConsensusConfig, execution_proxy: Arc, author: Author, self_sender: aptos_channels::UnboundedSender>, @@ -120,23 +141,28 @@ pub struct ExecutionProxyClient { bounded_executor: BoundedExecutor, // channels to buffer manager handle: Arc>, + rand_storage: Arc>, } impl ExecutionProxyClient { pub fn new( + consensus_config: ConsensusConfig, execution_proxy: Arc, author: Author, self_sender: aptos_channels::UnboundedSender>, network_sender: ConsensusNetworkClient>, bounded_executor: BoundedExecutor, + rand_storage: Arc>, ) -> Self { Self { + consensus_config, execution_proxy, author, self_sender, network_sender, bounded_executor, handle: Arc::new(RwLock::new(BufferManagerHandle::new())), + rand_storage, } } @@ -144,6 +170,8 @@ impl ExecutionProxyClient { &self, commit_signer_provider: Arc, epoch_state: Arc, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ) { let network_sender = NetworkSender::new( self.author, @@ -152,8 +180,7 @@ impl ExecutionProxyClient { epoch_state.verifier.clone(), ); - let (block_tx, block_rx) = unbounded::(); - let (reset_tx, reset_rx) = unbounded::(); + let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::(); let (commit_msg_tx, commit_msg_rx) = aptos_channel::new::( @@ -162,7 +189,51 @@ impl ExecutionProxyClient { Some(&counters::BUFFER_MANAGER_MSGS), ); - self.handle.write().init(block_tx, commit_msg_tx, reset_tx); + let (execution_ready_block_tx, execution_ready_block_rx, maybe_reset_tx_to_rand_manager) = + if let Some(rand_config) = rand_config { + let (ordered_block_tx, ordered_block_rx) = unbounded::(); + let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::(); + + let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::(); + let consensus_key = + load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules) + .expect("Failed in loading consensus key for ExecutionProxyClient."); + let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); + + let rand_manager = RandManager::::new( + self.author, + epoch_state.clone(), + signer, + rand_config, + rand_ready_block_tx, + Arc::new(network_sender.clone()), + self.rand_storage.clone(), + self.bounded_executor.clone(), + ); + + tokio::spawn(rand_manager.start( + ordered_block_rx, + rand_msg_rx, + reset_rand_manager_rx, + self.bounded_executor.clone(), + )); + + ( + ordered_block_tx, + rand_ready_block_rx, + Some(reset_tx_to_rand_manager), + ) + } else { + let (ordered_block_tx, ordered_block_rx) = unbounded(); + (ordered_block_tx, ordered_block_rx, None) + }; + + self.handle.write().init( + execution_ready_block_tx, + commit_msg_tx, + reset_buffer_manager_tx, + maybe_reset_tx_to_rand_manager, + ); let ( execution_schedule_phase, @@ -177,8 +248,8 @@ impl ExecutionProxyClient { network_sender, commit_msg_rx, self.execution_proxy.clone(), - block_rx, - reset_rx, + execution_ready_block_rx, + reset_buffer_manager_rx, epoch_state, self.bounded_executor.clone(), ); @@ -198,10 +269,18 @@ impl TExecutionClient for ExecutionProxyClient { epoch_state: Arc, commit_signer_provider: Arc, payload_manager: Arc, + onchain_consensus_config: &OnChainConsensusConfig, onchain_execution_config: &OnChainExecutionConfig, features: &Features, + rand_config: Option, + rand_msg_rx: aptos_channel::Receiver, ) { - self.spawn_decoupled_execution(commit_signer_provider, epoch_state.clone()); + let maybe_rand_msg_tx = self.spawn_decoupled_execution( + commit_signer_provider, + epoch_state.clone(), + rand_config, + rand_msg_rx, + ); let transaction_shuffler = create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); @@ -209,14 +288,18 @@ impl TExecutionClient for ExecutionProxyClient { onchain_execution_config.block_executor_onchain_config(); let transaction_deduper = create_transaction_deduper(onchain_execution_config.transaction_deduper_type()); + let randomness_enabled = onchain_consensus_config.is_vtxn_enabled() + && features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG); self.execution_proxy.new_epoch( &epoch_state, payload_manager, transaction_shuffler, block_executor_onchain_config, transaction_deduper, - features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG), + randomness_enabled, ); + + maybe_rand_msg_tx } fn get_execution_channel(&self) -> Option> { @@ -280,9 +363,27 @@ impl TExecutionClient for ExecutionProxyClient { Err(anyhow::anyhow!("Injected error in sync_to").into()) }); - let reset_tx = self.handle.read().reset_tx.clone(); + let (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) = { + let handle = self.handle.read(); + ( + handle.reset_tx_to_rand_manager.clone(), + handle.reset_tx_to_buffer_manager.clone(), + ) + }; - if let Some(mut reset_tx) = reset_tx { + if let Some(mut reset_tx) = reset_tx_to_rand_manager { + let (ack_tx, ack_rx) = oneshot::channel::(); + reset_tx + .send(ResetRequest { + tx: ack_tx, + signal: ResetSignal::TargetRound(target.commit_info().round()), + }) + .await + .map_err(|_| Error::RandResetDropped)?; + ack_rx.await.map_err(|_| Error::RandResetDropped)?; + } + + if let Some(mut reset_tx) = reset_tx_to_buffer_manager { // reset execution phase and commit phase let (tx, rx) = oneshot::channel::(); reset_tx @@ -302,8 +403,25 @@ impl TExecutionClient for ExecutionProxyClient { } async fn end_epoch(&self) { - let reset_tx = self.handle.write().reset(); - if let Some(mut tx) = reset_tx { + let (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) = { + let mut handle = self.handle.write(); + handle.reset() + }; + + if let Some(mut tx) = reset_tx_to_rand_manager { + let (ack_tx, ack_rx) = oneshot::channel(); + tx.send(ResetRequest { + tx: ack_tx, + signal: ResetSignal::Stop, + }) + .await + .expect("[EpochManager] Fail to drop rand manager"); + ack_rx + .await + .expect("[EpochManager] Fail to drop rand manager"); + } + + if let Some(mut tx) = reset_tx_to_buffer_manager { let (ack_tx, ack_rx) = oneshot::channel(); tx.send(ResetRequest { tx: ack_tx, @@ -328,8 +446,11 @@ impl TExecutionClient for DummyExecutionClient { _epoch_state: Arc, _commit_signer_provider: Arc, _payload_manager: Arc, + _onchain_consensus_config: &OnChainConsensusConfig, _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, + _rand_config: Option, + _rand_msg_rx: aptos_channel::Receiver, ) { } diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs index 82d30670b84a86..579812c75207b0 100644 --- a/consensus/src/test_utils/mock_execution_client.rs +++ b/consensus/src/test_utils/mock_execution_client.rs @@ -4,16 +4,18 @@ use crate::{ error::StateSyncError, - network::IncomingCommitRequest, + network::{IncomingCommitRequest, IncomingRandGenRequest}, payload_manager::PayloadManager, pipeline::{ buffer_manager::OrderedBlocks, execution_client::TExecutionClient, signing_phase::CommitSignerProvider, }, + rand::rand_gen::types::RandConfig, state_replication::StateComputerCommitCallBackType, test_utils::mock_storage::MockStorage, }; use anyhow::{format_err, Result}; +use aptos_channels::aptos_channel; use aptos_consensus_types::{common::Payload, pipelined_block::PipelinedBlock}; use aptos_crypto::HashValue; use aptos_executor_types::ExecutorResult; @@ -22,7 +24,7 @@ use aptos_logger::prelude::*; use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - on_chain_config::{Features, OnChainExecutionConfig}, + on_chain_config::{Features, OnChainConsensusConfig, OnChainExecutionConfig}, transaction::SignedTransaction, }; use futures::{channel::mpsc, SinkExt}; @@ -92,8 +94,11 @@ impl TExecutionClient for MockExecutionClient { _epoch_state: Arc, _commit_signer_provider: Arc, _payload_manager: Arc, + _onchain_consensus_config: &OnChainConsensusConfig, _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, + _rand_config: Option, + _rand_msg_rx: aptos_channel::Receiver, ) { } diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index b47bc28997db24..3456cd3b58357b 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -11,6 +11,7 @@ use crate::{ payload_manager::PayloadManager, pipeline::buffer_manager::OrderedBlocks, quorum_store::quorum_store_db::MockQuorumStoreDB, + rand::rand_gen::storage::in_memory::InMemRandDb, test_utils::{mock_execution_client::MockExecutionClient, MockStorage}, util::time_service::ClockTimeService, }; @@ -162,6 +163,7 @@ impl SMRNode { bounded_executor, aptos_time_service::TimeService::real(), vtxn_pool, + Arc::new(InMemRandDb::new()), ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver);