Skip to content

Commit

Permalink
refactor: move AssetAliasEvent to utils/context
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Dec 5, 2024
1 parent b06bc3d commit 178ecf4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 17 deletions.
14 changes: 13 additions & 1 deletion airflow/utils/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAliasEvent,
AssetAliasUniqueKey,
AssetRef,
AssetUniqueKey,
BaseAsset,
BaseAssetUniqueKey,
)
from airflow.sdk.definitions.asset.metadata import extract_event_key
from airflow.typing_compat import TypedDict
from airflow.utils.db import LazySelectSequence
from airflow.utils.types import NOTSET

Expand Down Expand Up @@ -163,6 +163,18 @@ def get(self, key: str, default_conn: Any = None) -> Any:
return default_conn


class AssetAliasEvent(TypedDict):
"""
A represeation of asset event to be triggered by an asset alias.
:meta private:
"""

source_alias_name: str
dest_asset_key: AssetUniqueKey
extra: dict[str, Any]


@attrs.define()
class OutletEventAccessor:
"""
Expand Down
7 changes: 6 additions & 1 deletion airflow/utils/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.asset import Asset, AssetAliasEvent, BaseAsset, BaseAssetUniqueKey
from airflow.sdk.definitions.asset import Asset, AssetUniqueKey, BaseAsset, BaseAssetUniqueKey
from airflow.serialization.pydantic.asset import AssetEventPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.typing_compat import TypedDict
Expand All @@ -57,6 +57,11 @@ class VariableAccessor:
class ConnectionAccessor:
def get(self, key: str, default_conn: Any = None) -> Any: ...

class AssetAliasEvent(TypedDict):
source_alias_name: str
dest_asset_key: AssetUniqueKey
extra: dict[str, Any]

class OutletEventAccessor:
def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,17 @@
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAliasEvent,
AssetAll,
AssetAny,
expand_alias_to_assets,
)
from airflow.utils.context import AssetAliasEvent
else:
if AIRFLOW_V_3_0_PLUS:
from airflow.auth.managers.models.resource_details import AssetDetails
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAliasEvent,
AssetAll,
AssetAny,
expand_alias_to_assets,
Expand Down
9 changes: 0 additions & 9 deletions task_sdk/src/airflow/sdk/definitions/asset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from sqlalchemy import select

from airflow.serialization.dag_dependency import DagDependency
from airflow.typing_compat import TypedDict
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand Down Expand Up @@ -449,14 +448,6 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
)


class AssetAliasEvent(TypedDict):
"""A represeation of asset event to be triggered by an asset alias."""

source_alias_name: str
dest_asset_key: AssetUniqueKey
extra: dict[str, Any]


class _AssetBooleanCondition(BaseAsset):
"""Base class for asset boolean logic."""

Expand Down
3 changes: 1 addition & 2 deletions tests/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAliasEvent,
AssetAliasUniqueKey,
AssetUniqueKey,
)
Expand All @@ -66,7 +65,7 @@
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
from airflow.utils.context import OutletEventAccessor, OutletEventAccessors
from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors
from airflow.utils.db import LazySelectSequence
from airflow.utils.operator_resources import Resources
from airflow.utils.state import DagRunState, State
Expand Down
3 changes: 1 addition & 2 deletions tests/utils/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAliasEvent,
AssetAliasUniqueKey,
AssetUniqueKey,
)
from airflow.utils.context import OutletEventAccessor, OutletEventAccessors
from airflow.utils.context import AssetAliasEvent, OutletEventAccessor, OutletEventAccessors


class TestOutletEventAccessor:
Expand Down

0 comments on commit 178ecf4

Please sign in to comment.