diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index c03bc074d0abd7..819da5d7e1d740 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -526,14 +526,20 @@ def deactivate_stale_dags( dags_parsed = session.execute(query) for dag in dags_parsed: + # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder, + # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact + # on standalone DAG processors. + dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a # significant delay in deactivation of stale dags when a large timeout is configured - if ( + dag_removed_from_dag_folder_or_file = ( dag.fileloc in last_parsed and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc] - ): + ) + + if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file: cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id) to_deactivate.add(dag.dag_id) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 8112b7222a6973..b5d0b35580b4aa 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -26,6 +26,7 @@ import random import socket import sys +import tempfile import textwrap import threading import time @@ -638,7 +639,7 @@ def test_scan_stale_dags(self): manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory="directory", + dag_directory=str(TEST_DAG_FOLDER), max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -712,11 
+713,11 @@ def test_scan_stale_dags_standalone_mode(self): """ Ensure only dags from current dag_directory are updated """ - dag_directory = "directory" + dag_directory = str(TEST_DAG_FOLDER) manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory=dag_directory, + dag_directory=TEST_DAG_FOLDER, max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -740,7 +741,7 @@ def test_scan_stale_dags_standalone_mode(self): # Add stale DAG to the DB other_dag = other_dagbag.get_dag("test_start_date_scheduling") other_dag.last_parsed_time = timezone.utcnow() - other_dag.sync_to_db(processor_subdir="other") + other_dag.sync_to_db(processor_subdir="/other") # Add DAG to the file_parsing_stats stat = DagFileStat( @@ -762,6 +763,68 @@ def test_scan_stale_dags_standalone_mode(self): active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() assert active_dag_count == 1 + def test_scan_stale_dags_when_dag_folder_change(self): + """ + Ensure dags from old dag_folder are marked as stale when the dag processor + is running as part of the scheduler. 
+ """ + + def get_dag_string(filename) -> str: + return open(TEST_DAG_FOLDER / filename).read() + + with tempfile.TemporaryDirectory() as tmpdir: + old_dag_home = tempfile.mkdtemp(dir=tmpdir) + old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py") + old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode()) + old_dag_file.flush() + new_dag_home = tempfile.mkdtemp(dir=tmpdir) + new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py") + new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode()) + new_dag_file.flush() + + manager = DagProcessorJobRunner( + job=Job(), + processor=DagFileProcessorManager( + dag_directory=new_dag_home, + max_runs=1, + processor_timeout=timedelta(minutes=10), + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ), + ) + + dagbag = DagBag(old_dag_file.name, read_dags_from_db=False) + other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False) + + with create_session() as session: + # Add DAG from old DAG home to the DB + dag = dagbag.get_dag("test_example_bash_operator") + dag.fileloc = old_dag_file.name + dag.last_parsed_time = timezone.utcnow() + dag.sync_to_db(processor_subdir=old_dag_home) + + # Add DAG from new DAG home to the DB + other_dag = other_dagbag.get_dag("test_start_date_scheduling") + other_dag.fileloc = new_dag_file.name + other_dag.last_parsed_time = timezone.utcnow() + other_dag.sync_to_db(processor_subdir=new_dag_home) + + manager.processor._file_paths = [new_dag_file] + + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 2 + + manager.processor._scan_stale_dags() + + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 1 + @mock.patch( "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock ) diff --git 
a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 2e96728d5ecae1..38bde8bf265657 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker): dag_id="test_retry_still_in_executor", schedule="@once", session=session, + fileloc=os.devnull + "/test_retry_still_in_executor.py", ): dag_task1 = BashOperator( task_id="test_retry_handling_op",