Skip to content

Commit

Permalink
Merge branch 'main' into pipewire
Browse files Browse the repository at this point in the history
  • Loading branch information
slp authored Jan 14, 2025
2 parents 323d7ec + 8330325 commit 28e2f81
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 103 deletions.
15 changes: 15 additions & 0 deletions include/libkrun.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ int32_t krun_set_tee_config_file(uint32_t ctx_id, const char *filepath);
int32_t krun_add_vsock_port(uint32_t ctx_id,
uint32_t port,
const char *c_filepath);

/**
* Adds a port-path pairing for guest IPC with a process in the host.
*
* Arguments:
* "ctx_id" - the configuration context ID.
* "port" - a vsock port that the guest will connect to for IPC.
* "filepath" - a null-terminated string representing the path of the UNIX
* socket in the host.
* "listen" - true if guest expects connections to be initiated from host side
*/
int32_t krun_add_vsock_port2(uint32_t ctx_id,
uint32_t port,
const char *c_filepath,
bool listen);
/**
* Returns the eventfd file descriptor to signal the guest to shut down orderly. This must be
* called before starting the microVM with "krun_start_event". Only available in libkrun-efi.
Expand Down
5 changes: 1 addition & 4 deletions src/devices/src/virtio/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,7 @@ impl<'a> DescriptorChain<'a> {
return None;
}

let desc_head = match mem.checked_offset(desc_table, (index as usize) * 16) {
Some(a) => a,
None => return None,
};
let desc_head = mem.checked_offset(desc_table, (index as usize) * 16)?;
mem.checked_offset(desc_head, 16)?;

// These reads can't fail unless Guest memory is hopelessly broken.
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Vsock {
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
queues: Vec<VirtQueue>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let mut queue_events = Vec::new();
for _ in 0..queues.len() {
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Vsock {
pub fn new(
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let queues: Vec<VirtQueue> = defs::QUEUE_SIZES
.iter()
Expand Down
16 changes: 13 additions & 3 deletions src/devices/src/virtio/vsock/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub struct VsockMuxer {
irq_line: Option<u32>,
proxy_map: ProxyMap,
reaper_sender: Option<Sender<u64>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
}

impl VsockMuxer {
Expand All @@ -120,7 +120,7 @@ impl VsockMuxer {
host_port_map: Option<HashMap<u16, u16>>,
interrupt_evt: EventFd,
interrupt_status: Arc<AtomicUsize>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> Self {
VsockMuxer {
cid,
Expand Down Expand Up @@ -179,6 +179,7 @@ impl VsockMuxer {
intc,
irq_line,
sender.clone(),
self.unix_ipc_port_map.clone().unwrap_or_default(),
);
thread.run();

Expand Down Expand Up @@ -499,9 +500,18 @@ impl VsockMuxer {
if let Some(proxy) = proxy_map.get(&id) {
proxy.lock().unwrap().confirm_connect(pkt)
} else if let Some(ref mut ipc_map) = &mut self.unix_ipc_port_map {
if let Some(path) = ipc_map.get(&pkt.dst_port()) {
if let Some((path, listen)) = ipc_map.get(&pkt.dst_port()) {
let mem = self.mem.as_ref().unwrap();
let queue = self.queue.as_ref().unwrap();
if *listen {
warn!("vsock: Attempting to connect a socket that is listening, sending rst");
let rx = MuxerRx::Reset {
local_port: pkt.dst_port(),
peer_port: pkt.src_port(),
};
push_packet(self.cid, rx, &self.rxq, queue, mem);
return;
}
let rxq = self.rxq.clone();

let mut unix = UnixProxy::new(
Expand Down
71 changes: 57 additions & 14 deletions src/devices/src/virtio/vsock/muxer_thread.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
Expand All @@ -8,9 +10,11 @@ use super::super::Queue as VirtQueue;
use super::super::VIRTIO_MMIO_INT_VRING;
use super::muxer::{push_packet, MuxerRx, ProxyMap};
use super::muxer_rxq::MuxerRxQ;
use super::proxy::{ProxyRemoval, ProxyUpdate};
use super::proxy::{NewProxyType, Proxy, ProxyRemoval, ProxyUpdate};
use super::tcp::TcpProxy;

use crate::virtio::vsock::defs;
use crate::virtio::vsock::unix::{UnixAcceptorProxy, UnixProxy};
use crossbeam_channel::Sender;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
Expand All @@ -29,6 +33,7 @@ pub struct MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
}

impl MuxerThread {
Expand All @@ -45,6 +50,7 @@ impl MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
) -> Self {
MuxerThread {
cid,
Expand All @@ -58,6 +64,7 @@ impl MuxerThread {
intc,
irq_line,
reaper_sender,
unix_ipc_port_map,
}
}

Expand Down Expand Up @@ -111,24 +118,36 @@ impl MuxerThread {

let mut should_signal = update.signal_queue;

if let Some((peer_port, accept_fd)) = update.new_proxy {
if let Some((peer_port, accept_fd, proxy_type)) = update.new_proxy {
let local_port: u32 = thread_rng.gen_range(1024..u32::MAX);
let new_id: u64 = (peer_port as u64) << 32 | local_port as u64;
let new_proxy = TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
);
let new_proxy: Box<dyn Proxy> = match proxy_type {
NewProxyType::Tcp => Box::new(TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
NewProxyType::Unix => Box::new(UnixProxy::new_reverse(
new_id,
self.cid,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
};
self.proxy_map
.write()
.unwrap()
.insert(new_id, Mutex::new(Box::new(new_proxy)));
.insert(new_id, Mutex::new(new_proxy));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&new_id) {
proxy.lock().unwrap().push_op_request();
};
Expand All @@ -147,8 +166,32 @@ impl MuxerThread {
}
}

fn create_lisening_ipc_sockets(&self) {
for (port, (path, do_listen)) in &self.unix_ipc_port_map {
if !do_listen {
continue;
}
let id = (*port as u64) << 32 | defs::TSI_PROXY_PORT as u64;
let proxy = match UnixAcceptorProxy::new(id, path, *port) {
Ok(proxy) => proxy,
Err(e) => {
warn!("Failed to create listening proxy at {:?}: {:?}", path, e);
continue;
}
};
self.proxy_map
.write()
.unwrap()
.insert(id, Mutex::new(Box::new(proxy)));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&id) {
self.update_polling(id, proxy.lock().unwrap().as_raw_fd(), EventSet::IN);
};
}
}

fn work(self) {
let mut thread_rng = thread_rng();
self.create_lisening_ipc_sockets();
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match self
Expand Down
9 changes: 8 additions & 1 deletion src/devices/src/virtio/vsock/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ pub enum ProxyRemoval {
Deferred,
}

#[derive(Default)]
pub enum NewProxyType {
#[default]
Tcp,
Unix,
}

#[derive(Default)]
pub struct ProxyUpdate {
pub signal_queue: bool,
pub remove_proxy: ProxyRemoval,
pub polling: Option<(u64, RawFd, EventSet)>,
pub new_proxy: Option<(u32, RawFd)>,
pub new_proxy: Option<(u32, RawFd, NewProxyType)>,
pub push_accept: Option<(u64, u64)>,
pub push_credit_req: Option<MuxerRx>,
}
Expand Down
6 changes: 4 additions & 2 deletions src/devices/src/virtio/vsock/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use super::muxer_rxq::MuxerRxQ;
use super::packet::{
TsiAcceptReq, TsiConnectReq, TsiGetnameRsp, TsiListenReq, TsiSendtoAddr, VsockPacket,
};
use super::proxy::{Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt};
use super::proxy::{
NewProxyType, Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt,
};
use utils::epoll::EventSet;

use vm_memory::GuestMemoryMmap;
Expand Down Expand Up @@ -729,7 +731,7 @@ impl Proxy for TcpProxy {
{
match accept(self.fd) {
Ok(accept_fd) => {
update.new_proxy = Some((self.peer_port, accept_fd));
update.new_proxy = Some((self.peer_port, accept_fd, NewProxyType::Tcp));
}
Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e),
};
Expand Down
Loading

0 comments on commit 28e2f81

Please sign in to comment.