From 2dc682578f2aac3f746a58477470f8912fb8185c Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Sat, 18 Sep 2021 09:20:06 -0700 Subject: [PATCH 01/11] Add separate vote processing tpu port --- banking-bench/src/main.rs | 3 ++ core/benches/banking_stage.rs | 3 ++ core/src/banking_stage.rs | 62 +++++++++++++++++++++++------------ core/src/cluster_info.rs | 17 +++++++--- core/src/contact_info.rs | 19 ++++++----- core/src/fetch_stage.rs | 31 ++++++++++++++++-- core/src/serve_repair.rs | 6 ++-- core/src/tpu.rs | 15 +++++++++ core/src/validator.rs | 1 + validator/src/main.rs | 3 ++ 10 files changed, 121 insertions(+), 39 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 8ff0f3d3ce2ae4..fe8200ef8ae24a 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -169,6 +169,7 @@ fn main() { let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new(&genesis_config); let mut bank_forks = BankForks::new(bank0); @@ -225,6 +226,7 @@ fn main() { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, replay_vote_sender, @@ -380,6 +382,7 @@ fn main() { ); drop(verified_sender); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1bfc27aef931f1..e2c335a99935a1 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -157,6 +157,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { genesis_config.ticks_per_slot = 10_000; let (verified_sender, verified_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); let mut bank = Bank::new(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench @@ -208,6 +209,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, None, s, @@ -254,6 +256,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { start += chunk_len; start %= verified.len(); }); + drop(tpu_vote_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e5c51e9529f1ab..1dc59fef1be363 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -85,6 +85,9 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; +const MIN_THREADS_VOTES: u32 = 2; +const MIN_THREADS_BANKING: u32 = 1; + #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicU64, @@ -247,6 +250,7 @@ impl BankingStage { cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -255,6 +259,7 @@ impl BankingStage { cluster_info, poh_recorder, verified_receiver, + tpu_verified_vote_receiver, verified_vote_receiver, Self::num_threads(), transaction_status_sender, @@ -267,6 +272,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, + vote_verified_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -282,13 +288,17 @@ impl BankingStage { ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. + assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, enable_forwarding) = if i < num_threads - 1 { - (verified_receiver.clone(), true) - } else { - // Disable forwarding of vote transactions, as votes are gossiped - (verified_vote_receiver.clone(), false) + let (verified_receiver, enable_forwarding) = match i.cmp(&(num_threads - 2)) { + std::cmp::Ordering::Less => (verified_receiver.clone(), true), + std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), false), + std::cmp::Ordering::Greater => { + // Disable forwarding of vote transactions + // from gossip. Note - votes can also arrive from tpu + (verified_vote_receiver.clone(), false) + } }; let poh_recorder = poh_recorder.clone(); @@ -697,8 +707,6 @@ impl BankingStage { } pub fn num_threads() -> u32 { - const MIN_THREADS_VOTES: u32 = 1; - const MIN_THREADS_BANKING: u32 = 1; cmp::max( env::var("SOLANA_BANKING_THREADS") .map(|x| x.parse().unwrap_or(NUM_THREADS)) @@ -1525,8 +1533,9 @@ mod tests { let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); - let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (vote_forward_sender, _vote_forward_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1541,12 +1550,14 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, - gossip_vote_sender, + vote_forward_sender, ); drop(verified_sender); - drop(vote_sender); + drop(gossip_verified_vote_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); poh_service.join().unwrap(); @@ -1565,7 +1576,7 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1580,19 +1591,22 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); - let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (vote_forward_sender, _vote_forward_receiver) = unbounded(); + let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + verified_gossip_vote_receiver, None, - gossip_vote_sender, + vote_forward_sender, ); trace!("sending bank"); drop(verified_sender); - drop(vote_sender); + drop(verified_gossip_sender); + drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -1632,7 +1646,8 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1655,7 +1670,8 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - vote_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, None, gossip_vote_sender, ); @@ -1695,7 +1711,8 @@ mod tests { .unwrap(); drop(verified_sender); - drop(vote_sender); + drop(tpu_vote_sender); + drop(gossip_verified_vote_sender); // wait until banking_stage to finish up all packets banking_stage.join().unwrap(); @@ -1776,6 +1793,7 @@ mod tests { verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -1802,8 +1820,9 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, + tpu_vote_receiver, vote_receiver, - 2, + 3, None, gossip_vote_sender, ); @@ -1818,6 +1837,7 @@ mod tests { }; drop(verified_sender); drop(vote_sender); + drop(tpu_vote_sender); // consume the entire entry_receiver, feed it into a new bank // check that the balance is what we expect. diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 40ca6ebb963957..8602010ce917a0 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -328,7 +328,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")] +#[frozen_abi(digest = "2GhP3ypvWwVQBifYEgHtPCvATo75ZrcBQJEp7iw3tZem")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -2881,6 +2881,7 @@ pub struct Sockets { pub tvu_forwards: Vec, pub tpu: Vec, pub tpu_forwards: Vec, + pub tpu_vote: Vec, pub broadcast: Vec, pub repair: UdpSocket, pub retransmit_sockets: Vec, @@ -2907,6 +2908,7 @@ impl Node { let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -2926,7 +2928,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - unused: unused.local_addr().unwrap(), + tpu_vote: unused.local_addr().unwrap(), rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), @@ -2942,6 +2944,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast, repair, retransmit_sockets: vec![retransmit_socket], @@ -2982,6 +2985,7 @@ impl Node { let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range); let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range); + let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (repair_port, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); @@ -2998,7 +3002,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: SocketAddr::new(gossip_addr.ip(), rpc_port), rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -3016,6 +3020,7 @@ impl Node { tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], + tpu_vote: vec![tpu_vote], broadcast: vec![broadcast], repair, retransmit_sockets: vec![retransmit_socket], @@ -3045,6 +3050,9 @@ impl Node { let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); + let (tpu_vote_port, tpu_vote_sockets) = + multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + let (_, retransmit_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); @@ -3062,7 +3070,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), @@ -3079,6 +3087,7 @@ impl Node { tvu_forwards: tvu_forwards_sockets, tpu: tpu_sockets, tpu_forwards: tpu_forwards_sockets, + tpu_vote: tpu_vote_sockets, broadcast, repair, retransmit_sockets, diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 17630a38b07f55..cf9580351a18b0 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -25,7 +25,7 @@ pub struct ContactInfo { /// address to forward unprocessed transactions to pub tpu_forwards: SocketAddr, /// address to which to send bank state requests - pub unused: SocketAddr, + pub tpu_vote: SocketAddr, /// address to which to send JSON-RPC requests pub rpc: SocketAddr, /// websocket for JSON-RPC push notifications @@ -73,7 +73,7 @@ impl Default for ContactInfo { repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), - unused: socketaddr_any!(), + tpu_vote: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: socketaddr_any!(), @@ -93,7 +93,7 @@ impl ContactInfo { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -123,7 +123,7 @@ impl ContactInfo { repair: addr, tpu: addr, tpu_forwards: addr, - unused: addr, + tpu_vote: addr, rpc: addr, rpc_pubsub: addr, serve_repair: addr, @@ -144,6 +144,7 @@ impl ContactInfo { let gossip = next_port(&bind_addr, 1); let tvu = next_port(&bind_addr, 2); let tpu_forwards = next_port(&bind_addr, 3); + let tpu_vote = next_port(&bind_addr, 1); let tvu_forwards = next_port(&bind_addr, 4); let repair = next_port(&bind_addr, 5); let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); @@ -157,7 +158,7 @@ impl ContactInfo { repair, tpu, tpu_forwards, - unused: "0.0.0.0:0".parse().unwrap(), + tpu_vote, rpc, rpc_pubsub, serve_repair, @@ -241,7 +242,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -253,7 +254,7 @@ mod tests { assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); - assert!(ci.unused.ip().is_multicast()); + assert!(ci.tpu_vote.ip().is_multicast()); assert!(ci.serve_repair.ip().is_multicast()); } #[test] @@ -266,7 +267,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -274,12 +275,12 @@ mod tests { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_with_socketaddr(&addr); assert_eq!(ci.tpu, addr); + assert_eq!(ci.tpu_vote.port(), 11); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert!(ci.unused.ip().is_unspecified()); assert_eq!(ci.serve_repair.port(), 16); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 3a5a0943b4c7b9..c3ad9d58d6f82a 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -24,40 +24,51 @@ impl FetchStage { pub fn new( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, coalesce_ms: u64, - ) -> (Self, PacketReceiver) { + ) -> (Self, PacketReceiver, PacketReceiver) { let (sender, receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); ( Self::new_with_sender( sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &sender, + &vote_sender, &poh_recorder, None, coalesce_ms, ), receiver, + vote_receiver, ) } + pub fn new_with_sender( sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, allocated_packet_limit: Option, coalesce_ms: u64, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); + let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect(); Self::new_multi_socket( tx_sockets, tpu_forwards_sockets, + tpu_vote_sockets, exit, &sender, + &vote_sender, &poh_recorder, allocated_packet_limit, coalesce_ms, @@ -102,8 +113,10 @@ impl FetchStage { fn new_multi_socket( sockets: Vec>, tpu_forwards_sockets: Vec>, + tpu_vote_sockets: Vec>, exit: &Arc, sender: &PacketSender, + vote_sender: &PacketSender, poh_recorder: &Arc>, limit: Option, coalesce_ms: u64, @@ -134,6 +147,17 @@ impl FetchStage { ) }); + let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| { + streamer::receiver( + socket, + &exit, + vote_sender.clone(), + recycler.clone(), + "fetch_vote_stage", + coalesce_ms, + ) + }); + let sender = sender.clone(); let poh_recorder = poh_recorder.clone(); @@ -155,7 +179,10 @@ impl FetchStage { }) .unwrap(); - let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect(); + let mut thread_hdls: Vec<_> = tpu_threads + .chain(tpu_forwards_threads) + .chain(tpu_vote_threads) + .collect(); thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 763117e0b02340..b031ba4df58ef8 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -713,7 +713,7 @@ mod tests { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + tpu_vote: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -801,7 +801,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr, @@ -830,7 +830,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + tpu_vote: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr2, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 600fb891e666b5..773f71f8f0894b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -37,6 +37,7 @@ pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, + vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, @@ -51,6 +52,7 @@ impl Tpu { retransmit_slots_receiver: RetransmitSlotsReceiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, + tpu_vote_sockets: Vec, broadcast_sockets: Vec, subscriptions: &Arc, transaction_status_sender: Option, @@ -69,11 +71,14 @@ impl Tpu { cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Self { let (packet_sender, packet_receiver) = channel(); + let (vote_packet_sender, vote_packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, tpu_forwards_sockets, + tpu_vote_sockets, &exit, &packet_sender, + &vote_packet_sender, &poh_recorder, // At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly // 20GB @@ -87,6 +92,13 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verified_sender, verifier) }; + let (vote_verified_sender, vote_verified_receiver) = unbounded(); + + let vote_sigverify_stage = { + let verifier = TransactionSigVerifier::default(); + SigVerifyStage::new(vote_packet_receiver, vote_verified_sender, verifier) + }; + let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( &exit, @@ -108,6 +120,7 @@ impl Tpu { &cluster_info, poh_recorder, verified_receiver, + vote_verified_receiver, verified_vote_packets_receiver, transaction_status_sender, replay_vote_sender, @@ -126,6 +139,7 @@ impl Tpu { Self { fetch_stage, sigverify_stage, + vote_sigverify_stage, banking_stage, cluster_info_vote_listener, broadcast_stage, @@ -136,6 +150,7 @@ impl Tpu { let results = vec![ self.fetch_stage.join(), self.sigverify_stage.join(), + self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), ]; diff --git a/core/src/validator.rs b/core/src/validator.rs index edac6531f3055b..88e178c06e4598 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -762,6 +762,7 @@ impl Validator { retransmit_slots_receiver, node.sockets.tpu, node.sockets.tpu_forwards, + node.sockets.tpu_vote, node.sockets.broadcast, &rpc_subscriptions, transaction_status_sender, diff --git a/validator/src/main.rs b/validator/src/main.rs index ae2768f695c3b2..45f62ed473c4ec 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -732,6 +732,9 @@ fn verify_reachable_ports( if ContactInfo::is_valid_address(&node.info.tpu_forwards) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); } + if ContactInfo::is_valid_address(&node.info.tpu_vote) { + udp_sockets.extend(node.sockets.tpu_vote.iter()); + } if ContactInfo::is_valid_address(&node.info.tvu) { udp_sockets.extend(node.sockets.tvu.iter()); udp_sockets.extend(node.sockets.broadcast.iter()); From 58bb8fc222cd2e26d69a369bac91022761ea4f56 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Sat, 18 Sep 2021 11:19:12 -0700 Subject: [PATCH 02/11] Add feature to send to tpu vote port --- core/src/banking_stage.rs | 35 +++++++++++++++++++++++------------ core/src/replay_stage.rs | 21 ++++++++++++++++++--- core/src/tvu.rs | 8 ++++++-- core/src/voting_service.rs | 19 +++++++++++++------ runtime/src/bank.rs | 5 +++++ sdk/src/feature_set.rs | 6 ++++++ 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1dc59fef1be363..a8a898620ddd88 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,6 +3,7 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::{ cluster_info::ClusterInfo, + contact_info::ContactInfo, data_budget::DataBudget, packet_hasher::PacketHasher, poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry}, @@ -56,7 +57,7 @@ use std::{ collections::{HashMap, VecDeque}, env, mem::size_of, - net::UdpSocket, + net::{SocketAddr, UdpSocket}, ops::DerefMut, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, sync::mpsc::Receiver, @@ -1430,27 +1431,37 @@ pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, poh_recorder: &Mutex, ) -> Option { - if let Some(leader_pubkey) = poh_recorder - .lock() - .unwrap() - .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) - { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu) - } else { - None - } + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) } fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, - poh_recorder: &Arc>, + poh_recorder: &Mutex, +) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) +} + +pub(crate) fn next_leader_tpu_vote( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, ) -> Option { + next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) +} + +fn next_leader_x( + cluster_info: &ClusterInfo, + poh_recorder: &Mutex, + port_selector: F, +) -> Option +where + F: FnOnce(&ContactInfo) -> SocketAddr, +{ if let Some(leader_pubkey) = poh_recorder .lock() .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) { - cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards) + cluster_info.lookup_contact_info(&leader_pubkey, port_selector) } else { None } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b115fd451eccf2..5b1799bafb777c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4882,7 +4882,12 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); let mut cursor = Cursor::default(); let (_, votes) = cluster_info.get_votes(&mut cursor); @@ -4939,7 +4944,12 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); let (_, votes) = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; @@ -5001,7 +5011,12 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); - crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info); + crate::voting_service::VotingService::handle_vote( + &cluster_info, + &poh_recorder, + vote_info, + false, + ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); let (_, votes) = cluster_info.get_votes(&mut cursor); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c6d4dd9ed680a4..92ed8e4cac4e9b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -268,8 +268,12 @@ impl Tvu { }; let (voting_sender, voting_receiver) = channel(); - let voting_service = - VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone()); + let voting_service = VotingService::new( + voting_receiver, + cluster_info.clone(), + poh_recorder.clone(), + bank_forks.clone(), + ); let replay_stage = ReplayStage::new( replay_stage_config, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 4df6e8d037a034..26e688d99e07bd 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,8 +1,9 @@ use crate::cluster_info::ClusterInfo; use crate::poh_recorder::PohRecorder; +use solana_runtime::bank_forks::BankForks; use solana_sdk::{clock::Slot, transaction::Transaction}; use std::{ - sync::{mpsc::Receiver, Arc, Mutex}, + sync::{mpsc::Receiver, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -38,12 +39,15 @@ impl VotingService { vote_receiver: Receiver, cluster_info: Arc, poh_recorder: Arc>, + bank_forks: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("sol-vote-service".to_string()) .spawn(move || { for vote_op in vote_receiver.iter() { - Self::handle_vote(&cluster_info, &poh_recorder, vote_op); + let rooted_bank = bank_forks.read().unwrap().root_bank().clone(); + let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled(); + Self::handle_vote(&cluster_info, &poh_recorder, vote_op, send_to_tpu_vote_port); } }) .unwrap(); @@ -54,11 +58,14 @@ impl VotingService { cluster_info: &ClusterInfo, poh_recorder: &Mutex, vote_op: VoteOp, + send_to_tpu_vote_port: bool, ) { - let _ = cluster_info.send_vote( - vote_op.tx(), - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), - ); + let target_address = if send_to_tpu_vote_port { + crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder) + } else { + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder) + }; + let _ = cluster_info.send_vote(vote_op.tx(), target_address); match vote_op { VoteOp::PushVote { tx, tower_slots } => { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f3005dc6ede895..f843067134c740 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4888,6 +4888,11 @@ impl Bank { .is_active(&feature_set::stakes_remove_delegation_if_inactive::id()) } + pub fn send_to_tpu_vote_port_enabled(&self) -> bool { + self.feature_set + .is_active(&feature_set::send_to_tpu_vote_port::id()) + } + // Check if the wallclock time from bank creation to now has exceeded the allotted // time for transaction processing pub fn should_bank_still_be_processing_txs( diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 5fe8748840cf2e..8cf1d2b5748839 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -174,6 +174,11 @@ pub mod stakes_remove_delegation_if_inactive { solana_sdk::declare_id!("HFpdDDNQjvcXnXKec697HDDsyk6tFoWS2o8fkxuhQZpL"); } +pub mod send_to_tpu_vote_port { + // todo: update + solana_sdk::declare_id!("3E3jV7v9VcdJL8iYZUMax9DiDno8j7EWUVbhm9RtShj3"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -216,6 +221,7 @@ lazy_static! { (spl_token_v2_set_authority_fix::id(), "spl-token set_authority fix"), (demote_program_write_locks::id(), "demote program write locks to readonly #19593"), (stakes_remove_delegation_if_inactive::id(), "remove delegations from stakes cache when inactive"), + (send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() From 0355a1c281dc12dd69c91862139ae379df0e1343 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Sat, 18 Sep 2021 10:59:13 -0700 Subject: [PATCH 03/11] Add vote rejecting sigverify mode --- core/src/banking_stage.rs | 2 +- core/src/cluster_info.rs | 3 +- core/src/cluster_info_vote_listener.rs | 5 +- core/src/sigverify.rs | 18 ++- core/src/tpu.rs | 2 +- perf/benches/sigverify.rs | 2 +- perf/src/sigverify.rs | 145 +++++++++++++++++++------ 7 files changed, 134 insertions(+), 43 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a8a898620ddd88..4f8fcbd09592b5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1616,7 +1616,7 @@ mod tests { ); trace!("sending bank"); drop(verified_sender); - drop(verified_gossip_sender); + drop(verified_gossip_vote_sender); drop(tpu_vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8602010ce917a0..b1089d8817f0e7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2919,7 +2919,6 @@ impl Node { let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let unused = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo { id: *pubkey, gossip: gossip_addr, @@ -2928,7 +2927,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - tpu_vote: unused.local_addr().unwrap(), + tpu_vote: tpu_vote.local_addr().unwrap(), rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 5d2262ba9082ab..6f87e022ea4656 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -347,7 +347,10 @@ impl ClusterInfoVoteListener { labels: Vec, ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { let mut msgs = packet::to_packets_chunked(&votes, 1); - sigverify::ed25519_verify_cpu(&mut msgs); + + // Votes should already be filtered by this point. + let reject_non_vote = false; + sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) .filter_map(|(label, vote, packet)| { diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 2103c1b3fa4671..1606e1023e5f88 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -17,6 +17,16 @@ pub use solana_perf::sigverify::{ pub struct TransactionSigVerifier { recycler: Recycler, recycler_out: Recycler>, + reject_non_vote: bool, +} + +impl TransactionSigVerifier { + pub fn new_reject_non_vote() -> Self { + TransactionSigVerifier { + reject_non_vote: true, + ..TransactionSigVerifier::default() + } + } } impl Default for TransactionSigVerifier { @@ -25,13 +35,19 @@ impl Default for TransactionSigVerifier { Self { recycler: Recycler::warmed(50, 4096, None, ""), recycler_out: Recycler::warmed(50, 4096, None, ""), + reject_non_vote: false, } } } impl SigVerifier for TransactionSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out); + sigverify::ed25519_verify( + &mut batch, + &self.recycler, + &self.recycler_out, + self.reject_non_vote, + ); batch } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 773f71f8f0894b..3ae2c8644c4ca7 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -95,7 +95,7 @@ impl Tpu { let (vote_verified_sender, vote_verified_receiver) = unbounded(); let vote_sigverify_stage = { - let verifier = TransactionSigVerifier::default(); + let verifier = TransactionSigVerifier::new_reject_non_vote(); SigVerifyStage::new(vote_packet_receiver, vote_verified_sender, verifier) }; diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 5f8aabe3020d0f..978b3102e6799e 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -19,7 +19,7 @@ fn bench_sigverify(bencher: &mut Bencher) { let recycler_out = Recycler::new_without_limit(""); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); + let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); }) } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 24e32ef7b81deb..8d1117a866059d 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -103,8 +103,8 @@ pub fn init() { } } -fn verify_packet(packet: &mut Packet) { - let packet_offsets = get_packet_offsets(packet, 0); +fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { + let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote); let mut sig_start = packet_offsets.sig_start as usize; let mut pubkey_start = packet_offsets.pubkey_start as usize; let msg_start = packet_offsets.msg_start as usize; @@ -158,10 +158,66 @@ pub fn batch_size(batches: &[Packets]) -> usize { batches.iter().map(|p| p.packets.len()).sum() } +// Return Err if non-vote packet +fn check_non_vote(packet: &Packet, packet_offsets: &PacketOffsets) -> Result<(), PacketError> { + if packet_offsets.sig_len != 1 { + return Err(PacketError::InvalidPubkeyLen); + } + + let mut reject_pubkey_start = packet_offsets.pubkey_start as usize; + let mut vote_index = None; + for i in 0..packet_offsets.pubkey_len { + let pubkey_end = reject_pubkey_start.saturating_add(size_of::()); + if &packet.data[reject_pubkey_start..pubkey_end] == solana_vote_program::id().as_ref() { + vote_index = Some(i); + break; + } + reject_pubkey_start = pubkey_end; + } + + if vote_index.is_none() { + return Err(PacketError::InvalidPubkeyLen); + } + + let pubkeys_end = (packet_offsets.pubkey_start as usize) + .saturating_add(size_of::().saturating_mul(packet_offsets.pubkey_len as usize)); + let hash_end = pubkeys_end.saturating_add(size_of::()); + let num_instructions_offset = hash_end; + + if hash_end.saturating_add(1) >= packet.meta.size { + return Err(PacketError::InvalidPubkeyLen); + } + + let (instructions_len, instructions_size) = + match decode_shortu16_len(&packet.data[num_instructions_offset..]) { + Ok((len, size)) => (len, size), + Err(_) => { + return Err(PacketError::InvalidPubkeyLen); + } + }; + + let instruction0_start = hash_end.saturating_add(instructions_size); + if instruction0_start > packet.meta.size { + return Err(PacketError::InvalidPubkeyLen); + } + + let vote_index = vote_index.unwrap(); + // First instruction should be the vote key + if packet.data[instruction0_start] as u32 != vote_index { + return Err(PacketError::InvalidPubkeyLen); + } + + if instructions_len != 1 { + return Err(PacketError::InvalidPubkeyLen); + } + Ok(()) +} + // internal function to be unit-tested; should be used only by get_packet_offsets fn do_get_packet_offsets( packet: &Packet, current_offset: usize, + reject_non_vote: bool, ) -> Result { // should have at least 1 signature, sig lengths and the message header let _ = 1usize @@ -240,17 +296,27 @@ fn do_get_packet_offsets( .checked_add(pubkey_start) .ok_or(PacketError::InvalidLen)?; - Ok(PacketOffsets::new( + let offsets = PacketOffsets::new( u32::try_from(sig_len_untrusted)?, u32::try_from(sig_start)?, u32::try_from(msg_start)?, u32::try_from(pubkey_start)?, u32::try_from(pubkey_len)?, - )) + ); + + if reject_non_vote { + check_non_vote(packet, &offsets)?; + } + + Ok(offsets) } -fn get_packet_offsets(packet: &mut Packet, current_offset: usize) -> PacketOffsets { - let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); +fn get_packet_offsets( + packet: &mut Packet, + current_offset: usize, + reject_non_vote: bool, +) -> PacketOffsets { + let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset, reject_non_vote); if let Ok(offsets) = unsanitized_packet_offsets { check_for_simple_vote_transaction(packet, &offsets, current_offset).ok(); offsets @@ -328,7 +394,11 @@ fn check_for_simple_vote_transaction( Ok(()) } -pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) -> TxOffsets { +pub fn generate_offsets( + batches: &mut [Packets], + recycler: &Recycler, + reject_non_vote: bool, +) -> TxOffsets { debug!("allocating.."); let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap(); signature_offsets.set_pinnable(); @@ -343,7 +413,7 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) batches.iter_mut().for_each(|p| { let mut sig_lens = Vec::new(); p.packets.iter_mut().for_each(|packet| { - let packet_offsets = get_packet_offsets(packet, current_offset); + let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); sig_lens.push(packet_offsets.sig_len); @@ -377,14 +447,16 @@ pub fn generate_offsets(batches: &mut [Packets], recycler: &Recycler) ) } -pub fn ed25519_verify_cpu(batches: &mut [Packets]) { +pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); PAR_THREAD_POOL.install(|| { - batches - .into_par_iter() - .for_each(|p| p.packets.par_iter_mut().for_each(|p| verify_packet(p))) + batches.into_par_iter().for_each(|p| { + p.packets + .par_iter_mut() + .for_each(|p| verify_packet(p, reject_non_vote)) + }) }); inc_new_counter_debug!("ed25519_verify_cpu", count); } @@ -464,10 +536,11 @@ pub fn ed25519_verify( batches: &mut [Packets], recycler: &Recycler, recycler_out: &Recycler>, + reject_non_vote: bool, ) { let api = perf_libs::api(); if api.is_none() { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let api = api.unwrap(); @@ -480,11 +553,11 @@ pub fn ed25519_verify( // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover if count < 64 { - return ed25519_verify_cpu(batches); + return ed25519_verify_cpu(batches, reject_non_vote); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = - generate_offsets(batches, recycler); + generate_offsets(batches, recycler, reject_non_vote); debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("allocating out.."); @@ -599,7 +672,7 @@ mod tests { let message_data = tx.message_data(); let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0); + let packet_offsets = sigverify::get_packet_offsets(&mut packet, 0, false); assert_eq!( memfind(&tx_bytes, &tx.signatures[0].as_ref()), @@ -643,7 +716,7 @@ mod tests { let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!( unsanitized_packet_offsets, @@ -659,7 +732,7 @@ mod tests { let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!( unsanitized_packet_offsets, @@ -676,7 +749,7 @@ mod tests { packet.data[1] = 0xff; packet.meta.size = 2; - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidLen)); } @@ -691,10 +764,10 @@ mod tests { tx.message.header.num_required_signatures = NUM_SIG as u8; let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -727,10 +800,10 @@ mod tests { let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - verify_packet(&mut packet); + verify_packet(&mut packet, false); assert!(packet.meta.discard); packet.meta.discard = false; @@ -747,7 +820,7 @@ mod tests { // Make the signatures len huge packet.data[0] = 0x7f; - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidSignatureLen)); } @@ -762,7 +835,7 @@ mod tests { packet.data[2] = 0xff; packet.data[3] = 0xff; - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidShortVec)); } @@ -771,12 +844,12 @@ mod tests { let tx = test_tx(); let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); // make pubkey len huge packet.data[res.unwrap().pubkey_start as usize - 1] = 0x7f; - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); } @@ -795,7 +868,7 @@ mod tests { let mut tx = Transaction::new_unsigned(message); tx.signatures = vec![Signature::default()]; let packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0); + let res = sigverify::do_get_packet_offsets(&packet, 0, false); assert_eq!(res, Err(PacketError::PayerNotWritable)); } @@ -824,7 +897,7 @@ mod tests { // Just like get_packet_offsets, but not returning redundant information. fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize); + let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize, false); PacketOffsets::new( packet_offsets.sig_len, packet_offsets.sig_start - current_offset, @@ -905,7 +978,7 @@ mod tests { fn ed25519_verify(batches: &mut [Packets]) { let recycler = Recycler::new_without_limit(""); let recycler_out = Recycler::new_without_limit(""); - sigverify::ed25519_verify(batches, &recycler, &recycler_out); + sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); } #[test] @@ -1003,8 +1076,8 @@ mod tests { // verify from GPU verification pipeline (when GPU verification is enabled) are // equivalent to the CPU verification pipeline. let mut batches_cpu = batches.clone(); - sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); - ed25519_verify_cpu(&mut batches_cpu); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); + ed25519_verify_cpu(&mut batches_cpu, false); // check result batches @@ -1116,7 +1189,7 @@ mod tests { let mut tx = test_tx(); tx.message.instructions[0].data = vec![1, 2, 3]; let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(!packet.meta.is_simple_vote_tx); } @@ -1126,7 +1199,7 @@ mod tests { let mut tx = vote_tx(); tx.message.instructions[0].data = vec![1, 2, 3]; let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(packet.meta.is_simple_vote_tx); } @@ -1147,7 +1220,7 @@ mod tests { ], ); let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(!packet.meta.is_simple_vote_tx); } @@ -1170,7 +1243,7 @@ mod tests { .iter_mut() .enumerate() .for_each(|(index, mut packet)| { - let packet_offsets = do_get_packet_offsets(&packet, current_offset).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, current_offset, false).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, current_offset) .ok(); if index == 1 { From 2e7e9807b2ce3f40174b6317ca8d9b76e4ca9864 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 21 Sep 2021 16:05:36 -0500 Subject: [PATCH 04/11] use packet.meta.is_simple_vote_tx in place of deserialization --- core/src/banking_stage.rs | 146 ++++++++++++++++++++++++++++++-------- perf/src/sigverify.rs | 3 +- 2 files changed, 119 insertions(+), 30 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4f8fcbd09592b5..3204b26da438a0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1040,10 +1040,10 @@ impl BankingStage { .iter() .filter_map(|tx_index| { let p = &msgs.packets[*tx_index]; - let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; - if votes_only && !solana_runtime::bank::is_simple_vote_transaction(&tx) { + if votes_only && !p.meta.is_simple_vote_tx { return None; } + let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; tx.verify_precompiles(libsecp256k1_0_5_upgrade_enabled) .ok()?; let message_bytes = Self::packet_message(p)?; @@ -1532,6 +1532,7 @@ mod tests { transaction::TransactionError, }; use solana_transaction_status::TransactionWithStatusMeta; + use solana_vote_program::vote_transaction; use std::{ net::SocketAddr, path::Path, @@ -2945,36 +2946,123 @@ mod tests { ); } + #[cfg(test)] + fn make_test_packets( + transactions: Vec, + vote_indexes: Vec, + ) -> (Packets, Vec) { + let capacity = transactions.len(); + let mut packets = Packets::with_capacity(capacity); + let mut packet_indexes = Vec::with_capacity(capacity); + packets.packets.resize(capacity, Packet::default()); + for (index, tx) in transactions.iter().enumerate() { + Packet::populate_packet(&mut packets.packets[index], None, tx).ok(); + packet_indexes.push(index); + } + for index in vote_indexes.iter() { + packets.packets[*index].meta.is_simple_vote_tx = true; + } + (packets, packet_indexes) + } + #[test] fn test_transactions_from_packets() { - use solana_vote_program::vote_state::Vote; - solana_logger::setup(); - let mut vote_packet = Packet::default(); - let vote_instruction = solana_vote_program::vote_instruction::vote( - &Pubkey::new_unique(), - &Pubkey::new_unique(), - Vote::default(), - ); - let vote_transaction = - Transaction::new_with_payer(&[vote_instruction], Some(&Pubkey::new_unique())); - Packet::populate_packet(&mut vote_packet, None, &vote_transaction).unwrap(); - let mut non_vote = Packet::default(); - let tx = system_transaction::transfer( - &Keypair::new(), - &Pubkey::new_unique(), - 2, + let keypair = Keypair::new(); + let transfer_tx = + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()); + let vote_tx = vote_transaction::new_vote_transaction( + vec![42], + Hash::default(), Hash::default(), + &keypair, + &keypair, + &keypair, + None, ); - Packet::populate_packet(&mut non_vote, None, &tx).unwrap(); - let msgs = Packets::new(vec![non_vote, vote_packet]); - let packet_indexes = [0, 1]; - let (transactions, _transaction_to_packet_indexes) = - BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, true); - assert_eq!(transactions.len(), 1); - assert!(!transactions[0].transaction().signatures.is_empty()); - - let (transactions, _transaction_to_packet_indexes) = - BankingStage::transactions_from_packets(&msgs, &packet_indexes, false, false); - assert_eq!(transactions.len(), 2); + + // packets with no votes + { + let vote_indexes = vec![]; + let (packets, packet_indexes) = + make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 1], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(0, txs.len()); + assert_eq!(0, tx_packet_index.len()); + } + + // packets with some votes + { + let vote_indexes = vec![0, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], + vote_indexes, + ); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(2, txs.len()); + assert_eq!(vec![0, 2], tx_packet_index); + } + + // packets with all votes + { + let vote_indexes = vec![0, 1, 2]; + let (packets, packet_indexes) = make_test_packets( + vec![vote_tx.clone(), vote_tx.clone(), vote_tx], + vote_indexes, + ); + + let mut votes_only = false; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + + votes_only = true; + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + false, + votes_only, + ); + assert_eq!(3, txs.len()); + assert_eq!(vec![0, 1, 2], tx_packet_index); + } } } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 8d1117a866059d..e6ab6b71a9161c 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -897,7 +897,8 @@ mod tests { // Just like get_packet_offsets, but not returning redundant information. fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = sigverify::get_packet_offsets(&mut packet, current_offset as usize, false); + let packet_offsets = + sigverify::get_packet_offsets(&mut packet, current_offset as usize, false); PacketOffsets::new( packet_offsets.sig_len, packet_offsets.sig_start - current_offset, From 06d60c67327ab1e793ca39038d7efc5edd3fbe2e Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 21 Sep 2021 22:12:12 -0500 Subject: [PATCH 05/11] consolidate code that identifies vote tx atcommon path for cpu and gpu --- perf/benches/sigverify.rs | 2 +- perf/src/sigverify.rs | 105 ++++++++------------------------------ runtime/src/bank.rs | 2 +- 3 files changed, 24 insertions(+), 85 deletions(-) diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 978b3102e6799e..23c7c2c35da7fa 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -34,6 +34,6 @@ fn bench_get_offsets(bencher: &mut Bencher) { let recycler = Recycler::new_without_limit(""); // verify packets bencher.iter(|| { - let _ans = sigverify::generate_offsets(&mut batches, &recycler); + let _ans = sigverify::generate_offsets(&mut batches, &recycler, false); }) } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index e6ab6b71a9161c..cb3533372e98d9 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -158,66 +158,10 @@ pub fn batch_size(batches: &[Packets]) -> usize { batches.iter().map(|p| p.packets.len()).sum() } -// Return Err if non-vote packet -fn check_non_vote(packet: &Packet, packet_offsets: &PacketOffsets) -> Result<(), PacketError> { - if packet_offsets.sig_len != 1 { - return Err(PacketError::InvalidPubkeyLen); - } - - let mut reject_pubkey_start = packet_offsets.pubkey_start as usize; - let mut vote_index = None; - for i in 0..packet_offsets.pubkey_len { - let pubkey_end = reject_pubkey_start.saturating_add(size_of::()); - if &packet.data[reject_pubkey_start..pubkey_end] == solana_vote_program::id().as_ref() { - vote_index = Some(i); - break; - } - reject_pubkey_start = pubkey_end; - } - - if vote_index.is_none() { - return Err(PacketError::InvalidPubkeyLen); - } - - let pubkeys_end = (packet_offsets.pubkey_start as usize) - .saturating_add(size_of::().saturating_mul(packet_offsets.pubkey_len as usize)); - let hash_end = pubkeys_end.saturating_add(size_of::()); - let num_instructions_offset = hash_end; - - if hash_end.saturating_add(1) >= packet.meta.size { - return Err(PacketError::InvalidPubkeyLen); - } - - let (instructions_len, instructions_size) = - match decode_shortu16_len(&packet.data[num_instructions_offset..]) { - Ok((len, size)) => (len, size), - Err(_) => { - return Err(PacketError::InvalidPubkeyLen); - } - }; - - let instruction0_start = hash_end.saturating_add(instructions_size); - if instruction0_start > packet.meta.size { - return Err(PacketError::InvalidPubkeyLen); - } - - let vote_index = vote_index.unwrap(); - // First instruction should be the vote key - if packet.data[instruction0_start] as u32 != vote_index { - return Err(PacketError::InvalidPubkeyLen); - } - - if instructions_len != 1 { - return Err(PacketError::InvalidPubkeyLen); - } - Ok(()) -} - // internal function to be unit-tested; should be used only by get_packet_offsets fn do_get_packet_offsets( packet: &Packet, current_offset: usize, - reject_non_vote: bool, ) -> Result { // should have at least 1 signature, sig lengths and the message header let _ = 1usize @@ -296,19 +240,13 @@ fn do_get_packet_offsets( .checked_add(pubkey_start) .ok_or(PacketError::InvalidLen)?; - let offsets = PacketOffsets::new( + Ok(PacketOffsets::new( u32::try_from(sig_len_untrusted)?, u32::try_from(sig_start)?, u32::try_from(msg_start)?, u32::try_from(pubkey_start)?, u32::try_from(pubkey_len)?, - ); - - if reject_non_vote { - check_non_vote(packet, &offsets)?; - } - - Ok(offsets) + )) } fn get_packet_offsets( @@ -316,14 +254,15 @@ fn get_packet_offsets( current_offset: usize, reject_non_vote: bool, ) -> PacketOffsets { - let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset, reject_non_vote); + let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); if let Ok(offsets) = unsanitized_packet_offsets { check_for_simple_vote_transaction(packet, &offsets, current_offset).ok(); - offsets - } else { - // force sigverify to fail by returning zeros - PacketOffsets::new(0, 0, 0, 0, 0) + if !reject_non_vote || packet.meta.is_simple_vote_tx { + return offsets; + } } + // force sigverify to fail by returning zeros + PacketOffsets::new(0, 0, 0, 0, 0) } fn check_for_simple_vote_transaction( @@ -716,7 +655,7 @@ mod tests { let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0, false); + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!( unsanitized_packet_offsets, @@ -732,7 +671,7 @@ mod tests { let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0, false); + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!( unsanitized_packet_offsets, @@ -749,7 +688,7 @@ mod tests { packet.data[1] = 0xff; packet.meta.size = 2; - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidLen)); } @@ -764,7 +703,7 @@ mod tests { tx.message.header.num_required_signatures = NUM_SIG as u8; let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); verify_packet(&mut packet, false); @@ -800,7 +739,7 @@ mod tests { let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); verify_packet(&mut packet, false); @@ -820,7 +759,7 @@ mod tests { // Make the signatures len huge packet.data[0] = 0x7f; - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidSignatureLen)); } @@ -835,7 +774,7 @@ mod tests { packet.data[2] = 0xff; packet.data[3] = 0xff; - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidShortVec)); } @@ -844,12 +783,12 @@ mod tests { let tx = test_tx(); let mut packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); // make pubkey len huge packet.data[res.unwrap().pubkey_start as usize - 1] = 0x7f; - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); } @@ -868,7 +807,7 @@ mod tests { let mut tx = Transaction::new_unsigned(message); tx.signatures = vec![Signature::default()]; let packet = sigverify::make_packet_from_transaction(tx); - let res = sigverify::do_get_packet_offsets(&packet, 0, false); + let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::PayerNotWritable)); } @@ -1190,7 +1129,7 @@ mod tests { let mut tx = test_tx(); tx.message.instructions[0].data = vec![1, 2, 3]; let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(!packet.meta.is_simple_vote_tx); } @@ -1200,7 +1139,7 @@ mod tests { let mut tx = vote_tx(); tx.message.instructions[0].data = vec![1, 2, 3]; let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(packet.meta.is_simple_vote_tx); } @@ -1221,7 +1160,7 @@ mod tests { ], ); let mut packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = do_get_packet_offsets(&packet, 0, false).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); assert!(!packet.meta.is_simple_vote_tx); } @@ -1244,7 +1183,7 @@ mod tests { .iter_mut() .enumerate() .for_each(|(index, mut packet)| { - let packet_offsets = do_get_packet_offsets(&packet, current_offset, false).unwrap(); + let packet_offsets = do_get_packet_offsets(&packet, current_offset).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, current_offset) .ok(); if index == 1 { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f843067134c740..7447c3f0f1188a 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5162,7 +5162,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } -pub fn is_simple_vote_transaction(transaction: &Transaction) -> bool { +fn is_simple_vote_transaction(transaction: &Transaction) -> bool { if transaction.message.instructions.len() == 1 { let instruction = &transaction.message.instructions[0]; let program_pubkey = From 892d35bdd92a98313385608e351e5e6d7a166cad Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 12:12:34 -0500 Subject: [PATCH 06/11] new key for feature set --- sdk/src/feature_set.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 8cf1d2b5748839..b1e45c099c234e 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -175,8 +175,7 @@ pub mod stakes_remove_delegation_if_inactive { } pub mod send_to_tpu_vote_port { - // todo: update - solana_sdk::declare_id!("3E3jV7v9VcdJL8iYZUMax9DiDno8j7EWUVbhm9RtShj3"); + solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); } lazy_static! { From 1055778b3e3bac11544cc210fb1e72c996bb3a86 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 11:18:38 -0500 Subject: [PATCH 07/11] banking forward tpu vote --- core/src/banking_stage.rs | 42 +++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3204b26da438a0..ad7e3209b0585d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -244,6 +244,12 @@ pub enum BufferedPacketsDecision { Hold, } +pub enum ForwardOption { + NotForward, + ForwardTpuVote, + ForwardTransaction, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -292,13 +298,13 @@ impl BankingStage { assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, enable_forwarding) = match i.cmp(&(num_threads - 2)) { - std::cmp::Ordering::Less => (verified_receiver.clone(), true), - std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), false), + let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) { + std::cmp::Ordering::Less => (verified_receiver.clone(), ForwardOption::ForwardTransaction), + std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), ForwardOption::ForwardTpuVote), std::cmp::Ordering::Greater => { // Disable forwarding of vote transactions // from gossip. Note - votes can also arrive from tpu - (verified_vote_receiver.clone(), false) + (verified_vote_receiver.clone(), ForwardOption::NotForward) } }; @@ -319,7 +325,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - enable_forwarding, + forward_option, i, batch_limit, transaction_status_sender, @@ -525,7 +531,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, - enable_forwarding: bool, + forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, @@ -575,7 +581,7 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -586,7 +592,7 @@ impl BankingStage { } BufferedPacketsDecision::ForwardAndHold => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -601,7 +607,7 @@ impl BankingStage { } fn handle_forwarding( - enable_forwarding: bool, + forward_option: &ForwardOption, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, poh_recorder: &Arc>, @@ -609,14 +615,12 @@ impl BankingStage { hold: bool, data_budget: &DataBudget, ) { - if !enable_forwarding { - if !hold { - buffered_packets.clear(); - } - return; - } - - let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) { + let addr = match forward_option { + ForwardOption::NotForward => {if !hold { buffered_packets.clear(); } return}, + ForwardOption::ForwardTransaction => next_leader_tpu_forwards(cluster_info, poh_recorder), + ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + }; + let addr = match addr { Some(addr) => addr, None => return, }; @@ -638,7 +642,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - enable_forwarding: bool, + forward_option: ForwardOption, id: u32, batch_limit: usize, transaction_status_sender: Option, @@ -658,7 +662,7 @@ impl BankingStage { &poh_recorder, cluster_info, &mut buffered_packets, - enable_forwarding, + &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, &banking_stage_stats, From 0870fff0b08044b7353c051e792e22f419640bbc Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 12:28:39 -0500 Subject: [PATCH 08/11] add tpu vote port to dockerfile and other review changes --- core/src/banking_stage.rs | 31 ++++++++++++++++++++++--------- core/src/cluster_info.rs | 7 ++++--- core/src/contact_info.rs | 3 ++- core/src/tpu.rs | 17 +++++++++++------ sdk/docker-solana/Dockerfile | 2 ++ 5 files changed, 41 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ad7e3209b0585d..a23cedf0fc65de 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -86,7 +86,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; -const MIN_THREADS_VOTES: u32 = 2; +const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; #[derive(Debug, Default)] @@ -244,6 +244,7 @@ pub enum BufferedPacketsDecision { Hold, } +#[derive(Debug, Clone)] pub enum ForwardOption { NotForward, ForwardTpuVote, @@ -279,7 +280,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, - vote_verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -295,12 +296,17 @@ impl BankingStage { ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. - assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING); + assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) { - std::cmp::Ordering::Less => (verified_receiver.clone(), ForwardOption::ForwardTransaction), - std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), ForwardOption::ForwardTpuVote), + std::cmp::Ordering::Less => { + (verified_receiver.clone(), ForwardOption::ForwardTransaction) + } + std::cmp::Ordering::Equal => ( + tpu_verified_vote_receiver.clone(), + ForwardOption::ForwardTpuVote, + ), std::cmp::Ordering::Greater => { // Disable forwarding of vote transactions // from gossip. Note - votes can also arrive from tpu @@ -616,8 +622,15 @@ impl BankingStage { data_budget: &DataBudget, ) { let addr = match forward_option { - ForwardOption::NotForward => {if !hold { buffered_packets.clear(); } return}, - ForwardOption::ForwardTransaction => next_leader_tpu_forwards(cluster_info, poh_recorder), + ForwardOption::NotForward => { + if !hold { + buffered_packets.clear(); + } + return; + } + ForwardOption::ForwardTransaction => { + next_leader_tpu_forwards(cluster_info, poh_recorder) + } ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), }; let addr = match addr { @@ -716,7 +729,7 @@ impl BankingStage { env::var("SOLANA_BANKING_THREADS") .map(|x| x.parse().unwrap_or(NUM_THREADS)) .unwrap_or(NUM_THREADS), - MIN_THREADS_VOTES + MIN_THREADS_BANKING, + NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING, ) } @@ -2821,7 +2834,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let data_budget = DataBudget::default(); BankingStage::handle_forwarding( - true, + &ForwardOption::ForwardTransaction, &cluster_info, &mut unprocessed_packets, &poh_recorder, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b1089d8817f0e7..b9797bcc8154ed 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -824,7 +824,7 @@ impl ClusterInfo { } let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip) { ip_addr.to_string() } else { @@ -839,6 +839,7 @@ impl ClusterInfo { "-".to_string() }, addr_to_string(&ip_addr, &node.gossip), + addr_to_string(&ip_addr, &node.tpu_vote), addr_to_string(&ip_addr, &node.tpu), addr_to_string(&ip_addr, &node.tpu_forwards), addr_to_string(&ip_addr, &node.tvu), @@ -853,9 +854,9 @@ impl ClusterInfo { format!( "IP Address |Age(ms)| Node identifier \ - | Version |Gossip| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ + | Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\ ------------------+-------+----------------------------------------------+---------+\ - ------+------+------+------+------+------+------+--------\n\ + ------+------+-------+------+------+------+------+------+--------\n\ {}\ Nodes: {}{}{}", nodes.join(""), diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index cf9580351a18b0..98baf8f249a797 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -144,12 +144,12 @@ impl ContactInfo { let gossip = next_port(&bind_addr, 1); let tvu = next_port(&bind_addr, 2); let tpu_forwards = next_port(&bind_addr, 3); - let tpu_vote = next_port(&bind_addr, 1); let tvu_forwards = next_port(&bind_addr, 4); let repair = next_port(&bind_addr, 5); let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); let serve_repair = next_port(&bind_addr, 6); + let tpu_vote = next_port(&bind_addr, 7); Self { id: *pubkey, gossip, @@ -307,6 +307,7 @@ mod tests { assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); + assert_eq!(d1.tpu_vote, socketaddr!("127.0.0.1:1241")); } #[test] diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3ae2c8644c4ca7..c73934fd247206 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -92,18 +92,23 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verified_sender, verifier) }; - let (vote_verified_sender, vote_verified_receiver) = unbounded(); + let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); - SigVerifyStage::new(vote_packet_receiver, vote_verified_sender, verifier) + SigVerifyStage::new( + vote_packet_receiver, + verified_tpu_vote_packets_sender, + verifier, + ) }; - let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded(); + let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = + unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( &exit, cluster_info.clone(), - verified_vote_packets_sender, + verified_gossip_vote_packets_sender, &poh_recorder, vote_tracker, bank_forks, @@ -120,8 +125,8 @@ impl Tpu { &cluster_info, poh_recorder, verified_receiver, - vote_verified_receiver, - verified_vote_packets_receiver, + verified_tpu_vote_packets_receiver, + verified_gossip_vote_packets_receiver, transaction_status_sender, replay_vote_sender, ); diff --git a/sdk/docker-solana/Dockerfile b/sdk/docker-solana/Dockerfile index 1beecc8ae8ef2d..6452efdcc509eb 100644 --- a/sdk/docker-solana/Dockerfile +++ b/sdk/docker-solana/Dockerfile @@ -30,6 +30,8 @@ EXPOSE 8006/udp EXPOSE 8007/udp # broadcast EXPOSE 8008/udp +# tpu_vote +EXPOSE 8009/udp RUN apt update && \ apt-get install -y bzip2 libssl-dev && \ From 0b3efdc753c6e5543976c4fe2972e6c272b8e279 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 16:37:48 -0500 Subject: [PATCH 09/11] ignore zeroize(drop) --- ci/do-audit.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ci/do-audit.sh b/ci/do-audit.sh index cbe09be77500b9..bbd45ac2c2dd17 100755 --- a/ci/do-audit.sh +++ b/ci/do-audit.sh @@ -52,5 +52,7 @@ cargo_audit_ignores=( # https://github.com/alexcrichton/tar-rs/issues/238 --ignore RUSTSEC-2021-0080 + # zeroize_derive: `#[zeroize(drop)]` doesn't implement `Drop` for `enum`s + --ignore RUSTSEC-2021-0115 ) scripts/cargo-for-all-lock-files.sh stable audit "${cargo_audit_ignores[@]}" From ce7f774c005acf9bfbe48ffd220fcf31c4eeb911 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 24 Sep 2021 15:21:08 -0700 Subject: [PATCH 10/11] Simplify thread id compare --- core/src/banking_stage.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a23cedf0fc65de..8cadeecbfd115e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -299,19 +299,17 @@ impl BankingStage { assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) { - std::cmp::Ordering::Less => { - (verified_receiver.clone(), ForwardOption::ForwardTransaction) - } - std::cmp::Ordering::Equal => ( - tpu_verified_vote_receiver.clone(), - ForwardOption::ForwardTpuVote, - ), - std::cmp::Ordering::Greater => { + let (verified_receiver, forward_option) = match i { + 0 => { // Disable forwarding of vote transactions // from gossip. Note - votes can also arrive from tpu (verified_vote_receiver.clone(), ForwardOption::NotForward) } + 1 => ( + tpu_verified_vote_receiver.clone(), + ForwardOption::ForwardTpuVote, + ), + _ => (verified_receiver.clone(), ForwardOption::ForwardTransaction), }; let poh_recorder = poh_recorder.clone(); From 69334e2747d71204966759396220180371200159 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 18:37:57 -0500 Subject: [PATCH 11/11] fix a test --- core/src/contact_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 98baf8f249a797..3523dc7933f714 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -275,7 +275,7 @@ mod tests { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_with_socketaddr(&addr); assert_eq!(ci.tpu, addr); - assert_eq!(ci.tpu_vote.port(), 11); + assert_eq!(ci.tpu_vote.port(), 17); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tpu_forwards.port(), 13);