Skip to content

Commit

Permalink
Add test to run DB downgrade in the CI (#21273)
Browse files Browse the repository at this point in the history
This adds a DB upgrade/downgrade test to the CI.
  • Loading branch information
ephraimbuddy authored Mar 4, 2022
1 parent 1949f5d commit b153a2e
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 99 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
run: airflow-freespace
- name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh
- name: "Test downgrade"
run: ./scripts/ci/testing/run_downgrade_test.sh
- name: "Tests: ${{needs.build-info.outputs.testTypes}}"
run: ./scripts/ci/testing/ci_run_airflow_testing.sh
env:
Expand Down Expand Up @@ -829,6 +831,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
run: airflow-freespace
- name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh
- name: "Test downgrade"
run: ./scripts/ci/testing/run_downgrade_test.sh
- name: "Tests: ${{needs.build-info.outputs.testTypes}}"
run: ./scripts/ci/testing/ci_run_airflow_testing.sh
env:
Expand Down Expand Up @@ -891,6 +895,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
run: airflow-freespace
- name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh
- name: "Test downgrade"
run: ./scripts/ci/testing/run_downgrade_test.sh
- name: "Tests: ${{needs.build-info.outputs.testTypes}}"
run: ./scripts/ci/testing/ci_run_airflow_testing.sh
env:
Expand Down Expand Up @@ -951,6 +957,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
run: airflow-freespace
- name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh
- name: "Test downgrade"
run: ./scripts/ci/testing/run_downgrade_test.sh
- name: "Tests: ${{needs.build-info.outputs.testTypes}}"
run: ./scripts/ci/testing/ci_run_airflow_testing.sh
env:
Expand Down
43 changes: 43 additions & 0 deletions airflow/migrations/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import defaultdict


def get_mssql_table_constraints(conn, table_name):
    """
    Return the primary-key and unique constraints of a table, with their columns.

    Some tables (e.g. ``task_instance``) are missing an explicit primary key
    constraint name, so SQL Server auto-generates one; this helper retrieves
    whatever constraint names the server actually assigned.

    :param conn: sql connection object
    :param table_name: table name
    :return: a mapping of constraint type -> constraint name -> list of column
        names, e.g. ``{'PRIMARY KEY': {'PK__task_ins__x': ['dag_id', 'task_id']}}``
    :rtype: defaultdict(defaultdict(list))
    """
    # NOTE: table_name comes from migration code, never from user input, so
    # interpolating it into the query string is acceptable here.
    query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
    JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
    WHERE tc.TABLE_NAME = '{table_name}' AND
    (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
    """
    result = conn.execute(query).fetchall()
    constraint_dict = defaultdict(lambda: defaultdict(list))
    for constraint, constraint_type, col_name in result:
        # Group column names first by constraint type, then by constraint name,
        # preserving the row order returned by the server.
        constraint_dict[constraint_type][constraint].append(col_name)
    return constraint_dict
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def upgrade():

def downgrade():
"""Unapply Add ``max_active_runs`` column to ``dag_model`` table"""
op.drop_column('dag', 'max_active_runs')
with op.batch_alter_table('dag') as batch_op:
batch_op.drop_column('max_active_runs')
with op.batch_alter_table('dag_run', schema=None) as batch_op:
# Drop index to dag_run.dag_id and also drop index to dag_run.state where state==running
batch_op.drop_index('idx_dag_run_dag_id')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def remap_permissions():
def undo_remap_permissions():
"""Unapply Map Airflow permissions"""
appbuilder = create_app(config={'FAB_UPDATE_PERMS': False}).appbuilder
for old, new in mapping.items:
for old, new in mapping.items():
(new_resource_name, new_action_name) = new[0]
new_permission = appbuilder.sm.get_permission(new_action_name, new_resource_name)
if not new_permission:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import sqlalchemy as sa
from alembic import op

from airflow.migrations.utils import get_mssql_table_constraints

# revision identifiers, used by Alembic.
revision = '5e3ec427fdd3'
down_revision = '587bdf053233'
Expand All @@ -47,9 +49,35 @@ def upgrade():

def downgrade():
    """Revert length of email from 256 to 64 characters."""
    conn = op.get_bind()
    if conn.dialect.name != 'mssql':
        with op.batch_alter_table('ab_user') as batch_op:
            batch_op.alter_column('username', type_=sa.String(64), nullable=False)
            batch_op.alter_column('email', type_=sa.String(64))
        with op.batch_alter_table('ab_register_user') as batch_op:
            batch_op.alter_column('username', type_=sa.String(64))
            batch_op.alter_column('email', type_=sa.String(64))
    else:
        # MSSQL doesn't drop implicit unique constraints it created.
        # We need to drop the two unique constraints explicitly before the
        # columns can be altered, then recreate them afterwards.
        with op.batch_alter_table('ab_user') as batch_op:
            # Drop the unique constraints on username and email; their names
            # are server-generated, so look them up first.
            constraints = get_mssql_table_constraints(conn, 'ab_user')
            unique_key, _ = constraints['UNIQUE'].popitem()
            batch_op.drop_constraint(unique_key, type_='unique')
            unique_key, _ = constraints['UNIQUE'].popitem()
            batch_op.drop_constraint(unique_key, type_='unique')
            batch_op.alter_column('username', type_=sa.String(64), nullable=False)
            batch_op.create_unique_constraint(None, ['username'])
            batch_op.alter_column('email', type_=sa.String(64))
            batch_op.create_unique_constraint(None, ['email'])

        with op.batch_alter_table('ab_register_user') as batch_op:
            # Drop the unique constraints on username and email
            constraints = get_mssql_table_constraints(conn, 'ab_register_user')
            for k, _ in constraints.get('UNIQUE').items():
                batch_op.drop_constraint(k, type_='unique')
            batch_op.alter_column('username', type_=sa.String(64))
            batch_op.create_unique_constraint(None, ['username'])
            batch_op.alter_column('email', type_=sa.String(64))
            batch_op.create_unique_constraint(None, ['email'])
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,10 @@ def upgrade():

def downgrade():
    """Unapply Add ``timetable_description`` column to DagModel for UI."""
    is_sqlite = op.get_bind().dialect.name == 'sqlite'
    if is_sqlite:
        # SQLite cannot drop a column in place; batch_alter_table rebuilds the
        # table, so temporarily disable FK enforcement to avoid tripping
        # constraint checks on the rebuilt copy.
        op.execute('PRAGMA foreign_keys=off')
    with op.batch_alter_table('dag') as batch_op:
        batch_op.drop_column('timetable_description')
    if is_sqlite:
        op.execute('PRAGMA foreign_keys=on')
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
"""

from collections import defaultdict

import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql import and_, column, select, table

from airflow.migrations.db_types import TIMESTAMP, StringID
from airflow.migrations.utils import get_mssql_table_constraints

ID_LEN = 250

Expand Down Expand Up @@ -65,31 +64,6 @@
)


def get_table_constraints(conn, table_name):
"""
This function return primary and unique constraint
along with column name. Some tables like `task_instance`
is missing the primary key constraint name and the name is
auto-generated by the SQL server. so this function helps to
retrieve any primary or unique constraint name.
:param conn: sql connection object
:param table_name: table name
:return: a dictionary of ((constraint name, constraint type), column name) of table
:rtype: defaultdict(list)
"""
query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = '{table_name}' AND
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
"""
result = conn.execute(query).fetchall()
constraint_dict = defaultdict(lambda: defaultdict(list))
for constraint, constraint_type, col_name in result:
constraint_dict[constraint_type][constraint].append(col_name)
return constraint_dict


def upgrade():
"""Apply Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id."""
conn = op.get_bind()
Expand Down Expand Up @@ -246,7 +220,7 @@ def upgrade():
if dialect_name != 'postgresql':
# TODO: Is this right for non-postgres?
if dialect_name == 'mssql':
constraints = get_table_constraints(conn, "task_instance")
constraints = get_mssql_table_constraints(conn, "task_instance")
pk, _ = constraints['PRIMARY KEY'].popitem()
batch_op.drop_constraint(pk, type_='primary')
elif dialect_name not in ('sqlite'):
Expand Down Expand Up @@ -337,9 +311,10 @@ def downgrade():
with op.batch_alter_table('task_instance', schema=None) as batch_op:
batch_op.drop_constraint('task_instance_pkey', type_='primary')
batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)
batch_op.alter_column(
'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=True
)
if dialect_name != 'mssql':
batch_op.alter_column(
'dag_id', existing_type=string_id_col_type, existing_nullable=False, nullable=True
)

batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'execution_date'])

Expand Down Expand Up @@ -375,6 +350,7 @@ def downgrade():
batch_op.drop_index('dag_id_state')
batch_op.drop_index('idx_dag_run_running_dags')
batch_op.drop_index('idx_dag_run_queued_dags')
batch_op.drop_index('idx_dag_run_dag_id')

batch_op.alter_column('dag_id', existing_type=string_id_col_type, nullable=True)
batch_op.alter_column('execution_date', existing_type=dt_type, nullable=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,12 @@ def upgrade():

def downgrade():
    """Unapply Increase maximum length of pool name in ``task_instance`` table to ``256`` characters."""
    conn = op.get_bind()
    if conn.dialect.name == 'mssql':
        # NOTE(review): SQL Server appears unable to alter an indexed column,
        # so 'ti_pool' is dropped first and recreated afterwards — confirm.
        with op.batch_alter_table('task_instance') as batch_op:
            batch_op.drop_index('ti_pool')
            batch_op.alter_column('pool', type_=sa.String(50), nullable=False)
            batch_op.create_index('ti_pool', ['pool'])
    else:
        with op.batch_alter_table('task_instance') as batch_op:
            batch_op.alter_column('pool', type_=sa.String(50), nullable=False)
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ def upgrade():

def downgrade():
    """Unapply Add ``queued_at`` column in ``dag_run`` table."""
    # batch_alter_table so the drop also works on SQLite, which cannot drop a
    # column with a plain ALTER TABLE.
    with op.batch_alter_table('dag_run') as batch_op:
        batch_op.drop_column('queued_at')
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ def upgrade():

def downgrade():
    """Revert increased length of pool name from 256 back to 50 characters."""
    # batch_alter_table supports SQLite, which cannot ALTER COLUMN in place;
    # table_args presumably re-declares UNIQUE(pool) for the rebuilt table —
    # TODO confirm against the slot_pool model.
    with op.batch_alter_table('slot_pool', table_args=sa.UniqueConstraint('pool')) as batch_op:
        batch_op.alter_column('pool', type_=sa.String(50))
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ def upgrade():

def downgrade():
    """Unapply Remove id column from xcom (restore the surrogate ``id`` primary key)."""
    conn = op.get_bind()
    with op.batch_alter_table('xcom') as bop:
        # NOTE(review): 'pk_xcom' is only dropped explicitly on non-MSSQL
        # dialects — presumably the SQL Server constraint name differs or is
        # handled elsewhere; confirm before relying on this.
        if conn.dialect.name != 'mssql':
            bop.drop_constraint('pk_xcom', type_='primary')
        bop.add_column(Column('id', Integer, nullable=False))
        bop.create_primary_key('id', ['id'])
        bop.create_index('idx_xcom_dag_task_date', ['dag_id', 'task_id', 'key', 'execution_date'])
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ def upgrade():

def downgrade():
    """Unapply Add has_import_errors column to DagModel."""
    with op.batch_alter_table('dag') as batch_op:
        # mssql_drop_default drops the column's DEFAULT constraint on SQL
        # Server first; without it DROP COLUMN fails there.
        batch_op.drop_column('has_import_errors', mssql_drop_default=True)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy import Column, Integer, LargeBinary, MetaData, Table, and_, select

from airflow.migrations.db_types import TIMESTAMP, StringID
from airflow.migrations.utils import get_mssql_table_constraints

# Revision identifiers, used by Alembic.
revision = "c306b5b5ae4a"
Expand All @@ -54,12 +55,12 @@ def _get_new_xcom_columns() -> Sequence[Column]:

def _get_old_xcom_columns() -> Sequence[Column]:
    """Column definitions of the pre-migration ``xcom`` table.

    ``key``/``task_id``/``dag_id``/``execution_date`` are flagged
    ``primary_key`` so the recreated table gets its composite primary key back.
    """
    return [
        Column("key", StringID(length=512), nullable=False, primary_key=True),
        Column("value", LargeBinary),
        Column("timestamp", TIMESTAMP, nullable=False),
        Column("task_id", StringID(length=250), nullable=False, primary_key=True),
        Column("dag_id", StringID(length=250), nullable=False, primary_key=True),
        Column("execution_date", TIMESTAMP, nullable=False, primary_key=True),
    ]


Expand Down Expand Up @@ -127,6 +128,7 @@ def downgrade():
Basically an inverse operation.
"""
conn = op.get_bind()
op.create_table("__airflow_tmp_xcom", *_get_old_xcom_columns())

xcom = Table("xcom", metadata, *_get_new_xcom_columns())
Expand All @@ -153,4 +155,8 @@ def downgrade():

op.drop_table("xcom")
op.rename_table("__airflow_tmp_xcom", "xcom")
op.create_primary_key("xcom_pkey", "xcom", ["dag_id", "task_id", "execution_date", "key"])
if conn.dialect.name == 'mssql':
constraints = get_mssql_table_constraints(conn, 'xcom')
pk, _ = constraints['PRIMARY KEY'].popitem()
op.drop_constraint(pk, 'xcom', type_='primary')
op.create_primary_key("pk_xcom", "xcom", ["dag_id", "task_id", "execution_date", "key"])
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def upgrade():

def downgrade():
engine = settings.engine
if engine.dialect.has_table(engine, 'task_instance'):
connection = op.get_bind()
connection = op.get_bind()
if engine.dialect.has_table(connection, 'task_instance'):
sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=connection)
dagbag = DagBag(settings.DAGS_FOLDER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ def downgrade():
with op.batch_alter_table("task_reschedule") as batch_op:
batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
batch_op.drop_index("idx_task_reschedule_dag_task_run")
batch_op.drop_column("map_index")
batch_op.drop_column("map_index", mssql_drop_default=True)

op.execute("DELETE FROM task_instance WHERE map_index != -1")

with op.batch_alter_table("task_instance") as batch_op:
batch_op.drop_constraint("task_instance_pkey", type_="primary")
batch_op.drop_column("map_index")
batch_op.drop_column("map_index", mssql_drop_default=True)
batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id"])

with op.batch_alter_table("task_reschedule") as batch_op:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,11 @@ def downgrade():
"""Unapply Make XCom primary key columns non-nullable"""
conn = op.get_bind()
with op.batch_alter_table('xcom') as bop:
if conn.dialect.name == 'mssql':
bop.drop_constraint('pk_xcom', 'primary')

# regardless of what the model defined, the `key` and `execution_date`
# columns were always non-nullable for sqlite and postgres, so leave them alone
# columns were always non-nullable for mysql, sqlite and postgres, so leave them alone

if conn.dialect.name in ['mysql', 'mssql']:
bop.alter_column("key", type_=StringID(length=512), nullable=True)
if conn.dialect.name == 'mssql':
# execution_date wasn't nullable in the other databases
bop.drop_constraint('pk_xcom', 'primary')
# execution_date and key wasn't nullable in the other databases
bop.alter_column("key", type_=StringID(length=512), nullable=True)
bop.alter_column("execution_date", type_=TIMESTAMP, nullable=True)
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ def upgrade():


def downgrade():
    """Intentionally a no-op.

    Shrinking ``dag_code.source_code`` back to TEXT could truncate stored DAG
    source, so the type change is deliberately not reverted.
    """
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ def upgrade():
def downgrade():
    """Remove fk on task instance and model for task log filename template."""
    # The foreign key from dag_run.log_template_id has to go before either the
    # column or the referenced table can be dropped.
    with op.batch_alter_table("dag_run") as bop:
        bop.drop_constraint("task_instance_log_template_id_fkey", type_="foreignkey")
        bop.drop_column("log_template_id")
    op.drop_table("log_template")
Loading

0 comments on commit b153a2e

Please sign in to comment.