From 849f36ebe9c10b5a9b712332f64cdb1234ba22f2 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Mon, 15 Jan 2024 12:15:56 -0500 Subject: [PATCH 01/11] support partial parsing --- cosmos/config.py | 8 ++++++- cosmos/constants.py | 1 + cosmos/converter.py | 1 + cosmos/dbt/graph.py | 8 ++++++- cosmos/dbt/project.py | 16 +++++++++---- cosmos/hooks/subprocess.py | 6 +++++ cosmos/operators/base.py | 16 +++++++------ cosmos/operators/local.py | 16 +++++++++---- docs/configuration/execution-config.rst | 1 + docs/configuration/render-config.rst | 1 + tests/dbt/test_project.py | 31 +++++++++++++++++++++++-- 11 files changed, 85 insertions(+), 20 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index dc33c0eba..ea64787da 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -47,6 +47,7 @@ class RenderConfig: :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``. + :param partial_parse: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. """ emit_datasets: bool = True @@ -61,6 +62,7 @@ class RenderConfig: env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None dbt_ls_path: Path | None = None + partial_parse: bool = True project_path: Path | None = field(init=False) @@ -141,6 +143,7 @@ def __init__( project_name: str | None = None, env_vars: dict[str, str] | None = None, dbt_vars: dict[str, str] | None = None, + partial_parse: bool = True, ): # Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig # dbt_project_path may not always be defined here. @@ -166,6 +169,7 @@ def __init__( self.env_vars = env_vars self.dbt_vars = dbt_vars + self.partial_parse = partial_parse def validate_project(self) -> None: """ @@ -292,7 +296,8 @@ class ExecutionConfig: :param execution_mode: The execution mode for dbt. Defaults to local :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection. :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path. - :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path + :param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path + :param partial_parse: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path """ execution_mode: ExecutionMode = ExecutionMode.LOCAL @@ -301,6 +306,7 @@ class ExecutionConfig: dbt_project_path: InitVar[str | Path | None] = None project_path: Path | None = field(init=False) + partial_parse: bool = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None diff --git a/cosmos/constants.py b/cosmos/constants.py index 96c5bdd07..a00857540 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -12,6 +12,7 @@ DBT_LOG_DIR_NAME = "logs" DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH" DBT_TARGET_DIR_NAME = "target" +DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" diff --git a/cosmos/converter.py b/cosmos/converter.py index c2b31700b..ce3cd8b9a 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -246,6 +246,7 @@ def __init__( task_args = { **operator_args, "project_dir": execution_config.project_path, + "partial_parse": execution_config.partial_parse, "profile_config": profile_config, "emit_datasets": render_config.emit_datasets, "env": env_vars, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b41c15a49..59712af23 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -22,7 +22,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks, environ +from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -199,6 +199,9 @@ def run_dbt_ls( if self.render_config.selector: ls_command.extend(["--selector", self.render_config.selector]) + if not self.render_config.partial_parse: + ls_command.append("--no-partial-parse") + ls_command.extend(self.local_flags) stdout = run_command(ls_command, tmp_dir, env_vars) @@ -243,6 +246,9 @@ def load_via_dbt_ls(self) -> None: tmpdir_path = Path(tmpdir) create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps) + if self.render_config.partial_parse: + copy_manifest_for_partial_parse(self.render_config.project_path, tmpdir_path) + with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ( self.project.env_vars or self.render_config.env_vars or {} ): diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index aff6ed03e..aa10abb83 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -1,11 +1,9 @@ from __future__ import annotations from pathlib import Path +import shutil import os -from cosmos.constants import ( - DBT_LOG_DIR_NAME, - DBT_TARGET_DIR_NAME, -) +from cosmos.constants import DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME from contextlib import contextmanager from typing import Generator @@ -21,6 +19,16 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool os.symlink(project_path / child_name, tmp_dir / child_name) +def copy_manifest_for_partial_parse(project_path: Path, tmp_dir: Path) -> None: + partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME + + if partial_parse_file.exists(): + tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME + tmp_target_dir.mkdir(exist_ok=True) + + shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME)) + + @contextmanager def environ(env_vars: dict[str, str]) -> Generator[None, None, None]: """Temporarily set environment variables inside the context manager and restore diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index cf6b489e8..2522420b7 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -105,3 +105,9 @@ def send_sigterm(self) -> None: logger.info("Sending SIGTERM signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM) + + def send_sigint(self) -> None: + """Sends SIGINT signal to ``self.sub_process`` if one exists.""" + logger.info("Sending SIGINT signal to process group") + if self.sub_process and hasattr(self.sub_process, "pid"): + os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index c46b95c59..b6e623749 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -51,6 +51,9 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): :param skip_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. + :param partial_parse: If True, then the operator will use the + ``partial_parse.msgpack`` during execution if it exists. If False, then + a flag will be explicitly set to turn off partial parsing. :param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed. Otherwise, the query will keep running when the task is killed. :param dbt_executable_path: Path to dbt executable can be used with venv @@ -68,13 +71,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): "vars", "models", ) - global_boolean_flags = ( - "no_version_check", - "cache_selected_only", - "fail_fast", - "quiet", - "warn_error", - ) + global_boolean_flags = ("no_version_check", "cache_selected_only", "fail_fast", "quiet", "warn_error") intercept_flag = True @@ -105,6 +102,7 @@ def __init__( append_env: bool = False, output_encoding: str = "utf-8", skip_exit_code: int = 99, + partial_parse: bool = True, cancel_query_on_kill: bool = True, dbt_executable_path: str = get_system_dbt(), dbt_cmd_flags: list[str] | None = None, @@ -131,6 +129,7 @@ def __init__( self.append_env = append_env self.output_encoding = output_encoding self.skip_exit_code = skip_exit_code + self.partial_parse = partial_parse self.cancel_query_on_kill = cancel_query_on_kill self.dbt_executable_path = dbt_executable_path self.dbt_cmd_flags = dbt_cmd_flags @@ -219,6 +218,9 @@ def build_cmd( dbt_cmd.extend(self.dbt_cmd_global_flags) + if not self.partial_parse: + dbt_cmd.append("--no-partial-parse") + dbt_cmd.extend(self.base_cmd) if self.indirect_selection: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index cc0e1f30b..11c6f3a12 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -34,7 +34,12 @@ from sqlalchemy.orm import Session -from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER +from cosmos.constants import ( + DEFAULT_OPENLINEAGE_NAMESPACE, + OPENLINEAGE_PRODUCER, + DBT_TARGET_DIR_NAME, + DBT_PARTIAL_PARSE_FILE_NAME, +) from cosmos.config import ProfileConfig from cosmos.log import get_logger from cosmos.operators.base import ( @@ -51,7 +56,7 @@ FullOutputSubprocessResult, ) from cosmos.dbt.parser.output import extract_log_issues, parse_output -from cosmos.dbt.project import create_symlinks +from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse DBT_NO_TESTS_MSG = "Nothing to do" DBT_WARN_MSG = "WARN" @@ -208,6 +213,9 @@ def run_command( create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps) + if self.partial_parse: + copy_manifest_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir)) + with self.profile_config.ensure_profile() as profile_values: (profile_path, env_vars) = profile_values env.update(env_vars) @@ -377,9 +385,7 @@ def execute(self, context: Context) -> None: def on_kill(self) -> None: if self.cancel_query_on_kill: - self.subprocess_hook.log.info("Sending SIGINT signal to process group") - if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"): - os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT) + self.subprocess_hook.send_sigint() else: self.subprocess_hook.send_sigterm() diff --git a/docs/configuration/execution-config.rst b/docs/configuration/execution-config.rst index c118590d8..82fa75280 100644 --- a/docs/configuration/execution-config.rst +++ b/docs/configuration/execution-config.rst @@ -10,3 +10,4 @@ The ``ExecutionConfig`` class takes the following arguments: - ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` +- ``partial_parse``: If True, then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then the flag will be explicitly set to turn off partial parsing. For ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``, the ``partial_parse.msgpack`` file will be copied into temporary directory that dbt executes out of. diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 6d669d0a5..898cf1c18 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -17,6 +17,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` +- ``partial_parse``: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index 000ad06bd..40591712b 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -1,8 +1,11 @@ -from pathlib import Path -from cosmos.dbt.project import create_symlinks, environ import os +from pathlib import Path from unittest.mock import patch +import pytest + +from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ + DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -17,6 +20,30 @@ def test_create_symlinks(tmp_path): assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages") +@pytest.mark.parametrize("exists", [True, False]) +def test_copy_manifest_for_partial_parse(tmp_path, exists): + project_path = tmp_path / "project" + target_path = project_path / "target" + partial_parse_file = target_path / "partial_parse.msgpack" + + target_path.mkdir(parents=True) + + if exists: + partial_parse_file.write_bytes(b"") + + tmp_dir = tmp_path / "tmp_dir" + tmp_dir.mkdir() + + copy_manifest_for_partial_parse(project_path, tmp_dir) + + tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack" + + if exists: + assert tmp_partial_parse_file.exists() + else: + assert not tmp_partial_parse_file.exists() + + @patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"}) def test_environ_context_manager(): # Define the expected environment variables From 82a9e85f5deacf37b35a901b31ee1eed5b767046 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Wed, 17 Jan 2024 22:03:06 -0500 Subject: [PATCH 02/11] add tests for subprocesshook --- tests/hooks/__init__.py | 0 tests/hooks/test_subprocess.py | 70 ++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 tests/hooks/__init__.py create mode 100644 tests/hooks/test_subprocess.py diff --git a/tests/hooks/__init__.py b/tests/hooks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py new file mode 100644 index 000000000..601d37b00 --- /dev/null +++ b/tests/hooks/test_subprocess.py @@ -0,0 +1,70 @@ +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import MagicMock, patch +import signal + +import pytest + +from cosmos.hooks.subprocess import FullOutputSubprocessHook + +OS_ENV_KEY = "SUBPROCESS_ENV_TEST" +OS_ENV_VAL = "this-is-from-os-environ" + + +@pytest.mark.parametrize( + "env,expected", + [ + ({"ABC": "123", "AAA": "456"}, {"ABC": "123", "AAA": "456", OS_ENV_KEY: ""}), + ({}, {OS_ENV_KEY: ""}), + (None, {OS_ENV_KEY: OS_ENV_VAL}), + ], + ids=["with env", "empty env", "no env"], +) +def test_env(env, expected): + """ + Test that env variables are exported correctly to the command environment. + When ``env`` is ``None``, ``os.environ`` should be passed to ``Popen``. + Otherwise, the variables in ``env`` should be available, and ``os.environ`` should not. + """ + hook = FullOutputSubprocessHook() + + def build_cmd(keys, filename): + """ + Produce bash command to echo env vars into filename. + Will always echo the special test var named ``OS_ENV_KEY`` into the file to test whether + ``os.environ`` is passed or not. + """ + return "\n".join(f"echo {k}=${k}>> {filename}" for k in [*keys, OS_ENV_KEY]) + + with TemporaryDirectory() as tmp_dir, patch.dict("os.environ", {OS_ENV_KEY: OS_ENV_VAL}): + tmp_file = Path(tmp_dir, "test.txt") + command = build_cmd(env and env.keys() or [], tmp_file.as_posix()) + hook.run_command(command=["bash", "-c", command], env=env) + actual = dict([x.split("=") for x in tmp_file.read_text().splitlines()]) + assert actual == expected + + +def test_subprocess_hook(): + hook = FullOutputSubprocessHook() + result = hook.run_command(command=["bash", "-c", f'echo "foo"']) + assert result.exit_code == 0 + assert result.output == "foo" + assert result.full_output == ["foo"] + + +@patch("os.getpgid", return_value=123) +@patch("os.killpg") +def test_send_sigint(mock_killpg, mock_getpgid): + hook = FullOutputSubprocessHook() + hook.sub_process = MagicMock() + hook.send_sigint() + mock_killpg.assert_called_with(123, signal.SIGINT) + + +@patch("os.getpgid", return_value=123) +@patch("os.killpg") +def test_send_sigterm(mock_killpg, mock_getpgid): + hook = FullOutputSubprocessHook() + hook.sub_process = MagicMock() + hook.send_sigterm() + mock_killpg.assert_called_with(123, signal.SIGTERM) From cce152c2418a0b397d9f0ddfc94e844841560ff6 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Wed, 31 Jan 2024 21:48:26 -0500 Subject: [PATCH 03/11] add tests --- tests/dbt/test_graph.py | 32 ++++++++++++++++++++++++ tests/operators/test_local.py | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7e941cb49..d3fa37815 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -899,6 +899,38 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( assert ls_command[ls_command.index("--selector") + 1] == selector +@patch("cosmos.dbt.graph.Popen") +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") +@patch("cosmos.config.RenderConfig.validate_dbt_command") +def test_load_via_dbt_ls_render_config_no_partial_parse( + mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir +): + """Tests that the dbt ls command in the subprocess has "--selector" with the RenderConfig.selector.""" + mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 + project_config = ProjectConfig() + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + load_method=LoadMode.DBT_LS, + partial_parse=False + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + execution_config = MagicMock() + dbt_graph = DbtGraph( + project=project_config, + render_config=render_config, + execution_config=execution_config, + profile_config=profile_config, + ) + dbt_graph.load_via_dbt_ls() + ls_command = mock_popen.call_args.args[0] + assert "--no-partial-parse" in ls_command + + @pytest.mark.parametrize("load_method", [LoadMode.DBT_MANIFEST, LoadMode.CUSTOM]) def test_load_method_with_unsupported_render_config_selector_arg(load_method): """Tests that error is raised when RenderConfig.selector is used with LoadMode.DBT_MANIFEST or LoadMode.CUSTOM.""" diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index aa4cb741f..36d2cd46f 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -233,6 +233,22 @@ def test_run_operator_dataset_inlets_and_outlets(): assert test_operator.outlets == [] +def test_dbt_base_operator_no_partial_parse() -> None: + + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + partial_parse=False, + ) + + cmd, _ = dbt_base_operator.build_cmd( + Context(execution_date=datetime(2023, 2, 15, 12, 30)), + ) + + assert "--no-partial-parse" in cmd + + @pytest.mark.integration def test_run_test_operator_with_callback(failing_test_dbt_project): on_warning_callback = MagicMock() @@ -503,3 +519,33 @@ def test_dbt_docs_local_operator_with_static_flag(): dbt_cmd_flags=["--static"], ) assert operator.required_files == ["static_index.html"] + + +@patch("cosmos.hooks.subprocess.FullOutputSubprocessHook.send_sigint") +def test_dbt_local_operator_on_kill_sigint(mock_send_sigint) -> None: + + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + cancel_query_on_kill=True + ) + + dbt_base_operator.on_kill() + + mock_send_sigint.assert_called_once() + + +@patch("cosmos.hooks.subprocess.FullOutputSubprocessHook.send_sigterm") +def test_dbt_local_operator_on_kill_sigterm(mock_send_sigterm) -> None: + + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + cancel_query_on_kill=False + ) + + dbt_base_operator.on_kill() + + mock_send_sigterm.assert_called_once() From cefc9675a38b5183f04e6091541da8a1ce5ebaad Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 1 Feb 2024 02:49:52 +0000 Subject: [PATCH 04/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/dbt/test_graph.py | 4 +--- tests/operators/test_local.py | 10 ++-------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index cbb1c35e0..18d638be5 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -912,9 +912,7 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( mock_popen().returncode = 0 project_config = ProjectConfig() render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - load_method=LoadMode.DBT_LS, - partial_parse=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, partial_parse=False ) profile_config = ProfileConfig( profile_name="test", diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index d499280db..23b9aca05 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -533,10 +533,7 @@ def test_dbt_docs_local_operator_with_static_flag(): def test_dbt_local_operator_on_kill_sigint(mock_send_sigint) -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( - profile_config=profile_config, - task_id="my-task", - project_dir="my/dir", - cancel_query_on_kill=True + profile_config=profile_config, task_id="my-task", project_dir="my/dir", cancel_query_on_kill=True ) dbt_base_operator.on_kill() @@ -548,10 +545,7 @@ def test_dbt_local_operator_on_kill_sigint(mock_send_sigint) -> None: def test_dbt_local_operator_on_kill_sigterm(mock_send_sigterm) -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( - profile_config=profile_config, - task_id="my-task", - project_dir="my/dir", - cancel_query_on_kill=False + profile_config=profile_config, task_id="my-task", project_dir="my/dir", cancel_query_on_kill=False ) dbt_base_operator.on_kill() From 7863800980ecd17e68627db4c63cf8ff205b82dc Mon Sep 17 00:00:00 2001 From: Daniel Reeves <31971762+dwreeves@users.noreply.github.com> Date: Thu, 15 Feb 2024 10:02:40 -0500 Subject: [PATCH 05/11] Update cosmos/operators/base.py Co-authored-by: Julian LaNeve --- cosmos/operators/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 5e1375ebd..d8fefc523 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -51,7 +51,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): :param skip_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. - :param partial_parse: If True, then the operator will use the + :param partial_parse: If True (default), then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then a flag will be explicitly set to turn off partial parsing. :param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed. From ceaec807f340a73d6f483a8a4df561d4bf374aae Mon Sep 17 00:00:00 2001 From: Daniel Reeves <31971762+dwreeves@users.noreply.github.com> Date: Thu, 15 Feb 2024 10:02:50 -0500 Subject: [PATCH 06/11] Update cosmos/config.py Co-authored-by: Julian LaNeve --- cosmos/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index ea64787da..fabfc8da1 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -47,7 +47,7 @@ class RenderConfig: :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``. - :param partial_parse: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. + :param partial_parse: If True (default), then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. """ emit_datasets: bool = True From ef348a205aa7630fc31470a4d0af05602a46aaf9 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Fri, 16 Feb 2024 16:02:07 -0500 Subject: [PATCH 07/11] updates --- cosmos/config.py | 3 --- cosmos/converter.py | 2 +- cosmos/dbt/graph.py | 8 ++++---- cosmos/dbt/project.py | 2 +- cosmos/operators/local.py | 4 ++-- docs/configuration/execution-config.rst | 1 - docs/configuration/render-config.rst | 1 - docs/getting_started/execution-modes.rst | 4 ++++ tests/dbt/test_graph.py | 2 +- tests/dbt/test_project.py | 4 ++-- 10 files changed, 15 insertions(+), 16 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index fabfc8da1..9b17c5b19 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -62,7 +62,6 @@ class RenderConfig: env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None dbt_ls_path: Path | None = None - partial_parse: bool = True project_path: Path | None = field(init=False) @@ -297,7 +296,6 @@ class ExecutionConfig: :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection. :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path. :param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path - :param partial_parse: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path """ execution_mode: ExecutionMode = ExecutionMode.LOCAL @@ -306,7 +304,6 @@ class ExecutionConfig: dbt_project_path: InitVar[str | Path | None] = None project_path: Path | None = field(init=False) - partial_parse: bool = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None diff --git a/cosmos/converter.py b/cosmos/converter.py index 81c87fe88..07cf144ea 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -246,7 +246,7 @@ def __init__( task_args = { **operator_args, "project_dir": execution_config.project_path, - "partial_parse": execution_config.partial_parse, + "partial_parse": project_config.partial_parse, "profile_config": profile_config, "emit_datasets": render_config.emit_datasets, "env": env_vars, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 85831c61f..f36769649 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -22,7 +22,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ +from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -204,7 +204,7 @@ def run_dbt_ls( if self.render_config.selector: ls_command.extend(["--selector", self.render_config.selector]) - if not self.render_config.partial_parse: + if not self.project.partial_parse: ls_command.append("--no-partial-parse") ls_command.extend(self.local_flags) @@ -251,8 +251,8 @@ def load_via_dbt_ls(self) -> None: tmpdir_path = Path(tmpdir) create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps) - if self.render_config.partial_parse: - copy_manifest_for_partial_parse(self.render_config.project_path, tmpdir_path) + if self.project.partial_parse: + copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path) with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ( self.project.env_vars or self.render_config.env_vars or {} diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index aa10abb83..889987b6d 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -19,7 +19,7 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool os.symlink(project_path / child_name, tmp_dir / child_name) -def copy_manifest_for_partial_parse(project_path: Path, tmp_dir: Path) -> None: +def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None: partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME if partial_parse_file.exists(): diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 45d7a96ee..e08d44b53 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -57,7 +57,7 @@ FullOutputSubprocessResult, ) from cosmos.dbt.parser.output import extract_log_issues, parse_output -from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse +from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse DBT_NO_TESTS_MSG = "Nothing to do" DBT_WARN_MSG = "WARN" @@ -214,7 +214,7 @@ def run_command( create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps) if self.partial_parse: - copy_manifest_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir)) + copy_msgpack_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir)) with self.profile_config.ensure_profile() as profile_values: (profile_path, env_vars) = profile_values diff --git a/docs/configuration/execution-config.rst b/docs/configuration/execution-config.rst index 4df96d978..23b511e37 100644 --- a/docs/configuration/execution-config.rst +++ b/docs/configuration/execution-config.rst @@ -10,4 +10,3 @@ The ``ExecutionConfig`` class takes the following arguments: - ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``dbt_project_path``: Configures the dbt project location accessible at runtime for dag execution. This is the project path in a docker container for ``ExecutionMode.DOCKER`` or ``ExecutionMode.KUBERNETES``. Mutually exclusive with ``ProjectConfig.dbt_project_path``. -- ``partial_parse``: If True, then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then the flag will be explicitly set to turn off partial parsing. For ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``, the ``partial_parse.msgpack`` file will be copied into temporary directory that dbt executes out of. diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 74dcfd756..f3e216712 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -17,7 +17,6 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` -- ``partial_parse``: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst index 924e4ba12..7c7417cc7 100644 --- a/docs/getting_started/execution-modes.rst +++ b/docs/getting_started/execution-modes.rst @@ -56,6 +56,9 @@ The ``local`` execution mode assumes a ``dbt`` binary is reachable within the Ai If ``dbt`` was not installed as part of the Cosmos packages, users can define a custom path to ``dbt`` by declaring the argument ``dbt_executable_path``. +By default, if Cosmos sees a ``partial_parse.msgpack`` in the target directory of the dbt project directory when using ``local`` execution, it will use this for partial parsing to speed up task execution. +This can be turned off by setting ``partial_parse=False`` in the ``ProjectConfig``. + When using the ``local`` execution mode, Cosmos converts Airflow Connections into a native ``dbt`` profiles file (``profiles.yml``). Example of how to use, for instance, when ``dbt`` was installed together with Cosmos: @@ -76,6 +79,7 @@ The ``virtualenv`` mode isolates the Airflow worker dependencies from ``dbt`` by In this case, users are responsible for declaring which version of ``dbt`` they want to use by giving the argument ``py_requirements``. This argument can be set directly in operator instances or when instantiating ``DbtDag`` and ``DbtTaskGroup`` as part of ``operator_args``. Similar to the ``local`` execution mode, Cosmos converts Airflow Connections into a way ``dbt`` understands them by creating a ``dbt`` profile file (``profiles.yml``). +Also similar to the ``local`` execution mode, Cosmos will by default attempt to use a ``partial_parse.msgpack`` if one exists to speed up parsing. Some drawbacks of this approach: diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 18d638be5..e07cce177 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -907,7 +907,7 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( def test_load_via_dbt_ls_render_config_no_partial_parse( mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir ): - """Tests that the dbt ls command in the subprocess has "--selector" with the RenderConfig.selector.""" + """Tests that --no-partial-parse appears when partial_parse=False.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig() diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index 40591712b..85314b8e5 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -4,7 +4,7 @@ import pytest -from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ +from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -34,7 +34,7 @@ def test_copy_manifest_for_partial_parse(tmp_path, exists): tmp_dir = tmp_path / "tmp_dir" tmp_dir.mkdir() - copy_manifest_for_partial_parse(project_path, tmp_dir) + copy_msgpack_for_partial_parse(project_path, tmp_dir) tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack" From bf05be384b51b459af9211098f215f662dacee75 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Fri, 16 Feb 2024 16:05:01 -0500 Subject: [PATCH 08/11] fix test --- tests/dbt/test_graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index e07cce177..b2dcff22f 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -910,9 +910,9 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( """Tests that --no-partial-parse appears when partial_parse=False.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 - project_config = ProjectConfig() + project_config = ProjectConfig(partial_parse=False) render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, partial_parse=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS ) profile_config = ProfileConfig( profile_name="test", From 134d42467ebcf1be47d8566db78cdb2f06856538 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Feb 2024 21:05:33 +0000 Subject: [PATCH 09/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/dbt/test_graph.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index b2dcff22f..c27b20841 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -911,9 +911,7 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig(partial_parse=False) - render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS - ) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS) profile_config = ProfileConfig( profile_name="test", target_name="test", From 237fbc8e9c5c14481bba7bc5d378d925d09aaafa Mon Sep 17 00:00:00 2001 From: dwreeves Date: Fri, 16 Feb 2024 16:12:45 -0500 Subject: [PATCH 10/11] update docstring --- cosmos/config.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cosmos/config.py b/cosmos/config.py index 9b17c5b19..754f6e135 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -123,6 +123,9 @@ class ProjectConfig: :param dbt_vars: Dictionary of dbt variables for the project. This argument overrides variables defined in your dbt_project.yml file. The dictionary is dumped to a yaml string and passed to dbt commands as the --vars argument. Variables are only supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and ``RenderConfig.LoadMode.CUSTOM`` load mode. + :param partial_parse: If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used + for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV`` + execution modes. """ dbt_project_path: Path | None = None From 8f09927641adc3ac7a9893a9ac12efbecc489b5c Mon Sep 17 00:00:00 2001 From: dwreeves Date: Sun, 18 Feb 2024 15:32:26 -0500 Subject: [PATCH 11/11] update --- cosmos/config.py | 1 - docs/configuration/project-config.rst | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 754f6e135..52763536f 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -47,7 +47,6 @@ class RenderConfig: :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``. - :param partial_parse: If True (default), then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists. """ emit_datasets: bool = True diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index c062a1de5..3bf524ac8 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -23,6 +23,9 @@ variables that should be used for rendering and execution. It takes the followin will only be rendered at execution time, not at render time. - ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode. +- ``partial_parse``: (new in v1.4) If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used + for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV`` + execution modes. Project Config Example ----------------------