diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index bed39158cc2..e59a1e6a673 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -1438,7 +1438,7 @@ We are using certain prefixes for email subjects for different purposes. Start y Voting is governed by the rules described in `Voting `_ We are all devoting our time for community as individuals who except for being active in Apache Airflow have -families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are +families, daily jobs, right for vacation. Sometimes we are in different timezones or simply are busy with day-to-day duties that our response time might be delayed. For us it's crucial to remember to respect each other in the project with no formal structure. There are no managers, departments, most of us is autonomous in our opinions, decisions. diff --git a/UPDATING.md b/UPDATING.md index 8b7267f8cd2..3f6afa3d992 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -245,7 +245,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`. ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, concurrency=3, ) @@ -256,7 +256,7 @@ dag = DAG( ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, max_active_tasks=3, ) @@ -3329,7 +3329,7 @@ Type "help", "copyright", "credits" or "license" for more information. 
>>> from airflow.models.dag import DAG >>> from airflow.operators.dummy import DummyOperator >>> ->>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1)) +>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC")) >>> >>> task = DummyOperator(task_id='task_1', dag=dag) >>> diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index f679f8d8753..82045922203 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -18,7 +18,9 @@ """Example DAG demonstrating the usage of the BashOperator.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -27,9 +29,9 @@ with DAG( dag_id='example_bash_operator', schedule_interval='0 0 * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), tags=['example', 'example2'], params={"example_key": "example_value"}, ) as dag: diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py index bdc50ca4368..76b109fb688 100644 --- a/airflow/example_dags/example_branch_datetime_operator.py +++ b/airflow/example_dags/example_branch_datetime_operator.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as targets. 
""" -import datetime +import pendulum from airflow import DAG from airflow.operators.datetime import BranchDateTimeOperator @@ -28,7 +28,7 @@ dag = DAG( dag_id="example_branch_datetime_operator", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -42,8 +42,8 @@ task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0), - target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0), + target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0), + target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0), dag=dag, ) @@ -54,7 +54,7 @@ dag = DAG( dag_id="example_branch_datetime_operator_2", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -67,8 +67,8 @@ task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.time(0, 0, 0), - target_lower=datetime.time(15, 0, 0), + target_upper=pendulum.time(0, 0, 0), + target_lower=pendulum.time(15, 0, 0), dag=dag, ) diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py index 6d1a33117cf..dae303a9035 100644 --- a/airflow/example_dags/example_branch_day_of_week_operator.py +++ b/airflow/example_dags/example_branch_day_of_week_operator.py @@ -19,7 +19,7 @@ """ Example DAG demonstrating the usage of BranchDayOfWeekOperator. 
""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -27,7 +27,7 @@ with DAG( dag_id="example_weekday_branch_operator", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", diff --git a/airflow/example_dags/example_branch_labels.py b/airflow/example_dags/example_branch_labels.py index bd6ce098198..2215bcfe19c 100644 --- a/airflow/example_dags/example_branch_labels.py +++ b/airflow/example_dags/example_branch_labels.py @@ -19,14 +19,17 @@ """ Example DAG demonstrating the usage of labels with different branches. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.edgemodifier import Label with DAG( - "example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False + "example_branch_labels", + schedule_interval="@daily", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, ) as dag: ingest = DummyOperator(task_id="ingest") analyse = DummyOperator(task_id="analyze") diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 69f939e9df2..eaa1532eeef 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -19,7 +19,8 @@ """Example DAG demonstrating the usage of the BranchPythonOperator.""" import random -from datetime import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,7 +30,7 @@ with DAG( dag_id='example_branch_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=['example', 'example2'], diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py 
b/airflow/example_dags/example_branch_python_dop_operator_3.py index 09d96bea7ed..d85eda140ae 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run or skipped on alternating runs. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -49,7 +49,7 @@ def should_run(**kwargs): with DAG( dag_id='example_branch_dop_operator_v3', schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, default_args={'depends_on_past': True}, tags=['example'], diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py index a141236cd20..22e1906c042 100644 --- a/airflow/example_dags/example_complex.py +++ b/airflow/example_dags/example_complex.py @@ -19,7 +19,7 @@ """ Example Airflow DAG that shows the complex DAG structure. """ -from datetime import datetime +import pendulum from airflow import models from airflow.models.baseoperator import chain @@ -28,7 +28,7 @@ with models.DAG( dag_id="example_complex", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example', 'example2', 'example3'], ) as dag: diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index fb653d237ac..88e0282016d 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -15,10 +15,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-from datetime import datetime from typing import Any, Dict import httpx +import pendulum from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator @@ -38,7 +38,12 @@ def execute(self, context: Context): # [START dag_decorator_usage] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def example_dag_decorator(email: str = 'example@example.com'): """ DAG to send server IP to email. diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index b6c3da84360..a39ebf5c404 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -37,13 +37,13 @@ exception """ -import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor -start_date = datetime.datetime(2021, 1, 1) +start_date = pendulum.datetime(2021, 1, 1, tz="UTC") with DAG( dag_id="example_external_task_marker_parent", diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index d31be60461b..d5d42f87d92 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -20,7 +20,8 @@ """ import logging import os -from datetime import datetime + +import pendulum from airflow import DAG from airflow.configuration import conf @@ -45,7 +46,7 @@ with DAG( dag_id='example_kubernetes_executor', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_latest_only_with_trigger.py 
b/airflow/example_dags/example_latest_only_with_trigger.py index 76b5f630c7d..67f004aef38 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -20,7 +20,9 @@ """ # [START example] -import datetime as dt +import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,8 +31,8 @@ with DAG( dag_id='latest_only_with_trigger', - schedule_interval=dt.timedelta(hours=4), - start_date=dt.datetime(2021, 1, 1), + schedule_interval=datetime.timedelta(hours=4), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py index add81a9fd69..27e71054d11 100644 --- a/airflow/example_dags/example_nested_branch_dag.py +++ b/airflow/example_dags/example_nested_branch_dag.py @@ -21,7 +21,7 @@ ``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding ``BranchPythonOperator`` are skipped. 
""" -from datetime import datetime +import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -30,7 +30,7 @@ with DAG( dag_id="example_nested_branch_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=["example"], diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index c570591d9bc..8057d5fd54a 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -18,10 +18,12 @@ """Example DAG demonstrating the usage of the params arguments in templated arguments.""" +import datetime import os -from datetime import datetime, timedelta from textwrap import dedent +import pendulum + from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator @@ -58,9 +60,9 @@ def print_env_vars(test_mode=None): with DAG( "example_passing_params_via_test_command", schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=4), + dagrun_timeout=datetime.timedelta(minutes=4), tags=['example'], ) as dag: run_this = my_py_command(params={"miff": "agg"}) diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index d533d84506a..0f9a7fc476a 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -23,9 +23,10 @@ import logging import shutil import time -from datetime import datetime from pprint import pprint +import pendulum + from airflow import DAG from airflow.decorators import task @@ -34,7 +35,7 @@ with DAG( dag_id='example_python_operator', schedule_interval=None, - start_date=datetime(2021, 1, 1), + 
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index ba772787d19..619785731ac 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of the ShortCircuitOperator.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.models.baseoperator import chain @@ -27,7 +27,7 @@ with DAG( dag_id='example_short_circuit_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index 8ccdcfd5f2c..fd220eb7a61 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.exceptions import AirflowSkipException @@ -55,6 +55,11 @@ def create_test_pipeline(suffix, trigger_rule): join >> final -with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag: +with DAG( + dag_id='example_skip_dag', + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) as dag: create_test_pipeline('1', TriggerRule.ALL_SUCCESS) create_test_pipeline('2', TriggerRule.ONE_SUCCESS) diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py index 7a46bc4ec11..0db6bc1ba7f 100644 --- a/airflow/example_dags/example_sla_dag.py +++ b/airflow/example_dags/example_sla_dag.py @@ -15,8 +15,10 @@ # specific language governing 
permissions and limitations # under the License. +import datetime import time -from datetime import datetime, timedelta + +import pendulum from airflow.decorators import dag, task @@ -39,13 +41,13 @@ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis): @dag( schedule_interval="*/2 * * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, sla_miss_callback=sla_callback, default_args={'email': "email@example.com"}, ) def example_sla_dag(): - @task(sla=timedelta(seconds=10)) + @task(sla=datetime.timedelta(seconds=10)) def sleep_20(): """Sleep for 20 seconds""" time.sleep(20) diff --git a/airflow/example_dags/example_task_group.py b/airflow/example_dags/example_task_group.py index d81bf007bab..46f709eaf87 100644 --- a/airflow/example_dags/example_task_group.py +++ b/airflow/example_dags/example_task_group.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of the TaskGroup.""" -from datetime import datetime +import pendulum from airflow.models.dag import DAG from airflow.operators.bash import BashOperator @@ -26,7 +26,10 @@ # [START howto_task_group] with DAG( - dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start = DummyOperator(task_id="start") diff --git a/airflow/example_dags/example_task_group_decorator.py b/airflow/example_dags/example_task_group_decorator.py index c35e1a9a4b3..2e207fbd81f 100644 --- a/airflow/example_dags/example_task_group_decorator.py +++ b/airflow/example_dags/example_task_group_decorator.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the usage of the @taskgroup decorator.""" -from datetime import datetime +import pendulum from airflow.decorators import task, task_group from airflow.models.dag import DAG @@ -65,7 +65,10 @@ def task_group_function(value: int) 
-> None: # Executing Tasks and TaskGroups with DAG( - dag_id="example_task_group_decorator", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start_task = task_start() end_task = task_end() diff --git a/airflow/example_dags/example_time_delta_sensor_async.py b/airflow/example_dags/example_time_delta_sensor_async.py index ce8cab005e6..1a7126a2262 100644 --- a/airflow/example_dags/example_time_delta_sensor_async.py +++ b/airflow/example_dags/example_time_delta_sensor_async.py @@ -21,7 +21,9 @@ defers and doesn't occupy a worker slot while it waits """ -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -30,10 +32,10 @@ with DAG( dag_id="example_time_delta_sensor_async", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: - wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=10)) + wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10)) finish = DummyOperator(task_id="finish") wait >> finish diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 27df3d26510..a017c9a5b41 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -21,14 +21,14 @@ 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 
2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator with DAG( dag_id="example_trigger_controller_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@once", tags=['example'], diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index b7943a3dfde..20932338c8d 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -21,7 +21,7 @@ 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -40,7 +40,7 @@ def run_this_func(dag_run=None): with DAG( dag_id="example_trigger_target_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 405d5c527d1..b55d4e5d667 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -17,7 +17,7 @@ # under the License. 
"""Example DAG demonstrating the usage of XComs.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -64,7 +64,7 @@ def pull_value_from_bash_push(ti=None): with DAG( 'example_xcom', schedule_interval="@once", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py index 7e0cdd901ce..00af4725c24 100644 --- a/airflow/example_dags/example_xcomargs.py +++ b/airflow/example_dags/example_xcomargs.py @@ -18,7 +18,8 @@ """Example DAG demonstrating the usage of the XComArgs.""" import logging -from datetime import datetime + +import pendulum from airflow import DAG from airflow.decorators import task @@ -41,7 +42,7 @@ def print_value(value, ts=None): with DAG( dag_id='example_xcom_args', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], @@ -50,7 +51,7 @@ def print_value(value, ts=None): with DAG( "example_xcom_args_with_operators", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index d337a03679c..7c913099b31 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -19,7 +19,7 @@ """Helper function to generate a DAG and operators given some arguments.""" # [START subdag] -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -38,7 +38,7 @@ def subdag(parent_dag_name, child_dag_name, args): dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, 
tz="UTC"), catchup=False, schedule_interval="@daily", ) diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py index dd184497869..d039a73488c 100644 --- a/airflow/example_dags/tutorial_etl_dag.py +++ b/airflow/example_dags/tutorial_etl_dag.py @@ -24,9 +24,10 @@ # [START tutorial] # [START import_module] import json -from datetime import datetime from textwrap import dedent +import pendulum + # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -45,7 +46,7 @@ # [END default_args] description='ETL DAG tutorial', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py index 3b0ba51a28c..f6af78f0a5a 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl.py @@ -20,7 +20,8 @@ # [START tutorial] # [START import_module] import json -from datetime import datetime + +import pendulum from airflow.decorators import dag, task @@ -28,7 +29,12 @@ # [START instantiate_dag] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def tutorial_taskflow_api_etl(): """ ### TaskFlow API Tutorial Documentation diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 68a2b23fc50..f893096a516 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -194,6 +194,9 @@ class DAG(LoggingMixin): DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG. + Note that if you plan to use time zones all the dates provided should be pendulum + dates. See :ref:`timezone_aware_dags`. 
+ :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :param description: The description for the DAG to e.g. be shown on the webserver diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 6dd34760d3b..6a304a46b2e 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -48,6 +48,7 @@ from airflow.settings import json from airflow.timetables.base import Timetable from airflow.utils.code_utils import get_python_source +from airflow.utils.docs import get_docs_url from airflow.utils.module_loading import as_importable_string, import_string from airflow.utils.task_group import MappedTaskGroup, TaskGroup @@ -117,7 +118,10 @@ def encode_timezone(var: Timezone) -> Union[str, int]: return var.offset if isinstance(var, Timezone): return var.name - raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}") + raise ValueError( + f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}. " + f"See {get_docs_url('timezone.html#time-zone-aware-dags')}" + ) def decode_timezone(var: Union[str, int]) -> Timezone: diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 951e6b42907..3b01b3ece98 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -121,7 +121,7 @@ Bad example: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -131,7 +131,7 @@ Bad example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -151,7 +151,7 @@ Good example: .. 
code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -159,7 +159,7 @@ Good example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -237,12 +237,13 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha .. code-block:: python + import pendulum from my_company_utils.common import ALL_TASKS with DAG( dag_id="my_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) as dag: for task in ALL_TASKS: @@ -486,13 +487,14 @@ This is an example test want to verify the structure of a code-generated DAG aga .. code-block:: python import datetime + import pendulum import pytest from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType - DATA_INTERVAL_START = datetime.datetime(2021, 9, 13) + DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) TEST_DAG_ID = "my_custom_operator_dag" diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index e339abeda65..32d21cea197 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -38,19 +38,22 @@ There are three ways to declare a DAG - either you can use a context manager, which will add the DAG to anything inside it implicitly:: with DAG( - "my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False ) as dag: op = DummyOperator(task_id="task") Or, you can use a standard constructor, passing the dag into any operators you use:: - my_dag = DAG("my_dag_name", 
start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) op = DummyOperator(task_id="task", dag=my_dag) Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator `:: - @dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) def generate_dag(): op = DummyOperator(task_id="task") @@ -214,10 +217,11 @@ Default Arguments Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it:: + import pendulum with DAG( dag_id='my_dag', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval='@daily', catchup=False, default_args={'retries': 2}, @@ -390,7 +394,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality .. 
code-block:: python # dags/branch_without_trigger.py - import datetime as dt + import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -399,7 +403,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality dag = DAG( dag_id="branch_without_trigger", schedule_interval="@once", - start_date=dt.datetime(2019, 2, 28), + start_date=pendulum.datetime(2019, 2, 28, tz="UTC"), ) run_this_first = DummyOperator(task_id="run_this_first", dag=dag) @@ -483,9 +487,11 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level:: + import pendulum + with DAG( dag_id='dag1', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False, default_args={'retries': 1}, @@ -563,9 +569,13 @@ This is especially useful if your tasks are built dynamically from configuration """ ### My great DAG """ + import pendulum dag = DAG( - "my_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", + catchup=False, ) dag.doc_md = __doc__ diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst index 9752287daa1..00aa5f41565 100644 --- a/docs/apache-airflow/concepts/operators.rst +++ b/docs/apache-airflow/concepts/operators.rst @@ -177,7 +177,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows: dag = DAG( dag_id="example_template_as_python_object", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, render_template_as_native_obj=True, ) diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 62555b10ed3..1a2bbe3d6ec 100644 --- 
a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -113,17 +113,18 @@ in the configuration file. When turned off, the scheduler creates a DAG run only """ from airflow.models.dag import DAG from airflow.operators.bash import BashOperator - from datetime import datetime, timedelta + import datetime + import pendulum dag = DAG( "tutorial", default_args={ "depends_on_past": True, "retries": 1, - "retry_delay": timedelta(minutes=3), + "retry_delay": datetime.timedelta(minutes=3), }, - start_date=datetime(2015, 12, 1), + start_date=pendulum.datetime(2015, 12, 1, tz="UTC"), description="A simple tutorial DAG", schedule_interval="@daily", catchup=False, @@ -225,7 +226,7 @@ Example of a parameterized DAG: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -233,7 +234,7 @@ Example of a parameterized DAG: dag = DAG( "example_parameterized_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst index 99ab46a03db..72f441026e3 100644 --- a/docs/apache-airflow/executor/kubernetes.rst +++ b/docs/apache-airflow/executor/kubernetes.rst @@ -154,7 +154,8 @@ Here is an example of a task with both features: .. 
code-block:: python import os - from datetime import datetime + + import pendulum from airflow import DAG from airflow.decorators import task @@ -166,7 +167,7 @@ Here is an example of a task with both features: with DAG( dag_id="example_pod_template_file", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example3"], ) as dag: diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index a2f3d28617c..c2568926ba6 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -178,7 +178,8 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo from airflow import DAG from airflow.operators.python_operator import PythonOperator - from datetime import datetime + + import pendulum def create_dag(dag_id, schedule, dag_number, default_args): @@ -186,7 +187,12 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo print("Hello World") print("This is DAG: {}".format(str(dag_number))) - dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args) + dag = DAG( + dag_id, + schedule_interval=schedule, + default_args=default_args, + start_date=pendulum.datetime(2021, 9, 13, tz="UTC"), + ) with dag: t1 = PythonOperator(task_id="hello_world", python_callable=hello_world_py) @@ -242,6 +248,14 @@ backfill CLI command, gets overridden by the backfill's ``start_date`` commands. This allows for a backfill on tasks that have ``depends_on_past=True`` to actually start. If this were not the case, the backfill just would not start. +Using time zones +---------------- + +Creating a time zone aware datetime (e.g. DAG's ``start_date``) is quite simple. Just make sure to supply +time zone aware dates using ``pendulum``. Don't try to use the standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ objects, as they are known to +have limitations and we deliberately disallow using them in DAGs. + ..
_faq:what-does-execution-date-mean: @@ -389,12 +403,12 @@ upstream task. .. code-block:: python + import pendulum + from airflow.decorators import dag, task from airflow.exceptions import AirflowException from airflow.utils.trigger_rule import TriggerRule - from datetime import datetime - @task def a_func(): @@ -408,7 +422,7 @@ upstream task. pass - @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1)) + @dag(schedule_interval="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC")) def my_dag(): a = a_func() b = b_func() diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 8317fbbc3a5..61dba352145 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -69,7 +69,7 @@ file: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -77,7 +77,7 @@ file: with DAG( dag_id="example_after_workday_timetable_dag", - start_date=datetime.datetime(2021, 3, 10), + start_date=pendulum.datetime(2021, 3, 10, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: @@ -190,7 +190,7 @@ For reference, here's our plugin and DAG files in their entirety: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -199,7 +199,7 @@ For reference, here's our plugin and DAG files in their entirety: with DAG( dag_id="example_workday_timetable", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 7e11517ca95..3c8899c6f9a 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -30,7 +30,8 @@ works. .. 
code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum from airflow.lineage import AUTO from airflow.lineage.entities import File @@ -42,10 +43,10 @@ works. dag = DAG( dag_id="example_lineage", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="0 0 * * *", catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) f_final = File(url="/tmp/final") diff --git a/docs/apache-airflow/logging-monitoring/callbacks.rst b/docs/apache-airflow/logging-monitoring/callbacks.rst index 77ac594aa37..15bbacbabe8 100644 --- a/docs/apache-airflow/logging-monitoring/callbacks.rst +++ b/docs/apache-airflow/logging-monitoring/callbacks.rst @@ -51,7 +51,9 @@ In the following example, failures in any task call the ``task_failure_alert`` f .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum + from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -67,8 +69,8 @@ In the following example, failures in any task call the ``task_failure_alert`` f with DAG( dag_id="example_callback", schedule_interval=None, - start_date=datetime(2021, 1, 1), - dagrun_timeout=timedelta(minutes=60), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + dagrun_timeout=datetime.timedelta(minutes=60), catchup=False, on_success_callback=None, on_failure_callback=task_failure_alert, diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst index 527888ece2e..5df5a37fa2a 100644 --- a/docs/apache-airflow/timezone.rst +++ b/docs/apache-airflow/timezone.rst @@ -40,6 +40,7 @@ The time zone is set in ``airflow.cfg``. By default it is set to UTC, but you ch an arbitrary IANA time zone, e.g. ``Europe/Amsterdam``. It is dependent on ``pendulum``, which is more accurate than ``pytz``. Pendulum is installed when you install Airflow. 
+ Web UI ------ @@ -90,7 +91,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create .. code-block:: python - dag = DAG("my_dag", start_date=datetime(2017, 1, 1), default_args={"retries": 3}) + dag = DAG( + "my_dag", + start_date=pendulum.datetime(2017, 1, 1, tz="UTC"), + default_args={"retries": 3}, + ) op = BashOperator(task_id="dummy", bash_command="Hello World!", dag=dag) print(op.retries) # 3 @@ -120,19 +125,21 @@ it is therefore important to make sure this setting is equal on all Airflow node .. note:: For more information on setting the configuration, see :doc:`howto/set-config` +.. _timezone_aware_dags: + Time zone aware DAGs -------------------- Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware ``start_date`` -using ``pendulum``. +using ``pendulum``. Don't try to use the standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ objects, as they are known to +have limitations and we deliberately disallow using them in DAGs. .. code-block:: python import pendulum - local_tz = pendulum.timezone("Europe/Amsterdam") - - dag = DAG("my_tz_dag", start_date=datetime(2016, 1, 1, tzinfo=local_tz)) + dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam")) op = DummyOperator(task_id="dummy", dag=dag) print(dag.timezone) # @@ -170,6 +177,6 @@ Time deltas Time zone aware DAGs that use ``timedelta`` or ``relativedelta`` schedules respect daylight savings time for the start date but do not adjust for daylight savings time when scheduling subsequent runs. For example, a -DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")`` +DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")`` and a schedule interval of ``timedelta(days=1)`` will run daily at 05:00 UTC regardless of daylight savings time.
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index acaca762307..149d9676da3 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -230,6 +230,14 @@ Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once. +Using time zones +---------------- + +Creating a time zone aware DAG is quite simple. Just make sure to supply time zone aware dates +using ``pendulum``. Don't try to use the standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ objects, as they are known to +have limitations and we deliberately disallow using them in DAGs. + Recap ----- Alright, so we have a pretty basic DAG. At this point your code should look @@ -489,7 +497,8 @@ Lets look at our DAG: .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum import requests from airflow.decorators import dag, task @@ -498,9 +507,9 @@ Lets look at our DAG: @dag( schedule_interval="0 0 * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) def Etl(): @task diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py index 25ceeba6d3e..a12f2f65d34 100644 --- a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py +++ b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py @@ -17,13 +17,15 @@ # under the License.
# [START dag] """This dag only runs some simple tasks to test Airflow's task execution.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow.models.dag import DAG from airflow.operators.dummy import DummyOperator -now = datetime.now() -now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) +now = pendulum.now(tz="UTC") +now_to_the_hour = (now - datetime.timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) START_DATE = now_to_the_hour DAG_NAME = 'test_dag_v1' @@ -31,7 +33,7 @@ DAG_NAME, schedule_interval='*/10 * * * *', default_args={'depends_on_past': True}, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, )