From 19636873906c138f9f662a22c916854c3bd8e81b Mon Sep 17 00:00:00 2001 From: maahir22 Date: Sun, 2 Apr 2023 17:09:58 +0530 Subject: [PATCH 01/12] Add command to get DAG Details via CLI --- airflow/cli/cli_config.py | 6 ++++ airflow/cli/commands/dag_command.py | 16 +++++++++ tests/cli/commands/test_dag_command.py | 46 ++++++++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 5502a68fb88cb..dd9544b1454fb 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1100,6 +1100,12 @@ class GroupCommand(NamedTuple): CLICommand = Union[ActionCommand, GroupCommand] DAGS_COMMANDS = ( + ActionCommand( + name="details", + help="Get DAG details given a DAG id", + func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"), + args=(ARG_DAG_ID, ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE), + ), ActionCommand( name="list", help="List all the DAGs", diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index ef891fba62832..71a08a00c8d48 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -30,6 +30,7 @@ from airflow import settings from airflow.api.client import get_current_api_client +from airflow.api_connexion.schemas.dag_schema import dag_detail_schema from airflow.cli.simple_table import AirflowConsole from airflow.configuration import conf from airflow.exceptions import AirflowException, RemovedInAirflow3Warning @@ -343,6 +344,21 @@ def dag_list_dags(args): ) +@cli_utils.action_cli +@suppress_logs_and_warning +def dag_details(args): + """Get DAG details given a DAG id""" + dag = get_dag(args.subdir, args.dag_id) + dag_detail = dag_detail_schema.dump(dag) + for key, value in dag_detail.items(): + if isinstance(value, dict): + print(f"\t{key}:") + for subkey, subvalue in value.items(): + print(f"\t\t{subkey}: {subvalue}") + else: + print(f"\t{key}: {value}") + + @cli_utils.action_cli @suppress_logs_and_warning def dag_list_import_errors(args): diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 6b4e2187fadcf..15340c4fd30e5 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -470,6 +470,52 @@ def test_cli_report(self): assert "airflow/example_dags/example_complex.py" in out assert "example_complex" in out + @conf_vars({("core", "load_examples"): "true"}) + def test_cli_get_dag_details(self): + args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + dag_command.dag_details(args) + out = temp_stdout.getvalue() + + # Check if DAG Details field are present + dag_details_fields = [ + "timezone", + "start_date", + "dag_id", + "default_view", + "is_paused_upon_creation", + "is_paused", + "concurrency", + "description", + "schedule_interval", + "dag_run_timeout", + "last_parsed", + "max_active_tasks", + "fileloc", + "max_active_runs", + "orientation", + "is_active", + "params", + "end_date", + "file_token", + "tags", + "doc_md", + "pickle_id", + "owners", + "catchup", + "render_template_as_native_obj", + "is_subdag", + ] + + for field in dag_details_fields: + assert field in out + + # Check if identifying values are present + dag_details_values = ["airflow", "airflow/example_dags/example_complex.py", "16", "example_complex"] + + for value in dag_details_values: + assert value in out + @conf_vars({("core", "load_examples"): "true"}) def test_cli_list_dags(self): args = self.parser.parse_args(["dags", "list", "--output", "yaml"]) From 592c7a2ffad330bdbd4333129db75823cc027866 Mon Sep 17 00:00:00 2001 From: maahir22 Date: Sun, 2 Apr 2023 18:07:01 +0530 Subject: [PATCH 02/12] Field fix --- airflow/cli/commands/dag_command.py | 11 +++--- tests/cli/commands/test_dag_command.py | 46 +++++++++++++------------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 71a08a00c8d48..3b41619319d32 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -30,7 +30,7 @@ from airflow import settings from airflow.api.client import get_current_api_client -from airflow.api_connexion.schemas.dag_schema import dag_detail_schema +from airflow.api_connexion.schemas.dag_schema import dag_schema from airflow.cli.simple_table import AirflowConsole from airflow.configuration import conf from airflow.exceptions import AirflowException, RemovedInAirflow3Warning @@ -346,10 +346,13 @@ def dag_list_dags(args): @cli_utils.action_cli @suppress_logs_and_warning -def dag_details(args): +@provide_session +def dag_details(args, session=NEW_SESSION): """Get DAG details given a DAG id""" - dag = get_dag(args.subdir, args.dag_id) - dag_detail = dag_detail_schema.dump(dag) + dag = DagModel.get_dagmodel(args.dag_id, session=session) + if not dag: + raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + dag_detail = dag_schema.dump(dag) for key, value in dag_detail.items(): if isinstance(value, dict): print(f"\t{key}:") diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 15340c4fd30e5..8f2b724757f85 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -479,32 +479,32 @@ def test_cli_get_dag_details(self): # Check if DAG Details field are present dag_details_fields = [ - "timezone", - "start_date", - "dag_id", - "default_view", - "is_paused_upon_creation", - "is_paused", - "concurrency", + "has_task_concurrency_limits", + "tags", + "file_token", "description", - "schedule_interval", - "dag_run_timeout", - "last_parsed", - "max_active_tasks", - "fileloc", - "max_active_runs", - "orientation", + "last_expired", + "root_dag_id", "is_active", - "params", - "end_date", - "file_token", - "tags", - "doc_md", - "pickle_id", - "owners", - "catchup", - "render_template_as_native_obj", + "last_pickled", + "scheduler_lock", + "next_dagrun_create_after", + "next_dagrun_data_interval_start:", + "last_parsed_time", + "fileloc", + "default_view", + "max_active_tasks", "is_subdag", + "owners", + "has_import_errors", + "dag_id", + "pickle_id", + "schedule_interval", + "timetable_description", + "next_dagrun_data_interval_end", + "is_paused", + "next_dagrun", + "max_active_runs", ] for field in dag_details_fields: From 82b795042645a465457e9834912c2418ad83e67a Mon Sep 17 00:00:00 2001 From: maahir22 Date: Sun, 2 Apr 2023 22:05:52 +0530 Subject: [PATCH 03/12] Switch to standard console --- airflow/cli/commands/dag_command.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 3b41619319d32..cd3baaa8cb06f 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -353,13 +353,10 @@ def dag_details(args, session=NEW_SESSION): if not dag: raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") dag_detail = dag_schema.dump(dag) - for key, value in dag_detail.items(): - if isinstance(value, dict): - print(f"\t{key}:") - for subkey, subvalue in value.items(): - print(f"\t\t{subkey}: {subvalue}") - else: - print(f"\t{key}: {value}") + AirflowConsole().print_as( + data=[dag_detail], + output=("yaml" if args.output == "table" else args.output), + ) @cli_utils.action_cli From 9117867eca36f29e195e6eb637d1fe26e8c90a5e Mon Sep 17 00:00:00 2001 From: maahir22 Date: Mon, 3 Apr 2023 20:20:27 +0530 Subject: [PATCH 04/12] Use AirflowConsole - transposed table --- airflow/cli/commands/dag_command.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index cd3baaa8cb06f..54f437d196fc7 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -353,9 +353,15 @@ def dag_details(args, session=NEW_SESSION): if not dag: raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") dag_detail = dag_schema.dump(dag) + + if args.output == "table": + data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()] + else: + data = [dag_detail] + AirflowConsole().print_as( - data=[dag_detail], - output=("yaml" if args.output == "table" else args.output), + data=data, + output=args.output, ) From ecafbbf5821722d29cbe92c921c1f34489a660cf Mon Sep 17 00:00:00 2001 From: maahir22 Date: Mon, 3 Apr 2023 23:10:47 +0530 Subject: [PATCH 05/12] Add check for Plain Table --- airflow/cli/commands/dag_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 54f437d196fc7..3427db589022a 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -354,7 +354,7 @@ def dag_details(args, session=NEW_SESSION): raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") dag_detail = dag_schema.dump(dag) - if args.output == "table": + if args.output == "table" or args.output == "plain": data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()] else: data = [dag_detail] From 0371805c7fd82df2d71d9258b1ab0a0342083b46 Mon Sep 17 00:00:00 2001 From: maahir22 <56473490+maahir22@users.noreply.github.com> Date: Tue, 4 Apr 2023 07:19:54 +0530 Subject: [PATCH 06/12] Update airflow/cli/commands/dag_command.py Co-authored-by: Hussein Awala --- airflow/cli/commands/dag_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 3427db589022a..ce3b0b4fda7dc 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -354,7 +354,7 @@ def dag_details(args, session=NEW_SESSION): raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") dag_detail = dag_schema.dump(dag) - if args.output == "table" or args.output == "plain": + if args.output in ["table", "plain"]: data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()] else: data = [dag_detail] From 8589b847d789d6d9840dca6b43ff504973b59d08 Mon Sep 17 00:00:00 2001 From: maahir22 Date: Tue, 4 Apr 2023 21:38:47 +0530 Subject: [PATCH 07/12] Remove Subdir Arg --- airflow/cli/cli_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index dd9544b1454fb..5a1007eaa66d0 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1104,7 +1104,7 @@ class GroupCommand(NamedTuple): name="details", help="Get DAG details given a DAG id", func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"), - args=(ARG_DAG_ID, ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE), + args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE), ), ActionCommand( name="list", From d24cbe9dc49fd85a1c26abb4731749267bcb75e7 Mon Sep 17 00:00:00 2001 From: maahir22 Date: Tue, 11 Apr 2023 21:40:58 +0530 Subject: [PATCH 08/12] Remove Hard-Coded checks --- tests/cli/commands/test_dag_command.py | 37 +++++--------------------- 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 8f2b724757f85..fa377584a1ef6 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -31,6 +31,7 @@ import time_machine from airflow import settings +from airflow.api_connexion.schemas.dag_schema import dag_schema from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.exceptions import AirflowException @@ -473,41 +474,17 @@ def test_cli_report(self): @conf_vars({("core", "load_examples"): "true"}) def test_cli_get_dag_details(self): args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"]) + dag_id = "example_complex" with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: dag_command.dag_details(args) out = temp_stdout.getvalue() - # Check if DAG Details field are present - dag_details_fields = [ - "has_task_concurrency_limits", - "tags", - "file_token", - "description", - "last_expired", - "root_dag_id", - "is_active", - "last_pickled", - "scheduler_lock", - "next_dagrun_create_after", - "next_dagrun_data_interval_start:", - "last_parsed_time", - "fileloc", - "default_view", - "max_active_tasks", - "is_subdag", - "owners", - "has_import_errors", - "dag_id", - "pickle_id", - "schedule_interval", - "timetable_description", - "next_dagrun_data_interval_end", - "is_paused", - "next_dagrun", - "max_active_runs", - ] + with create_session() as session: + dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none() + dag_detail_fields = dag_schema.dump(dag) - for field in dag_details_fields: + # Check if DAG Details field are present + for field in dag_detail_fields: assert field in out # Check if identifying values are present From 3644be3d0b3c7ec0544a94bd4629aac80410cc1b Mon Sep 17 00:00:00 2001 From: maahir22 Date: Wed, 12 Apr 2023 12:29:11 +0530 Subject: [PATCH 09/12] Remove hard-coded fields --- tests/cli/commands/test_dag_command.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index fa377584a1ef6..fd288c1ef1062 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -31,7 +31,6 @@ import time_machine from airflow import settings -from airflow.api_connexion.schemas.dag_schema import dag_schema from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.exceptions import AirflowException @@ -474,14 +473,13 @@ def test_cli_report(self): @conf_vars({("core", "load_examples"): "true"}) def test_cli_get_dag_details(self): args = self.parser.parse_args(["dags", "details", "example_complex", "--output", "yaml"]) - dag_id = "example_complex" with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: dag_command.dag_details(args) out = temp_stdout.getvalue() - with create_session() as session: - dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none() - dag_detail_fields = dag_schema.dump(dag) + dag_detail_fields = [ + column.name for column in DagModel.__table__.columns if column.name != "processor_subdir" + ] # Check if DAG Details field are present for field in dag_detail_fields: From 3ae510056c12189ed92eff8d2f0862777e339a5e Mon Sep 17 00:00:00 2001 From: maahir22 Date: Wed, 12 Apr 2023 12:31:49 +0530 Subject: [PATCH 10/12] Remove hardcoded fields --- tests/cli/commands/test_dag_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index fd288c1ef1062..674717c3f29cc 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -778,4 +778,4 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag): ] ) mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])]) - assert "SOURCE" in output + assert "SOURCE" in output \ No newline at end of file From 2bd153be17a0416d5948b14496350fc4420fef16 Mon Sep 17 00:00:00 2001 From: maahir22 Date: Wed, 12 Apr 2023 12:33:16 +0530 Subject: [PATCH 11/12] Fix lint --- tests/cli/commands/test_dag_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 674717c3f29cc..fd288c1ef1062 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -778,4 +778,4 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag): ] ) mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])]) - assert "SOURCE" in output \ No newline at end of file + assert "SOURCE" in output From b6a84393d208da01610b5923e70f4a1b35c55ac7 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 13 Apr 2023 23:06:22 +0200 Subject: [PATCH 12/12] load dag fields from a DAGSchema instance --- tests/cli/commands/test_dag_command.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index fd288c1ef1062..a54d26a9932c6 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -31,6 +31,7 @@ import time_machine from airflow import settings +from airflow.api_connexion.schemas.dag_schema import DAGSchema from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.exceptions import AirflowException @@ -477,9 +478,7 @@ def test_cli_get_dag_details(self): dag_command.dag_details(args) out = temp_stdout.getvalue() - dag_detail_fields = [ - column.name for column in DagModel.__table__.columns if column.name != "processor_subdir" - ] + dag_detail_fields = DAGSchema().fields.keys() # Check if DAG Details field are present for field in dag_detail_fields: