From 7f48924928461e7dc8d51ce47314ff098c316c1e Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 7 Oct 2022 17:37:55 +0100 Subject: [PATCH] Add missing AUTOINC/SERIAL for FAB tables (#26885) * Add missing AUTOINC/SERIAL for FAB tables In 1.10.13 we introduced a migration that creates the tables with the server_default but that migration only did anything if the tables didn't already exist. But the tables created by the FAB model have a default (but not a server_default). Oh, and the final bit of the puzzle, in 2.4 we finally "took control" of the FAB security models in to airflow and those do not have the default set. * Update airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py * Fix static checks * Run migrations with with a pool of a connection. Without this `create_session()` will open a new connection, and that causes mysql to hang waiting to get a "metadata lock on table". Using the "stock" pool with size=1 and max_overflow=0 doesn't work, that instead times out if you try to get a new connection from the pool. SingletonThreadPool instead returns the existing active connection which is what we want. Co-authored-by: Tzu-ping Chung (cherry picked from commit 7efdeed5eccbf5cb709af40c8c66757e59c957ed) --- airflow/migrations/env.py | 8 +- .../0118_2_4_2_add_missing_autoinc_fab.py | 78 +++++++++++++++++++ airflow/settings.py | 15 ++-- airflow/utils/db.py | 17 +++- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/migrations-ref.rst | 4 +- 6 files changed, 113 insertions(+), 11 deletions(-) create mode 100644 airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py index 6474f0799c797..9dcd29e9ca0b5 100644 --- a/airflow/migrations/env.py +++ b/airflow/migrations/env.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import contextlib from logging.config import fileConfig from alembic import context @@ -89,9 +90,12 @@ def run_migrations_online(): and associate a connection with the context. """ - connectable = settings.engine + with contextlib.ExitStack() as stack: + connection = config.attributes.get('connection', None) + + if not connection: + connection = stack.push(settings.engine.connect()) - with connectable.connect() as connection: context.configure( connection=connection, transaction_per_migration=True, diff --git a/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py new file mode 100644 index 0000000000000..f6becd9dfe64a --- /dev/null +++ b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add missing auto-increment to columns on FAB tables + +Revision ID: b0d31815b5a6 +Revises: ecb43d2a1842 +Create Date: 2022-10-05 13:16:45.638490 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'b0d31815b5a6' +down_revision = 'ecb43d2a1842' +branch_labels = None +depends_on = None +airflow_version = '2.4.2' + + +def upgrade(): + """Apply migration. + + If these columns are already of the right type (i.e. created by our + migration in 1.10.13 rather than FAB itself in an earlier version), this + migration will issue an alter statement to change them to what they already + are -- i.e. its a no-op. + + These tables are small (100 to low 1k rows at most), so it's not too costly + to change them. + """ + conn = op.get_bind() + if conn.dialect.name in ['mssql', 'sqlite']: + # 1.10.12 didn't support SQL Server, so it couldn't have gotten this wrong --> nothing to correct + # SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY + return + + for table in ( + 'ab_permission', + 'ab_view_menu', + 'ab_role', + 'ab_permission_view', + 'ab_permission_view_role', + 'ab_user', + 'ab_user_role', + 'ab_register_user', + ): + with op.batch_alter_table(table) as batch: + kwargs = {} + if conn.dialect.name == 'postgresql': + kwargs['type_'] = sa.Sequence(f'{table}_id_seq').next_value() + else: + kwargs['autoincrement'] = True + batch.alter_column("id", existing_type=sa.Integer(), existing_nullable=False, **kwargs) + + +def downgrade(): + """Unapply add_missing_autoinc_fab""" + # No downgrade needed, these _should_ have applied from 1.10.13 but didn't due to a previous bug! diff --git a/airflow/settings.py b/airflow/settings.py index 10e963d5f760d..696b4b652ae25 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -259,14 +259,14 @@ def configure_vars(): DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False) -def configure_orm(disable_connection_pool=False): +def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy""" from airflow.utils.log.secrets_masker import mask_secret log.debug("Setting up DB connection pool (PID %s)", os.getpid()) global engine global Session - engine_args = prepare_engine_args(disable_connection_pool) + engine_args = prepare_engine_args(disable_connection_pool, pool_class) if conf.has_option('database', 'sql_alchemy_connect_args'): connect_args = conf.getimport('database', 'sql_alchemy_connect_args') @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False): } -def prepare_engine_args(disable_connection_pool=False): +def prepare_engine_args(disable_connection_pool=False, pool_class=None): """Prepare SQLAlchemy engine args""" default_args = {} for dialect, default in DEFAULT_ENGINE_ARGS.items(): @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False): 'database', 'sql_alchemy_engine_args', fallback=default_args ) # type: ignore - if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): + if pool_class: + # Don't use separate settings for size etc, only those from sql_alchemy_engine_args + engine_args['poolclass'] = pool_class + elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): engine_args['poolclass'] = NullPool log.debug("settings.prepare_engine_args(): Using NullPool") elif not SQL_ALCHEMY_CONN.startswith('sqlite'): @@ -413,10 +416,10 @@ def dispose_orm(): engine = None -def reconfigure_orm(disable_connection_pool=False): +def reconfigure_orm(disable_connection_pool=False, pool_class=None): """Properly close database connections and re-configure ORM""" dispose_orm() - configure_orm(disable_connection_pool=disable_connection_pool) + configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class) def configure_adapters(): diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b2ff28f68ad86..e9a5c8f0bb735 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1527,8 +1527,23 @@ def upgradedb( initdb(session=session, load_connections=False) return with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + import sqlalchemy.pool + log.info("Creating tables") - command.upgrade(config, revision=to_revision or 'heads') + val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + try: + # Reconfigure the ORM ot use _EXACTLY_ one connection, otherwise some db engines hang forever + # trying to ALTER TABLEs + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1' + settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool) + command.upgrade(config, revision=to_revision or 'heads') + finally: + if val is None: + os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + else: + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val + settings.reconfigure_orm() + reserialize_dags(session=session) add_default_pool_if_not_exists(session=session) synchronize_log_template(session=session) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index d8d6035893ea5..1c82ebf2bf659 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -543fe0db520047b59d9d036b17d54b07f1031a2b6ef1f46dcd2ae970f14ab0e6 \ No newline at end of file +030fee7fdf6b154b107a12331400a26843f4ebb4abaab84566aeca7c26d79450 \ No newline at end of file diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index fa1606946f989..0a42c7e5d30e5 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``ecb43d2a1842`` (head) | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel | +| ``b0d31815b5a6`` (head) | ``ecb43d2a1842`` | ``2.4.2`` | Add missing auto-increment to columns on FAB tables | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``ecb43d2a1842`` | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel | | | | | and CallbackRequest tables. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``1486deb605b4`` | ``f4ff391becb5`` | ``2.4.0`` | add dag_owner_attributes table |