Skip to content

Commit

Permalink
remove stale serialized dags (#22917)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingzh authored May 12, 2022
1 parent c74e67a commit 749e53d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
10 changes: 9 additions & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,11 @@ def start(self):

@provide_session
def _deactivate_stale_dags(self, session=None):
"""Detects DAGs which are no longer present in files and deactivate them."""
"""
Detects DAGs which are no longer present in files
Deactivate them and remove them in the serialized_dag table
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
Expand Down Expand Up @@ -514,6 +518,10 @@ def _deactivate_stale_dags(self, session=None):
if deactivated:
self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)

for dag_id in to_deactivate:
SerializedDagModel.remove_dag(dag_id)
self.log.info("Deleted DAG %s in serialized_dag table", dag_id)

self.last_deactivate_stale_dags_time = timezone.utcnow()

def _run_parsing_loop(self):
Expand Down
32 changes: 26 additions & 6 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import pytest
from freezegun import freeze_time
from sqlalchemy import func

from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
Expand Down Expand Up @@ -476,6 +477,7 @@ def test_deactivate_stale_dags(self):
dag = dagbag.get_dag('test_example_bash_operator')
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
SerializedDagModel.write_dag(dag)

# Add DAG to the file_parsing_stats
stat = DagFileStat(
Expand All @@ -488,18 +490,36 @@ def test_deactivate_stale_dags(self):
manager._file_paths = [test_dag_path]
manager._file_stats[test_dag_path] = stat

active_dags = (
session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert len(active_dags) == 1
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

manager._file_stats[test_dag_path] = stat
manager._deactivate_stale_dags()
active_dags = (
session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 0

assert len(active_dags) == 0
serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 0

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
Expand Down

0 comments on commit 749e53d

Please sign in to comment.