Skip to content

Commit

Permalink
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 5875a15 from 1.10.4
Browse files Browse the repository at this point in the history
Cp Contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (twitter-forks#39)

Co-authored-by: Vishesh Jain <[email protected]>
Co-authored-by: aoen <[email protected]>
  • Loading branch information
3 people authored and Ayush Sethi committed Dec 20, 2020
1 parent 234156f commit 6ed530d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
6 changes: 5 additions & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,12 @@ def get_dag(self, dag_id):
)
)) or enforce_from_file:
# Reprocess source file

# TODO: remove the below hack to find relative dag location in webserver
filepath = dag.fileloc if dag else orm_dag.get_local_fileloc()

found_dags = self.process_file(
filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False)
filepath=correct_maybe_zipped(filepath), only_if_updated=False)

# If the source file no longer exports `dag_id`, delete it from self.dags
if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
Expand Down
4 changes: 2 additions & 2 deletions airflow/sensors/external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ def poke(self, context, session=None):
raise AirflowException('The external DAG '
'{} does not exist.'.format(self.external_dag_id))
else:
if not os.path.exists(dag_to_wait.fileloc):
if not os.path.exists(dag_to_wait.get_local_fileloc()):
raise AirflowException('The external DAG '
'{} was deleted.'.format(self.external_dag_id))

if self.external_task_id:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
refreshed_dag_info = DagBag(dag_to_wait.get_local_fileloc()).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
'{} in DAG {} does not exist.'.format(self.external_task_id,
Expand Down
1 change: 1 addition & 0 deletions airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@

version = '1.10.14+test1'


0 comments on commit 6ed530d

Please sign in to comment.