From 7074167d71c93b69361d24c1121adc7419367f2a Mon Sep 17 00:00:00 2001 From: maahir22 <56473490+maahir22@users.noreply.github.com> Date: Fri, 14 Apr 2023 22:44:48 +0530 Subject: [PATCH] Add command to get DAG Details via CLI (#30432) --------- Co-authored-by: Hussein Awala Co-authored-by: Hussein Awala --- airflow/cli/cli_config.py | 6 ++++++ airflow/cli/commands/dag_command.py | 22 ++++++++++++++++++++++ tests/cli/commands/test_dag_command.py | 20 ++++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index e08b4e01ec741..ffcfd92de03b6 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1101,6 +1101,12 @@ class GroupCommand(NamedTuple): CLICommand = Union[ActionCommand, GroupCommand] DAGS_COMMANDS = ( + ActionCommand( + name="details", + help="Get DAG details given a DAG id", + func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"), + args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE), + ), ActionCommand( name="list", help="List all the DAGs", diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 8735a0e522715..c81f1c0a47a45 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -32,6 +32,7 @@ from airflow import settings from airflow.api.client import get_current_api_client +from airflow.api_connexion.schemas.dag_schema import dag_schema from airflow.cli.simple_table import AirflowConsole from airflow.configuration import conf from airflow.exceptions import AirflowException, RemovedInAirflow3Warning @@ -346,6 +347,27 @@ def dag_list_dags(args) -> None: ) +@cli_utils.action_cli +@suppress_logs_and_warning +@provide_session +def dag_details(args, session=NEW_SESSION): + """Get DAG details given a DAG id""" + dag = DagModel.get_dagmodel(args.dag_id, session=session) + if not dag: + raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + dag_detail = dag_schema.dump(dag) + + if args.output in ["table", "plain"]: + data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()] + else: + data = [dag_detail] + + AirflowConsole().print_as( + data=data, + output=args.output, + ) + + @cli_utils.action_cli @suppress_logs_and_warning def dag_list_import_errors(args) -> None: diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 6b4e2187fadcf..a54d26a9932c6 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -31,6 +31,7 @@ import time_machine from airflow import settings +from airflow.api_connexion.schemas.dag_schema import DAGSchema from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.exceptions import AirflowException @@ -470,6 +471,25 @@ def test_cli_report(self): assert "airflow/example_dags/example_complex.py" in out assert "example_complex" in out + @conf_vars({("core", "load_examples"): "true"}) + def test_cli_get_dag_details(self): + args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + dag_command.dag_details(args) + out = temp_stdout.getvalue() + + dag_detail_fields = DAGSchema().fields.keys() + + # Check if DAG Details field are present + for field in dag_detail_fields: + assert field in out + + # Check if identifying values are present + dag_details_values = ["airflow", "airflow/example_dags/example_complex.py", "16", "example_complex"] + + for value in dag_details_values: + assert value in out + @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_dags(self): args = self.parser.parse_args(["dags", "list", "--output", "yaml"])