Skip to content

Commit

Permalink
Explicitly manage schema upgrade transactions (#4096)
Browse files Browse the repository at this point in the history
- Previously, schema updates encouraged authors on each change to add
their own transactions, validating that the "current" and "target"
versions are correct.
- This unfortunately is not handled particularly well in scripted SQL. I
**incorrectly** thought that failing a transaction while
`batch_execute`-ing it (e.g., via a `CAST` error) would cause the
transaction to fail, and rollback. **This is not true**. In CockroachDB,
an error is thrown, but the transaction is not closed. This was the
cause of #4093, where connections stuck in this mangled ongoing
transaction state were placed back into the connection pool.
- To fix this: Nexus now explicitly wraps each schema change in a
transaction using Diesel, which ensures that "on success, they're
committed, and on failure, they're rolled back".
- Additionally, this PR upgrades all existing schema changes to conform
to this "implied transaction from Nexus" policy, and makes it possible
to upgrade using multiple transactions in a single version change.

Fixes #4093
  • Loading branch information
smklein authored Sep 15, 2023
1 parent ea687b5 commit ca311de
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 219 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus/db-queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async-bb8-diesel.workspace = true
async-trait.workspace = true
base64.workspace = true
bb8.workspace = true
camino.workspace = true
chrono.workspace = true
cookie.workspace = true
diesel.workspace = true
Expand Down
149 changes: 100 additions & 49 deletions nexus/db-queries/src/db/datastore/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use super::DataStore;
use crate::db;
use crate::db::error::public_error_from_diesel_pool;
use crate::db::error::ErrorHandler;
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use crate::db::TransactionError;
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::Utc;
use diesel::prelude::*;
use omicron_common::api::external::Error;
Expand All @@ -19,6 +23,48 @@ use std::collections::BTreeSet;
use std::ops::Bound;
use std::str::FromStr;

/// The earliest schema version that is supported.
///
/// When a schema update targets this version, the `db_metadata` version
/// check is skipped before applying the update's SQL.
pub const EARLIEST_SUPPORTED_VERSION: &str = "1.0.0";

/// Loads the SQL text of every migration step found in a "version
/// directory".
///
/// A directory entry counts as a migration step when its file name begins
/// with "up" and its extension is "sql". Matching paths are sorted
/// lexicographically, and each file is then read in full, so the returned
/// strings appear in that sorted order.
///
/// # Errors
///
/// Returns a human-readable message if the directory cannot be listed, if
/// a directory entry is invalid, or if a matching file cannot be read.
pub async fn all_sql_for_version_migration<P: AsRef<Utf8Path>>(
    path: P,
) -> Result<Vec<String>, String> {
    let target_dir = path.as_ref();

    // Pass 1: gather the paths of all "up*.sql" files in the directory.
    let mut migration_files = Vec::new();
    for dir_entry in target_dir
        .read_dir_utf8()
        .map_err(|e| format!("Failed to readdir {target_dir}: {e}"))?
    {
        let candidate = dir_entry
            .map_err(|err| format!("Invalid entry: {err}"))?
            .into_path();
        let has_up_prefix = matches!(
            candidate.file_name(),
            Some(name) if name.starts_with("up")
        );
        let has_sql_extension = candidate.extension() == Some("sql");
        if has_up_prefix && has_sql_extension {
            migration_files.push(candidate);
        }
    }

    // The order of application is defined by the sorted path names.
    migration_files.sort();

    // Pass 2: read each file, preserving the sorted order.
    let mut contents = Vec::with_capacity(migration_files.len());
    for path in migration_files {
        let sql = tokio::fs::read_to_string(&path)
            .await
            .map_err(|e| format!("Cannot read {path}: {e}"))?;
        contents.push(sql);
    }
    Ok(contents)
}

impl DataStore {
// Ensures that the database schema matches "desired_version".
//
Expand Down Expand Up @@ -136,13 +182,12 @@ impl DataStore {
"target_version" => target_version.to_string(),
);

let up = config
.schema_dir
.join(target_version.to_string())
.join("up.sql");
let sql = tokio::fs::read_to_string(&up).await.map_err(|e| {
format!("Cannot read {up}: {e}", up = up.display())
})?;
let target_dir = Utf8PathBuf::from_path_buf(
config.schema_dir.join(target_version.to_string()),
)
.map_err(|e| format!("Invalid schema path: {}", e.display()))?;

let up_sqls = all_sql_for_version_migration(&target_dir).await?;

// Confirm the current version, set the "target_version"
// column to indicate that a schema update is in-progress.
Expand All @@ -160,8 +205,16 @@ impl DataStore {
"target_version" => target_version.to_string(),
);

// Perform the schema change.
self.apply_schema_update(&sql).await.map_err(|e| e.to_string())?;
for sql in &up_sqls {
// Perform the schema change.
self.apply_schema_update(
&current_version,
&target_version,
&sql,
)
.await
.map_err(|e| e.to_string())?;
}

info!(
log,
Expand Down Expand Up @@ -273,11 +326,37 @@ impl DataStore {

// Applies a schema update, using raw SQL read from a caller-supplied
// configuration file.
async fn apply_schema_update(&self, sql: &String) -> Result<(), Error> {
self.pool().batch_execute_async(&sql).await.map_err(|e| {
Error::internal_error(&format!("Failed to execute upgrade: {e}"))
})?;
Ok(())
// Applies one piece of schema-update SQL inside a single transaction
// obtained from the connection pool via `transaction_async`.
//
// - `current`: the version the database is expected to be at.
// - `target`: the version being upgraded to.
// - `sql`: the raw SQL to execute for this step of the upgrade.
//
// Unless `target` is `EARLIEST_SUPPORTED_VERSION`, a validation query is
// executed first, in the same transaction, to confirm that `db_metadata`
// records `version = current` and `target_version = target`.
//
// Returns `Ok(())` if the transaction succeeds. Pool errors are converted
// with `public_error_from_diesel_pool` using `ErrorHandler::Server`.
//
// Panics if the transaction reports `TransactionError::CustomError`; the
// closure below never constructs one itself.
async fn apply_schema_update(
&self,
current: &SemverVersion,
target: &SemverVersion,
sql: &String,
) -> Result<(), Error> {
let result = self.pool().transaction_async(|conn| async move {
if target.to_string() != EARLIEST_SUPPORTED_VERSION {
// The IF yields 'true' when the versions match and an error
// message string otherwise; the CAST of that message to BOOL is
// presumably what makes the query (and so the transaction) fail
// on a mismatch -- NOTE(review): confirm against CockroachDB.
let validate_version_query = format!("SELECT CAST(\
IF(\
(\
SELECT version = '{current}' and target_version = '{target}'\
FROM omicron.public.db_metadata WHERE singleton = true\
),\
'true',\
'Invalid starting version for schema change'\
) AS BOOL\
);");
conn.batch_execute_async(&validate_version_query).await?;
}
// Run the caller-supplied schema change on the same connection, after
// any validation above.
conn.batch_execute_async(&sql).await?;
Ok::<_, TransactionError<()>>(())
}).await;

match result {
Ok(()) => Ok(()),
// No code in the closure produces a custom error, so this arm is
// treated as a programming error.
Err(TransactionError::CustomError(())) => panic!("No custom error"),
Err(TransactionError::Pool(e)) => {
Err(public_error_from_diesel_pool(e, ErrorHandler::Server))
}
}
}

// Completes a schema migration, upgrading to the new version.
Expand Down Expand Up @@ -392,50 +471,22 @@ mod test {
// v0 to v1 to v2, but it doesn't need to re-apply it.
add_upgrade(v0.clone(), "SELECT true;".to_string()).await;

// Ensure that all schema changes also validate the expected version
// information.
let wrap_in_version_checking_txn = |version, target, sql| -> String {
format!("BEGIN; \
SELECT CAST(\
IF(\
(\
SELECT version = '{version}' and target_version = '{target}'\
FROM omicron.public.db_metadata WHERE singleton = true\
),\
'true',\
'Invalid starting version for schema change'\
) AS BOOL\
);\
{sql};\
COMMIT;")
};

// This version adds a new table, but it takes a little while.
//
// This delay is intentional, so that some Nexus instances issuing
// the update act quickly, while others lag behind.
add_upgrade(
v1.clone(),
wrap_in_version_checking_txn(
&v0,
&v1,
"SELECT pg_sleep(RANDOM()); \
CREATE TABLE IF NOT EXISTS widget(); \
SELECT pg_sleep(RANDOM());",
),
"SELECT pg_sleep(RANDOM() / 10); \
CREATE TABLE IF NOT EXISTS widget(); \
SELECT pg_sleep(RANDOM() / 10);"
.to_string(),
)
.await;

// The table we just created is deleted by a subsequent update.
add_upgrade(
v2.clone(),
wrap_in_version_checking_txn(
&v1,
&v2,
"DROP TABLE IF EXISTS widget;",
),
)
.await;
add_upgrade(v2.clone(), "DROP TABLE IF EXISTS widget;".to_string())
.await;

// Show that the datastores can be created concurrently.
let config =
Expand Down
3 changes: 3 additions & 0 deletions nexus/db-queries/src/db/datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ mod vpc;
mod zpool;

pub use address_lot::AddressLotCreateResult;
pub use db_metadata::{
all_sql_for_version_migration, EARLIEST_SUPPORTED_VERSION,
};
pub use dns::DnsVersionUpdateBuilder;
pub use rack::RackInit;
pub use silo::Discoverability;
Expand Down
28 changes: 20 additions & 8 deletions nexus/tests/integration_tests/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use dropshot::test_util::LogContext;
use nexus_db_model::schema::SCHEMA_VERSION as LATEST_SCHEMA_VERSION;
use nexus_db_queries::db::datastore::{
all_sql_for_version_migration, EARLIEST_SUPPORTED_VERSION,
};
use nexus_test_utils::{db, load_test_config, ControlPlaneTestContextBuilder};
use omicron_common::api::external::SemverVersion;
use omicron_common::api::internal::shared::SwitchLocation;
Expand All @@ -22,7 +26,6 @@ use uuid::Uuid;

const SCHEMA_DIR: &'static str =
concat!(env!("CARGO_MANIFEST_DIR"), "/../schema/crdb");
const EARLIEST_SUPPORTED_VERSION: &'static str = "1.0.0";

async fn test_setup_just_crdb<'a>(
log: &Logger,
Expand Down Expand Up @@ -71,31 +74,40 @@ async fn apply_update(
// We skip this for the earliest supported version because these tables
// might not exist yet.
if version != EARLIEST_SUPPORTED_VERSION {
info!(log, "Updating schema version in db_metadata (setting target)");
let sql = format!("UPDATE omicron.public.db_metadata SET target_version = '{}' WHERE singleton = true;", version);
client
.batch_execute(&sql)
.await
.expect("Failed to bump version number");
}

let file = "up.sql";
let sql = tokio::fs::read_to_string(
PathBuf::from(SCHEMA_DIR).join(version).join(file),
)
.await
.unwrap();
let target_dir = Utf8PathBuf::from(SCHEMA_DIR).join(version);
let sqls = all_sql_for_version_migration(&target_dir).await.unwrap();

for _ in 0..times_to_apply {
client.batch_execute(&sql).await.expect("failed to apply update");
for sql in sqls.iter() {
client
.batch_execute("BEGIN;")
.await
.expect("Failed to BEGIN update");
client.batch_execute(&sql).await.expect("Failed to execute update");
client
.batch_execute("COMMIT;")
.await
.expect("Failed to COMMIT update");
}
}

// Normally, Nexus actually bumps the version number.
//
// We do so explicitly here.
info!(log, "Updating schema version in db_metadata (removing target)");
let sql = format!("UPDATE omicron.public.db_metadata SET version = '{}', target_version = NULL WHERE singleton = true;", version);
client.batch_execute(&sql).await.expect("Failed to bump version number");

client.cleanup().await.expect("cleaning up after wipe");
info!(log, "Update to {version} applied successfully");
}

async fn query_crdb_schema_version(crdb: &CockroachInstance) -> String {
Expand Down
4 changes: 0 additions & 4 deletions schema/crdb/1.0.0/up.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-- DO NOT EDIT THIS SCHEMA.
-- This file is a point-in-time snapshot of revision "1.0.0" of the database.

BEGIN;

/*
* We assume the database and user do not already exist so that we don't
* inadvertently clobber what's there. If they might exist, the user has to
Expand Down Expand Up @@ -2524,5 +2522,3 @@ INSERT INTO omicron.public.db_metadata (
) VALUES
( TRUE, NOW(), NOW(), '1.0.0', NULL)
ON CONFLICT DO NOTHING;

COMMIT;
23 changes: 0 additions & 23 deletions schema/crdb/2.0.0/up.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,2 @@
-- CRDB documentation recommends the following:
-- "Execute schema changes either as single statements (as an implicit transaction),
-- or in an explicit transaction consisting of the single schema change statement."
--
-- For each schema change, we transactionally:
-- 1. Check the current version
-- 2. Apply the idempotent update

BEGIN;

SELECT CAST(
IF(
(
SELECT version = '1.0.0' and target_version = '2.0.0'
FROM omicron.public.db_metadata WHERE singleton = true
),
'true',
'Invalid starting version for schema change'
) AS BOOL
);

ALTER TABLE omicron.public.instance
ADD COLUMN IF NOT EXISTS boot_on_fault BOOL NOT NULL DEFAULT false;

COMMIT;
27 changes: 2 additions & 25 deletions schema/crdb/3.0.0/up.sql
Original file line number Diff line number Diff line change
@@ -1,36 +1,13 @@
-- CRDB documentation recommends the following:
-- "Execute schema changes either as single statements (as an implicit transaction),
-- or in an explicit transaction consisting of the single schema change statement."
--
-- For each schema change, we transactionally:
-- 1. Check the current version
-- 2. Apply the idempotent update

BEGIN;

SELECT CAST(
IF(
(
SELECT version = '2.0.0' and target_version = '3.0.0'
FROM omicron.public.db_metadata WHERE singleton = true
),
'true',
'Invalid starting version for schema change'
) AS BOOL
);

ALTER TABLE omicron.public.ip_pool
ADD COLUMN IF NOT EXISTS silo_ID UUID,
ADD COLUMN IF NOT EXISTS project_id UUID,

-- if silo_id is null, then project_id must be null
ADD CONSTRAINT IF NOT EXISTS project_implies_silo CHECK (
NOT ((silo_id IS NULL) AND (project_id IS NOT NULL))
),

-- if internal = true, non-null silo_id and project_id are not allowed
-- if internal = true, non-null silo_id and project_id are not allowed
ADD CONSTRAINT IF NOT EXISTS internal_pools_have_null_silo_and_project CHECK (
NOT (INTERNAL AND ((silo_id IS NOT NULL) OR (project_id IS NOT NULL)))
);

COMMIT;
Loading

0 comments on commit ca311de

Please sign in to comment.