Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix udp port check retry and check all udp ports #10385

Merged
merged 8 commits into from
Jun 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions net-utils/src/ip_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use tokio_codec::{BytesCodec, Decoder};

pub type IpEchoServer = Runtime;

pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;

#[derive(Serialize, Deserialize, Default)]
pub(crate) struct IpEchoServerMessage {
tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's strive for not breaking ABI. ;)

udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
tcp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
}

impl IpEchoServerMessage {
Expand Down
134 changes: 87 additions & 47 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
use log::*;
use rand::{thread_rng, Rng};
use socket2::{Domain, SockAddr, Socket, Type};
use std::collections::{HashMap, HashSet};
use std::io::{self, Read, Write};
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::sync::mpsc::channel;
use std::time::Duration;

mod ip_echo_server;
use ip_echo_server::IpEchoServerMessage;
pub use ip_echo_server::{ip_echo_server, IpEchoServer};
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};

/// A data type representing a public Udp socket
pub struct UdpSocketPair {
Expand Down Expand Up @@ -97,20 +98,15 @@ pub fn verify_reachable_ports(
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
) -> bool {
let udp_ports: Vec<_> = udp_sockets
.iter()
.map(|udp_socket| udp_socket.local_addr().unwrap().port())
.collect();

info!(
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
tcp_listeners, udp_ports, ip_echo_server_addr
"Checking that tcp ports {:?} from {:?}",
tcp_listeners, ip_echo_server_addr
);

let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &udp_ports),
IpEchoServerMessage::new(&tcp_ports, &[]),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

Expand Down Expand Up @@ -147,46 +143,90 @@ pub fn verify_reachable_ports(
return ok;
}

for _udp_retries in 0..5 {
// Wait for a datagram to arrive at each UDP port
for udp_socket in udp_sockets {
let port = udp_socket.local_addr().unwrap().port();
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
let (sender, receiver) = channel();
std::thread::spawn(move || {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, not join()-ing is a bad taste. I'm pretty sure this leaks a thread or it reads an actual data from the socket at arbitrary time after the validator really start to boot...

Copy link
Member Author

@ryoqun ryoqun Jun 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Oops, I'll fix this for tcp as well...

let mut buf = [0; 1];
debug!("Waiting for incoming datagram on udp/{}", port);
match udp_socket.recv(&mut buf) {
Ok(_) => sender
.send(())
.unwrap_or_else(|err| warn!("send failure: {}", err)),
Err(err) => warn!("udp recv failure: {}", err),
}
});
match receiver.recv_timeout(Duration::from_secs(5)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I dunno why channel() is needed.. I've simplified it just by socket.set_read_timeout()....

Ok(_) => {
info!("udp/{} is reachable", port);
}
Err(err) => {
error!(
"Received no response at udp/{}, check your port configuration: {}",
port, err
);
ok = false;
}
let mut udp_ports: HashMap<_, _> = HashMap::new();
udp_sockets.iter().for_each(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port();
udp_ports
.entry(port)
.or_insert_with(Vec::new)
.push(udp_socket);
});
let udp_ports: Vec<_> = udp_ports.into_iter().collect();

info!(
"Checking that udp ports {:?} are reachable from {:?}",
udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
ip_echo_server_addr
);

ok = false;

for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) {
for udp_remaining_retry in (0_usize..5).rev() {
let (checked_ports, checked_socket_iter) = (
checked_ports_and_sockets
.iter()
.map(|(port, _)| *port)
.collect::<Vec<_>>(),
checked_ports_and_sockets
.iter()
.map(|(_, sockets)| sockets)
.flatten(),
);

if udp_remaining_retry > 0 {
// Might have lost a UDP packet, retry a couple times
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&[], &checked_ports),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
} else {
error!("Maximum retry count is reached....");
return false;
}

let results: Vec<_> = checked_socket_iter
.map(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port();
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
std::thread::spawn(move || {
let mut buf = [0; 1];
let original_read_timeout = udp_socket.read_timeout().unwrap();
udp_socket
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
let recv_result = udp_socket.recv(&mut buf).map(|_| port);
debug!(
"Waited for incoming datagram on udp/{}: {:?}",
port, recv_result
);
udp_socket.set_read_timeout(original_read_timeout).unwrap();
recv_result.ok()
})
})
.collect();

let opened_ports: HashSet<_> = results
.into_iter()
.filter_map(|t| t.join().unwrap())
.collect();

if opened_ports.len() == checked_ports.len() {
info!(
"checked ports: {:?}, opened ports: {:?}",
checked_ports, opened_ports
);
ok = true;
break;
} else {
error!(
"checked ports: {:?}, opened ports: {:?}",
checked_ports, opened_ports
);
error!("There are some ports with no response!! Retrying...");
}
}
if ok {
break;
}
ok = true;

// Might have lost a UDP packet, retry a couple times
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&[], &udp_ports),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
}

ok
Expand Down
28 changes: 11 additions & 17 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,23 +1110,17 @@ pub fn main() {
}

if let Some(ref cluster_entrypoint) = cluster_entrypoint {
let udp_sockets = [
node.sockets.tpu.first(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mvines Well, this was broken and caused actual CI failures in this pr...
That's because we open many sockets on the same port and there is no guarantee os feeds the echo-backed packets into the first socket. We must recv from all of them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, nice find

/*
Enable these ports when `IpEchoServerMessage` supports more than 4 UDP ports:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've started to support these as well as a side-effect of addressing same-port-shared sockets.

For this, I only needed just chunks().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome

node.sockets.tpu_forwards.first(),
node.sockets.tvu.first(),
node.sockets.tvu_forwards.first(),
node.sockets.broadcast.first(),
node.sockets.retransmit_sockets.first(),
*/
Some(&node.sockets.gossip),
Some(&node.sockets.repair),
Some(&node.sockets.serve_repair),
]
.iter()
.filter_map(|x| *x)
.collect::<Vec<_>>();
let mut udp_sockets = vec![
&node.sockets.gossip,
&node.sockets.repair,
&node.sockets.serve_repair,
];
udp_sockets.extend(node.sockets.tpu.iter().take(3));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no.

udp_sockets.extend(node.sockets.tpu_forwards.iter());
udp_sockets.extend(node.sockets.tvu.iter());
udp_sockets.extend(node.sockets.tvu_forwards.iter());
udp_sockets.extend(node.sockets.broadcast.iter());
udp_sockets.extend(node.sockets.retransmit_sockets.iter());

let mut tcp_listeners = vec![];
if !private_rpc {
Expand Down