From da1a718e7b8a7e5c8218866f87907499245ad9ed Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 16 May 2022 17:04:55 +0000 Subject: [PATCH] Block packets in vote-only mode (#24906) (#25218) (cherry picked from commit 3d96a1ab76826e20599ac052d3ceccf5671b1152) Co-authored-by: sakridge --- bench-streamer/src/main.rs | 1 + core/src/ancestor_hashes_service.rs | 2 ++ core/src/fetch_stage.rs | 7 ++++ core/src/replay_stage.rs | 54 +++++++++++++++++++++++++++-- core/src/serve_repair_service.rs | 1 + core/src/shred_fetch_stage.rs | 1 + core/src/tpu.rs | 1 + gossip/src/gossip_service.rs | 1 + runtime/src/bank_forks.rs | 18 +++++++++- streamer/src/streamer.rs | 14 +++++++- 10 files changed, 96 insertions(+), 4 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index f9f7b0283f30f5..6a2c3fa43033e0 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -101,6 +101,7 @@ fn main() -> Result<()> { stats.clone(), 1, true, + None, )); } diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index c8e343fdac7bfa..93507c36e2d105 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -158,6 +158,7 @@ impl AncestorHashesService { )), 1, false, + None, ); let ancestor_hashes_request_statuses: Arc> = @@ -929,6 +930,7 @@ mod test { )), 1, false, + None, ); let t_listen = ServeRepair::listen( responder_serve_repair, diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 1ece659b90074c..f1d0cf81399119 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -53,6 +53,7 @@ impl FetchStage { &vote_sender, poh_recorder, coalesce_ms, + None, ), receiver, vote_receiver, @@ -68,6 +69,7 @@ impl FetchStage { vote_sender: &PacketBatchSender, poh_recorder: &Arc>, coalesce_ms: u64, + in_vote_only_mode: Option>, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); @@ -81,6 +83,7 @@ impl FetchStage { vote_sender, poh_recorder, coalesce_ms, + in_vote_only_mode, ) } @@ -135,6 +138,7 @@ impl FetchStage { vote_sender: &PacketBatchSender, poh_recorder: &Arc>, coalesce_ms: u64, + in_vote_only_mode: Option>, ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); @@ -150,6 +154,7 @@ impl FetchStage { tpu_stats.clone(), coalesce_ms, true, + in_vote_only_mode.clone(), ) }) .collect(); @@ -167,6 +172,7 @@ impl FetchStage { tpu_forward_stats.clone(), coalesce_ms, true, + in_vote_only_mode.clone(), ) }) .collect(); @@ -183,6 +189,7 @@ impl FetchStage { tpu_vote_stats.clone(), coalesce_ms, true, + None, ) }) .collect(); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 773e703d180dea..b6508f7503b8e5 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -47,7 +47,7 @@ use { solana_runtime::{ accounts_background_service::AbsRequestSender, bank::{Bank, NewBankOptions}, - bank_forks::BankForks, + bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, transaction_cost_metrics_sender::TransactionCostMetricsSender, vote_sender_types::ReplayVoteSender, @@ -421,6 +421,8 @@ impl ReplayStage { last_refresh_time: Instant::now(), last_print_time: Instant::now(), }; + let in_vote_only_mode = bank_forks.read().unwrap().get_vote_only_mode_signal(); + loop { // Stop getting entries if we get exit signal if exit.load(Ordering::Relaxed) { @@ -587,6 +589,8 @@ impl ReplayStage { .select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks); select_forks_time.stop(); + Self::check_for_vote_only_mode(heaviest_bank.slot(), forks_root, &in_vote_only_mode, &bank_forks); + if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() { if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) { Self::refresh_last_vote(&mut tower, @@ -874,6 +878,41 @@ impl ReplayStage { } } + fn check_for_vote_only_mode( + heaviest_bank_slot: Slot, + forks_root: Slot, + in_vote_only_mode: &AtomicBool, + bank_forks: &RwLock, + ) { + if heaviest_bank_slot.saturating_sub(forks_root) > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY { + if !in_vote_only_mode.load(Ordering::Relaxed) + && in_vote_only_mode + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + let bank_forks = bank_forks.read().unwrap(); + datapoint_warn!( + "bank_forks-entering-vote-only-mode", + ("banks_len", bank_forks.len(), i64), + ("heaviest_bank", heaviest_bank_slot, i64), + ("root", bank_forks.root(), i64), + ); + } + } else if in_vote_only_mode.load(Ordering::Relaxed) + && in_vote_only_mode + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + let bank_forks = bank_forks.read().unwrap(); + datapoint_warn!( + "bank_forks-exiting-vote-only-mode", + ("banks_len", bank_forks.len(), i64), + ("heaviest_bank", heaviest_bank_slot, i64), + ("root", bank_forks.root(), i64), + ); + } + } + fn is_partition_detected( ancestors: &HashMap>, last_voted_slot: Slot, @@ -1505,7 +1544,6 @@ impl ReplayStage { ); let root_distance = poh_slot - root_slot; - const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400; let vote_only_bank = if root_distance > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY { datapoint_info!("vote-only-bank", ("slot", poh_slot, i64)); true @@ -6057,4 +6095,16 @@ pub mod tests { ) -> bool { map1.len() == map2.len() && map1.iter().all(|(k, v)| map2.get(k).unwrap() == v) } + + #[test] + fn test_check_for_vote_only_mode() { + let in_vote_only_mode = AtomicBool::new(false); + let genesis_config = create_genesis_config(10_000).genesis_config; + let bank0 = Bank::new_for_tests(&genesis_config); + let bank_forks = RwLock::new(BankForks::new(bank0)); + ReplayStage::check_for_vote_only_mode(1000, 0, &in_vote_only_mode, &bank_forks); + assert!(in_vote_only_mode.load(Ordering::Relaxed)); + ReplayStage::check_for_vote_only_mode(10, 0, &in_vote_only_mode, &bank_forks); + assert!(!in_vote_only_mode.load(Ordering::Relaxed)); + } } diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 4985c3c33b8f34..049b878faf26f5 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -40,6 +40,7 @@ impl ServeRepairService { Arc::new(StreamerReceiveStats::new("serve_repair_receiver")), 1, false, + None, ); let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder( diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 5be6c7c59b9a41..8bca754d57d8dc 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -156,6 +156,7 @@ impl ShredFetchStage { Arc::new(StreamerReceiveStats::new("packet_modifier")), 1, true, + None, ) }) .collect(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 17b360187c2cce..617cda34a63af1 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -95,6 +95,7 @@ impl Tpu { &vote_packet_sender, poh_recorder, tpu_coalesce_ms, + Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), ); let (verified_sender, verified_receiver) = unbounded(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 4be241a7f6bdd2..293bfe3d261341 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -59,6 +59,7 @@ impl GossipService { Arc::new(StreamerReceiveStats::new("gossip_receiver")), 1, false, + None, ); let (consume_sender, listen_receiver) = channel(); // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 4c64069edf535a..05a6d93554009a 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -12,11 +12,13 @@ use { std::{ collections::{hash_map::Entry, HashMap, HashSet}, ops::Index, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, time::Instant, }, }; +pub const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400; + struct SetRootTimings { total_parent_banks: i64, total_squash_cache_ms: i64, @@ -41,6 +43,7 @@ pub struct BankForks { pub accounts_hash_interval_slots: Slot, last_accounts_hash_slot: Slot, + in_vote_only_mode: Arc, } impl Index for BankForks { @@ -60,6 +63,18 @@ impl BankForks { self.banks.clone() } + pub fn get_vote_only_mode_signal(&self) -> Arc { + self.in_vote_only_mode.clone() + } + + pub fn len(&self) -> usize { + self.banks.len() + } + + pub fn is_empty(&self) -> bool { + self.banks.is_empty() + } + /// Create a map of bank slot id to the set of ancestors for the bank slot. pub fn ancestors(&self) -> HashMap> { let root = self.root; @@ -145,6 +160,7 @@ impl BankForks { snapshot_config: None, accounts_hash_interval_slots: std::u64::MAX, last_accounts_hash_slot: root, + in_vote_only_mode: Arc::new(AtomicBool::new(false)), } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 043b1db2db7d41..2669c8b1945eef 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -15,7 +15,7 @@ use { mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, Arc, }, - thread::{Builder, JoinHandle}, + thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, thiserror::Error, @@ -95,6 +95,7 @@ fn recv_loop( stats: &StreamerReceiveStats, coalesce_ms: u64, use_pinned_memory: bool, + in_vote_only_mode: Option>, ) -> Result<()> { loop { let mut packet_batch = if use_pinned_memory { @@ -108,6 +109,14 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } + + if let Some(ref in_vote_only_mode) = in_vote_only_mode { + if in_vote_only_mode.load(Ordering::Relaxed) { + sleep(Duration::from_millis(1)); + continue; + } + } + if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) { if len > 0 { let StreamerReceiveStats { @@ -139,6 +148,7 @@ pub fn receiver( stats: Arc, coalesce_ms: u64, use_pinned_memory: bool, + in_vote_only_mode: Option>, ) -> JoinHandle<()> { let res = socket.set_read_timeout(Some(Duration::new(1, 0))); assert!(res.is_ok(), "streamer::receiver set_read_timeout error"); @@ -153,6 +163,7 @@ pub fn receiver( &stats, coalesce_ms, use_pinned_memory, + in_vote_only_mode, ); }) .unwrap() @@ -293,6 +304,7 @@ mod test { stats.clone(), 1, true, + None, ); const NUM_PACKETS: usize = 5; let t_responder = {