diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 2d6168bc49f7a..3b339ba71a1d8 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -104,6 +104,11 @@ sql_alchemy_pool_recycle = 1800 # disconnects. Setting this to 0 disables retries. sql_alchemy_reconnect_timeout = 300 +# Import path for connect args in SqlAlchemy. Default to an empty dict. +# This is useful when you want to configure db engine args that SqlAlchemy won't parse in connection string. +# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args +# sql_alchemy_connect_args = + # The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation diff --git a/airflow/settings.py b/airflow/settings.py index d9a5f607102a0..bd23b3c5fb7db 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -33,6 +33,7 @@ from airflow import configuration as conf from airflow.logging_config import configure_logging +from airflow.utils.module_loading import import_string from airflow.utils.sqlalchemy import setup_event_handlers log = logging.getLogger(__name__) @@ -175,7 +176,14 @@ def configure_orm(disable_connection_pool=False): engine_args['pool_size'] = pool_size engine_args['pool_recycle'] = pool_recycle - engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) + if conf.has_option('core', 'sql_alchemy_connect_args'): + connect_args = import_string( + conf.get('core', 'sql_alchemy_connect_args') + ) + else: + connect_args = {} + + engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT') setup_event_handlers(engine, reconnect_timeout) diff --git a/airflow/version.py b/airflow/version.py index 753f3d309bc54..7d5283af274ef 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,4 +18,4 @@ # under the License. # -version = '1.10.0+twtr20' +version = '1.10.0+twtr21' diff --git a/tests/test_sqlalchemy_config.py b/tests/test_sqlalchemy_config.py new file mode 100644 index 0000000000000..b908ce2537889 --- /dev/null +++ b/tests/test_sqlalchemy_config.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from sqlalchemy.pool import NullPool + +from airflow import settings +from tests.compat import patch +from tests.test_utils.config import conf_vars + +SQL_ALCHEMY_CONNECT_ARGS = { + 'test': 43503, + 'dict': { + 'is': 1, + 'supported': 'too' + } +} + + +class TestSqlAlchemySettings(unittest.TestCase): + def setUp(self): + self.old_engine = settings.engine + self.old_session = settings.Session + self.old_conn = settings.SQL_ALCHEMY_CONN + settings.SQL_ALCHEMY_CONN = "mysql+foobar://user:pass@host/dbname?inline=param&another=param" + + def tearDown(self): + settings.engine = self.old_engine + settings.Session = self.old_session + settings.SQL_ALCHEMY_CONN = self.old_conn + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_configure_orm_with_default_values(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + settings.configure_orm() + mock_create_engine.assert_called_once_with( + settings.SQL_ALCHEMY_CONN, + connect_args={}, + encoding='utf-8', + max_overflow=10, + pool_pre_ping=True, + pool_recycle=1800, + pool_size=5 + ) + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_sql_alchemy_connect_args(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + config = { + ('core', 'sql_alchemy_connect_args'): 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS', + ('core', 'sql_alchemy_pool_enabled'): 'False' + } + with conf_vars(config): + settings.configure_orm() + mock_create_engine.assert_called_once_with( + settings.SQL_ALCHEMY_CONN, + connect_args=SQL_ALCHEMY_CONNECT_ARGS, + poolclass=NullPool, + encoding='utf-8' + ) + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_sql_alchemy_invalid_connect_args(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + config = { + ('core', 'sql_alchemy_connect_args'): 'does.not.exist', + ('core', 'sql_alchemy_pool_enabled'): 'False' + } + with self.assertRaises(ImportError): + with conf_vars(config): + settings.configure_orm()