Skip to content

Commit

Permalink
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419
Browse files Browse the repository at this point in the history
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (twitter-forks#61)

CP of 5605d10
& apache#11462
  • Loading branch information
msumit authored and Ayush Sethi committed Dec 21, 2020
1 parent 565b6e2 commit 4ea1c3c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
21 changes: 11 additions & 10 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,7 @@ def _execute_helper(self):
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
execute_start_time = timezone.utcnow()
loop_start_time = time.time()

if self.using_sqlite:
Expand Down Expand Up @@ -1525,6 +1526,16 @@ def _execute_helper(self):
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
Expand All @@ -1542,16 +1553,6 @@ def _execute_helper(self):
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

self.executor.end()

settings.Session.remove()
Expand Down
1 change: 1 addition & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def test_process_file_cron_validity_check(self):
for d in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files))
self.assertEqual(len(dagbag.dags), 0)

@patch.object(DagModel, 'get_current')
def test_get_dag_without_refresh(self, mock_dagmodel):
Expand Down

0 comments on commit 4ea1c3c

Please sign in to comment.