From 0870fff0b08044b7353c051e792e22f419640bbc Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 12:28:39 -0500 Subject: [PATCH] add tpu vote port to dockerfile and other review changes --- core/src/banking_stage.rs | 31 ++++++++++++++++++++++--------- core/src/cluster_info.rs | 7 ++++--- core/src/contact_info.rs | 3 ++- core/src/tpu.rs | 17 +++++++++++------ sdk/docker-solana/Dockerfile | 2 ++ 5 files changed, 41 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ad7e3209b0585d..a23cedf0fc65de 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -86,7 +86,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; -const MIN_THREADS_VOTES: u32 = 2; +const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; #[derive(Debug, Default)] @@ -244,6 +244,7 @@ pub enum BufferedPacketsDecision { Hold, } +#[derive(Debug, Clone)] pub enum ForwardOption { NotForward, ForwardTpuVote, @@ -279,7 +280,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, - vote_verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -295,12 +296,17 @@ impl BankingStage { ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. - assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING); + assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) { - std::cmp::Ordering::Less => (verified_receiver.clone(), ForwardOption::ForwardTransaction), - std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), ForwardOption::ForwardTpuVote), + std::cmp::Ordering::Less => { + (verified_receiver.clone(), ForwardOption::ForwardTransaction) + } + std::cmp::Ordering::Equal => ( + tpu_verified_vote_receiver.clone(), + ForwardOption::ForwardTpuVote, + ), std::cmp::Ordering::Greater => { // Disable forwarding of vote transactions // from gossip. Note - votes can also arrive from tpu @@ -616,8 +622,15 @@ impl BankingStage { data_budget: &DataBudget, ) { let addr = match forward_option { - ForwardOption::NotForward => {if !hold { buffered_packets.clear(); } return}, - ForwardOption::ForwardTransaction => next_leader_tpu_forwards(cluster_info, poh_recorder), + ForwardOption::NotForward => { + if !hold { + buffered_packets.clear(); + } + return; + } + ForwardOption::ForwardTransaction => { + next_leader_tpu_forwards(cluster_info, poh_recorder) + } ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), }; let addr = match addr { @@ -716,7 +729,7 @@ impl BankingStage { env::var("SOLANA_BANKING_THREADS") .map(|x| x.parse().unwrap_or(NUM_THREADS)) .unwrap_or(NUM_THREADS), - MIN_THREADS_VOTES + MIN_THREADS_BANKING, + NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING, ) } @@ -2821,7 +2834,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let data_budget = DataBudget::default(); BankingStage::handle_forwarding( - true, + &ForwardOption::ForwardTransaction, &cluster_info, &mut unprocessed_packets, &poh_recorder, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b1089d8817f0e7..b9797bcc8154ed 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -824,7 +824,7 @@ impl ClusterInfo { } let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip) { ip_addr.to_string() } else { @@ -839,6 +839,7 @@ impl ClusterInfo { "-".to_string() }, addr_to_string(&ip_addr, &node.gossip), + addr_to_string(&ip_addr, &node.tpu_vote), addr_to_string(&ip_addr, &node.tpu), addr_to_string(&ip_addr, &node.tpu_forwards), addr_to_string(&ip_addr, &node.tvu), @@ -853,9 +854,9 @@ impl ClusterInfo { format!( "IP Address |Age(ms)| Node identifier \ - | Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ + | Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ ------------------+-------+----------------------------------------------+---------+\ - ------+------+------+------+------+------+------+--------\n\ + ------+------+-------+------+------+------+------+------+--------\n\ {}\ Nodes: {}{}{}", nodes.join(""), diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index cf9580351a18b0..98baf8f249a797 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -144,12 +144,12 @@ impl ContactInfo { let gossip = next_port(&bind_addr, 1); let tvu = next_port(&bind_addr, 2); let tpu_forwards = next_port(&bind_addr, 3); - let tpu_vote = next_port(&bind_addr, 1); let tvu_forwards = next_port(&bind_addr, 4); let repair = next_port(&bind_addr, 5); let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); let serve_repair = next_port(&bind_addr, 6); + let tpu_vote = next_port(&bind_addr, 7); Self { id: *pubkey, gossip, @@ -307,6 +307,7 @@ mod tests { assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); + assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241")); } #[test] diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3ae2c8644c4ca7..c73934fd247206 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -92,18 +92,23 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verified_sender, verifier) }; - let (vote_verified_sender, vote_verified_receiver) = unbounded(); + let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); - SigVerifyStage::new(vote_packet_receiver, vote_verified_sender, verifier) + SigVerifyStage::new( + vote_packet_receiver, + verified_tpu_vote_packets_sender, + verifier, + ) }; - let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); + let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = + unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( &exit, cluster_info.clone(), - verified_vote_packets_sender, + verified_gossip_vote_packets_sender, &poh_recorder, vote_tracker, bank_forks, @@ -120,8 +125,8 @@ impl Tpu { &cluster_info, poh_recorder, verified_receiver, - vote_verified_receiver, - verified_vote_packets_receiver, + verified_tpu_vote_packets_receiver, + verified_gossip_vote_packets_receiver, transaction_status_sender, replay_vote_sender, ); diff --git a/sdk/docker-solana/Dockerfile b/sdk/docker-solana/Dockerfile index 1beecc8ae8ef2d..6452efdcc509eb 100644 --- a/sdk/docker-solana/Dockerfile +++ b/sdk/docker-solana/Dockerfile @@ -30,6 +30,8 @@ EXPOSE 8006/udp EXPOSE 8007/udp # broadcast EXPOSE 8008/udp +# tpu_vote +EXPOSE 8009/udp RUN apt update && \ apt-get install -y bzip2 libssl-dev && \