diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index fa1e71a5eda3b..cd6aa6c9a29c2 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -937,6 +937,7 @@ def string_lower_type(val): ARG_ASSET_NAME = Arg(("--name",), default="", help="Asset name") ARG_ASSET_URI = Arg(("--uri",), default="", help="Asset URI") +ARG_ASSET_ALIAS = Arg(("--alias",), default=False, action="store_true", help="Show asset alias") ALTERNATIVE_CONN_SPECS_ARGS = [ ARG_CONN_TYPE, @@ -978,13 +979,13 @@ class GroupCommand(NamedTuple): name="list", help="List assets", func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_list"), - args=(ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS), + args=(ARG_ASSET_ALIAS, ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS), ), ActionCommand( name="details", help="Show asset details", func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_details"), - args=(ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE), + args=(ARG_ASSET_ALIAS, ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE), ), ActionCommand( name="materialize", diff --git a/airflow/cli/commands/remote_commands/asset_command.py b/airflow/cli/commands/remote_commands/asset_command.py index b83465dbb26cb..12d8516ff6230 100644 --- a/airflow/cli/commands/remote_commands/asset_command.py +++ b/airflow/cli/commands/remote_commands/asset_command.py @@ -23,10 +23,10 @@ from sqlalchemy import select from airflow.api.common.trigger_dag import trigger_dag -from airflow.api_fastapi.core_api.datamodels.assets import AssetResponse +from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasSchema, AssetResponse from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse from airflow.cli.simple_table import AirflowConsole -from airflow.models.asset import AssetModel, TaskOutletAssetReference +from airflow.models.asset import AssetAliasModel, AssetModel, TaskOutletAssetReference from airflow.utils import cli as cli_utils from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.types import DagRunTriggeredByType @@ -36,30 +36,51 @@ from sqlalchemy.orm import Session + from airflow.api_fastapi.core_api.base import BaseModel + log = logging.getLogger(__name__) +def _list_asset_aliases(args, *, session: Session) -> tuple[Any, type[BaseModel]]: + aliases = session.scalars(select(AssetAliasModel).order_by(AssetAliasModel.name)) + return aliases, AssetAliasSchema + + +def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]: + assets = session.scalars(select(AssetModel).order_by(AssetModel.name)) + return assets, AssetResponse + + @cli_utils.action_cli @provide_session def asset_list(args, *, session: Session = NEW_SESSION) -> None: """Display assets in the command line.""" - assets = session.scalars(select(AssetModel).order_by(AssetModel.name)) + if args.alias: + data, model_cls = _list_asset_aliases(args, session=session) + else: + data, model_cls = _list_assets(args, session=session) - def detail_mapper(asset: AssetModel) -> dict[str, Any]: - model = AssetResponse.model_validate(asset) - return model.model_dump(include=args.columns) + def detail_mapper(asset: Any) -> dict[str, Any]: + model = model_cls.model_validate(asset) + return model.model_dump(mode="json", include=args.columns) - AirflowConsole().print_as( - data=assets, - output=args.output, - mapper=detail_mapper, - ) + AirflowConsole().print_as(data=data, output=args.output, mapper=detail_mapper) -@cli_utils.action_cli -@provide_session -def asset_details(args, *, session: Session = NEW_SESSION) -> None: - """Display details of an asset.""" +def _detail_asset_alias(args, *, session: Session) -> BaseModel: + if not args.name: + raise SystemExit("Required --name with --alias") + if args.uri: + raise SystemExit("Cannot use --uri with --alias") + + alias = session.scalar(select(AssetAliasModel).where(AssetAliasModel.name == args.name)) + if alias is None: + raise SystemExit(f"Asset alias with name {args.name} does not exist.") + + return AssetAliasSchema.model_validate(alias) + + +def _detail_asset(args, *, session: Session) -> BaseModel: if not args.name and not args.uri: raise SystemExit("Either --name or --uri is required") @@ -79,7 +100,19 @@ def asset_details(args, *, session: Session = NEW_SESSION) -> None: if next(asset_it, None) is not None: raise SystemExit(f"More than one asset exists with {select_message}.") - model_data = AssetResponse.model_validate(asset).model_dump() + return AssetResponse.model_validate(asset) + + +@cli_utils.action_cli +@provide_session +def asset_details(args, *, session: Session = NEW_SESSION) -> None: + """Display details of an asset.""" + if args.alias: + model = _detail_asset_alias(args, session=session) + else: + model = _detail_asset(args, session=session) + + model_data = model.model_dump(mode="json") if args.output in ["table", "plain"]: data = [{"property_name": key, "property_value": value} for key, value in model_data.items()] else: @@ -118,7 +151,7 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None: dagrun = trigger_dag(dag_id=dag_id, triggered_by=DagRunTriggeredByType.CLI, session=session) if dagrun is not None: - data = [DAGRunResponse.model_validate(dagrun).model_dump()] + data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")] else: data = [] diff --git a/tests/cli/commands/remote_commands/test_asset_command.py b/tests/cli/commands/remote_commands/test_asset_command.py index a321d9d54beaa..69906d1813a29 100644 --- a/tests/cli/commands/remote_commands/test_asset_command.py +++ b/tests/cli/commands/remote_commands/test_asset_command.py @@ -29,7 +29,6 @@ from airflow.cli.commands.remote_commands import asset_command from airflow.models.dagbag import DagBag -from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs if typing.TYPE_CHECKING: @@ -56,7 +55,6 @@ def parser() -> ArgumentParser: return cli_parser.get_parser() -@conf_vars({("core", "load_examples"): "true"}) def test_cli_assets_list(parser: ArgumentParser) -> None: args = parser.parse_args(["assets", "list", "--output=json"]) with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: @@ -64,11 +62,19 @@ def test_cli_assets_list(parser: ArgumentParser) -> None: asset_list = json.loads(temp_stdout.getvalue()) assert len(asset_list) > 0 - assert "name" in asset_list[0] - assert "uri" in asset_list[0] - assert "group" in asset_list[0] - assert "extra" in asset_list[0] - assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in asset_list) + assert set(asset_list[0]) == {"name", "uri", "group", "extra"} + assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in asset_list), asset_list + + +def test_cli_assets_alias_list(parser: ArgumentParser) -> None: + args = parser.parse_args(["assets", "list", "--alias", "--output=json"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + asset_command.asset_list(args) + + alias_list = json.loads(temp_stdout.getvalue()) + assert len(alias_list) > 0 + assert set(alias_list[0]) == {"name", "group"} + assert any(alias["name"] == "example-alias" for alias in alias_list), alias_list def test_cli_assets_details(parser: ArgumentParser) -> None: @@ -97,6 +103,20 @@ def test_cli_assets_details(parser: ArgumentParser) -> None: } +def test_cli_assets_alias_details(parser: ArgumentParser) -> None: + args = parser.parse_args(["assets", "details", "--alias", "--name=example-alias", "--output=json"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + asset_command.asset_details(args) + + alias_detail_list = json.loads(temp_stdout.getvalue()) + assert len(alias_detail_list) == 1 + + # No good way to statically compare these. + undeterministic = {"id": None} + + assert alias_detail_list[0] | undeterministic == undeterministic | {"name": "example-alias", "group": ""} + + def test_cli_assets_materialize(parser: ArgumentParser) -> None: args = parser.parse_args(["assets", "materialize", "--name=asset1_producer", "--output=json"]) with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: @@ -124,5 +144,5 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None: "run_type": "manual", "start_date": None, "state": "queued", - "triggered_by": "DagRunTriggeredByType.CLI", + "triggered_by": "cli", }