From c61afdcc5351bb4beee8d28aa0eccfe857f41678 Mon Sep 17 00:00:00 2001 From: Ulan Degenbaev Date: Tue, 18 Apr 2023 15:55:25 +0000 Subject: [PATCH] RUN-619: Trim IPC buffers of idle sandbox processes --- ic-os/guestos/rootfs/prep/ic-node/ic-node.te | 7 +- rs/canister_sandbox/backend_lib/src/lib.rs | 4 +- rs/canister_sandbox/common/src/process.rs | 2 + .../common/src/test_sandbox.rs | 3 +- rs/canister_sandbox/common/src/transport.rs | 342 +++++++++++++++++- .../src/launch_as_process.rs | 4 +- .../sandbox_launcher/src/lib.rs | 4 +- 7 files changed, 341 insertions(+), 25 deletions(-) diff --git a/ic-os/guestos/rootfs/prep/ic-node/ic-node.te b/ic-os/guestos/rootfs/prep/ic-node/ic-node.te index b31381f7fb1..ff8a6bdaa46 100644 --- a/ic-os/guestos/rootfs/prep/ic-node/ic-node.te +++ b/ic-os/guestos/rootfs/prep/ic-node/ic-node.te @@ -200,7 +200,8 @@ create_dirs_pattern(ic_replica_t, tmp_t, tmp_t) # Replica creates and reads/writes to/from internal unix domain socket, # as well as the socket connecting it to the sandbox. -allow ic_replica_t self : unix_stream_socket { create read write }; +# It also uses setsockopt to configure socket timeouts. +allow ic_replica_t self : unix_stream_socket { create setopt read write }; # Replica uses an internal fifo file allow ic_replica_t ic_replica_t : fifo_file { read }; @@ -308,7 +309,7 @@ allow ic_canister_sandbox_t ic_canister_sandbox_t : process { getsched }; # communication channel (and such that there is no "accidental" use of any # differently labeled channel. allow ic_canister_sandbox_t ic_replica_t : fd use; -allow ic_canister_sandbox_t ic_replica_t : unix_stream_socket { read write }; +allow ic_canister_sandbox_t ic_replica_t : unix_stream_socket { setopt read write }; # Allow to access the shared memory area set up by replica. NB this should be # labelled differently eventually because allowing tmpfs is fairly broad. 
@@ -334,7 +335,7 @@ dontaudit ic_canister_sandbox_t ic_orchestrator_t : fd { use }; # This should actually not be allowed, logs should be routed through # replica. allow ic_canister_sandbox_t init_t : fd { use }; -allow ic_canister_sandbox_t init_t : unix_stream_socket { read write }; +allow ic_canister_sandbox_t init_t : unix_stream_socket { setopt read write }; # Deny access to system information as well as own proc file (would # also allow accessing proc files of *other* sandboxes). diff --git a/rs/canister_sandbox/backend_lib/src/lib.rs b/rs/canister_sandbox/backend_lib/src/lib.rs index 10388a38c19..50ef31f3f13 100644 --- a/rs/canister_sandbox/backend_lib/src/lib.rs +++ b/rs/canister_sandbox/backend_lib/src/lib.rs @@ -4,7 +4,8 @@ pub mod sandbox_manager; pub mod sandbox_server; use ic_canister_sandbox_common::{ - child_process_initialization, controller_client_stub, protocol, rpc, transport, + child_process_initialization, controller_client_stub, protocol, rpc, + transport::{self, SocketReaderConfig}, }; use ic_config::embedders::Config as EmbeddersConfig; use ic_logger::new_replica_logger_from_config; @@ -118,5 +119,6 @@ pub fn run_canister_sandbox( frame_handler.handle(message); }, socket, + SocketReaderConfig::for_sandbox(), ); } diff --git a/rs/canister_sandbox/common/src/process.rs b/rs/canister_sandbox/common/src/process.rs index ec3bb524616..99bf61f8fe3 100644 --- a/rs/canister_sandbox/common/src/process.rs +++ b/rs/canister_sandbox/common/src/process.rs @@ -4,6 +4,7 @@ use std::os::unix::prelude::{CommandExt, RawFd}; use std::process::{Child, Command}; use std::sync::atomic::{AtomicBool, Ordering}; +use crate::transport::SocketReaderConfig; use crate::{ protocol, protocol::ctlsvc, rpc, sandbox_client_stub::SandboxClientStub, sandbox_service::SandboxService, transport, @@ -107,6 +108,7 @@ pub fn spawn_canister_sandbox_process_with_factory( demux.handle(message); }, socket, + SocketReaderConfig::default(), ); // If we the connection drops, but it is not 
terminated from // our end, that implies that the sandbox process died. At diff --git a/rs/canister_sandbox/common/src/test_sandbox.rs b/rs/canister_sandbox/common/src/test_sandbox.rs index 73d9af1627f..c76a52524bf 100644 --- a/rs/canister_sandbox/common/src/test_sandbox.rs +++ b/rs/canister_sandbox/common/src/test_sandbox.rs @@ -1,5 +1,5 @@ -use ic_canister_sandbox_common::protocol::sbxsvc; use ic_canister_sandbox_common::*; +use ic_canister_sandbox_common::{protocol::sbxsvc, transport::SocketReaderConfig}; use ic_embedders::{ wasm_utils::{Segments, WasmImportsDetails}, CompilationResult, SerializedModule, SerializedModuleBytes, @@ -127,5 +127,6 @@ fn main() { demux.handle(message); }, socket, + SocketReaderConfig::for_testing(), ); } diff --git a/rs/canister_sandbox/common/src/transport.rs b/rs/canister_sandbox/common/src/transport.rs index 42f9b0b68fe..98d04d443a1 100644 --- a/rs/canister_sandbox/common/src/transport.rs +++ b/rs/canister_sandbox/common/src/transport.rs @@ -9,10 +9,10 @@ use bytes::{ use serde::de::DeserializeOwned; use serde::Serialize; -use std::convert::TryInto; use std::marker::PhantomData; use std::os::unix::{io::AsRawFd, io::RawFd, net::UnixStream}; use std::sync::{Arc, Condvar, Mutex}; +use std::{convert::TryInto, time::Duration}; // The maximum number of file descriptors that can be sent in a single message. const MAX_NUM_FD_PER_MESSAGE: usize = 16; @@ -24,6 +24,14 @@ const INITIAL_BUFFER_CAPACITY: usize = 65536; // The minimum buffer capacity for reading in `recv_msg()`. const MIN_READ_BUFFER_CAPACITY: usize = 16384; +// The timeout after which the IPC buffers are trimmed. +const IDLE_TIMEOUT_TO_TRIM_BUFFER: Duration = Duration::from_secs(50); + +// The timeout after which `libc::malloc_trim()` is called to unmap free pages +// of the malloc allocator. Note that this timeout should be higher than +// `IDLE_TIMEOUT_TO_TRIM_BUFFER` to achieve maximum memory reduction. 
+const IDLE_TIMEOUT_TO_TRIM_MALLOC: Duration = Duration::from_secs(100); + // There are different types used for the msg_controllen member of // struct cmsghdr -- we presently support Linux and Darwin compilation. // Add a type alias to allow making correct casts. @@ -104,6 +112,8 @@ struct UnixStreamMessageWriterInt { fds: Vec, sending_in_background: bool, quit_requested: bool, + // This is needed only for testing. + number_of_timeouts: usize, // Phantom data to ensure the writer is correctly parameterized // over the type it can write. This needs to be inside the "Int" // object to ensure compiler can see it "inside" the mutex @@ -114,7 +124,7 @@ struct UnixStreamMessageWriterInt { impl UnixStreamMessageWriter { - fn new(socket: Arc) -> Arc { + fn new(socket: Arc, idle_timeout_to_trim_buffer: Duration) -> Arc { let socket_fd = socket.as_raw_fd(); let instance = Arc::new(UnixStreamMessageWriter { state: Mutex::new(UnixStreamMessageWriterInt:: { @@ -122,6 +132,7 @@ impl fds: vec![], sending_in_background: false, quit_requested: false, + number_of_timeouts: 0, phantom: PhantomData, }), _socket: socket, @@ -130,29 +141,43 @@ impl }); let copy_instance = Arc::clone(&instance); std::thread::spawn(move || { - copy_instance.background_sending_thread(); + copy_instance.background_sending_thread(idle_timeout_to_trim_buffer); }); instance } - fn background_sending_thread(&self) { + fn background_sending_thread(&self, idle_timeout_to_free_buffer: Duration) { + // This predicate corresponds to the `trigger_background_sending` + // condition variable. 
+ let awaiting_input = |guard: &mut UnixStreamMessageWriterInt<_>| { + guard.buf.is_empty() && !guard.quit_requested + }; + loop { let (mut buf, mut fds) = { let mut guard = self.state.lock().unwrap(); + if awaiting_input(&mut guard) { + guard.sending_in_background = false; + let result = self + .trigger_background_sending + .wait_timeout_while(guard, idle_timeout_to_free_buffer, awaiting_input) + .unwrap(); + guard = result.0; + if result.1.timed_out() && awaiting_input(&mut guard) { + guard.number_of_timeouts += 1; + // Trim the buffer and then wait without any timeout. + guard.buf = BytesMut::new(); + guard = self + .trigger_background_sending + .wait_while(guard, awaiting_input) + .unwrap(); + } + assert!(!awaiting_input(&mut guard)); + } if guard.quit_requested { return; } - loop { - if guard.buf.is_empty() { - guard.sending_in_background = false; - guard = self.trigger_background_sending.wait(guard).unwrap(); - if guard.quit_requested { - return; - } - } else { - break (guard.buf.split(), std::mem::take(&mut guard.fds)); - } - } + (guard.buf.split(), std::mem::take(&mut guard.fds)) }; while !buf.is_empty() { send_message(self.socket_fd, &mut buf, &mut fds, 0); @@ -200,6 +225,13 @@ impl guard.quit_requested = true; self.trigger_background_sending.notify_one(); } + + // A helper for testing. + #[cfg(test)] + fn number_of_timeouts(&self) -> usize { + let guard = self.state.lock().unwrap(); + guard.number_of_timeouts + } } impl< @@ -236,7 +268,7 @@ impl UnixStreamMuxWriter { pub fn new(socket: Arc) -> Self { - let repr = UnixStreamMessageWriter::new(socket); + let repr = UnixStreamMessageWriter::new(socket, IDLE_TIMEOUT_TO_TRIM_BUFFER); Self { repr } } @@ -251,6 +283,133 @@ impl } } +/// Config for `socket_read_messages()` function. +/// It controls idle time trimming of the reader buffer and unmapping of free +/// pages of the malloc allocator. 
+pub struct SocketReaderConfig { + // Specifies whether to call `libc::malloc_trim()` or not when + // the socket becomes idle. + idle_malloc_trim: bool, + // Specifies the timeout after which the socket is considered idle. + idle_timeout: Duration, +} + +impl Default for SocketReaderConfig { + fn default() -> Self { + Self { + // We don't trim malloc by default because it is an expensive operation. + // Moreover, the replica process uses jemalloc for which `malloc_trim` + // is a no-op. + idle_malloc_trim: false, + idle_timeout: IDLE_TIMEOUT_TO_TRIM_BUFFER, + } + } +} + +impl SocketReaderConfig { + pub fn for_sandbox() -> Self { + Self { + idle_malloc_trim: true, + idle_timeout: IDLE_TIMEOUT_TO_TRIM_MALLOC, + } + } + + pub fn for_testing() -> Self { + Self { + idle_malloc_trim: true, + idle_timeout: Duration::from_secs(0), + } + } +} + +// A helper to read messages from a socket with a timeout. +// It uses the `libc::setsockopt()` syscall to set the timeout and keeps track +// of the currently set timeout in order to reduce the number of syscalls. +struct SocketReaderWithTimeout { + socket_fd: i32, + socket_timeout: Option, +} + +impl SocketReaderWithTimeout { + fn new(socket_fd: i32) -> Self { + Self { + socket_fd, + socket_timeout: None, + } + } + + // A wrapper around the standalone `receive_message()` function. + // It takes an additional `timeout` parameter and returns `None` + // if no message was read within the given timeout. + // Otherwise, it returns the result of the wrapped function. + fn receive_message( + &mut self, + buf: &mut BytesMut, + fds: &mut Vec, + flags: libc::c_int, + timeout: Option, + ) -> Option { + let result = self.update_socket_timeout(timeout); + if let Err(err) = result { + // We didn't manage to update the timeout. Since timeout is used + // for optimization and not for correctness, we can continue + // without crashing and let `recvmsg` handle `EWOULDBLOCK`. 
+ eprintln!("Failed to update sandbox IPC socket timeout: {}", err); + } + let num_bytes_received = receive_message(self.socket_fd, buf, fds, flags); + if num_bytes_received == -1 + && std::io::Error::last_os_error().raw_os_error() == Some(libc::EWOULDBLOCK) + { + return None; + } + Some(num_bytes_received) + } + + fn update_socket_timeout(&mut self, timeout: Option) -> Result<(), std::io::Error> { + if timeout == self.socket_timeout { + // The fast path to avoid making the syscall to update the socket + // timeout to the same value. + return Ok(()); + } + + let (tv_sec, tv_usec) = match timeout { + None => (0, 0), + Some(dur) => { + let tv_sec = dur.as_secs() as libc::time_t; + let tv_usec = dur.subsec_micros() as libc::suseconds_t; + if tv_sec == 0 && tv_usec == 0 { + // `setsockopt` interprets `(0, 0)` as no timeout, + // so we use the smallest non-zero timeout (1 microsecond) instead. + (tv_sec, tv_usec + 1) + } else { + (tv_sec, tv_usec) + } + } + }; + + let tv = libc::timeval { tv_sec, tv_usec }; + + // SAFETY: `tv` is an initialized `timeval` that outlives the call, and `setsockopt` only reads from it. + let result = unsafe { + libc::setsockopt( + self.socket_fd, + libc::SOL_SOCKET, + libc::SO_RCVTIMEO, + &tv as *const libc::timeval as *const libc::c_void, + std::mem::size_of::() as u32, + ) + }; + + debug_assert_eq!(result, 0); + if result == 0 { + self.socket_timeout = timeout; + Ok(()) + } else { + Err(std::io::Error::last_os_error()) + } + } +} + /// Reads from a unix stream socket and passes individual messages /// to given handler. 
pub fn socket_read_messages< @@ -259,17 +418,47 @@ pub fn socket_read_messages< >( handler: Handler, socket: Arc, + config: SocketReaderConfig, ) { let socket_fd = socket.as_raw_fd(); let mut decoder = FrameDecoder::::new(); let mut buf = BytesMut::with_capacity(INITIAL_BUFFER_CAPACITY); let mut fds = Vec::::new(); + let mut reader = SocketReaderWithTimeout::new(socket_fd); loop { while let Some(mut frame) = decoder.decode(&mut buf) { install_file_descriptors(&mut frame, &mut fds); handler(frame); } - let num_bytes_received = receive_message(socket_fd, &mut buf, &mut fds, 0); + + let num_bytes_received = + match reader.receive_message(&mut buf, &mut fds, 0, Some(config.idle_timeout)) { + Some(bytes) => bytes, + None => { + // The operation has timed out. + // Trim the buffer and trim malloc if needed. + if buf.is_empty() { + buf = BytesMut::with_capacity(INITIAL_BUFFER_CAPACITY); + if config.idle_malloc_trim { + // SAFETY: 0 is always a valid argument to `malloc_trim`. + #[cfg(target_os = "linux")] + unsafe { + libc::malloc_trim(0); + } + } + } + // Read the message without any timeout. + // The loop is not strictly necessary, but we keep it in + // order to make the code robust against failures in + // updating the socket timeout. + loop { + if let Some(bytes) = reader.receive_message(&mut buf, &mut fds, 0, None) { + break (bytes); + } + } + } + }; + if num_bytes_received <= 0 { break; } @@ -524,7 +713,8 @@ mod tests { let (mut test_send, test_recv) = std::os::unix::net::UnixStream::pair().unwrap(); // Set up sender (to send via "comm_send". - let sender = UnixStreamMessageWriter::::new(comm_send); + let sender = + UnixStreamMessageWriter::::new(comm_send, IDLE_TIMEOUT_TO_TRIM_BUFFER); // Set up receiver thread (to receive via "comm_recv") and // channel to pass result back to test thread. 
@@ -535,6 +725,7 @@ mod tests { ch_sender.send(message).unwrap(); }, comm_recv, + SocketReaderConfig::for_testing(), ); }); @@ -659,4 +850,119 @@ mod tests { assert_eq!(s, format!("msg-{}", i)); } } + + #[derive(Serialize, Deserialize, Clone)] + struct StringMessage { + payload: String, + } + + impl MuxInto for StringMessage { + fn wrap(self, _cookie: u64) -> StringMessage { + self + } + } + + impl EnumerateInnerFileDescriptors for StringMessage { + fn enumerate_fds<'a>(&'a mut self, _fds: &mut Vec<&'a mut RawFd>) {} + } + + #[test] + fn sender_timeout() { + // Create a socketpair through which we will communicate. + let (comm_send, comm_recv) = std::os::unix::net::UnixStream::pair().unwrap(); + let comm_send = Arc::new(comm_send); + let comm_recv = Arc::new(comm_recv); + + // Set up sender (to send via "comm_send"). + let sender = + UnixStreamMessageWriter::::new(comm_send, Duration::from_millis(1)); + + // Set up receiver thread (to receive via "comm_recv") and + // channel to pass result back to test thread. + let (ch_sender, ch_receiver) = sync_channel::(1); + std::thread::spawn(move || { + socket_read_messages( + |message: StringMessage| { + ch_sender.send(message).unwrap(); + }, + comm_recv, + SocketReaderConfig::for_testing(), + ); + }); + + // Send and receive the message. + sender.handle( + 0, + StringMessage { + payload: String::from_utf8(vec![b'1'; 1_000_000]).unwrap(), + }, + ); + let message1 = ch_receiver.recv().unwrap(); + + // Give some time for the background sender to time out. + std::thread::sleep(Duration::from_millis(100)); + + // Send and receive the message again. + sender.handle( + 0, + StringMessage { + payload: String::from_utf8(vec![b'2'; 1_000]).unwrap(), + }, + ); + let message2 = ch_receiver.recv().unwrap(); + + // Can stop sender now, don't need it anymore. 
+ sender.stop(); + + assert!(sender.number_of_timeouts() > 0); + assert_eq!( + message1.payload, + String::from_utf8(vec![b'1'; 1_000_000]).unwrap() + ); + assert_eq!( + message2.payload, + String::from_utf8(vec![b'2'; 1_000]).unwrap() + ); + } + + #[test] + fn reader_timeout() { + // Create a socketpair through which we will communicate. + let (comm_send, comm_recv) = std::os::unix::net::UnixStream::pair().unwrap(); + + let mut reader = SocketReaderWithTimeout::new(comm_recv.as_raw_fd()); + + let mut recv_buf = BytesMut::new(); + let mut recv_fds = vec![]; + let bytes = reader.receive_message( + &mut recv_buf, + &mut recv_fds, + 0, + Some(Duration::from_secs(0)), + ); + assert_eq!(bytes, None); + + let mut send_buf = BytesMut::new(); + send_buf.extend_from_slice(&[42; 1000]); + let mut send_fds = vec![]; + let bytes = send_message(comm_send.as_raw_fd(), &mut send_buf, &mut send_fds, 0); + assert_eq!(bytes, 1000); + + let bytes = reader.receive_message( + &mut recv_buf, + &mut recv_fds, + 0, + Some(Duration::from_secs(0)), + ); + assert_eq!(bytes, Some(1000)); + assert_eq!(recv_buf.iter().copied().collect::>(), vec![42; 1000]); + + let bytes = reader.receive_message( + &mut recv_buf, + &mut recv_fds, + 0, + Some(Duration::from_millis(10)), + ); + assert_eq!(bytes, None); + } } diff --git a/rs/canister_sandbox/replica_controller/src/launch_as_process.rs b/rs/canister_sandbox/replica_controller/src/launch_as_process.rs index cfb595f44a4..52ecc441b0f 100644 --- a/rs/canister_sandbox/replica_controller/src/launch_as_process.rs +++ b/rs/canister_sandbox/replica_controller/src/launch_as_process.rs @@ -15,7 +15,7 @@ use ic_canister_sandbox_common::{ rpc, sandbox_client_stub::SandboxClientStub, sandbox_service::SandboxService, - transport, + transport::{self, SocketReaderConfig}, }; pub fn spawn_launcher_process( @@ -57,6 +57,7 @@ pub fn spawn_launcher_process( demux.handle(message); }, socket, + SocketReaderConfig::default(), ); }); @@ -117,6 +118,7 @@ pub fn 
spawn_canister_sandbox_process( demux.handle(message); }, socket, + SocketReaderConfig::default(), ); // Send a notification to the writer thread to stop. // Otherwise, the writer thread will remain waiting forever. diff --git a/rs/canister_sandbox/sandbox_launcher/src/lib.rs b/rs/canister_sandbox/sandbox_launcher/src/lib.rs index 4e4f9c8139c..2e5b83e14b9 100644 --- a/rs/canister_sandbox/sandbox_launcher/src/lib.rs +++ b/rs/canister_sandbox/sandbox_launcher/src/lib.rs @@ -16,7 +16,8 @@ use ic_canister_sandbox_common::{ ctllaunchersvc::SandboxExitedRequest, launchersvc::{LaunchSandboxReply, LaunchSandboxRequest}, }, - rpc, transport, + rpc, + transport::{self, SocketReaderConfig}, }; use ic_types::CanisterId; use nix::{ @@ -65,6 +66,7 @@ pub fn run_launcher(socket: std::os::unix::net::UnixStream) { frame_handler.handle(message); }, socket, + SocketReaderConfig::default(), ); }