diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index f6bfa4a0a82f66..5cca509e41496a 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -727,6 +727,7 @@ impl EpochManager

{ payload_manager: Arc, rand_config: Option, features: Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; info!( @@ -776,6 +777,7 @@ impl EpochManager

{ &onchain_execution_config, &features, rand_config, + rand_msg_rx, ) .await; @@ -1017,6 +1019,14 @@ impl EpochManager

{ .initialize_shared_component(&epoch_state, &consensus_config) .await; + let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::( + QueueStyle::FIFO, + 100, + None, + ); + + self.rand_manager_msg_tx = Some(rand_msg_tx); + if consensus_config.is_dag_enabled() { self.start_new_epoch_with_dag( epoch_state, @@ -1027,6 +1037,7 @@ impl EpochManager

{ payload_manager, rand_config, &features, + rand_msg_rx, ) .await } else { @@ -1039,6 +1050,7 @@ impl EpochManager

{ payload_manager, rand_config, &features, + rand_msg_rx, ) .await } @@ -1080,6 +1092,7 @@ impl EpochManager

{ payload_manager: Arc, rand_config: Option, features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { @@ -1094,6 +1107,7 @@ impl EpochManager

{ payload_manager, rand_config, features.clone(), + rand_msg_rx, ) .await }, @@ -1120,6 +1134,7 @@ impl EpochManager

{ payload_manager: Arc, rand_config: Option, features: &Features, + rand_msg_rx: aptos_channel::Receiver, ) { let epoch = epoch_state.epoch; let consensus_key = new_consensus_key_from_storage(&self.config.safety_rules.backend) @@ -1141,6 +1156,7 @@ impl EpochManager

{ &on_chain_execution_config, features, rand_config, + rand_msg_rx, ) .await; diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs index 2d30f48609b045..fcc81ab66441a9 100644 --- a/consensus/src/pipeline/execution_client.rs +++ b/consensus/src/pipeline/execution_client.rs @@ -61,7 +61,8 @@ pub trait TExecutionClient: Send + Sync { onchain_execution_config: &OnChainExecutionConfig, features: &Features, rand_config: Option, - ) -> Option>; + rand_msg_rx: aptos_channel::Receiver, + ); /// This is needed for some DAG tests. Clean this up as a TODO. fn get_execution_channel(&self) -> Option>; @@ -170,7 +171,8 @@ impl ExecutionProxyClient { commit_signer_provider: Arc, epoch_state: Arc, rand_config: Option, - ) -> Option> { + rand_msg_rx: aptos_channel::Receiver, + ) { let network_sender = NetworkSender::new( self.author, self.network_sender.clone(), @@ -187,53 +189,44 @@ impl ExecutionProxyClient { Some(&counters::BUFFER_MANAGER_MSGS), ); - let ( - execution_ready_block_tx, - execution_ready_block_rx, - maybe_reset_tx_to_rand_manager, - maybe_rand_msg_tx, - ) = if let Some(rand_config) = rand_config { - let (ordered_block_tx, ordered_block_rx) = unbounded::(); - let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::(); - let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::< - AccountAddress, - IncomingRandGenRequest, - >(QueueStyle::FIFO, 100, None); - - let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::(); - let consensus_key = - load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules) - .expect("Failed in loading consensus key for ExecutionProxyClient."); - let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); - - let rand_manager = RandManager::::new( - self.author, - epoch_state.clone(), - signer, - rand_config, - rand_ready_block_tx, - Arc::new(network_sender.clone()), - self.rand_storage.clone(), - self.bounded_executor.clone(), - ); - - tokio::spawn(rand_manager.start( - ordered_block_rx, - rand_msg_rx, - reset_rand_manager_rx, - self.bounded_executor.clone(), - )); - - ( - ordered_block_tx, - rand_ready_block_rx, - Some(reset_tx_to_rand_manager), - Some(rand_msg_tx), - ) - } else { - let (ordered_block_tx, ordered_block_rx) = unbounded(); - (ordered_block_tx, ordered_block_rx, None, None) - }; + let (execution_ready_block_tx, execution_ready_block_rx, maybe_reset_tx_to_rand_manager) = + if let Some(rand_config) = rand_config { + let (ordered_block_tx, ordered_block_rx) = unbounded::(); + let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::(); + + let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::(); + let consensus_key = + load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules) + .expect("Failed in loading consensus key for ExecutionProxyClient."); + let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key)); + + let rand_manager = RandManager::::new( + self.author, + epoch_state.clone(), + signer, + rand_config, + rand_ready_block_tx, + Arc::new(network_sender.clone()), + self.rand_storage.clone(), + self.bounded_executor.clone(), + ); + + tokio::spawn(rand_manager.start( + ordered_block_rx, + rand_msg_rx, + reset_rand_manager_rx, + self.bounded_executor.clone(), + )); + + ( + ordered_block_tx, + rand_ready_block_rx, + Some(reset_tx_to_rand_manager), + ) + } else { + let (ordered_block_tx, ordered_block_rx) = unbounded(); + (ordered_block_tx, ordered_block_rx, None) + }; self.handle.write().init( execution_ready_block_tx, @@ -266,8 +259,6 @@ impl ExecutionProxyClient { tokio::spawn(signing_phase.start()); tokio::spawn(persisting_phase.start()); tokio::spawn(buffer_manager.start()); - - maybe_rand_msg_tx } } @@ -282,11 +273,13 @@ impl TExecutionClient for ExecutionProxyClient { onchain_execution_config: &OnChainExecutionConfig, features: &Features, rand_config: Option, - ) -> Option> { + rand_msg_rx: aptos_channel::Receiver, + ) { let maybe_rand_msg_tx = self.spawn_decoupled_execution( commit_signer_provider, epoch_state.clone(), rand_config, + rand_msg_rx, ); let transaction_shuffler = @@ -457,8 +450,8 @@ impl TExecutionClient for DummyExecutionClient { _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, _rand_config: Option, - ) -> Option> { - None + _rand_msg_rx: aptos_channel::Receiver, + ) { } fn get_execution_channel(&self) -> Option> { diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs index 9aa466588139d8..579812c75207b0 100644 --- a/consensus/src/test_utils/mock_execution_client.rs +++ b/consensus/src/test_utils/mock_execution_client.rs @@ -98,8 +98,8 @@ impl TExecutionClient for MockExecutionClient { _onchain_execution_config: &OnChainExecutionConfig, _features: &Features, _rand_config: Option, - ) -> Option> { - None + _rand_msg_rx: aptos_channel::Receiver, + ) { } fn get_execution_channel(&self) -> Option> {