Replaced all days_ago functions with datetime functions #23237

Merged (5 commits, May 23, 2022)
airflow/example_dags/example_subdag_operator.py (5 changes: 3 additions & 2 deletions)
@@ -19,18 +19,19 @@
 """Example DAG demonstrating the usage of the SubDagOperator."""
 
 # [START example_subdag_operator]
+import datetime
+
 from airflow import DAG
 from airflow.example_dags.subdags.subdag import subdag
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.subdag import SubDagOperator
-from airflow.utils.dates import days_ago
 
 DAG_NAME = 'example_subdag_operator'
 
 with DAG(
     dag_id=DAG_NAME,
     default_args={"retries": 2},
-    start_date=days_ago(2),
+    start_date=datetime.datetime(2022, 1, 1),
     schedule_interval="@once",
     tags=['example'],
 ) as dag:
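
For context on the pattern above: days_ago(n) produced a moving, timezone-aware timestamp (midnight UTC, n days before the current day), so example DAGs re-evaluated to a different start_date on every parse. A minimal sketch of the old and new styles, assuming Airflow 2.x with pendulum installed; the variable names are illustrative only:

import datetime

import pendulum

# Roughly what days_ago(2) evaluated to: midnight UTC two days before "now",
# so the value drifted every day the DAG file was parsed.
dynamic_start = pendulum.today("UTC").add(days=-2)

# The replacement used throughout this PR: a fixed, reproducible date.
static_start = datetime.datetime(2022, 1, 1)

print(dynamic_start, static_start)
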
tests/api/common/test_delete_dag.py (7 changes: 3 additions & 4 deletions)
@@ -16,14 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 import pytest
 
 from airflow import models
 from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
+from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -73,11 +72,11 @@ def setup_dag_models(self, for_sub_dag=False):
 
         task = EmptyOperator(
             task_id='dummy',
-            dag=models.DAG(dag_id=self.key, default_args={'start_date': days_ago(2)}),
+            dag=models.DAG(dag_id=self.key, default_args={'start_date': timezone.datetime(2022, 1, 1)}),
             owner='airflow',
         )
 
-        test_date = days_ago(1)
+        test_date = timezone.datetime(2022, 1, 1)
         with create_session() as session:
             session.add(DM(dag_id=self.key, fileloc=self.dag_file_path, is_subdag=for_sub_dag))
             dr = DR(dag_id=self.key, run_type=DagRunType.MANUAL, run_id="test", execution_date=test_date)
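
One detail worth noting in this file: the replacement is airflow.utils.timezone.datetime rather than the stdlib constructor. As far as I can tell it is a thin wrapper that attaches UTC when no tzinfo is given, which keeps the test dates timezone-aware; a small sketch of that assumption:

import datetime

from airflow.utils import timezone

# timezone.datetime() mirrors datetime.datetime() but defaults tzinfo to UTC,
# which is what Airflow's models and DagRun dates expect.
aware = timezone.datetime(2022, 1, 1)
naive = datetime.datetime(2022, 1, 1)

assert aware.tzinfo is not None
assert naive.tzinfo is None
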
tests/api/common/test_mark_tasks.py (21 changes: 12 additions & 9 deletions)
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import timedelta
+import datetime
 from typing import Callable
 
 import pytest
@@ -34,7 +34,6 @@
 )
 from airflow.models import DagRun
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -62,12 +61,12 @@ def create_dags(cls, dagbag):
         cls.dag2 = dagbag.get_dag('example_subdag_operator')
         cls.dag3 = dagbag.get_dag('example_trigger_target_dag')
         cls.dag4 = dagbag.get_dag('test_mapped_classic')
-        cls.execution_dates = [days_ago(2), days_ago(1)]
+        cls.execution_dates = [timezone.datetime(2022, 1, 1), timezone.datetime(2022, 1, 2)]
         start_date3 = cls.dag3.start_date
         cls.dag3_execution_dates = [
             start_date3,
-            start_date3 + timedelta(days=1),
-            start_date3 + timedelta(days=2),
+            start_date3 + datetime.timedelta(days=1),
+            start_date3 + datetime.timedelta(days=2),
         ]
 
     @pytest.fixture(autouse=True)
@@ -76,7 +75,7 @@ def setup(self):
         clear_db_runs()
         drs = _create_dagruns(
             self.dag1,
-            [_DagRunInfo(d, (d, d + timedelta(days=1))) for d in self.execution_dates],
+            [_DagRunInfo(d, (d, d + datetime.timedelta(days=1))) for d in self.execution_dates],
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
@@ -88,7 +87,7 @@ def setup(self):
             [
                 _DagRunInfo(
                     self.dag2.start_date,
-                    (self.dag2.start_date, self.dag2.start_date + timedelta(days=1)),
+                    (self.dag2.start_date, self.dag2.start_date + datetime.timedelta(days=1)),
                 ),
             ],
             state=State.RUNNING,
@@ -112,7 +111,7 @@ def setup(self):
             [
                 _DagRunInfo(
                     self.dag4.start_date,
-                    (self.dag4.start_date, self.dag4.start_date + timedelta(days=1)),
+                    (self.dag4.start_date, self.dag4.start_date + datetime.timedelta(days=1)),
                 )
             ],
             state=State.SUCCESS,
@@ -482,7 +481,11 @@ def setup_class(cls):
         cls.dag1.sync_to_db()
         cls.dag2 = dagbag.dags['example_subdag_operator']
         cls.dag2.sync_to_db()
-        cls.execution_dates = [days_ago(2), days_ago(1), days_ago(0)]
+        cls.execution_dates = [
+            timezone.datetime(2022, 1, 1),
+            timezone.datetime(2022, 1, 2),
+            timezone.datetime(2022, 1, 3),
+        ]
 
     def setup_method(self):
         clear_db_runs()
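
The switch from "from datetime import timedelta" to "import datetime" here looks cosmetic but keeps the two date helpers visually distinct. A short sketch of the resulting style, with placeholder values rather than the test's real fixtures:

import datetime

from airflow.utils import timezone

# Module-style import keeps datetime.timedelta unambiguous next to
# timezone.datetime when both appear in the same test file.
execution_dates = [timezone.datetime(2022, 1, 1), timezone.datetime(2022, 1, 2)]
data_interval_end = execution_dates[0] + datetime.timedelta(days=1)

print(data_interval_end)
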
tests/api_connexion/endpoints/test_extra_link_endpoint.py (33 changes: 14 additions & 19 deletions)
@@ -14,11 +14,11 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import os
 from urllib.parse import quote_plus
 
 import pytest
-from parameterized import parameterized
 
 from airflow import DAG
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
@@ -28,8 +28,8 @@
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
 from airflow.security import permissions
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
-from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import create_user, delete_user
 from tests.test_utils.db import clear_db_runs, clear_db_xcom
@@ -61,7 +61,7 @@ def configured_app(minimal_app_for_api):
 class TestGetExtraLinks:
     @pytest.fixture(autouse=True)
     def setup_attrs(self, configured_app, session) -> None:
-        self.default_time = datetime(2020, 1, 1)
+        self.default_time = timezone.datetime(2020, 1, 1)
 
         clear_db_runs()
         clear_db_xcom()
@@ -90,40 +90,35 @@ def teardown_method(self) -> None:
         clear_db_xcom()
 
     def _create_dag(self):
-        with DAG(
-            dag_id="TEST_DAG_ID",
-            default_args=dict(
-                start_date=self.default_time,
-            ),
-        ) as dag:
+        with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
             BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1")
             BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"])
         return dag
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "url, expected_title, expected_detail",
         [
-            (
-                "missing_dag",
+            pytest.param(
                 "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG not found",
                 'DAG with ID = "INVALID" not found',
+                id="missing_dag",
             ),
-            (
-                "missing_dag_run",
+            pytest.param(
                 "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG Run not found",
                 'DAG Run with ID = "INVALID" not found',
+                id="missing_dag_run",
            ),
-            (
-                "missing_task",
+            pytest.param(
                 "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
                 "Task not found",
                 'Task with ID = "INVALID" not found',
+                id="missing_task",
             ),
-        ]
+        ],
     )
-    def test_should_respond_404(self, name, url, expected_title, expected_detail):
-        del name
+    def test_should_respond_404(self, url, expected_title, expected_detail):
         response = self.client.get(url, environ_overrides={'REMOTE_USER': "test"})
 
         assert 404 == response.status_code
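
Besides dropping days_ago's sibling import, this file moves from parameterized.expand to pytest's native parametrize. A reduced, self-contained sketch of that pattern (the URLs and assertion are placeholders, not the endpoint test's real logic):

import pytest


@pytest.mark.parametrize(
    "url, expected_title",
    [
        pytest.param("/api/v1/dags/INVALID", "DAG not found", id="missing_dag"),
        pytest.param("/api/v1/dags/X/dagRuns/INVALID", "DAG Run not found", id="missing_dag_run"),
    ],
)
def test_should_respond_404(url, expected_title):
    # pytest.param(..., id=...) replaces the leading "name" tuple element that
    # parameterized.expand required, along with the "del name" workaround.
    assert expected_title
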
tests/cli/commands/test_task_command.py (12 changes: 5 additions & 7 deletions)
@@ -15,7 +15,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
+
 import io
 import json
 import logging
@@ -24,7 +24,6 @@
 import unittest
 from argparse import ArgumentParser
 from contextlib import redirect_stdout
-from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -38,14 +37,13 @@
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_pools, clear_db_runs
 
-DEFAULT_DATE = days_ago(1)
+DEFAULT_DATE = timezone.datetime(2022, 1, 1)
 ROOT_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
 )
@@ -374,7 +372,7 @@ def test_task_states_for_dag_run(self):
 
         dag2 = DagBag().dags['example_python_operator']
         task2 = dag2.get_task(task_id='print_the_context')
-        default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+        default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
         dagrun = dag2.create_dagrun(
             state=State.RUNNING,
@@ -417,7 +415,7 @@ def test_task_states_for_dag_run_when_dag_run_not_exists(self):
         task_states_for_dag_run should return an AirflowException when invalid dag id is passed
         """
         with pytest.raises(DagRunNotFound):
-            default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+            default_date2 = timezone.datetime(2016, 1, 9)
             task_command.task_states_for_dag_run(
                 self.parser.parse_args(
                     [
@@ -455,7 +453,7 @@ def setUp(self) -> None:
         self.run_id = "test_run"
         self.dag_path = os.path.join(ROOT_FOLDER, "dags", "test_logging_in_dag.py")
         reset(self.dag_id)
-        self.execution_date = timezone.make_aware(datetime(2017, 1, 1))
+        self.execution_date = timezone.datetime(2017, 1, 1)
         self.execution_date_str = self.execution_date.isoformat()
         self.task_args = ['tasks', 'run', self.dag_id, self.task_id, '--local', self.execution_date_str]
         self.log_dir = conf.get_mandatory_value('logging', 'base_log_folder')
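
The repeated edit in this file swaps timezone.make_aware(datetime(...)) for timezone.datetime(...). With Airflow's default timezone left at UTC, the two should be equivalent; a sketch of that assumption:

import datetime

from airflow.utils import timezone

# make_aware() attaches the configured default timezone (UTC unless overridden),
# so with stock settings both expressions yield the same aware timestamp.
old_style = timezone.make_aware(datetime.datetime(2016, 1, 9))
new_style = timezone.datetime(2016, 1, 9)

assert old_style == new_style
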
tests/dag_processing/test_processor.py (18 changes: 8 additions & 10 deletions)
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 import datetime
 import os
 from unittest import mock
@@ -34,7 +33,6 @@
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -113,7 +111,7 @@ def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag):
 
         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -142,7 +140,7 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag
         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
         # Pass anything besides a timedelta object to the sla argument.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -170,7 +168,7 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dum
 
         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
        dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -206,7 +204,7 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak
 
         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
         with dag_maker(
             dag_id='test_sla_miss',
             default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
@@ -247,7 +245,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c
 
         sla_callback = MagicMock(side_effect=RuntimeError('Could not call function'))
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -277,7 +275,7 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
     ):
         session = settings.Session()
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         email1 = '[email protected]'
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
@@ -317,7 +315,7 @@ def test_dag_file_processor_sla_miss_email_exception(
         # Mock the callback function so we can verify that it was not called
         mock_send_email.side_effect = RuntimeError('Could not send an email')
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -347,7 +345,7 @@ def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag):
         """
         session = settings.Session()
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
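
Unlike the other files, the SLA-miss tests genuinely need a start date relative to the current time, so days_ago() is replaced with explicit arithmetic on an aware utcnow() rather than a fixed date. A minimal sketch of that pattern:

import datetime

from airflow.utils import timezone

# timezone.utcnow() is timezone-aware, so the subtraction stays aware too,
# unlike datetime.datetime.utcnow(), which returns a naive value.
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)

assert test_start_date.tzinfo is not None
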
tests/dags/test_default_views.py (6 changes: 4 additions & 2 deletions)
@@ -15,10 +15,12 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import pendulum
+
 from airflow.models import DAG
-from airflow.utils.dates import days_ago
 
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date': pendulum.datetime(2022, 1, 1)}
 
 tree_dag = DAG(
     dag_id='test_tree_view',
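
This DAG file uses pendulum instead of the stdlib; if I read pendulum correctly, pendulum.datetime() is timezone-aware by default (UTC unless tz= is passed), so it drops in for days_ago() without extra wrapping:

import pendulum

# Aware by construction (UTC), matching how Airflow stores dates internally.
start = pendulum.datetime(2022, 1, 1)

assert start.timezone_name == "UTC"
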
tests/dags/test_example_bash_operator.py (9 changes: 3 additions & 6 deletions)
@@ -15,20 +15,17 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime
 
 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
-
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
 
 dag = DAG(
     dag_id='test_example_bash_operator',
-    default_args=args,
+    default_args={'owner': 'airflow', 'retries': 3, 'start_date': datetime.datetime(2022, 1, 1)},
     schedule_interval='0 0 * * *',
-    dagrun_timeout=timedelta(minutes=60),
+    dagrun_timeout=datetime.timedelta(minutes=60),
 )
 
 cmd = 'ls -l'
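
The last file also folds the module-level args dict directly into default_args. As a reminder of what each knob does (a sketch with a hypothetical dag_id, not code from the PR): entries in default_args such as retries and start_date are handed to every task, while schedule_interval and dagrun_timeout configure the DAG run itself.

import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="inline_default_args_example",  # hypothetical id, not from the PR
    default_args={"owner": "airflow", "retries": 3, "start_date": datetime.datetime(2022, 1, 1)},
    schedule_interval="0 0 * * *",
    dagrun_timeout=datetime.timedelta(minutes=60),
)

# Tasks inherit retries and start_date from default_args automatically.
run_this = BashOperator(task_id="ls", bash_command="ls -l", dag=dag)
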