Skip to content

Commit

Permalink
Restructure File to use readv and writev (shadow#2798)
Browse files Browse the repository at this point in the history
Similar to shadow#2797, but with `readv` and `writev` instead. This doesn't
add support for the `readv` and `writev` syscalls, but restructures the
socket code so that we can support these syscalls in the future.
  • Loading branch information
stevenengler authored Mar 22, 2023
2 parents bc770a7 + 747f830 commit 297ab3f
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 123 deletions.
53 changes: 33 additions & 20 deletions src/main/host/descriptor/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use crate::host::descriptor::{
FileMode, FileState, FileStatus, StateEventSource, StateListenerFilter,
};
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
use crate::host::syscall_types::{PluginPtr, SyscallError, SyscallResult};
use crate::utility::callback_queue::{CallbackQueue, Handle};
use crate::utility::stream_len::StreamLen;
use crate::utility::HostTreePointer;

use std::io::{Read, Write};

pub struct EventFd {
counter: u64,
is_semaphore_mode: bool,
Expand Down Expand Up @@ -68,15 +70,14 @@ impl EventFd {
Ok(())
}

pub fn read<W>(
pub fn readv(
&mut self,
mut bytes: W,
iovs: &[IoVec],
offset: Option<libc::off_t>,
_flags: libc::c_int,
mem: &mut MemoryManager,
cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
W: std::io::Write + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// eventfds don't support seeking
if offset.is_some() {
return Err(Errno::ESPIPE.into());
Expand All @@ -85,9 +86,11 @@ impl EventFd {
// eventfd(2): "Each successful read(2) returns an 8-byte integer"
const NUM_BYTES: usize = 8;

let len: libc::size_t = iovs.iter().map(|x| x.len).sum();

// this check doesn't guarantee that we can write all bytes since the stream length is only
// a hint
if usize::try_from(bytes.stream_len_bp()?).unwrap() < NUM_BYTES {
if len < NUM_BYTES {
log::trace!(
"Reading from eventfd requires a buffer of at least {} bytes",
NUM_BYTES
Expand All @@ -100,31 +103,32 @@ impl EventFd {
return Err(Errno::EWOULDBLOCK.into());
}

let mut writer = IoVecWriter::new(iovs, mem);

// behavior defined in `man 2 eventfd`
if self.is_semaphore_mode {
const ONE: [u8; NUM_BYTES] = 1u64.to_ne_bytes();
bytes.write_all(&ONE)?;
writer.write_all(&ONE)?;
self.counter -= 1;
} else {
let to_write: [u8; NUM_BYTES] = self.counter.to_ne_bytes();
bytes.write_all(&to_write)?;
writer.write_all(&to_write)?;
self.counter = 0;
}

self.update_state(cb_queue);

Ok(NUM_BYTES.into())
Ok(NUM_BYTES.try_into().unwrap())
}

pub fn write<R>(
pub fn writev(
&mut self,
mut bytes: R,
iovs: &[IoVec],
offset: Option<libc::off_t>,
_flags: libc::c_int,
mem: &mut MemoryManager,
cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
R: std::io::Read + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// eventfds don't support seeking
if offset.is_some() {
return Err(Errno::ESPIPE.into());
Expand All @@ -134,18 +138,27 @@ impl EventFd {
// counter"
const NUM_BYTES: usize = 8;

let len: libc::size_t = iovs.iter().map(|x| x.len).sum();

// this check doesn't guarantee that we can read all bytes since the stream length is only
// a hint
if usize::try_from(bytes.stream_len_bp()?).unwrap() < NUM_BYTES {
if len < NUM_BYTES {
log::trace!(
"Writing to eventfd requires a buffer with at least {} bytes",
NUM_BYTES
);
return Err(Errno::EINVAL.into());
}

if iovs.len() > 1 {
// Linux doesn't seem to let you write to an eventfd with multiple iovecs
return Err(Errno::EINVAL.into());
}

let mut reader = IoVecReader::new(iovs, mem);

let mut read_buf = [0u8; NUM_BYTES];
bytes.read_exact(&mut read_buf)?;
reader.read_exact(&mut read_buf)?;
let value: u64 = u64::from_ne_bytes(read_buf);

if value == u64::MAX {
Expand All @@ -162,7 +175,7 @@ impl EventFd {
self.counter += value;
self.update_state(cb_queue);

Ok(NUM_BYTES.into())
Ok(NUM_BYTES.try_into().unwrap())
}

pub fn ioctl(
Expand Down
15 changes: 7 additions & 8 deletions src/main/host/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use nix::fcntl::OFlag;
use crate::core::worker;
use crate::cshadow as c;
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::IoVec;
use crate::host::syscall_types::{PluginPtr, SyscallError, SyscallResult};
use crate::utility::callback_queue::{CallbackQueue, EventSource, Handle};
use crate::utility::{HostTreePointer, IsSend, IsSync};
Expand Down Expand Up @@ -430,15 +431,13 @@ impl FileRefMut<'_> {
enum_passthrough!(self, (ptr), Pipe, EventFd, Socket;
pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener)
);

enum_passthrough_generic!(self, (bytes, offset, cb_queue), Pipe, EventFd, Socket;
pub fn read<W>(&mut self, bytes: W, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where W: std::io::Write + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), Pipe, EventFd, Socket;
pub fn readv(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);

enum_passthrough_generic!(self, (source, offset, cb_queue), Pipe, EventFd, Socket;
pub fn write<R>(&mut self, source: R, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where R: std::io::Read + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), Pipe, EventFd, Socket;
pub fn writev(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);
}

Expand Down
52 changes: 28 additions & 24 deletions src/main/host/descriptor/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::host::descriptor::{
FileMode, FileState, FileStatus, StateEventSource, StateListenerFilter,
};
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
use crate::host::syscall_types::{PluginPtr, SyscallError, SyscallResult};
use crate::utility::callback_queue::{CallbackQueue, Handle};
use crate::utility::stream_len::StreamLen;
use crate::utility::HostTreePointer;

pub struct Pipe {
Expand Down Expand Up @@ -117,15 +117,14 @@ impl Pipe {
Ok(())
}

pub fn read<W>(
pub fn readv(
&mut self,
mut bytes: W,
iovs: &[IoVec],
offset: Option<libc::off_t>,
_flags: libc::c_int,
mem: &mut MemoryManager,
cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
W: std::io::Write + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// pipes don't support seeking
if offset.is_some() {
return Err(nix::errno::Errno::ESPIPE.into());
Expand All @@ -136,36 +135,39 @@ impl Pipe {
return Err(nix::errno::Errno::EBADF.into());
}

let num_bytes_to_read: libc::size_t = iovs.iter().map(|x| x.len).sum();

let mut writer = IoVecWriter::new(iovs, mem);

let (num_copied, _num_removed_from_buf) = self
.buffer
.as_ref()
.unwrap()
.borrow_mut()
.read(&mut bytes, cb_queue)?;
.read(&mut writer, cb_queue)?;

// the read would block if all:
// 1. we could not read any bytes
// 2. we were asked to read >0 bytes
// 3. there are open descriptors that refer to the write end of the pipe
if num_copied == 0
&& bytes.stream_len_bp()? != 0
&& num_bytes_to_read != 0
&& self.buffer.as_ref().unwrap().borrow().num_writers() > 0
{
Err(Errno::EWOULDBLOCK.into())
} else {
Ok(num_copied.into())
Ok(num_copied.try_into().unwrap())
}
}

pub fn write<R>(
pub fn writev(
&mut self,
mut bytes: R,
iovs: &[IoVec],
offset: Option<libc::off_t>,
_flags: libc::c_int,
mem: &mut MemoryManager,
cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
R: std::io::Read + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// pipes don't support seeking
if offset.is_some() {
return Err(nix::errno::Errno::ESPIPE.into());
Expand Down Expand Up @@ -194,10 +196,12 @@ impl Pipe {
}
}

let len = bytes.stream_len_bp()? as usize;
let len: libc::size_t = iovs.iter().map(|x| x.len).sum();

let mut reader = IoVecReader::new(iovs, mem);

let result = match self.write_mode {
WriteMode::Stream => buffer.write_stream(bytes.by_ref(), len, cb_queue),
let num_copied = match self.write_mode {
WriteMode::Stream => buffer.write_stream(&mut reader, len, cb_queue)?,
WriteMode::Packet => {
let mut num_written = 0;

Expand All @@ -207,26 +211,26 @@ impl Pipe {

// if there are no more bytes to write (pipes don't support 0-length packets)
if bytes_remaining == 0 {
break Ok(num_written);
break num_written;
}

// split the packet up into PIPE_BUF-sized packets
let bytes_to_write = std::cmp::min(bytes_remaining, libc::PIPE_BUF);

if let Err(e) = buffer.write_packet(bytes.by_ref(), bytes_to_write, cb_queue) {
if let Err(e) = buffer.write_packet(&mut reader, bytes_to_write, cb_queue) {
// if we've already written bytes, return those instead of an error
if num_written > 0 {
break Ok(num_written);
break num_written;
}
break Err(e);
return Err(e.into());
}

num_written += bytes_to_write;
}
}
};

Ok(result?.into())
Ok(num_copied.try_into().unwrap())
}

pub fn ioctl(
Expand Down
32 changes: 15 additions & 17 deletions src/main/host/descriptor/socket/inet/legacy_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::host::descriptor::{
};
use crate::host::host::Host;
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::write_partial;
use crate::host::syscall::io::{write_partial, IoVec};
use crate::host::syscall_types::{PluginPtr, SyscallError, TypedPluginPtr};
use crate::host::thread::ThreadId;
use crate::network::net_namespace::NetworkNamespace;
Expand Down Expand Up @@ -276,34 +276,32 @@ impl LegacyTcpSocket {
Ok(0.into())
}

pub fn read<W>(
pub fn readv(
&mut self,
mut _bytes: W,
_iovs: &[IoVec],
_offset: Option<libc::off_t>,
_flags: libc::c_int,
_mem: &mut MemoryManager,
_cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
W: std::io::Write + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// we could call LegacyTcpSocket::recvmsg() here, but for now we expect that there are no
// code paths that would call LegacyTcpSocket::read() since the read() syscall handler
// code paths that would call LegacyTcpSocket::readv() since the readv() syscall handler
// should have called LegacyTcpSocket::recvmsg() instead
panic!("Called LegacyTcpSocket::read() on a TCP socket.");
panic!("Called LegacyTcpSocket::readv() on a TCP socket.");
}

pub fn write<R>(
pub fn writev(
&mut self,
mut _bytes: R,
_iovs: &[IoVec],
_offset: Option<libc::off_t>,
_flags: libc::c_int,
_mem: &mut MemoryManager,
_cb_queue: &mut CallbackQueue,
) -> SyscallResult
where
R: std::io::Read + std::io::Seek,
{
) -> Result<libc::ssize_t, SyscallError> {
// we could call LegacyTcpSocket::sendmsg() here, but for now we expect that there are no
// code paths that would call LegacyTcpSocket::write() since the write() syscall handler
// code paths that would call LegacyTcpSocket::writev() since the writev() syscall handler
// should have called LegacyTcpSocket::sendmsg() instead
panic!("Called LegacyTcpSocket::write() on a TCP socket");
panic!("Called LegacyTcpSocket::writev() on a TCP socket");
}

pub fn sendmsg(
Expand Down
15 changes: 7 additions & 8 deletions src/main/host/descriptor/socket/inet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::cshadow as c;
use crate::host::descriptor::socket::{RecvmsgArgs, RecvmsgReturn, SendmsgArgs};
use crate::host::descriptor::{FileMode, FileState, FileStatus, OpenFile, SyscallResult};
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::IoVec;
use crate::host::syscall_types::{PluginPtr, SyscallError};
use crate::network::net_namespace::NetworkNamespace;
use crate::network::packet::Packet;
Expand Down Expand Up @@ -246,15 +247,13 @@ impl InetSocketRefMut<'_> {
enum_passthrough!(self, (ptr), LegacyTcp;
pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener)
);

enum_passthrough_generic!(self, (bytes, offset, cb_queue), LegacyTcp;
pub fn read<W>(&mut self, bytes: W, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where W: std::io::Write + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), LegacyTcp;
pub fn readv(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);

enum_passthrough_generic!(self, (source, offset, cb_queue), LegacyTcp;
pub fn write<R>(&mut self, source: R, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where R: std::io::Read + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), LegacyTcp;
pub fn writev(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);
}

Expand Down
14 changes: 6 additions & 8 deletions src/main/host/descriptor/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,13 @@ impl SocketRefMut<'_> {
enum_passthrough!(self, (ptr), Unix, Inet;
pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener)
);

enum_passthrough_generic!(self, (bytes, offset, cb_queue), Unix, Inet;
pub fn read<W>(&mut self, bytes: W, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where W: std::io::Write + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), Unix, Inet;
pub fn readv(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);

enum_passthrough_generic!(self, (source, offset, cb_queue), Unix, Inet;
pub fn write<R>(&mut self, source: R, offset: Option<libc::off_t>, cb_queue: &mut CallbackQueue) -> SyscallResult
where R: std::io::Read + std::io::Seek
enum_passthrough!(self, (iovs, offset, flags, mem, cb_queue), Unix, Inet;
pub fn writev(&mut self, iovs: &[IoVec], offset: Option<libc::off_t>, flags: libc::c_int,
mem: &mut MemoryManager, cb_queue: &mut CallbackQueue) -> Result<libc::ssize_t, SyscallError>
);
}

Expand Down
Loading

0 comments on commit 297ab3f

Please sign in to comment.