Skip to content

Commit

Permalink
Add default task retry delay config (apache#23861)
Browse files Browse the repository at this point in the history
  • Loading branch information
msumit authored May 24, 2022
1 parent ac9b224 commit 64689d6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 1 deletion.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@
type: string
example: ~
default: "0"
- name: default_task_retry_delay
description: |
The number of seconds each task is going to wait by default between retries. Can be overridden at
dag or task level.
version_added: 2.3.2
type: integer
example: ~
default: "300"
- name: default_task_weight_rule
description: |
The weighting method used for the effective total priority weight of the task
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ dag_ignore_file_syntax = regexp
# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

# The number of seconds each task is going to wait by default between retries. Can be overridden at
# dag or task level.
default_task_retry_delay = 300

# The weighting method used for the effective total priority weight of the task
default_task_weight_rule = downstream

Expand Down
4 changes: 3 additions & 1 deletion airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
DEFAULT_PRIORITY_WEIGHT: int = 1
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0)
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
seconds=conf.getint("core", "default_task_retry_delay", fallback=300)
)
DEFAULT_WEIGHT_RULE: WeightRule = WeightRule(
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,3 +994,26 @@ def __init__(self, value, arg1, **kwargs):
assert op.value == "{{ ds }}", "Should not be templated!"
assert op.arg1 == "{{ ds }}"
assert op.arg2 == "a"


def test_default_retry_delay(dag_maker):
with dag_maker(dag_id='test_default_retry_delay'):
task1 = BaseOperator(task_id='test_no_explicit_retry_delay')

assert task1.retry_delay == timedelta(seconds=300)


def test_dag_level_retry_delay(dag_maker):
with dag_maker(dag_id='test_dag_level_retry_delay', default_args={'retry_delay': timedelta(seconds=100)}):
task1 = BaseOperator(task_id='test_no_explicit_retry_delay')

assert task1.retry_delay == timedelta(seconds=100)


def test_task_level_retry_delay(dag_maker):
with dag_maker(
dag_id='test_task_level_retry_delay', default_args={'retry_delay': timedelta(seconds=100)}
):
task1 = BaseOperator(task_id='test_no_explicit_retry_delay', retry_delay=timedelta(seconds=200))

assert task1.retry_delay == timedelta(seconds=200)

0 comments on commit 64689d6

Please sign in to comment.