From ee834bbbffc5470fcc097b990ec7e2ed70baf164 Mon Sep 17 00:00:00 2001
From: Mingwei Tian
Date: Thu, 5 Dec 2024 10:55:15 -0800
Subject: [PATCH 1/3] add test for small committee

---
 consensus/core/src/authority_node.rs | 99 ++++++++++++++++++++++++++++
 consensus/core/src/round_prober.rs   |  2 +-
 2 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs
index 1b8b818b79721..8ee7069e11f31 100644
--- a/consensus/core/src/authority_node.rs
+++ b/consensus/core/src/authority_node.rs
@@ -573,6 +573,105 @@ mod tests {
         }
     }
 
+    #[rstest]
+    #[tokio::test(flavor = "current_thread")]
+    async fn test_small_committee(
+        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
+        #[values(2, 3)] num_authorities: usize,
+    ) {
+        let db_registry = Registry::new();
+        DBMetrics::init(&db_registry);
+
+        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
+        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
+
+        let temp_dirs = (0..num_authorities)
+            .map(|_| TempDir::new().unwrap())
+            .collect::<Vec<_>>();
+
+        let mut output_receivers = Vec::with_capacity(committee.size());
+        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
+        let mut boot_counters = vec![0; num_authorities];
+
+        for (index, _authority_info) in committee.authorities() {
+            let (authority, receiver) = make_authority(
+                index,
+                &temp_dirs[index.value()],
+                committee.clone(),
+                keypairs.clone(),
+                network_type,
+                boot_counters[index],
+                protocol_config.clone(),
+            )
+            .await;
+            boot_counters[index] += 1;
+            output_receivers.push(receiver);
+            authorities.push(authority);
+        }
+
+        const NUM_TRANSACTIONS: u8 = 15;
+        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
+        for i in 0..NUM_TRANSACTIONS {
+            let txn = vec![i; 16];
+            submitted_transactions.insert(txn.clone());
+            authorities[i as usize % authorities.len()]
+                .transaction_client()
+                .submit(vec![txn])
+                .await
+                .unwrap();
+        }
+
+        for receiver in &mut output_receivers {
+            let mut expected_transactions = submitted_transactions.clone();
+            loop {
+                let committed_subdag =
+                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
+                        .await
+                        .unwrap()
+                        .unwrap();
+                for b in committed_subdag.blocks {
+                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
+                        assert!(
+                            expected_transactions.remove(&txn),
+                            "Transaction not submitted or already seen: {:?}",
+                            txn
+                        );
+                    }
+                }
+                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
+                if expected_transactions.is_empty() {
+                    break;
+                }
+            }
+        }
+
+        // Stop authority 0.
+        let index = committee.to_authority_index(1).unwrap();
+        authorities.remove(index.value()).stop().await;
+        sleep(Duration::from_secs(10)).await;
+
+        // Restart authority 0 and let it run.
+        let (authority, receiver) = make_authority(
+            index,
+            &temp_dirs[index.value()],
+            committee.clone(),
+            keypairs.clone(),
+            network_type,
+            boot_counters[index],
+            protocol_config.clone(),
+        )
+        .await;
+        boot_counters[index] += 1;
+        output_receivers[index] = receiver;
+        authorities.insert(index.value(), authority);
+        sleep(Duration::from_secs(10)).await;
+
+        // Stop all authorities and exit.
+        for authority in authorities {
+            authority.stop().await;
+        }
+    }
+
     #[rstest]
     #[tokio::test(flavor = "current_thread")]
     async fn test_amnesia_recovery_success(

diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs
index 9fb8022ed9703..9099b6b368c94 100644
--- a/consensus/core/src/round_prober.rs
+++ b/consensus/core/src/round_prober.rs
@@ -159,7 +159,7 @@ impl<C: NetworkClient> RoundProber<C> {
             .collect::<Vec<_>>();
         let last_proposed_round = local_highest_accepted_rounds[own_index];
 
-        // For our own index, the highest recieved & accepted round is our last
+        // For our own index, the highest received & accepted round is our last
         // accepted round or our last proposed round.
         highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds();
         highest_accepted_rounds[own_index] = local_highest_accepted_rounds;

From 0e629ba01e6b66f61f842272b0662c84eeac35c4 Mon Sep 17 00:00:00 2001
From: Mingwei Tian
Date: Thu, 5 Dec 2024 16:14:38 -0800
Subject: [PATCH 2/3] fixes

---
 consensus/core/src/authority_node.rs          |  7 ++---
 consensus/core/src/core.rs                    | 20 +++-------
 consensus/core/src/dag_state.rs               | 15 ++++++-----
 consensus/core/src/synchronizer.rs            | 26 ++++++++++++-------
 .../src/unit_tests/mysticeti_manager_tests.rs |  2 +-
 5 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs
index 8ee7069e11f31..706f8154f49e8 100644
--- a/consensus/core/src/authority_node.rs
+++ b/consensus/core/src/authority_node.rs
@@ -256,7 +256,8 @@ where
             block_manager,
             // For streaming RPC, Core will be notified when consumer is available.
             // For non-streaming RPC, there is no way to know so default to true.
-            !N::Client::SUPPORT_STREAMING,
+            // When there is only one (this) authority, assume subscriber exists.
+            !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
             commit_observer,
             core_signals,
             protocol_keypair,
@@ -577,7 +578,7 @@ mod tests {
     #[tokio::test(flavor = "current_thread")]
     async fn test_small_committee(
         #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
-        #[values(2, 3)] num_authorities: usize,
+        #[values(1, 2, 3)] num_authorities: usize,
     ) {
         let db_registry = Registry::new();
         DBMetrics::init(&db_registry);
@@ -646,7 +647,7 @@ mod tests {
         }
 
         // Stop authority 0.
-        let index = committee.to_authority_index(1).unwrap();
+        let index = committee.to_authority_index(0).unwrap();
         authorities.remove(index.value()).stop().await;
         sleep(Duration::from_secs(10)).await;
 
         // Restart authority 0 and let it run.

diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index b798d367339f9..70c2909483a5a 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -131,10 +131,7 @@ impl Core {
             .with_pipeline(true)
             .build();
 
-        // Recover the last proposed block
-        let last_proposed_block = dag_state
-            .read()
-            .get_last_block_for_authority(context.own_index);
+        let last_proposed_block = dag_state.read().get_last_proposed_block();
 
         // Recover the last included ancestor rounds based on the last proposed block. That will allow
         // to perform the next block proposal by using ancestor blocks of higher rounds and avoid
@@ -207,10 +204,6 @@ impl Core {
                 "Waiting for {} ms while recovering ancestors from storage",
                 wait_ms
             );
-            println!(
-                "Waiting for {} ms while recovering ancestors from storage",
-                wait_ms
-            );
             std::thread::sleep(Duration::from_millis(wait_ms));
         }
         // Recover the last available quorum to correctly advance the threshold clock.
@@ -223,13 +216,10 @@ impl Core {
         {
             last_proposed_block
         } else {
-            let last_proposed_block = self
-                .dag_state
-                .read()
-                .get_last_block_for_authority(self.context.own_index);
+            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
 
             if self.should_propose() {
-                assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery");
+                assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher than genesis should have been produced during recovery");
             }
 
             // if no new block proposed then just re-broadcast the last proposed one to ensure liveness.
@@ -1001,9 +991,7 @@ impl Core {
     }
 
     fn last_proposed_block(&self) -> VerifiedBlock {
-        self.dag_state
-            .read()
-            .get_last_block_for_authority(self.context.own_index)
+        self.dag_state.read().get_last_proposed_block()
     }
 }

diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index 1fc9a30a872e3..2c09b29655710 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -452,7 +452,13 @@ impl DagState {
         blocks.first().cloned().unwrap()
     }
 
-    /// Retrieves the last block proposed for the specified `authority`. If no block is found in cache
+    /// Gets the last proposed block from this authority.
+    /// If no block is proposed yet, returns the genesis block.
+    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
+        self.get_last_block_for_authority(self.context.own_index)
+    }
+
+    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
     /// then the genesis block is returned as no other block has been received from that authority.
     pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
         if let Some(last) = self.recent_refs_by_authority[authority].last() {
@@ -2209,12 +2215,7 @@ mod test {
             .find(|block| block.author() == context.own_index)
             .unwrap();
 
-        assert_eq!(
-            dag_state
-                .read()
-                .get_last_block_for_authority(context.own_index),
-            my_genesis
-        );
+        assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
     }
 
     // WHEN adding some blocks for authorities, only the last ones should be returned

diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs
index 92c2895fe580d..3b6d1c4f4fa9f 100644
--- a/consensus/core/src/synchronizer.rs
+++ b/consensus/core/src/synchronizer.rs
@@ -710,6 +710,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D>

From: Mingwei Tian
Date: Thu, 5 Dec 2024 17:10:31 -0800
Subject: [PATCH 3/3] add simtest

---
 crates/sui-benchmark/tests/simtest.rs | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/crates/sui-benchmark/tests/simtest.rs b/crates/sui-benchmark/tests/simtest.rs
index 452594e1d68d7..627f3cdcba731 100644
--- a/crates/sui-benchmark/tests/simtest.rs
+++ b/crates/sui-benchmark/tests/simtest.rs
@@ -25,7 +25,7 @@ mod test {
         LocalValidatorAggregatorProxy, ValidatorProxy,
     };
     use sui_config::node::AuthorityOverloadConfig;
-    use sui_config::{ExecutionCacheConfig, AUTHORITIES_DB_NAME, SUI_KEYSTORE_FILENAME};
+    use sui_config::{AUTHORITIES_DB_NAME, SUI_KEYSTORE_FILENAME};
     use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
     use sui_core::authority::framework_injection;
     use sui_core::authority::AuthorityState;
@@ -173,13 +173,10 @@ mod test {
         test_cluster.wait_for_epoch_all_nodes(1).await;
     }
 
-    #[ignore("Disabled due to flakiness - re-enable when failure is fixed")]
     #[sim_test(config = "test_config()")]
     async fn test_simulated_load_reconfig_restarts() {
-        // TODO added to invalidate a failing test seed in CI. Remove me
-        tokio::time::sleep(Duration::from_secs(1)).await;
         sui_protocol_config::ProtocolConfig::poison_get_for_min_version();
-        let test_cluster = build_test_cluster(4, 1000, 1).await;
+        let test_cluster = build_test_cluster(4, 5_000, 1).await;
         let node_restarter = test_cluster
             .random_node_restarter()
             .with_kill_interval_secs(5, 15)
@@ -188,6 +185,13 @@ mod test {
         test_simulated_load(test_cluster, 120).await;
     }
 
+    #[sim_test(config = "test_config()")]
+    async fn test_simulated_load_small_committee_reconfig() {
+        sui_protocol_config::ProtocolConfig::poison_get_for_min_version();
+        let test_cluster = build_test_cluster(1, 5_000, 0).await;
+        test_simulated_load(test_cluster, 120).await;
+    }
+
     /// Get a list of nodes that we don't want to kill in the crash recovery tests.
     /// This includes the client node which is the node that is running the test, as well as
     /// rpc fullnode which are needed to run the benchmark.
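A note on the rstest parameterization used by test_small_committee: stacked #[values] attributes expand into the cartesian product of their lists, so after [PATCH 2/3] widens the sizes to (1, 2, 3) the test compiles into six cases (2 network types x 3 committee sizes). The sketch below is a minimal standalone toy, not part of the patches; it assumes only the rstest and tokio crates and uses a stand-in ConsensusNetwork enum.

// Toy illustration of rstest #[values] expansion (assumed dev-deps: rstest, tokio).
use rstest::rstest;

#[derive(Clone, Copy, Debug)]
enum ConsensusNetwork {
    Anemo,
    Tonic,
}

// rstest generates one #[tokio::test] per combination of the value lists:
// 2 network types x 3 committee sizes = 6 independent test cases.
#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn small_committee_matrix(
    #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
    #[values(1, 2, 3)] num_authorities: usize,
) {
    // Each generated case receives one concrete (network_type, num_authorities) pair.
    assert!(num_authorities >= 1);
    println!("case: {:?} x {}", network_type, num_authorities);
}

Each generated case is a separate test in the binary, so a single failing combination can be rerun in isolation with the usual cargo test name filter.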
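The per-receiver verification loop in test_small_committee follows a general drain-until-seen pattern: receive with a timeout, assert every item was expected and unseen, and stop once the expected set is empty. Here is a condensed sketch of just that pattern, assuming only tokio; the hypothetical drain_until_seen helper and an UnboundedReceiver<Vec<u8>> stand in for the patch's CommittedSubDag receiver.

// Sketch of the timeout-guarded drain pattern (assumed dep: tokio).
use std::collections::BTreeSet;
use std::time::Duration;
use tokio::sync::mpsc;

async fn drain_until_seen(
    receiver: &mut mpsc::UnboundedReceiver<Vec<u8>>,
    mut expected: BTreeSet<Vec<u8>>,
) {
    while !expected.is_empty() {
        let item = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("no item within 1s") // timeout elapsed: the stream stalled
            .expect("channel closed"); // sender side dropped
        assert!(expected.remove(&item), "unexpected or duplicate item: {:?}", item);
    }
}

#[tokio::test]
async fn drain_sees_all_items() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let expected: BTreeSet<Vec<u8>> = (0u8..3).map(|i| vec![i; 4]).collect();
    for item in &expected {
        tx.send(item.clone()).unwrap();
    }
    drain_until_seen(&mut rx, expected).await;
}

Bounding each recv with a timeout keeps a lost-commit bug from hanging the whole test run, which matters when the same body executes for every committee size in the matrix.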