diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index a3a0144aa4cf1..33aa1161b2a7d 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -285,11 +285,13 @@ To fix a pylint issue, do the following: 1. Remove module/modules from the `scripts/ci/pylint_todo.txt `__. -2. Run `scripts/ci/ci_pylint.sh `__. +2. Run `scripts/ci/ci_pylint_main.sh `__ and +`scripts/ci/ci_pylint_tests.sh `__. 3. Fix all the issues reported by pylint. -4. Re-run `scripts/ci/ci_pylint.sh `__. +4. Re-run `scripts/ci/ci_pylint_main.sh `__ and +`scripts/ci/ci_pylint_tests.sh `__. 5. If you see "success", submit a PR following `Pull Request guidelines <#pull-request-guidelines>`__. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bc26bc76a754d..d15dc9dd6dbc6 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -131,6 +131,11 @@ sql_alchemy_pool_pre_ping = True # SqlAlchemy supports databases with the concept of multiple schemas. sql_alchemy_schema = +# Import path for connect args in SqlAlchemy. Default to an empty dict. +# This is useful when you want to configure db engine args that SqlAlchemy won't parse in connection string. +# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args +# sql_alchemy_connect_args = + # The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation @@ -378,7 +383,7 @@ smtp_mail_from = airflow@example.com [sentry] # Sentry (https://docs.sentry.io) integration -sentry_dsn = +sentry_dsn = [celery] diff --git a/airflow/settings.py b/airflow/settings.py index b618128d04028..2ef2aacd19f70 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -33,6 +33,7 @@ import airflow from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # NOQA F401 from airflow.logging_config import configure_logging +from airflow.utils.module_loading import import_string from airflow.utils.sqlalchemy import setup_event_handlers log = logging.getLogger(__name__) @@ -185,7 +186,14 @@ def configure_orm(disable_connection_pool=False): # For Python2 we get back a newstr and need a str engine_args['encoding'] = engine_args['encoding'].__str__() - engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) + if conf.has_option('core', 'sql_alchemy_connect_args'): + connect_args = import_string( + conf.get('core', 'sql_alchemy_connect_args') + ) + else: + connect_args = {} + + engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) setup_event_handlers(engine) Session = scoped_session( diff --git a/tests/test_sqlalchemy_config.py b/tests/test_sqlalchemy_config.py new file mode 100644 index 0000000000000..b908ce2537889 --- /dev/null +++ b/tests/test_sqlalchemy_config.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from sqlalchemy.pool import NullPool + +from airflow import settings +from tests.compat import patch +from tests.test_utils.config import conf_vars + +SQL_ALCHEMY_CONNECT_ARGS = { + 'test': 43503, + 'dict': { + 'is': 1, + 'supported': 'too' + } +} + + +class TestSqlAlchemySettings(unittest.TestCase): + def setUp(self): + self.old_engine = settings.engine + self.old_session = settings.Session + self.old_conn = settings.SQL_ALCHEMY_CONN + settings.SQL_ALCHEMY_CONN = "mysql+foobar://user:pass@host/dbname?inline=param&another=param" + + def tearDown(self): + settings.engine = self.old_engine + settings.Session = self.old_session + settings.SQL_ALCHEMY_CONN = self.old_conn + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_configure_orm_with_default_values(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + settings.configure_orm() + mock_create_engine.assert_called_once_with( + settings.SQL_ALCHEMY_CONN, + connect_args={}, + encoding='utf-8', + max_overflow=10, + pool_pre_ping=True, + pool_recycle=1800, + pool_size=5 + ) + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_sql_alchemy_connect_args(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + config = { + ('core', 'sql_alchemy_connect_args'): 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS', + ('core', 'sql_alchemy_pool_enabled'): 'False' + } + with conf_vars(config): + settings.configure_orm() + mock_create_engine.assert_called_once_with( + settings.SQL_ALCHEMY_CONN, + connect_args=SQL_ALCHEMY_CONNECT_ARGS, + poolclass=NullPool, + encoding='utf-8' + ) + + @patch('airflow.settings.setup_event_handlers') + @patch('airflow.settings.scoped_session') + @patch('airflow.settings.sessionmaker') + @patch('airflow.settings.create_engine') + def test_sql_alchemy_invalid_connect_args(self, + mock_create_engine, + mock_sessionmaker, + mock_scoped_session, + mock_setup_event_handlers): + config = { + ('core', 'sql_alchemy_connect_args'): 'does.not.exist', + ('core', 'sql_alchemy_pool_enabled'): 'False' + } + with self.assertRaises(ImportError): + with conf_vars(config): + settings.configure_orm()