From 44dd173f8f0fa8758982f0aca138ad15dffd4860 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 5 Dec 2024 11:29:06 +0800 Subject: [PATCH] refactor: move AssetAliasEvent to utils/context --- airflow/utils/context.py | 14 +++++++++++++- airflow/utils/context.pyi | 7 ++++++- .../providers/common/compat/assets/__init__.py | 3 +-- .../src/airflow/sdk/definitions/asset/__init__.py | 9 --------- tests/serialization/test_serialized_objects.py | 3 +-- tests/utils/test_context.py | 3 +-- 6 files changed, 22 insertions(+), 17 deletions(-) diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 129a974fb3997..40003f45afe09 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -53,7 +53,6 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetRef, AssetUniqueKey, @@ -61,6 +60,7 @@ BaseAssetUniqueKey, ) from airflow.sdk.definitions.asset.metadata import extract_event_key +from airflow.typing_compat import TypedDict from airflow.utils.db import LazySelectSequence from airflow.utils.types import NOTSET @@ -163,6 +163,18 @@ def get(self, key: str, default_conn: Any = None) -> Any: return default_conn +class AssetAliasEvent(TypedDict): + """ + A represeation of asset event to be triggered by an asset alias. + + :meta private: + """ + + source_alias_name: str + dest_asset_key: AssetUniqueKey + extra: dict[str, Any] + + @attrs.define() class OutletEventAccessor: """ diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index c72f4b6764265..a3a12f825ec11 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -39,7 +39,7 @@ from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.param import ParamsDict from airflow.models.taskinstance import TaskInstance -from airflow.sdk.definitions.asset import Asset, AssetAliasEvent, BaseAsset, BaseAssetUniqueKey +from airflow.sdk.definitions.asset import Asset, AssetUniqueKey, BaseAsset, BaseAssetUniqueKey from airflow.serialization.pydantic.asset import AssetEventPydantic from airflow.serialization.pydantic.dag_run import DagRunPydantic from airflow.typing_compat import TypedDict @@ -57,6 +57,11 @@ class VariableAccessor: class ConnectionAccessor: def get(self, key: str, default_conn: Any = None) -> Any: ... +class AssetAliasEvent(TypedDict): + source_alias_name: str + dest_asset_key: AssetUniqueKey + extra: dict[str, Any] + class OutletEventAccessor: def __init__( self, diff --git a/providers/src/airflow/providers/common/compat/assets/__init__.py b/providers/src/airflow/providers/common/compat/assets/__init__.py index 049c4ac40b997..7b888f55d9eb1 100644 --- a/providers/src/airflow/providers/common/compat/assets/__init__.py +++ b/providers/src/airflow/providers/common/compat/assets/__init__.py @@ -32,10 +32,10 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAll, AssetAny, ) + from airflow.utils.context import AssetAliasEvent else: if AIRFLOW_V_3_0_PLUS: from airflow.auth.managers.models.resource_details import AssetDetails @@ -43,7 +43,6 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAll, AssetAny, ) diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py index 651006d251546..7f1bfec92b945 100644 --- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -37,7 +37,6 @@ import attrs from airflow.serialization.dag_dependency import DagDependency -from airflow.typing_compat import TypedDict if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -446,14 +445,6 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe ) -class AssetAliasEvent(TypedDict): - """A represeation of asset event to be triggered by an asset alias.""" - - source_alias_name: str - dest_asset_key: AssetUniqueKey - extra: dict[str, Any] - - class _AssetBooleanCondition(BaseAsset): """Base class for asset boolean logic.""" diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index a90f90d6ebe80..1e3bd1a58f8a8 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -52,7 +52,6 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetUniqueKey, ) @@ -66,7 +65,7 @@ from airflow.serialization.serialized_objects import BaseSerialization from airflow.triggers.base import BaseTrigger from airflow.utils import timezone -from airflow.utils.context import OutletEventAccessor, OutletEventAccessors +from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors from airflow.utils.db import LazySelectSequence from airflow.utils.operator_resources import Resources from airflow.utils.state import DagRunState, State diff --git a/tests/utils/test_context.py b/tests/utils/test_context.py index 227c3b42607bd..c69c84321300f 100644 --- a/tests/utils/test_context.py +++ b/tests/utils/test_context.py @@ -24,11 +24,10 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetUniqueKey, ) -from airflow.utils.context import OutletEventAccessor, OutletEventAccessors +from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors class TestOutletEventAccessor: