Skip to content

Commit

Permalink
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_in…
Browse files Browse the repository at this point in the history
…terval (apache#61)

CP of 5605d10
& apache#11462
  • Loading branch information
msumit authored Oct 15, 2020
1 parent 6162402 commit 1991419
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
21 changes: 11 additions & 10 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,7 @@ def _execute_helper(self):
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
execute_start_time = timezone.utcnow()
loop_start_time = time.time()

if self.using_sqlite:
Expand Down Expand Up @@ -1453,6 +1454,16 @@ def _execute_helper(self):
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
Expand All @@ -1470,16 +1481,6 @@ def _execute_helper(self):
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

self.executor.end()

settings.Session.remove()
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
dag.fileloc = filepath
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
croniter(dag._schedule_interval)
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
# under the License.
#

version = '1.10.4+twtr21'
version = '1.10.4+twtr22'

1 change: 1 addition & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def test_process_file_cron_validity_check(self):
for d in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files))
self.assertEqual(len(dagbag.dags), 0)

@patch.object(DagModel, 'get_current')
def test_get_dag_without_refresh(self, mock_dagmodel):
Expand Down

0 comments on commit 1991419

Please sign in to comment.