diff --git a/Cargo.lock b/Cargo.lock index 1e2d195657..14c2cdcbe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3321,6 +3321,7 @@ dependencies = [ "oximeter-instruments", "oximeter-producer", "parse-display", + "petgraph", "pq-sys", "propolis-client", "rand 0.8.5", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index ef426c0912..b489e69a7d 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -131,6 +131,7 @@ nexus-test-utils = { path = "test-utils" } omicron-test-utils = { path = "../test-utils" } omicron-sled-agent = { path = "../sled-agent" } openapiv3 = "1.0" +petgraph = "0.6.2" regex = "1.6.0" subprocess = "0.2.9" term = "0.7" diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index 91f74de1fd..8f6e79662a 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -981,3 +981,222 @@ async fn sic_instance_ensure( Ok(()) } + +#[cfg(test)] +mod test { + use crate::{ + app::saga::create_saga_dag, app::sagas::instance_create::Params, + app::sagas::instance_create::SagaInstanceCreate, + authn::saga::Serialized, context::OpContext, db::datastore::DataStore, + external_api::params, + }; + use async_bb8_diesel::{AsyncRunQueryDsl, OptionalExtension}; + use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; + use dropshot::test_util::ClientTestContext; + use nexus_test_utils::resource_helpers::create_disk; + use nexus_test_utils::resource_helpers::create_ip_pool; + use nexus_test_utils::resource_helpers::create_organization; + use nexus_test_utils::resource_helpers::create_project; + use nexus_test_utils::resource_helpers::DiskTest; + use nexus_test_utils_macros::nexus_test; + use omicron_common::api::external::{ + ByteCount, IdentityMetadataCreateParams, InstanceCpuCount, + }; + use omicron_sled_agent::sim::SledAgent; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + const ORG_NAME: &str = "test-org"; + const PROJECT_NAME: &str = "springfield-squidport"; + const DISK_NAME: &str = "my-disk"; + + async fn create_org_project_and_disk(client: &ClientTestContext) -> Uuid { + create_ip_pool(&client, "p0", None, None).await; + create_organization(&client, ORG_NAME).await; + let project = create_project(client, ORG_NAME, PROJECT_NAME).await; + create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + project.identity.id + } + + // Helper for creating instance create parameters + fn new_test_params(opctx: &OpContext, project_id: Uuid) -> Params { + Params { + serialized_authn: Serialized::for_opctx(opctx), + organization_name: ORG_NAME.parse().unwrap(), + project_name: PROJECT_NAME.parse().unwrap(), + project_id, + create_params: params::InstanceCreate { + identity: IdentityMetadataCreateParams { + name: "my-instance".parse().unwrap(), + description: "My instance".to_string(), + }, + ncpus: InstanceCpuCount::try_from(2).unwrap(), + memory: ByteCount::from_gibibytes_u32(4), + hostname: String::from("inst"), + user_data: vec![], + network_interfaces: + params::InstanceNetworkInterfaceAttachment::Default, + external_ips: vec![params::ExternalIpCreate::Ephemeral { + pool_name: None, + }], + disks: vec![params::InstanceDiskAttachment::Attach( + params::InstanceDiskAttach { + name: DISK_NAME.parse().unwrap(), + }, + )], + start: true, + }, + } + } + + pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext { + OpContext::for_tests( + cptestctx.logctx.log.new(o!()), + cptestctx.server.apictx.nexus.datastore().clone(), + ) + } + + #[nexus_test(server = crate::Server)] + async fn test_saga_basic_usage_succeeds( + cptestctx: &ControlPlaneTestContext, + ) { + DiskTest::new(cptestctx).await; + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.apictx.nexus; + let project_id = create_org_project_and_disk(&client).await; + + // Build the saga DAG with the provided test parameters + let opctx = test_opctx(&cptestctx); + let params = new_test_params(&opctx, project_id); + let dag = create_saga_dag::(params).unwrap(); + let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); + + // Actually run the saga + nexus.run_saga(runnable_saga).await.unwrap(); + } + + async fn no_instance_records_exist(datastore: &DataStore) -> bool { + use crate::db::model::Instance; + use crate::db::schema::instance::dsl; + + dsl::instance + .filter(dsl::time_deleted.is_null()) + .select(Instance::as_select()) + .first_async::(datastore.pool_for_tests().await.unwrap()) + .await + .optional() + .unwrap() + .is_none() + } + + async fn no_network_interface_records_exist(datastore: &DataStore) -> bool { + use crate::db::model::NetworkInterface; + use crate::db::schema::network_interface::dsl; + + dsl::network_interface + .filter(dsl::time_deleted.is_null()) + .select(NetworkInterface::as_select()) + .first_async::( + datastore.pool_for_tests().await.unwrap(), + ) + .await + .optional() + .unwrap() + .is_none() + } + + async fn no_external_ip_records_exist(datastore: &DataStore) -> bool { + use crate::db::model::ExternalIp; + use crate::db::schema::external_ip::dsl; + + dsl::external_ip + .filter(dsl::time_deleted.is_null()) + .select(ExternalIp::as_select()) + .first_async::( + datastore.pool_for_tests().await.unwrap(), + ) + .await + .optional() + .unwrap() + .is_none() + } + + async fn disk_is_detached(datastore: &DataStore) -> bool { + use crate::db::model::Disk; + use crate::db::schema::disk::dsl; + + dsl::disk + .filter(dsl::time_deleted.is_null()) + .filter(dsl::name.eq(DISK_NAME)) + .select(Disk::as_select()) + .first_async::(datastore.pool_for_tests().await.unwrap()) + .await + .unwrap() + .runtime_state + .disk_state + == "detached" + } + + async fn no_instances_or_disks_on_sled(sled_agent: &SledAgent) -> bool { + sled_agent.instance_count().await == 0 + && sled_agent.disk_count().await == 0 + } + + #[nexus_test(server = crate::Server)] + async fn test_action_failure_can_unwind( + cptestctx: &ControlPlaneTestContext, + ) { + DiskTest::new(cptestctx).await; + let log = &cptestctx.logctx.log; + + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.apictx.nexus; + let project_id = create_org_project_and_disk(&client).await; + + // Build the saga DAG with the provided test parameters + let opctx = test_opctx(&cptestctx); + + let params = new_test_params(&opctx, project_id); + let dag = create_saga_dag::(params).unwrap(); + + for node in dag.get_nodes() { + // Create a new saga for this node. + info!( + log, + "Creating new saga which will fail at index {:?}", node.index(); + "node_name" => node.name().as_ref(), + "label" => node.label(), + ); + + let runnable_saga = + nexus.create_runnable_saga(dag.clone()).await.unwrap(); + + // Inject an error instead of running the node. + // + // This should cause the saga to unwind. + nexus + .sec() + .saga_inject_error(runnable_saga.id(), node.index()) + .await + .unwrap(); + nexus + .run_saga(runnable_saga) + .await + .expect_err("Saga should have failed"); + + let datastore = nexus.datastore(); + + // Check that no partial artifacts of instance creation exist + assert!(no_instance_records_exist(datastore).await); + assert!(no_network_interface_records_exist(datastore).await); + assert!(no_external_ip_records_exist(datastore).await); + assert!(disk_is_detached(datastore).await); + assert!( + no_instances_or_disks_on_sled(&cptestctx.sled_agent.sled_agent) + .await + ); + } + } +} diff --git a/sled-agent/src/sim/collection.rs b/sled-agent/src/sim/collection.rs index 5a1c421b94..3c2cde7c11 100644 --- a/sled-agent/src/sim/collection.rs +++ b/sled-agent/src/sim/collection.rs @@ -191,6 +191,10 @@ impl SimCollection { } } + pub async fn size(&self) -> usize { + self.objects.lock().await.len() + } + /// Body of the background task (one per `SimObject`) that simulates /// asynchronous transitions. Each time we read a message from the object's /// channel, we sleep for a bit and then invoke `poke()` to complete whatever diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index d7a6e2d69f..bc59b4a0d2 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -250,6 +250,14 @@ impl SledAgent { self.disks.sim_ensure(&disk_id, initial_state, target).await } + pub async fn instance_count(&self) -> usize { + self.instances.size().await + } + + pub async fn disk_count(&self) -> usize { + self.disks.size().await + } + pub async fn instance_poke(&self, id: Uuid) { self.instances.sim_poke(id).await; }