diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 2887cd13a344..131ea00c6723 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -299,24 +299,6 @@ - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_log -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_soft_fail_as_skipped -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_not_exists_without_check_existence -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_failed_states -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_success -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_when_there_is_no_TIs -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_failed_states -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_sensor_success -- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_sensor_with_task_group -- tests/sensors/test_external_task_sensor.py::test_clear_multiple_external_task_marker -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_clear_activate -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_deep -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_shallow -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_exception -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_future -- tests/sensors/test_external_task_sensor.py::test_external_task_marker_transitive -- tests/sensors/test_timeout_sensor.py::TestSensorTimeout::test_timeout - tests/triggers/test_external_task.py::TestTaskStateTrigger::test_task_state_trigger_success diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 557e4cf00dea..28db24305faa 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -109,12 +109,13 @@ def setup_method(self): self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) self.args = {"owner": "airflow", "start_date": DEFAULT_DATE} self.dag = DAG(TEST_DAG_ID, default_args=self.args) + self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE) def add_time_sensor(self, task_id=TEST_TASK_ID): op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - def add_dummy_task_group(self, target_states=None): + def add_fake_task_group(self, target_states=None): target_states = [State.SUCCESS] * 2 if target_states is None else target_states with self.dag as dag: with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group: @@ -122,36 +123,36 @@ def add_dummy_task_group(self, target_states=None): SerializedDagModel.write_dag(dag) for idx, task in enumerate(task_group): - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti = TaskInstance(task=task, run_id=self.dag_run_id) ti.run(ignore_ti_state=True, mark_success=True) ti.set_state(target_states[idx]) - def add_dummy_task_group_with_dynamic_tasks(self, target_state=State.SUCCESS): + def add_fake_task_group_with_dynamic_tasks(self, target_state=State.SUCCESS): map_indexes = range(5) with self.dag as dag: with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group: @task_deco - def dummy_task(): + def fake_task(): pass @task_deco - def dummy_mapped_task(x: int): + def fake_mapped_task(x: int): return x - dummy_task() - dummy_mapped_task.expand(x=list(map_indexes)) + fake_task() + fake_mapped_task.expand(x=list(map_indexes)) SerializedDagModel.write_dag(dag) for task in task_group: - if task.task_id == "dummy_mapped_task": + if task.task_id == "fake_mapped_task": for map_index in map_indexes: - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE, map_index=map_index) + ti = TaskInstance(task=task, run_id=self.dag_run_id, map_index=map_index) ti.run(ignore_ti_state=True, mark_success=True) ti.set_state(target_state) else: - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti = TaskInstance(task=task, run_id=self.dag_run_id) ti.run(ignore_ti_state=True, mark_success=True) ti.set_state(target_state) @@ -178,7 +179,7 @@ def test_external_task_sensor_multiple_task_ids(self): def test_external_task_sensor_with_task_group(self): self.add_time_sensor() - self.add_dummy_task_group() + self.add_fake_task_group() op = ExternalTaskSensor( task_id="test_external_task_sensor_task_group", external_dag_id=TEST_DAG_ID, @@ -236,7 +237,7 @@ def test_raise_with_external_task_sensor_task_group_and_task_ids(self): # this behaviour is similar to external_task_id doesn't exists def test_external_task_group_not_exists_without_check_existence(self): self.add_time_sensor() - self.add_dummy_task_group() + self.add_fake_task_group() op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -250,7 +251,7 @@ def test_external_task_group_not_exists_without_check_existence(self): def test_external_task_group_sensor_success(self): self.add_time_sensor() - self.add_dummy_task_group() + self.add_fake_task_group() op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -263,7 +264,7 @@ def test_external_task_group_sensor_success(self): def test_external_task_group_sensor_failed_states(self): ti_states = [State.FAILED, State.FAILED] self.add_time_sensor() - self.add_dummy_task_group(ti_states) + self.add_fake_task_group(ti_states) op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -435,7 +436,11 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c def test_external_dag_sensor(self): other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") other_dag.create_dagrun( - run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS + run_id="test", + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + state=State.SUCCESS, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), ) op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", @@ -448,7 +453,11 @@ def test_external_dag_sensor(self): def test_external_dag_sensor_log(self, caplog): other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") other_dag.create_dagrun( - run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS + run_id="test", + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + state=State.SUCCESS, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), ) op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", @@ -461,7 +470,11 @@ def test_external_dag_sensor_log(self, caplog): def test_external_dag_sensor_soft_fail_as_skipped(self): other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") other_dag.create_dagrun( - run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS + run_id="test", + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + state=State.SUCCESS, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), ) op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", @@ -795,7 +808,7 @@ def test_external_task_sensor_waits_for_dag_check_existence(self): def test_external_task_group_with_mapped_tasks_sensor_success(self): self.add_time_sensor() - self.add_dummy_task_group_with_dynamic_tasks() + self.add_fake_task_group_with_dynamic_tasks() op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -807,7 +820,7 @@ def test_external_task_group_with_mapped_tasks_sensor_success(self): def test_external_task_group_with_mapped_tasks_failed_states(self): self.add_time_sensor() - self.add_dummy_task_group_with_dynamic_tasks(State.FAILED) + self.add_fake_task_group_with_dynamic_tasks(State.FAILED) op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -824,7 +837,7 @@ def test_external_task_group_with_mapped_tasks_failed_states(self): def test_external_task_group_when_there_is_no_TIs(self): """Test that the sensor does not fail when there are no TIs to check.""" self.add_time_sensor() - self.add_dummy_task_group_with_dynamic_tasks(State.FAILED) + self.add_fake_task_group_with_dynamic_tasks(State.FAILED) op = ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -1232,6 +1245,7 @@ def run_tasks(dag_bag, execution_date=DEFAULT_DATE, session=None): execution_date=execution_date, start_date=execution_date, run_type=DagRunType.MANUAL, + data_interval=(execution_date, execution_date), session=session, ) # we use sorting by task_id here because for the test DAG structure of ours @@ -1615,7 +1629,7 @@ def dag_bag_head_tail_mapped_tasks(): with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag: @task_deco - def dummy_task(x: int): + def fake_task(x: int): return x head = ExternalTaskSensor( @@ -1626,7 +1640,7 @@ def dummy_task(x: int): mode="reschedule", ) - body = dummy_task.expand(x=range(5)) + body = fake_task.expand(x=range(5)) tail = ExternalTaskMarker( task_id="tail", external_dag_id=dag.dag_id, @@ -1656,7 +1670,7 @@ def test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m ) session.add(dagrun) for task in dag.tasks: - if task.task_id == "dummy_task": + if task.task_id == "fake_task": for map_index in range(5): ti = TaskInstance(task=task, run_id=dagrun.run_id, map_index=map_index) ti.state = TaskInstanceState.SUCCESS diff --git a/tests/sensors/test_timeout_sensor.py b/tests/sensors/test_timeout_sensor.py deleted file mode 100644 index 315658a607ae..000000000000 --- a/tests/sensors/test_timeout_sensor.py +++ /dev/null @@ -1,85 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import time -from datetime import timedelta -from typing import TYPE_CHECKING - -import pytest - -from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException -from airflow.models.dag import DAG -from airflow.sensors.base import BaseSensorOperator -from airflow.utils import timezone -from airflow.utils.timezone import datetime - -pytestmark = pytest.mark.db_test - -if TYPE_CHECKING: - from airflow.utils.context import Context - -DEFAULT_DATE = datetime(2015, 1, 1) -TEST_DAG_ID = "unit_test_dag" - - -class TimeoutTestSensor(BaseSensorOperator): - """ - Sensor that always returns the return_value provided - - :param return_value: Set to true to mark the task as SKIPPED on failure - """ - - def __init__(self, return_value=False, **kwargs): - self.return_value = return_value - super().__init__(**kwargs) - - def poke(self, context: Context): - return self.return_value - - def execute(self, context: Context): - started_at = timezone.utcnow() - time_jump = self.params["time_jump"] - while not self.poke(context): - if time_jump: - started_at -= time_jump - if (timezone.utcnow() - started_at).total_seconds() > self.timeout: - if self.soft_fail: - raise AirflowSkipException("timeout") - else: - raise AirflowSensorTimeout("timeout") - time.sleep(self.poke_interval) - self.log.info("Success criteria met. Exiting.") - - -class TestSensorTimeout: - def setup_method(self): - args = {"owner": "airflow", "start_date": DEFAULT_DATE} - self.dag = DAG(TEST_DAG_ID, default_args=args) - - def test_timeout(self): - op = TimeoutTestSensor( - task_id="test_timeout", - execution_timeout=timedelta(days=2), - return_value=False, - poke_interval=5, - params={"time_jump": timedelta(days=2, seconds=1)}, - dag=self.dag, - ) - with pytest.raises(AirflowSensorTimeout): - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)