Skip to content

Commit

Permalink
feat(dataset): raise deprecation warning when accessing inlet or outl…
Browse files Browse the repository at this point in the history
…et events through str

this behavior will be removed in airflow 3 as assets have attributes name and uri,
it would be confusing to identify which attribute should be used to filter the right asset
  • Loading branch information
Lee-W committed Nov 12, 2024
1 parent 376773a commit f9bf3c2
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 18 deletions.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dataset_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def produce_dataset_events():
def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
bucket_name = "bucket"
object_path = "my-task"
outlet_events["example-alias"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
outlet_events[DatasetAlias("example-alias")].add(Dataset(f"s3://{bucket_name}/{object_path}"))

produce_dataset_events_through_dataset_alias()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def produce_dataset_events():
def produce_dataset_events_through_dataset_alias_with_no_taskflow(*, outlet_events=None):
bucket_name = "bucket"
object_path = "my-task"
outlet_events["example-alias-no-taskflow"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
outlet_events[DatasetAlias("example-alias-no-taskflow")].add(Dataset(f"s3://{bucket_name}/{object_path}"))

PythonOperator(
task_id="produce_dataset_events_through_dataset_alias_with_no_taskflow",
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_inlet_event_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ def read_dataset_event(*, inlet_events=None):
BashOperator(
task_id="read_dataset_event_from_classic",
inlets=[ds],
bash_command="echo '{{ inlet_events['s3://output/1.txt'][-1].extra | tojson }}'",
bash_command="echo '{{ inlet_events[Dataset('s3://output/1.txt')][-1].extra | tojson }}'",
)
27 changes: 27 additions & 0 deletions airflow/utils/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ class OutletEventAccessor:
def add(self, dataset: Dataset | str, extra: dict[str, Any] | None = None) -> None:
"""Add a DatasetEvent to an existing Dataset."""
if isinstance(dataset, str):
warnings.warn(
(
"Emitting dataset events using string is deprecated and will be removed in Airflow 3. "
"Please use the Dataset object (renamed as Asset in Airflow 3) directly"
),
DeprecationWarning,
stacklevel=2,
)
dataset_uri = dataset
elif isinstance(dataset, Dataset):
dataset_uri = dataset.uri
Expand Down Expand Up @@ -216,6 +224,16 @@ def __len__(self) -> int:
return len(self._dict)

def __getitem__(self, key: str | Dataset | DatasetAlias) -> OutletEventAccessor:
if isinstance(key, str):
warnings.warn(
(
"Accessing outlet_events using string is deprecated and will be removed in Airflow 3. "
"Please use the Dataset or DatasetAlias object (renamed as Asset and AssetAlias in Airflow 3) directly"
),
DeprecationWarning,
stacklevel=2,
)

event_key = extract_event_key(key)
if event_key not in self._dict:
self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key)
Expand Down Expand Up @@ -282,6 +300,15 @@ def __getitem__(self, key: int | str | Dataset | DatasetAlias) -> LazyDatasetEve
join_clause = DatasetEvent.source_aliases
where_clause = DatasetAliasModel.name == dataset_alias.name
elif isinstance(obj, (Dataset, str)):
if isinstance(obj, str):
warnings.warn(
(
"Accessing inlet_events using string is deprecated and will be removed in Airflow 3. "
"Please use the Dataset object (renamed as Asset in Airflow 3) directly"
),
DeprecationWarning,
stacklevel=2,
)
dataset = self._datasets[extract_event_key(obj)]
join_clause = DatasetEvent.dataset
where_clause = DatasetModel.uri == dataset.uri
Expand Down
14 changes: 8 additions & 6 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ The following example creates a dataset event against the S3 URI ``f"s3://bucket
@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
outlet_events["my-task-outputs"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
**Emit a dataset event during task execution through yielding Metadata**
Expand Down Expand Up @@ -462,11 +462,11 @@ Only one dataset event is emitted for an added dataset, even if it is added to t
]
)
def my_task_with_outlet_events(*, outlet_events):
outlet_events["my-task-outputs-1"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
# This line won't emit an additional dataset event as the dataset and extra are the same as the previous line.
outlet_events["my-task-outputs-2"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
# This line will emit an additional dataset event as the extra is different.
outlet_events["my-task-outputs-3"].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"})
outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"})
Scheduling based on dataset aliases
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -487,7 +487,7 @@ The dataset alias is resolved to the datasets during DAG parsing. Thus, if the "
@task(outlets=[DatasetAlias("example-alias")])
def produce_dataset_events(*, outlet_events):
outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"))
outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))
with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")):
Expand All @@ -511,7 +511,9 @@ As mentioned in :ref:`Fetching information from previously emitted dataset event
@task(outlets=[DatasetAlias("example-alias")])
def produce_dataset_events(*, outlet_events):
outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"), extra={"row_count": 1})
outlet_events[DatasetAlias("example-alias")].add(
Dataset("s3://bucket/my-task"), extra={"row_count": 1}
)
with DAG(dag_id="dataset-alias-consumer", schedule=None):
Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,7 @@ def test_outlet_dataset_extra(self, dag_maker, session):

@task(outlets=Dataset("test_outlet_dataset_extra_1"))
def write1(*, outlet_events):
outlet_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"}
outlet_events[Dataset("test_outlet_dataset_extra_1")].extra = {"foo": "bar"}

write1()

Expand Down Expand Up @@ -2453,8 +2453,8 @@ def test_outlet_dataset_extra_ignore_different(self, dag_maker, session):

@task(outlets=Dataset("test_outlet_dataset_extra"))
def write(*, outlet_events):
outlet_events["test_outlet_dataset_extra"].extra = {"one": 1}
outlet_events["different_uri"].extra = {"foo": "bar"} # Will be silently dropped.
outlet_events[Dataset("test_outlet_dataset_extra")].extra = {"one": 1}
outlet_events[Dataset("different_uri")].extra = {"foo": "bar"} # Will be silently dropped.

write()

Expand Down Expand Up @@ -2722,22 +2722,22 @@ def test_inlet_dataset_extra(self, dag_maker, session):

@task(outlets=Dataset("test_inlet_dataset_extra"))
def write(*, ti, outlet_events):
outlet_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id}
outlet_events[Dataset("test_inlet_dataset_extra")].extra = {"from": ti.task_id}

@task(inlets=Dataset("test_inlet_dataset_extra"))
def read(*, inlet_events):
second_event = inlet_events["test_inlet_dataset_extra"][1]
second_event = inlet_events[Dataset("test_inlet_dataset_extra")][1]
assert second_event.uri == "test_inlet_dataset_extra"
assert second_event.extra == {"from": "write2"}

last_event = inlet_events["test_inlet_dataset_extra"][-1]
last_event = inlet_events[Dataset("test_inlet_dataset_extra")][-1]
assert last_event.uri == "test_inlet_dataset_extra"
assert last_event.extra == {"from": "write3"}

with pytest.raises(KeyError):
inlet_events["does_not_exist"]
inlet_events[Dataset("does_not_exist")]
with pytest.raises(IndexError):
inlet_events["test_inlet_dataset_extra"][5]
inlet_events[Dataset("test_inlet_dataset_extra")][5]

# TODO: Support slices.

Expand Down Expand Up @@ -2798,7 +2798,7 @@ def read(*, inlet_events):
assert last_event.extra == {"from": "write3"}

with pytest.raises(KeyError):
inlet_events["does_not_exist"]
inlet_events[Dataset("does_not_exist")]
with pytest.raises(KeyError):
inlet_events[DatasetAlias("does_not_exist")]
with pytest.raises(IndexError):
Expand Down

0 comments on commit f9bf3c2

Please sign in to comment.