Skip to content

Commit

Permalink
Block packets in vote-only mode (#24906) (#25218)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3d96a1a)

Co-authored-by: sakridge <[email protected]>
  • Loading branch information
mergify[bot] and sakridge authored May 16, 2022
1 parent 268dbe5 commit da1a718
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 4 deletions.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ fn main() -> Result<()> {
stats.clone(),
1,
true,
None,
));
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl AncestorHashesService {
)),
1,
false,
None,
);

let ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>> =
Expand Down Expand Up @@ -929,6 +930,7 @@ mod test {
)),
1,
false,
None,
);
let t_listen = ServeRepair::listen(
responder_serve_repair,
Expand Down
7 changes: 7 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl FetchStage {
&vote_sender,
poh_recorder,
coalesce_ms,
None,
),
receiver,
vote_receiver,
Expand All @@ -68,6 +69,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
Expand All @@ -81,6 +83,7 @@ impl FetchStage {
vote_sender,
poh_recorder,
coalesce_ms,
in_vote_only_mode,
)
}

Expand Down Expand Up @@ -135,6 +138,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);

Expand All @@ -150,6 +154,7 @@ impl FetchStage {
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
Expand All @@ -167,6 +172,7 @@ impl FetchStage {
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
Expand All @@ -183,6 +189,7 @@ impl FetchStage {
tpu_vote_stats.clone(),
coalesce_ms,
true,
None,
)
})
.collect();
Expand Down
54 changes: 52 additions & 2 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender,
bank::{Bank, NewBankOptions},
bank_forks::BankForks,
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_sender_types::ReplayVoteSender,
Expand Down Expand Up @@ -421,6 +421,8 @@ impl ReplayStage {
last_refresh_time: Instant::now(),
last_print_time: Instant::now(),
};
let in_vote_only_mode = bank_forks.read().unwrap().get_vote_only_mode_signal();

loop {
// Stop getting entries if we get exit signal
if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -587,6 +589,8 @@ impl ReplayStage {
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);
select_forks_time.stop();

Self::check_for_vote_only_mode(heaviest_bank.slot(), forks_root, &in_vote_only_mode, &bank_forks);

if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
Self::refresh_last_vote(&mut tower,
Expand Down Expand Up @@ -874,6 +878,41 @@ impl ReplayStage {
}
}

fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
in_vote_only_mode: &AtomicBool,
bank_forks: &RwLock<BankForks>,
) {
if heaviest_bank_slot.saturating_sub(forks_root) > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY {
if !in_vote_only_mode.load(Ordering::Relaxed)
&& in_vote_only_mode
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let bank_forks = bank_forks.read().unwrap();
datapoint_warn!(
"bank_forks-entering-vote-only-mode",
("banks_len", bank_forks.len(), i64),
("heaviest_bank", heaviest_bank_slot, i64),
("root", bank_forks.root(), i64),
);
}
} else if in_vote_only_mode.load(Ordering::Relaxed)
&& in_vote_only_mode
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let bank_forks = bank_forks.read().unwrap();
datapoint_warn!(
"bank_forks-exiting-vote-only-mode",
("banks_len", bank_forks.len(), i64),
("heaviest_bank", heaviest_bank_slot, i64),
("root", bank_forks.root(), i64),
);
}
}

fn is_partition_detected(
ancestors: &HashMap<Slot, HashSet<Slot>>,
last_voted_slot: Slot,
Expand Down Expand Up @@ -1505,7 +1544,6 @@ impl ReplayStage {
);

let root_distance = poh_slot - root_slot;
const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400;
let vote_only_bank = if root_distance > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY {
datapoint_info!("vote-only-bank", ("slot", poh_slot, i64));
true
Expand Down Expand Up @@ -6057,4 +6095,16 @@ pub mod tests {
) -> bool {
map1.len() == map2.len() && map1.iter().all(|(k, v)| map2.get(k).unwrap() == v)
}

#[test]
fn test_check_for_vote_only_mode() {
let in_vote_only_mode = AtomicBool::new(false);
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank0 = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank0));
ReplayStage::check_for_vote_only_mode(1000, 0, &in_vote_only_mode, &bank_forks);
assert!(in_vote_only_mode.load(Ordering::Relaxed));
ReplayStage::check_for_vote_only_mode(10, 0, &in_vote_only_mode, &bank_forks);
assert!(!in_vote_only_mode.load(Ordering::Relaxed));
}
}
1 change: 1 addition & 0 deletions core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl ServeRepairService {
Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
1,
false,
None,
);
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder(
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl ShredFetchStage {
Arc::new(StreamerReceiveStats::new("packet_modifier")),
1,
true,
None,
)
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Tpu {
&vote_packet_sender,
poh_recorder,
tpu_coalesce_ms,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);
let (verified_sender, verified_receiver) = unbounded();

Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl GossipService {
Arc::new(StreamerReceiveStats::new("gossip_receiver")),
1,
false,
None,
);
let (consume_sender, listen_receiver) = channel();
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
Expand Down
18 changes: 17 additions & 1 deletion runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use {
std::{
collections::{hash_map::Entry, HashMap, HashSet},
ops::Index,
sync::Arc,
sync::{atomic::AtomicBool, Arc},
time::Instant,
},
};

pub const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400;

struct SetRootTimings {
total_parent_banks: i64,
total_squash_cache_ms: i64,
Expand All @@ -41,6 +43,7 @@ pub struct BankForks {

pub accounts_hash_interval_slots: Slot,
last_accounts_hash_slot: Slot,
in_vote_only_mode: Arc<AtomicBool>,
}

impl Index<u64> for BankForks {
Expand All @@ -60,6 +63,18 @@ impl BankForks {
self.banks.clone()
}

pub fn get_vote_only_mode_signal(&self) -> Arc<AtomicBool> {
self.in_vote_only_mode.clone()
}

pub fn len(&self) -> usize {
self.banks.len()
}

pub fn is_empty(&self) -> bool {
self.banks.is_empty()
}

/// Create a map of bank slot id to the set of ancestors for the bank slot.
pub fn ancestors(&self) -> HashMap<Slot, HashSet<Slot>> {
let root = self.root;
Expand Down Expand Up @@ -145,6 +160,7 @@ impl BankForks {
snapshot_config: None,
accounts_hash_interval_slots: std::u64::MAX,
last_accounts_hash_slot: root,
in_vote_only_mode: Arc::new(AtomicBool::new(false)),
}
}

Expand Down
14 changes: 13 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
Arc,
},
thread::{Builder, JoinHandle},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
thiserror::Error,
Expand Down Expand Up @@ -95,6 +95,7 @@ fn recv_loop(
stats: &StreamerReceiveStats,
coalesce_ms: u64,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
Expand All @@ -108,6 +109,14 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}

if let Some(ref in_vote_only_mode) = in_vote_only_mode {
if in_vote_only_mode.load(Ordering::Relaxed) {
sleep(Duration::from_millis(1));
continue;
}
}

if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) {
if len > 0 {
let StreamerReceiveStats {
Expand Down Expand Up @@ -139,6 +148,7 @@ pub fn receiver(
stats: Arc<StreamerReceiveStats>,
coalesce_ms: u64,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
Expand All @@ -153,6 +163,7 @@ pub fn receiver(
&stats,
coalesce_ms,
use_pinned_memory,
in_vote_only_mode,
);
})
.unwrap()
Expand Down Expand Up @@ -293,6 +304,7 @@ mod test {
stats.clone(),
1,
true,
None,
);
const NUM_PACKETS: usize = 5;
let t_responder = {
Expand Down

0 comments on commit da1a718

Please sign in to comment.