Fix: DAGs are not marked as stale if the dags folder change (#41433) #41829

Merged · 1 commit · Aug 28, 2024
10 changes: 8 additions & 2 deletions airflow/dag_processing/manager.py
@@ -526,14 +526,20 @@ def deactivate_stale_dags(
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
+            # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
+            # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold configured to prevent a
             # significant delay in deactivation of stale dags when a large timeout is configured
-            if (
+            dag_removed_from_dag_folder_or_file = (
                 dag.fileloc in last_parsed
                 and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
-            ):
+            )
+
+            if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file:
                 cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
                 to_deactivate.add(dag.dag_id)
 
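A note on the mechanics: the fix hinges on os.path.commonpath. A DAG file that lives under the configured dag_directory shares it as the common path prefix, while a file left over from a previous DAGs folder does not. A minimal sketch of the comparison on POSIX, with illustrative paths that are not part of the PR:

```python
import os

dag_directory = "/opt/airflow/dags"             # current DAGs folder (illustrative)
current_dag = "/opt/airflow/dags/etl/daily.py"  # parsed from the current folder
stale_dag = "/opt/airflow/old_dags/legacy.py"   # left over from a previous folder

# Mirrors the dag_not_in_current_dag_folder check: for a file under
# dag_directory the common path is dag_directory itself; anything else
# means the DAG came from a different folder and should be deactivated.
assert os.path.commonpath([current_dag, dag_directory]) == dag_directory
assert os.path.commonpath([stale_dag, dag_directory]) != dag_directory
```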
71 changes: 67 additions & 4 deletions tests/dag_processing/test_job_runner.py
@@ -26,6 +26,7 @@
 import random
 import socket
 import sys
+import tempfile
 import textwrap
 import threading
 import time
@@ -638,7 +639,7 @@ def test_scan_stale_dags(self):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory="directory",
+                dag_directory=str(TEST_DAG_FOLDER),
                 max_runs=1,
                 processor_timeout=timedelta(minutes=10),
                 signal_conn=MagicMock(),
@@ -712,11 +713,11 @@ def test_scan_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
dag_directory = "directory"
dag_directory = str(TEST_DAG_FOLDER)
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=dag_directory,
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
@@ -740,7 +741,7 @@
             # Add stale DAG to the DB
             other_dag = other_dagbag.get_dag("test_start_date_scheduling")
             other_dag.last_parsed_time = timezone.utcnow()
-            other_dag.sync_to_db(processor_subdir="other")
+            other_dag.sync_to_db(processor_subdir="/other")
 
             # Add DAG to the file_parsing_stats
             stat = DagFileStat(
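A likely reason the processor_subdir fixture changed from "other" to "/other": os.path.commonpath raises ValueError when handed a mix of absolute and relative paths, so path-like values that can reach the new check are only safe as absolute paths. A quick demonstration of that behavior:

```python
import os

# commonpath() requires all paths to be absolute or all relative;
# mixing the two raises instead of returning a prefix.
try:
    os.path.commonpath(["other", "/opt/airflow/dags"])
except ValueError as exc:
    print(exc)  # "Can't mix absolute and relative paths"
```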
@@ -762,6 +763,68 @@
             active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
+    def test_scan_stale_dags_when_dag_folder_change(self):
+        """
+        Ensure DAGs from the old dag_folder are marked as stale when the DAG processor
+        is running as part of the scheduler.
+        """
+
+        def get_dag_string(filename) -> str:
+            return open(TEST_DAG_FOLDER / filename).read()
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+            old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+            new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
+
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+            with create_session() as session:
+                # Add DAG from the old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
+
+                # Add DAG from the new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+                manager.processor._file_paths = [new_dag_file]
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
+
+                manager.processor._scan_stale_dags()
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1
+
     @mock.patch(
         "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
     )
1 change: 1 addition & 0 deletions tests/jobs/test_scheduler_job.py
@@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker):
             dag_id="test_retry_still_in_executor",
             schedule="@once",
             session=session,
+            fileloc=os.devnull + "/test_retry_still_in_executor.py",
         ):
             dag_task1 = BashOperator(
                 task_id="test_retry_handling_op",
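The one-line change above pins the test DAG's fileloc to a path rooted at os.devnull, presumably to give it a deterministic absolute location now that deactivate_stale_dags inspects fileloc directly. One relevant property, shown with a hypothetical DAGs folder:

```python
import os

fileloc = os.devnull + "/test_retry_still_in_executor.py"  # "/dev/null/..." on POSIX
dag_directory = "/opt/airflow/dags"  # hypothetical DAGs folder

# A devnull-rooted path shares no prefix with a real DAGs folder beyond "/",
# so the commonpath comparison treats this DAG as outside the current folder.
print(os.path.commonpath([fileloc, dag_directory]))  # "/"
```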