Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial parsing #800

Merged
merged 19 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class RenderConfig:
:param env_vars: (Deprecated since Cosmos 1.3; use ProjectConfig.env_vars instead) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param partial_parse: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists.
dwreeves marked this conversation as resolved.
Show resolved Hide resolved
"""

emit_datasets: bool = True
Expand All @@ -61,6 +62,7 @@ class RenderConfig:
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
dbt_ls_path: Path | None = None
partial_parse: bool = True

project_path: Path | None = field(init=False)

Expand Down Expand Up @@ -141,6 +143,7 @@ def __init__(
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
partial_parse: bool = True,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
Expand All @@ -166,6 +169,7 @@ def __init__(

self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse

def validate_project(self) -> None:
"""
Expand Down Expand Up @@ -292,7 +296,8 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param partial_parse: If True, then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then a flag will be explicitly set to turn off partial parsing.
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
Expand All @@ -301,6 +306,7 @@ class ExecutionConfig:

dbt_project_path: InitVar[str | Path | None] = None
project_path: Path | None = field(init=False)
partial_parse: bool = True
jbandoro marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
1 change: 1 addition & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def __init__(
task_args = {
**operator_args,
"project_dir": execution_config.project_path,
"partial_parse": execution_config.partial_parse,
"profile_config": profile_config,
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
Expand Down
8 changes: 7 additions & 1 deletion cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -204,6 +204,9 @@
if self.render_config.selector:
ls_command.extend(["--selector", self.render_config.selector])

if not self.render_config.partial_parse:
ls_command.append("--no-partial-parse")

Check warning on line 208 in cosmos/dbt/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/dbt/graph.py#L208

Added line #L208 was not covered by tests

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)
Expand Down Expand Up @@ -248,6 +251,9 @@
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.render_config.partial_parse:
copy_manifest_for_partial_parse(self.render_config.project_path, tmpdir_path)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
Expand Down
16 changes: 12 additions & 4 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

from pathlib import Path
import shutil
import os
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
)
from cosmos.constants import DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME
from contextlib import contextmanager
from typing import Generator

Expand All @@ -21,6 +19,16 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def copy_manifest_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
    """Copy the project's partial-parse file into ``tmp_dir``, if the project has one.

    The file is looked up at ``project_path / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME``
    and copied to the same relative location under ``tmp_dir``. When the source file is absent,
    nothing is created or copied.

    :param project_path: Root of the dbt project to read the partial-parse file from.
    :param tmp_dir: Directory that receives the copy, inside its own target sub-directory.
    """
    source_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME

    if not source_file.exists():
        # No partial-parse file to carry over; leave tmp_dir untouched.
        return

    # The target sub-directory may already be there, hence exist_ok.
    destination_dir = tmp_dir / DBT_TARGET_DIR_NAME
    destination_dir.mkdir(exist_ok=True)

    destination_file = destination_dir / DBT_PARTIAL_PARSE_FILE_NAME
    shutil.copy(str(source_file), str(destination_file))
jlaneve marked this conversation as resolved.
Show resolved Hide resolved


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
"""Temporarily set environment variables inside the context manager and restore
Expand Down
6 changes: 6 additions & 0 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@ def send_sigterm(self) -> None:
logger.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

def send_sigint(self) -> None:
    """Send SIGINT to the process group of ``self.sub_process``, when there is one."""
    logger.info("Sending SIGINT signal to process group")

    process = self.sub_process
    if not process or not hasattr(process, "pid"):
        # No subprocess (or nothing with a pid) to signal.
        return

    # Signal the whole process group rather than just the single pid.
    process_group_id = os.getpgid(process.pid)
    os.killpg(process_group_id, signal.SIGINT)
16 changes: 9 additions & 7 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
:param skip_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param partial_parse: If True, then the operator will use the
dwreeves marked this conversation as resolved.
Show resolved Hide resolved
``partial_parse.msgpack`` during execution if it exists. If False, then
a flag will be explicitly set to turn off partial parsing.
:param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed.
Otherwise, the query will keep running when the task is killed.
:param dbt_executable_path: Path to dbt executable can be used with venv
Expand All @@ -68,13 +71,7 @@
"vars",
"models",
)
global_boolean_flags = (
"no_version_check",
"cache_selected_only",
"fail_fast",
"quiet",
"warn_error",
)
global_boolean_flags = ("no_version_check", "cache_selected_only", "fail_fast", "quiet", "warn_error")

intercept_flag = True

Expand Down Expand Up @@ -105,6 +102,7 @@
append_env: bool = False,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
partial_parse: bool = True,
cancel_query_on_kill: bool = True,
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
Expand All @@ -131,6 +129,7 @@
self.append_env = append_env
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.partial_parse = partial_parse
self.cancel_query_on_kill = cancel_query_on_kill
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
Expand Down Expand Up @@ -219,6 +218,9 @@

dbt_cmd.extend(self.dbt_cmd_global_flags)

if not self.partial_parse:
dbt_cmd.append("--no-partial-parse")

Check warning on line 222 in cosmos/operators/base.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/base.py#L222

Added line #L222 was not covered by tests

dbt_cmd.extend(self.base_cmd)

if self.indirect_selection:
Expand Down
16 changes: 11 additions & 5 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@

from sqlalchemy.orm import Session

from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
DBT_TARGET_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
)
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
Expand All @@ -52,7 +57,7 @@
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output
from cosmos.dbt.project import create_symlinks
from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse

DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"
Expand Down Expand Up @@ -208,6 +213,9 @@

create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps)

if self.partial_parse:
copy_manifest_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir))

with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
Expand Down Expand Up @@ -374,9 +382,7 @@

def on_kill(self) -> None:
if self.cancel_query_on_kill:
self.subprocess_hook.log.info("Sending SIGINT signal to process group")
if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"):
os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT)
self.subprocess_hook.send_sigint()

Check warning on line 385 in cosmos/operators/local.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/local.py#L385

Added line #L385 was not covered by tests
else:
self.subprocess_hook.send_sigterm()

Expand Down
1 change: 1 addition & 0 deletions docs/configuration/execution-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ The ``ExecutionConfig`` class takes the following arguments:
- ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection.
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``partial_parse``: If True, then the operator will use the ``partial_parse.msgpack`` during execution if it exists. If False, then a flag will be explicitly set to turn off partial parsing. For ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``, the ``partial_parse.msgpack`` file will be copied into the temporary directory that dbt executes out of.
1 change: 1 addition & 0 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``env_vars``: (available in v1.2.5, use ``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``partial_parse``: If True, then when ``load_method=LoadMode.DBT_LS``, attempt to use the ``partial_parse.msgpack`` file if it exists.

Customizing how nodes are rendered (experimental)
-------------------------------------------------
Expand Down
31 changes: 29 additions & 2 deletions tests/dbt/test_project.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path
from cosmos.dbt.project import create_symlinks, environ
import os
from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos.dbt.project import create_symlinks, copy_manifest_for_partial_parse, environ

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


Expand All @@ -17,6 +20,30 @@ def test_create_symlinks(tmp_path):
assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages")


@pytest.mark.parametrize("exists", [True, False])
def test_copy_manifest_for_partial_parse(tmp_path, exists):
    """
    The partial parse file is copied into the temporary directory's ``target`` folder
    when the project has one, and nothing is copied when it does not.
    """
    project_path = tmp_path / "project"
    target_path = project_path / "target"
    partial_parse_file = target_path / "partial_parse.msgpack"
    # Non-empty payload so the test can tell a real copy from a merely-created empty file.
    partial_parse_contents = b"partial-parse-contents"

    target_path.mkdir(parents=True)

    if exists:
        partial_parse_file.write_bytes(partial_parse_contents)

    tmp_dir = tmp_path / "tmp_dir"
    tmp_dir.mkdir()

    copy_manifest_for_partial_parse(project_path, tmp_dir)

    tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack"

    if exists:
        assert tmp_partial_parse_file.exists()
        assert tmp_partial_parse_file.read_bytes() == partial_parse_contents
    else:
        assert not tmp_partial_parse_file.exists()


@patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"})
def test_environ_context_manager():
# Define the expected environment variables
Expand Down
Empty file added tests/hooks/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions tests/hooks/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import MagicMock, patch
import signal

import pytest

from cosmos.hooks.subprocess import FullOutputSubprocessHook

OS_ENV_KEY = "SUBPROCESS_ENV_TEST"
OS_ENV_VAL = "this-is-from-os-environ"


@pytest.mark.parametrize(
    "env,expected",
    [
        ({"ABC": "123", "AAA": "456"}, {"ABC": "123", "AAA": "456", OS_ENV_KEY: ""}),
        ({}, {OS_ENV_KEY: ""}),
        (None, {OS_ENV_KEY: OS_ENV_VAL}),
    ],
    ids=["with env", "empty env", "no env"],
)
def test_env(env, expected):
    """
    Test that env variables are exported correctly to the command environment.
    When ``env`` is ``None``, ``os.environ`` should be passed to ``Popen``.
    Otherwise, the variables in ``env`` should be available, and ``os.environ`` should not.
    """
    hook = FullOutputSubprocessHook()

    def build_cmd(keys, filename):
        """
        Produce bash command to echo env vars into filename.
        Will always echo the special test var named ``OS_ENV_KEY`` into the file to test whether
        ``os.environ`` is passed or not.
        """
        return "\n".join(f"echo {k}=${k}>> {filename}" for k in [*keys, OS_ENV_KEY])

    with TemporaryDirectory() as tmp_dir, patch.dict("os.environ", {OS_ENV_KEY: OS_ENV_VAL}):
        tmp_file = Path(tmp_dir, "test.txt")
        # ``env`` may be None or empty; in both cases only OS_ENV_KEY is echoed.
        keys = env.keys() if env else []
        command = build_cmd(keys, tmp_file.as_posix())
        hook.run_command(command=["bash", "-c", command], env=env)
        # Split on the first "=" only, so a value containing "=" still yields a (key, value) pair.
        actual = dict(line.split("=", 1) for line in tmp_file.read_text().splitlines())
        assert actual == expected


def test_subprocess_hook():
    """Running a trivial command reports a zero exit code and the echoed output."""
    hook = FullOutputSubprocessHook()
    # Plain string literal: there are no placeholders, so no f-string is needed.
    result = hook.run_command(command=["bash", "-c", 'echo "foo"'])
    assert result.exit_code == 0
    assert result.output == "foo"
    assert result.full_output == ["foo"]


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigint(mock_killpg, mock_getpgid):
    """``send_sigint`` sends SIGINT to the process group of the hook's subprocess."""
    hook = FullOutputSubprocessHook()
    # Stand-in for the real subprocess object; only its ``pid`` attribute is looked up.
    hook.sub_process = MagicMock()

    hook.send_sigint()

    # The process group must be resolved from the subprocess's own pid ...
    mock_getpgid.assert_called_once_with(hook.sub_process.pid)
    # ... and that group (123, from the patched getpgid) must be signalled exactly once.
    mock_killpg.assert_called_once_with(123, signal.SIGINT)


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigterm(mock_killpg, mock_getpgid):
    """``send_sigterm`` sends SIGTERM to the process group of the hook's subprocess."""
    hook = FullOutputSubprocessHook()
    # Stand-in for the real subprocess object; only its ``pid`` attribute is looked up.
    hook.sub_process = MagicMock()

    hook.send_sigterm()

    # The process group must be resolved from the subprocess's own pid ...
    mock_getpgid.assert_called_once_with(hook.sub_process.pid)
    # ... and that group (123, from the patched getpgid) must be signalled exactly once.
    mock_killpg.assert_called_once_with(123, signal.SIGTERM)
Loading