diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 0b3113fe7ae91..2dc36e47d5e52 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1370,6 +1370,8 @@ def _execute_helper(self): while (timezone.utcnow() - execute_start_time).total_seconds() < \ self.run_duration or self.run_duration < 0: self.log.debug("Starting Loop...") + # Per-loop timestamp; execute_start_time must stay fixed for run_duration. + loop_start_dttm = timezone.utcnow() loop_start_time = time.time() if self.using_sqlite: @@ -1453,6 +1455,16 @@ def _execute_helper(self): self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) time.sleep(self._processor_poll_interval) + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. + if self.processor_agent.all_files_processed: + self.log.info( + "Deactivating DAGs that haven't been touched since %s", + loop_start_dttm.isoformat() + ) + models.DAG.deactivate_stale_dags(loop_start_dttm) + if self.processor_agent.done: self.log.info("Exiting scheduler loop as all files" " have been processed {} times".format(self.num_runs)) @@ -1470,16 +1482,6 @@ def _execute_helper(self): self.processor_agent.terminate() self.log.info("All DAG processors terminated") - # Verify that all files were processed, and if so, deactivate DAGs that - # haven't been touched by the scheduler as they likely have been - # deleted.
- if self.processor_agent.all_files_processed: - self.log.info( - "Deactivating DAGs that haven't been touched since %s", - execute_start_time.isoformat() - ) - models.DAG.deactivate_stale_dags(execute_start_time) - self.executor.end() settings.Session.remove() diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 786497ec36c74..1b97a1c112482 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -253,9 +253,9 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): dag.fileloc = filepath try: dag.is_subdag = False - self.bag_dag(dag, parent_dag=dag, root_dag=dag) if isinstance(dag._schedule_interval, six.string_types): croniter(dag._schedule_interval) + self.bag_dag(dag, parent_dag=dag, root_dag=dag) found_dags.append(dag) found_dags += dag.subdags except (CroniterBadCronError, diff --git a/airflow/version.py b/airflow/version.py index 39582d1ce56dc..3369a180109fb 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr21' +version = '1.10.4+twtr22' diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index c0d8b574136e1..be45a39cc6202 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -161,6 +161,7 @@ def test_process_file_cron_validity_check(self): for d in invalid_dag_files: dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d)) self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files)) + self.assertEqual(len(dagbag.dags), 0) @patch.object(DagModel, 'get_current') def test_get_dag_without_refresh(self, mock_dagmodel):