diff --git a/.sqlx/query-03c2ac38e6ec2801eed7e5b19076eafcc589f32e2424460b0c345e6c477a1b28.json b/.sqlx/query-03c2ac38e6ec2801eed7e5b19076eafcc589f32e2424460b0c345e6c477a1b28.json new file mode 100644 index 0000000..01991fa --- /dev/null +++ b/.sqlx/query-03c2ac38e6ec2801eed7e5b19076eafcc589f32e2424460b0c345e6c477a1b28.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create schema if not exists underway;", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "03c2ac38e6ec2801eed7e5b19076eafcc589f32e2424460b0c345e6c477a1b28" +} diff --git a/.sqlx/query-2584080f8ca262aa7526dde00a50ae125a6a8e52f4fc76d05d15f2c6f663d1cd.json b/.sqlx/query-2584080f8ca262aa7526dde00a50ae125a6a8e52f4fc76d05d15f2c6f663d1cd.json new file mode 100644 index 0000000..970dd2a --- /dev/null +++ b/.sqlx/query-2584080f8ca262aa7526dde00a50ae125a6a8e52f4fc76d05d15f2c6f663d1cd.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select exists (\n select 1 from pg_namespace where nspname = 'underway'\n );\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "2584080f8ca262aa7526dde00a50ae125a6a8e52f4fc76d05d15f2c6f663d1cd" +} diff --git a/.sqlx/query-ccf314910c001d4933326838f507380d36a50d63778ed5589d7ffe0fe9f60db3.json b/.sqlx/query-ccf314910c001d4933326838f507380d36a50d63778ed5589d7ffe0fe9f60db3.json new file mode 100644 index 0000000..5bf2953 --- /dev/null +++ b/.sqlx/query-ccf314910c001d4933326838f507380d36a50d63778ed5589d7ffe0fe9f60db3.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "set local search_path to underway;", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "ccf314910c001d4933326838f507380d36a50d63778ed5589d7ffe0fe9f60db3" +} diff --git 
a/.sqlx/query-f069227d261bdec30ade8d0553bffc2796c3f4914f773ef569526feb0c7062ea.json b/.sqlx/query-f069227d261bdec30ade8d0553bffc2796c3f4914f773ef569526feb0c7062ea.json new file mode 100644 index 0000000..5b7eb8b --- /dev/null +++ b/.sqlx/query-f069227d261bdec30ade8d0553bffc2796c3f4914f773ef569526feb0c7062ea.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select exists (\n select 1 from information_schema.tables\n where table_schema = 'underway' and \n table_name = '_sqlx_migrations'\n );\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "f069227d261bdec30ade8d0553bffc2796c3f4914f773ef569526feb0c7062ea" +} diff --git a/README.md b/README.md index 5a959b0..b79be54 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() @@ -207,7 +207,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 41b372d..0051a43 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. 
let job = Job::builder() diff --git a/examples/graceful_shutdown/src/main.rs b/examples/graceful_shutdown/src/main.rs index 7d406f6..e213e3a 100644 --- a/examples/graceful_shutdown/src/main.rs +++ b/examples/graceful_shutdown/src/main.rs @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { .await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/examples/multitask/src/main.rs b/examples/multitask/src/main.rs index 867009b..47322bb 100644 --- a/examples/multitask/src/main.rs +++ b/examples/multitask/src/main.rs @@ -155,7 +155,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Create the task queue. let queue = Queue::builder() diff --git a/examples/rag/src/main.rs b/examples/rag/src/main.rs index 08c73ee..94afb0d 100644 --- a/examples/rag/src/main.rs +++ b/examples/rag/src/main.rs @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box> { let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set"); let pool = PgPool::connect(database_url).await?; - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; let openai_client = Client::new(); diff --git a/examples/scheduled/src/main.rs b/examples/scheduled/src/main.rs index d753e9e..c436eed 100644 --- a/examples/scheduled/src/main.rs +++ b/examples/scheduled/src/main.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(&database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. 
let job = Job::builder() diff --git a/examples/step/src/main.rs b/examples/step/src/main.rs index fd5e557..80afe91 100644 --- a/examples/step/src/main.rs +++ b/examples/step/src/main.rs @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Create our job. let job = Job::builder() diff --git a/examples/tracing/src/main.rs b/examples/tracing/src/main.rs index b4c93d6..3f489d7 100644 --- a/examples/tracing/src/main.rs +++ b/examples/tracing/src/main.rs @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/migrations/20240921151751_0.sql b/migrations/20240921151751_0.sql index 9bad654..4dbac9c 100644 --- a/migrations/20240921151751_0.sql +++ b/migrations/20240921151751_0.sql @@ -1,5 +1,9 @@ create schema if not exists underway; +-- Manage Underway migrations within the Underway schema. +create table if not exists underway._sqlx_migrations +(like public._sqlx_migrations including all); + create table underway.task_queue ( name text not null, dlq_name text, diff --git a/src/lib.rs b/src/lib.rs index daf36d1..293afab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,7 +60,7 @@ //! let pool = PgPool::connect(database_url).await?; //! //! // Run migrations. -//! underway::MIGRATOR.run(&pool).await?; +//! underway::run_migrations(&pool).await?; //! //! // Build the job. //! let job = Job::builder() @@ -135,7 +135,7 @@ //! let pool = PgPool::connect(database_url).await?; //! //! // Run migrations. -//! underway::MIGRATOR.run(&pool).await?; +//! underway::run_migrations(&pool).await?; //! //! // Build the job. //! let job = Job::builder() @@ -194,7 +194,7 @@ //! let pool = PgPool::connect(database_url).await?; //! //! 
// Run migrations. -//! underway::MIGRATOR.run(&pool).await?; +//! underway::run_migrations(&pool).await?; //! //! // Build the job. //! let job = Job::builder() @@ -265,7 +265,7 @@ #![warn(clippy::all, nonstandard_style, future_incompatible, missing_docs)] -use sqlx::migrate::Migrator; +use sqlx::{migrate::Migrator, PgPool}; pub use crate::{ job::{Job, To}, @@ -281,7 +281,16 @@ mod scheduler; pub mod task; pub mod worker; -/// A SQLx [`Migrator`] which provides Underway's schema migrations. +static MIGRATOR: Migrator = sqlx::migrate!(); + +/// Runs Underway migrations. +/// +/// A transaction is acquired via the provided pool and migrations are run via +/// this transaction. +/// +/// As there is no direct support for specifying the schema under which the +/// migrations table will live, we manually specify this via the search path. +/// This ensures that migrations are isolated to underway._sqlx_migrations. /// /// These migrations must be applied before queues, tasks, and workers can be /// run. @@ -304,8 +313,87 @@ pub mod worker; /// let pool = PgPool::connect(database_url).await?; /// /// // Run migrations.
-/// underway::MIGRATOR.run(&pool).await?; +/// underway::run_migrations(&pool).await?; /// # Ok::<(), Box>(()) /// # }); /// # } -pub static MIGRATOR: Migrator = sqlx::migrate!(); +pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> { + let mut tx = pool.begin().await?; + + // Ensure the 'underway' schema exists + sqlx::query!("create schema if not exists underway;") + .execute(&mut *tx) + .await?; + + // Temporarily set search_path for this transaction + sqlx::query!("set local search_path to underway;") + .execute(&mut *tx) + .await?; + + // Run migrations within the 'underway' schema + MIGRATOR.run(&mut *tx).await?; + + tx.commit().await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use super::run_migrations; + + #[sqlx::test(migrations = false)] + async fn sanity_check_run_migrations(pool: PgPool) -> Result<(), sqlx::Error> { + run_migrations(&pool).await?; + + let schema_exists: bool = sqlx::query_scalar!( + r#" + select exists ( + select 1 from pg_namespace where nspname = 'underway' + ); + "#, + ) + .fetch_one(&pool) + .await? + .unwrap(); + assert!( + schema_exists, + "Schema 'underway' should exist after migrations." + ); + + let migrations_table_exists: bool = sqlx::query_scalar!( + r#" + select exists ( + select 1 from information_schema.tables + where table_schema = 'underway' and + table_name = '_sqlx_migrations' + ); + "#, + ) + .fetch_one(&pool) + .await? + .unwrap(); + assert!( + migrations_table_exists, + "Migrations table should exist in 'underway' schema." + ); + + let search_path: String = sqlx::query_scalar("show search_path;") + .fetch_one(&pool) + .await?; + + assert!( + !search_path.contains("underway"), + "search_path should not include 'underway' after the transaction." + ); + + assert!( + search_path.contains("public"), + "Default search_path should include 'public'." + ); + + Ok(()) + } +}