Skip to content

Commit

Permalink
Show alises with assets 'list' and 'details' CLI (#44595)
Browse files Browse the repository at this point in the history
I also took the chance to change the model dump format from "python" to
"json". This is more inline with the REST API, and also seems more CLI
friendly?
  • Loading branch information
uranusjr authored Dec 4, 2024
1 parent 0cfa562 commit a530ee3
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 27 deletions.
5 changes: 3 additions & 2 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ def string_lower_type(val):

ARG_ASSET_NAME = Arg(("--name",), default="", help="Asset name")
ARG_ASSET_URI = Arg(("--uri",), default="", help="Asset URI")
ARG_ASSET_ALIAS = Arg(("--alias",), default=False, action="store_true", help="Show asset alias")

ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
Expand Down Expand Up @@ -978,13 +979,13 @@ class GroupCommand(NamedTuple):
name="list",
help="List assets",
func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_list"),
args=(ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS),
args=(ARG_ASSET_ALIAS, ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS),
),
ActionCommand(
name="details",
help="Show asset details",
func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_details"),
args=(ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE),
args=(ARG_ASSET_ALIAS, ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="materialize",
Expand Down
67 changes: 50 additions & 17 deletions airflow/cli/commands/remote_commands/asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
from sqlalchemy import select

from airflow.api.common.trigger_dag import trigger_dag
from airflow.api_fastapi.core_api.datamodels.assets import AssetResponse
from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasSchema, AssetResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.models.asset import AssetModel, TaskOutletAssetReference
from airflow.models.asset import AssetAliasModel, AssetModel, TaskOutletAssetReference
from airflow.utils import cli as cli_utils
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import DagRunTriggeredByType
Expand All @@ -36,30 +36,51 @@

from sqlalchemy.orm import Session

from airflow.api_fastapi.core_api.base import BaseModel

log = logging.getLogger(__name__)


def _list_asset_aliases(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
aliases = session.scalars(select(AssetAliasModel).order_by(AssetAliasModel.name))
return aliases, AssetAliasSchema


def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
assets = session.scalars(select(AssetModel).order_by(AssetModel.name))
return assets, AssetResponse


@cli_utils.action_cli
@provide_session
def asset_list(args, *, session: Session = NEW_SESSION) -> None:
"""Display assets in the command line."""
assets = session.scalars(select(AssetModel).order_by(AssetModel.name))
if args.alias:
data, model_cls = _list_asset_aliases(args, session=session)
else:
data, model_cls = _list_assets(args, session=session)

def detail_mapper(asset: AssetModel) -> dict[str, Any]:
model = AssetResponse.model_validate(asset)
return model.model_dump(include=args.columns)
def detail_mapper(asset: Any) -> dict[str, Any]:
model = model_cls.model_validate(asset)
return model.model_dump(mode="json", include=args.columns)

AirflowConsole().print_as(
data=assets,
output=args.output,
mapper=detail_mapper,
)
AirflowConsole().print_as(data=data, output=args.output, mapper=detail_mapper)


@cli_utils.action_cli
@provide_session
def asset_details(args, *, session: Session = NEW_SESSION) -> None:
"""Display details of an asset."""
def _detail_asset_alias(args, *, session: Session) -> BaseModel:
if not args.name:
raise SystemExit("Required --name with --alias")
if args.uri:
raise SystemExit("Cannot use --uri with --alias")

alias = session.scalar(select(AssetAliasModel).where(AssetAliasModel.name == args.name))
if alias is None:
raise SystemExit(f"Asset alias with name {args.name} does not exist.")

return AssetAliasSchema.model_validate(alias)


def _detail_asset(args, *, session: Session) -> BaseModel:
if not args.name and not args.uri:
raise SystemExit("Either --name or --uri is required")

Expand All @@ -79,7 +100,19 @@ def asset_details(args, *, session: Session = NEW_SESSION) -> None:
if next(asset_it, None) is not None:
raise SystemExit(f"More than one asset exists with {select_message}.")

model_data = AssetResponse.model_validate(asset).model_dump()
return AssetResponse.model_validate(asset)


@cli_utils.action_cli
@provide_session
def asset_details(args, *, session: Session = NEW_SESSION) -> None:
"""Display details of an asset."""
if args.alias:
model = _detail_asset_alias(args, session=session)
else:
model = _detail_asset(args, session=session)

model_data = model.model_dump(mode="json")
if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in model_data.items()]
else:
Expand Down Expand Up @@ -118,7 +151,7 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None:

dagrun = trigger_dag(dag_id=dag_id, triggered_by=DagRunTriggeredByType.CLI, session=session)
if dagrun is not None:
data = [DAGRunResponse.model_validate(dagrun).model_dump()]
data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")]
else:
data = []

Expand Down
36 changes: 28 additions & 8 deletions tests/cli/commands/remote_commands/test_asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from airflow.cli.commands.remote_commands import asset_command
from airflow.models.dagbag import DagBag

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, clear_db_runs

if typing.TYPE_CHECKING:
Expand All @@ -56,19 +55,26 @@ def parser() -> ArgumentParser:
return cli_parser.get_parser()


@conf_vars({("core", "load_examples"): "true"})
def test_cli_assets_list(parser: ArgumentParser) -> None:
args = parser.parse_args(["assets", "list", "--output=json"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
asset_command.asset_list(args)

asset_list = json.loads(temp_stdout.getvalue())
assert len(asset_list) > 0
assert "name" in asset_list[0]
assert "uri" in asset_list[0]
assert "group" in asset_list[0]
assert "extra" in asset_list[0]
assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in asset_list)
assert set(asset_list[0]) == {"name", "uri", "group", "extra"}
assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in asset_list), asset_list


def test_cli_assets_alias_list(parser: ArgumentParser) -> None:
args = parser.parse_args(["assets", "list", "--alias", "--output=json"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
asset_command.asset_list(args)

alias_list = json.loads(temp_stdout.getvalue())
assert len(alias_list) > 0
assert set(alias_list[0]) == {"name", "group"}
assert any(alias["name"] == "example-alias" for alias in alias_list), alias_list


def test_cli_assets_details(parser: ArgumentParser) -> None:
Expand Down Expand Up @@ -97,6 +103,20 @@ def test_cli_assets_details(parser: ArgumentParser) -> None:
}


def test_cli_assets_alias_details(parser: ArgumentParser) -> None:
args = parser.parse_args(["assets", "details", "--alias", "--name=example-alias", "--output=json"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
asset_command.asset_details(args)

alias_detail_list = json.loads(temp_stdout.getvalue())
assert len(alias_detail_list) == 1

# No good way to statically compare these.
undeterministic = {"id": None}

assert alias_detail_list[0] | undeterministic == undeterministic | {"name": "example-alias", "group": ""}


def test_cli_assets_materialize(parser: ArgumentParser) -> None:
args = parser.parse_args(["assets", "materialize", "--name=asset1_producer", "--output=json"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
Expand Down Expand Up @@ -124,5 +144,5 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"run_type": "manual",
"start_date": None,
"state": "queued",
"triggered_by": "DagRunTriggeredByType.CLI",
"triggered_by": "cli",
}

0 comments on commit a530ee3

Please sign in to comment.