Skip to content

Commit

Permalink
Add migration to turn parent_id column into bigint only if necess…
Browse files Browse the repository at this point in the history
…ary (flyteorg#554)

* Add migration to turn `parent_id` column into `bigint` only if necessary

Signed-off-by: eduardo apolinario <[email protected]>

* Migrations to handle the change of `parent_id` column in `node_executions` table

Signed-off-by: eduardo apolinario <[email protected]>

* Add unit test

Signed-off-by: eduardo apolinario <[email protected]>

* More coverage

Signed-off-by: eduardo apolinario <[email protected]>

* Create index outside transaction

Signed-off-by: eduardo apolinario <[email protected]>

---------

Signed-off-by: eduardo apolinario <[email protected]>
Co-authored-by: eduardo apolinario <[email protected]>
  • Loading branch information
eapolinario and eapolinario authored May 8, 2023
1 parent 950ad15 commit 00915ec
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 0 deletions.
161 changes: 161 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,143 @@ var NoopMigrations = []*gormigrate.Migration{
return nil
},
},
{
// This migration handles the necessary setup to change the type of the `parent_id` column in the node_executions table.
ID: "pg-2023-05-02-fix-parentid-type-phase-1",
Migrate: func(tx *gorm.DB) error {
shouldMigrate, err := shouldApplyFixParentidMigration(tx)
if err != nil {
return err
}
if !shouldMigrate {
return nil
}

// Alter table and add new column
if err := tx.Exec("ALTER TABLE node_executions ADD COLUMN new_parent_id BIGINT;").Error; err != nil {
return err
}

// Create trigger function
triggerFunction := `
CREATE FUNCTION set_new_parent_id() RETURNS TRIGGER AS
$BODY$
BEGIN
NEW.new_parent_id := NEW.parent_id;
RETURN NEW;
END
$BODY$ LANGUAGE PLPGSQL;
`
if err := tx.Exec(triggerFunction).Error; err != nil {
return err
}

// Create trigger
if err := tx.Exec("CREATE TRIGGER set_new_parent_id_trigger BEFORE INSERT OR UPDATE ON node_executions FOR EACH ROW EXECUTE PROCEDURE set_new_parent_id();").Error; err != nil {
return err
}

// Update table
if err := tx.Exec("UPDATE node_executions SET new_parent_id = parent_id WHERE parent_id is not null;").Error; err != nil {
return err
}

// Create new index
if err := tx.Exec("CREATE INDEX idx_node_executions_new_parent_id ON public.node_executions USING btree (new_parent_id);").Error; err != nil {
return err
}

return nil
},
Rollback: func(tx *gorm.DB) error {
// Drop trigger and function
if err := tx.Exec("DROP TRIGGER IF EXISTS set_new_parent_id_trigger ON node_executions;").Error; err != nil {
return err
}
if err := tx.Exec("DROP FUNCTION IF EXISTS set_new_parent_id();").Error; err != nil {
return err
}
// Drop column iff exists
if err := tx.Exec("ALTER TABLE node_executions DROP COLUMN IF EXISTS new_parent_id;").Error; err != nil {
return err
}
// Drop index idx_node_executions_new_parent_id
if err := tx.Exec("DROP INDEX IF EXISTS idx_node_executions_new_parent_id;").Error; err != nil {
return err
}

return nil
},
},
{
// This migration actually changes the type of the `parent_id` column in the node_executions table in a transaction.
ID: "pg-2023-05-02-fix-parentid-type-phase-2",
Migrate: func(tx *gorm.DB) error {
shouldMigrate, err := shouldApplyFixParentidMigration(tx)
if err != nil {
return err
}
if !shouldMigrate {
return nil
}

// Start transaction
tx1 := tx.Begin()
defer func() {
if r := recover(); r != nil {
tx1.Rollback()
}
}()

// Lock table
if err := tx1.Exec("LOCK TABLE node_executions IN EXCLUSIVE MODE;").Error; err != nil {
tx1.Rollback()
return err
}

// DropIndex and create a new one
if err := tx1.Exec("DROP INDEX idx_node_executions_parent_id;").Error; err != nil {
tx1.Rollback()
return err
}

// Drop and rename columns
if err := tx1.Exec("ALTER TABLE node_executions DROP COLUMN parent_id;").Error; err != nil {
tx1.Rollback()
return err
}

// Rename idx_node_executions_new_parent_id to idx_node_executions_parent_id
if err := tx1.Exec("ALTER INDEX idx_node_executions_new_parent_id RENAME TO idx_node_executions_parent_id;").Error; err != nil {
tx1.Rollback()
return err
}

if err := tx1.Exec("ALTER TABLE node_executions RENAME COLUMN new_parent_id TO parent_id;").Error; err != nil {
tx1.Rollback()
return err
}

// Drop trigger and function
if err := tx1.Exec("DROP TRIGGER IF EXISTS set_new_parent_id_trigger ON node_executions;").Error; err != nil {
tx1.Rollback()
return err
}
if err := tx1.Exec("DROP FUNCTION IF EXISTS set_new_parent_id();").Error; err != nil {
tx1.Rollback()
return err
}

// Commit transaction
if err := tx1.Commit().Error; err != nil {
return err
}
return nil
},
Rollback: func(tx *gorm.DB) error {
return nil
},
},
{
ID: "pg-noop-2023-03-31-noop-nodeexecution",
Migrate: func(tx *gorm.DB) error {
Expand Down Expand Up @@ -962,3 +1098,28 @@ func alterTableColumnType(db *sql.DB, columnName, columnType string) error {
}
return nil
}

func shouldApplyFixParentidMigration(db *gorm.DB) (bool, error) {
// This only applies to postgres and in the case of the node_executions table contains a
// column named parent_id of type `integer` instead of `bigint`.
if db.Dialector.Name() != "postgres" {
return false, nil
}

// We should only apply this migration in case the type of the parent_id column is integer
var columnType string
query := `
SELECT data_type
FROM information_schema.columns
WHERE table_name = ? AND column_name = ?;
`
err := db.Raw(query, "node_executions", "parent_id").Scan(&columnType).Error
if err != nil {
return false, err
}
if columnType == "bigint" {
return false, nil
}

return true, nil
}
26 changes: 26 additions & 0 deletions pkg/repositories/config/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

mocket "github.com/Selvatico/go-mocket"
"github.com/stretchr/testify/assert"
"gorm.io/driver/mysql"

"gorm.io/driver/postgres"
"gorm.io/gorm"
Expand Down Expand Up @@ -32,3 +33,28 @@ func GetDbForTest(t *testing.T) *gorm.DB {
}
return db
}

func TestShouldApplyFixParentidMigrationMysql(t *testing.T) {
mocket.Catcher.Register()
gormDb, _ := gorm.Open(mysql.New(mysql.Config{DriverName: mocket.DriverName}))
shouldApply, err := shouldApplyFixParentidMigration(gormDb)
assert.False(t, shouldApply)
assert.NoError(t, err)
}

func TestShouldApplyFixParentidMigration(t *testing.T) {
gormDb := GetDbForTest(t)
GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true
query := GlobalMock.NewMock()
query.WithQuery(`
SELECT data_type
FROM information_schema.columns
WHERE table_name = $1 AND column_name = $2;
`)

shouldApply, err := shouldApplyFixParentidMigration(gormDb)
assert.True(t, shouldApply)
assert.True(t, query.Triggered)
assert.NoError(t, err)
}

0 comments on commit 00915ec

Please sign in to comment.