Skip to content

Commit

Permalink
Add command to get DAG Details via CLI (#30432)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Hussein Awala <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
(cherry picked from commit 7074167)
  • Loading branch information
maahir22 authored and ephraimbuddy committed Apr 14, 2023
1 parent 885a61b commit 06e4672
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,12 @@ class GroupCommand(NamedTuple):
CLICommand = Union[ActionCommand, GroupCommand]

DAGS_COMMANDS = (
ActionCommand(
name="details",
help="Get DAG details given a DAG id",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="list",
help="List all the DAGs",
Expand Down
22 changes: 22 additions & 0 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from airflow import settings
from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
Expand Down Expand Up @@ -346,6 +347,27 @@ def dag_list_dags(args) -> None:
)


@cli_utils.action_cli
@suppress_logs_and_warning
@provide_session
def dag_details(args, session=NEW_SESSION):
"""Get DAG details given a DAG id"""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dag_detail = dag_schema.dump(dag)

if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()]
else:
data = [dag_detail]

AirflowConsole().print_as(
data=data,
output=args.output,
)


@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_import_errors(args) -> None:
Expand Down
20 changes: 20 additions & 0 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import time_machine

from airflow import settings
from airflow.api_connexion.schemas.dag_schema import DAGSchema
from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -470,6 +471,25 @@ def test_cli_report(self):
assert "airflow/example_dags/example_complex.py" in out
assert "example_complex" in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_get_dag_details(self):
args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_details(args)
out = temp_stdout.getvalue()

dag_detail_fields = DAGSchema().fields.keys()

# Check if DAG Details field are present
for field in dag_detail_fields:
assert field in out

# Check if identifying values are present
dag_details_values = ["airflow", "airflow/example_dags/example_complex.py", "16", "example_complex"]

for value in dag_details_values:
assert value in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_list_dags(self):
args = self.parser.parse_args(["dags", "list", "--output", "yaml"])
Expand Down

0 comments on commit 06e4672

Please sign in to comment.