diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 45101ec25bce5..2f0105ca8cb2b 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -53,7 +53,6 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetRef, AssetUniqueKey, @@ -61,6 +60,7 @@ BaseAssetUniqueKey, ) from airflow.sdk.definitions.asset.metadata import extract_event_key +from airflow.typing_compat import TypedDict from airflow.utils.db import LazySelectSequence from airflow.utils.types import NOTSET @@ -163,6 +163,18 @@ def get(self, key: str, default_conn: Any = None) -> Any: return default_conn +class AssetAliasEvent(TypedDict): + """ + A represeation of asset event to be triggered by an asset alias. + + :meta private: + """ + + source_alias_name: str + dest_asset_key: AssetUniqueKey + extra: dict[str, Any] + + @attrs.define() class OutletEventAccessor: """ diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index c72f4b6764265..a3a12f825ec11 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -39,7 +39,7 @@ from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.param import ParamsDict from airflow.models.taskinstance import TaskInstance -from airflow.sdk.definitions.asset import Asset, AssetAliasEvent, BaseAsset, BaseAssetUniqueKey +from airflow.sdk.definitions.asset import Asset, AssetUniqueKey, BaseAsset, BaseAssetUniqueKey from airflow.serialization.pydantic.asset import AssetEventPydantic from airflow.serialization.pydantic.dag_run import DagRunPydantic from airflow.typing_compat import TypedDict @@ -57,6 +57,11 @@ class VariableAccessor: class ConnectionAccessor: def get(self, key: str, default_conn: Any = None) -> Any: ... +class AssetAliasEvent(TypedDict): + source_alias_name: str + dest_asset_key: AssetUniqueKey + extra: dict[str, Any] + class OutletEventAccessor: def __init__( self, diff --git a/providers/src/airflow/providers/common/compat/assets/__init__.py b/providers/src/airflow/providers/common/compat/assets/__init__.py index 66178cf0c68db..1d2356f44e083 100644 --- a/providers/src/airflow/providers/common/compat/assets/__init__.py +++ b/providers/src/airflow/providers/common/compat/assets/__init__.py @@ -31,18 +31,17 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAll, AssetAny, expand_alias_to_assets, ) + from airflow.utils.context import AssetAliasEvent else: if AIRFLOW_V_3_0_PLUS: from airflow.auth.managers.models.resource_details import AssetDetails from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAll, AssetAny, expand_alias_to_assets, diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py index 1423100678abf..dc005986fe58c 100644 --- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -37,7 +37,6 @@ from sqlalchemy import select from airflow.serialization.dag_dependency import DagDependency -from airflow.typing_compat import TypedDict from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: @@ -449,14 +448,6 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe ) -class AssetAliasEvent(TypedDict): - """A represeation of asset event to be triggered by an asset alias.""" - - source_alias_name: str - dest_asset_key: AssetUniqueKey - extra: dict[str, Any] - - class _AssetBooleanCondition(BaseAsset): """Base class for asset boolean logic.""" diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index a90f90d6ebe80..1e3bd1a58f8a8 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -52,7 +52,6 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetUniqueKey, ) @@ -66,7 +65,7 @@ from airflow.serialization.serialized_objects import BaseSerialization from airflow.triggers.base import BaseTrigger from airflow.utils import timezone -from airflow.utils.context import OutletEventAccessor, OutletEventAccessors +from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors from airflow.utils.db import LazySelectSequence from airflow.utils.operator_resources import Resources from airflow.utils.state import DagRunState, State diff --git a/tests/utils/test_context.py b/tests/utils/test_context.py index 227c3b42607bd..c69c84321300f 100644 --- a/tests/utils/test_context.py +++ b/tests/utils/test_context.py @@ -24,11 +24,10 @@ from airflow.sdk.definitions.asset import ( Asset, AssetAlias, - AssetAliasEvent, AssetAliasUniqueKey, AssetUniqueKey, ) -from airflow.utils.context import OutletEventAccessor, OutletEventAccessors +from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors class TestOutletEventAccessor: