From 2c8b5673a62261ce42a6d5b0d3df005662211b9e Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 23 Sep 2024 09:44:27 -0700 Subject: [PATCH 1/3] Deprecate DAG.run method This method relies on local backfill mode, which is slated for removal in 3.0. We have suitable alternatives such as DAG.test() and triggering dags via API. --- airflow/models/dag.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index c95d11f3efe61..215dae298f106 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -28,6 +28,7 @@ import sys import time import traceback +import warnings import weakref from collections import abc, defaultdict, deque from contextlib import ExitStack @@ -88,6 +89,7 @@ DuplicateTaskIdFound, FailStopDagInvalidTriggerRule, ParamValidationError, + RemovedInAirflow3Warning, TaskDeferred, TaskNotFound, UnknownExecutorException, @@ -2331,6 +2333,13 @@ def run( :param run_at_least_once: If true, always run the DAG at least once even if no logical run exists within the time range. """ + warnings.warn( + "`DAG.run()` is deprecated and will be removed in Airflow 3.0. Consider " + "using `DAG.test()` instead, or trigger your dag via API.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.backfill_job_runner import BackfillJobRunner From 796541e68e1f61ea5b6bfcff9f2dd65dcb1b7868 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:40:17 -0700 Subject: [PATCH 2/3] handle warnings --- tests/jobs/test_scheduler_job.py | 107 ++++++++++++++++++------------- tests/models/test_dag.py | 20 +++--- 2 files changed, 75 insertions(+), 52 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 372a6ae2d9dfe..d8092db30d9b7 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -42,7 +42,7 @@ from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.datasets import Dataset from airflow.datasets.manager import DatasetManager -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, RemovedInAirflow3Warning from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_constants import MOCK_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader @@ -2838,6 +2838,10 @@ def evaluate_dagrun( This is hackish: a dag run is created but its tasks are run by a backfill. """ + + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + if run_kwargs is None: run_kwargs = {} @@ -2898,40 +2902,49 @@ def test_dagrun_fail(self): """ DagRuns with one failed and one incomplete root task -> FAILED """ - self.evaluate_dagrun( - dag_id="test_dagrun_states_fail", - expected_task_states={ - "test_dagrun_fail": State.FAILED, - "test_dagrun_succeed": State.UPSTREAM_FAILED, - }, - dagrun_state=State.FAILED, - ) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + self.evaluate_dagrun( + dag_id="test_dagrun_states_fail", + expected_task_states={ + "test_dagrun_fail": State.FAILED, + "test_dagrun_succeed": State.UPSTREAM_FAILED, + }, + dagrun_state=State.FAILED, + ) def test_dagrun_success(self): """ DagRuns with one failed and one successful root task -> SUCCESS """ - self.evaluate_dagrun( - dag_id="test_dagrun_states_success", - expected_task_states={ - "test_dagrun_fail": State.FAILED, - "test_dagrun_succeed": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - ) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + self.evaluate_dagrun( + dag_id="test_dagrun_states_success", + expected_task_states={ + "test_dagrun_fail": State.FAILED, + "test_dagrun_succeed": State.SUCCESS, + }, + dagrun_state=State.SUCCESS, + ) def test_dagrun_root_fail(self): """ DagRuns with one successful and one failed root task -> FAILED """ - self.evaluate_dagrun( - dag_id="test_dagrun_states_root_fail", - expected_task_states={ - "test_dagrun_succeed": State.SUCCESS, - "test_dagrun_fail": State.FAILED, - }, - dagrun_state=State.FAILED, - ) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + self.evaluate_dagrun( + dag_id="test_dagrun_states_root_fail", + expected_task_states={ + "test_dagrun_succeed": State.SUCCESS, + "test_dagrun_fail": State.FAILED, + }, + dagrun_state=State.FAILED, + ) def test_dagrun_root_fail_unfinished(self): """ @@ -2994,16 +3007,19 @@ def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): if ignore_first_depends_on_past=True and the dagrun execution_date is after the start_date. """ - self.evaluate_dagrun( - dag_id="test_dagrun_states_deadlock", - expected_task_states={ - "test_depends_on_past": State.SUCCESS, - "test_depends_on_past_2": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - advance_execution_date=True, - run_kwargs=dict(ignore_first_depends_on_past=True), - ) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + self.evaluate_dagrun( + dag_id="test_dagrun_states_deadlock", + expected_task_states={ + "test_depends_on_past": State.SUCCESS, + "test_depends_on_past_2": State.SUCCESS, + }, + dagrun_state=State.SUCCESS, + advance_execution_date=True, + run_kwargs=dict(ignore_first_depends_on_past=True), + ) def test_dagrun_deadlock_ignore_depends_on_past(self): """ @@ -3012,15 +3028,18 @@ def test_dagrun_deadlock_ignore_depends_on_past(self): test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except that start_date == execution_date so depends_on_past is irrelevant). """ - self.evaluate_dagrun( - dag_id="test_dagrun_states_deadlock", - expected_task_states={ - "test_depends_on_past": State.SUCCESS, - "test_depends_on_past_2": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - run_kwargs=dict(ignore_first_depends_on_past=True), - ) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + self.evaluate_dagrun( + dag_id="test_dagrun_states_deadlock", + expected_task_states={ + "test_depends_on_past": State.SUCCESS, + "test_depends_on_past_2": State.SUCCESS, + }, + dagrun_state=State.SUCCESS, + run_kwargs=dict(ignore_first_depends_on_past=True), + ) @pytest.mark.parametrize( "configs", diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 90d956caeb7dd..df4a892768816 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -45,6 +45,7 @@ AirflowException, DuplicateTaskIdFound, ParamValidationError, + RemovedInAirflow3Warning, UnknownExecutorException, ) from airflow.executors import executor_loader @@ -2733,14 +2734,17 @@ def test_dataset_expression(self, session: Session) -> None: @mock.patch("airflow.models.dag.run_job") def test_dag_executors(self, run_job_mock): - dag = DAG(dag_id="test", schedule=None) - reload(executor_loader) - with conf_vars({("core", "executor"): "SequentialExecutor"}): - dag.run() - assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor) - - dag.run(local=True) - assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor) + # todo: AIP-78 remove along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + dag = DAG(dag_id="test", schedule=None) + reload(executor_loader) + with conf_vars({("core", "executor"): "SequentialExecutor"}): + dag.run() + assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor) + + dag.run(local=True) + assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor) class TestQueries: From 577510497a9bec96c0c91279153796d7e8fc470f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 23 Sep 2024 15:16:15 -0700 Subject: [PATCH 3/3] handle warning in test --- tests/jobs/test_scheduler_job.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index d8092db30d9b7..9113a2dee1bd1 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2965,9 +2965,12 @@ def test_dagrun_root_fail_unfinished(self): ) self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id) - for _ in _mock_executor(self.null_exec): - with pytest.raises(AirflowException): - dag.run(start_date=dr.execution_date, end_date=dr.execution_date) + # todo: AIP-78 remove this test along with DAG.run() + # this only tests the backfill job runner, not the scheduler + with pytest.warns(RemovedInAirflow3Warning): + for _ in _mock_executor(self.null_exec): + with pytest.raises(AirflowException): + dag.run(start_date=dr.execution_date, end_date=dr.execution_date) # Mark the successful task as never having run since we want to see if the # dagrun will be in a running state despite having an unfinished task.