From f9bf3c2afbbf44867d2690c1faf27eeb15c345f6 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 12 Nov 2024 18:42:48 +0800 Subject: [PATCH] feat(dataset): raise deprecation warning when accessing inlet or outlet events through str this behavior will be removed in airflow 3 as assets have attributes name and uri, it would be confusing to identify which attribute should be used to filter the right asset --- airflow/example_dags/example_dataset_alias.py | 2 +- .../example_dataset_alias_with_no_taskflow.py | 2 +- .../example_dags/example_inlet_event_extra.py | 2 +- airflow/utils/context.py | 27 +++++++++++++++++++ .../authoring-and-scheduling/datasets.rst | 14 +++++----- tests/models/test_taskinstance.py | 18 ++++++------- 6 files changed, 47 insertions(+), 18 deletions(-) diff --git a/airflow/example_dags/example_dataset_alias.py b/airflow/example_dags/example_dataset_alias.py index c50a89e34fb8c..4bfc6f51a7351 100644 --- a/airflow/example_dags/example_dataset_alias.py +++ b/airflow/example_dags/example_dataset_alias.py @@ -67,7 +67,7 @@ def produce_dataset_events(): def produce_dataset_events_through_dataset_alias(*, outlet_events=None): bucket_name = "bucket" object_path = "my-task" - outlet_events["example-alias"].add(Dataset(f"s3://{bucket_name}/{object_path}")) + outlet_events[DatasetAlias("example-alias")].add(Dataset(f"s3://{bucket_name}/{object_path}")) produce_dataset_events_through_dataset_alias() diff --git a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py index 7d7227af39f50..699ef1591372f 100644 --- a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py +++ b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py @@ -68,7 +68,7 @@ def produce_dataset_events(): def produce_dataset_events_through_dataset_alias_with_no_taskflow(*, outlet_events=None): bucket_name = "bucket" object_path = "my-task" - outlet_events["example-alias-no-taskflow"].add(Dataset(f"s3://{bucket_name}/{object_path}")) + outlet_events[DatasetAlias("example-alias-no-taskflow")].add(Dataset(f"s3://{bucket_name}/{object_path}")) PythonOperator( task_id="produce_dataset_events_through_dataset_alias_with_no_taskflow", diff --git a/airflow/example_dags/example_inlet_event_extra.py b/airflow/example_dags/example_inlet_event_extra.py index 4b7567fc2f87e..b07faf2bdfe0b 100644 --- a/airflow/example_dags/example_inlet_event_extra.py +++ b/airflow/example_dags/example_inlet_event_extra.py @@ -57,5 +57,5 @@ def read_dataset_event(*, inlet_events=None): BashOperator( task_id="read_dataset_event_from_classic", inlets=[ds], - bash_command="echo '{{ inlet_events['s3://output/1.txt'][-1].extra | tojson }}'", + bash_command="echo '{{ inlet_events[Dataset('s3://output/1.txt')][-1].extra | tojson }}'", ) diff --git a/airflow/utils/context.py b/airflow/utils/context.py index a72885401f7b2..9dddcc3f16cd8 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -177,6 +177,14 @@ class OutletEventAccessor: def add(self, dataset: Dataset | str, extra: dict[str, Any] | None = None) -> None: """Add a DatasetEvent to an existing Dataset.""" if isinstance(dataset, str): + warnings.warn( + ( + "Emitting dataset events using string is deprecated and will be removed in Airflow 3. " + "Please use the Dataset object (renamed as Asset in Airflow 3) directly" + ), + DeprecationWarning, + stacklevel=2, + ) dataset_uri = dataset elif isinstance(dataset, Dataset): dataset_uri = dataset.uri @@ -216,6 +224,16 @@ def __len__(self) -> int: return len(self._dict) def __getitem__(self, key: str | Dataset | DatasetAlias) -> OutletEventAccessor: + if isinstance(key, str): + warnings.warn( + ( + "Accessing outlet_events using string is deprecated and will be removed in Airflow 3. " + "Please use the Dataset or DatasetAlias object (renamed as Asset and AssetAlias in Airflow 3) directly" + ), + DeprecationWarning, + stacklevel=2, + ) + event_key = extract_event_key(key) if event_key not in self._dict: self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key) @@ -282,6 +300,15 @@ def __getitem__(self, key: int | str | Dataset | DatasetAlias) -> LazyDatasetEve join_clause = DatasetEvent.source_aliases where_clause = DatasetAliasModel.name == dataset_alias.name elif isinstance(obj, (Dataset, str)): + if isinstance(obj, str): + warnings.warn( + ( + "Accessing inlet_events using string is deprecated and will be removed in Airflow 3. " + "Please use the Dataset object (renamed as Asset in Airflow 3) directly" + ), + DeprecationWarning, + stacklevel=2, + ) dataset = self._datasets[extract_event_key(obj)] join_clause = DatasetEvent.dataset where_clause = DatasetModel.uri == dataset.uri diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index a69c09bc13b0f..c5d117ab5a5a1 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -432,7 +432,7 @@ The following example creates a dataset event against the S3 URI ``f"s3://bucket @task(outlets=[DatasetAlias("my-task-outputs")]) def my_task_with_outlet_events(*, outlet_events): - outlet_events["my-task-outputs"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) + outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) **Emit a dataset event during task execution through yielding Metadata** @@ -462,11 +462,11 @@ Only one dataset event is emitted for an added dataset, even if it is added to t ] ) def my_task_with_outlet_events(*, outlet_events): - outlet_events["my-task-outputs-1"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) + outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) # This line won't emit an additional dataset event as the dataset and extra are the same as the previous line. - outlet_events["my-task-outputs-2"].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) + outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"}) # This line will emit an additional dataset event as the extra is different. - outlet_events["my-task-outputs-3"].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"}) + outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"}) Scheduling based on dataset aliases ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -487,7 +487,7 @@ The dataset alias is resolved to the datasets during DAG parsing. Thus, if the " @task(outlets=[DatasetAlias("example-alias")]) def produce_dataset_events(*, outlet_events): - outlet_events["example-alias"].add(Dataset("s3://bucket/my-task")) + outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task")) with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")): @@ -511,7 +511,9 @@ As mentioned in :ref:`Fetching information from previously emitted dataset event @task(outlets=[DatasetAlias("example-alias")]) def produce_dataset_events(*, outlet_events): - outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"), extra={"row_count": 1}) + outlet_events[DatasetAlias("example-alias")].add( + Dataset("s3://bucket/my-task"), extra={"row_count": 1} + ) with DAG(dag_id="dataset-alias-consumer", schedule=None): diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 468dc2c9300ca..b5dcb43de262d 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2411,7 +2411,7 @@ def test_outlet_dataset_extra(self, dag_maker, session): @task(outlets=Dataset("test_outlet_dataset_extra_1")) def write1(*, outlet_events): - outlet_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"} + outlet_events[Dataset("test_outlet_dataset_extra_1")].extra = {"foo": "bar"} write1() @@ -2453,8 +2453,8 @@ def test_outlet_dataset_extra_ignore_different(self, dag_maker, session): @task(outlets=Dataset("test_outlet_dataset_extra")) def write(*, outlet_events): - outlet_events["test_outlet_dataset_extra"].extra = {"one": 1} - outlet_events["different_uri"].extra = {"foo": "bar"} # Will be silently dropped. + outlet_events[Dataset("test_outlet_dataset_extra")].extra = {"one": 1} + outlet_events[Dataset("different_uri")].extra = {"foo": "bar"} # Will be silently dropped. write() @@ -2722,22 +2722,22 @@ def test_inlet_dataset_extra(self, dag_maker, session): @task(outlets=Dataset("test_inlet_dataset_extra")) def write(*, ti, outlet_events): - outlet_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id} + outlet_events[Dataset("test_inlet_dataset_extra")].extra = {"from": ti.task_id} @task(inlets=Dataset("test_inlet_dataset_extra")) def read(*, inlet_events): - second_event = inlet_events["test_inlet_dataset_extra"][1] + second_event = inlet_events[Dataset("test_inlet_dataset_extra")][1] assert second_event.uri == "test_inlet_dataset_extra" assert second_event.extra == {"from": "write2"} - last_event = inlet_events["test_inlet_dataset_extra"][-1] + last_event = inlet_events[Dataset("test_inlet_dataset_extra")][-1] assert last_event.uri == "test_inlet_dataset_extra" assert last_event.extra == {"from": "write3"} with pytest.raises(KeyError): - inlet_events["does_not_exist"] + inlet_events[Dataset("does_not_exist")] with pytest.raises(IndexError): - inlet_events["test_inlet_dataset_extra"][5] + inlet_events[Dataset("test_inlet_dataset_extra")][5] # TODO: Support slices. @@ -2798,7 +2798,7 @@ def read(*, inlet_events): assert last_event.extra == {"from": "write3"} with pytest.raises(KeyError): - inlet_events["does_not_exist"] + inlet_events[Dataset("does_not_exist")] with pytest.raises(KeyError): inlet_events[DatasetAlias("does_not_exist")] with pytest.raises(IndexError):