From ce7d41fe257d8d38be7b3e9815baf05efd838e33 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Fri, 9 Aug 2024 10:45:28 +0200 Subject: [PATCH] Refactor eventfd syscalls into 2 helper structs Also fleshes out all SAFETY comments --- src/codec/qcmp.rs | 35 ++-- src/components/proxy.rs | 2 +- src/components/proxy/io_uring_shared.rs | 240 +++++++++++++----------- 3 files changed, 142 insertions(+), 135 deletions(-) diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 383501421..e91f52bdc 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -257,24 +257,19 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra #[cfg(target_os = "linux")] pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> { + use crate::components::proxy::io_uring_shared::EventFd; use eyre::Context as _; - use std::os::fd::{AsRawFd, FromRawFd}; let port = crate::net::socket_port(&socket); // Create an eventfd so we can signal to the qcmp loop when we want to exit - // SAFETY: syscall - let shutdown_event = unsafe { std::os::fd::OwnedFd::from_raw_fd(libc::eventfd(0, 0)) }; - let shutdown_event_fd = shutdown_event.as_raw_fd(); + let mut shutdown_event = EventFd::new()?; + let shutdown = shutdown_event.writer(); // Spawn a task on the main loop whose sole purpose is to signal the eventfd tokio::task::spawn(async move { let _ = shutdown_rx.changed().await; - - // SAFETY: syscall - unsafe { - libc::eventfd_write(shutdown_event.as_raw_fd(), 1); - } + shutdown.write(1); }); let _thread_span = uring_span!(tracing::debug_span!("qcmp").or_current()); @@ -294,23 +289,19 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra // Queue the read from the shutdown eventfd used to signal when the loop // should exit - let mut shutdown_buf = 0u64; - { - let entry = io_uring::opcode::Read::new( - io_uring::types::Fd(shutdown_event_fd), - (&mut shutdown_buf as *mut u64).cast(), - 8, - ) - .build() - .user_data(SHUTDOWN); - // SAFETY: syscall - unsafe { sq.push(&entry) }.context("unable to insert io-uring entry")?; + let entry = shutdown_event.io_uring_entry().user_data(SHUTDOWN); + // SAFETY: the memory being written to is located on the stack inside the shutdown event, and is alive + // at least as long as the uring loop + unsafe { + sq.push(&entry).context("unable to insert io-uring entry")?; } // Our loop is simple and only ever processes one ping/pong pair at a time // so we just reuse the same buffer for both receives and sends let mut buf = QcmpPacket::default(); + // SAFETY: msghdr is POD let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; + // SAFETY: msghdr is POD let addr = unsafe { socket2::SockAddr::new( std::mem::zeroed(), @@ -340,7 +331,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra let entry = io_uring::opcode::RecvMsg::new(socket_fd, msghdr_mut) .build() .user_data(RECV); - // SAFETY: syscall + // SAFETY: the memory being written to is located on the stack and outlives the uring loop unsafe { sq.push(&entry) }.context("unable to insert io-uring entry")?; Ok(()) }; @@ -413,7 +404,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra ) .build() .user_data(SEND); - // SAFETY: syscall + // SAFETY: the memory being read from is located on the stack and outlives the uring loop if unsafe { sq.push(&entry) }.is_err() { tracing::error!("failed to enqueue QCMP pong response"); continue; diff --git a/src/components/proxy.rs b/src/components/proxy.rs index 44b247ed0..66299b12e 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -3,7 +3,7 @@ pub mod packet_router; mod sessions; #[cfg(target_os = "linux")] -mod io_uring_shared; +pub(crate) mod io_uring_shared; use super::RunArgs; pub use error::{ErrorMap, PipelineError}; diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index b3ebdc11f..dad2f3bf6 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -18,6 +18,74 @@ use std::{ sync::Arc, }; +/// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html) +/// +/// We use eventfd to signal to io uring loops from async tasks, it is essentially +/// the equivalent of a signalling 64 bit cross-process atomic +pub(crate) struct EventFd { + fd: std::os::fd::OwnedFd, + val: u64, +} + +#[derive(Clone)] +pub(crate) struct EventFdWriter { + fd: i32, +} + +impl EventFdWriter { + #[inline] + pub(crate) fn write(&self, val: u64) { + // SAFETY: we have a valid descriptor, and most of the errors that apply + // to the general write call that eventfd_write wraps are not applicable + // + // Note that while the docs state eventfd_write is glibc, it is implemented + // on musl as well, but really is just a write with 8 bytes + unsafe { + libc::eventfd_write(self.fd, val); + } + } +} + +impl EventFd { + #[inline] + pub(crate) fn new() -> std::io::Result { + // SAFETY: We have no invariants to uphold, but we do need to check the + // return value + let fd = unsafe { libc::eventfd(0, 0) }; + + // This can fail for various reasons mostly around resource limits, if + // this is hit there is either something really wrong (OOM, too many file + // descriptors), or resource limits were externally placed that were too strict + if fd == -1 { + return Err(std::io::Error::last_os_error()); + } + + Ok(Self { + // SAFETY: we've validated the file descriptor + fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) }, + val: 0, + }) + } + + #[inline] + pub(crate) fn writer(&self) -> EventFdWriter { + EventFdWriter { + fd: self.fd.as_raw_fd(), + } + } + + /// Constructs an io-uring entry to read (ie wait) on this eventfd + #[inline] + pub(crate) fn io_uring_entry(&mut self) -> Entry { + io_uring::opcode::Read::new( + Fd(self.fd.as_raw_fd()), + &mut self.val as *mut u64 as *mut u8, + 8, + ) + .build() + } +} + struct RecvPacket { /// The buffer filled with data during recv_from buffer: PoolBuffer, @@ -39,26 +107,21 @@ struct SendPacket { #[derive(Clone)] struct PendingSends { packets: Arc>>, - notify_fd: Fd, + notify: EventFdWriter, } impl PendingSends { - pub fn new(notify_fd: Fd) -> Self { + pub fn new(notify: EventFdWriter) -> Self { Self { packets: Default::default(), - notify_fd, + notify, } } #[inline] pub fn push(&self, packet: SendPacket) { self.packets.lock().push(packet); - - // SAFETY: syscall, this _shouldn't_ ever cause problems, even if the fd - // is invalid/closed - unsafe { - libc::eventfd_write(self.notify_fd.0, 1); - } + self.notify.write(1); } #[inline] @@ -73,6 +136,10 @@ enum LoopPacketInner { } /// A packet that is currently on the io-uring loop, either being received or sent +/// +/// The struct is expected to be pinned at a location in memory in a slab, as we +/// give pointers to the internal data in the struct, which also contains +/// referential pointers that need to stay pinned until the I/O is complete #[repr(C)] struct LoopPacket { msghdr: libc::msghdr, @@ -85,12 +152,14 @@ impl LoopPacket { #[inline] fn new() -> Self { Self { + // SAFETY: msghdr is POD msghdr: unsafe { std::mem::zeroed() }, packet: None, io_vec: libc::iovec { iov_base: std::ptr::null_mut(), iov_len: 0, }, + // SAFETY: sockaddr_storage is POD addr: unsafe { std::mem::zeroed() }, } } @@ -110,6 +179,7 @@ impl LoopPacket { self.io_vec.iov_base = send.buffer.as_ptr() as *mut u8 as *mut _; self.io_vec.iov_len = send.buffer.len(); + // SAFETY: both pointers are valid at this point, with the same size unsafe { std::ptr::copy_nonoverlapping( send.destination.as_ptr().cast(), @@ -136,6 +206,7 @@ impl LoopPacket { unreachable!("finalized a send packet") }; + // SAFETY: we're initialising it with correctly sized data let mut source = unsafe { SockAddr::new( self.addr, @@ -186,9 +257,9 @@ fn spawn_workers( rt: &tokio::runtime::Runtime, ctx: PacketProcessorCtx, pending_sends: PendingSends, - packet_processed_event: Fd, + packet_processed_event: EventFdWriter, mut shutdown_rx: crate::ShutdownRx, - shutdown_event: Fd, + shutdown_event: EventFdWriter, ) -> tokio::sync::mpsc::Sender { let (tx, mut rx) = tokio::sync::mpsc::channel::(1); @@ -197,9 +268,7 @@ fn spawn_workers( // The result is uninteresting, either a shutdown has been signalled, or all senders have been dropped // which equates to the same thing let _ = shutdown_rx.changed().await; - unsafe { - libc::eventfd_write(shutdown_event.0, 1); - } + shutdown_event.write(1); }); match ctx { @@ -237,10 +306,7 @@ fn spawn_workers( ) .await; - // SAFETY: syscall - unsafe { - libc::eventfd_write(packet_processed_event.0, 1); - } + packet_processed_event.write(1); } }); @@ -272,10 +338,7 @@ fn spawn_workers( ) .await; - // SAFETY: syscall - unsafe { - libc::eventfd_write(packet_processed_event.0, 1); - } + packet_processed_event.write(1); } }); @@ -328,7 +391,7 @@ impl<'uring> LoopCtx<'uring> { self.sq.sync(); } - /// Enqueues a downstream recv + /// Enqueues a recv_from on the socket #[inline] fn enqueue_recv(&mut self, buffer: crate::pool::PoolBuffer) { let packet = LoopPacketInner::Recv(RecvPacket { @@ -352,27 +415,7 @@ impl<'uring> LoopCtx<'uring> { ); } - /// Enqueues the wait for the received packet finishing processing - #[inline] - fn enqueue_recv_processed(&mut self, process_event: Fd, buf: &mut u64) { - let token = self.tokens.insert(Token::RecvPacketProcessed); - self.push( - io_uring::opcode::Read::new(process_event, buf as *mut u64 as *mut _, 8) - .build() - .user_data(token as _), - ); - } - - #[inline] - fn enqueue_pending_sends(&mut self, pending_sends: Fd, buf: &mut u64) { - let token = self.tokens.insert(Token::PendingsSends); - self.push( - io_uring::opcode::Read::new(pending_sends, buf as *mut u64 as *mut _, 8) - .build() - .user_data(token as _), - ); - } - + /// Enqueues a send_to on the socket #[inline] fn enqueue_send(&mut self, packet: SendPacket) { // We rely on sends using state with stable addresses, but realistically we should @@ -432,9 +475,16 @@ impl<'uring> LoopCtx<'uring> { Ok(()) } + #[inline] + fn push_with_token(&mut self, entry: Entry, token: Token) { + let token = self.tokens.insert(token); + self.push(entry.user_data(token as _)); + } + #[inline] fn push(&mut self, entry: Entry) { - // SAFETY: syscall + // SAFETY: we keep all memory/file descriptors alive and in a stable locations + // for the duration of the I/O requests unsafe { if self.sq.push(&entry).is_err() { self.backlog.push_back(entry); @@ -488,6 +538,16 @@ impl IoUringLoop { let mut ring = io_uring::IoUring::new((concurrent_sends + 3) as _)?; + // Used to notify the uring loop when 1 or more packets have been queued + // up to be sent to a remote address + let mut pending_sends_event = EventFd::new()?; + // Used to notify the uring when a received packet has finished + // processing and we can perform another recv, as we (currently) only + // ever process a single packet at a time + let mut process_event = EventFd::new()?; + // Used to notify the uring loop to shutdown + let mut shutdown_event = EventFd::new()?; + std::thread::Builder::new() .name(thread_name) .spawn(move || { @@ -498,25 +558,10 @@ impl IoUringLoop { // Create an eventfd to notify the uring thread (this one) of // pending sends - // SAFETY: syscall - let pending_sends_event = - unsafe { std::os::fd::OwnedFd::from_raw_fd(libc::eventfd(0, 0)) }; - let pending_sends_fd = Fd(pending_sends_event.as_raw_fd()); - let pending_sends = PendingSends::new(pending_sends_fd); + let pending_sends = PendingSends::new(pending_sends_event.writer()); // Just double buffer the pending writes for simplicity let mut double_pending_sends = Vec::new(); - // Used to notify the uring when a downstream packet has finished processing and we can perform another read - // SAFETY: syscall - let process_event = - unsafe { std::os::fd::OwnedFd::from_raw_fd(libc::eventfd(0, 0)) }; - let process_event_fd = Fd(process_event.as_raw_fd()); - - // SAFETY: syscall - let shutdown_event = - unsafe { std::os::fd::OwnedFd::from_raw_fd(libc::eventfd(0, 0)) }; - let shutdown_event_fd = Fd(shutdown_event.as_raw_fd()); - // When sending packets, this is the direction used when updating metrics let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { metrics::WRITE @@ -530,9 +575,9 @@ impl IoUringLoop { &rt, ctx, pending_sends.clone(), - process_event_fd, + process_event.writer(), shutdown, - shutdown_event_fd, + shutdown_event.writer(), ); let (submitter, sq, mut cq) = ring.split(); @@ -545,23 +590,10 @@ impl IoUringLoop { tokens, }; - let mut pending = 0; - let mut processed = 0; - let mut shutdown = 0u64; - loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); - loop_ctx.enqueue_pending_sends(pending_sends_fd, &mut pending); - - let token = loop_ctx.tokens.insert(Token::Shutdown); - loop_ctx.push( - io_uring::opcode::Read::new( - shutdown_event_fd, - &mut shutdown as *mut u64 as *mut _, - 8, - ) - .build() - .user_data(token as _), - ); + loop_ctx + .push_with_token(pending_sends_event.io_uring_entry(), Token::PendingsSends); + loop_ctx.push_with_token(shutdown_event.io_uring_entry(), Token::Shutdown); // Sync always needs to be called when entries have been pushed // onto the submission queue for the loop to actually function (ie, similar to await on futures) @@ -611,16 +643,21 @@ impl IoUringLoop { unreachable!("packet process thread has a pending packet"); } - // Queue the wait for the processing to finish - loop_ctx.enqueue_recv_processed(process_event_fd, &mut processed); + // Queue the wait for the processing of the packet to finish + loop_ctx.push_with_token( + process_event.io_uring_entry(), + Token::RecvPacketProcessed, + ); } Token::RecvPacketProcessed => { loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); } Token::PendingsSends => { double_pending_sends = pending_sends.swap(double_pending_sends); - - loop_ctx.enqueue_pending_sends(pending_sends_fd, &mut pending); + loop_ctx.push_with_token( + pending_sends_event.io_uring_entry(), + Token::PendingsSends, + ); for pending in double_pending_sends.drain(0..double_pending_sends.len()) @@ -677,26 +714,17 @@ mod test { #[test] #[cfg(target_os = "linux")] fn eventfd_works_as_expected() { - let event = unsafe { std::os::fd::OwnedFd::from_raw_fd(libc::eventfd(0, 0)) }; - let event_fd = Fd(event.as_raw_fd()); + let mut event = EventFd::new().unwrap(); + let event_writer = event.writer(); // Write even before we create the loop - unsafe { - libc::eventfd_write(event_fd.0, 1); - } + event_writer.write(1); let mut ring = io_uring::IoUring::new(2).unwrap(); let (submitter, mut sq, mut cq) = ring.split(); - let mut buf = 0u64; - unsafe { - sq.push( - &io_uring::opcode::Read::new(event_fd, &mut buf as *mut u64 as *mut _, 8) - .build() - .user_data(1), - ) - .unwrap(); + sq.push(&event.io_uring_entry().user_data(1)).unwrap(); } sq.sync(); @@ -718,27 +746,15 @@ mod test { // This was written before the loop started, but now write to the event // before queuing up the next read 1 => { - assert_eq!(buf, 1); - - unsafe { - libc::eventfd_write(event_fd.0, 9999); - } + assert_eq!(event.val, 1); + event_writer.write(9999); unsafe { - sq.push( - &io_uring::opcode::Read::new( - event_fd, - &mut buf as *mut u64 as *mut _, - 8, - ) - .build() - .user_data(2), - ) - .unwrap(); + sq.push(&event.io_uring_entry().user_data(2)).unwrap(); } } 2 => { - assert_eq!(buf, 9999); + assert_eq!(event.val, 9999); return; } _ => unreachable!(),