Skip to content

Commit

Permalink
clean up interface to VmObjects
Browse files Browse the repository at this point in the history
- use wrapper types to hide the tokio-ness of the underlying
  reader-writer lock
- provide direct access to some subcomponents of `Machine` for brevity
  • Loading branch information
gjcolombo committed Jul 2, 2024
1 parent d712bf6 commit 9f23e19
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 58 deletions.
25 changes: 14 additions & 11 deletions bin/propolis-server/src/lib/migrate/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,12 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> DestinationProtocol<T> {
info!(self.log(), "Destination read Preamble: {:?}", preamble);

if let Err(e) = preamble.is_migration_compatible(
self.vm_objects.as_ref().unwrap().read().await.instance_spec(),
self.vm_objects
.as_ref()
.unwrap()
.lock_shared()
.await
.instance_spec(),
) {
error!(
self.log(),
Expand Down Expand Up @@ -354,10 +359,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> DestinationProtocol<T> {
info!(self.log(), "Devices: {devices:#?}");

{
let objects = self.vm_objects.as_ref().unwrap().read().await;
let migrate_ctx = MigrateCtx {
mem: &objects.machine().acc_mem.access().unwrap(),
};
let objects = self.vm_objects.as_ref().unwrap().lock_shared().await;
let migrate_ctx =
MigrateCtx { mem: &objects.access_mem().unwrap() };
for device in devices {
info!(
self.log(),
Expand Down Expand Up @@ -411,10 +415,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> DestinationProtocol<T> {
.vm_objects
.as_ref()
.unwrap()
.read()
.lock_shared()
.await
.machine()
.hdl
.vmm_hdl()
.clone();

let (dst_hrt, dst_wc) = vmm::time::host_time_snapshot(vmm_hdl)
Expand Down Expand Up @@ -612,7 +615,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> DestinationProtocol<T> {
self.vm_objects
.as_ref()
.unwrap()
.read()
.lock_shared()
.await
.com1()
.import(&com1_history)
Expand Down Expand Up @@ -693,8 +696,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> DestinationProtocol<T> {
addr: GuestAddr,
buf: &[u8],
) -> Result<(), MigrateError> {
let objects = self.vm_objects.as_ref().unwrap().read().await;
let memctx = objects.machine().acc_mem.access().unwrap();
let objects = self.vm_objects.as_ref().unwrap().lock_shared().await;
let memctx = objects.access_mem().unwrap();
let len = buf.len();
memctx.write_from(addr, buf, len);
Ok(())
Expand Down
42 changes: 22 additions & 20 deletions bin/propolis-server/src/lib/migrate/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ pub async fn migrate<T: AsyncRead + AsyncWrite + Unpin + Send>(
// See the lengthy comment on `RamOfferDiscipline` above for more
// details about what's going on here.
{
let objects = proto.vm.read().await;
let machine = objects.machine();
let objects = proto.vm.lock_shared().await;
for (&GuestAddr(gpa), dirtiness) in proto.dirt.iter().flatten() {
if let Err(e) = machine.hdl.set_dirty_pages(gpa, dirtiness) {
if let Err(e) =
objects.vmm_hdl().set_dirty_pages(gpa, dirtiness)
{
// Bad news! Our attempt to re-set the dirty bit on these
// pages has failed! Thus, subsequent migration attempts
// /!\ CAN NO LONGER RELY ON DIRTY PAGE TRACKING /!\
Expand Down Expand Up @@ -253,7 +254,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
// the pre-pause RAM push.
let dirt = {
let can_npt_operate =
vm.read().await.machine().hdl.can_npt_operate();
vm.lock_shared().await.vmm_hdl().can_npt_operate();

if can_npt_operate {
Some(Default::default())
Expand Down Expand Up @@ -326,7 +327,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
async fn sync(&mut self) -> Result<(), MigrateError> {
self.update_state(MigrationState::Sync).await;
let preamble = Preamble::new(VersionedInstanceSpec::V0(
self.vm.read().await.instance_spec().clone(),
self.vm.lock_shared().await.instance_spec().clone(),
));
let s = ron::ser::to_string(&preamble)
.map_err(codec::ProtocolError::from)?;
Expand Down Expand Up @@ -576,10 +577,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
self.update_state(MigrationState::Device).await;
let mut device_states = vec![];
{
let objects = self.vm.read().await;
let machine = objects.machine();
let objects = self.vm.lock_shared().await;
let migrate_ctx =
MigrateCtx { mem: &machine.acc_mem.access().unwrap() };
MigrateCtx { mem: &objects.access_mem().unwrap() };

// Collect together the serialized state for all the devices
objects.for_each_device_fallible(|name, devop| {
Expand Down Expand Up @@ -640,7 +640,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {

// Read and send over the time data
async fn time_data(&mut self) -> Result<(), MigrateError> {
let vmm_hdl = &self.vm.read().await.machine().hdl.clone();
let vmm_hdl = &self.vm.lock_shared().await.vmm_hdl().clone();
let vm_time_data =
vmm::time::export_time_data(vmm_hdl).map_err(|e| {
MigrateError::TimeData(format!(
Expand Down Expand Up @@ -678,8 +678,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
}
_ => return Err(MigrateError::UnexpectedMessage),
};
let com1_history =
self.vm.read().await.com1().export_history(remote_addr).await?;
let com1_history = self
.vm
.lock_shared()
.await
.com1()
.export_history(remote_addr)
.await?;
self.send_msg(codec::Message::Serialized(com1_history)).await?;
self.read_ok().await
}
Expand Down Expand Up @@ -792,9 +797,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
async fn vmm_ram_bounds(
&mut self,
) -> Result<RangeInclusive<GuestAddr>, MigrateError> {
let objects = self.vm.read().await;
let machine = objects.machine();
let memctx = machine.acc_mem.access().unwrap();
let objects = self.vm.lock_shared().await;
let memctx = objects.access_mem().unwrap();
memctx.mem_bounds().ok_or(MigrateError::InvalidInstanceState)
}

Expand All @@ -804,10 +808,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
bits: &mut [u8],
) -> Result<(), MigrateError> {
self.vm
.read()
.lock_shared()
.await
.machine()
.hdl
.vmm_hdl()
.track_dirty_pages(start_gpa.0, bits)
.map_err(|_| MigrateError::InvalidInstanceState)
}
Expand All @@ -817,9 +820,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> SourceProtocol<T> {
addr: GuestAddr,
buf: &mut [u8],
) -> Result<(), MigrateError> {
let objects = self.vm.read().await;
let machine = objects.machine();
let memctx = machine.acc_mem.access().unwrap();
let objects = self.vm.lock_shared().await;
let memctx = objects.access_mem().unwrap();
let len = buf.len();
memctx.direct_read_into(addr, buf, len);
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions bin/propolis-server/src/lib/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ async fn instance_serial_history_get(
{
let ctx = rqctx.context();
let vm = ctx.vm.active_vm().await.ok_or_else(not_created_error)?;
let serial = vm.objects().read().await.com1().clone();
let serial = vm.objects().lock_shared().await.com1().clone();
let query_params = query.into_inner();

let byte_offset = SerialHistoryOffset::try_from(&query_params)?;
Expand Down Expand Up @@ -483,7 +483,7 @@ async fn instance_serial(
) -> dropshot::WebsocketChannelResult {
let ctx = rqctx.context();
let vm = ctx.vm.active_vm().await.ok_or_else(not_created_error)?;
let serial = vm.objects().read().await.com1().clone();
let serial = vm.objects().lock_shared().await.com1().clone();

// Use the default buffering paramters for the websocket configuration
//
Expand Down Expand Up @@ -577,7 +577,7 @@ async fn instance_issue_crucible_snapshot_request(
) -> Result<HttpResponseOk<()>, HttpError> {
let vm =
rqctx.context().vm.active_vm().await.ok_or_else(not_created_error)?;
let objects = vm.objects().read().await;
let objects = vm.objects().lock_shared().await;
let path_params = path_params.into_inner();

let backend =
Expand All @@ -604,7 +604,7 @@ async fn disk_volume_status(
let path_params = path_params.into_inner();
let vm =
rqctx.context().vm.active_vm().await.ok_or_else(not_created_error)?;
let objects = vm.objects().read().await;
let objects = vm.objects().lock_shared().await;
let backend =
objects.crucible_backends().get(&path_params.id).ok_or_else(|| {
let s = format!("No crucible backend for id {}", path_params.id);
Expand Down Expand Up @@ -667,7 +667,7 @@ async fn instance_issue_nmi(
) -> Result<HttpResponseOk<()>, HttpError> {
let vm =
rqctx.context().vm.active_vm().await.ok_or_else(not_created_error)?;
let _ = vm.objects().read().await.machine().inject_nmi();
let _ = vm.objects().lock_shared().await.machine().inject_nmi();

Ok(HttpResponseOk(()))
}
Expand Down
4 changes: 2 additions & 2 deletions bin/propolis-server/src/lib/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl Vm {
}
};

let spec = vm.objects().read().await.instance_spec().clone();
let spec = vm.objects().lock_shared().await.instance_spec().clone();
let state = vm.external_state_rx.borrow().clone();
Ok(propolis_api_types::InstanceSpecGetResponse {
properties: vm.properties.clone(),
Expand Down Expand Up @@ -422,7 +422,7 @@ impl Vm {
panic!("VM should be active before being run down");
};

let spec = vm.objects().read().await.instance_spec().clone();
let spec = vm.objects().lock_shared().await.instance_spec().clone();
let ActiveVm { external_state_rx, properties, .. } = vm;
guard.state = VmState::Rundown(VmDescription {
external_state_rx,
Expand Down
53 changes: 48 additions & 5 deletions bin/propolis-server/src/lib/vm/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! A collection of all of the components that make up a Propolis VM instance.
use std::{
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -13,6 +14,7 @@ use std::{
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use propolis::{
hw::{ps2::ctrl::PS2Ctrl, qemu::ramfb::RamFb, uart::LpcUart},
vmm::VmmHdl,
Machine,
};
use propolis_api_types::instance_spec::v0::InstanceSpecV0;
Expand Down Expand Up @@ -112,17 +114,17 @@ impl VmObjects {
///
/// This function is crate-visible to allow the API layer to read (but not
/// mutate) VM objects.
pub(crate) async fn read(&self) -> RwLockReadGuard<VmObjectsLocked> {
self.inner.read().await
pub(crate) async fn lock_shared(&self) -> VmObjectsShared {
VmObjectsShared(self.inner.read().await)
}

/// Yields an exclusive lock guard referring to the underlying object
/// collection.
///
/// This function is only visible within the `vm` module so that only the
/// state driver can obtain a mutable reference to the underlying objects.
pub(super) async fn write(&self) -> RwLockWriteGuard<VmObjectsLocked> {
self.inner.write().await
pub(super) async fn lock_exclusive(&self) -> VmObjectsExclusive {
VmObjectsExclusive(self.inner.write().await)
}
}

Expand Down Expand Up @@ -153,11 +155,24 @@ impl VmObjectsLocked {
&mut self.instance_spec
}

/// Yields the VM's current kernel VMM handle.
/// Yields the VM's current Propolis VM aggregation.
pub(crate) fn machine(&self) -> &Machine {
&self.machine
}

/// Yields the VM's current kernel VMM handle.
pub(crate) fn vmm_hdl(&self) -> &Arc<VmmHdl> {
&self.machine.hdl
}

/// Yields an accessor to the VM's memory context, or None if guest memory
/// is not currently accessible.
pub(crate) fn access_mem(
&self,
) -> Option<propolis::accessors::Guard<propolis::vmm::MemCtx>> {
self.machine.acc_mem.access()
}

/// Obtains a handle to the lifecycle trait object for the component with
/// the supplied `name`.
pub(crate) fn device_by_name(
Expand Down Expand Up @@ -452,3 +467,31 @@ impl Drop for VmObjects {
});
}
}

/// A shared lock on the contents of a [`VmObjects`].
pub(crate) struct VmObjectsShared<'o>(RwLockReadGuard<'o, VmObjectsLocked>);

/// An exclusive lock on the contents of a [`VmObjects`].
pub(crate) struct VmObjectsExclusive<'o>(RwLockWriteGuard<'o, VmObjectsLocked>);

impl Deref for VmObjectsShared<'_> {
type Target = VmObjectsLocked;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Deref for VmObjectsExclusive<'_> {
type Target = VmObjectsLocked;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for VmObjectsExclusive<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
8 changes: 4 additions & 4 deletions bin/propolis-server/src/lib/vm/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
stats::virtual_machine::VirtualMachine, vnc::PropolisVncServer,
};

use super::objects::{VmObjects, VmObjectsLocked};
use super::objects::{VmObjects, VmObjectsShared};

/// Information used to serve Oximeter metrics.
#[derive(Default)]
Expand Down Expand Up @@ -62,7 +62,7 @@ impl VmServices {
OximeterState::default()
};

let vm_objects = vm_objects.read().await;
let vm_objects = vm_objects.lock_shared().await;
let vnc_server = ensure_options.vnc_server.clone();
if let Some(ramfb) = vm_objects.framebuffer() {
vnc_server
Expand Down Expand Up @@ -179,9 +179,9 @@ async fn register_oximeter_producer(
}

/// Launches a serial console handler task.
async fn start_serial_task(
async fn start_serial_task<'vm>(
log: &slog::Logger,
vm_objects: &tokio::sync::RwLockReadGuard<'_, VmObjectsLocked>,
vm_objects: &VmObjectsShared<'vm>,
) -> crate::serial::SerialTask {
let (websocks_ch, websocks_recv) = tokio::sync::mpsc::channel(1);
let (control_ch, control_recv) = tokio::sync::mpsc::channel(1);
Expand Down
7 changes: 5 additions & 2 deletions bin/propolis-server/src/lib/vm/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl MigrateAsTargetContext {
// Drop the lock after this operation so that the migration task can
// acquire it to enumerate devices and import state into them.
{
let guard = self.vm_objects.read().await;
let guard = self.vm_objects.lock_shared().await;
guard.reset_vcpus();
guard.pause_kernel_vm();
}
Expand Down Expand Up @@ -346,7 +346,10 @@ impl MigrateAsTargetContext {
error!(self.log, "target migration task failed";
"error" => %e);

self.vm_objects.write().await.resume_kernel_vm();
self.vm_objects
.lock_exclusive()
.await
.resume_kernel_vm();
return Err(e);
}
},
Expand Down
Loading

0 comments on commit 9f23e19

Please sign in to comment.