Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL onComplete flag, allow sql migration to run with others #280

Merged
merged 25 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,29 @@ A raw SQL operation runs arbitrary SQL against the database. This is intended as
}
```

By default, a `sql` operation cannot run together with other operations in the same migration. This is to ensure pgroll can correctly track the state of the database. However, it is possible to run a `sql` operation together with other operations by setting the `onComplete` flag to `true`.

The `onComplete` flag will make this operation run the `up` expression on the complete phase (instead of the default, which is to run it on the start phase).

`onComplete` flag is incompatible with `down` expression, as `pgroll` does not support running rollback after complete was executed.




```json
{
"sql": {
"up": "SQL expression",
"onComplete": true
}
}
```
exekias marked this conversation as resolved.
Show resolved Hide resolved

Example **raw SQL** migrations:

* [05_sql.json](../examples/05_sql.json)
* [32_sql_on_complete.json](../examples/32_sql_on_complete.json)


### Rename table

Expand Down
11 changes: 11 additions & 0 deletions examples/32_sql_on_complete.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name": "32_sql_on_complete",
"operations": [
{
"sql": {
"up": "ALTER TABLE people ADD COLUMN birth_date timestamp",
"onComplete": true
}
}
]
}
18 changes: 18 additions & 0 deletions pkg/jsonschema/testdata/sql-3.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
This is a valid 'sql' migration.
It specifies `up`, and `on_complete`

-- create_table.json --
{
"name": "migration_name",
"operations": [
{
"sql": {
"up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
"onComplete": true
}
}
]
}

-- valid --
true
19 changes: 19 additions & 0 deletions pkg/jsonschema/testdata/sql-4.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
This is an invalid 'sql' migration.
It specifies `up`, `down` and `on_complete`

-- create_table.json --
{
"name": "migration_name",
"operations": [
{
"sql": {
"up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
"down": "DROP TABLE users",
"onComplete": true
}
}
]
}

-- valid --
false
8 changes: 5 additions & 3 deletions pkg/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ type Operation interface {
// IsolatedOperation is an operation that cannot be executed with other operations
// in the same migration
type IsolatedOperation interface {
IsIsolated()
// this operation is isolated when executed on start, cannot be executed with other operations
IsIsolated() bool
andrew-farries marked this conversation as resolved.
Show resolved Hide resolved
}

// RequiresSchemaRefreshOperation is an operation that requires the resulting schema to be refreshed
type RequiresSchemaRefreshOperation interface {
// this operation requires the resulting schema to be refreshed when executed on start
RequiresSchemaRefresh()
}

Expand All @@ -56,8 +58,8 @@ type (
// returns a descriptive error if the migration is invalid
func (m *Migration) Validate(ctx context.Context, s *schema.Schema) error {
for _, op := range m.Operations {
if _, ok := op.(IsolatedOperation); ok {
if len(m.Operations) > 1 {
if isolatedOp, ok := op.(IsolatedOperation); ok {
if isolatedOp.IsIsolated() && len(m.Operations) > 1 {
return InvalidMigrationError{Reason: fmt.Sprintf("operation %q cannot be executed with other operations", OperationName(op))}
}
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestMigrationsIsolated(t *testing.T) {
&OpRawSQL{
Up: `foo`,
},
&OpRenameColumn{},
&OpCreateTable{Name: "foo"},
},
}

Expand All @@ -38,3 +38,18 @@ func TestMigrationsIsolatedValid(t *testing.T) {
err := migration.Validate(context.TODO(), schema.New())
assert.NoError(t, err)
}

func TestOnCompleteSQLMigrationsAreNotIsolated(t *testing.T) {
migration := Migration{
Name: "sql",
Operations: Operations{
&OpRawSQL{
Up: `foo`,
OnComplete: true,
},
&OpCreateTable{Name: "foo"},
},
}
err := migration.Validate(context.TODO(), schema.New())
assert.NoError(t, err)
}
18 changes: 13 additions & 5 deletions pkg/migrations/op_raw_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
var _ Operation = (*OpRawSQL)(nil)

func (o *OpRawSQL) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
_, err := conn.ExecContext(ctx, o.Up)
if err != nil {
if !o.OnComplete {
_, err := conn.ExecContext(ctx, o.Up)
return err
}
return nil
}

func (o *OpRawSQL) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
if o.OnComplete {
_, err := conn.ExecContext(ctx, o.Up)
return err
}
return nil
}

Expand All @@ -36,11 +40,15 @@ func (o *OpRawSQL) Validate(ctx context.Context, s *schema.Schema) error {
return EmptyMigrationError{}
}

if o.OnComplete && o.Down != "" {
return InvalidMigrationError{Reason: "down is not allowed with onComplete"}
}

return nil
}

// this operation is isolated, cannot be executed with other operations
func (o *OpRawSQL) IsIsolated() {}
func (o *OpRawSQL) IsIsolated() bool {
return !o.OnComplete
}

// this operation requires the resulting schema to be refreshed
func (o *OpRawSQL) RequiresSchemaRefresh() {}
75 changes: 75 additions & 0 deletions pkg/migrations/op_raw_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,81 @@ func TestRawSQL(t *testing.T) {
})
},
},
{
name: "raw SQL with onComplete",
migrations: []migrations.Migration{
{
Name: "01_create_table",
Operations: migrations.Operations{
&migrations.OpRawSQL{
OnComplete: true,
Up: `
CREATE TABLE test_table (
id serial,
name text
)
`,
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// SQL didn't run yet
TableMustNotExist(t, db, schema, "test_table")
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// table can be accessed after start
TableMustExist(t, db, schema, "test_table")

// inserts work
MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{
"name": "foo",
})
},
},
{
name: "raw SQL after a migration with onComplete",
migrations: []migrations.Migration{
{
Name: "01_create_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "test_table",
Columns: []migrations.Column{
{Name: "id", Type: "serial"},
{Name: "name", Type: "text"},
},
},
&migrations.OpRawSQL{
OnComplete: true,
Up: `
ALTER TABLE test_table ADD COLUMN age int
`,
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// SQL didn't run yet
ViewMustExist(t, db, schema, "01_create_table", "test_table")
ColumnMustNotExist(t, db, schema, "test_table", "age")
},
afterRollback: func(t *testing.T, db *sql.DB, schema string) {
// table is dropped after rollback
TableMustNotExist(t, db, schema, "test_table")
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// table can be accessed after start
TableMustExist(t, db, schema, "test_table")
ColumnMustExist(t, db, schema, "test_table", "age")

// inserts work
MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{
"name": "foo",
"age": "42",
})
},
},
{
name: "migration on top of raw SQL",
migrations: []migrations.Migration{
Expand Down
3 changes: 3 additions & 0 deletions pkg/migrations/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 39 additions & 11 deletions pkg/roll/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs .
fmt.Errorf("unable to execute start operation: %w", err),
errRollback)
}

// refresh schema when the op is isolated and requires a refresh (for example raw sql)
// we don't want to refresh the schema if the operation is not isolated as it would
// override changes made by other operations
if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok {
// refresh schema
newSchema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to refresh schema: %w", err)
if isolatedOp, ok := op.(migrations.IsolatedOperation); ok && isolatedOp.IsIsolated() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we now only refresh schema for ops that are both RequiresSchemaRefreshOperation and IsolatedOperation rather than just RequiresSchemaRefreshOperation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main reason is: that it could be non-isolated & require schema refresh. That's the case for migrations with onComplete.

Migrations that are not isolated & require a schema refresh on Start phase would surely break others, as it would override the resulting schema from the start ops, by refreshing it afterwards. I tried to capture this in the comment

newSchema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to refresh schema: %w", err)
}
}
}
}
Expand All @@ -64,16 +67,21 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs .
return nil
}

// create views for the new version
return m.ensureViews(ctx, newSchema, migration.Name)
}

func (m *Roll) ensureViews(ctx context.Context, schema *schema.Schema, version string) error {
// create schema for the new version
versionSchema := VersionedSchemaName(m.schema, migration.Name)
_, err = m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema)))
versionSchema := VersionedSchemaName(m.schema, version)
_, err := m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema)))
if err != nil {
return err
}

// create views in the new schema
for name, table := range newSchema.Tables {
err = m.createView(ctx, migration.Name, name, table)
for name, table := range schema.Tables {
err = m.ensureView(ctx, version, name, table)
if err != nil {
return fmt.Errorf("unable to create view: %w", err)
}
Expand Down Expand Up @@ -112,11 +120,29 @@ func (m *Roll) Complete(ctx context.Context) error {
}

// execute operations
refreshViews := false
for _, op := range migration.Operations {
err := op.Complete(ctx, m.pgConn, schema)
if err != nil {
return fmt.Errorf("unable to execute complete operation: %w", err)
}

if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok {
refreshViews = true
}
}

// recreate views for the new version (if some operations require it, ie SQL)
if refreshViews && !m.disableVersionSchemas {
schema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to read schema: %w", err)
}

err = m.ensureViews(ctx, schema, migration.Name)
if err != nil {
return err
}
}

// mark as completed
Expand Down Expand Up @@ -162,7 +188,7 @@ func (m *Roll) Rollback(ctx context.Context) error {
}

// create view creates a view for the new version of the schema
func (m *Roll) createView(ctx context.Context, version, name string, table schema.Table) error {
func (m *Roll) ensureView(ctx context.Context, version, name string, table schema.Table) error {
columns := make([]string, 0, len(table.Columns))
for k, v := range table.Columns {
columns = append(columns, fmt.Sprintf("%s AS %s", pq.QuoteIdentifier(v.Name), pq.QuoteIdentifier(k)))
Expand All @@ -179,7 +205,9 @@ func (m *Roll) createView(ctx context.Context, version, name string, table schem
}

_, err := m.pgConn.ExecContext(ctx,
fmt.Sprintf("CREATE OR REPLACE VIEW %s.%s %s AS SELECT %s FROM %s",
fmt.Sprintf("BEGIN; DROP VIEW IF EXISTS %s.%s; CREATE VIEW %s.%s %s AS SELECT %s FROM %s; COMMIT",
andrew-farries marked this conversation as resolved.
Show resolved Hide resolved
pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)),
pq.QuoteIdentifier(name),
pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)),
pq.QuoteIdentifier(name),
withOptions,
Expand Down
25 changes: 25 additions & 0 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,34 @@
"up": {
"description": "SQL expression for up migration",
"type": "string"
},
"onComplete": {
"description": "SQL expression will run on complete step (rather than on start)",
"type": "boolean",
"default": false
}
},
"required": ["up"],
"oneOf": [
{
"required": ["down"]
},
{
"required": ["onComplete"]
},
{
"not": {
"anyOf": [
{
"required": ["down"]
},
{
"required": ["onComplete"]
}
]
}
}
],
"type": "object"
},
"OpRenameTable": {
Expand Down
Loading