Skip to content

Commit

Permalink
patches bug in recv_mmsg when npkts != nrecv
Browse files Browse the repository at this point in the history
If recv_mmsg receives 2 packets where the first one is filtered out,
then it returns npkts == 1:
https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/recvmmsg.rs#L104-L115

But then streamer::packet::recv_from will erroneously keep the 1st
packet and drop the 2nd one:
https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/packet.rs#L34-L49

To avoid this bug, this commit updates recv_mmsg to always return total
number of received packets. If socket address cannot be correctly
obtained, it is left as the default value which is UNSPECIFIED:
https://github.com/solana-labs/solana/blob/01a096adc/sdk/src/packet.rs#L145
  • Loading branch information
behzadnouri committed Jan 4, 2022
1 parent 4b24499 commit 379feec
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
4 changes: 4 additions & 0 deletions streamer/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ mod tests {
}
send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();

batch
.packets
.iter_mut()
.for_each(|pkt| pkt.meta = Meta::default());
let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap();

assert_eq!(recvd, batch.packets.len());
Expand Down
39 changes: 23 additions & 16 deletions streamer/src/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
pub use solana_perf::packet::NUM_RCVMMSGS;
use {
crate::packet::Packet,
crate::packet::{Meta, Packet},
std::{cmp, io, net::UdpSocket},
};
#[cfg(target_os = "linux")]
Expand All @@ -15,6 +15,7 @@ use {

#[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
let mut i = 0;
let count = cmp::min(NUM_RCVMMSGS, packets.len());
for p in packets.iter_mut().take(count) {
Expand Down Expand Up @@ -66,6 +67,8 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<InetAddr>
#[cfg(target_os = "linux")]
#[allow(clippy::uninit_assumed_init)]
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
// Assert that there are no leftovers in packets.
debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();

let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
Expand Down Expand Up @@ -93,23 +96,18 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num p
};
let nrecv =
unsafe { libc::recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) };
if nrecv < 0 {
let nrecv = if nrecv < 0 {
return Err(io::Error::last_os_error());
} else {
usize::try_from(nrecv).unwrap()
};
for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
pkt.meta.size = hdr.msg_len as usize;
if let Some(addr) = cast_socket_addr(&addr, &hdr) {
pkt.meta.set_addr(&addr.to_std());
}
}
let mut npkts = 0;

izip!(addrs, hdrs, packets.iter_mut())
.take(nrecv as usize)
.filter_map(|(addr, hdr, pkt)| {
let addr = cast_socket_addr(&addr, &hdr)?.to_std();
Some((addr, hdr, pkt))
})
.for_each(|(addr, hdr, pkt)| {
pkt.meta.size = hdr.msg_len as usize;
pkt.meta.set_addr(&addr);
npkts += 1;
});
Ok(npkts)
Ok(nrecv)
}

#[cfg(test)]
Expand Down Expand Up @@ -176,6 +174,9 @@ mod tests {
assert_eq!(packet.meta.addr(), saddr);
}

packets
.iter_mut()
.for_each(|pkt| pkt.meta = Meta::default());
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
assert_eq!(sent - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
Expand Down Expand Up @@ -216,6 +217,9 @@ mod tests {
}
reader.set_nonblocking(true).unwrap();

packets
.iter_mut()
.for_each(|pkt| pkt.meta = Meta::default());
let _recv = recv_mmsg(&reader, &mut packets[..]);
assert!(start.elapsed().as_secs() < 5);
}
Expand Down Expand Up @@ -256,6 +260,9 @@ mod tests {
assert_eq!(packet.meta.addr(), saddr2);
}

packets
.iter_mut()
.for_each(|pkt| pkt.meta = Meta::default());
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
Expand Down
5 changes: 4 additions & 1 deletion streamer/tests/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use {
solana_streamer::{
packet::{Packet, PACKET_DATA_SIZE},
packet::{Meta, Packet, PACKET_DATA_SIZE},
recvmmsg::*,
},
std::{net::UdpSocket, time::Instant},
Expand Down Expand Up @@ -44,6 +44,9 @@ pub fn test_recv_mmsg_batch_size() {
if recv >= TEST_BATCH_SIZE {
break;
}
packets
.iter_mut()
.for_each(|pkt| pkt.meta = Meta::default());
}
elapsed_in_small_batch += now.elapsed().as_nanos();
assert_eq!(TEST_BATCH_SIZE, recv);
Expand Down

0 comments on commit 379feec

Please sign in to comment.