Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT : Add command to get DAG Details via CLI #30432

Merged
merged 14 commits into from
Apr 14, 2023
6 changes: 6 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,12 @@ class GroupCommand(NamedTuple):
CLICommand = Union[ActionCommand, GroupCommand]

DAGS_COMMANDS = (
ActionCommand(
name="details",
help="Get DAG details given a DAG id",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="list",
help="List all the DAGs",
Expand Down
22 changes: 22 additions & 0 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from airflow import settings
from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
Expand Down Expand Up @@ -346,6 +347,27 @@ def dag_list_dags(args) -> None:
)


@cli_utils.action_cli
@suppress_logs_and_warning
@provide_session
def dag_details(args, session=NEW_SESSION):
"""Get DAG details given a DAG id"""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dag_detail = dag_schema.dump(dag)

if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()]
else:
data = [dag_detail]

AirflowConsole().print_as(
data=data,
output=args.output,
)


@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_import_errors(args) -> None:
Expand Down
20 changes: 20 additions & 0 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import time_machine

from airflow import settings
from airflow.api_connexion.schemas.dag_schema import DAGSchema
from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -470,6 +471,25 @@ def test_cli_report(self):
assert "airflow/example_dags/example_complex.py" in out
assert "example_complex" in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_get_dag_details(self):
args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_details(args)
out = temp_stdout.getvalue()

dag_detail_fields = DAGSchema().fields.keys()

# Check if DAG Details field are present
for field in dag_detail_fields:
assert field in out

# Check if identifying values are present
dag_details_values = ["airflow", "airflow/example_dags/example_complex.py", "16", "example_complex"]

for value in dag_details_values:
assert value in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_list_dags(self):
args = self.parser.parse_args(["dags", "list", "--output", "yaml"])
Expand Down