diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 88be05d6f8cd5..72a7685626900 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -175,8 +175,12 @@ def get_dag(self, dag_id): ) )) or enforce_from_file: # Reprocess source file + + # TODO: remove the below hack to find relative dag location in webserver + filepath = dag.fileloc if dag else orm_dag.get_local_fileloc() + found_dags = self.process_file( - filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False) + filepath=correct_maybe_zipped(filepath), only_if_updated=False) # If the source file no longer exports `dag_id`, delete it from self.dags if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]: diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py index b759a71d58f23..b52ddd591b27f 100644 --- a/airflow/sensors/external_task_sensor.py +++ b/airflow/sensors/external_task_sensor.py @@ -129,12 +129,12 @@ def poke(self, context, session=None): raise AirflowException('The external DAG ' '{} does not exist.'.format(self.external_dag_id)) else: - if not os.path.exists(dag_to_wait.fileloc): + if not os.path.exists(dag_to_wait.get_local_fileloc()): raise AirflowException('The external DAG ' '{} was deleted.'.format(self.external_dag_id)) if self.external_task_id: - refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) + refreshed_dag_info = DagBag(dag_to_wait.get_local_fileloc()).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task(self.external_task_id): raise AirflowException('The external task' '{} in DAG {} does not exist.'.format(self.external_task_id, diff --git a/airflow/version.py b/airflow/version.py index 164b3be6c40db..2a90b478ce33b 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -20,3 +20,4 @@ version = '1.10.14+test1' +