Skip to content

Commit

Permalink
isolate underway migrations
Browse files Browse the repository at this point in the history
This is something of a hack to work around the fact that SQLx migrations
do not currently support specifying a schema under which the migrations
table will live.

Here we provide a search path throughout our migrations and in the
transaction that will run the migrations to ensure that migrations are
applied to `underway._sqlx_migrations`. Note that this assumes a
`public._sqlx_migrations` exists.

In the future we should be able to use `sqlx.toml` to address this more
robustly. That's expected as part of the `0.9.0` release of SQLx. Please
see: launchbadge/sqlx#3383

Closes #11
  • Loading branch information
maxcountryman committed Oct 31, 2024
1 parent 707f284 commit fa060d4
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 17 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/graceful_shutdown/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/multitask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create the task queue.
let queue = Queue::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/rag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

let openai_client = Client::new();

Expand Down
2 changes: 1 addition & 1 deletion examples/scheduled/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(&database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/step/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create our job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
4 changes: 4 additions & 0 deletions migrations/20240921151751_0.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
create schema if not exists underway;

-- Manage Underway migrations within the Underway schema.
create table if not exists underway._sqlx_migrations
(like public._sqlx_migrations including all);

create table underway.task_queue (
name text not null,
dlq_name text,
Expand Down
99 changes: 92 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -135,7 +135,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -194,7 +194,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -265,7 +265,7 @@

#![warn(clippy::all, nonstandard_style, future_incompatible, missing_docs)]

use sqlx::migrate::Migrator;
use sqlx::{migrate::Migrator, PgPool};

pub use crate::{
job::{Job, To},
Expand All @@ -281,7 +281,16 @@ mod scheduler;
pub mod task;
pub mod worker;

/// A SQLx [`Migrator`] which provides Underway's schema migrations.
static MIGRATOR: Migrator = sqlx::migrate!();

/// Runs Underway migrations.
///
/// A transaction is acquired via the provided pool and migrations are run via
/// this transaction.
///
/// As no direct support for specifying the schema under which the migrations
/// table will live, we manually specify this via the search path. This ensures
/// that migrations are isolated to underway._sqlx_migrations.
///
/// These migrations must be applied before queues, tasks, and workers can be
/// run.
Expand All @@ -304,8 +313,84 @@ pub mod worker;
/// let pool = PgPool::connect(database_url).await?;
///
/// // Run migrations.
/// underway::MIGRATOR.run(&pool).await?;
/// underway::run_migrations(&pool).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
pub static MIGRATOR: Migrator = sqlx::migrate!();
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;

// Ensure the 'underway' schema exists
sqlx::query("create schema if not exists underway;")
.execute(&mut *tx)
.await?;

// Temporarily set search_path for this transaction
sqlx::query("set local search_path to underway;")
.execute(&mut *tx)
.await?;

// Run migrations within the 'underway' schema
MIGRATOR.run(&mut *tx).await?;

tx.commit().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use sqlx::{postgres::PgRow, Executor, PgPool, Row};

use super::run_migrations;

#[sqlx::test(migrations = false)]
async fn sanity_check_run_migrations(pool: PgPool) -> Result<(), sqlx::Error> {
run_migrations(&pool).await?;

let schema_exists: (bool,) = sqlx::query_as(
r#"
select exists (
select 1 from pg_namespace where nspname = 'underway'
);
"#,
)
.fetch_one(&pool)
.await?;
assert!(
schema_exists.0,
"Schema 'underway' should exist after migrations."
);

let migrations_table_exists: (bool,) = sqlx::query_as(
r#"
select exists (
select 1 from information_schema.tables
where table_schema = 'underway' and
table_name = '_sqlx_migrations'
);
"#,
)
.fetch_one(&pool)
.await?;
assert!(
migrations_table_exists.0,
"Migrations table should exist in 'underway' schema."
);

let row: PgRow = pool.fetch_one("show search_path;").await?;

let search_path: String = row.get(0);
assert!(
!search_path.contains("underway"),
"search_path should not include 'underway' after the transaction."
);

assert!(
search_path.contains("public"),
"Default search_path should include 'public'."
);

Ok(())
}
}

0 comments on commit fa060d4

Please sign in to comment.