diff --git a/nexus/db-queries/src/db/datastore/db_metadata.rs b/nexus/db-queries/src/db/datastore/db_metadata.rs index 9e4e8b1a48..0ae61a7c38 100644 --- a/nexus/db-queries/src/db/datastore/db_metadata.rs +++ b/nexus/db-queries/src/db/datastore/db_metadata.rs @@ -25,6 +25,17 @@ use std::str::FromStr; pub const EARLIEST_SUPPORTED_VERSION: &'static str = "1.0.0"; +/// Describes a single file containing a schema change, as SQL. +pub struct SchemaUpgradeStep { + pub path: Utf8PathBuf, + pub sql: String, +} + +/// Describes a sequence of files containing schema changes. +pub struct SchemaUpgrade { + pub steps: Vec, +} + /// Reads a "version directory" and reads all SQL changes into /// a result Vec. /// @@ -34,7 +45,7 @@ pub const EARLIEST_SUPPORTED_VERSION: &'static str = "1.0.0"; /// These are sorted lexicographically. pub async fn all_sql_for_version_migration>( path: P, -) -> Result, String> { +) -> Result { let target_dir = path.as_ref(); let mut up_sqls = vec![]; let entries = target_dir @@ -54,13 +65,12 @@ pub async fn all_sql_for_version_migration>( } up_sqls.sort(); - let mut result = vec![]; + let mut result = SchemaUpgrade { steps: vec![] }; for path in up_sqls.into_iter() { - result.push( - tokio::fs::read_to_string(&path) - .await - .map_err(|e| format!("Cannot read {path}: {e}"))?, - ); + let sql = tokio::fs::read_to_string(&path) + .await + .map_err(|e| format!("Cannot read {path}: {e}"))?; + result.steps.push(SchemaUpgradeStep { path: path.to_owned(), sql }); } Ok(result) } @@ -187,7 +197,8 @@ impl DataStore { ) .map_err(|e| format!("Invalid schema path: {}", e.display()))?; - let up_sqls = all_sql_for_version_migration(&target_dir).await?; + let schema_change = + all_sql_for_version_migration(&target_dir).await?; // Confirm the current version, set the "target_version" // column to indicate that a schema update is in-progress. @@ -205,7 +216,7 @@ impl DataStore { "target_version" => target_version.to_string(), ); - for sql in &up_sqls { + for SchemaUpgradeStep { path: _, sql } in &schema_change.steps { // Perform the schema change. self.apply_schema_update( ¤t_version, diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index f5283e263e..2dc1e69a6f 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -91,7 +91,8 @@ mod zpool; pub use address_lot::AddressLotCreateResult; pub use db_metadata::{ - all_sql_for_version_migration, EARLIEST_SUPPORTED_VERSION, + all_sql_for_version_migration, SchemaUpgrade, SchemaUpgradeStep, + EARLIEST_SUPPORTED_VERSION, }; pub use dns::DnsVersionUpdateBuilder; pub use instance::InstanceAndActiveVmm; diff --git a/nexus/tests/integration_tests/schema.rs b/nexus/tests/integration_tests/schema.rs index e75211b834..d79dd09fc1 100644 --- a/nexus/tests/integration_tests/schema.rs +++ b/nexus/tests/integration_tests/schema.rs @@ -89,6 +89,7 @@ async fn apply_update_as_transaction( match apply_update_as_transaction_inner(client, sql).await { Ok(()) => break, Err(err) => { + warn!(log, "Failed to apply update as transaction"; "err" => err.to_string()); client .batch_execute("ROLLBACK;") .await @@ -111,7 +112,9 @@ async fn apply_update( version: &str, times_to_apply: usize, ) { - info!(log, "Performing upgrade to {version}"); + let log = log.new(o!("target version" => version.to_string())); + info!(log, "Performing upgrade"); + let client = crdb.connect().await.expect("failed to connect"); // We skip this for the earliest supported version because these tables @@ -126,11 +129,15 @@ async fn apply_update( } let target_dir = Utf8PathBuf::from(SCHEMA_DIR).join(version); - let sqls = all_sql_for_version_migration(&target_dir).await.unwrap(); + let schema_change = + all_sql_for_version_migration(&target_dir).await.unwrap(); for _ in 0..times_to_apply { - for sql in sqls.iter() { - apply_update_as_transaction(log, &client, sql).await; + for nexus_db_queries::db::datastore::SchemaUpgradeStep { path, sql } in + &schema_change.steps + { + info!(log, "Applying sql schema upgrade step"; "path" => path.to_string()); + apply_update_as_transaction(&log, &client, sql).await; } }