Skip to content

Commit

Permalink
fix(dag): fix missing key within stored_datasets and outlet_reference…
Browse files Browse the repository at this point in the history
…s, we should use sanitized uri instead
  • Loading branch information
Lee-W committed Sep 4, 2024
1 parent cb7697f commit 7f1c45e
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
from airflow import settings, utils
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf as airflow_conf, secrets_backend_list
from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll
from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll, _sanitize_uri
from airflow.datasets.manager import dataset_manager
from airflow.exceptions import (
AirflowException,
Expand Down Expand Up @@ -2798,7 +2798,7 @@ def bulk_write_to_db(
curr_outlet_references.remove(ref)

for d in dataset_outlets:
outlet_references[(task.dag_id, task.task_id)].add(d.uri)
outlet_references[(task.dag_id, task.task_id)].add(_sanitize_uri(d.uri))
outlet_datasets[DatasetModel.from_public(d)] = None

for d_a in dataset_alias_outlets:
Expand All @@ -2819,11 +2819,11 @@ def bulk_write_to_db(
# scheduler. But if we're here, then we have found that dataset again in our DAGs, which
# means that it is no longer an orphan, so set is_orphaned to False.
stored_dataset.is_orphaned = expression.false()
stored_datasets[stored_dataset.uri] = stored_dataset
stored_datasets[_sanitize_uri(stored_dataset.uri)] = stored_dataset
else:
new_datasets.append(dataset)
dataset_manager.create_datasets(dataset_models=new_datasets, session=session)
stored_datasets.update({dataset.uri: dataset for dataset in new_datasets})
stored_datasets.update({_sanitize_uri(dataset.uri): dataset for dataset in new_datasets})

del new_datasets
del all_datasets
Expand Down

0 comments on commit 7f1c45e

Please sign in to comment.