From a39df7ee5dd6923ba172a301ebb0d9c9792d170a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 15 Jun 2020 07:36:08 +0900 Subject: [PATCH] Fix udp port check retry and check all udp ports (#10385) * Don't start if udp port is really closed * Fully check all udp ports * Remove test code....... * Add tests and adjust impl a bit * Add comment * Move comment a bit * Move a bit * clean ups --- net-utils/src/ip_echo_server.rs | 6 +- net-utils/src/lib.rs | 235 +++++++++++++++++++++++++------- validator/src/main.rs | 28 ++-- 3 files changed, 199 insertions(+), 70 deletions(-) diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 2e9f7cce2bb0a5..7a3d17967dc369 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -7,10 +7,12 @@ use tokio_codec::{BytesCodec, Decoder}; pub type IpEchoServer = Runtime; +pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4; + #[derive(Serialize, Deserialize, Default)] pub(crate) struct IpEchoServerMessage { - tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde - udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde + tcp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde + udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde } impl IpEchoServerMessage { diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index e18f86a3e77230..d08e641b8f9b27 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -2,6 +2,7 @@ use log::*; use rand::{thread_rng, Rng}; use socket2::{Domain, SockAddr, Socket, Type}; +use std::collections::{BTreeMap, BTreeSet}; use std::io::{self, Read, Write}; use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; use std::sync::mpsc::channel; @@ -9,7 +10,7 @@ use std::time::Duration; mod ip_echo_server; use ip_echo_server::IpEchoServerMessage; -pub use ip_echo_server::{ip_echo_server, IpEchoServer}; +pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE}; /// A data type representing a public Udp socket pub struct UdpSocketPair { @@ -92,34 +93,36 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result, udp_sockets: &[&UdpSocket], + timeout: u64, + udp_retry_count: usize, ) -> bool { - let udp_ports: Vec<_> = udp_sockets - .iter() - .map(|udp_socket| udp_socket.local_addr().unwrap().port()) - .collect(); - info!( - "Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}", - tcp_listeners, udp_ports, ip_echo_server_addr + "Checking that tcp ports {:?} from {:?}", + tcp_listeners, ip_echo_server_addr ); let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect(); let _ = ip_echo_server_request( ip_echo_server_addr, - IpEchoServerMessage::new(&tcp_ports, &udp_ports), + IpEchoServerMessage::new(&tcp_ports, &[]), ) .map_err(|err| warn!("ip_echo_server request failed: {}", err)); let mut ok = true; + let timeout = Duration::from_secs(timeout); // Wait for a connection to open on each TCP port for (port, tcp_listener) in tcp_listeners { let (sender, receiver) = channel(); - std::thread::spawn(move || { + let listening_addr = tcp_listener.local_addr().unwrap(); + let thread_handle = std::thread::spawn(move || { debug!("Waiting for incoming connection on tcp/{}", port); match tcp_listener.incoming().next() { Some(_) => sender @@ -128,7 +131,7 @@ pub fn verify_reachable_ports( None => warn!("tcp incoming failed"), } }); - match receiver.recv_timeout(Duration::from_secs(5)) { + match receiver.recv_timeout(timeout) { Ok(_) => { info!("tcp/{} is reachable", port); } @@ -137,9 +140,16 @@ pub fn verify_reachable_ports( "Received no response at tcp/{}, check your port configuration: {}", port, err ); + // Ugh, std rustc doesn't provide acceptng with timeout or restoring original + // nonblocking-status of sockets because of lack of getter, only the setter... + // So, to close the thread cleanly, just connect from here. + // ref: https://github.com/rust-lang/rust/issues/31615 + TcpStream::connect_timeout(&listening_addr, timeout).unwrap(); ok = false; } } + // ensure to reap the thread + thread_handle.join().unwrap(); } if !ok { @@ -147,51 +157,110 @@ pub fn verify_reachable_ports( return ok; } - for _udp_retries in 0..5 { - // Wait for a datagram to arrive at each UDP port - for udp_socket in udp_sockets { - let port = udp_socket.local_addr().unwrap().port(); - let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket"); - let (sender, receiver) = channel(); - std::thread::spawn(move || { - let mut buf = [0; 1]; - debug!("Waiting for incoming datagram on udp/{}", port); - match udp_socket.recv(&mut buf) { - Ok(_) => sender - .send(()) - .unwrap_or_else(|err| warn!("send failure: {}", err)), - Err(err) => warn!("udp recv failure: {}", err), - } - }); - match receiver.recv_timeout(Duration::from_secs(5)) { - Ok(_) => { - info!("udp/{} is reachable", port); - } - Err(err) => { - error!( - "Received no response at udp/{}, check your port configuration: {}", - port, err - ); - ok = false; - } + let mut udp_ports: BTreeMap<_, _> = BTreeMap::new(); + udp_sockets.iter().for_each(|udp_socket| { + let port = udp_socket.local_addr().unwrap().port(); + udp_ports + .entry(port) + .or_insert_with(Vec::new) + .push(udp_socket); + }); + let udp_ports: Vec<_> = udp_ports.into_iter().collect(); + + info!( + "Checking that udp ports {:?} are reachable from {:?}", + udp_ports.iter().map(|(port, _)| port).collect::>(), + ip_echo_server_addr + ); + + 'outer: for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) { + ok = false; + + for udp_remaining_retry in (0_usize..udp_retry_count).rev() { + let (checked_ports, checked_socket_iter) = ( + checked_ports_and_sockets + .iter() + .map(|(port, _)| *port) + .collect::>(), + checked_ports_and_sockets + .iter() + .map(|(_, sockets)| sockets) + .flatten(), + ); + + let _ = ip_echo_server_request( + ip_echo_server_addr, + IpEchoServerMessage::new(&[], &checked_ports), + ) + .map_err(|err| warn!("ip_echo_server request failed: {}", err)); + + // Spawn threads at once! + let thread_handles: Vec<_> = checked_socket_iter + .map(|udp_socket| { + let port = udp_socket.local_addr().unwrap().port(); + let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket"); + std::thread::spawn(move || { + let mut buf = [0; 1]; + let original_read_timeout = udp_socket.read_timeout().unwrap(); + udp_socket.set_read_timeout(Some(timeout)).unwrap(); + let recv_result = udp_socket.recv(&mut buf); + debug!( + "Waited for incoming datagram on udp/{}: {:?}", + port, recv_result + ); + udp_socket.set_read_timeout(original_read_timeout).unwrap(); + recv_result.map(|_| port).ok() + }) + }) + .collect(); + + // Now join threads! + // Separate from the above by collect()-ing as an intermediately step to make the iterator + // eager not lazy so that joining happens here at once after creating bunch of threads + // at once. + let reachable_ports: BTreeSet<_> = thread_handles + .into_iter() + .filter_map(|t| t.join().unwrap()) + .collect(); + + if reachable_ports.len() == checked_ports.len() { + info!( + "checked udp ports: {:?}, reachable udp ports: {:?}", + checked_ports, reachable_ports + ); + ok = true; + break; + } else if udp_remaining_retry > 0 { + // Might have lost a UDP packet, retry a couple times + error!( + "checked udp ports: {:?}, reachable udp ports: {:?}", + checked_ports, reachable_ports + ); + error!("There are some udp ports with no response!! Retrying..."); + } else { + error!("Maximum retry count is reached...."); + break 'outer; } } - if ok { - break; - } - ok = true; - - // Might have lost a UDP packet, retry a couple times - let _ = ip_echo_server_request( - ip_echo_server_addr, - IpEchoServerMessage::new(&[], &udp_ports), - ) - .map_err(|err| warn!("ip_echo_server request failed: {}", err)); } ok } +pub fn verify_reachable_ports( + ip_echo_server_addr: &SocketAddr, + tcp_listeners: Vec<(u16, TcpListener)>, + udp_sockets: &[&UdpSocket], +) -> bool { + do_verify_reachable_ports( + ip_echo_server_addr, + tcp_listeners, + udp_sockets, + DEFAULT_TIMEOUT_SECS, + DEFAULT_RETRY_COUNT, + ) +} + pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr { if let Some(addrstr) = optstr { if let Ok(port) = addrstr.parse() { @@ -511,7 +580,25 @@ mod tests { } #[test] - fn test_get_public_ip_addr() { + fn test_get_public_ip_addr_none() { + solana_logger::setup(); + let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + let (_server_port, (server_udp_socket, server_tcp_listener)) = + bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + + let _runtime = ip_echo_server(server_tcp_listener); + + let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); + assert_eq!( + get_public_ip_addr(&server_ip_echo_addr), + parse_host("127.0.0.1"), + ); + + assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],)); + } + + #[test] + fn test_get_public_ip_addr_reachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let (_server_port, (server_udp_socket, server_tcp_listener)) = @@ -533,4 +620,50 @@ mod tests { &[&client_udp_socket], )); } + + #[test] + fn test_get_public_ip_addr_tcp_unreachable() { + solana_logger::setup(); + let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + let (_server_port, (server_udp_socket, _server_tcp_listener)) = + bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + + // make the socket unreachable by not running the ip echo server! + + let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); + + let (correct_client_port, (_client_udp_socket, client_tcp_listener)) = + bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + + assert!(!do_verify_reachable_ports( + &server_ip_echo_addr, + vec![(correct_client_port, client_tcp_listener)], + &[], + 2, + 3, + )); + } + + #[test] + fn test_get_public_ip_addr_udp_unreachable() { + solana_logger::setup(); + let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + let (_server_port, (server_udp_socket, _server_tcp_listener)) = + bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + + // make the socket unreachable by not running the ip echo server! + + let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); + + let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) = + bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + + assert!(!do_verify_reachable_ports( + &server_ip_echo_addr, + vec![], + &[&client_udp_socket], + 2, + 3, + )); + } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 220405aaa609fb..939c6e6a6a1475 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1110,23 +1110,17 @@ pub fn main() { } if let Some(ref cluster_entrypoint) = cluster_entrypoint { - let udp_sockets = [ - node.sockets.tpu.first(), - /* - Enable these ports when `IpEchoServerMessage` supports more than 4 UDP ports: - node.sockets.tpu_forwards.first(), - node.sockets.tvu.first(), - node.sockets.tvu_forwards.first(), - node.sockets.broadcast.first(), - node.sockets.retransmit_sockets.first(), - */ - Some(&node.sockets.gossip), - Some(&node.sockets.repair), - Some(&node.sockets.serve_repair), - ] - .iter() - .filter_map(|x| *x) - .collect::>(); + let mut udp_sockets = vec![ + &node.sockets.gossip, + &node.sockets.repair, + &node.sockets.serve_repair, + ]; + udp_sockets.extend(node.sockets.tpu.iter()); + udp_sockets.extend(node.sockets.tpu_forwards.iter()); + udp_sockets.extend(node.sockets.tvu.iter()); + udp_sockets.extend(node.sockets.tvu_forwards.iter()); + udp_sockets.extend(node.sockets.broadcast.iter()); + udp_sockets.extend(node.sockets.retransmit_sockets.iter()); let mut tcp_listeners = vec![]; if !private_rpc {