From b5ad4a8118ffdaa3c245ab854c644a3c39b73aea Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 21 Feb 2023 14:13:39 -0500 Subject: [PATCH] [nexus] Populate IP pool, nexus service information, during rack setup (#2358) # Summary My long-term goal is to have Nexus be in charge of provisioning all services. For that to be possible, Nexus must be able to internalize all input during the handoff from RSS. This PR extends the RSS -> Nexus handoff to include: - What "Nexus Services" are being launched? - What are the ranges of IP addresses that may be used for internal services? - What external IP addresses, from that pool, are currently in-use for Nexus services? # Nexus Changes ## Database Records - Adds a `nexus_service` record, which just includes the information about the in-use external IP address. ## IP Address Allocation - Adds an `explicit_ip` option, which lets callers perform an allocation with an explicit request for a single IP address. You might ask the question: "Why not just directly create a record with the IP address in question, if you want to create it?" We could! But we'd need to recreate all the logic which validates that the IP address exists within the known-to-the-DB IP ranges within the pool. - The ability for an operator to "request Nexus execute with a specific IP address" is a feature we want anyway, so this isn't wasted work. - The implementation and tests for this behavior are mostly within `nexus/src/db/queries/external_ip.rs` ## Rack Initialization - Populates IP pools and Service records as a part of the RSS handoff. - Implementation and tests exist within `nexus/src/db/datastore/rack.rs`. ## Populate - Move the body of some of the "populate" functions into their correct spot in the datastore, which makes it easier to... - ... call all the populate functions -- rather than just a chunk of them -- from `omicron_nexus::db::datastore::datastore_test`. 
- As a consequence, update some tests which assumed the rack would be "half-populated" -- it's either fully populated, or not populated at all. # Sled Agent changes - Explicitly pass the "IP pool ranges for internal services" up to Nexus. - In the future, it'll be possible to pass a larger range of addresses than just those used by running Nexus services. Fixes: https://github.com/oxidecomputer/omicron/issues/1958 Unblocks: https://github.com/oxidecomputer/omicron/issues/732 --- common/src/sql/dbinit.sql | 9 + nexus/db-model/src/external_ip.rs | 28 + nexus/db-model/src/lib.rs | 2 + nexus/db-model/src/nexus_service.rs | 20 + nexus/db-model/src/schema.rs | 7 + nexus/src/app/rack.rs | 20 +- nexus/src/db/datastore/external_ip.rs | 69 ++- nexus/src/db/datastore/ip_pool.rs | 23 +- nexus/src/db/datastore/mod.rs | 29 +- nexus/src/db/datastore/rack.rs | 566 +++++++++++++++++- .../virtual_provisioning_collection.rs | 17 + nexus/src/db/queries/external_ip.rs | 426 +++++++++++-- nexus/src/db/true_or_cast_error.rs | 2 +- nexus/src/lib.rs | 1 + nexus/src/populate.rs | 58 +- nexus/tests/integration_tests/ip_pools.rs | 5 +- nexus/types/src/external_api/shared.rs | 21 + nexus/types/src/internal_api/params.rs | 15 +- openapi/nexus-internal.json | 67 +++ sled-agent/src/rack_setup/service.rs | 30 +- sled-agent/src/sim/server.rs | 1 + 21 files changed, 1245 insertions(+), 171 deletions(-) create mode 100644 nexus/db-model/src/nexus_service.rs diff --git a/common/src/sql/dbinit.sql b/common/src/sql/dbinit.sql index 925714bd66..626334a61f 100644 --- a/common/src/sql/dbinit.sql +++ b/common/src/sql/dbinit.sql @@ -136,6 +136,15 @@ CREATE INDEX ON omicron.public.service ( sled_id ); +-- Extended information for services where "service.kind = nexus" +-- The "id" column of this table should match "id" column of the +-- "omicron.public.service" table exactly. +CREATE TABLE omicron.public.nexus_service ( + id UUID PRIMARY KEY, + -- The external IP address used for Nexus' external interface. 
+ external_ip_id UUID NOT NULL +); + CREATE TYPE omicron.public.physical_disk_kind AS ENUM ( 'm2', 'u2' diff --git a/nexus/db-model/src/external_ip.rs b/nexus/db-model/src/external_ip.rs index 48e8e77d0d..b3c1ff0a37 100644 --- a/nexus/db-model/src/external_ip.rs +++ b/nexus/db-model/src/external_ip.rs @@ -18,6 +18,7 @@ use nexus_types::external_api::shared; use nexus_types::external_api::views; use omicron_common::api::external::Error; use std::convert::TryFrom; +use std::net::IpAddr; use uuid::Uuid; impl_enum_type!( @@ -90,6 +91,8 @@ pub struct IncompleteExternalIp { kind: IpKind, instance_id: Option, pool_id: Uuid, + // Optional address requesting that a specific IP address be allocated. + explicit_ip: Option, } impl IncompleteExternalIp { @@ -106,6 +109,7 @@ impl IncompleteExternalIp { kind: IpKind::SNat, instance_id: Some(instance_id), pool_id, + explicit_ip: None, } } @@ -118,6 +122,7 @@ impl IncompleteExternalIp { kind: IpKind::Ephemeral, instance_id: Some(instance_id), pool_id, + explicit_ip: None, } } @@ -135,6 +140,24 @@ impl IncompleteExternalIp { kind: IpKind::Floating, instance_id: None, pool_id, + explicit_ip: None, + } + } + + pub fn for_service_explicit( + id: Uuid, + pool_id: Uuid, + address: IpAddr, + ) -> Self { + Self { + id, + name: None, + description: None, + time_created: Utc::now(), + kind: IpKind::Service, + instance_id: None, + pool_id, + explicit_ip: Some(IpNetwork::from(address)), } } @@ -147,6 +170,7 @@ impl IncompleteExternalIp { kind: IpKind::Service, instance_id: None, pool_id, + explicit_ip: None, } } @@ -177,6 +201,10 @@ impl IncompleteExternalIp { pub fn pool_id(&self) -> &Uuid { &self.pool_id } + + pub fn explicit_ip(&self) -> &Option { + &self.explicit_ip + } } impl TryFrom for shared::IpKind { diff --git a/nexus/db-model/src/lib.rs b/nexus/db-model/src/lib.rs index f13d6116f4..83f8419612 100644 --- a/nexus/db-model/src/lib.rs +++ b/nexus/db-model/src/lib.rs @@ -36,6 +36,7 @@ mod l4_port_range; mod macaddr; mod name; mod 
network_interface; +mod nexus_service; mod organization; mod oximeter_info; mod physical_disk; @@ -114,6 +115,7 @@ pub use ipv6net::*; pub use l4_port_range::*; pub use name::*; pub use network_interface::*; +pub use nexus_service::*; pub use organization::*; pub use oximeter_info::*; pub use physical_disk::*; diff --git a/nexus/db-model/src/nexus_service.rs b/nexus/db-model/src/nexus_service.rs new file mode 100644 index 0000000000..866f0f6672 --- /dev/null +++ b/nexus/db-model/src/nexus_service.rs @@ -0,0 +1,20 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::schema::nexus_service; +use uuid::Uuid; + +/// Nexus-specific extended service information. +#[derive(Queryable, Insertable, Debug, Clone, Selectable)] +#[diesel(table_name = nexus_service)] +pub struct NexusService { + pub id: Uuid, + pub external_ip_id: Uuid, +} + +impl NexusService { + pub fn new(id: Uuid, external_ip_id: Uuid) -> Self { + Self { id, external_ip_id } + } +} diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index f51b454790..7d853c76bb 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -408,6 +408,13 @@ table! { } } +table! { + nexus_service (id) { + id -> Uuid, + external_ip_id -> Uuid, + } +} + table! { physical_disk (id) { id -> Uuid, diff --git a/nexus/src/app/rack.rs b/nexus/src/app/rack.rs index 632e6639df..09c30d8e1f 100644 --- a/nexus/src/app/rack.rs +++ b/nexus/src/app/rack.rs @@ -65,22 +65,6 @@ impl super::Nexus { ) -> Result<(), Error> { opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; - // Convert from parameter -> DB type. 
- let services: Vec<_> = request - .services - .into_iter() - .map(|svc| { - db::model::Service::new( - svc.service_id, - svc.sled_id, - svc.address, - svc.kind.into(), - ) - }) - .collect(); - - // TODO(https://github.com/oxidecomputer/omicron/issues/1958): If nexus, add a pool? - let datasets: Vec<_> = request .datasets .into_iter() @@ -94,6 +78,7 @@ impl super::Nexus { }) .collect(); + let service_ip_pool_ranges = request.internal_services_ip_pool_ranges; let certificates: Vec<_> = request .certs .into_iter() @@ -126,8 +111,9 @@ impl super::Nexus { .rack_set_initialized( opctx, rack_id, - services, + request.services, datasets, + service_ip_pool_ranges, certificates, ) .await?; diff --git a/nexus/src/db/datastore/external_ip.rs b/nexus/src/db/datastore/external_ip.rs index c7afdc8c06..439e729da4 100644 --- a/nexus/src/db/datastore/external_ip.rs +++ b/nexus/src/db/datastore/external_ip.rs @@ -14,10 +14,11 @@ use crate::db::model::ExternalIp; use crate::db::model::IncompleteExternalIp; use crate::db::model::IpKind; use crate::db::model::Name; +use crate::db::pool::DbConnection; use crate::db::queries::external_ip::NextExternalIp; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; -use async_bb8_diesel::AsyncRunQueryDsl; +use async_bb8_diesel::{AsyncRunQueryDsl, PoolError}; use chrono::Utc; use diesel::prelude::*; use nexus_types::identity::Resource; @@ -25,6 +26,7 @@ use omicron_common::api::external::CreateResult; use omicron_common::api::external::Error; use omicron_common::api::external::LookupResult; use omicron_common::api::external::Name as ExternalName; +use std::net::IpAddr; use std::str::FromStr; use uuid::Uuid; @@ -83,20 +85,58 @@ impl DataStore { opctx: &OpContext, data: IncompleteExternalIp, ) -> CreateResult { - NextExternalIp::new(data) - .get_result_async(self.pool_authorized(opctx).await?) 
- .await - .map_err(|e| { - use async_bb8_diesel::ConnectionError::Query; - use async_bb8_diesel::PoolError::Connection; - use diesel::result::Error::NotFound; - match e { - Connection(Query(NotFound)) => Error::invalid_request( - "No external IP addresses available", - ), - _ => public_error_from_diesel_pool(e, ErrorHandler::Server), + let conn = self.pool_authorized(opctx).await?; + Self::allocate_external_ip_on_connection(conn, data).await + } + + /// Variant of [Self::allocate_external_ip] which may be called from a + /// transaction context. + pub(crate) async fn allocate_external_ip_on_connection( + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), + data: IncompleteExternalIp, + ) -> CreateResult + where + ConnErr: From + Send + 'static, + PoolError: From, + { + let explicit_ip = data.explicit_ip().is_some(); + NextExternalIp::new(data).get_result_async(conn).await.map_err(|e| { + use async_bb8_diesel::ConnectionError::Query; + use async_bb8_diesel::PoolError::Connection; + use diesel::result::Error::NotFound; + let e = PoolError::from(e); + match e { + Connection(Query(NotFound)) => { + if explicit_ip { + Error::invalid_request( + "Requested external IP address not available", + ) + } else { + Error::invalid_request( + "No external IP addresses available", + ) + } } - }) + _ => crate::db::queries::external_ip::from_pool(e), + } + }) + } + + /// Allocates an explicit IP address for an internal service. + /// + /// Unlike the other IP allocation requests, this does not search for an + /// available IP address, it asks for one explicitly. + pub async fn allocate_explicit_service_ip( + &self, + opctx: &OpContext, + ip_id: Uuid, + ip: IpAddr, + ) -> CreateResult { + let (.., pool) = self.ip_pools_service_lookup(opctx).await?; + let data = + IncompleteExternalIp::for_service_explicit(ip_id, pool.id(), ip); + self.allocate_external_ip(opctx, data).await } /// Deallocate the external IP address with the provided ID. 
@@ -104,7 +144,6 @@ impl DataStore { /// To support idempotency, such as in saga operations, this method returns /// an extra boolean, rather than the usual `DeleteResult`. The meaning of /// return values are: - /// /// - `Ok(true)`: The record was deleted during this call /// - `Ok(false)`: The record was already deleted, such as by a previous /// call diff --git a/nexus/src/db/datastore/ip_pool.rs b/nexus/src/db/datastore/ip_pool.rs index cb4ec7b41e..6a665d1883 100644 --- a/nexus/src/db/datastore/ip_pool.rs +++ b/nexus/src/db/datastore/ip_pool.rs @@ -21,10 +21,11 @@ use crate::db::model::IpPoolRange; use crate::db::model::IpPoolUpdate; use crate::db::model::Name; use crate::db::pagination::paginated; +use crate::db::pool::DbConnection; use crate::db::queries::ip_pool::FilterOverlappingIpRanges; use crate::external_api::params; use crate::external_api::shared::IpRange; -use async_bb8_diesel::AsyncRunQueryDsl; +use async_bb8_diesel::{AsyncRunQueryDsl, PoolError}; use chrono::Utc; use diesel::prelude::*; use ipnetwork::IpNetwork; @@ -287,6 +288,24 @@ impl DataStore { authz_pool: &authz::IpPool, range: &IpRange, ) -> CreateResult { + let conn = self.pool_authorized(opctx).await?; + Self::ip_pool_add_range_on_connection(conn, opctx, authz_pool, range) + .await + } + + /// Variant of [Self::ip_pool_add_range] which may be called from a + /// transaction context. 
+ pub(crate) async fn ip_pool_add_range_on_connection( + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), + opctx: &OpContext, + authz_pool: &authz::IpPool, + range: &IpRange, + ) -> CreateResult + where + ConnErr: From + Send + 'static, + PoolError: From, + { use db::schema::ip_pool_range::dsl; opctx.authorize(authz::Action::CreateChild, authz_pool).await?; let pool_id = authz_pool.id(); @@ -295,7 +314,7 @@ impl DataStore { let insert_query = diesel::insert_into(dsl::ip_pool_range).values(filter_subquery); IpPool::insert_resource(pool_id, insert_query) - .insert_and_get_result_async(self.pool_authorized(opctx).await?) + .insert_and_get_result_async(conn) .await .map_err(|e| { use async_bb8_diesel::ConnectionError::Query; diff --git a/nexus/src/db/datastore/mod.rs b/nexus/src/db/datastore/mod.rs index 70a23d522b..269ae19675 100644 --- a/nexus/src/db/datastore/mod.rs +++ b/nexus/src/db/datastore/mod.rs @@ -81,6 +81,9 @@ pub use volume::CrucibleResources; // TODO: This should likely turn into a configuration option. pub(crate) const REGION_REDUNDANCY_THRESHOLD: usize = 3; +/// The name of the built-in IP pool for Oxide services. +pub const SERVICE_IP_POOL_NAME: &str = "oxide-service-pool"; + // Represents a query that is ready to be executed. // // This helper trait lets the statement either be executed or explained. @@ -236,12 +239,20 @@ pub async fn datastore_test( authn::Context::internal_db_init(), Arc::clone(&datastore), ); + + // TODO: Can we just call "Populate" instead of doing this? 
+ let rack_id = Uuid::parse_str(nexus_test_utils::RACK_UUID).unwrap(); datastore.load_builtin_users(&opctx).await.unwrap(); datastore.load_builtin_roles(&opctx).await.unwrap(); datastore.load_builtin_role_asgns(&opctx).await.unwrap(); datastore.load_builtin_silos(&opctx).await.unwrap(); datastore.load_silo_users(&opctx).await.unwrap(); datastore.load_silo_user_role_assignments(&opctx).await.unwrap(); + datastore + .load_builtin_fleet_virtual_provisioning_collection(&opctx) + .await + .unwrap(); + datastore.load_builtin_rack_data(&opctx, rack_id).await.unwrap(); // Create an OpContext with the credentials of "test-privileged" for general // testing. @@ -1067,14 +1078,28 @@ mod test { // Initialize the Rack. let result = datastore - .rack_set_initialized(&opctx, rack.id(), vec![], vec![], vec![]) + .rack_set_initialized( + &opctx, + rack.id(), + vec![], + vec![], + vec![], + vec![], + ) .await .unwrap(); assert!(result.initialized); // Re-initialize the rack (check for idempotency) let result = datastore - .rack_set_initialized(&opctx, rack.id(), vec![], vec![], vec![]) + .rack_set_initialized( + &opctx, + rack.id(), + vec![], + vec![], + vec![], + vec![], + ) .await .unwrap(); assert!(result.initialized); diff --git a/nexus/src/db/datastore/rack.rs b/nexus/src/db/datastore/rack.rs index b8018cbbc1..2c7e3aafb3 100644 --- a/nexus/src/db/datastore/rack.rs +++ b/nexus/src/db/datastore/rack.rs @@ -5,6 +5,7 @@ //! [`DataStore`] methods on [`Rack`]s. 
use super::DataStore; +use super::SERVICE_IP_POOL_NAME; use crate::authz; use crate::context::OpContext; use crate::db; @@ -16,17 +17,22 @@ use crate::db::error::TransactionError; use crate::db::identity::Asset; use crate::db::model::Certificate; use crate::db::model::Dataset; +use crate::db::model::IncompleteExternalIp; +use crate::db::model::NexusService; use crate::db::model::Rack; use crate::db::model::Service; use crate::db::model::Sled; use crate::db::model::Zpool; use crate::db::pagination::paginated; +use crate::internal_api::params as internal_params; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use async_bb8_diesel::PoolError; use chrono::Utc; use diesel::prelude::*; use diesel::upsert::excluded; +use nexus_types::external_api::shared::IpRange; +use nexus_types::identity::Resource; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; @@ -85,8 +91,9 @@ impl DataStore { &self, opctx: &OpContext, rack_id: Uuid, - services: Vec, + services: Vec, datasets: Vec, + service_ip_pool_ranges: Vec, certificates: Vec, ) -> UpdateResult { use db::schema::rack::dsl as rack_dsl; @@ -95,12 +102,16 @@ impl DataStore { #[derive(Debug)] enum RackInitError { + AddingIp(Error), ServiceInsert { err: AsyncInsertError, sled_id: Uuid, svc_id: Uuid }, DatasetInsert { err: AsyncInsertError, zpool_id: Uuid }, RackUpdate(PoolError), } type TxnError = TransactionError; + let (authz_service_pool, service_pool) = + self.ip_pools_service_lookup(&opctx).await?; + // NOTE: This operation could likely be optimized with a CTE, but given // the low-frequency of calls, this optimization has been deferred. 
let log = opctx.log.clone(); @@ -114,6 +125,7 @@ impl DataStore { .get_result_async(&conn) .await .map_err(|e| { + warn!(log, "Initializing Rack: Rack UUID not found"); TxnError::CustomError(RackInitError::RackUpdate( PoolError::from(e), )) @@ -123,14 +135,36 @@ impl DataStore { return Ok(rack); } - // Otherwise, insert services and datasets - for svc in services { + // Otherwise, insert services and datasets. + + // Set up the IP pool for internal services. + for range in service_ip_pool_ranges { + Self::ip_pool_add_range_on_connection( + &conn, + opctx, + &authz_service_pool, + &range, + ).await.map_err(|err| { + warn!(log, "Initializing Rack: Failed to add IP pool range"); + TxnError::CustomError(RackInitError::AddingIp(err)) + })?; + } + + // Allocate records for all services. + for service in services { + let service_db = db::model::Service::new( + service.service_id, + service.sled_id, + service.address, + service.kind.into(), + ); + use db::schema::service::dsl; - let sled_id = svc.sled_id; + let sled_id = service.sled_id; >::insert_resource( sled_id, diesel::insert_into(dsl::service) - .values(svc.clone()) + .values(service_db.clone()) .on_conflict(dsl::id) .do_update() .set(( @@ -143,14 +177,47 @@ impl DataStore { .insert_and_get_result_async(&conn) .await .map_err(|err| { + warn!(log, "Initializing Rack: Failed to insert service"); TxnError::CustomError(RackInitError::ServiceInsert { err, sled_id, - svc_id: svc.id(), + svc_id: service.service_id, }) })?; + + if let internal_params::ServiceKind::Nexus { external_address } = service.kind { + // Allocate the explicit IP address that is currently + // in-use by this Nexus service. 
+ let ip_id = Uuid::new_v4(); + let data = IncompleteExternalIp::for_service_explicit( + ip_id, + service_pool.id(), + external_address + ); + let allocated_ip = Self::allocate_external_ip_on_connection( + &conn, + data + ).await.map_err(|err| { + warn!(log, "Initializing Rack: Failed to allocate IP address"); + TxnError::CustomError(RackInitError::AddingIp(err)) + })?; + assert_eq!(allocated_ip.ip.ip(), external_address); + + // Add a service record for Nexus. + let nexus_service = NexusService::new(service.service_id, allocated_ip.id); + use db::schema::nexus_service::dsl; + diesel::insert_into(dsl::nexus_service) + .values(nexus_service) + .execute_async(&conn) + .await + .map_err(|e| { + warn!(log, "Initializing Rack: Failed to insert Nexus Service record"); + e + })?; + } } info!(log, "Inserted services"); + for dataset in datasets { use db::schema::dataset::dsl; let zpool_id = dataset.pool_id; @@ -188,6 +255,7 @@ impl DataStore { .execute_async(&conn) .await?; } + info!(log, "Inserted certificates"); let rack = diesel::update(rack_dsl::rack) .filter(rack_dsl::id.eq(rack_id)) @@ -207,6 +275,7 @@ impl DataStore { }) .await .map_err(|e| match e { + TxnError::CustomError(RackInitError::AddingIp(err)) => err, TxnError::CustomError(RackInitError::DatasetInsert { err, zpool_id, @@ -256,4 +325,489 @@ impl DataStore { } }) } + + pub async fn load_builtin_rack_data( + &self, + opctx: &OpContext, + rack_id: Uuid, + ) -> Result<(), Error> { + use crate::external_api::params; + use omicron_common::api::external::IdentityMetadataCreateParams; + use omicron_common::api::external::Name; + + self.rack_insert(opctx, &db::model::Rack::new(rack_id)).await?; + + let params = params::IpPoolCreate { + identity: IdentityMetadataCreateParams { + name: SERVICE_IP_POOL_NAME.parse::().unwrap(), + description: String::from("IP Pool for Oxide Services"), + }, + }; + self.ip_pool_create(opctx, ¶ms, /*internal=*/ true) + .await + .map(|_| ()) + .or_else(|e| match e { + 
Error::ObjectAlreadyExists { .. } => Ok(()), + _ => Err(e), + })?; + + let params = params::IpPoolCreate { + identity: IdentityMetadataCreateParams { + name: "default".parse::().unwrap(), + description: String::from("default IP pool"), + }, + }; + self.ip_pool_create(opctx, ¶ms, /*internal=*/ false) + .await + .map(|_| ()) + .or_else(|e| match e { + Error::ObjectAlreadyExists { .. } => Ok(()), + _ => Err(e), + })?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::db::datastore::datastore_test; + use crate::db::model::ExternalIp; + use crate::db::model::IpKind; + use crate::db::model::IpPoolRange; + use crate::db::model::ServiceKind; + use async_bb8_diesel::AsyncSimpleConnection; + use nexus_test_utils::db::test_setup_database; + use nexus_types::identity::Asset; + use omicron_test_utils::dev; + use std::collections::HashMap; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV6}; + + fn rack_id() -> Uuid { + Uuid::parse_str(nexus_test_utils::RACK_UUID).unwrap() + } + + #[tokio::test] + async fn rack_set_initialized_empty() { + let logctx = dev::test_setup_log("rack_set_initialized_empty"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let services = vec![]; + let datasets = vec![]; + let service_ip_pool_ranges = vec![]; + let certificates = vec![]; + + // Initializing the rack with no data is odd, but allowed. + let rack = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services.clone(), + datasets.clone(), + service_ip_pool_ranges.clone(), + certificates.clone(), + ) + .await + .expect("Failed to initialize rack"); + + assert_eq!(rack.id(), rack_id()); + assert!(rack.initialized); + + // It should also be idempotent. 
+ let rack2 = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services, + datasets, + service_ip_pool_ranges, + certificates, + ) + .await + .expect("Failed to initialize rack"); + assert_eq!(rack.time_modified(), rack2.time_modified()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + async fn create_test_sled(db: &DataStore) -> Sled { + let sled_id = Uuid::new_v4(); + let is_scrimlet = false; + let addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0); + let identifier = String::from("identifier"); + let model = String::from("model"); + let revision = 0; + let sled = Sled::new( + sled_id, + addr, + is_scrimlet, + identifier, + model, + revision, + rack_id(), + ); + db.sled_upsert(sled) + .await + .expect("Could not upsert sled during test prep") + } + + // Hacky macro helper to: + // - Perform a transaction... + // - ... That queries a particular table for all values... + // - ... and Selects them as the requested model type. + macro_rules! fn_to_get_all { + ($table:ident, $model:ident) => { + paste::paste! 
{ + async fn [](db: &DataStore) -> Vec<$model> { + use crate::db::schema::$table::dsl; + db.pool_for_tests() + .await + .unwrap() + .transaction_async(|conn| async move { + conn.batch_execute_async(crate::db::ALLOW_FULL_TABLE_SCAN_SQL) + .await + .unwrap(); + Ok::<_, crate::db::TransactionError<()>>( + dsl::$table + .select($model::as_select()) + .get_results_async(&conn) + .await + .unwrap() + ) + }) + .await + .unwrap() + } + } + } + } + + fn_to_get_all!(service, Service); + fn_to_get_all!(nexus_service, NexusService); + fn_to_get_all!(external_ip, ExternalIp); + fn_to_get_all!(ip_pool_range, IpPoolRange); + fn_to_get_all!(dataset, Dataset); + + #[tokio::test] + async fn rack_set_initialized_with_nexus_service() { + let logctx = + dev::test_setup_log("rack_set_initialized_with_nexus_service"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let sled = create_test_sled(&datastore).await; + + let nexus_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + let services = vec![internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: nexus_ip, + }, + }]; + let datasets = vec![]; + let service_ip_pool_ranges = vec![IpRange::from(nexus_ip)]; + let certificates = vec![]; + + let rack = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services.clone(), + datasets.clone(), + service_ip_pool_ranges, + certificates.clone(), + ) + .await + .expect("Failed to initialize rack"); + + assert_eq!(rack.id(), rack_id()); + assert!(rack.initialized); + + let observed_services = get_all_services(&datastore).await; + let observed_nexus_services = get_all_nexus_services(&datastore).await; + let observed_datasets = get_all_datasets(&datastore).await; + + // We should only see the one nexus we inserted earlier + assert_eq!(observed_services.len(), 1); + 
assert_eq!(observed_services[0].sled_id, sled.id()); + assert_eq!(observed_services[0].kind, ServiceKind::Nexus); + + // It should have a corresponding "Nexus service record" + assert_eq!(observed_nexus_services.len(), 1); + assert_eq!(observed_services[0].id(), observed_nexus_services[0].id); + + // We should also see the single external IP allocated for this nexus + // interface. + let observed_external_ips = get_all_external_ips(&datastore).await; + assert_eq!(observed_external_ips.len(), 1); + assert_eq!( + observed_external_ips[0].id, + observed_nexus_services[0].external_ip_id + ); + assert_eq!(observed_external_ips[0].kind, IpKind::Service); + + // Furthermore, we should be able to see that this IP address has been + // allocated as a part of the service IP pool. + let (.., svc_pool) = + datastore.ip_pools_service_lookup(&opctx).await.unwrap(); + assert!(svc_pool.internal); + + let observed_ip_pool_ranges = get_all_ip_pool_ranges(&datastore).await; + assert_eq!(observed_ip_pool_ranges.len(), 1); + assert_eq!(observed_ip_pool_ranges[0].ip_pool_id, svc_pool.id()); + + // Verify the allocated external IP + assert_eq!(observed_external_ips[0].ip_pool_id, svc_pool.id()); + assert_eq!( + observed_external_ips[0].ip_pool_range_id, + observed_ip_pool_ranges[0].id + ); + assert_eq!(observed_external_ips[0].kind, IpKind::Service); + assert_eq!(observed_external_ips[0].ip.ip(), nexus_ip,); + + assert!(observed_datasets.is_empty()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn rack_set_initialized_with_many_nexus_services() { + let logctx = dev::test_setup_log( + "rack_set_initialized_with_many_nexus_services", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let sled = create_test_sled(&datastore).await; + + // Ask for two Nexus services, with different external IPs. 
+ let nexus_ip_start = Ipv4Addr::new(1, 2, 3, 4); + let nexus_ip_end = Ipv4Addr::new(1, 2, 3, 5); + let mut services = vec![ + internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: IpAddr::V4(nexus_ip_start), + }, + }, + internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: IpAddr::V4(nexus_ip_end), + }, + }, + ]; + services + .sort_by(|a, b| a.service_id.partial_cmp(&b.service_id).unwrap()); + + let datasets = vec![]; + let service_ip_pool_ranges = + vec![IpRange::try_from((nexus_ip_start, nexus_ip_end)) + .expect("Cannot create IP Range")]; + let certificates = vec![]; + + let rack = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services.clone(), + datasets.clone(), + service_ip_pool_ranges, + certificates.clone(), + ) + .await + .expect("Failed to initialize rack"); + + assert_eq!(rack.id(), rack_id()); + assert!(rack.initialized); + + let mut observed_services = get_all_services(&datastore).await; + let mut observed_nexus_services = + get_all_nexus_services(&datastore).await; + let observed_datasets = get_all_datasets(&datastore).await; + + // We should see both of the Nexus services we provisioned. 
+ assert_eq!(observed_services.len(), 2); + observed_services.sort_by(|a, b| a.id().partial_cmp(&b.id()).unwrap()); + + assert_eq!(observed_services[0].sled_id, sled.id()); + assert_eq!(observed_services[1].sled_id, sled.id()); + assert_eq!(observed_services[0].kind, ServiceKind::Nexus); + assert_eq!(observed_services[1].kind, ServiceKind::Nexus); + + // It should have a corresponding "Nexus service record" + assert_eq!(observed_nexus_services.len(), 2); + observed_nexus_services + .sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap()); + assert_eq!(observed_services[0].id(), observed_nexus_services[0].id); + assert_eq!(observed_services[1].id(), observed_nexus_services[1].id); + + // We should see both IPs allocated for these services. + let observed_external_ips: HashMap<_, _> = + get_all_external_ips(&datastore) + .await + .into_iter() + .map(|ip| (ip.id, ip)) + .collect(); + assert_eq!(observed_external_ips.len(), 2); + + // The address referenced by the "NexusService" should match the input. + assert_eq!( + observed_external_ips[&observed_nexus_services[0].external_ip_id] + .ip + .ip(), + if let internal_params::ServiceKind::Nexus { external_address } = + services[0].kind + { + external_address + } else { + panic!("Unexpected service kind") + } + ); + assert_eq!( + observed_external_ips[&observed_nexus_services[1].external_ip_id] + .ip + .ip(), + if let internal_params::ServiceKind::Nexus { external_address } = + services[1].kind + { + external_address + } else { + panic!("Unexpected service kind") + } + ); + + // Furthermore, we should be able to see that these IP addresses have been + // allocated as a part of the service IP pool. 
+ let (.., svc_pool) = + datastore.ip_pools_service_lookup(&opctx).await.unwrap(); + assert!(svc_pool.internal); + + let observed_ip_pool_ranges = get_all_ip_pool_ranges(&datastore).await; + assert_eq!(observed_ip_pool_ranges.len(), 1); + assert_eq!(observed_ip_pool_ranges[0].ip_pool_id, svc_pool.id()); + + assert!(observed_datasets.is_empty()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn rack_set_initialized_missing_service_pool_ip_throws_error() { + let logctx = dev::test_setup_log( + "rack_set_initialized_missing_service_pool_ip_throws_error", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let sled = create_test_sled(&datastore).await; + + let nexus_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + let services = vec![internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: nexus_ip, + }, + }]; + let datasets = vec![]; + let service_ip_pool_ranges = vec![]; + let certificates = vec![]; + + let result = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services.clone(), + datasets.clone(), + service_ip_pool_ranges, + certificates.clone(), + ) + .await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Invalid Request: Requested external IP address not available" + ); + + assert!(get_all_services(&datastore).await.is_empty()); + assert!(get_all_nexus_services(&datastore).await.is_empty()); + assert!(get_all_datasets(&datastore).await.is_empty()); + assert!(get_all_external_ips(&datastore).await.is_empty()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn rack_set_initialized_overlapping_ips_throws_error() { + let logctx = dev::test_setup_log( + "rack_set_initialized_overlapping_ips_throws_error", + ); + let mut db = 
test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let sled = create_test_sled(&datastore).await; + + // Request two services which happen to be using the same IP address. + let nexus_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + + let services = vec![ + internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: nexus_ip, + }, + }, + internal_params::ServicePutRequest { + service_id: Uuid::new_v4(), + sled_id: sled.id(), + address: Ipv6Addr::LOCALHOST, + kind: internal_params::ServiceKind::Nexus { + external_address: nexus_ip, + }, + }, + ]; + let datasets = vec![]; + let service_ip_pool_ranges = vec![IpRange::from(nexus_ip)]; + let certificates = vec![]; + + let result = datastore + .rack_set_initialized( + &opctx, + rack_id(), + services.clone(), + datasets.clone(), + service_ip_pool_ranges, + certificates.clone(), + ) + .await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Invalid Request: Requested external IP address not available", + ); + + assert!(get_all_services(&datastore).await.is_empty()); + assert!(get_all_nexus_services(&datastore).await.is_empty()); + assert!(get_all_datasets(&datastore).await.is_empty()); + assert!(get_all_external_ips(&datastore).await.is_empty()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } diff --git a/nexus/src/db/datastore/virtual_provisioning_collection.rs b/nexus/src/db/datastore/virtual_provisioning_collection.rs index da64f76c6f..d1a55fb883 100644 --- a/nexus/src/db/datastore/virtual_provisioning_collection.rs +++ b/nexus/src/db/datastore/virtual_provisioning_collection.rs @@ -320,4 +320,21 @@ impl DataStore { .append_cpu_metrics(&provisions); Ok(provisions) } + + pub async fn load_builtin_fleet_virtual_provisioning_collection( + &self, + opctx: &OpContext, + ) -> Result<(), Error> { 
+ let id = *db::fixed_data::FLEET_ID; + self.virtual_provisioning_collection_create( + opctx, + db::model::VirtualProvisioningCollection::new( + id, + db::model::CollectionTypeProvisioned::Fleet, + ), + ) + .await?; + + Ok(()) + } } diff --git a/nexus/src/db/queries/external_ip.rs b/nexus/src/db/queries/external_ip.rs index 9a284967ce..eda5f9830c 100644 --- a/nexus/src/db/queries/external_ip.rs +++ b/nexus/src/db/queries/external_ip.rs @@ -12,6 +12,7 @@ use crate::db::model::IpKindEnum; use crate::db::model::Name; use crate::db::pool::DbConnection; use crate::db::schema; +use crate::db::true_or_cast_error::{matches_sentinel, TrueOrCastError}; use chrono::DateTime; use chrono::Utc; use diesel::pg::Pg; @@ -21,8 +22,10 @@ use diesel::query_builder::QueryFragment; use diesel::query_builder::QueryId; use diesel::sql_types; use diesel::Column; +use diesel::Expression; use diesel::QueryResult; use diesel::RunQueryDsl; +use omicron_common::api::external; use uuid::Uuid; type FromClause = @@ -34,6 +37,29 @@ type ExternalIpFromClause = FromClause; const EXTERNAL_IP_FROM_CLAUSE: ExternalIpFromClause = ExternalIpFromClause::new(); +const REALLOCATION_WITH_DIFFERENT_IP_SENTINEL: &'static str = + "Reallocation of IP with different value"; + +/// Translates a generic pool error to an external error. +pub fn from_pool(e: async_bb8_diesel::PoolError) -> external::Error { + use crate::db::error; + + let sentinels = [REALLOCATION_WITH_DIFFERENT_IP_SENTINEL]; + if let Some(sentinel) = matches_sentinel(&e, &sentinels) { + match sentinel { + REALLOCATION_WITH_DIFFERENT_IP_SENTINEL => { + return external::Error::invalid_request( + "Re-allocating IP address with a different value", + ); + } + // Fall-through to the generic error conversion. + _ => {} + } + } + + error::public_error_from_diesel_pool(e, error::ErrorHandler::Server) +} + // The number of ports available to an instance when doing source NAT. Note // that for static NAT, this value isn't used, and all ports are available. 
// @@ -85,20 +111,19 @@ const MAX_PORT: i32 = u16::MAX as _; /// CAST(candidate_first_port AS INT4) AS first_port, /// CAST(candidate_last_port AS INT4) AS last_port /// FROM -/// ( +/// SELECT * FROM ( /// -- Select all IP addresses by pool and range. /// SELECT /// ip_pool_id, /// id AS ip_pool_range_id, -/// first_address + -/// generate_series(0, last_address - first_address) -/// AS candidate_ip +/// AS candidate_ip /// FROM /// ip_pool_range /// WHERE /// AND /// time_deleted IS NULL -/// ) +/// ) AS candidates +/// WHERE candidates.candidate_ip IS NOT NULL /// CROSS JOIN /// ( /// -- Cartesian product with all first/last port values @@ -125,6 +150,41 @@ const MAX_PORT: i32 = u16::MAX as _; /// candidate_ip, candidate_first_port /// LIMIT 1 /// ), +/// -- Identify if the IP address has already been allocated, to validate +/// -- that the request looks the same (for idempotency). +/// previously_allocated_ip AS ( +/// SELECT +/// id as old_id, +/// ip as old_ip +/// FROM external_ip +/// WHERE +/// id = AND time_deleted IS NULL +/// ), +/// -- Compare `next_external_ip` with `previously_allocated_ip`, and throw +/// -- an error if the request is not idempotent. +/// validate_prior_allocation AS MATERIALIZED ( +/// CAST( +/// -- If this expression evaluates to false, we throw an error. +/// IF( +/// -- Either the previously_allocated_ip does not exist, or... +/// NOT EXISTS(SELECT 1 FROM previously_allocated_ip) OR +/// -- ... If it does exist, the IP address must be the same for +/// -- both the old and new request. +/// ( +/// SELECT ip = old_ip +/// FROM +/// ( +/// SELECT ip, old_ip +/// FROM next_external_ip +/// INNER JOIN +/// previously_allocated_ip ON old_id = id +/// ) +/// ), +/// TRUE, +/// AS BOOL +/// ) +/// ) +/// ), /// external_ip AS ( /// -- Insert the record into the actual table. 
/// -- When a conflict is detected, we'll update the timestamps but leave @@ -344,23 +404,18 @@ impl NextExternalIp { // In either case, we follow this with a filter `WHERE ip IS NULL`, // meaning we select the candidate address and first port that does not // have a matching record in the table already. + out.push_sql(" ON ("); + out.push_identifier(dsl::ip::NAME)?; + out.push_sql(","); if matches!(self.ip.kind(), &IpKind::SNat) { - out.push_sql(" ON ("); - out.push_identifier(dsl::ip::NAME)?; - out.push_sql(", candidate_first_port >= "); + out.push_sql(" candidate_first_port >= "); out.push_identifier(dsl::first_port::NAME)?; out.push_sql(" AND candidate_last_port <= "); out.push_identifier(dsl::last_port::NAME)?; - out.push_sql(", "); - out.push_identifier(dsl::time_deleted::NAME)?; - out.push_sql(" IS NULL) = (candidate_ip, TRUE, TRUE) "); - } else { - out.push_sql(" ON ("); - out.push_identifier(dsl::ip::NAME)?; - out.push_sql(", "); - out.push_identifier(dsl::time_deleted::NAME)?; - out.push_sql(" IS NULL) = (candidate_ip, TRUE) "); + out.push_sql(" AND "); } + out.push_identifier(dsl::time_deleted::NAME)?; + out.push_sql(" IS NULL) = (candidate_ip, TRUE) "); // In all cases, we're selecting rows from the join that don't have a // match in the existing table. @@ -388,38 +443,143 @@ impl NextExternalIp { Ok(()) } - // Push a subquery that selects the sequence of IP addresses, from each range in - // each IP Pool, along with the pool/range IDs. 
+ fn push_prior_allocation_subquery<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> QueryResult<()> { + use schema::external_ip::dsl; + + out.push_sql("SELECT "); + out.push_identifier(dsl::id::NAME)?; + out.push_sql(" AS old_id,"); + out.push_identifier(dsl::ip::NAME)?; + out.push_sql(" AS old_ip FROM "); + EXTERNAL_IP_FROM_CLAUSE.walk_ast(out.reborrow())?; + out.push_sql(" WHERE "); + out.push_identifier(dsl::id::NAME)?; + out.push_sql(" = "); + out.push_bind_param::(self.ip.id())?; + out.push_sql(" AND "); + out.push_identifier(dsl::time_deleted::NAME)?; + out.push_sql(" IS NULL"); + + Ok(()) + } + + fn push_validate_prior_allocation_subquery<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> QueryResult<()> { + // Describes a boolean expression within the CTE to check if the "old IP + // address" matches the "to-be-allocated" IP address. + // + // This validates that, in a case where we're re-allocating the same IP, + // we don't attempt to assign a different value. + struct CheckIfOldAllocationHasSameIp {} + impl Expression for CheckIfOldAllocationHasSameIp { + type SqlType = diesel::sql_types::Bool; + } + impl QueryFragment for CheckIfOldAllocationHasSameIp { + fn walk_ast<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> diesel::QueryResult<()> { + use schema::external_ip::dsl; + out.unsafe_to_cache_prepared(); + // Either the allocation to this UUID needs to be new... + out.push_sql( + "NOT EXISTS(SELECT 1 FROM previously_allocated_ip) OR", + ); + // ... Or we are allocating the same IP address... 
+ out.push_sql("(SELECT "); + out.push_identifier(dsl::ip::NAME)?; + out.push_sql(" = old_ip FROM (SELECT "); + out.push_identifier(dsl::ip::NAME)?; + out.push_sql(", old_ip FROM next_external_ip INNER JOIN previously_allocated_ip ON old_id = id))"); + Ok(()) + } + } + + out.push_sql("SELECT "); + + const QUERY: TrueOrCastError = + TrueOrCastError::new( + CheckIfOldAllocationHasSameIp {}, + REALLOCATION_WITH_DIFFERENT_IP_SENTINEL, + ); + QUERY.walk_ast(out.reborrow())?; + Ok(()) + } + + // Push a subquery which selects either: + // - A sequence of candidate IP addresses from the IP pool range, if no + // explicit IP address has been supplied, or + // - A single IP address within the range, if an explicit IP address has + // been supplied. // // ```sql - // SELECT - // ip_pool_id, - // id AS ip_pool_range_id, - // first_address + - // generate_series(0, last_address - first_address) - // AS candidate_ip - // FROM - // ip_pool_range - // WHERE - // AND - // time_deleted IS NULL + // SELECT * FROM ( + // SELECT + // ip_pool_id, + // id AS ip_pool_range_id, + // -- Candidates with no explicit IP: + // first_address + generate_series(0, last_address - first_address) + // -- Candidates with explicit IP: + // CASE + // first_address <= AND + // <= last_address + // WHEN TRUE THEN ELSE NULL END + // -- Either way: + // AS candidate_ip + // FROM + // ip_pool_range + // WHERE + // AND + // time_deleted IS NULL + // ) AS candidates + // WHERE candidates.candidate_ip IS NOT NULL // ``` fn push_address_sequence_subquery<'a>( &'a self, mut out: AstPass<'_, 'a, Pg>, ) -> QueryResult<()> { use schema::ip_pool_range::dsl; + out.push_sql("SELECT * FROM ("); + out.push_sql("SELECT "); out.push_identifier(dsl::ip_pool_id::NAME)?; out.push_sql(", "); out.push_identifier(dsl::id::NAME)?; out.push_sql(" AS ip_pool_range_id, "); - out.push_identifier(dsl::first_address::NAME)?; - out.push_sql(" + generate_series(0, "); - out.push_identifier(dsl::last_address::NAME)?; - out.push_sql(" - 
"); - out.push_identifier(dsl::first_address::NAME)?; - out.push_sql(") AS candidate_ip FROM "); + + if let Some(explicit_ip) = self.ip.explicit_ip() { + out.push_sql("CASE "); + out.push_identifier(dsl::first_address::NAME)?; + out.push_sql(" <= "); + out.push_bind_param::( + explicit_ip, + )?; + out.push_sql(" AND "); + out.push_bind_param::( + explicit_ip, + )?; + out.push_sql(" <= "); + out.push_identifier(dsl::last_address::NAME)?; + out.push_sql(" WHEN TRUE THEN "); + out.push_bind_param::( + explicit_ip, + )?; + out.push_sql(" ELSE NULL END"); + } else { + out.push_identifier(dsl::first_address::NAME)?; + out.push_sql(" + generate_series(0, "); + out.push_identifier(dsl::last_address::NAME)?; + out.push_sql(" - "); + out.push_identifier(dsl::first_address::NAME)?; + out.push_sql(") "); + } + + out.push_sql(" AS candidate_ip FROM "); IP_POOL_RANGE_FROM_CLAUSE.walk_ast(out.reborrow())?; out.push_sql(" WHERE "); out.push_identifier(dsl::ip_pool_id::NAME)?; @@ -428,6 +588,8 @@ impl NextExternalIp { out.push_sql(" AND "); out.push_identifier(dsl::time_deleted::NAME)?; out.push_sql(" IS NULL"); + out.push_sql(") AS candidates "); + out.push_sql("WHERE candidates.candidate_ip IS NOT NULL"); Ok(()) } @@ -556,18 +718,28 @@ impl QueryFragment for NextExternalIp { // their IP address ranges. out.push_sql("WITH next_external_ip AS ("); self.push_next_ip_and_port_range_subquery(out.reborrow())?; + out.push_sql("), "); + + out.push_sql("previously_allocated_ip AS ("); + self.push_prior_allocation_subquery(out.reborrow())?; + out.push_sql("), "); + out.push_sql("validate_previously_allocated_ip AS MATERIALIZED("); + self.push_validate_prior_allocation_subquery(out.reborrow())?; + out.push_sql("), "); // Push the subquery that potentially inserts this record, or ignores // primary key conflicts (for idempotency). 
- out.push_sql("), external_ip AS ("); + out.push_sql("external_ip AS ("); self.push_update_external_ip_subquery(out.reborrow())?; + out.push_sql("), "); // Push the subquery that bumps the `rcgen` of the IP Pool range table - out.push_sql("), updated_pool_range AS ("); + out.push_sql("updated_pool_range AS ("); self.push_update_ip_pool_range_subquery(out.reborrow())?; + out.push_sql(") "); // Select the contents of the actual record that was created or updated. - out.push_sql(") SELECT * FROM external_ip"); + out.push_sql("SELECT * FROM external_ip"); Ok(()) } @@ -584,6 +756,7 @@ impl RunQueryDsl for NextExternalIp {} mod tests { use crate::context::OpContext; use crate::db::datastore::DataStore; + use crate::db::datastore::SERVICE_IP_POOL_NAME; use crate::db::identity::Resource; use crate::db::model::IpKind; use crate::db::model::IpPool; @@ -591,6 +764,7 @@ mod tests { use crate::db::model::Name; use crate::external_api::shared::IpRange; use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; use dropshot::test_util::LogContext; use nexus_test_utils::db::test_setup_database; use omicron_common::api::external::Error; @@ -624,12 +798,8 @@ mod tests { Self { logctx, opctx, db, db_datastore } } - async fn create_ip_pool_internal( - &self, - name: &str, - range: IpRange, - internal: bool, - ) { + async fn create_ip_pool(&self, name: &str, range: IpRange) { + let internal = false; let pool = IpPool::new( &IdentityMetadataCreateParams { name: String::from(name).parse().unwrap(), @@ -638,7 +808,8 @@ mod tests { internal, ); - diesel::insert_into(crate::db::schema::ip_pool::dsl::ip_pool) + use crate::db::schema::ip_pool::dsl as ip_pool_dsl; + diesel::insert_into(ip_pool_dsl::ip_pool) .values(pool.clone()) .execute_async( self.db_datastore @@ -649,6 +820,26 @@ mod tests { .await .expect("Failed to create IP Pool"); + self.initialize_ip_pool(name, range).await; + } + + async fn initialize_ip_pool(&self, name: &str, range: 
IpRange) { + // Find the target IP pool + use crate::db::schema::ip_pool::dsl as ip_pool_dsl; + let pool = ip_pool_dsl::ip_pool + .filter(ip_pool_dsl::name.eq(name.to_string())) + .filter(ip_pool_dsl::time_deleted.is_null()) + .select(IpPool::as_select()) + .get_result_async( + self.db_datastore + .pool_authorized(&self.opctx) + .await + .unwrap(), + ) + .await + .expect("Failed to 'SELECT' IP Pool"); + + // Insert a range into this IP pool let pool_range = IpPoolRange::new(&range, pool.id()); diesel::insert_into( crate::db::schema::ip_pool_range::dsl::ip_pool_range, @@ -661,15 +852,6 @@ mod tests { .expect("Failed to create IP Pool range"); } - async fn create_service_ip_pool(&self, name: &str, range: IpRange) { - self.create_ip_pool_internal(name, range, /*internal=*/ true).await; - } - - async fn create_ip_pool(&self, name: &str, range: IpRange) { - self.create_ip_pool_internal(name, range, /*internal=*/ false) - .await; - } - async fn default_pool_id(&self) -> Uuid { let (.., pool) = self .db_datastore @@ -698,7 +880,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 1), )) .unwrap(); - context.create_ip_pool("default", range).await; + context.initialize_ip_pool("default", range).await; for first_port in (0..super::MAX_PORT).step_by(super::NUM_SOURCE_NAT_PORTS) { @@ -755,7 +937,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 1), )) .unwrap(); - context.create_ip_pool("default", range).await; + context.initialize_ip_pool("default", range).await; // Allocate an Ephemeral IP, which should take the entire port range of // the only address in the pool. @@ -839,7 +1021,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 3), )) .unwrap(); - context.create_ip_pool("default", range).await; + context.initialize_ip_pool("default", range).await; // TODO-completess: Implementing Iterator for IpRange would be nice. 
let addresses = [ @@ -942,7 +1124,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 3), )) .unwrap(); - context.create_ip_pool("default", range).await; + context.initialize_ip_pool("default", range).await; let instance_id = Uuid::new_v4(); let id = Uuid::new_v4(); @@ -976,7 +1158,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 2), )) .unwrap(); - context.create_service_ip_pool("for-nexus", ip_range).await; + context.initialize_ip_pool(SERVICE_IP_POOL_NAME, ip_range).await; // Allocate an IP address as we would for an external, rack-associated // service. @@ -1023,7 +1205,119 @@ mod tests { } #[tokio::test] - async fn test_insert_external_ip_for_service_is_idempoent() { + async fn test_explicit_external_ip_for_service_is_idempotent() { + let context = TestContext::new( + "test_explicit_external_ip_for_service_is_idempotent", + ) + .await; + + let ip_range = IpRange::try_from(( + Ipv4Addr::new(10, 0, 0, 1), + Ipv4Addr::new(10, 0, 0, 4), + )) + .unwrap(); + context.initialize_ip_pool(SERVICE_IP_POOL_NAME, ip_range).await; + + // Allocate an IP address as we would for an external, rack-associated + // service. + let id = Uuid::new_v4(); + let ip = context + .db_datastore + .allocate_explicit_service_ip( + &context.opctx, + id, + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), + ) + .await + .expect("Failed to allocate service IP address"); + assert_eq!(ip.kind, IpKind::Service); + assert_eq!(ip.ip.ip(), IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3))); + assert_eq!(ip.first_port.0, 0); + assert_eq!(ip.last_port.0, u16::MAX); + assert!(ip.instance_id.is_none()); + + // Try allocating the same service IP again. + let ip_again = context + .db_datastore + .allocate_explicit_service_ip( + &context.opctx, + id, + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), + ) + .await + .expect("Failed to allocate service IP address"); + assert_eq!(ip.id, ip_again.id); + assert_eq!(ip.ip.ip(), ip_again.ip.ip()); + + // Try allocating the same service IP once more, but do it with a + // different UUID. 
+ let err = context + .db_datastore + .allocate_explicit_service_ip( + &context.opctx, + Uuid::new_v4(), + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), + ) + .await + .expect_err("Should have failed to re-allocate same IP address (different UUID)"); + assert_eq!( + err.to_string(), + "Invalid Request: Requested external IP address not available" + ); + + // Try allocating the same service IP once more, but do it with a + // different input address. + let err = context + .db_datastore + .allocate_explicit_service_ip( + &context.opctx, + id, + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), + ) + .await + .expect_err("Should have failed to re-allocate different IP address (same UUID)"); + assert_eq!( + err.to_string(), + "Invalid Request: Re-allocating IP address with a different value" + ); + + context.success().await; + } + + #[tokio::test] + async fn test_explicit_external_ip_for_service_out_of_range() { + let context = TestContext::new( + "test_explicit_external_ip_for_service_out_of_range", + ) + .await; + + let ip_range = IpRange::try_from(( + Ipv4Addr::new(10, 0, 0, 1), + Ipv4Addr::new(10, 0, 0, 4), + )) + .unwrap(); + context.initialize_ip_pool(SERVICE_IP_POOL_NAME, ip_range).await; + + let id = Uuid::new_v4(); + let err = context + .db_datastore + .allocate_explicit_service_ip( + &context.opctx, + id, + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), + ) + .await + .expect_err("Should have failed to allocate out-of-bounds IP"); + assert_eq!( + err.to_string(), + "Invalid Request: Requested external IP address not available" + ); + + context.success().await; + } + + #[tokio::test] + async fn test_insert_external_ip_for_service_is_idempotent() { let context = TestContext::new( "test_insert_external_ip_for_service_is_idempotent", ) @@ -1034,7 +1328,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 2), )) .unwrap(); - context.create_service_ip_pool("for-nexus", ip_range).await; + context.initialize_ip_pool(SERVICE_IP_POOL_NAME, ip_range).await; // Allocate an IP address as we would for an 
external, rack-associated // service. @@ -1078,7 +1372,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 1), )) .unwrap(); - context.create_service_ip_pool("for-nexus", ip_range).await; + context.initialize_ip_pool(SERVICE_IP_POOL_NAME, ip_range).await; // Allocate an IP address as we would for an external, rack-associated // service. @@ -1117,7 +1411,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 3), )) .unwrap(); - context.create_ip_pool("default", range).await; + context.initialize_ip_pool("default", range).await; // Create one SNAT IP address. let instance_id = Uuid::new_v4(); @@ -1182,7 +1476,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 3), )) .unwrap(); - context.create_ip_pool("default", first_range).await; + context.initialize_ip_pool("default", first_range).await; let second_range = IpRange::try_from(( Ipv4Addr::new(10, 0, 0, 4), Ipv4Addr::new(10, 0, 0, 6), @@ -1226,7 +1520,7 @@ mod tests { Ipv4Addr::new(10, 0, 0, 3), )) .unwrap(); - context.create_ip_pool("default", first_range).await; + context.initialize_ip_pool("default", first_range).await; let first_address = Ipv4Addr::new(10, 0, 0, 4); let last_address = Ipv4Addr::new(10, 0, 0, 6); let second_range = diff --git a/nexus/src/db/true_or_cast_error.rs b/nexus/src/db/true_or_cast_error.rs index d99dc14bde..6f14cd4642 100644 --- a/nexus/src/db/true_or_cast_error.rs +++ b/nexus/src/db/true_or_cast_error.rs @@ -29,7 +29,7 @@ impl TrueOrCastError where E: Expression, { - pub fn new(expression: E, error: &'static str) -> Self { + pub const fn new(expression: E, error: &'static str) -> Self { Self { expression, error } } } diff --git a/nexus/src/lib.rs b/nexus/src/lib.rs index 468178dc14..94a889a92f 100644 --- a/nexus/src/lib.rs +++ b/nexus/src/lib.rs @@ -212,6 +212,7 @@ impl nexus_test_interface::NexusServer for Server { internal_api::params::RackInitializationRequest { services: vec![], datasets: vec![], + internal_services_ip_pool_ranges: vec![], certs: vec![], }, ) diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs 
index 31a1cfcd71..3d315a444c 100644 --- a/nexus/src/populate.rs +++ b/nexus/src/populate.rs @@ -43,14 +43,11 @@ //! each populator behaves as expected in the above ways. use crate::context::OpContext; -use crate::db::{self, DataStore}; -use crate::external_api::params; +use crate::db::DataStore; use futures::future::BoxFuture; use futures::FutureExt; use lazy_static::lazy_static; use omicron_common::api::external::Error; -use omicron_common::api::external::IdentityMetadataCreateParams; -use omicron_common::api::external::Name; use omicron_common::backoff; use std::sync::Arc; use uuid::Uuid; @@ -279,18 +276,9 @@ impl Populator for PopulateFleet { 'a: 'b, { async { - let id = *db::fixed_data::FLEET_ID; datastore - .virtual_provisioning_collection_create( - opctx, - db::model::VirtualProvisioningCollection::new( - id, - db::model::CollectionTypeProvisioned::Fleet, - ), - ) - .await?; - - Ok(()) + .load_builtin_fleet_virtual_provisioning_collection(opctx) + .await } .boxed() } @@ -308,44 +296,8 @@ impl Populator for PopulateRack { where 'a: 'b, { - async { - datastore - .rack_insert(opctx, &db::model::Rack::new(args.rack_id)) - .await?; - - let params = params::IpPoolCreate { - identity: IdentityMetadataCreateParams { - name: "oxide-service-pool".parse::().unwrap(), - description: String::from("IP Pool for Oxide Services"), - }, - }; - datastore - .ip_pool_create(opctx, ¶ms, /*internal=*/ true) - .await - .map(|_| ()) - .or_else(|e| match e { - Error::ObjectAlreadyExists { .. } => Ok(()), - _ => Err(e), - })?; - - let params = params::IpPoolCreate { - identity: IdentityMetadataCreateParams { - name: "default".parse::().unwrap(), - description: String::from("default IP pool"), - }, - }; - datastore - .ip_pool_create(opctx, ¶ms, /*internal=*/ false) - .await - .map(|_| ()) - .or_else(|e| match e { - Error::ObjectAlreadyExists { .. 
} => Ok(()), - _ => Err(e), - })?; - - Ok(()) - } - .boxed() + async { datastore.load_builtin_rack_data(opctx, args.rack_id).await } + .boxed() } } diff --git a/nexus/tests/integration_tests/ip_pools.rs b/nexus/tests/integration_tests/ip_pools.rs index 4a794aefd0..0f29e1a9c8 100644 --- a/nexus/tests/integration_tests/ip_pools.rs +++ b/nexus/tests/integration_tests/ip_pools.rs @@ -674,7 +674,10 @@ async fn test_ip_pool_service(cptestctx: &ControlPlaneTestContext) { .unwrap() .parsed_body() .unwrap(); - assert_eq!(fetched_pool.identity.name, "oxide-service-pool"); + assert_eq!( + fetched_pool.identity.name, + omicron_nexus::db::datastore::SERVICE_IP_POOL_NAME + ); assert_eq!(fetched_pool.identity.description, "IP Pool for Oxide Services"); // Add some ranges. Pagination is tested more explicitly in the IP pool diff --git a/nexus/types/src/external_api/shared.rs b/nexus/types/src/external_api/shared.rs index 6b4971c8e0..189e2f8875 100644 --- a/nexus/types/src/external_api/shared.rs +++ b/nexus/types/src/external_api/shared.rs @@ -219,6 +219,15 @@ impl IpRange { } } +impl From for IpRange { + fn from(addr: IpAddr) -> Self { + match addr { + IpAddr::V4(addr) => IpRange::V4(Ipv4Range::from(addr)), + IpAddr::V6(addr) => IpRange::V6(Ipv6Range::from(addr)), + } + } +} + impl TryFrom<(Ipv4Addr, Ipv4Addr)> for IpRange { type Error = String; @@ -263,6 +272,12 @@ impl Ipv4Range { } } +impl From for Ipv4Range { + fn from(addr: Ipv4Addr) -> Self { + Self { first: addr, last: addr } + } +} + #[derive(Clone, Copy, Debug, Deserialize)] struct AnyIpv4Range { first: Ipv4Addr, @@ -305,6 +320,12 @@ impl Ipv6Range { } } +impl From for Ipv6Range { + fn from(addr: Ipv6Addr) -> Self { + Self { first: addr, last: addr } + } +} + #[derive(Clone, Copy, Debug, Deserialize)] struct AnyIpv6Range { first: Ipv6Addr, diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index ad001d13ee..6d9b600ce8 100644 --- a/nexus/types/src/internal_api/params.rs +++ 
b/nexus/types/src/internal_api/params.rs @@ -3,6 +3,8 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Params define the request bodies of API endpoints for creating or updating resources. + +use crate::external_api::shared::IpRange; use omicron_common::api::external::ByteCount; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -215,15 +217,14 @@ impl std::fmt::Debug for Certificate { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct RackInitializationRequest { + /// Services on the rack which have been created by RSS. pub services: Vec, + /// Datasets on the rack which have been provisioned by RSS. pub datasets: Vec, - // TODO(https://github.com/oxidecomputer/omicron/issues/1530): - // While it's true that Nexus will only run with a single address, - // we want to convey information about the available pool of addresses - // when handing off from RSS -> Nexus. - - // TODO(https://github.com/oxidecomputer/omicron/issues/1528): - // Support passing x509 cert info. + /// Ranges of the service IP pool which may be used for internal services, + /// such as Nexus. + pub internal_services_ip_pool_ranges: Vec, + /// x.509 Certificates used to encrypt communication with the external API. 
pub certs: Vec, } diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 4214164ae9..20bdba25b4 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -1680,6 +1680,62 @@ } ] }, + "IpRange": { + "oneOf": [ + { + "title": "v4", + "allOf": [ + { + "$ref": "#/components/schemas/Ipv4Range" + } + ] + }, + { + "title": "v6", + "allOf": [ + { + "$ref": "#/components/schemas/Ipv6Range" + } + ] + } + ] + }, + "Ipv4Range": { + "description": "A non-decreasing IPv4 address range, inclusive of both ends.\n\nThe first address must be less than or equal to the last address.", + "type": "object", + "properties": { + "first": { + "type": "string", + "format": "ipv4" + }, + "last": { + "type": "string", + "format": "ipv4" + } + }, + "required": [ + "first", + "last" + ] + }, + "Ipv6Range": { + "description": "A non-decreasing IPv6 address range, inclusive of both ends.\n\nThe first address must be less than or equal to the last address.", + "type": "object", + "properties": { + "first": { + "type": "string", + "format": "ipv6" + }, + "last": { + "type": "string", + "format": "ipv6" + } + }, + "required": [ + "first", + "last" + ] + }, "Measurement": { "description": "A `Measurement` is a timestamped datum from a single metric", "type": "object", @@ -1969,18 +2025,28 @@ "type": "object", "properties": { "certs": { + "description": "x.509 Certificates used to encrypt communication with the external API.", "type": "array", "items": { "$ref": "#/components/schemas/Certificate" } }, "datasets": { + "description": "Datasets on the rack which have been provisioned by RSS.", "type": "array", "items": { "$ref": "#/components/schemas/DatasetCreateRequest" } }, + "internal_services_ip_pool_ranges": { + "description": "Ranges of the service IP pool which may be used for internal services, such as Nexus.", + "type": "array", + "items": { + "$ref": "#/components/schemas/IpRange" + } + }, "services": { + "description": "Services on the rack which have 
been created by RSS.", "type": "array", "items": { "$ref": "#/components/schemas/ServicePutRequest" @@ -1990,6 +2056,7 @@ "required": [ "certs", "datasets", + "internal_services_ip_pool_ranges", "services" ] }, diff --git a/sled-agent/src/rack_setup/service.rs b/sled-agent/src/rack_setup/service.rs index 0344c42ddd..d2e495e9ae 100644 --- a/sled-agent/src/rack_setup/service.rs +++ b/sled-agent/src/rack_setup/service.rs @@ -86,7 +86,7 @@ use slog::Logger; use sprockets_host::Ed25519Certificate; use std::collections::{HashMap, HashSet}; use std::iter; -use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}; use std::path::PathBuf; use thiserror::Error; use tokio::sync::OnceCell; @@ -471,6 +471,8 @@ impl ServiceInner { // a format which can be processed by Nexus. let mut services: Vec = vec![]; let mut datasets: Vec = vec![]; + let mut internal_services_ip_pool_ranges: Vec = + vec![]; for (addr, service_request) in service_plan.services.iter() { let sled_id = *id_map .get(addr) @@ -480,6 +482,25 @@ impl ServiceInner { for svc in &zone.services { let kind = match svc { ServiceType::Nexus { external_ip, internal_ip: _ } => { + // NOTE: Eventually, this IP pool will be entirely + // user-supplied. For now, however, it's inferred + // based on the input IP addresses. + let range = match external_ip { + IpAddr::V4(addr) => NexusTypes::IpRange::V4( + NexusTypes::Ipv4Range { + first: *addr, + last: *addr, + }, + ), + IpAddr::V6(addr) => NexusTypes::IpRange::V6( + NexusTypes::Ipv6Range { + first: *addr, + last: *addr, + }, + ), + }; + internal_services_ip_pool_ranges.push(range); + NexusTypes::ServiceKind::Nexus { external_address: *external_ip, } @@ -526,6 +547,13 @@ impl ServiceInner { let request = NexusTypes::RackInitializationRequest { services, datasets, + // TODO(https://github.com/oxidecomputer/omicron/issues/1530): Plumb + // these pools through RSS's API. 
+ // + // Currently, we're passing the addresses to accommodate Nexus + // services, but the operator may want to supply additional + // addresses. + internal_services_ip_pool_ranges, // TODO(https://github.com/oxidecomputer/omicron/issues/1959): Plumb // these paths through RSS's API. // diff --git a/sled-agent/src/sim/server.rs b/sled-agent/src/sim/server.rs index 1e745fbd8e..0135c78ce7 100644 --- a/sled-agent/src/sim/server.rs +++ b/sled-agent/src/sim/server.rs @@ -214,6 +214,7 @@ impl Server { let rack_init_request = NexusTypes::RackInitializationRequest { services: vec![], datasets, + internal_services_ip_pool_ranges: vec![], certs: vec![], };