Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve deprecations in core sensors tests #39404

Merged
merged 1 commit into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -299,24 +299,6 @@
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_log
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_soft_fail_as_skipped
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_not_exists_without_check_existence
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_failed_states
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_success
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_when_there_is_no_TIs
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_failed_states
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_sensor_success
- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_sensor_with_task_group
- tests/sensors/test_external_task_sensor.py::test_clear_multiple_external_task_marker
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_clear_activate
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_deep
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_shallow
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_exception
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_future
- tests/sensors/test_external_task_sensor.py::test_external_task_marker_transitive
- tests/sensors/test_timeout_sensor.py::TestSensorTimeout::test_timeout
- tests/triggers/test_external_task.py::TestTaskStateTrigger::test_task_state_trigger_success


Expand Down
60 changes: 37 additions & 23 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,49 +109,50 @@ def setup_method(self):
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
self.dag = DAG(TEST_DAG_ID, default_args=self.args)
self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE)

def add_time_sensor(self, task_id=TEST_TASK_ID):
op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def add_dummy_task_group(self, target_states=None):
def add_fake_task_group(self, target_states=None):
target_states = [State.SUCCESS] * 2 if target_states is None else target_states
with self.dag as dag:
with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
_ = [EmptyOperator(task_id=f"task{i}") for i in range(len(target_states))]
SerializedDagModel.write_dag(dag)

for idx, task in enumerate(task_group):
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti = TaskInstance(task=task, run_id=self.dag_run_id)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_states[idx])

def add_dummy_task_group_with_dynamic_tasks(self, target_state=State.SUCCESS):
def add_fake_task_group_with_dynamic_tasks(self, target_state=State.SUCCESS):
map_indexes = range(5)
with self.dag as dag:
with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:

@task_deco
def dummy_task():
def fake_task():
pass

@task_deco
def dummy_mapped_task(x: int):
def fake_mapped_task(x: int):
return x

dummy_task()
dummy_mapped_task.expand(x=list(map_indexes))
fake_task()
fake_mapped_task.expand(x=list(map_indexes))

SerializedDagModel.write_dag(dag)

for task in task_group:
if task.task_id == "dummy_mapped_task":
if task.task_id == "fake_mapped_task":
for map_index in map_indexes:
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE, map_index=map_index)
ti = TaskInstance(task=task, run_id=self.dag_run_id, map_index=map_index)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_state)
else:
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti = TaskInstance(task=task, run_id=self.dag_run_id)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_state)

Expand All @@ -178,7 +179,7 @@ def test_external_task_sensor_multiple_task_ids(self):

def test_external_task_sensor_with_task_group(self):
self.add_time_sensor()
self.add_dummy_task_group()
self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_task_group",
external_dag_id=TEST_DAG_ID,
Expand Down Expand Up @@ -236,7 +237,7 @@ def test_raise_with_external_task_sensor_task_group_and_task_ids(self):
# this behaviour is similar to external_task_id doesn't exists
def test_external_task_group_not_exists_without_check_existence(self):
self.add_time_sensor()
self.add_dummy_task_group()
self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand All @@ -250,7 +251,7 @@ def test_external_task_group_not_exists_without_check_existence(self):

def test_external_task_group_sensor_success(self):
self.add_time_sensor()
self.add_dummy_task_group()
self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand All @@ -263,7 +264,7 @@ def test_external_task_group_sensor_success(self):
def test_external_task_group_sensor_failed_states(self):
ti_states = [State.FAILED, State.FAILED]
self.add_time_sensor()
self.add_dummy_task_group(ti_states)
self.add_fake_task_group(ti_states)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand Down Expand Up @@ -435,7 +436,11 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c
def test_external_dag_sensor(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
run_id="test",
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
Expand All @@ -448,7 +453,11 @@ def test_external_dag_sensor(self):
def test_external_dag_sensor_log(self, caplog):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
run_id="test",
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
Expand All @@ -461,7 +470,11 @@ def test_external_dag_sensor_log(self, caplog):
def test_external_dag_sensor_soft_fail_as_skipped(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
run_id="test",
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
Expand Down Expand Up @@ -795,7 +808,7 @@ def test_external_task_sensor_waits_for_dag_check_existence(self):

def test_external_task_group_with_mapped_tasks_sensor_success(self):
self.add_time_sensor()
self.add_dummy_task_group_with_dynamic_tasks()
self.add_fake_task_group_with_dynamic_tasks()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand All @@ -807,7 +820,7 @@ def test_external_task_group_with_mapped_tasks_sensor_success(self):

def test_external_task_group_with_mapped_tasks_failed_states(self):
self.add_time_sensor()
self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand All @@ -824,7 +837,7 @@ def test_external_task_group_with_mapped_tasks_failed_states(self):
def test_external_task_group_when_there_is_no_TIs(self):
"""Test that the sensor does not fail when there are no TIs to check."""
self.add_time_sensor()
self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
Expand Down Expand Up @@ -1232,6 +1245,7 @@ def run_tasks(dag_bag, execution_date=DEFAULT_DATE, session=None):
execution_date=execution_date,
start_date=execution_date,
run_type=DagRunType.MANUAL,
data_interval=(execution_date, execution_date),
session=session,
)
# we use sorting by task_id here because for the test DAG structure of ours
Expand Down Expand Up @@ -1615,7 +1629,7 @@ def dag_bag_head_tail_mapped_tasks():
with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag:

@task_deco
def dummy_task(x: int):
def fake_task(x: int):
return x

head = ExternalTaskSensor(
Expand All @@ -1626,7 +1640,7 @@ def dummy_task(x: int):
mode="reschedule",
)

body = dummy_task.expand(x=range(5))
body = fake_task.expand(x=range(5))
tail = ExternalTaskMarker(
task_id="tail",
external_dag_id=dag.dag_id,
Expand Down Expand Up @@ -1656,7 +1670,7 @@ def test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
)
session.add(dagrun)
for task in dag.tasks:
if task.task_id == "dummy_task":
if task.task_id == "fake_task":
for map_index in range(5):
ti = TaskInstance(task=task, run_id=dagrun.run_id, map_index=map_index)
ti.state = TaskInstanceState.SUCCESS
Expand Down
85 changes: 0 additions & 85 deletions tests/sensors/test_timeout_sensor.py
Taragolis marked this conversation as resolved.
Show resolved Hide resolved

This file was deleted.