From e13b34e714d387cc4e5f87f3f2ea17d5777903d8 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 17 Jul 2024 08:30:09 -0400 Subject: [PATCH] [Consensus Observer] Fix randomness_stall_recovery smoke test. --- .../src/config/consensus_observer_config.rs | 3 +- consensus/src/consensus_observer/observer.rs | 65 +++++++++++++------ consensus/src/consensus_provider.rs | 2 +- .../randomness/randomness_stall_recovery.rs | 17 +++++ 4 files changed, 64 insertions(+), 23 deletions(-) diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs index 8e68ce24605c6..d18c0faaca85e 100644 --- a/config/src/config/consensus_observer_config.rs +++ b/config/src/config/consensus_observer_config.rs @@ -106,8 +106,9 @@ impl ConfigOptimizer for ConsensusObserverConfig { }, NodeType::PublicFullnode => { if ENABLE_ON_PUBLIC_FULLNODES && !observer_manually_set { - // Only enable the observer for PFNs + // Enable both the observer and the publisher for PFNs consensus_observer_config.observer_enabled = true; + consensus_observer_config.publisher_enabled = true; modified_config = true; } }, diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs index 23f48e4af5f6e..7b5beadd112a7 100644 --- a/consensus/src/consensus_observer/observer.rs +++ b/consensus/src/consensus_observer/observer.rs @@ -27,7 +27,7 @@ use crate::{ state_replication::StateComputerCommitCallBackType, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; -use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; +use aptos_config::{config::NodeConfig, network_id::PeerNetworkId}; use aptos_consensus_types::{pipeline, pipelined_block::PipelinedBlock}; use aptos_crypto::{bls12381, Genesis}; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; @@ -46,7 +46,7 @@ use aptos_types::{ ledger_info::LedgerInfoWithSignatures, on_chain_config::{ OnChainConsensusConfig, OnChainExecutionConfig, OnChainRandomnessConfig, - RandomnessConfigMoveStruct, ValidatorSet, + RandomnessConfigMoveStruct, RandomnessConfigSeqNum, ValidatorSet, }, validator_signer::ValidatorSigner, }; @@ -65,8 +65,8 @@ const LOG_MESSAGES_AT_INFO_LEVEL: bool = true; /// The consensus observer receives consensus updates and propagates them to the execution pipeline pub struct ConsensusObserver { - // The configuration of the consensus observer - consensus_observer_config: ConsensusObserverConfig, + // The configuration of the node + node_config: NodeConfig, // The consensus observer client to send network messages consensus_observer_client: Arc>>, @@ -107,7 +107,7 @@ pub struct ConsensusObserver { impl ConsensusObserver { pub fn new( - consensus_observer_config: ConsensusObserverConfig, + node_config: NodeConfig, consensus_observer_client: Arc< ConsensusObserverClient>, >, @@ -123,8 +123,12 @@ impl ConsensusObserver { .get_latest_ledger_info() .expect("Failed to read latest ledger info!"); + // Get the consensus observer config + let consensus_observer_config = node_config.consensus_observer; + + // Create the consensus observer Self { - consensus_observer_config, + node_config, consensus_observer_client, epoch_state: None, quorum_store_enabled: false, // Updated on epoch changes @@ -350,13 +354,13 @@ impl ConsensusObserver { // Send a subscription request to the peer and wait for the response. // Note: it is fine to block here because we assume only a single active subscription. let subscription_request = ConsensusObserverRequest::Subscribe; + let request_timeout_ms = self + .node_config + .consensus_observer + .network_request_timeout_ms; let response = self .consensus_observer_client - .send_rpc_request_to_peer( - selected_peer, - subscription_request, - self.consensus_observer_config.network_request_timeout_ms, - ) + .send_rpc_request_to_peer(selected_peer, subscription_request, request_timeout_ms) .await; // Process the response and update the active subscription @@ -371,7 +375,7 @@ impl ConsensusObserver { // Update the active subscription let subscription = ConsensusObserverSubscription::new( - self.consensus_observer_config, + self.node_config.consensus_observer, self.db_reader.clone(), *selected_peer, self.time_service.clone(), @@ -994,7 +998,7 @@ impl ConsensusObserver { // Send an unsubscribe request to the peer and process the response. // Note: we execute this asynchronously, as we don't need to wait for the response. let consensus_observer_client = self.consensus_observer_client.clone(); - let consensus_observer_config = self.consensus_observer_config; + let consensus_observer_config = self.node_config.consensus_observer; tokio::spawn(async move { // Send the unsubscribe request to the peer let unsubscribe_request = ConsensusObserverRequest::Unsubscribe; @@ -1096,7 +1100,7 @@ impl ConsensusObserver { ) = &mut self.reconfig_events { - extract_on_chain_configs(reconfig_events).await + extract_on_chain_configs(&self.node_config, reconfig_events).await } else { panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!") }; @@ -1154,8 +1158,8 @@ impl ConsensusObserver { ) { // If the consensus publisher is enabled but the observer is disabled, // we should only forward incoming requests to the consensus publisher. - if self.consensus_observer_config.publisher_enabled - && !self.consensus_observer_config.observer_enabled + if self.node_config.consensus_observer.publisher_enabled + && !self.node_config.consensus_observer.observer_enabled { self.start_publisher_forwarding(&mut network_service_events) .await; @@ -1164,7 +1168,9 @@ impl ConsensusObserver { // Create a progress check ticker let mut progress_check_interval = IntervalStream::new(interval(Duration::from_millis( - self.consensus_observer_config.progress_check_interval_ms, + self.node_config + .consensus_observer + .progress_check_interval_ms, ))) .fuse(); @@ -1270,6 +1276,7 @@ fn check_root_epoch_and_round( /// A simple helper function that extracts the on-chain configs from the reconfig events async fn extract_on_chain_configs( + node_config: &NodeConfig, reconfig_events: &mut ReconfigNotificationListener, ) -> ( Arc, @@ -1318,7 +1325,21 @@ async fn extract_on_chain_configs( let execution_config = onchain_execution_config.unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing()); - // Extract the randomness config (or use the default if it's missing) + // Extract the randomness config sequence number (or use the default if it's missing) + let onchain_randomness_config_seq_num: anyhow::Result = + on_chain_configs.get(); + if let Err(error) = &onchain_randomness_config_seq_num { + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to read on-chain randomness config seq num! Error: {:?}", + error + )) + ); + } + let onchain_randomness_config_seq_num = onchain_randomness_config_seq_num + .unwrap_or_else(|_| RandomnessConfigSeqNum::default_if_missing()); + + // Extract the randomness config let onchain_randomness_config: anyhow::Result = on_chain_configs.get(); if let Err(error) = &onchain_randomness_config { @@ -1329,9 +1350,11 @@ async fn extract_on_chain_configs( )) ); } - let onchain_randomness_config = onchain_randomness_config - .and_then(OnChainRandomnessConfig::try_from) - .unwrap_or_else(|_| OnChainRandomnessConfig::default_if_missing()); + let onchain_randomness_config = OnChainRandomnessConfig::from_configs( + node_config.randomness_override_seq_num, + onchain_randomness_config_seq_num.seq_num, + onchain_randomness_config.ok(), + ); // Return the extracted epoch state and on-chain configs ( diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index e0e12bd453085..2e61a6c96c005 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -195,7 +195,7 @@ pub fn start_consensus_observer( // Create the consensus observer let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let consensus_observer = ConsensusObserver::new( - node_config.consensus_observer, + node_config.clone(), consensus_observer_client, aptos_db.reader.clone(), execution_client, diff --git a/testsuite/smoke-test/src/randomness/randomness_stall_recovery.rs b/testsuite/smoke-test/src/randomness/randomness_stall_recovery.rs index db9b1fba5e92a..aa71e069bd3d4 100644 --- a/testsuite/smoke-test/src/randomness/randomness_stall_recovery.rs +++ b/testsuite/smoke-test/src/randomness/randomness_stall_recovery.rs @@ -83,6 +83,23 @@ async fn randomness_stall_recovery() { tokio::time::sleep(Duration::from_secs(5)).await; } + info!("Hot-fixing the VFNs."); + for (idx, vfn) in swarm.fullnodes_mut().enumerate() { + info!("Stopping VFN {}.", idx); + vfn.stop(); + let config_path = vfn.config_path(); + let mut vfn_override_config = OverrideNodeConfig::load_config(config_path.clone()).unwrap(); + vfn_override_config + .override_config_mut() + .randomness_override_seq_num = 1; + info!("Updating VFN {} config.", idx); + vfn_override_config.save_config(config_path).unwrap(); + info!("Restarting VFN {}.", idx); + vfn.start().unwrap(); + info!("Let VFN {} bake for 5 secs.", idx); + tokio::time::sleep(Duration::from_secs(5)).await; + } + let liveness_check_result = swarm .liveness_check(Instant::now().add(Duration::from_secs(30))) .await;