From 199141930a626323c969a7dce245664c2a98a02e Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Thu, 15 Oct 2020 15:47:18 +0530 Subject: [PATCH] [CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (#61) CP of 5605d1063be9532a2f9861283998c39bd06e4ce8 & https://github.com/apache/airflow/pull/11462 --- airflow/jobs/scheduler_job.py | 21 +++++++++++---------- airflow/models/dagbag.py | 2 +- airflow/version.py | 2 +- tests/models/test_dagbag.py | 1 + 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 0b3113fe7ae91..2dc36e47d5e52 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1370,6 +1370,7 @@ def _execute_helper(self): while (timezone.utcnow() - execute_start_time).total_seconds() < \ self.run_duration or self.run_duration < 0: self.log.debug("Starting Loop...") + execute_start_time = timezone.utcnow() loop_start_time = time.time() if self.using_sqlite: @@ -1453,6 +1454,16 @@ def _execute_helper(self): self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) time.sleep(self._processor_poll_interval) + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. + if self.processor_agent.all_files_processed: + self.log.info( + "Deactivating DAGs that haven't been touched since %s", + execute_start_time.isoformat() + ) + models.DAG.deactivate_stale_dags(execute_start_time) + if self.processor_agent.done: self.log.info("Exiting scheduler loop as all files" " have been processed {} times".format(self.num_runs)) @@ -1470,16 +1481,6 @@ def _execute_helper(self): self.processor_agent.terminate() self.log.info("All DAG processors terminated") - # Verify that all files were processed, and if so, deactivate DAGs that - # haven't been touched by the scheduler as they likely have been - # deleted. - if self.processor_agent.all_files_processed: - self.log.info( - "Deactivating DAGs that haven't been touched since %s", - execute_start_time.isoformat() - ) - models.DAG.deactivate_stale_dags(execute_start_time) - self.executor.end() settings.Session.remove() diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 786497ec36c74..1b97a1c112482 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -253,9 +253,9 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): dag.fileloc = filepath try: dag.is_subdag = False - self.bag_dag(dag, parent_dag=dag, root_dag=dag) if isinstance(dag._schedule_interval, six.string_types): croniter(dag._schedule_interval) + self.bag_dag(dag, parent_dag=dag, root_dag=dag) found_dags.append(dag) found_dags += dag.subdags except (CroniterBadCronError, diff --git a/airflow/version.py b/airflow/version.py index 39582d1ce56dc..3369a180109fb 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr21' +version = '1.10.4+twtr22' diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index c0d8b574136e1..be45a39cc6202 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -161,6 +161,7 @@ def test_process_file_cron_validity_check(self): for d in invalid_dag_files: dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d)) self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files)) + self.assertEqual(len(dagbag.dags), 0) @patch.object(DagModel, 'get_current') def test_get_dag_without_refresh(self, mock_dagmodel):