[TWTR] CP from 1.10+twtr (twitter-forks#35)
* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (twitter-forks#21)

* CP 51b1aee: Relax version requirements (twitter-forks#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (twitter-forks#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (twitter-forks#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (twitter-forks#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (twitter-forks#31)

* Fix models.py and jobs.py after CP

* Fix typo and bump version

Co-authored-by: Vishesh Jain <[email protected]>
2 people authored and K Sampreeth Prem committed Apr 25, 2022
1 parent d99ccc0 commit d38f2ba
Showing 6 changed files with 120 additions and 4 deletions.
7 changes: 4 additions & 3 deletions airflow/executors/base_executor.py
@@ -78,11 +78,12 @@ def queue_command(
         queue: Optional[str] = None,
     ):
         """Queues command to task"""
-        if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
+        key = task_instance.key
+        if key not in self.queued_tasks and key not in self.running:
             self.log.info("Adding to queue: %s", command)
-            self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
         else:
-            self.log.error("could not queue task %s", task_instance.key)
+            self.log.info("Adding to queue even though already queued or running {} {}".format(command, key))
+        self.queued_tasks[key] = (command, priority, queue, task_instance)

     def queue_task_instance(
         self,
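For context, a minimal, self-contained sketch (not Airflow code; ToyExecutor and both queue_command_* names are invented for illustration) contrasting the old behavior, which refused duplicate keys, with the cherry-picked behavior, which re-queues them:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class ToyExecutor:
    def __init__(self):
        self.queued_tasks = {}
        self.running = set()

    def queue_command_old(self, key, command):
        # Pre-change: refuse duplicates with an error log.
        if key not in self.queued_tasks and key not in self.running:
            self.queued_tasks[key] = command
        else:
            log.error("could not queue task %s", key)

    def queue_command_new(self, key, command):
        # Post-change: always (re)queue; duplicates are logged, not dropped.
        if key in self.queued_tasks or key in self.running:
            log.info("Adding to queue even though already queued or running %s %s", command, key)
        self.queued_tasks[key] = command


executor = ToyExecutor()
executor.queue_command_new("task_a", "airflow run dag_a task_a ...")
executor.queue_command_new("task_a", "airflow run dag_a task_a ...")  # re-queued, not dropped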
7 changes: 7 additions & 0 deletions airflow/jobs/scheduler_job.py
@@ -785,10 +785,17 @@ def _do_scheduling(self, session) -> int:
         By "next oldest", we mean hasn't been examined/scheduled in the most time.

+        <<<<<<< HEAD
         The reason we don't select all dagruns at once because the rows are selected with row locks, meaning
         that only one scheduler can "process them", even it is waiting behind other dags. Increasing this
         limit will allow more throughput for smaller DAGs but will likely slow down throughput for larger
         (>500 tasks.) DAGs
+        =======
+        # Stop any processors
+        self.log.info("Terminating DAG processors")
+        self.processor_agent.terminate()
+        self.log.info("All DAG processors terminated")
+        >>>>>>> 299b4d883... [TWTR] CP from 1.10+twtr (#35)

         - Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
           to the executor.
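Note that this hunk commits unresolved merge-conflict markers. They land inside the _do_scheduling docstring, so the module still parses, but the cherry-picked terminate() calls become inert docstring text rather than executable code. A hypothetical sketch of what a resolution might look like, assuming the HEAD docstring paragraph is kept and the processor shutdown belongs in an executable cleanup path (every name below is assumed, not the committed code):

class SchedulerJobSketch:
    """Hypothetical shape of the resolved code; names are illustrative."""

    def __init__(self, processor_agent, log):
        self.processor_agent = processor_agent
        self.log = log

    def _do_scheduling(self, session) -> int:
        """
        Schedule one batch of the next-oldest DAG runs.

        By "next oldest", we mean hasn't been examined/scheduled in the most
        time. The reason we don't select all dagruns at once is that the rows
        are selected with row locks, so only one scheduler can process them,
        even if it is waiting behind other dags.
        """
        return 0  # placeholder for the real scheduling work

    def _stop_processors(self):
        # The cherry-picked side of the conflict, as executable code.
        self.log.info("Terminating DAG processors")
        self.processor_agent.terminate()
        self.log.info("All DAG processors terminated")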
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
@@ -485,7 +485,7 @@ def __init__(
         email: Optional[Union[str, Iterable[str]]] = None,
         email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True),
         email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True),
-        retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0),
+        retries: Optional[int] = int(conf.get('core', 'default_task_retries', fallback=0)),
         retry_delay: timedelta = timedelta(seconds=300),
         retry_exponential_backoff: bool = False,
         max_retry_delay: Optional[timedelta] = None,
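Per the AIRFLOW-4939 reference in the commit message, getint's fallback handling was presumably unreliable in the 1.10.x line this fork tracks, so the default is fetched as a string and coerced explicitly. A standard-library illustration of the same fetch-then-coerce pattern:

from configparser import ConfigParser

cfg = ConfigParser()
cfg.read_string("[core]\n")  # 'default_task_retries' deliberately missing

# Fetch with a fallback, then coerce explicitly; works even when the
# option is absent, which is the failure mode AIRFLOW-4939 worked around.
retries = int(cfg.get("core", "default_task_retries", fallback="0"))
print(retries)  # 0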
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
@@ -1356,6 +1356,7 @@ def _run_raw_task(
             return
         except AirflowSkipException as e:
             # Recording SKIP
+            # This change is in reference to [AIRFLOW-5653][CX-16266]
             # log only if exception has any arguments to prevent log flooding
             if e.args:
                 self.log.info(e)
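A minimal, runnable sketch of the guard this hunk documents, using a stand-in exception class rather than Airflow's AirflowSkipException: argument-less raises stay silent, so routine skips don't flood the logs.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class SkipException(Exception):
    """Stand-in for AirflowSkipException."""


for exc in (SkipException(), SkipException("skipped: branch not taken")):
    try:
        raise exc
    except SkipException as e:
        # Log only if the exception carries arguments; only the second
        # instance above produces output.
        if e.args:
            log.info(e)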
1 change: 1 addition & 0 deletions airflow/utils/log/file_processor_handler.py
@@ -142,6 +142,7 @@ def _init_file(self, filename):
             Path(directory).mkdir(parents=True, exist_ok=True)

         if not os.path.exists(log_file_path):
+            logging.info("Creating file {}".format(log_file_path))
             open(log_file_path, "a").close()

         return log_file_path
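A side note on the added line: it builds the message eagerly with str.format, while the stdlib logging module also supports lazy %-style arguments that defer string construction until the record is actually emitted. A minimal equivalent, with a hypothetical path:

import logging

log_file_path = "/tmp/example.log"  # hypothetical path, for illustration only

# Lazy %-style arguments: the message is only rendered if INFO is enabled,
# unlike str.format, which always builds the string first.
logging.info("Creating file %s", log_file_path)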
106 changes: 106 additions & 0 deletions tests/test_sqlalchemy_config.py
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

from sqlalchemy.pool import NullPool

from airflow import settings
from tests.compat import patch
from tests.test_utils.config import conf_vars

SQL_ALCHEMY_CONNECT_ARGS = {
    'test': 43503,
    'dict': {
        'is': 1,
        'supported': 'too'
    }
}


class TestSqlAlchemySettings(unittest.TestCase):
    def setUp(self):
        self.old_engine = settings.engine
        self.old_session = settings.Session
        self.old_conn = settings.SQL_ALCHEMY_CONN
        settings.SQL_ALCHEMY_CONN = "mysql+foobar://user:pass@host/dbname?inline=param&another=param"

    def tearDown(self):
        settings.engine = self.old_engine
        settings.Session = self.old_session
        settings.SQL_ALCHEMY_CONN = self.old_conn

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_configure_orm_with_default_values(self,
                                               mock_create_engine,
                                               mock_sessionmaker,
                                               mock_scoped_session,
                                               mock_setup_event_handlers):
        settings.configure_orm()
        mock_create_engine.assert_called_once_with(
            settings.SQL_ALCHEMY_CONN,
            connect_args={},
            encoding='utf-8',
            max_overflow=10,
            pool_pre_ping=True,
            pool_recycle=1800,
            pool_size=5
        )

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_sql_alchemy_connect_args(self,
                                      mock_create_engine,
                                      mock_sessionmaker,
                                      mock_scoped_session,
                                      mock_setup_event_handlers):
        config = {
            ('core', 'sql_alchemy_connect_args'): 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS',
            ('core', 'sql_alchemy_pool_enabled'): 'False'
        }
        with conf_vars(config):
            settings.configure_orm()
            mock_create_engine.assert_called_once_with(
                settings.SQL_ALCHEMY_CONN,
                connect_args=SQL_ALCHEMY_CONNECT_ARGS,
                poolclass=NullPool,
                encoding='utf-8'
            )

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_sql_alchemy_invalid_connect_args(self,
                                              mock_create_engine,
                                              mock_sessionmaker,
                                              mock_scoped_session,
                                              mock_setup_event_handlers):
        config = {
            ('core', 'sql_alchemy_connect_args'): 'does.not.exist',
            ('core', 'sql_alchemy_pool_enabled'): 'False'
        }
        with self.assertRaises(ImportError):
            with conf_vars(config):
                settings.configure_orm()
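For context, the 'does.not.exist' case presumably exercises an import_string-style helper inside configure_orm that resolves the dotted path from [core] sql_alchemy_connect_args into the actual dict. A rough stand-in implementation (illustrative only, not Airflow's actual helper):

from importlib import import_module


def import_string(dotted_path):
    """Resolve 'pkg.module.ATTR' to the attribute it names."""
    try:
        module_path, attr = dotted_path.rsplit('.', 1)
        return getattr(import_module(module_path), attr)
    except (ValueError, AttributeError, ImportError) as err:
        raise ImportError("Could not import {}".format(dotted_path)) from err


print(import_string('os.path.join'))  # resolves a valid dotted path
import_string('does.not.exist')       # raises ImportError, as the test expects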
