diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 4cb11cff82928..f16a913725401 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -481,7 +481,11 @@ def start(self): @provide_session def _deactivate_stale_dags(self, session=None): - """Detects DAGs which are no longer present in files and deactivate them.""" + """ + Detects DAGs which are no longer present in files + + Deactivate them and remove them in the serialized_dag table + """ now = timezone.utcnow() elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds() if elapsed_time_since_refresh > self.deactivate_stale_dags_interval: @@ -514,6 +518,10 @@ def _deactivate_stale_dags(self, session=None): if deactivated: self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated) + for dag_id in to_deactivate: + SerializedDagModel.remove_dag(dag_id) + self.log.info("Deleted DAG %s in serialized_dag table", dag_id) + self.last_deactivate_stale_dags_time = timezone.utcnow() def _run_parsing_loop(self): diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 6a65116d51f70..ad613d877df0c 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -34,6 +34,7 @@ import pytest from freezegun import freeze_time +from sqlalchemy import func from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG @@ -476,6 +477,7 @@ def test_deactivate_stale_dags(self): dag = dagbag.get_dag('test_example_bash_operator') dag.last_parsed_time = timezone.utcnow() dag.sync_to_db() + SerializedDagModel.write_dag(dag) # Add DAG to the file_parsing_stats stat = DagFileStat( @@ -488,18 +490,36 @@ def test_deactivate_stale_dags(self): manager._file_paths = [test_dag_path] manager._file_stats[test_dag_path] = stat - active_dags = ( - session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all() + active_dag_count = ( + session.query(func.count(DagModel.dag_id)) + .filter(DagModel.is_active, DagModel.fileloc == test_dag_path) + .scalar() ) - assert len(active_dags) == 1 + assert active_dag_count == 1 + + serialized_dag_count = ( + session.query(func.count(SerializedDagModel.dag_id)) + .filter(SerializedDagModel.fileloc == test_dag_path) + .scalar() + ) + assert serialized_dag_count == 1 manager._file_stats[test_dag_path] = stat manager._deactivate_stale_dags() - active_dags = ( - session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all() + + active_dag_count = ( + session.query(func.count(DagModel.dag_id)) + .filter(DagModel.is_active, DagModel.fileloc == test_dag_path) + .scalar() ) + assert active_dag_count == 0 - assert len(active_dags) == 0 + serialized_dag_count = ( + session.query(func.count(SerializedDagModel.dag_id)) + .filter(SerializedDagModel.fileloc == test_dag_path) + .scalar() + ) + assert serialized_dag_count == 0 @mock.patch( "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock