diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py
index e5e6b011f607..a7be03742ea1 100644
--- a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py
+++ b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py
@@ -1,8 +1,8 @@
 """Module containing tasks and flows for interacting with dbt CLI"""
+
 import os
 from pathlib import Path, PosixPath
 from typing import Any, Dict, List, Optional, Union
-from uuid import UUID
 
 import yaml
 from dbt.cli.main import dbtRunner, dbtRunnerResult
@@ -29,9 +29,9 @@ async def trigger_dbt_cli_command(
     project_dir: Optional[Union[Path, str]] = None,
     overwrite_profiles: bool = False,
     dbt_cli_profile: Optional[DbtCliProfile] = None,
-    create_artifact: bool = True,
-    artifact_key: str = "dbt-cli-command-summary",
-    **command_kwargs: Dict[str, Any],
+    create_summary_artifact: bool = False,
+    summary_artifact_key: Optional[str] = "dbt-cli-command-summary",
+    extra_command_args: Optional[List[str]] = None,
 ) -> Optional[dbtRunnerResult]:
     """
     Task for running dbt commands.
@@ -53,8 +53,15 @@ async def trigger_dbt_cli_command(
         dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
             Note! This is optional and will raise an error if profiles.yml already exists
            under profile_dir and overwrite_profiles is set to False.
-        **shell_run_command_kwargs: Additional keyword arguments to pass to
-            [shell_run_command](https://prefecthq.github.io/prefect-shell/commands/#prefect_shell.commands.shell_run_command).
+        create_summary_artifact: If True, creates a Prefect artifact on the task run
+            with the dbt command results using the specified artifact key.
+            Defaults to False.
+        summary_artifact_key: The key under which to store
+            the dbt command results artifact in Prefect.
+            Defaults to 'dbt-cli-command-summary'.
+        extra_command_args: Additional command arguments to pass to the dbt command.
+            These arguments are appended to the command passed to the dbtRunner client.
+ Example: extra_command_args=["--model", "foo_model"] Returns: last_line_cli_output (str): The last line of the CLI output will be returned @@ -108,9 +115,10 @@ def trigger_dbt_cli_command_flow(): target_configs=target_configs, ) result = trigger_dbt_cli_command( - "dbt debug", + "dbt run", overwrite_profiles=True, - dbt_cli_profile=dbt_cli_profile + dbt_cli_profile=dbt_cli_profile, + extra_command_args=["--model", "foo_model"] ) return result @@ -156,8 +164,9 @@ def trigger_dbt_cli_command_flow(): cli_args.append("--project-dir") cli_args.append(project_dir) - if command_kwargs: - cli_args.append(command_kwargs) + if extra_command_args: + for value in extra_command_args: + cli_args.append(value) # fix up empty shell_run_command_kwargs dbt_runner_client = dbtRunner() @@ -169,21 +178,22 @@ def trigger_dbt_cli_command_flow(): raise result.exception # Creating the dbt Summary Markdown if enabled - if create_artifact and isinstance(result.result, RunExecutionResult): - markdown = create_summary_markdown(result, command) + if create_summary_artifact and isinstance(result.result, RunExecutionResult): + run_results = consolidate_run_results(result) + markdown = create_summary_markdown(run_results, command) artifact_id = await create_markdown_artifact( markdown=markdown, - key=artifact_key, + key=summary_artifact_key, ) if not artifact_id: - logger.error(f"Artifact was not created for dbt {command} task") + logger.error(f"Summary Artifact was not created for dbt {command} task") else: logger.info( f"dbt {command} task completed successfully with artifact {artifact_id}" ) else: logger.debug( - f"Artifact was not created for dbt {command} this task \ + f"Artifacts were not created for dbt {command} this task \ due to create_artifact=False or the dbt command did not \ return any RunExecutionResults. \ See https://docs.getdbt.com/reference/programmatic-invocations \ @@ -392,9 +402,9 @@ async def run_dbt_build( project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, - create_artifact: bool = True, - artifact_key: str = "dbt-build-task-summary", - **command_kwargs, + create_summary_artifact: bool = False, + summary_artifact_key: str = "dbt-build-task-summary", + extra_command_args: Optional[List[str]] = None, ): """ Executes the 'dbt build' command within a Prefect task, @@ -419,6 +429,7 @@ async def run_dbt_build( artifact_key: The key under which to store the dbt build results artifact in Prefect. Defaults to 'dbt-build-task-summary'. + extra_command_args: Additional command arguments to pass to the dbt build command. 
Example: ```python @@ -428,7 +439,8 @@ async def run_dbt_build( @flow def dbt_test_flow(): dbt_build_task( - project_dir="/Users/test/my_dbt_project_dir" + project_dir="/Users/test/my_dbt_project_dir", + extra_command_args=["--model", "foo_model"] ) ``` @@ -445,9 +457,9 @@ def dbt_test_flow(): project_dir=project_dir, overwrite_profiles=overwrite_profiles, dbt_cli_profile=dbt_cli_profile, - create_artifact=create_artifact, - artifact_key=artifact_key, - **command_kwargs, + create_summary_artifact=create_summary_artifact, + summary_artifact_key=summary_artifact_key, + extra_command_args=extra_command_args, ) return results @@ -458,9 +470,9 @@ async def run_dbt_model( project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, - create_artifact: bool = True, - artifact_key: str = "dbt-run-task-summary", - **command_kwargs, + create_summary_artifact: bool = False, + summary_artifact_key: str = "dbt-run-task-summary", + extra_command_args: Optional[List[str]] = None, ): """ Executes the 'dbt run' command within a Prefect task, @@ -485,6 +497,7 @@ async def run_dbt_model( artifact_key: The key under which to store the dbt run results artifact in Prefect. Defaults to 'dbt-run-task-summary'. + extra_command_args: Additional command arguments to pass to the dbt run command. Example: ```python @@ -494,7 +507,8 @@ async def run_dbt_model( @flow def dbt_test_flow(): dbt_run_task( - project_dir="/Users/test/my_dbt_project_dir" + project_dir="/Users/test/my_dbt_project_dir", + extra_command_args=["--model", "foo_model"] ) ``` @@ -511,9 +525,9 @@ def dbt_test_flow(): project_dir=project_dir, overwrite_profiles=overwrite_profiles, dbt_cli_profile=dbt_cli_profile, - create_artifact=create_artifact, - artifact_key=artifact_key, - **command_kwargs, + create_summary_artifact=create_summary_artifact, + summary_artifact_key=summary_artifact_key, + extra_command_args=extra_command_args, ) return results @@ -525,9 +539,9 @@ async def run_dbt_test( project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, - create_artifact: bool = True, - artifact_key: str = "dbt-test-task-summary", - **command_kwargs, + create_summary_artifact: bool = False, + summary_artifact_key: str = "dbt-test-task-summary", + extra_command_args: Optional[List[str]] = None, ): """ Executes the 'dbt test' command within a Prefect task, @@ -552,6 +566,7 @@ async def run_dbt_test( artifact_key: The key under which to store the dbt test results artifact in Prefect. Defaults to 'dbt-test-task-summary'. + extra_command_args: Additional command arguments to pass to the dbt test command. 
Example: ```python @@ -561,7 +576,8 @@ async def run_dbt_test( @flow def dbt_test_flow(): dbt_test_task( - project_dir="/Users/test/my_dbt_project_dir" + project_dir="/Users/test/my_dbt_project_dir", + extra_command_args=["--model", "foo_model"] ) ``` @@ -578,9 +594,9 @@ def dbt_test_flow(): project_dir=project_dir, overwrite_profiles=overwrite_profiles, dbt_cli_profile=dbt_cli_profile, - create_artifact=create_artifact, - artifact_key=artifact_key, - **command_kwargs, + create_summary_artifact=create_summary_artifact, + summary_artifact_key=summary_artifact_key, + extra_command_args=extra_command_args, ) return results @@ -592,9 +608,9 @@ async def run_dbt_snapshot( project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, - create_artifact: bool = True, - artifact_key: str = "dbt-snapshot-task-summary", - **command_kwargs, + create_summary_artifact: bool = False, + summary_artifact_key: str = "dbt-snapshot-task-summary", + extra_command_args: Optional[List[str]] = None, ): """ Executes the 'dbt snapshot' command within a Prefect task, @@ -619,6 +635,7 @@ async def run_dbt_snapshot( artifact_key: The key under which to store the dbt build results artifact in Prefect. Defaults to 'dbt-snapshot-task-summary'. + extra_command_args: Additional command arguments to pass to the dbt snapshot command. Example: ```python @@ -628,7 +645,8 @@ async def run_dbt_snapshot( @flow def dbt_test_flow(): dbt_snapshot_task( - project_dir="/Users/test/my_dbt_project_dir" + project_dir="/Users/test/my_dbt_project_dir", + extra_command_args=["--fail-fast"] ) ``` @@ -645,9 +663,9 @@ def dbt_test_flow(): project_dir=project_dir, overwrite_profiles=overwrite_profiles, dbt_cli_profile=dbt_cli_profile, - create_artifact=create_artifact, - artifact_key=artifact_key, - **command_kwargs, + create_summary_artifact=create_summary_artifact, + summary_artifact_key=summary_artifact_key, + extra_command_args=extra_command_args, ) return results @@ -659,9 +677,9 @@ async def run_dbt_seed( project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, - create_artifact: bool = True, - artifact_key: str = "dbt-seed-task-summary", - **command_kwargs, + create_summary_artifact: bool = False, + summary_artifact_key: str = "dbt-seed-task-summary", + extra_command_args: Optional[List[str]] = None, ): """ Executes the 'dbt seed' command within a Prefect task, @@ -686,6 +704,7 @@ async def run_dbt_seed( artifact_key: The key under which to store the dbt build results artifact in Prefect. Defaults to 'dbt-seed-task-summary'. + extra_command_args: Additional command arguments to pass to the dbt seed command. 
Example: ```python @@ -695,7 +714,8 @@ async def run_dbt_seed( @flow def dbt_test_flow(): dbt_seed_task( - project_dir="/Users/test/my_dbt_project_dir" + project_dir="/Users/test/my_dbt_project_dir", + extra_command_args=["--fail-fast"] ) ``` @@ -712,61 +732,152 @@ def dbt_test_flow(): project_dir=project_dir, overwrite_profiles=overwrite_profiles, dbt_cli_profile=dbt_cli_profile, - create_artifact=create_artifact, - artifact_key=artifact_key, - **command_kwargs, + create_summary_artifact=create_summary_artifact, + summary_artifact_key=summary_artifact_key, + extra_command_args=extra_command_args, ) return results -def create_summary_markdown(results: dbtRunnerResult, command: str) -> UUID: +def create_summary_markdown(run_results: dict, command: str) -> str: """ Creates a Prefect task artifact summarizing the results of the above predefined prefrect-dbt task. """ - # Create Summary Markdown Artifact - run_statuses: Dict[str, List[str]] = { - "successful": [], - "failed": [], - "skipped": [], - } - - for r in results.result.results: - if r.status == NodeStatus.Success or r.status == NodeStatus.Pass: - run_statuses["successful"].append(r) - elif ( - r.status == NodeStatus.Fail - or r.status == NodeStatus.Error - or r.status == NodeStatus.RuntimeErr - ): - run_statuses["failed"].append(r) - elif r.status == NodeStatus.Skipped: - run_statuses["skipped"].append(r) - - markdown = f"# dbt {command} Task Summary" - - if run_statuses["failed"] != []: - failed_runs_str = "" - for r in run_statuses["failed"]: - failed_runs_str += f"**{r.node.name}**\n \ - Node Type: {r.node.resource_type}\n \ - Node Path: {r.node.original_file_path}" - if r.message: - message = r.message.replace("\n", ".") - failed_runs_str += f"\nError Message: {message}\n" - markdown += f"""\n## Failed Runs 🔴\n\n{failed_runs_str}\n\n""" - - if run_statuses["successful"] != []: + markdown = f"# dbt {command} Task Summary\n" + markdown += _create_node_summary_table_md(run_results=run_results) + + if ( + run_results["Error"] != [] + or run_results["Fail"] != [] + or run_results["Skipped"] != [] + or run_results["Warn"] != [] + ): + markdown += "\n\n ## Unsuccessful Nodes ❌\n\n" + markdown += _create_unsuccessful_markdown(run_results=run_results) + + if run_results["Success"] != []: successful_runs_str = "\n".join( - [f"**{r.node.name}**" for r in run_statuses["successful"]] + [f"* {r.node.name}" for r in run_results["Success"]] ) - markdown += f"""\n## Successful Runs ✅\n\n{successful_runs_str}\n\n""" + markdown += f"""\n## Successful Nodes ✅\n\n{successful_runs_str}\n\n""" - if run_statuses["skipped"] != []: - skipped_runs_str = "\n".join( - [f"**{r.node.name}**" for r in run_statuses["skipped"]] - ) - markdown += f""" ## Skipped Runs 🚫\n\n{skipped_runs_str}\n\n""" + return markdown + + +def _create_node_info_md(node_name, resource_type, message, path, compiled_code) -> str: + """ + Creates template for unsuccessful node information + """ + markdown = f""" +**{node_name}** + +Type: {resource_type} + +Message: + +> {message} + + +Path: {path} + +""" + + if compiled_code: + markdown += f""" +Compiled code: +```sql +{compiled_code} +``` + """ + + return markdown + + +def _create_node_summary_table_md(run_results: dict) -> str: + """ + Creates a table for node summary + """ + + markdown = f""" +| Successes | Errors | Failures | Skips | Warnings | +| :-------: | :----: | :------: | :---: | :------: | +| {len(run_results["Success"])} | {len(run_results["Error"])} | {len(run_results["Fail"])} | {len(run_results["Skipped"])} | 
{len(run_results["Warn"])} | + """ + return markdown + + +def _create_unsuccessful_markdown(run_results: dict) -> str: + """ + Creates markdown summarizing the results + of unsuccessful nodes, including compiled code. + """ + markdown = "" + if len(run_results["Error"]) > 0: + markdown += "\n### Errored Nodes:\n" + for n in run_results["Error"]: + markdown += _create_node_info_md( + n.node.name, + n.node.resource_type, + n.message, + n.node.path, + n.node.compiled_code if not n.node.resource_type == "seed" else None, + ) + if len(run_results["Fail"]) > 0: + markdown += "\n### Failed Nodes:\n" + for n in run_results["Fail"]: + markdown += _create_node_info_md( + n.node.name, + n.node.resource_type, + n.message, + n.node.path, + n.node.compiled_code if not n.node.resource_type == "seed" else None, + ) + if len(run_results["Skipped"]) > 0: + markdown += "\n### Skipped Nodes:\n" + for n in run_results["Skipped"]: + markdown += _create_node_info_md( + n.node.name, + n.node.resource_type, + n.message, + n.node.path, + n.node.compiled_code if not n.node.resource_type == "seed" else None, + ) + if len(run_results["Warn"]) > 0: + markdown += "\n### Warned Nodes:\n" + for n in run_results["Warn"]: + markdown += _create_node_info_md( + n.node.name, + n.node.resource_type, + n.message, + n.node.path, + n.node.compiled_code if not n.node.resource_type == "seed" else None, + ) return markdown + + +def consolidate_run_results(results: dbtRunnerResult) -> dict: + run_results: Dict[str, List[str]] = { + "Success": [], + "Fail": [], + "Skipped": [], + "Error": [], + "Warn": [], + } + + if results.exception is None: + for r in results.result.results: + if r.status == NodeStatus.Fail: + run_results["Fail"].append(r) + elif r.status == NodeStatus.Error or r.status == NodeStatus.RuntimeErr: + run_results["Error"].append(r) + elif r.status == NodeStatus.Skipped: + run_results["Skipped"].append(r) + elif r.status == NodeStatus.Success or r.status == NodeStatus.Pass: + run_results["Success"].append(r) + elif r.status == NodeStatus.Warn: + run_results["Warn"].append(r) + + return run_results diff --git a/src/integrations/prefect-dbt/tests/cli/test_commands.py b/src/integrations/prefect-dbt/tests/cli/test_commands.py index 11f580527efb..2e6f06dcd994 100644 --- a/src/integrations/prefect-dbt/tests/cli/test_commands.py +++ b/src/integrations/prefect-dbt/tests/cli/test_commands.py @@ -59,6 +59,41 @@ async def mock_dbt_runner_model_success(): ) +@pytest.fixture +async def mock_dbt_runner_model_error(): + return dbtRunnerResult( + success=False, + exception=None, + result=RunExecutionResult( + results=[ + RunResult( + status="error", + timing=None, + thread_id="'Thread-1 (worker)'", + message="Runtime Error", + failures=None, + node=ModelNode( + database="test-123", + schema="prefect_dbt_example", + name="my_first_dbt_model", + resource_type="model", + package_name="prefect_dbt_bigquery", + path="example/my_first_dbt_model.sql", + original_file_path="models/example/my_first_dbt_model.sql", + unique_id="model.prefect_dbt_bigquery.my_first_dbt_model", + fqn=["prefect_dbt_bigquery", "example", "my_first_dbt_model"], + alias="my_first_dbt_model", + checksum=FileHash(name="sha256", checksum="123456789"), + ), + execution_time=0.0, + adapter_response=None, + ) + ], + elapsed_time=0.0, + ), + ) + + @pytest.fixture async def mock_dbt_runner_ls_success(): return dbtRunnerResult( @@ -231,14 +266,14 @@ def test_flow(): @pytest.mark.usefixtures("dbt_runner_ls_result") -def test_trigger_dbt_cli_command_shell_kwargs(profiles_dir, 
dbt_cli_profile_bare): +def test_trigger_dbt_cli_command_extra_command_args(profiles_dir, dbt_cli_profile_bare): @flow def test_flow(): return trigger_dbt_cli_command( "dbt ls", - return_all=True, profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, + extra_command_args=["--return_all", "True"], ) result = test_flow() @@ -364,8 +399,8 @@ def test_flow(): return run_dbt_build( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, ) test_flow() @@ -373,6 +408,7 @@ def test_flow(): assert a.type == "markdown" assert a.data.startswith("# dbt build Task Summary") assert "my_first_dbt_model" in a.data + assert "Successful Nodes" in a.data @pytest.mark.usefixtures("dbt_runner_model_result") @@ -382,8 +418,8 @@ def test_flow(): return run_dbt_test( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, ) test_flow() @@ -391,6 +427,7 @@ def test_flow(): assert a.type == "markdown" assert a.data.startswith("# dbt test Task Summary") assert "my_first_dbt_model" in a.data + assert "Successful Nodes" in a.data @pytest.mark.usefixtures("dbt_runner_model_result") @@ -400,8 +437,8 @@ def test_flow(): return run_dbt_snapshot( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, ) test_flow() @@ -409,6 +446,7 @@ def test_flow(): assert a.type == "markdown" assert a.data.startswith("# dbt snapshot Task Summary") assert "my_first_dbt_model" in a.data + assert "Successful Nodes" in a.data @pytest.mark.usefixtures("dbt_runner_model_result") @@ -418,8 +456,8 @@ def test_flow(): return run_dbt_seed( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, ) test_flow() @@ -427,6 +465,7 @@ def test_flow(): assert a.type == "markdown" assert a.data.startswith("# dbt seed Task Summary") assert "my_first_dbt_model" in a.data + assert "Successful Nodes" in a.data @pytest.mark.usefixtures("dbt_runner_model_result") @@ -436,8 +475,35 @@ def test_flow(): return run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, + ) + + test_flow() + assert (a := Artifact.get(key="foo")) + assert a.type == "markdown" + assert a.data.startswith("# dbt run Task Summary") + assert "my_first_dbt_model" in a.data + assert "Successful Nodes" in a.data + + +@pytest.fixture +def dbt_runner_model_error(monkeypatch, mock_dbt_runner_model_error): + _mock_dbt_runner_invoke_error = MagicMock(return_value=mock_dbt_runner_model_error) + monkeypatch.setattr("dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_invoke_error) + + +@pytest.mark.usefixtures("dbt_runner_model_error") +def test_run_dbt_model_creates_unsuccessful_artifact( + profiles_dir, dbt_cli_profile_bare +): + @flow + def test_flow(): + return run_dbt_model( + profiles_dir=profiles_dir, + dbt_cli_profile=dbt_cli_profile_bare, + summary_artifact_key="foo", + create_summary_artifact=True, ) test_flow() @@ -445,6 +511,7 @@ def test_flow(): assert a.type == "markdown" assert a.data.startswith("# dbt run Task Summary") assert "my_first_dbt_model" in a.data + assert "Unsuccessful Nodes" in 
a.data @pytest.mark.usefixtures("dbt_runner_failed_result") @@ -454,8 +521,8 @@ def test_flow(): return run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, - artifact_key="foo", - create_artifact=True, + summary_artifact_key="foo", + create_summary_artifact=True, ) with pytest.raises(DbtUsageException, match="No such command 'weeeeeee'."):
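
The hunks above rename `create_artifact`/`artifact_key` to `create_summary_artifact`/`summary_artifact_key` and replace `**command_kwargs` with an explicit `extra_command_args` list. A minimal usage sketch of the new signatures follows; it mirrors the call pattern used in the tests above, and the project directory, artifact key, and dbt selection arguments are illustrative placeholders. It assumes a usable `profiles.yml` already exists for the target profile.

```python
from prefect import flow
from prefect_dbt.cli.commands import run_dbt_build, trigger_dbt_cli_command


@flow
def dbt_flow():
    # Run `dbt build` and publish a markdown summary artifact via the
    # renamed create_summary_artifact/summary_artifact_key parameters.
    build_results = run_dbt_build(
        project_dir="my_dbt_project",  # placeholder path
        create_summary_artifact=True,
        summary_artifact_key="dbt-build-task-summary",
        extra_command_args=["--select", "my_model"],  # appended to the dbt command
    )

    # The lower-level task accepts the same extra_command_args list.
    ls_results = trigger_dbt_cli_command(
        "dbt ls",
        project_dir="my_dbt_project",
        extra_command_args=["--output", "json"],
    )
    return build_results, ls_results
```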