Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial parsing #800

Merged
merged 19 commits into from
Feb 19, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support partial parsing
dwreeves committed Jan 15, 2024
commit 849f36ebe9c10b5a9b712332f64cdb1234ba22f2
8 changes: 7 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ class RenderConfig:
:param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param partial_parse: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists.
dwreeves marked this conversation as resolved.
Show resolved Hide resolved
"""

emit_datasets: bool = True
@@ -61,6 +62,7 @@ class RenderConfig:
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
dbt_ls_path: Path | None = None
partial_parse: bool = True

project_path: Path | None = field(init=False)

@@ -141,6 +143,7 @@ def __init__(
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
partial_parse: bool = True,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
@@ -166,6 +169,7 @@ def __init__(

self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse

def validate_project(self) -> None:
"""
@@ -292,7 +296,8 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param partial_parse: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
@@ -301,6 +306,7 @@ class ExecutionConfig:

dbt_project_path: InitVar[str | Path | None] = None
project_path: Path | None = field(init=False)
partial_parse: bool = True
jbandoro marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

1 change: 1 addition & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
@@ -246,6 +246,7 @@ def __init__(
task_args = {
**operator_args,
"project_dir": execution_config.project_path,
"partial_parse": execution_config.partial_parse,
"profile_config": profile_config,
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
8 changes: 7 additions & 1 deletion cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

@@ -199,6 +199,9 @@
if self.render_config.selector:
ls_command.extend(["--selector", self.render_config.selector])

if not self.render_config.partial_parse:
ls_command.append("--no-partial-parse")

Check warning on line 203 in cosmos/dbt/graph.py

Codecov / codecov/patch

cosmos/dbt/graph.py#L203

Added line #L203 was not covered by tests

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)
@@ -243,6 +246,9 @@
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.render_config.partial_parse:
copy_manifest_for_partial_parse(self.render_config.project_path, tmpdir_path)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
16 changes: 12 additions & 4 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

from pathlib import Path
import shutil
import os
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
)
from cosmos.constants import DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME
from contextlib import contextmanager
from typing import Generator

@@ -21,6 +19,16 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def copy_manifest_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
jbandoro marked this conversation as resolved.
Show resolved Hide resolved
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(exist_ok=True)

shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME))
jlaneve marked this conversation as resolved.
Show resolved Hide resolved


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
"""Temporarily set environment variables inside the context manager and restore
6 changes: 6 additions & 0 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
@@ -105,3 +105,9 @@
logger.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

def send_sigint(self) -> None:
"""Sends SIGINT signal to ``self.sub_process`` if one exists."""
logger.info("Sending SIGINT signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT)

Check warning on line 113 in cosmos/hooks/subprocess.py

Codecov / codecov/patch

cosmos/hooks/subprocess.py#L111-L113

Added lines #L111 - L113 were not covered by tests
16 changes: 9 additions & 7 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
@@ -51,6 +51,9 @@
:param skip_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param partial_parse: If True, then the operator will use the
dwreeves marked this conversation as resolved.
Show resolved Hide resolved
``partial_parse.msgpack`` during execution if it exists. If False, then
a flag will be explicitly set to turn off partial parsing.
:param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed.
Otherwise, the query will keep running when the task is killed.
:param dbt_executable_path: Path to dbt executable can be used with venv
@@ -68,13 +71,7 @@
"vars",
"models",
)
global_boolean_flags = (
"no_version_check",
"cache_selected_only",
"fail_fast",
"quiet",
"warn_error",
)
global_boolean_flags = ("no_version_check", "cache_selected_only", "fail_fast", "quiet", "warn_error")

intercept_flag = True

@@ -105,6 +102,7 @@
append_env: bool = False,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
partial_parse: bool = True,
cancel_query_on_kill: bool = True,
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
@@ -131,6 +129,7 @@
self.append_env = append_env
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.partial_parse = partial_parse
self.cancel_query_on_kill = cancel_query_on_kill
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
@@ -219,6 +218,9 @@

dbt_cmd.extend(self.dbt_cmd_global_flags)

if not self.partial_parse:
dbt_cmd.append("--no-partial-parse")

Check warning on line 222 in cosmos/operators/base.py

Codecov / codecov/patch

cosmos/operators/base.py#L222

Added line #L222 was not covered by tests

dbt_cmd.extend(self.base_cmd)

if self.indirect_selection:
16 changes: 11 additions & 5 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
@@ -34,7 +34,12 @@

from sqlalchemy.orm import Session

from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
DBT_TARGET_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
)
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
@@ -51,7 +56,7 @@
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output
from cosmos.dbt.project import create_symlinks
from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse

DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"
@@ -208,6 +213,9 @@

create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps)

if self.partial_parse:
copy_manifest_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir))

with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
@@ -377,9 +385,7 @@

def on_kill(self) -> None:
if self.cancel_query_on_kill:
self.subprocess_hook.log.info("Sending SIGINT signal to process group")
if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"):
os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT)
self.subprocess_hook.send_sigint()

Check warning on line 388 in cosmos/operators/local.py

Codecov / codecov/patch

cosmos/operators/local.py#L388

Added line #L388 was not covered by tests
else:
self.subprocess_hook.send_sigterm()

1 change: 1 addition & 0 deletions docs/configuration/execution-config.rst
Original file line number Diff line number Diff line change
@@ -10,3 +10,4 @@ The ``ExecutionConfig`` class takes the following arguments:
- ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection.
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``partial_parse``: If True, then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then the flag will be explicitly set to turn off partial parsing. For ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``, the ``partial_parse.msgpack`` file will be copied into temporary directory that dbt executes out of.
1 change: 1 addition & 0 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``partial_parse``: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists.

Customizing how nodes are rendered (experimental)
-------------------------------------------------
31 changes: 29 additions & 2 deletions tests/dbt/test_project.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path
from cosmos.dbt.project import create_symlinks, environ
import os
from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


@@ -17,6 +20,30 @@ def test_create_symlinks(tmp_path):
assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages")


@pytest.mark.parametrize("exists", [True, False])
def test_copy_manifest_for_partial_parse(tmp_path, exists):
project_path = tmp_path / "project"
target_path = project_path / "target"
partial_parse_file = target_path / "partial_parse.msgpack"

target_path.mkdir(parents=True)

if exists:
partial_parse_file.write_bytes(b"")

tmp_dir = tmp_path / "tmp_dir"
tmp_dir.mkdir()

copy_manifest_for_partial_parse(project_path, tmp_dir)

tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack"

if exists:
assert tmp_partial_parse_file.exists()
else:
assert not tmp_partial_parse_file.exists()


@patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"})
def test_environ_context_manager():
# Define the expected environment variables