Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: Allow creating vsocks in listen mode. #246

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/libkrun.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ int32_t krun_set_tee_config_file(uint32_t ctx_id, const char *filepath);
int32_t krun_add_vsock_port(uint32_t ctx_id,
uint32_t port,
const char *c_filepath);

/**
* Adds a port-path pairing for guest IPC with a process in the host.
*
* Arguments:
* "ctx_id" - the configuration context ID.
* "port" - a vsock port that the guest will connect to for IPC.
* "filepath" - a null-terminated string representing the path of the UNIX
* socket in the host.
* "listen" - true if guest expects connections to be initiated from host side
*/
int32_t krun_add_vsock_port2(uint32_t ctx_id,
uint32_t port,
const char *c_filepath,
bool listen);
/**
* Returns the eventfd file descriptor to signal the guest to shut down orderly. This must be
* called before starting the microVM with "krun_start_event". Only available in libkrun-efi.
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Vsock {
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
queues: Vec<VirtQueue>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let mut queue_events = Vec::new();
for _ in 0..queues.len() {
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Vsock {
pub fn new(
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let queues: Vec<VirtQueue> = defs::QUEUE_SIZES
.iter()
Expand Down
16 changes: 13 additions & 3 deletions src/devices/src/virtio/vsock/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub struct VsockMuxer {
irq_line: Option<u32>,
proxy_map: ProxyMap,
reaper_sender: Option<Sender<u64>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
}

impl VsockMuxer {
Expand All @@ -120,7 +120,7 @@ impl VsockMuxer {
host_port_map: Option<HashMap<u16, u16>>,
interrupt_evt: EventFd,
interrupt_status: Arc<AtomicUsize>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> Self {
VsockMuxer {
cid,
Expand Down Expand Up @@ -179,6 +179,7 @@ impl VsockMuxer {
intc,
irq_line,
sender.clone(),
self.unix_ipc_port_map.clone().unwrap_or_default(),
);
thread.run();

Expand Down Expand Up @@ -499,9 +500,18 @@ impl VsockMuxer {
if let Some(proxy) = proxy_map.get(&id) {
proxy.lock().unwrap().confirm_connect(pkt)
} else if let Some(ref mut ipc_map) = &mut self.unix_ipc_port_map {
if let Some(path) = ipc_map.get(&pkt.dst_port()) {
if let Some((path, listen)) = ipc_map.get(&pkt.dst_port()) {
let mem = self.mem.as_ref().unwrap();
let queue = self.queue.as_ref().unwrap();
if *listen {
warn!("vsock: Attempting to connect a socket that is listening, sending rst");
let rx = MuxerRx::Reset {
local_port: pkt.dst_port(),
peer_port: pkt.src_port(),
};
push_packet(self.cid, rx, &self.rxq, queue, mem);
return;
}
let rxq = self.rxq.clone();

let mut unix = UnixProxy::new(
Expand Down
71 changes: 57 additions & 14 deletions src/devices/src/virtio/vsock/muxer_thread.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
Expand All @@ -8,9 +10,11 @@ use super::super::Queue as VirtQueue;
use super::super::VIRTIO_MMIO_INT_VRING;
use super::muxer::{push_packet, MuxerRx, ProxyMap};
use super::muxer_rxq::MuxerRxQ;
use super::proxy::{ProxyRemoval, ProxyUpdate};
use super::proxy::{NewProxyType, Proxy, ProxyRemoval, ProxyUpdate};
use super::tcp::TcpProxy;

use crate::virtio::vsock::defs;
use crate::virtio::vsock::unix::{UnixAcceptorProxy, UnixProxy};
use crossbeam_channel::Sender;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
Expand All @@ -29,6 +33,7 @@ pub struct MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
}

impl MuxerThread {
Expand All @@ -45,6 +50,7 @@ impl MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
) -> Self {
MuxerThread {
cid,
Expand All @@ -58,6 +64,7 @@ impl MuxerThread {
intc,
irq_line,
reaper_sender,
unix_ipc_port_map,
}
}

Expand Down Expand Up @@ -111,24 +118,36 @@ impl MuxerThread {

let mut should_signal = update.signal_queue;

if let Some((peer_port, accept_fd)) = update.new_proxy {
if let Some((peer_port, accept_fd, proxy_type)) = update.new_proxy {
let local_port: u32 = thread_rng.gen_range(1024..u32::MAX);
let new_id: u64 = (peer_port as u64) << 32 | local_port as u64;
let new_proxy = TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
);
let new_proxy: Box<dyn Proxy> = match proxy_type {
NewProxyType::Tcp => Box::new(TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
NewProxyType::Unix => Box::new(UnixProxy::new_reverse(
new_id,
self.cid,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
};
self.proxy_map
.write()
.unwrap()
.insert(new_id, Mutex::new(Box::new(new_proxy)));
.insert(new_id, Mutex::new(new_proxy));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&new_id) {
proxy.lock().unwrap().push_op_request();
};
Expand All @@ -147,8 +166,32 @@ impl MuxerThread {
}
}

fn create_lisening_ipc_sockets(&self) {
for (port, (path, do_listen)) in &self.unix_ipc_port_map {
if !do_listen {
continue;
}
let id = (*port as u64) << 32 | defs::TSI_PROXY_PORT as u64;
let proxy = match UnixAcceptorProxy::new(id, path, *port) {
Ok(proxy) => proxy,
Err(e) => {
warn!("Failed to create listening proxy at {:?}: {:?}", path, e);
continue;
}
};
self.proxy_map
.write()
.unwrap()
.insert(id, Mutex::new(Box::new(proxy)));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&id) {
self.update_polling(id, proxy.lock().unwrap().as_raw_fd(), EventSet::IN);
};
}
}

fn work(self) {
let mut thread_rng = thread_rng();
self.create_lisening_ipc_sockets();
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match self
Expand Down
9 changes: 8 additions & 1 deletion src/devices/src/virtio/vsock/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ pub enum ProxyRemoval {
Deferred,
}

#[derive(Default)]
pub enum NewProxyType {
#[default]
Tcp,
Unix,
}

#[derive(Default)]
pub struct ProxyUpdate {
pub signal_queue: bool,
pub remove_proxy: ProxyRemoval,
pub polling: Option<(u64, RawFd, EventSet)>,
pub new_proxy: Option<(u32, RawFd)>,
pub new_proxy: Option<(u32, RawFd, NewProxyType)>,
pub push_accept: Option<(u64, u64)>,
pub push_credit_req: Option<MuxerRx>,
}
Expand Down
6 changes: 4 additions & 2 deletions src/devices/src/virtio/vsock/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use super::muxer_rxq::MuxerRxQ;
use super::packet::{
TsiAcceptReq, TsiConnectReq, TsiGetnameRsp, TsiListenReq, TsiSendtoAddr, VsockPacket,
};
use super::proxy::{Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt};
use super::proxy::{
NewProxyType, Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt,
};
use utils::epoll::EventSet;

use vm_memory::GuestMemoryMmap;
Expand Down Expand Up @@ -729,7 +731,7 @@ impl Proxy for TcpProxy {
{
match accept(self.fd) {
Ok(accept_fd) => {
update.new_proxy = Some((self.peer_port, accept_fd));
update.new_proxy = Some((self.peer_port, accept_fd, NewProxyType::Tcp));
}
Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e),
};
Expand Down
Loading
Loading