Skip to content

Commit

Permalink
[AIRFLOW-6527] Make send_task_to_executor timeout configurable (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqian90 authored Feb 2, 2020
1 parent 63aa3db commit f757a54
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 2 deletions.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,14 @@
type: string
example: ~
default: "prefork"
- name: operation_timeout
description: |
The number of seconds to wait before timing out ``send_task_to_executor`` or
``fetch_celery_task_state`` operations.
version_added: ~
type: int
example: ~
default: "2"
- name: celery_broker_transport_options
description: |
This section is for specifying options which can be passed to the
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ ssl_cacert =
# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html
pool = prefork

# The number of seconds to wait before timing out ``send_task_to_executor`` or
# ``fetch_celery_task_state`` operations.
operation_timeout = 2

[celery_broker_transport_options]

# This section is for specifying options which can be passed to the
Expand Down
6 changes: 4 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'

OPERATION_TIMEOUT = conf.getint('celery', 'operation_timeout', fallback=2)

'''
To start the celery worker, run the command:
airflow celery worker
Expand Down Expand Up @@ -102,7 +104,7 @@ def fetch_celery_task_state(celery_task: Tuple[TaskInstanceKeyType, AsyncResult]
"""

try:
with timeout(seconds=2):
with timeout(seconds=OPERATION_TIMEOUT):
# Accessing state property of celery task will make actual network request
# to get the current state of the task.
return celery_task[0], celery_task[1].state
Expand All @@ -122,7 +124,7 @@ def send_task_to_executor(task_tuple: TaskInstanceInCelery) \
"""Sends task to executor."""
key, _, command, queue, task_to_run = task_tuple
try:
with timeout(seconds=2):
with timeout(seconds=OPERATION_TIMEOUT):
result = task_to_run.apply_async(args=[command], queue=queue)
except Exception as e: # pylint: disable=broad-except
exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())
Expand Down
4 changes: 4 additions & 0 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,9 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
mock_stats_gauge.assert_has_calls(calls)


def test_operation_timeout_config():
assert celery_executor.OPERATION_TIMEOUT == 2


if __name__ == '__main__':
unittest.main()

0 comments on commit f757a54

Please sign in to comment.