diff --git a/nexus/db-queries/src/db/queries/network_interface.rs b/nexus/db-queries/src/db/queries/network_interface.rs
index 3a5648cd86..ec1bdf6700 100644
--- a/nexus/db-queries/src/db/queries/network_interface.rs
+++ b/nexus/db-queries/src/db/queries/network_interface.rs
@@ -9,7 +9,7 @@ use crate::db::error::{public_error_from_diesel, retryable, ErrorHandler};
 use crate::db::model::IncompleteNetworkInterface;
 use crate::db::pool::DbConnection;
 use crate::db::queries::next_item::DefaultShiftGenerator;
-use crate::db::queries::next_item::NextItem;
+use crate::db::queries::next_item::{NextItem, NextItemSelfJoined};
 use crate::db::schema::network_interface::dsl;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use chrono::DateTime;
@@ -33,7 +33,7 @@ use omicron_common::api::external;
 use omicron_common::api::external::MacAddr;
 use once_cell::sync::Lazy;
 use slog_error_chain::SlogInlineError;
-use std::net::IpAddr;
+use std::net::{IpAddr, Ipv6Addr};
 use uuid::Uuid;
 
 // These are sentinel values and other constants used to verify the state of the
@@ -448,36 +448,6 @@ fn decode_database_error(
     }
 }
 
-// Helper to return the offset of the last valid/allocatable IP in a subnet.
-// Note that this is the offset from the _first available address_, not the
-// network address.
-fn last_address_offset(subnet: &IpNetwork) -> u32 {
-    // Generate last address in the range.
-    //
-    // NOTE: First subtraction is to convert from the subnet size to an
-    // offset, since `generate_series` is inclusive of the last value.
-    // Example: 256 -> 255.
-    let last_address_offset = match subnet {
-        IpNetwork::V4(network) => network.size() - 1,
-        IpNetwork::V6(network) => {
-            // TODO-robustness: IPv6 subnets are always /64s, so in theory we
-            // could require searching all ~2^64 items for the next address.
-            // That won't happen in practice, because there will be other limits
-            // on the number of IPs (such as MAC addresses, or just project
-            // accounting limits). However, we should update this to be the
-            // actual maximum size we expect or want to support, once we get a
-            // better sense of what that is.
-            u32::try_from(network.size() - 1).unwrap_or(u32::MAX - 1)
-        }
-    };
-
-    // This subtraction is because the last address in a subnet is
-    // explicitly reserved for Oxide use.
-    last_address_offset
-        .checked_sub(1 + NUM_INITIAL_RESERVED_IP_ADDRESSES as u32)
-        .unwrap_or_else(|| panic!("Unexpectedly small IP subnet: '{}'", subnet))
-}
-
 // Return the first available address in a subnet. This is not the network
 // address, since Oxide reserves the first few addresses.
 fn first_available_address(subnet: &IpNetwork) -> IpAddr {
@@ -489,12 +459,9 @@
             })
             .into(),
         IpNetwork::V6(network) => {
-            // TODO-performance: This is unfortunate. `ipnetwork` implements a
-            // direct addition-based approach for IPv4 but not IPv6. This will
-            // loop, which, while it may not matter much, can be nearly
-            // trivially avoided by converting to u128, adding, and converting
-            // back. Given that these spaces can be _really_ big, that is
-            // probably worth doing.
+            // NOTE: This call to `nth()` will loop and call the `next()`
+            // implementation. That's inefficient, but the number of reserved
+            // addresses is very small, so it should not matter.
             network
                 .iter()
                 .nth(NUM_INITIAL_RESERVED_IP_ADDRESSES as _)
@@ -506,11 +473,41 @@
     }
 }
 
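+// For example, in the IPv4 subnet 172.30.0.0/28 there are 16 addresses in
+// total: 172.30.0.0 is the network address and the next few addresses are
+// Oxide-reserved, so the first available address is 172.30.0.5, while
+// 172.30.0.15 is the broadcast address, making 172.30.0.14 the last available
+// address (the `test_last_available_address` test below checks the latter).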
+// Return the last available address in a subnet. This is not the broadcast
+// address, since that is reserved.
+fn last_available_address(subnet: &IpNetwork) -> IpAddr {
+    // NOTE: In both cases below, we subtract 2 from the network size. That's
+    // because we first subtract 1 to go from a size to an index, and then
+    // another 1 because the broadcast address isn't valid for an interface.
+    match subnet {
+        IpNetwork::V4(network) => network
+            .size()
+            .checked_sub(2)
+            .and_then(|n| network.nth(n))
+            .map(IpAddr::V4)
+            .unwrap_or_else(|| {
+                panic!("Unexpectedly small IPv4 subnetwork: '{}'", network);
+            }),
+        IpNetwork::V6(network) => {
+            // NOTE: The iterator implementation for `Ipv6Network` only
+            // implements the required `Iterator::next()` method. That means we
+            // get the default implementation of the `nth()` method, which will
+            // loop and call `next()`. That is ridiculously inefficient, so we
+            // manually compute the nth address through addition instead.
+            let base = u128::from(network.network());
+            let n = network.size().checked_sub(2).unwrap_or_else(|| {
+                panic!("Unexpectedly small IPv6 subnetwork: '{}'", network);
+            });
+            IpAddr::V6(Ipv6Addr::from(base + n))
+        }
+    }
+}
+
 /// The `NextIpv4Address` query is a `NextItem` query for choosing the next
 /// available IPv4 address for an interface.
 #[derive(Debug, Clone, Copy)]
 pub struct NextIpv4Address {
-    inner: NextItem<
+    inner: NextItemSelfJoined<
         db::schema::network_interface::table,
         IpNetwork,
         db::schema::network_interface::dsl::ip,
@@ -522,11 +519,9 @@ pub struct NextIpv4Address {
 impl NextIpv4Address {
     pub fn new(subnet: Ipv4Network, subnet_id: Uuid) -> Self {
         let subnet = IpNetwork::from(subnet);
-        let net = IpNetwork::from(first_available_address(&subnet));
-        let max_shift = i64::from(last_address_offset(&subnet));
-        let generator = DefaultShiftGenerator::new(net, max_shift, 0)
-            .expect("invalid min/max shift");
-        Self { inner: NextItem::new_scoped(generator, subnet_id) }
+        let min = IpNetwork::from(first_available_address(&subnet));
+        let max = IpNetwork::from(last_available_address(&subnet));
+        Self { inner: NextItemSelfJoined::new_scoped(subnet_id, min, max) }
     }
 }
 
@@ -607,7 +602,7 @@ impl QueryFragment<Pg> for NextNicSlot {
 /// a network interface.
 #[derive(Debug, Clone, Copy)]
 pub struct NextMacAddress {
-    inner: NextItem<
+    inner: NextItemSelfJoined<
         db::schema::network_interface::table,
         db::model::MacAddr,
         db::schema::network_interface::dsl::mac,
@@ -616,63 +611,19 @@ pub struct NextMacAddress {
     >,
 }
 
-// Helper to ensure we correctly compute the min/max shifts for a next MAC
-// query.
-#[derive(Copy, Clone, Debug)]
-struct NextMacShifts {
-    base: MacAddr,
-    min_shift: i64,
-    max_shift: i64,
-}
-
-impl NextMacShifts {
-    fn for_guest() -> Self {
-        let base = MacAddr::random_guest();
-        Self::shifts_for(base, MacAddr::MIN_GUEST_ADDR, MacAddr::MAX_GUEST_ADDR)
-    }
-
-    fn for_system() -> NextMacShifts {
-        let base = MacAddr::random_system();
-        Self::shifts_for(
-            base,
-            MacAddr::MIN_SYSTEM_ADDR,
-            MacAddr::MAX_SYSTEM_ADDR,
-        )
-    }
-
-    fn shifts_for(base: MacAddr, min: i64, max: i64) -> NextMacShifts {
-        let x = base.to_i64();
-
-        // The max shift is the distance to the last value. This min shift is
-        // always expressed as a negative number, giving the largest leftward
-        // shift, i.e., the distance to the first value.
-        let max_shift = max - x;
-        let min_shift = min - x;
-        Self { base, min_shift, max_shift }
-    }
-}
-
 impl NextMacAddress {
     pub fn new(vpc_id: Uuid, kind: NetworkInterfaceKind) -> Self {
-        let (base, max_shift, min_shift) = match kind {
+        let (min, max) = match kind {
             NetworkInterfaceKind::Instance | NetworkInterfaceKind::Probe => {
-                let NextMacShifts { base, min_shift, max_shift } =
-                    NextMacShifts::for_guest();
-                (base.into(), max_shift, min_shift)
+                (MacAddr::MIN_GUEST_ADDR, MacAddr::MAX_GUEST_ADDR)
             }
             NetworkInterfaceKind::Service => {
-                let NextMacShifts { base, min_shift, max_shift } =
-                    NextMacShifts::for_system();
-                (base.into(), max_shift, min_shift)
+                (MacAddr::MIN_SYSTEM_ADDR, MacAddr::MAX_SYSTEM_ADDR)
             }
         };
-        let generator = DefaultShiftGenerator::new(base, max_shift, min_shift)
-            .unwrap_or_else(|| {
-                panic!(
-                    "invalid min shift ({min_shift}) or max_shift ({max_shift})"
-                )
-            });
-        Self { inner: NextItem::new_scoped(generator, vpc_id) }
+        let min = db::model::MacAddr(MacAddr::from_i64(min));
+        let max = db::model::MacAddr(MacAddr::from_i64(max));
+        Self { inner: NextItemSelfJoined::new_scoped(vpc_id, min, max) }
     }
 }
 
@@ -1840,7 +1791,6 @@ fn decode_delete_network_interface_database_error(
 #[cfg(test)]
 mod tests {
     use super::first_available_address;
-    use super::last_address_offset;
     use super::DeleteError;
     use super::InsertError;
     use super::MAX_NICS_PER_INSTANCE;
@@ -1857,7 +1807,7 @@ mod tests {
     use crate::db::model::NetworkInterface;
     use crate::db::model::Project;
     use crate::db::model::VpcSubnet;
-    use crate::db::queries::network_interface::NextMacShifts;
+    use crate::db::queries::network_interface::last_available_address;
     use async_bb8_diesel::AsyncRunQueryDsl;
     use dropshot::test_util::LogContext;
     use model::NetworkInterfaceKind;
@@ -2821,7 +2771,8 @@ mod tests {
         .await;
         assert!(
             matches!(result, Err(InsertError::NoAvailableIpAddresses)),
-            "Address exhaustion should be detected and handled"
+            "Address exhaustion should be detected and handled, found {:?}",
+            result,
         );
         context.success().await;
     }
@@ -2963,24 +2914,6 @@ mod tests {
         context.success().await;
     }
 
-    #[test]
-    fn test_last_address_offset() {
-        let subnet = "172.30.0.0/28".parse().unwrap();
-        assert_eq!(
-            last_address_offset(&subnet),
-            // /28 = 2 ** 4 = 16 total addresses
-            // ... - 1 for converting from size to index = 15
-            // ... - 1 for reserved broadcast address = 14
-            // ... - 5 for reserved initial addresses = 9
-            9,
-        );
-        let subnet = "fd00::/64".parse().unwrap();
-        assert_eq!(
-            last_address_offset(&subnet),
-            u32::MAX - 1 - 1 - super::NUM_INITIAL_RESERVED_IP_ADDRESSES as u32,
-        );
-    }
-
     #[test]
     fn test_first_available_address() {
         let subnet = "172.30.0.0/28".parse().unwrap();
@@ -2996,35 +2929,16 @@ mod tests {
     }
 
     #[test]
-    fn test_next_mac_shifts_for_system() {
-        let NextMacShifts { base, min_shift, max_shift } =
-            NextMacShifts::for_system();
-        assert!(base.is_system());
-        assert!(
-            min_shift <= 0,
-            "expected min shift to be negative, found {min_shift}"
-        );
-        assert!(max_shift >= 0, "found {max_shift}");
-        let x = base.to_i64();
-        assert_eq!(x + min_shift, MacAddr::MIN_SYSTEM_ADDR);
-        assert_eq!(x + max_shift, MacAddr::MAX_SYSTEM_ADDR);
-    }
-
-    #[test]
-    fn test_next_mac_shifts_for_guest() {
-        let NextMacShifts { base, min_shift, max_shift } =
-            NextMacShifts::for_guest();
-        assert!(base.is_guest());
-        assert!(
-            min_shift <= 0,
-            "expected min shift to be negative, found {min_shift}"
+    fn test_last_available_address() {
+        let subnet = "172.30.0.0/28".parse().unwrap();
+        assert_eq!(
+            last_available_address(&subnet),
+            "172.30.0.14".parse::<IpAddr>().unwrap(),
         );
-        assert!(
-            max_shift >= 0,
-            "expected max shift to be positive, found {max_shift}"
+        let subnet = "fd00::/64".parse().unwrap();
+        assert_eq!(
+            last_available_address(&subnet),
+            "fd00::ffff:ffff:ffff:fffe".parse::<IpAddr>().unwrap(),
         );
-        let x = base.to_i64();
-        assert_eq!(x + min_shift, MacAddr::MIN_GUEST_ADDR);
-        assert_eq!(x + max_shift, MacAddr::MAX_GUEST_ADDR);
     }
 }
diff --git a/nexus/db-queries/src/db/queries/next_item.rs b/nexus/db-queries/src/db/queries/next_item.rs
index 658d151a5b..970baede2c 100644
--- a/nexus/db-queries/src/db/queries/next_item.rs
+++ b/nexus/db-queries/src/db/queries/next_item.rs
@@ -4,15 +4,19 @@
 
 //! A generic query for selecting a unique next item from a table.
 
+use crate::db::DbConnection;
 use diesel::associations::HasTable;
 use diesel::pg::Pg;
 use diesel::prelude::Column;
 use diesel::prelude::Expression;
 use diesel::query_builder::AstPass;
+use diesel::query_builder::Query;
 use diesel::query_builder::QueryFragment;
+use diesel::query_builder::QueryId;
 use diesel::serialize::ToSql;
 use diesel::sql_types;
 use diesel::sql_types::HasSqlType;
+use diesel::RunQueryDsl;
 use std::marker::PhantomData;
 use uuid::Uuid;
 
@@ -236,6 +240,8 @@ where
     }
 }
 
+const TIME_DELETED_COLUMN_IDENT: &str = "time_deleted";
+const NEXT_ITEM_COLUMN_IDENT: &str = "next_item";
 const SHIFT_COLUMN_IDENT: &str = "shift";
 const INDEX_COLUMN_IDENT: &str = "index";
 
@@ -288,6 +294,19 @@ where
     }
 }
 
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> QueryId
+    for NextItem<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+{
+    type QueryId = ();
+    const HAS_STATIC_QUERY_ID: bool = false;
+}
+
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+    RunQueryDsl<DbConnection>
+    for NextItem<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+{
+}
+
 impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> QueryFragment<Pg>
     for NextItem<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
 where
@@ -584,6 +603,320 @@ impl<Item> ShiftGenerator<Item> for DefaultShiftGenerator<Item> {
     }
 }
 
+/// Select the next available item from a table, using a self-join.
+///
+/// This query is similar in spirit to the above `NextItem` query. However, its
+/// implementation is different, and specifically designed to limit memory
+/// consumption of the overall query.
+///
+/// CockroachDB eagerly evaluates subqueries. This leads to high memory usage
+/// for the basic `NextItem` query, since that relies on a subquery that uses
+/// `generate_series()` to create the list of all possible items. That entire
+/// list must be materialized and held in memory.
+///
+/// In contrast, this query is implemented using a self-join between the
+/// existing table to be searched and the "next entry" (e.g., the value of the
+/// target column plus 1). This can result in lower memory consumption, since
+/// that series need not be stored in memory. Note that this relies on an index
+/// on the item column, which may be partial.
+///
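+/// For example, if a scope currently contains the items {0, 1, 2, 4}, the
+/// inner select below produces the candidates {1, 2, 3, 5}; the left join
+/// finds existing rows for 1 and 2 only, so the smallest candidate with no
+/// matching row is 3, and that is the item allocated.
+///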
+/// The full query looks like this:
+///
+/// ```sql
+/// SELECT IF(
+///     -- This condition detects if the table is empty.
+///     EXISTS(
+///         SELECT
+///             1
+///         FROM
+///             <table>
+///         WHERE
+///             <scope_column> = <scope_key> AND
+///             time_deleted IS NULL
+///         LIMIT 1
+///     ),
+///     -- If the table is _not_ empty, we do the self-join between `item` and
+///     -- `item + 1`, and take the first value where `item` is NULL, i.e., the
+///     -- smallest value of `item + 1` where there is no corresponding `item`
+///     -- in the table.
+///     (
+///         SELECT next_item AS <item_column>
+///         FROM (
+///             SELECT
+///                 <item_column> + 1 AS next_item
+///             FROM
+///                 <table>
+///             WHERE
+///                 <scope_column> = <scope_key> AND
+///                 time_deleted IS NULL
+///         )
+///         LEFT OUTER JOIN
+///             <table>
+///         ON
+///             (<scope_column>, next_item, next_item <= <item_max>, time_deleted IS NULL) =
+///             (<scope_key>, <item_column>, TRUE, TRUE)
+///         WHERE
+///             <item_column> IS NULL AND next_item <= <item_max>
+///         ORDER BY next_item
+///         LIMIT 1
+///     ),
+///     -- If the table _is_ empty, just insert the minimum value.
+///     <item_min>
+/// )
+/// ```
+///
+/// # Which one to use?
+///
+/// One should probably prefer to use this form of the query, if possible. Both
+/// queries appear to read roughly the same amount of data from disk, but the
+/// series-based implementation uses far more memory and runs more slowly.
+///
+/// One downside of this query is that it always allocates the next _smallest_
+/// item that's available. The `NextItem` query lets the caller choose a base
+/// from which to start the search, which is useful in situations where one
+/// would prefer to randomly distribute items rather than sequentially allocate
+/// them.
+#[derive(Debug, Clone, Copy)]
+pub(super) struct NextItemSelfJoined<
+    Table,
+    Item,
+    ItemColumn,
+    ScopeKey = NoScopeKey,
+    ScopeColumn = NoScopeColumn,
+> {
+    table: Table,
+    _d: PhantomData<(Item, ItemColumn, ScopeColumn)>,
+    scope_key: ScopeKey,
+    item_min: Item,
+    item_max: Item,
+}
+
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+    NextItemSelfJoined<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+where
+    // Table is a database table whose name can be used in a query fragment
+    Table: diesel::Table + HasTable<Table = Table> + QueryFragment<Pg> + Copy,
+
+    // Item can be converted to the SQL type of the ItemColumn
+    Item: ToSql<<ItemColumn as Expression>::SqlType, Pg> + Copy,
+
+    // ItemColumn is a column in the target table
+    ItemColumn: Column<Table = Table> + Copy,
+
+    // ScopeKey can be converted to the SQL type of the ScopeColumn
+    ScopeKey: ScopeKeyType + ToSql<<ScopeColumn as Expression>::SqlType, Pg>,
+
+    // ScopeColumn is a column on the target table
+    ScopeColumn: ScopeColumnType + Column<Table = Table>,
+
+    // The Postgres backend supports the SQL types of both columns
+    Pg: HasSqlType<<ItemColumn as Expression>::SqlType>
+        + HasSqlType<<ScopeColumn as Expression>::SqlType>,
+{
+    /// Create a new `NextItemSelfJoined` query, scoped to a particular key.
+    ///
+    /// Both `item_min` and `item_max` are _inclusive_.
+    pub(super) fn new_scoped(
+        scope_key: ScopeKey,
+        item_min: Item,
+        item_max: Item,
+    ) -> Self {
+        Self {
+            table: Table::table(),
+            _d: PhantomData,
+            scope_key,
+            item_min,
+            item_max,
+        }
+    }
+}
+
+impl<Table, Item, ItemColumn>
+    NextItemSelfJoined<Table, Item, ItemColumn, NoScopeKey, NoScopeColumn>
+where
+    Table: diesel::Table + HasTable<Table = Table> + QueryFragment<Pg> + Copy,
+    Item: ToSql<<ItemColumn as Expression>::SqlType, Pg> + Copy,
+    ItemColumn: Column<Table = Table> + Copy,
+    Pg: HasSqlType<<ItemColumn as Expression>::SqlType>,
+{
+    /// Create a new `NextItemSelfJoined` query, with a global scope.
+    #[cfg(test)]
+    fn new_unscoped(item_min: Item, item_max: Item) -> Self {
+        Self {
+            table: Table::table(),
+            _d: PhantomData,
+            scope_key: NoScopeKey,
+            item_min,
+            item_max,
+        }
+    }
+}
+
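+// Emit the scoped form of the self-join query. This is the SQL shown in the
+// type-level documentation above, with the scope column constrained to the
+// provided scope key in both the inner select and the join condition.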
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> QueryFragment<Pg>
+    for NextItemSelfJoined<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+where
+    Table: diesel::Table + HasTable<Table = Table> + QueryFragment<Pg> + Copy,
+    Item: ToSql<<ItemColumn as Expression>::SqlType, Pg> + Copy,
+    ItemColumn: Column<Table = Table> + Copy,
+    ScopeKey: ToSql<<ScopeColumn as Expression>::SqlType, Pg>,
+    ScopeColumn: Column<Table = Table>,
+    Pg: HasSqlType<<ItemColumn as Expression>::SqlType>
+        + HasSqlType<<ScopeColumn as Expression>::SqlType>,
+{
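+    // The SQL is built in three pieces: an EXISTS expression that checks
+    // whether the scope already has any live rows; the self-join subquery
+    // that picks the smallest free value no larger than `item_max`; and
+    // `item_min`, which is returned when the scope is empty.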
+    fn walk_ast<'a>(
+        &'a self,
+        mut out: AstPass<'_, 'a, Pg>,
+    ) -> diesel::QueryResult<()> {
+        out.push_sql("SELECT IF(EXISTS(SELECT 1 FROM ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" WHERE ");
+        out.push_identifier(ScopeColumn::NAME)?;
+        out.push_sql(" = ");
+        out.push_bind_param::<<ScopeColumn as Expression>::SqlType, ScopeKey>(
+            &self.scope_key,
+        )?;
+        out.push_sql(" AND ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL LIMIT 1), (SELECT ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" AS ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" FROM (SELECT ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" + 1 AS ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" FROM ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" WHERE ");
+        out.push_identifier(ScopeColumn::NAME)?;
+        out.push_sql(" = ");
+        out.push_bind_param::<<ScopeColumn as Expression>::SqlType, ScopeKey>(
+            &self.scope_key,
+        )?;
+        out.push_sql(" AND ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL) LEFT OUTER JOIN ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" ON (");
+        out.push_identifier(ScopeColumn::NAME)?;
+        out.push_sql(", ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(", ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" <= ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_max,
+        )?;
+        out.push_sql(", ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL) = (");
+        out.push_bind_param::<<ScopeColumn as Expression>::SqlType, ScopeKey>(
+            &self.scope_key,
+        )?;
+        out.push_sql(", ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(", TRUE, TRUE) WHERE ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" IS NULL AND ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" <= ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_max,
+        )?;
+        out.push_sql(" ORDER BY ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" LIMIT 1), ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_min,
+        )?;
+        out.push_sql(")");
+        Ok(())
+    }
+}
+
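+// Implementations that let the query be executed directly: the query ID is
+// non-static, which opts the dynamically built SQL out of diesel's
+// prepared-statement cache; `Query` gives the result the SQL type of the item
+// column; and `RunQueryDsl` allows it to run on a database connection.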
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> QueryId
+    for NextItemSelfJoined<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+{
+    type QueryId = ();
+    const HAS_STATIC_QUERY_ID: bool = false;
+}
+
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> Query
+    for NextItemSelfJoined<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+where
+    Table: diesel::Table + HasTable<Table = Table> + QueryFragment<Pg> + Copy,
+    Item: ToSql<<ItemColumn as Expression>::SqlType, Pg> + Copy,
+    ItemColumn: Column<Table = Table> + Copy,
+{
+    type SqlType = <ItemColumn as Expression>::SqlType;
+}
+
+impl<Table, Item, ItemColumn, ScopeKey, ScopeColumn> RunQueryDsl<DbConnection>
+    for NextItemSelfJoined<Table, Item, ItemColumn, ScopeKey, ScopeColumn>
+{
+}
+
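+// Emit the unscoped form of the self-join query. This is identical to the
+// scoped form above, except that every reference to the scope column and
+// scope key is omitted.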
+impl<Table, Item, ItemColumn> QueryFragment<Pg>
+    for NextItemSelfJoined<Table, Item, ItemColumn, NoScopeKey, NoScopeColumn>
+where
+    Table: diesel::Table + HasTable<Table = Table> + QueryFragment<Pg> + Copy,
+    Item: ToSql<<ItemColumn as Expression>::SqlType, Pg> + Copy,
+    ItemColumn: Column<Table = Table> + Copy,
+    Pg: HasSqlType<<ItemColumn as Expression>::SqlType>,
+{
+    fn walk_ast<'a>(
+        &'a self,
+        mut out: AstPass<'_, 'a, Pg>,
+    ) -> diesel::QueryResult<()> {
+        out.push_sql("SELECT IF(EXISTS(SELECT 1 FROM ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" WHERE ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL LIMIT 1), (SELECT ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" AS ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" FROM (SELECT ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" + 1 AS ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" FROM ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" WHERE ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL) LEFT OUTER JOIN ");
+        self.table.walk_ast(out.reborrow())?;
+        out.push_sql(" ON (");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(", ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" <= ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_max,
+        )?;
+        out.push_sql(", ");
+        out.push_identifier(TIME_DELETED_COLUMN_IDENT)?;
+        out.push_sql(" IS NULL) = (");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(", TRUE, TRUE) WHERE ");
+        out.push_identifier(ItemColumn::NAME)?;
+        out.push_sql(" IS NULL AND ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" <= ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_max,
+        )?;
+        out.push_sql(" ORDER BY ");
+        out.push_identifier(NEXT_ITEM_COLUMN_IDENT)?;
+        out.push_sql(" LIMIT 1), ");
+        out.push_bind_param::<<ItemColumn as Expression>::SqlType, Item>(
+            &self.item_min,
+        )?;
+        out.push_sql(")");
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -591,6 +924,8 @@ mod tests {
     use super::NextItem;
     use super::ShiftIndices;
     use crate::db;
+    use crate::db::explain::ExplainableAsync as _;
+    use crate::db::queries::next_item::NextItemSelfJoined;
     use async_bb8_diesel::AsyncRunQueryDsl;
     use async_bb8_diesel::AsyncSimpleConnection;
     use chrono::DateTime;
@@ -700,6 +1035,68 @@ mod tests {
         }
     }
 
+    #[derive(Debug, Clone, Copy)]
+    struct NextItemSelfJoinedQuery {
+        inner: NextItemSelfJoined<item::dsl::item, i32, item::dsl::value>,
+    }
+
+    // These implementations are needed to actually allow inserting the results
+    // of the `NextItemSelfJoinedQuery` itself.
+    impl NextItemSelfJoinedQuery {
+        fn new(min: i32, max: i32) -> Self {
+            let inner = NextItemSelfJoined::new_unscoped(min, max);
+            Self { inner }
+        }
+    }
+
+    delegate_query_fragment_impl!(NextItemSelfJoinedQuery);
+
+    impl QueryId for NextItemSelfJoinedQuery {
+        type QueryId = ();
+        const HAS_STATIC_QUERY_ID: bool = false;
+    }
+
+    impl Insertable<item::dsl::item> for NextItemSelfJoinedQuery {
+        type Values = NextItemQuerySelfJoinedValues;
+        fn values(self) -> Self::Values {
+            NextItemQuerySelfJoinedValues(self)
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    struct NextItemQuerySelfJoinedValues(NextItemSelfJoinedQuery);
+
+    impl QueryId for NextItemQuerySelfJoinedValues {
+        type QueryId = ();
+        const HAS_STATIC_QUERY_ID: bool = false;
+    }
+
+    impl diesel::insertable::CanInsertInSingleQuery<Pg>
+        for NextItemQuerySelfJoinedValues
+    {
+        fn rows_to_insert(&self) -> Option<usize> {
+            Some(1)
+        }
+    }
+
+    impl QueryFragment<Pg> for NextItemQuerySelfJoinedValues {
+        fn walk_ast<'a>(
+            &'a self,
+            mut out: AstPass<'_, 'a, Pg>,
+        ) -> diesel::QueryResult<()> {
+            out.push_sql("(");
+            out.push_identifier(item::dsl::id::NAME)?;
+            out.push_sql(", ");
+            out.push_identifier(item::dsl::value::NAME)?;
+            out.push_sql(", ");
+            out.push_identifier(item::dsl::time_deleted::NAME)?;
+            out.push_sql(") VALUES (gen_random_uuid(), (");
+            self.0.walk_ast(out.reborrow())?;
+            out.push_sql("), NULL)");
+            Ok(())
+        }
+    }
+
     // Test that we correctly insert the next available item
     #[tokio::test]
     async fn test_wrapping_next_item_query() {
@@ -853,4 +1250,178 @@ mod tests {
         assert_eq!(indices.second_start, 1);
         assert_eq!(indices.second_end, 10);
     }
+
+    #[tokio::test]
+    async fn test_explain_next_item_self_joined() {
+        // Setup the test database
+        let logctx = dev::test_setup_log("test_explain_next_item_self_joined");
+        let log = logctx.log.new(o!());
+        let mut db = test_setup_database(&log).await;
+        let cfg = crate::db::Config { url: db.pg_config().clone() };
+        let pool =
+            Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg));
+        let conn = pool.claim().await.unwrap();
+
+        // We're going to operate on a separate table, for simplicity.
+        setup_test_schema(&pool).await;
+
+        let query = NextItemSelfJoined::<
+            item::dsl::item,
+            _,
+            item::dsl::value,
+            _,
+            item::dsl::id,
+        >::new_scoped(Uuid::nil(), i32::MIN, i32::MAX);
+        let out = query.explain_async(&conn).await.unwrap();
+        println!("{out}");
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_next_item_self_joined() {
+        // Setup the test database
+        let logctx = dev::test_setup_log("test_next_item_self_joined");
+        let log = logctx.log.new(o!());
+        let mut db = test_setup_database(&log).await;
+        let cfg = crate::db::Config { url: db.pg_config().clone() };
+        let pool =
+            Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg));
+        let conn = pool.claim().await.unwrap();
+
+        // We're going to operate on a separate table, for simplicity.
+        setup_test_schema(&pool).await;
+
+        for i in 0..10 {
+            let query = NextItemSelfJoinedQuery::new(0, 9);
+            let it = diesel::insert_into(item::dsl::item)
+                .values(query)
+                .returning(Item::as_returning())
+                .get_result_async(&*conn)
+                .await
+                .unwrap();
+            assert_eq!(it.value, i, "Should insert values in order");
+        }
+
+        let query = NextItemSelfJoinedQuery::new(0, 9);
+        diesel::insert_into(item::dsl::item)
+            .values(query)
+            .returning(Item::as_returning())
+            .get_result_async(&*conn)
+            .await
+            .expect_err("should not be able to insert after the query range is exhausted");
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_next_item_self_joined_with_gaps() {
+        // Setup the test database
+        let logctx =
+            dev::test_setup_log("test_next_item_self_joined_with_gaps");
+        let log = logctx.log.new(o!());
+        let mut db = test_setup_database(&log).await;
+        let cfg = crate::db::Config { url: db.pg_config().clone() };
+        let pool =
+            Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg));
+        let conn = pool.claim().await.unwrap();
+
+        // We're going to operate on a separate table, for simplicity.
+        setup_test_schema(&pool).await;
+
+        // Insert mostly the same items, but leave some gaps.
+        const TO_SKIP: [i32; 2] = [3, 7];
+        let items: Vec<_> = (0..10)
+            .filter_map(|value| {
+                if TO_SKIP.contains(&value) {
+                    None
+                } else {
+                    Some(Item {
+                        id: Uuid::new_v4(),
+                        value: value as _,
+                        time_deleted: None,
+                    })
+                }
+            })
+            .collect();
+        diesel::insert_into(item::dsl::item)
+            .values(items.clone())
+            .execute_async(&*conn)
+            .await
+            .expect("Should be able to insert basic items");
+
+        // Next, let's ensure we get the items we skipped in the last round.
+        for i in TO_SKIP.iter() {
+            let query = NextItemSelfJoinedQuery::new(0, 9);
+            let it = diesel::insert_into(item::dsl::item)
+                .values(query)
+                .returning(Item::as_returning())
+                .get_result_async(&*conn)
+                .await
+                .unwrap();
+            assert_eq!(
+                it.value, *i,
+                "Should have inserted the next skipped value"
+            );
+        }
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[ignore]
+    #[tokio::test]
+    async fn print_next_item_query_forms() {
+        // Setup the test database
+        let logctx = dev::test_setup_log("print_next_item_query_forms");
+        let log = logctx.log.new(o!());
+        let db = test_setup_database(&log).await;
+        let cfg = crate::db::Config { url: db.pg_config().clone() };
+        let pool =
+            Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg));
+        let conn = pool.claim().await.unwrap();
+
+        // We're going to operate on a separate table, for simplicity.
+        setup_test_schema(&pool).await;
+
+        // Insert a bunch of items, not using the next item machinery.
+        const N_ITEMS: usize = 10_000;
+        let items: Vec<_> = (0..N_ITEMS)
+            .map(|value| Item {
+                id: Uuid::new_v4(),
+                value: value as _,
+                time_deleted: None,
+            })
+            .collect();
+        diesel::insert_into(item::dsl::item)
+            .values(items.clone())
+            .execute_async(&*conn)
+            .await
+            .expect("Should be able to insert basic items");
+
+        // Create the SQL queries in the two forms, and print them.
+        let next_item_generate =
+            NextItem::<item::dsl::item, i32, item::dsl::value>::new_unscoped(
+                DefaultShiftGenerator::new(0, N_ITEMS as _, 0).unwrap(),
+            );
+        let next_item_join = NextItemSelfJoined::<
+            item::dsl::item,
+            i32,
+            item::dsl::value,
+        >::new_unscoped(0, N_ITEMS as _);
+        println!(
+            "Next-item using `generate_series()`:\n{}\n\
+            Next-item using self-join:\n{}\n",
+            diesel::debug_query(&next_item_generate),
+            diesel::debug_query(&next_item_join)
+        );
+        assert!(
+            false,
+            "This test fails intentionally. The above queries \
+            can be run in the database that has been left around and \
+            seeded with {N_ITEMS} items in the 'test_schema.item' table. \
+            Manually copy them and insert the bind parameters, and you \
+            can use EXPLAIN ANALYZE to show their runtime and \
+            memory consumption profile.\n",
+        );
+    }
 }