From 64689d61d8fa89efc2a01ec2add5915d2847517d Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Tue, 24 May 2022 15:06:34 +0530 Subject: [PATCH] Add default task retry delay config (#23861) --- airflow/config_templates/config.yml | 8 +++++++ airflow/config_templates/default_airflow.cfg | 4 ++++ airflow/models/abstractoperator.py | 4 +++- tests/models/test_baseoperator.py | 23 ++++++++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b5ca6a7d80c63..3309a3b0a58eb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -249,6 +249,14 @@ type: string example: ~ default: "0" + - name: default_task_retry_delay + description: | + The number of seconds each task is going to wait by default between retries. Can be overridden at + dag or task level. + version_added: 2.3.2 + type: integer + example: ~ + default: "300" - name: default_task_weight_rule description: | The weighting method used for the effective total priority weight of the task diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 07af45aecb683..14351e49b89b0 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -147,6 +147,10 @@ dag_ignore_file_syntax = regexp # The number of retries each task is going to have by default. Can be overridden at dag or task level. default_task_retries = 0 +# The number of seconds each task is going to wait by default between retries. Can be overridden at +# dag or task level. +default_task_retry_delay = 300 + # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 8d2e06442a2e5..6187c37182950 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -62,7 +62,9 @@ DEFAULT_PRIORITY_WEIGHT: int = 1 DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0) -DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300) +DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta( + seconds=conf.getint("core", "default_task_retry_delay", fallback=300) +) DEFAULT_WEIGHT_RULE: WeightRule = WeightRule( conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) ) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 8cb8d96e813a8..5ba271a5a136a 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -994,3 +994,26 @@ def __init__(self, value, arg1, **kwargs): assert op.value == "{{ ds }}", "Should not be templated!" assert op.arg1 == "{{ ds }}" assert op.arg2 == "a" + + +def test_default_retry_delay(dag_maker): + with dag_maker(dag_id='test_default_retry_delay'): + task1 = BaseOperator(task_id='test_no_explicit_retry_delay') + + assert task1.retry_delay == timedelta(seconds=300) + + +def test_dag_level_retry_delay(dag_maker): + with dag_maker(dag_id='test_dag_level_retry_delay', default_args={'retry_delay': timedelta(seconds=100)}): + task1 = BaseOperator(task_id='test_no_explicit_retry_delay') + + assert task1.retry_delay == timedelta(seconds=100) + + +def test_task_level_retry_delay(dag_maker): + with dag_maker( + dag_id='test_task_level_retry_delay', default_args={'retry_delay': timedelta(seconds=100)} + ): + task1 = BaseOperator(task_id='test_no_explicit_retry_delay', retry_delay=timedelta(seconds=200)) + + assert task1.retry_delay == timedelta(seconds=200)