From b153a2e506b01dcb4d5d728ec3e2da8e31ccf322 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 4 Mar 2022 18:59:52 +0100 Subject: [PATCH] Add test to run DB downgrade in the CI (#21273) This attempts to add db upgrade/downgrade test to the CI --- .github/workflows/ci.yml | 8 ++ airflow/migrations/utils.py | 43 +++++++++++ ...add_max_active_runs_column_to_dagmodel_.py | 3 +- ...2c6edca13270_resource_based_permissions.py | 2 +- ...3_increase_length_of_email_and_username.py | 40 ++++++++-- ...b18f_added_timetable_description_column.py | 7 +- ...2661a43ba3_taskinstance_keyed_to_dagrun.py | 38 ++-------- ...increase_pool_name_size_in_taskinstance.py | 11 ++- ...b8_add_queued_at_column_to_dagrun_table.py | 3 +- ...5a55525161_increase_length_of_pool_name.py | 1 - ...bbf4a7ad0465_remove_id_column_from_xcom.py | 7 +- ...dd_has_import_errors_column_to_dagmodel.py | 3 +- ...5b5ae4a_switch_xcom_table_to_use_run_id.py | 16 ++-- ...7_add_max_tries_column_to_task_instance.py | 4 +- ..._add_taskmap_and_map_id_on_taskinstance.py | 4 +- ...1f0_make_xcom_pkey_columns_non_nullable.py | 11 +-- ..._change_field_in_dagcode_to_mediumtext_.py | 5 +- ...89_add_task_log_filename_template_model.py | 1 + scripts/ci/libraries/_testing.sh | 38 ++++++++++ .../ci_run_single_airflow_test_in_docker.sh | 37 +--------- scripts/ci/testing/run_downgrade_test.sh | 73 +++++++++++++++++++ 21 files changed, 256 insertions(+), 99 deletions(-) create mode 100644 airflow/migrations/utils.py create mode 100755 scripts/ci/testing/run_downgrade_test.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e951a8982a701..1cbab48546b8c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -766,6 +766,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" run: airflow-freespace - name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}" run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh + - name: "Test downgrade" + run: ./scripts/ci/testing/run_downgrade_test.sh - name: "Tests: ${{needs.build-info.outputs.testTypes}}" run: ./scripts/ci/testing/ci_run_airflow_testing.sh env: @@ -829,6 +831,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" run: airflow-freespace - name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}" run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh + - name: "Test downgrade" + run: ./scripts/ci/testing/run_downgrade_test.sh - name: "Tests: ${{needs.build-info.outputs.testTypes}}" run: ./scripts/ci/testing/ci_run_airflow_testing.sh env: @@ -891,6 +895,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" run: airflow-freespace - name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}" run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh + - name: "Test downgrade" + run: ./scripts/ci/testing/run_downgrade_test.sh - name: "Tests: ${{needs.build-info.outputs.testTypes}}" run: ./scripts/ci/testing/ci_run_airflow_testing.sh env: @@ -951,6 +957,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" run: airflow-freespace - name: "Pull CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}" run: ./scripts/ci/images/ci_pull_ci_image_on_ci.sh + - name: "Test downgrade" + run: ./scripts/ci/testing/run_downgrade_test.sh - name: "Tests: ${{needs.build-info.outputs.testTypes}}" run: ./scripts/ci/testing/ci_run_airflow_testing.sh env: diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py new file mode 100644 index 
0000000000000..3294ced3887f1 --- /dev/null +++ b/airflow/migrations/utils.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from collections import defaultdict + + +def get_mssql_table_constraints(conn, table_name): + """ + Return the primary and unique constraints of a table + along with the column names. Some tables, such as `task_instance`, + are missing the primary key constraint name, and the name is + auto-generated by SQL Server, so this function helps to + retrieve any primary or unique constraint name. + :param conn: sql connection object + :param table_name: table name + :return: a dictionary mapping constraint type to constraint name to the list of column names + :rtype: defaultdict(list) + """ + query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME + FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc + JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME + WHERE tc.TABLE_NAME = '{table_name}' AND + (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE') + """ + result = conn.execute(query).fetchall() + constraint_dict = defaultdict(lambda: defaultdict(list)) + for constraint, constraint_type, col_name in result: + constraint_dict[constraint_type][constraint].append(col_name) + return constraint_dict diff --git a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py index 1b759ac27da2d..49ba327dc00e8 100644 --- a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py +++ b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py @@ -53,7 +53,8 @@ def upgrade(): def downgrade(): """Unapply Add ``max_active_runs`` column to ``dag_model`` table""" - op.drop_column('dag', 'max_active_runs') + with op.batch_alter_table('dag') as batch_op: + batch_op.drop_column('max_active_runs') with op.batch_alter_table('dag_run', schema=None) as batch_op: # Drop index to dag_run.dag_id and also drop index to dag_run.state where state==running batch_op.drop_index('idx_dag_run_dag_id') diff --git a/airflow/migrations/versions/2c6edca13270_resource_based_permissions.py b/airflow/migrations/versions/2c6edca13270_resource_based_permissions.py index 62121a35455a0..6942be25cfd18 100644 --- a/airflow/migrations/versions/2c6edca13270_resource_based_permissions.py +++ b/airflow/migrations/versions/2c6edca13270_resource_based_permissions.py @@ -313,7 +313,7 @@ def remap_permissions(): def undo_remap_permissions(): """Unapply Map Airflow permissions""" appbuilder = create_app(config={'FAB_UPDATE_PERMS': False}).appbuilder - for old, new in mapping.items: + for old, new in mapping.items(): (new_resource_name,
new_action_name) = new[0] new_permission = appbuilder.sm.get_permission(new_action_name, new_resource_name) if not new_permission: diff --git a/airflow/migrations/versions/5e3ec427fdd3_increase_length_of_email_and_username.py b/airflow/migrations/versions/5e3ec427fdd3_increase_length_of_email_and_username.py index 54087dfe02d1d..968b5f0162368 100644 --- a/airflow/migrations/versions/5e3ec427fdd3_increase_length_of_email_and_username.py +++ b/airflow/migrations/versions/5e3ec427fdd3_increase_length_of_email_and_username.py @@ -27,6 +27,8 @@ import sqlalchemy as sa from alembic import op +from airflow.migrations.utils import get_mssql_table_constraints + # revision identifiers, used by Alembic. revision = '5e3ec427fdd3' down_revision = '587bdf053233' @@ -47,9 +49,35 @@ def upgrade(): def downgrade(): """Revert length of email from 256 to 64 characters""" - with op.batch_alter_table('ab_user') as batch_op: - batch_op.alter_column('username', type_=sa.String(64)) - batch_op.alter_column('email', type_=sa.String(64)) - with op.batch_alter_table('ab_register_user') as batch_op: - batch_op.alter_column('username', type_=sa.String(64)) - batch_op.alter_column('email', type_=sa.String(64)) + conn = op.get_bind() + if conn.dialect.name != 'mssql': + with op.batch_alter_table('ab_user') as batch_op: + batch_op.alter_column('username', type_=sa.String(64), nullable=False) + batch_op.alter_column('email', type_=sa.String(64)) + with op.batch_alter_table('ab_register_user') as batch_op: + batch_op.alter_column('username', type_=sa.String(64)) + batch_op.alter_column('email', type_=sa.String(64)) + else: + # MSSQL doesn't drop implicit unique constraints it created + # We need to drop the two unique constraints explicitly + with op.batch_alter_table('ab_user') as batch_op: + # Drop the unique constraint on username and email + constraints = get_mssql_table_constraints(conn, 'ab_user') + unique_key, _ = constraints['UNIQUE'].popitem() + batch_op.drop_constraint(unique_key, type_='unique') + unique_key, _ = constraints['UNIQUE'].popitem() + batch_op.drop_constraint(unique_key, type_='unique') + batch_op.alter_column('username', type_=sa.String(64), nullable=False) + batch_op.create_unique_constraint(None, ['username']) + batch_op.alter_column('email', type_=sa.String(64)) + batch_op.create_unique_constraint(None, ['email']) + + with op.batch_alter_table('ab_register_user') as batch_op: + # Drop the unique constraint on username and email + constraints = get_mssql_table_constraints(conn, 'ab_register_user') + for k, _ in constraints.get('UNIQUE').items(): + batch_op.drop_constraint(k, type_='unique') + batch_op.alter_column('username', type_=sa.String(64)) + batch_op.create_unique_constraint(None, ['username']) + batch_op.alter_column('email', type_=sa.String(64)) + batch_op.create_unique_constraint(None, ['email']) diff --git a/airflow/migrations/versions/786e3737b18f_added_timetable_description_column.py b/airflow/migrations/versions/786e3737b18f_added_timetable_description_column.py index ca569cd12b70d..811ece55c54bc 100644 --- a/airflow/migrations/versions/786e3737b18f_added_timetable_description_column.py +++ b/airflow/migrations/versions/786e3737b18f_added_timetable_description_column.py @@ -43,5 +43,10 @@ def upgrade(): def downgrade(): """Unapply Add ``timetable_description`` column to DagModel for UI.""" - with op.batch_alter_table('dag', schema=None) as batch_op: + is_sqlite = bool(op.get_bind().dialect.name == 'sqlite') + if is_sqlite: + op.execute('PRAGMA foreign_keys=off') + with 
op.batch_alter_table('dag') as batch_op: batch_op.drop_column('timetable_description') + if is_sqlite: + op.execute('PRAGMA foreign_keys=on') diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py index 924e15c953716..b7bff79edfa6b 100644 --- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py +++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py @@ -24,13 +24,12 @@ """ -from collections import defaultdict - import sqlalchemy as sa from alembic import op from sqlalchemy.sql import and_, column, select, table from airflow.migrations.db_types import TIMESTAMP, StringID +from airflow.migrations.utils import get_mssql_table_constraints ID_LEN = 250 @@ -65,31 +64,6 @@ ) -def get_table_constraints(conn, table_name): - """ - This function return primary and unique constraint - along with column name. Some tables like `task_instance` - is missing the primary key constraint name and the name is - auto-generated by the SQL server. so this function helps to - retrieve any primary or unique constraint name. - :param conn: sql connection object - :param table_name: table name - :return: a dictionary of ((constraint name, constraint type), column name) of table - :rtype: defaultdict(list) - """ - query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME - FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc - JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME - WHERE tc.TABLE_NAME = '{table_name}' AND - (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE') - """ - result = conn.execute(query).fetchall() - constraint_dict = defaultdict(lambda: defaultdict(list)) - for constraint, constraint_type, col_name in result: - constraint_dict[constraint_type][constraint].append(col_name) - return constraint_dict - - def upgrade(): """Apply Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.""" conn = op.get_bind() @@ -246,7 +220,7 @@ def upgrade(): if dialect_name != 'postgresql': # TODO: Is this right for non-postgres? 
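The hunk below is the first of several places where the shared get_mssql_table_constraints() helper replaces a per-file copy: MSSQL auto-generates the primary key constraint name, so a migration has to look the name up before it can drop the constraint. A minimal sketch of that usage pattern, with an illustrative table name and no tie to any specific revision in this patch:

from alembic import op

from airflow.migrations.utils import get_mssql_table_constraints


def drop_autonamed_primary_key(table_name='task_instance'):
    # Illustrative sketch only: discover the auto-generated PK name on MSSQL, then drop it.
    conn = op.get_bind()
    if conn.dialect.name == 'mssql':
        constraints = get_mssql_table_constraints(conn, table_name)
        pk_name, _ = constraints['PRIMARY KEY'].popitem()
        with op.batch_alter_table(table_name) as batch_op:
            batch_op.drop_constraint(pk_name, type_='primary')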
if dialect_name == 'mssql': - constraints = get_table_constraints(conn, "task_instance") + constraints = get_mssql_table_constraints(conn, "task_instance") pk, _ = constraints['PRIMARY KEY'].popitem() batch_op.drop_constraint(pk, type_='primary') elif dialect_name not in ('sqlite'): @@ -337,9 +311,10 @@ def downgrade(): with op.batch_alter_table('task_instance', schema=None) as batch_op: batch_op.drop_constraint('task_instance_pkey', type_='primary') batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False) - batch_op.alter_column( - 'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=True - ) + if dialect_name != 'mssql': + batch_op.alter_column( + 'dag_id', existing_type=string_id_col_type, existing_nullable=False, nullable=True + ) batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'execution_date']) @@ -375,6 +350,7 @@ def downgrade(): batch_op.drop_index('dag_id_state') batch_op.drop_index('idx_dag_run_running_dags') batch_op.drop_index('idx_dag_run_queued_dags') + batch_op.drop_index('idx_dag_run_dag_id') batch_op.alter_column('dag_id', existing_type=string_id_col_type, nullable=True) batch_op.alter_column('execution_date', existing_type=dt_type, nullable=True) diff --git a/airflow/migrations/versions/90d1635d7b86_increase_pool_name_size_in_taskinstance.py b/airflow/migrations/versions/90d1635d7b86_increase_pool_name_size_in_taskinstance.py index bc7d61f859ab3..30e8ca553f8c3 100644 --- a/airflow/migrations/versions/90d1635d7b86_increase_pool_name_size_in_taskinstance.py +++ b/airflow/migrations/versions/90d1635d7b86_increase_pool_name_size_in_taskinstance.py @@ -43,5 +43,12 @@ def upgrade(): def downgrade(): """Unapply Increase maximum length of pool name in ``task_instance`` table to ``256`` characters""" - with op.batch_alter_table('task_instance') as batch_op: - batch_op.alter_column('pool', type_=sa.String(50), nullable=False) + conn = op.get_bind() + if conn.dialect.name == 'mssql': + with op.batch_alter_table('task_instance') as batch_op: + batch_op.drop_index('ti_pool') + batch_op.alter_column('pool', type_=sa.String(50), nullable=False) + batch_op.create_index('ti_pool', ['pool']) + else: + with op.batch_alter_table('task_instance') as batch_op: + batch_op.alter_column('pool', type_=sa.String(50), nullable=False) diff --git a/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py index 7561837c53e99..f40daa1035b23 100644 --- a/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py +++ b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py @@ -44,4 +44,5 @@ def upgrade(): def downgrade(): """Unapply Add ``queued_at`` column in ``dag_run`` table""" - op.drop_column('dag_run', "queued_at") + with op.batch_alter_table('dag_run') as batch_op: + batch_op.drop_column('queued_at') diff --git a/airflow/migrations/versions/b25a55525161_increase_length_of_pool_name.py b/airflow/migrations/versions/b25a55525161_increase_length_of_pool_name.py index b25eae2b3b780..1174c9be6bf90 100644 --- a/airflow/migrations/versions/b25a55525161_increase_length_of_pool_name.py +++ b/airflow/migrations/versions/b25a55525161_increase_length_of_pool_name.py @@ -46,6 +46,5 @@ def upgrade(): def downgrade(): """Revert Increased length of pool name from 256 to 50 characters""" - # use batch_alter_table to support SQLite workaround with 
op.batch_alter_table('slot_pool', table_args=sa.UniqueConstraint('pool')) as batch_op: batch_op.alter_column('pool', type_=sa.String(50)) diff --git a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py index b1abe27472346..a588af5c53917 100644 --- a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py +++ b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py @@ -115,7 +115,10 @@ def upgrade(): def downgrade(): """Unapply Remove id column from xcom""" + conn = op.get_bind() with op.batch_alter_table('xcom') as bop: - bop.drop_constraint('pk_xcom', type_='primary') - bop.add_column(Column('id', Integer, primary_key=True)) + if conn.dialect.name != 'mssql': + bop.drop_constraint('pk_xcom', type_='primary') + bop.add_column(Column('id', Integer, nullable=False)) + bop.create_primary_key('id', ['id']) bop.create_index('idx_xcom_dag_task_date', ['dag_id', 'task_id', 'key', 'execution_date']) diff --git a/airflow/migrations/versions/be2bfac3da23_add_has_import_errors_column_to_dagmodel.py b/airflow/migrations/versions/be2bfac3da23_add_has_import_errors_column_to_dagmodel.py index 3758805d068f4..d401e241b862d 100644 --- a/airflow/migrations/versions/be2bfac3da23_add_has_import_errors_column_to_dagmodel.py +++ b/airflow/migrations/versions/be2bfac3da23_add_has_import_errors_column_to_dagmodel.py @@ -42,4 +42,5 @@ def upgrade(): def downgrade(): """Unapply Add has_import_errors column to DagModel""" - op.drop_column("dag", "has_import_errors") + with op.batch_alter_table('dag') as batch_op: + batch_op.drop_column('has_import_errors', mssql_drop_default=True) diff --git a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py index 11675731f436f..951c0a6c89bf3 100644 --- a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py +++ b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py @@ -28,6 +28,7 @@ from sqlalchemy import Column, Integer, LargeBinary, MetaData, Table, and_, select from airflow.migrations.db_types import TIMESTAMP, StringID +from airflow.migrations.utils import get_mssql_table_constraints # Revision identifiers, used by Alembic. revision = "c306b5b5ae4a" @@ -54,12 +55,12 @@ def _get_new_xcom_columns() -> Sequence[Column]: def _get_old_xcom_columns() -> Sequence[Column]: return [ - Column("key", StringID(length=512), nullable=False), + Column("key", StringID(length=512), nullable=False, primary_key=True), Column("value", LargeBinary), Column("timestamp", TIMESTAMP, nullable=False), - Column("task_id", StringID(), nullable=False), - Column("dag_id", StringID(), nullable=False), - Column("execution_date", StringID(), nullable=False), + Column("task_id", StringID(length=250), nullable=False, primary_key=True), + Column("dag_id", StringID(length=250), nullable=False, primary_key=True), + Column("execution_date", TIMESTAMP, nullable=False, primary_key=True), ] @@ -127,6 +128,7 @@ def downgrade(): Basically an inverse operation. 
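The downgrade that follows restores the old XCom schema by rebuilding the table rather than altering it in place: a temporary table with the old columns is created, the rows are copied across, and the two tables are swapped. A simplified sketch of that copy-and-rename pattern, using a made-up table purely for illustration:

import sqlalchemy as sa
from alembic import op


def rebuild_with_old_schema():
    # Illustrative sketch only: create a temporary table with the target (old) schema.
    op.create_table(
        '__airflow_tmp_example',
        sa.Column('key', sa.String(512), primary_key=True, nullable=False),
        sa.Column('value', sa.LargeBinary()),
    )
    # Copy the rows across (the real migration recovers execution_date via the dag_run table here).
    op.execute('INSERT INTO __airflow_tmp_example (key, value) SELECT key, value FROM example')
    # Swap the tables; the primary key is then recreated under the expected name.
    op.drop_table('example')
    op.rename_table('__airflow_tmp_example', 'example')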
""" + conn = op.get_bind() op.create_table("__airflow_tmp_xcom", *_get_old_xcom_columns()) xcom = Table("xcom", metadata, *_get_new_xcom_columns()) @@ -153,4 +155,8 @@ def downgrade(): op.drop_table("xcom") op.rename_table("__airflow_tmp_xcom", "xcom") - op.create_primary_key("xcom_pkey", "xcom", ["dag_id", "task_id", "execution_date", "key"]) + if conn.dialect.name == 'mssql': + constraints = get_mssql_table_constraints(conn, 'xcom') + pk, _ = constraints['PRIMARY KEY'].popitem() + op.drop_constraint(pk, 'xcom', type_='primary') + op.create_primary_key("pk_xcom", "xcom", ["dag_id", "task_id", "execution_date", "key"]) diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py index f3d1bd8891e67..7685b77afd04d 100644 --- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -101,8 +101,8 @@ def upgrade(): def downgrade(): engine = settings.engine - if engine.dialect.has_table(engine, 'task_instance'): - connection = op.get_bind() + connection = op.get_bind() + if engine.dialect.has_table(connection, 'task_instance'): sessionmaker = sa.orm.sessionmaker() session = sessionmaker(bind=connection) dagbag = DagBag(settings.DAGS_FOLDER) diff --git a/airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py b/airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py index acd4b81e19a52..741c02d0ff6fb 100644 --- a/airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py +++ b/airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py @@ -102,13 +102,13 @@ def downgrade(): with op.batch_alter_table("task_reschedule") as batch_op: batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey") batch_op.drop_index("idx_task_reschedule_dag_task_run") - batch_op.drop_column("map_index") + batch_op.drop_column("map_index", mssql_drop_default=True) op.execute("DELETE FROM task_instance WHERE map_index != -1") with op.batch_alter_table("task_instance") as batch_op: batch_op.drop_constraint("task_instance_pkey", type_="primary") - batch_op.drop_column("map_index") + batch_op.drop_column("map_index", mssql_drop_default=True) batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id"]) with op.batch_alter_table("task_reschedule") as batch_op: diff --git a/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py index 7873c57b09ff9..45a4559056418 100644 --- a/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py +++ b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py @@ -49,14 +49,11 @@ def downgrade(): """Unapply Make XCom primary key columns non-nullable""" conn = op.get_bind() with op.batch_alter_table('xcom') as bop: - if conn.dialect.name == 'mssql': - bop.drop_constraint('pk_xcom', 'primary') - # regardless of what the model defined, the `key` and `execution_date` - # columns were always non-nullable for sqlite and postgres, so leave them alone + # columns were always non-nullable for mysql, sqlite and postgres, so leave them alone - if conn.dialect.name in ['mysql', 'mssql']: - bop.alter_column("key", type_=StringID(length=512), nullable=True) if conn.dialect.name == 'mssql': - # 
execution_date wasn't nullable in the other databases + bop.drop_constraint('pk_xcom', 'primary') + # execution_date and key wasn't nullable in the other databases + bop.alter_column("key", type_=StringID(length=512), nullable=True) bop.alter_column("execution_date", type_=TIMESTAMP, nullable=True) diff --git a/airflow/migrations/versions/e959f08ac86c_change_field_in_dagcode_to_mediumtext_.py b/airflow/migrations/versions/e959f08ac86c_change_field_in_dagcode_to_mediumtext_.py index 1faa9f336e20b..2b905c8c2916d 100644 --- a/airflow/migrations/versions/e959f08ac86c_change_field_in_dagcode_to_mediumtext_.py +++ b/airflow/migrations/versions/e959f08ac86c_change_field_in_dagcode_to_mediumtext_.py @@ -43,6 +43,5 @@ def upgrade(): def downgrade(): - conn = op.get_bind() - if conn.dialect.name == "mysql": - op.alter_column(table_name='dag_code', column_name='source_code', type_=mysql.TEXT, nullable=False) + # Do not downgrade to TEXT as it will break data + pass diff --git a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py index 51d2a2b63fe7a..a2511a398da08 100644 --- a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py +++ b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py @@ -57,5 +57,6 @@ def upgrade(): def downgrade(): """Remove fk on task instance and model for task log filename template.""" with op.batch_alter_table("dag_run") as batch_op: + batch_op.drop_constraint("task_instance_log_template_id_fkey", type_="foreignkey") batch_op.drop_column("log_template_id") op.drop_table("log_template") diff --git a/scripts/ci/libraries/_testing.sh b/scripts/ci/libraries/_testing.sh index 6b387e70d53eb..7741c4ad40c57 100644 --- a/scripts/ci/libraries/_testing.sh +++ b/scripts/ci/libraries/_testing.sh @@ -128,3 +128,41 @@ function testing::dump_container_logs() { echo "${COLOR_BLUE}###########################################################################################${COLOR_RESET}" start_end::group_end } + +function testing::setup_docker_compose_backend() { + local TEST_TYPE + TEST_TYPE="${1}" + if [[ ${BACKEND} == "mssql" ]]; then + local backend_docker_compose=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-${BACKEND}-${DEBIAN_VERSION}.yml") + local docker_filesystem + docker_filesystem=$(stat "-f" "-c" "%T" /var/lib/docker 2>/dev/null || echo "unknown") + if [[ ${docker_filesystem} == "tmpfs" ]]; then + # In case of tmpfs backend for docker, mssql fails because TMPFS does not support + # O_DIRECT parameter for direct writing to the filesystem + # https://github.com/microsoft/mssql-docker/issues/13 + # so we need to mount an external volume for its db location + # the external db must allow for parallel testing so TEST_TYPE + # is added to the volume name + export MSSQL_DATA_VOLUME="${HOME}/tmp-mssql-volume-${TEST_TYPE}-${MSSQL_VERSION}" + mkdir -p "${MSSQL_DATA_VOLUME}" + # MSSQL 2019 runs with non-root user by default so we have to make the volumes world-writeable + # This is a bit scary and we could get by making it group-writeable but the group would have + # to be set to "root" (GID=0) for the volume to work and this cannot be accomplished without sudo + chmod a+rwx "${MSSQL_DATA_VOLUME}" + backend_docker_compose+=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-mssql-bind-volume.yml") + + # Runner user doesn't have blanket sudo access, but we can run docker as root. 
Go figure + traps::add_trap "docker run -u 0 --rm -v ${MSSQL_DATA_VOLUME}:/mssql alpine sh -c 'rm -rvf -- /mssql/.* /mssql/*' || true" EXIT + + # Clean up at start too, in case a previous runner left it messy + docker run --rm -u 0 -v "${MSSQL_DATA_VOLUME}":/mssql alpine sh -c 'rm -rfv -- /mssql/.* /mssql/*' || true + export BACKEND_DOCKER_COMPOSE=("${backend_docker_compose[@]}") + else + backend_docker_compose+=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-mssql-docker-volume.yml") + export BACKEND_DOCKER_COMPOSE=("${backend_docker_compose[@]}") + fi + else + local backend_docker_compose=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-${BACKEND}.yml") + export BACKEND_DOCKER_COMPOSE=("${backend_docker_compose[@]}") + fi +} diff --git a/scripts/ci/testing/ci_run_single_airflow_test_in_docker.sh b/scripts/ci/testing/ci_run_single_airflow_test_in_docker.sh index b60d3d1f8c4ba..d5db23fa7410a 100755 --- a/scripts/ci/testing/ci_run_single_airflow_test_in_docker.sh +++ b/scripts/ci/testing/ci_run_single_airflow_test_in_docker.sh @@ -73,6 +73,7 @@ function prepare_tests() { echo "**********************************************************************************************" } + # Runs airflow testing in docker container # You need to set variable TEST_TYPE - test type to run # "${@}" - extra arguments to pass to docker command @@ -83,38 +84,7 @@ function run_airflow_testing_in_docker() { echo echo "Semaphore grabbed. Running tests for ${TEST_TYPE}" echo - if [[ ${BACKEND} == "mssql" ]]; then - local backend_docker_compose=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-${BACKEND}-${DEBIAN_VERSION}.yml") - else - local backend_docker_compose=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-${BACKEND}.yml") - fi - if [[ ${BACKEND} == "mssql" ]]; then - local docker_filesystem - docker_filesystem=$(stat "-f" "-c" "%T" /var/lib/docker 2>/dev/null || echo "unknown") - if [[ ${docker_filesystem} == "tmpfs" ]]; then - # In case of tmpfs backend for docker, mssql fails because TMPFS does not support - # O_DIRECT parameter for direct writing to the filesystem - # https://github.com/microsoft/mssql-docker/issues/13 - # so we need to mount an external volume for its db location - # the external db must allow for parallel testing so TEST_TYPE - # is added to the volume name - export MSSQL_DATA_VOLUME="${HOME}/tmp-mssql-volume-${TEST_TYPE}-${MSSQL_VERSION}" - mkdir -p "${MSSQL_DATA_VOLUME}" - # MSSQL 2019 runs with non-root user by default so we have to make the volumes world-writeable - # This is a bit scary and we could get by making it group-writeable but the group would have - # to be set to "root" (GID=0) for the volume to work and this cannot be accomplished without sudo - chmod a+rwx "${MSSQL_DATA_VOLUME}" - backend_docker_compose+=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-mssql-bind-volume.yml") - - # Runner user doesn't have blanket sudo access, but we can run docker as root. 
Go figure - traps::add_trap "docker run -u 0 --rm -v ${MSSQL_DATA_VOLUME}:/mssql alpine sh -c 'rm -rvf -- /mssql/.* /mssql/*' || true" EXIT - - # Clean up at start too, in case a previous runner left it messy - docker run --rm -u 0 -v "${MSSQL_DATA_VOLUME}":/mssql alpine sh -c 'rm -rfv -- /mssql/.* /mssql/*' || true - else - backend_docker_compose+=("-f" "${SCRIPTS_CI_DIR}/docker-compose/backend-mssql-docker-volume.yml") - fi - fi + echo "Making sure docker-compose is down and remnants removed" echo docker-compose -f "${SCRIPTS_CI_DIR}/docker-compose/base.yml" \ @@ -124,7 +94,7 @@ function run_airflow_testing_in_docker() { --volumes --timeout 10 docker-compose --log-level INFO \ -f "${SCRIPTS_CI_DIR}/docker-compose/base.yml" \ - "${backend_docker_compose[@]}" \ + "${BACKEND_DOCKER_COMPOSE[@]}" \ "${INTEGRATIONS[@]}" \ "${DOCKER_COMPOSE_LOCAL[@]}" \ --project-name "airflow-${TEST_TYPE}-${BACKEND}" \ @@ -207,4 +177,5 @@ function run_airflow_testing_in_docker() { prepare_tests +testing::setup_docker_compose_backend "${TEST_TYPE}" run_airflow_testing_in_docker "${@}" diff --git a/scripts/ci/testing/run_downgrade_test.sh b/scripts/ci/testing/run_downgrade_test.sh new file mode 100755 index 0000000000000..0f94c6f445378 --- /dev/null +++ b/scripts/ci/testing/run_downgrade_test.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# shellcheck source=scripts/ci/libraries/_script_init.sh +. "$( dirname "${BASH_SOURCE[0]}" )/../libraries/_script_init.sh" + +function run_db_downgrade() { + set +u + set +e + local exit_code + echo + echo "Semaphore grabbed. Running upgrade and downgrade tests for ${BACKEND}" + echo + echo "Making sure docker-compose is down and remnants removed" + echo + docker-compose -f "${SCRIPTS_CI_DIR}/docker-compose/base.yml" \ + --project-name "airflow-downgrade-${BACKEND}" \ + down --remove-orphans \ + --volumes --timeout 10 + docker-compose --log-level INFO \ + -f "${SCRIPTS_CI_DIR}/docker-compose/base.yml" \ + "${BACKEND_DOCKER_COMPOSE[@]}" \ + "${DOCKER_COMPOSE_LOCAL[@]}" \ + --project-name "airflow-downgrade-${BACKEND}" \ + run airflow -c "airflow db downgrade -r e959f08ac86c -y" + exit_code=$? 
+ docker ps + if [[ ${exit_code} != "0" && ${CI} == "true" ]]; then + docker ps --all + local container + for container in $(docker ps --all --format '{{.Names}}') + do + testing::dump_container_logs "${container}" + done + fi + + docker-compose --log-level INFO -f "${SCRIPTS_CI_DIR}/docker-compose/base.yml" \ + --project-name "airflow-downgrade-${BACKEND}" \ + down --remove-orphans \ + --volumes --timeout 10 + set -u + set -e + if [[ ${exit_code} == 0 ]]; then + echo + echo "${COLOR_GREEN}Test downgrade succeeded.${COLOR_RESET}" + else + echo + echo "${COLOR_RED}Test downgrade failed.${COLOR_RESET}" + fi + return "${exit_code}" +} + + +build_images::prepare_ci_build + +build_images::rebuild_ci_image_if_needed_with_group +testing::get_docker_compose_local +testing::setup_docker_compose_backend "downgrade" +run_db_downgrade "${@}"
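The new CI step boils down to running airflow db downgrade -r e959f08ac86c -y inside the CI image via docker-compose. For a rough local equivalent, a hypothetical helper (not part of this patch) could drive the same downgrade from Python and bring the schema back up afterwards:

import subprocess


def run_downgrade_cycle(target_revision: str = 'e959f08ac86c') -> None:
    # Hypothetical local helper mirroring scripts/ci/testing/run_downgrade_test.sh.
    # Downgrade the metadata database to the pinned revision...
    subprocess.run(['airflow', 'db', 'downgrade', '-r', target_revision, '-y'], check=True)
    # ...then upgrade back to head so the environment stays usable.
    subprocess.run(['airflow', 'db', 'upgrade'], check=True)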