diff --git a/agent-antagonist/src/main.rs b/agent-antagonist/src/main.rs index 65340bde6..c30fa20b8 100644 --- a/agent-antagonist/src/main.rs +++ b/agent-antagonist/src/main.rs @@ -7,7 +7,7 @@ use rand::random; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use slog::Logger; -use slog::{info, warn}; +use slog::{debug, info, warn}; use std::net::SocketAddr; use std::process::Command; use std::sync::{ @@ -27,8 +27,11 @@ use crucible_agent_client::{ about = "Stress tester for the crucible agent" )] enum Args { - /// Run a number of antagonist loop tasks that will do the following: + /// Run a number of antagonist loop tasks. + /// There are two options for what the test will do. + /// All tests operate on a 1GB region. /// + /// The default test loop will do this loop: /// - Create a 1GB region /// /// - Randomly: @@ -40,6 +43,14 @@ enum Args { /// /// - Delete the region /// + /// The clone stress test will do this: + /// - Create a 1GB region + /// - Create a snapshot of that region. + /// - Loop on: + /// - Clone the snapshot to a new region. + /// - Delete the region. + /// + /// /// Additionally, one task is spawned that will: /// /// - Get a list of regions @@ -51,6 +62,10 @@ enum Args { #[clap(short, long)] agent: Option, + /// Stress test the downstairs clone of a snapshot operation. 
+ #[clap(long, action)] + clone_stress: bool, + /// Dataset for the crucible agent - leave blank to autodetect if in the /// crucible zone #[clap(short, long)] @@ -69,7 +84,7 @@ fn command(log: &Logger, bin: &'static str, args: &[&str]) -> Result { info!(log, "{} {:?} took {:?}", bin, args, elapsed); if !cmd.status.success() { - bail!("zfs list failed!"); + bail!("command: {} {:?} failed {:?}", bin, args, cmd); } Ok(String::from_utf8(cmd.stdout)?.trim_end().to_string()) @@ -174,6 +189,95 @@ async fn main_thread( Ok(()) } +async fn main_clone_thread( + log: Logger, + agent: SocketAddr, + dataset: String, + stop_flag: Arc, +) -> Result<()> { + // Create a 1 GB region + let region_id = Uuid::new_v4(); + + let region_request = CreateRegion { + block_size: 512, + extent_count: 16, + extent_size: 131072, + id: RegionId(region_id.to_string()), + encrypted: true, + cert_pem: None, + key_pem: None, + root_pem: None, + source: None, + }; + + if let Err(e) = create_a_region(agent, &log, region_request.clone()).await { + bail!("Region create {region_id} failed: {e}"); + } + + let snapshot_id = Uuid::new_v4(); + info!(log, "Create snapshot {snapshot_id}"); + + if let Err(e) = + create_a_snapshot(agent, &log, &dataset, region_id, snapshot_id).await + { + bail!("Snapshot create returned {e}"); + } + + let mut count = 1; + loop { + if stop_flag.load(Ordering::SeqCst) { + break; + } + let clone_region_id = Uuid::new_v4(); + info!( + log, + "From {region_id}--{snapshot_id} clone:{clone_region_id} at \ + count:{count}---" + ); + if let Err(e) = clone_a_snapshot( + agent, + &log, + region_id, + region_request.clone(), + snapshot_id, + clone_region_id, + ) + .await + { + bail!("Snapshot clone returned {e}"); + } + + info!(log, "Delete clone:{clone_region_id} at count:{count}"); + + if let Err(e) = delete_a_region(agent, &log, clone_region_id).await { + bail!("Region clone delete {clone_region_id} failed: {e}"); + } + count += 1; + info!( + log, + "Completed {:5} clones from 
{region_id}--{snapshot_id}", count + ); + + // If we don't add a little disturbance, all the threads end up + // cloning at the same time. This little variation here will ensure + // that the tasks are not always synced up to each other. + if random() && random() { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + if let Err(e) = delete_a_snapshot(agent, &log, region_id, snapshot_id).await + { + bail!("Snapshot delete returned {e}"); + } + + // Delete region + if let Err(e) = delete_a_region(agent, &log, region_id).await { + bail!("Region delete {region_id} failed: {e}"); + } + Ok(()) +} + // Create a region. // Loop till it is ready. async fn create_a_region( @@ -181,12 +285,13 @@ async fn create_a_region( log: &Logger, region_request: CreateRegion, ) -> Result<()> { + info!(log, "creating region {:?}", region_request.id); + let mut retry = 1; loop { - info!(log, "creating region {:?}", region_request.id); let client = get_client(&agent); let region = match client.region_create(®ion_request).await { Ok(region) => { - info!(log, "creating region {:?} ok", region_request.id,); + debug!(log, "creating region {:?} ok", region_request.id,); region } @@ -203,9 +308,10 @@ async fn create_a_region( RegionState::Requested => { info!( log, - "waiting for region {:?}: state {:?}", + "waiting for region {:?}: state {:?} try:{}", region_request.id, region.state, + retry, ); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } @@ -226,6 +332,7 @@ async fn create_a_region( ); } } + retry += 1; } Ok(()) } @@ -567,6 +674,44 @@ async fn clone_a_snapshot( "Use {:?} for source clone {:?}", source_addr, region_request ); + // We just created a region, then took a snapshot. Next we will try to + // clone that snapshot. It's possible we arrive here before the downstairs + // (that we are cloning from) has come all the way online. 
We loop a few + // times and hit a known endpoint on the expected downstairs repair port to + // verify that things have come online before trying to clone. This avoids + // us trying to clone too soon. + let url = format!("http://{}/region-info", source_addr).to_string(); + + let mut retry = 0; + loop { + let res = reqwest::get(url.clone()).await; + match res { + Ok(resp) => { + if resp.status().is_success() { + info!(log, "http to clone {} was successful.", url); + break; + } else { + warn!( + log, + "Request {retry} to {} failed with status: {}", + url, + resp.status() + ); + } + } + Err(e) => { + warn!(log, "Request {retry} to {} failed: {}", url, e); + } + } + retry += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if retry > 20 { + bail!("Failed check to clone endpoint {}", url); + } else { + warn!(log, "http to clone {} failed, try:{}", url, retry); + } + } + if let Err(e) = create_a_region(agent, log, region_request.clone()).await { bail!("Region clone create failed, returned {e}"); } @@ -582,14 +727,15 @@ async fn delete_a_region( log: &Logger, region_id: Uuid, ) -> Result<()> { + info!(log, "tombstoning region {:?}", region_id); + let mut retry = 1; loop { - info!(log, "tombstoning region {:?}", region_id); let client = get_client(&agent); let r = client.region_delete(&RegionId(region_id.to_string())).await; drop(client); match r { Ok(_) => { - info!(log, "tombstoning region {:?} ok", region_id); + debug!(log, "tombstoning region {:?} ok", region_id); } Err(e) => { @@ -602,7 +748,7 @@ async fn delete_a_region( drop(client); let region = match r { Ok(region) => { - info!(log, "get region {:?} ok", region_id); + debug!(log, "get region {:?} ok", region_id); region } @@ -615,9 +761,10 @@ async fn delete_a_region( RegionState::Tombstoned => { info!( log, - "waiting for region {:?}: state {:?}", + "waiting for region {:?}: state {:?} try:{}", region_id, region.state, + retry, ); tokio::time::sleep(std::time::Duration::from_secs(1)).await; 
} @@ -635,6 +782,7 @@ async fn delete_a_region( ); } } + retry += 1; } Ok(()) } @@ -724,6 +872,7 @@ async fn main() -> Result<()> { match args { Args::Run { agent, + clone_stress, dataset, tasks, } => { @@ -788,14 +937,28 @@ async fn main() -> Result<()> { let dataset = dataset.clone(); let stop_flag_clone = stop_flag.clone(); - tokio::spawn(async move { - main_thread(log, agent, dataset, stop_flag_clone).await - }) + if clone_stress { + tokio::spawn(async move { + main_clone_thread( + log, + agent, + dataset, + stop_flag_clone, + ) + .await + }) + } else { + tokio::spawn(async move { + main_thread(log, agent, dataset, stop_flag_clone) + .await + }) + } }) .collect(); // Add another task that grabs all regions, and queries all // snapshots for those regions + let stop_flag_clone = stop_flag.clone(); let qlog = log.new(slog::o!("query" => 0)); jhs.push(tokio::spawn(async move { diff --git a/agent/src/datafile.rs b/agent/src/datafile.rs index 42b54abae..4cd4d1048 100644 --- a/agent/src/datafile.rs +++ b/agent/src/datafile.rs @@ -2,18 +2,24 @@ use super::model::*; use anyhow::{anyhow, bail, Result}; +use chrono::{DateTime, Utc}; use crucible_common::write_json; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use slog::{crit, error, info, Logger}; -use std::collections::BTreeMap; +use slog::{crit, error, info, warn, Logger}; +use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::sync::{mpsc, Arc, Condvar, Mutex}; +use std::thread::JoinHandle; use crate::snapshot_interface::SnapshotInterface; use crate::ZFSDataset; +/// Maximum parallel region creation we allow. 
+const MAX_REGION_WORK: usize = 5; + pub struct DataFile { log: Logger, base_path: PathBuf, @@ -22,8 +28,28 @@ pub struct DataFile { port_min: u16, port_max: u16, bell: Condvar, - inner: Mutex, + outer: Mutex, snapshot_interface: Arc, + // When any task is updating SMF, it should obtain this lock first. + pub smf_lock: Mutex, +} + +/// Describing an active region create job the agent is doing. +struct RegionJob { + /// When this job was requested. + request_time: DateTime, + /// The join_handle for the spawned worker thread + join_handle: JoinHandle>, + /// When the thread has finished its work, a message will arrive here. + done_rx: mpsc::Receiver, +} + +// This struct covers both the inner regions and snapshots as well as +// the work queue for the agent. We put both here so we can protect +// them in the same mutex. +struct Outer { + inner: Inner, + work_queue: HashMap, } #[derive(Serialize, Deserialize, Default)] @@ -33,6 +59,12 @@ struct Inner { running_snapshots: BTreeMap>, } +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct JobInfo { + request_time: DateTime, + region_id: RegionId, +} + impl DataFile { pub fn new( log: Logger, @@ -57,6 +89,9 @@ impl DataFile { } }; + let work_queue = HashMap::new(); + let outer = Outer { inner, work_queue }; + Ok(DataFile { log, base_path: base_path.to_path_buf(), @@ -65,8 +100,9 @@ impl DataFile { port_min, port_max, bell: Condvar::new(), - inner: Mutex::new(inner), + outer: Mutex::new(outer), snapshot_interface, + smf_lock: Mutex::new(false), }) } @@ -75,31 +111,64 @@ impl DataFile { } pub fn regions(&self) -> Vec { - self.inner - .lock() - .unwrap() - .regions - .values() - .cloned() - .collect() + let outer = self.outer.lock().unwrap(); + outer.inner.regions.values().cloned().collect() } pub fn running_snapshots( &self, ) -> BTreeMap> { - self.inner.lock().unwrap().running_snapshots.clone() + self.outer.lock().unwrap().inner.running_snapshots.clone() } pub fn get(&self, id: &RegionId) -> Option { - 
self.inner.lock().unwrap().regions.get(id).cloned() + self.outer.lock().unwrap().inner.regions.get(id).cloned() + } + + // Add the details about a spawned work task to the work queue. + pub fn add_work( + &self, + id: RegionId, + join_handle: JoinHandle>, + request_time: DateTime, + done_rx: mpsc::Receiver, + ) { + let work_queue = &mut self.outer.lock().unwrap().work_queue; + let region_job = RegionJob { + request_time, + join_handle, + done_rx, + }; + work_queue.insert(id, region_job); + } + + // Return a Vec of JobInfo about all jobs on the work queue. + pub fn get_work_queue(&self) -> Vec { + let work_queue = &mut self.outer.lock().unwrap().work_queue; + let mut regions = Vec::new(); + for (k, v) in work_queue.iter() { + let job_info = JobInfo { + request_time: v.request_time, + region_id: k.clone(), + }; + regions.push(job_info); + } + regions + } + + // When a piece of work has completed, it should call this to signal to + // the main thread that work has completed. + pub fn work_done(&self, done_tx: mpsc::Sender) { + let _ = done_tx.send(true); + self.bell.notify_all(); } /** * Store the database into the JSON file. */ - fn store(&self, inner: MutexGuard) { + fn store(&self, inner: &Inner) { loop { - match write_json(&self.conf_path, &*inner, true) { + match write_json(&self.conf_path, inner, true) { Ok(()) => return, Err(e) => { /* @@ -117,7 +186,7 @@ impl DataFile { } } - fn get_free_port(&self, inner: &MutexGuard) -> Result { + fn get_free_port(&self, inner: &Inner) -> Result { for port_number in self.port_min..=self.port_max { let mut region_uses_port = false; let mut running_snapshot_uses_port = false; @@ -180,7 +249,7 @@ impl DataFile { &self, create: CreateRegion, ) -> Result { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Look for a region with this ID. @@ -204,7 +273,7 @@ impl DataFile { /* * Allocate a port number that is not yet in use. 
*/ - let port_number = self.get_free_port(&inner)?; + let port_number = self.get_free_port(inner)?; let read_only = create.source.is_some(); @@ -243,7 +312,7 @@ impl DataFile { &self, request: CreateRunningSnapshotRequest, ) -> Result { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Look for an existing running snapshot. @@ -293,7 +362,7 @@ impl DataFile { /* * Allocate a port number that is not yet in use. */ - let port_number = self.get_free_port(&inner)?; + let port_number = self.get_free_port(inner)?; let s = RunningSnapshot { id: request.id.clone(), @@ -330,7 +399,7 @@ impl DataFile { &self, request: DeleteRunningSnapshotRequest, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Look for an existing running snapshot. @@ -426,7 +495,7 @@ impl DataFile { &self, request: DeleteSnapshotRequest, ) -> Result<()> { - let inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Are we running a read-only downstairs for this snapshot? Fail if so. @@ -518,7 +587,7 @@ impl DataFile { * Mark a particular region as failed to provision. */ pub fn fail(&self, id: &RegionId) { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Failed; @@ -539,7 +608,7 @@ impl DataFile { * Mark a particular running snapshot as failed to provision. */ pub fn fail_rs(&self, region_id: &RegionId, snapshot_name: &str) { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -570,7 +639,7 @@ impl DataFile { * Mark a particular region as provisioned. 
*/ pub fn created(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Created; @@ -604,7 +673,7 @@ impl DataFile { region_id: &RegionId, snapshot_name: &str, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -672,7 +741,7 @@ impl DataFile { * used in a saga. */ pub fn destroyed(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Destroyed; @@ -702,7 +771,7 @@ impl DataFile { region_id: &RegionId, snapshot_name: &str, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -731,7 +800,7 @@ impl DataFile { * Nexus has requested that we destroy this particular region. */ pub fn destroy(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner .regions @@ -772,9 +841,41 @@ impl DataFile { * wait on the condition variable. */ pub fn first_in_states(&self, states: &[State]) -> Resource { - let mut inner = self.inner.lock().unwrap(); + let mut outer = self.outer.lock().unwrap(); loop { + // First check to see if there are completed jobs on the + // region create work queue. If we find any that are done, + // then remove them now. 
+            {
+                let mut done_jobs = Vec::new();
+                for (id, region_job) in outer.work_queue.iter_mut() {
+                    if region_job.join_handle.is_finished()
+                        || region_job.done_rx.try_recv().is_ok()
+                    {
+                        done_jobs.push(id.clone());
+                    } else {
+                        info!(self.log, "id {:?} still running", id);
+                    }
+                }
+
+                for done_id in done_jobs {
+                    let region_job = outer.work_queue.remove(&done_id).unwrap();
+                    // Wait for the thread to wrap up.
+                    if let Err(e) = region_job.join_handle.join() {
+                        warn!(
+                            self.log,
+                            "Exiting work thread reported: {:?}", e
+                        );
+                    }
+                }
+
+                info!(
+                    self.log,
+                    "region create work queue len is now: {:?}",
+                    outer.work_queue.len()
+                );
+            }
             /*
              * States are provided in priority order. We check for regions
              * in the first requested state before we check for
@@ -783,20 +884,46 @@
              * regions ahead of creating new regions.
              */
             for s in states {
-                for r in inner.regions.values() {
+                for r in outer.inner.regions.values() {
                     if &r.state == s {
-                        return Resource::Region(r.clone());
+                        // If this region ID is on the work queue hashmap, then
+                        // let that work finish before we take any other action
+                        // on it.
+                        if outer.work_queue.contains_key(&r.id) {
+                            continue;
+                        }
+
+                        // We only return regions in Requested state if we
+                        // have not started working on them yet, and we have
+                        // room on the work queue. Otherwise, they remain
+                        // requested until we can service them. 
+ if r.state == State::Requested { + assert!(!outer.work_queue.contains_key(&r.id)); + if outer.work_queue.len() < MAX_REGION_WORK { + info!(self.log, "ID {:?} ready to add", r.id); + return Resource::Region(r.clone()); + } else { + info!(self.log, "No room for {:?} on wq", r.id); + continue; + } + } else { + return Resource::Region(r.clone()); + } } } - for (rid, r) in &inner.running_snapshots { + for (rid, r) in &outer.inner.running_snapshots { for (name, rs) in r { if &rs.state == s { - return Resource::RunningSnapshot( - rid.clone(), - name.clone(), - rs.clone(), - ); + if outer.work_queue.contains_key(rid) { + continue; + } else { + return Resource::RunningSnapshot( + rid.clone(), + name.clone(), + rs.clone(), + ); + } } } } @@ -806,7 +933,7 @@ impl DataFile { * If we did not find any regions in the specified state, sleep * on the condvar. */ - inner = self.bell.wait(inner).unwrap(); + outer = self.bell.wait(outer).unwrap(); } } diff --git a/agent/src/main.rs b/agent/src/main.rs index 8f3a61855..250c034c9 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,6 +1,8 @@ // Copyright 2021 Oxide Computer Company +use crate::model::Region; use anyhow::{anyhow, bail, Result}; +use chrono::Utc; use clap::Parser; use dropshot::{ConfigLogging, ConfigLoggingIfExists, ConfigLoggingLevel}; use slog::{debug, error, info, o, Logger}; @@ -9,7 +11,7 @@ use std::io::Write; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::Command; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; const PROG: &str = "crucible-agent"; const SERVICE: &str = "oxide/crucible/downstairs"; @@ -77,7 +79,7 @@ enum Args { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ZFSDataset { dataset: String, } @@ -158,6 +160,7 @@ impl ZFSDataset { cmd.arg("-o").arg(format!("quota={}", quota)); } + info!(log, "cmd is: {:?} {:?}", cmd, dataset); let res = cmd.arg(&dataset).output()?; if !res.status.success() { @@ -352,6 +355,7 @@ fn apply_smf( downstairs_prefix: &str, 
snapshot_prefix: &str, ) -> Result<()> { + let _smf_lock = df.smf_lock.lock().unwrap(); let scf = crucible_smf::Scf::new()?; let scope = scf.scope_local()?; let svc = scope @@ -400,7 +404,7 @@ where */ let expected_downstairs_instances = regions .iter() - .filter(|r| r.state == State::Created) + .filter(|r| r.state == State::Created || r.state == State::Requested) .map(|r| format!("{}-{}", downstairs_prefix, r.id.0)) .collect::>(); @@ -408,7 +412,9 @@ where .iter() .flat_map(|(_, n)| { n.iter() - .filter(|(_, rs)| rs.state == State::Created) + .filter(|(_, rs)| { + rs.state == State::Created || rs.state == State::Requested + }) .map(|(_, rs)| { format!("{}-{}-{}", snapshot_prefix, rs.id.0, rs.name) }) @@ -554,7 +560,7 @@ where reconfig = true; info!( log, - "existing {} value {} does not match {}", + "existing {} value {} doesn't match {}", property.name, val.as_string()?, property.val, @@ -585,9 +591,9 @@ where } } else { /* - * No running snapshot means the service has never started. Prod - * the restarter by disabling it, then we'll create everything - * from scratch. + * No running snapshot means the service has never started. + * Prod the restarter by disabling it, then we'll create + * everything from scratch. */ inst.disable(false)?; true @@ -631,7 +637,7 @@ where info!(log, "ok!"); } crucible_smf::CommitResult::OutOfDate => { - error!(log, "concurrent modification?!"); + error!(log, "concurrent modification for: {}", r.id.0); } } } else { @@ -799,6 +805,10 @@ where } crucible_smf::CommitResult::OutOfDate => { error!(log, "concurrent modification?!"); + panic!( + "concurrent modification for snap: {}", + snapshot.id.0 + ); } } } else { @@ -901,8 +911,8 @@ mod test { Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()) } - /// Wrap a datafile, mock SMF interface, and mock snapshot interface, in order to test the - /// agent's SMF related behaviour. 
+ /// Wrap a datafile, mock SMF interface, and mock snapshot interface, in + /// order to test the agent's SMF related behaviour. pub struct TestSmfHarness { log: Logger, dir: TempDir, @@ -1014,8 +1024,8 @@ mod test { impl Drop for TestSmfHarness { fn drop(&mut self) { - // If the agent zone bounces, it should read state from the datafile and recreate - // everything. Compare here during the drop. + // If the agent zone bounces, it should read state from the datafile + // and recreate everything. Compare here during the drop. let after_bounce_smf_interface = MockSmf::new(SERVICE.to_string()); let mut path_buf = self.dir.path().to_path_buf(); @@ -1031,8 +1041,8 @@ mod test { ) .unwrap(); - // Prune disabled services: a bounced agent zone will lose all these, and the agent - // will not recreate them. + // Prune disabled services: a bounced agent zone will lose all + // these, and the agent will not recreate them. self.smf_interface.prune(); assert_eq!(self.smf_interface, after_bounce_smf_interface); @@ -1585,6 +1595,108 @@ mod test { } } +/// Do all the steps required of the agent program to create a region. +/// This is called in a thread and takes care of both the actual creating of +/// a region, and creating the SMF service for that region. +/// +/// The function is responsible for updating the internal Datafile structure +/// with the results (pass or fail) from what it performs here. +/// +/// When this function is done, signal to the worker thread that state has +/// changed and it can cleanup this job and possibly create more work. 
+#[allow(clippy::too_many_arguments)] +fn agent_region_create( + log: Logger, + df: Arc, + regions_dataset: ZFSDataset, + regions_dataset_path: PathBuf, + downstairs_program: PathBuf, + downstairs_prefix: String, + r: Region, + snapshot_prefix: String, + done_tx: mpsc::Sender, +) { + let region_id = r.id.clone(); + info!(log, "spawned task for {:?}", region_id); + + /* + * Compute the actual size required for a full region, + * then add our metadata overhead to that. + */ + let region_size = r.block_size * r.extent_size * r.extent_count as u64; + let reservation = (region_size as f64 * RESERVATION_FACTOR).round() as u64; + let quota = region_size * QUOTA_FACTOR; + + info!( + log, + "Region size:{} reservation:{} quota:{}", + region_size, + reservation, + quota, + ); + + // If regions need to be created, do that before apply_smf. + let region_dataset = match regions_dataset.ensure_child_dataset( + &r.id.0, + Some(reservation), + Some(quota), + &log, + ) { + Ok(region_dataset) => region_dataset, + Err(e) => { + error!(log, "Dataset {} creation failed: {}", &r.id.0, e,); + df.fail(&r.id); + df.work_done(done_tx); + return; + } + }; + + let dataset_path = match region_dataset.path() { + Ok(dataset_path) => dataset_path, + Err(e) => { + error!(log, "Failed to find path for dataset {}: {}", &r.id.0, e,); + df.fail(&r.id); + df.work_done(done_tx); + return; + } + }; + + // It's important that a region transition to "Created" only after it has + // been created as a dataset: after the crucible agent restarts, + // `apply_smf` will only start downstairs services for those in "Created". + // If the `df.created` is moved to after this function's `apply_smf` call, + // and there is a crash before that moved `df.created` is set, then the + // agent will not start a downstairs service for this region when rebooted. 
+ let res = + worker_region_create(&log, &downstairs_program, &r, &dataset_path) + .and_then(|_| df.created(&r.id)); + + if let Err(e) = res { + error!(log, "Region {:?} create failed: {:?}", r.id.0, e); + df.fail(&r.id); + df.work_done(done_tx); + return; + } + + info!(log, "Applying SMF actions post create {:?} ...", r.id.0); + let result = apply_smf( + &log, + &df, + regions_dataset_path.clone(), + &downstairs_prefix, + &snapshot_prefix, + ); + + if let Err(e) = result { + error!(log, "SMF application failure: {:?}", e); + } else { + info!(log, "SMF ok!"); + } + + info!(log, "Task for {:?} done, send notify", region_id); + df.work_done(done_tx); +} + /** * For region with state Tombstoned, destroy the region. * @@ -1633,107 +1745,45 @@ fn worker( * then we finish up destroying the region. */ match &r.state { - State::Requested => 'requested: { - /* - * Compute the actual size required for a full region, - * then add our metadata overhead to that. - */ - let region_size = r.block_size - * r.extent_size - * r.extent_count as u64; - let reservation = - (region_size as f64 * RESERVATION_FACTOR).round() - as u64; - let quota = region_size * QUOTA_FACTOR; - - info!( - log, - "Region size:{} reservation:{} quota:{}", - region_size, - reservation, - quota, - ); - - // If regions need to be created, do that before - // apply_smf. 
-                        let region_dataset = match regions_dataset
-                            .ensure_child_dataset(
-                                &r.id.0,
-                                Some(reservation),
-                                Some(quota),
-                                &log,
-                            ) {
-                            Ok(region_dataset) => region_dataset,
-                            Err(e) => {
-                                error!(
-                                    log,
-                                    "Dataset {} creation failed: {}",
-                                    &r.id.0,
-                                    e,
-                                );
-                                df.fail(&r.id);
-                                break 'requested;
-                            }
-                        };
-
-                        let dataset_path = match region_dataset.path() {
-                            Ok(dataset_path) => dataset_path,
-                            Err(e) => {
-                                error!(
-                                    log,
-                                    "Failed to find path for dataset {}: {}",
-                                    &r.id.0,
-                                    e,
-                                );
-                                df.fail(&r.id);
-                                break 'requested;
-                            }
-                        };
-
-                        // It's important that a region transition to "Created"
-                        // only after it has been created as a dataset:
-                        // after the crucible agent restarts, `apply_smf` will
-                        // only start downstairs services for those in
-                        // "Created". If the `df.created` is moved to after this
-                        // function's `apply_smf` call, and there is a crash
-                        // before that moved `df.created` is set, then the agent
-                        // will not start a downstairs service for this region
-                        // when rebooted.
-                        let res = worker_region_create(
-                            &log,
-                            &downstairs_program,
-                            &r,
-                            &dataset_path,
-                        )
-                        .and_then(|_| df.created(&r.id));
-
-                        if let Err(e) = res {
-                            error!(
-                                log,
-                                "region {:?} create failed: {:?}", r.id.0, e
+                    State::Requested => {
+                        // first_in_states has given us a new region to create
+                        // that does not already have a job on work queue, so
+                        // now we will create that job and spawn a task for
+                        // it to do the work in.
+                        let log0 = log.new(o!("component" => "worktask"));
+                        let df_c = Arc::clone(&df);
+                        let r_c = r.clone();
+                        let rd_c = regions_dataset.clone();
+                        let rdp_c = regions_dataset_path.clone();
+                        let dp_c = downstairs_program.clone();
+                        let dpre_c = downstairs_prefix.clone();
+                        let sp_c = snapshot_prefix.clone();
+                        let request_time = Utc::now();
+
+                        info!(log0, "Spawning a region create for {:?}", r.id);
+                        // This channel will tell us when the job is done. 
+ let (done_tx, done_rx) = mpsc::channel(); + let job_handle = std::thread::spawn(move || { + agent_region_create( + log0, df_c, rd_c, rdp_c, dp_c, dpre_c, r_c, + sp_c, done_tx, ); - df.fail(&r.id); - break 'requested; - } - - info!(log, "applying SMF actions post create..."); - let result = apply_smf( - &log, - &df, - regions_dataset_path.clone(), - &downstairs_prefix, - &snapshot_prefix, + Ok(()) + }); + df.add_work( + r.id.clone(), + job_handle, + request_time, + done_rx, ); - - if let Err(e) = result { - error!(log, "SMF application failure: {:?}", e); - } else { - info!(log, "SMF ok!"); - } + info!(log, "Spawned a region create for {:?}", r.id); } State::Tombstoned => 'tombstoned: { - info!(log, "applying SMF actions before removal..."); + info!( + log, + "applying SMF actions before removal of {:?}", r.id + ); let result = apply_smf( &log, &df, @@ -1801,7 +1851,9 @@ fn worker( */ info!( log, - "applying SMF actions for region {} running snapshot {} (state {:?})...", + "applying SMF actions for region {} with {} running \ + snapshot {} (state {:?})...", + region_id.0, rs.id.0, rs.name, rs.state, diff --git a/agent/src/model.rs b/agent/src/model.rs index 614881efe..c07901568 100644 --- a/agent/src/model.rs +++ b/agent/src/model.rs @@ -202,6 +202,7 @@ impl CreateRegion { Clone, PartialOrd, Ord, + Hash, )] pub struct RegionId(pub String); diff --git a/agent/src/server.rs b/agent/src/server.rs index 63bf68dea..9ef66b821 100644 --- a/agent/src/server.rs +++ b/agent/src/server.rs @@ -1,5 +1,5 @@ // Copyright 2024 Oxide Computer Company -use super::datafile::DataFile; +use super::datafile::{DataFile, JobInfo}; use super::model; use anyhow::{anyhow, Result}; use dropshot::{ @@ -318,12 +318,22 @@ async fn region_delete_running_snapshot( match rc.context().delete_running_snapshot_request(request) { Ok(_) => Ok(HttpResponseDeleted()), Err(e) => Err(HttpError::for_internal_error(format!( - "running snapshot create failure: {:?}", + "running snapshot delete failure: {:?}", 
e ))), } } +#[endpoint { + method = GET, + path = "/crucible/0/work", +}] +async fn region_get_work_queue( + rc: RequestContext>, +) -> SResult>, HttpError> { + Ok(HttpResponseOk(rc.context().get_work_queue())) +} + pub fn make_api() -> Result>> { let mut api = dropshot::ApiDescription::new(); @@ -339,6 +349,7 @@ pub fn make_api() -> Result>> { api.register(region_run_snapshot)?; api.register(region_delete_running_snapshot)?; + api.register(region_get_work_queue)?; Ok(api) } diff --git a/openapi/crucible-agent.json b/openapi/crucible-agent.json index f46982f1e..74fcd64f7 100644 --- a/openapi/crucible-agent.json +++ b/openapi/crucible-agent.json @@ -298,6 +298,33 @@ } } } + }, + "/crucible/0/work": { + "get": { + "operationId": "region_get_work_queue", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Array_of_JobInfo", + "type": "array", + "items": { + "$ref": "#/components/schemas/JobInfo" + } + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } } }, "components": { @@ -392,6 +419,22 @@ "snapshots" ] }, + "JobInfo": { + "type": "object", + "properties": { + "region_id": { + "$ref": "#/components/schemas/RegionId" + }, + "request_time": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "region_id", + "request_time" + ] + }, "Region": { "type": "object", "properties": {