Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT : Add command to get DAG Details via CLI #30432

Merged
merged 14 commits into from
Apr 14, 2023
6 changes: 6 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,12 @@ class GroupCommand(NamedTuple):
CLICommand = Union[ActionCommand, GroupCommand]

DAGS_COMMANDS = (
ActionCommand(
name="details",
help="Get DAG details given a DAG id",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
josh-fell marked this conversation as resolved.
Show resolved Hide resolved
),
ActionCommand(
name="list",
help="List all the DAGs",
Expand Down
19 changes: 19 additions & 0 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from airflow import settings
from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
Expand Down Expand Up @@ -343,6 +344,24 @@ def dag_list_dags(args):
)


@cli_utils.action_cli
@suppress_logs_and_warning
@provide_session
def dag_details(args, session=NEW_SESSION):
"""Get DAG details given a DAG id"""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dag_detail = dag_schema.dump(dag)
for key, value in dag_detail.items():
if isinstance(value, dict):
print(f"\t{key}:")
for subkey, subvalue in value.items():
print(f"\t\t{subkey}: {subvalue}")
else:
print(f"\t{key}: {value}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are trying to implement a new logic for yaml output, and this doesn't support other types (json, table, plain).
Instead you can use AirflowConsole.print_as method, something like:

Suggested change
for key, value in dag_detail.items():
if isinstance(value, dict):
print(f"\t{key}:")
for subkey, subvalue in value.items():
print(f"\t\t{subkey}: {subvalue}")
else:
print(f"\t{key}: {value}")
AirflowConsole().print_as(
data=[dag_detail],
output=args.output,
)

Copy link
Member

@potiuk potiuk Apr 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. All the CLI commands should use the same formatting options.

Copy link
Contributor Author

@maahir22 maahir22 Apr 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you want to do that @potiuk, @hussein-awala ? The default fallback when no --output option is provided is table -> so args.output becomes table by default, the SimpleTable isn't able to render such a large table properly due to the number of parameters and the length of these fields - most devices won't be able to display this output in a comprehensible way.
To standardise the approach, I've switched to AirflowConsole(), with a caveat that "table" isn't allowed as an output option - if it's received "yaml" output is displayed instead.

  1. Table Format

image

  1. JSON Format
    image

  2. YAML Format
    image

  3. Plain Format
    image

Copy link
Member

@potiuk potiuk Apr 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a table support with swapped columns and rows ? I think that one will be much better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooo so essentially a transpose of the current table?
Let me know if I understand it currently, if initially the columns were scheduler_lock, is_active with Row1 being [], False -> now the rows will be scheduler_lock, is_active with Column1 having [], False?
Also, do we want this to be a new output type like --ttable? (Extra t for transposed)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of the transposed table, it's simple with no need to update the print_as method:

if args.output == "table":
    data = [
        {
            "property_name": key,
            "property_value": value
        }
        for key, value in dag_detail.items()
    ]
else:
    data = [dag_detail]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precisely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes done, and everything looks fine now for table (all other outputs in my previous comment) -
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you can see in my previous comments, table & plain both formats were not rendered properly for the large number of attributes, so I've added checks for both of them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. That's what I thought will be nicer :)



@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_import_errors(args):
Expand Down
46 changes: 46 additions & 0 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,52 @@ def test_cli_report(self):
assert "airflow/example_dags/example_complex.py" in out
assert "example_complex" in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_get_dag_details(self):
args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_details(args)
out = temp_stdout.getvalue()

# Check if DAG Details field are present
dag_details_fields = [
Copy link
Member

@potiuk potiuk Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the right way. You should check attributes of the dag rather than list the fields by hand. If we ad a new field we wll 100% forget to add it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I'll check all the attributes - don't get the "they should also likely be sorted part", do we want to display the command attributes in sorted order & test if the same order is maintained? Is there a reason behind doing this as it follows the same order as the REST Client response.

"has_task_concurrency_limits",
"tags",
"file_token",
"description",
"last_expired",
"root_dag_id",
"is_active",
"last_pickled",
"scheduler_lock",
"next_dagrun_create_after",
"next_dagrun_data_interval_start:",
"last_parsed_time",
"fileloc",
"default_view",
"max_active_tasks",
"is_subdag",
"owners",
"has_import_errors",
"dag_id",
"pickle_id",
"schedule_interval",
"timetable_description",
"next_dagrun_data_interval_end",
"is_paused",
"next_dagrun",
"max_active_runs",
]

for field in dag_details_fields:
assert field in out

# Check if identifying values are present
dag_details_values = ["airflow", "airflow/example_dags/example_complex.py", "16", "example_complex"]

for value in dag_details_values:
assert value in out

@conf_vars({("core", "load_examples"): "true"})
def test_cli_list_dags(self):
args = self.parser.parse_args(["dags", "list", "--output", "yaml"])
Expand Down