Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance by 22-35% or more by caching partial parse artefact #904

Merged
merged 20 commits into from
May 1, 2024
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
901651d
Cache Cosmos-generated dbt partial parse in Airflow
tatiana Mar 28, 2024
cc5f797
Extend caching of partial parse to running tasks
tatiana Mar 28, 2024
9f67153
Allow users to disable mock profile
tatiana Mar 28, 2024
b51dcfe
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 28, 2024
9584aa4
Fix broken unittests
tatiana Mar 28, 2024
d91cb0d
Fix type-check
tatiana Mar 28, 2024
8f93b30
Refactor so we leverage cache even if user instantiates operator dire…
tatiana Mar 29, 2024
88c6a75
Fix broken unit tests
tatiana Apr 17, 2024
cf6a639
Increase test coverage and fix implementation
tatiana Apr 18, 2024
466c271
Fix broken unittest and add integration partial parsing test
tatiana Apr 19, 2024
a377b60
Try to overcome failing test in CI - that works locally
tatiana Apr 19, 2024
42aacaf
Restore test_plugin.py
tatiana Apr 19, 2024
bb4d353
Simplify how cache is set at the operator level
tatiana Apr 24, 2024
c66f808
Add test to confirm behaviour of converter on cache dir
tatiana Apr 24, 2024
6401096
Remove unused copy_msgpack_for_partial_parse
tatiana May 1, 2024
5e98dec
Use InvocationMode.DBT_RUNNER in another example DAG
tatiana May 1, 2024
12ca7f5
Add missing docstring information
tatiana May 1, 2024
7424a7f
Rely on Airflow conf fallback, removing redundancy
tatiana May 1, 2024
02ee94c
Make cache functions non-public, as advised in the code-review
tatiana May 1, 2024
2a3f007
Create cache identifier using double underscore to differentiate betw…
tatiana May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Cache Cosmos-generated dbt partial parse in Airflow
Add support for creating and caching dbt partial parse files in an (optionally user-defined) temporary directory.
Users can customise the temporary directory by using the Airflow conf cosmos.cache_dir.

Affects DAG parsing.
  • Loading branch information
tatiana committed May 1, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 901651d511e880131831a1bd76d158690f5d0dac
97 changes: 97 additions & 0 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

import shutil
from enum import Enum
from pathlib import Path

from cosmos import settings
from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path


def create_cache_identifier(dag_name: str, task_group_name: str = "") -> str:
    """
    Given a DAG name and a (optional) task_group_name, create the identifier for caching.

    When a task group name is given, it is appended to the DAG name using a double
    underscore as separator. Otherwise the DAG name alone is the identifier.

    :param dag_name: Name of the Cosmos DbtDag being cached
    :param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached. Any falsy
        value (such as the default empty string) means there is no task group.
    :return: Unique identifier representing the cache
    """
    if not task_group_name:
        return dag_name
    # Single underscores are common inside DAG and task group names, so joining with "_" would
    # map ("a_b", "c") and ("a", "b_c") to the same identifier. A double underscore keeps such
    # pairs apart.
    return "__".join([dag_name, task_group_name])


def obtain_cache_dir_path(cache_identifier: str) -> Path:
    """
    Return the directory that caches a specific Cosmos DbtDag or DbtTaskGroup, creating it
    (together with its dbt target sub-directory) when it does not exist yet.

    :param cache_identifier: Unique key used as a cache identifier
    :return: Path to directory used to cache this specific Cosmos DbtDag or DbtTaskGroup
    """
    cache_root = settings.cache_dir / cache_identifier
    # Creating the nested target directory with parents=True also creates the cache root.
    (cache_root / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True)
    return cache_root


def get_timestamp(path: Path) -> float:
    """
    Return the last-modification time of a path, or 0 when the path does not exist.

    :param path: Path to the file or directory of interest
    :return: Modification timestamp (``st_mtime``) of the file or directory, or 0 if missing
    """
    try:
        return path.stat().st_mtime
    except FileNotFoundError:
        # A missing path is reported as 0 so callers can treat the result as falsy.
        return 0


def get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | None:
    """
    Return the path to the most recently modified partial parse file, if any exists.

    Two candidates are considered: the one inside the original dbt project and the one
    inside the Cosmos cache directory. When both exist, the project file is chosen only if
    it is strictly newer than the cached one.

    :param dbt_project_path: Original dbt project path
    :param cache_dir: Path to the Cosmos project cache directory
    :return: Either return the Path to the latest partial parse file, or None.
    """
    project_file = get_partial_parse_path(dbt_project_path)
    cached_file = get_partial_parse_path(cache_dir)

    # get_timestamp yields 0 for a missing file, so a falsy value means "not available".
    project_mtime = get_timestamp(project_file)
    cached_mtime = get_timestamp(cached_file)

    if project_mtime and cached_mtime:
        return project_file if project_mtime > cached_mtime else cached_file
    if project_mtime:
        return project_file
    if cached_mtime:
        return cached_file
    return None


def update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None:
    """
    Copy the given partial parse file over the one stored in the cache directory.

    :param latest_partial_parse_filepath: Path to the most up-to-date partial parse file
    :param cache_dir: Path to the Cosmos project cache directory
    """
    source = str(latest_partial_parse_filepath)
    destination = str(get_partial_parse_path(cache_dir))
    shutil.copy(source, destination)


def copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: Path) -> None:
    """
    Copy a partial parse file into the dbt target directory of the given project.

    The target directory is created when missing. It is created without ``parents``,
    so ``project_path`` itself must already exist.

    :param partial_parse_filepath: Path to the partial parse file to be copied
    :param project_path: Path to the dbt project directory that receives the file
    """
    destination = get_partial_parse_path(project_path)
    (project_path / DBT_TARGET_DIR_NAME).mkdir(exist_ok=True)
    shutil.copy(str(partial_parse_filepath), str(destination))
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
5 changes: 5 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import cache
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
@@ -234,11 +235,15 @@ def __init__(
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjectConfig class?
cache_dir = dag and cache.obtain_cache_dir_path(
cache_identifier=cache.create_cache_identifier(dag.dag_id, task_group and task_group.group_id)
)
self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
cache_dir=cache_dir,
dbt_vars=dbt_vars,
)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
21 changes: 17 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@

import yaml

from cosmos import cache
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_LOG_DIR_NAME,
@@ -23,7 +24,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ
from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ, get_partial_parse_path
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

@@ -136,13 +137,15 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
cache_dir: Path | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.cache_dir = cache_dir
self.dbt_vars = dbt_vars or {}

def load(
@@ -250,10 +253,15 @@ def load_via_dbt_ls(self) -> None:
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
)
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse:
copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path)
abs_project_path = self.render_config.project_path.absolute()
create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse and self.cache_dir:
latest_partial_parse = cache.get_latest_partial_parse(abs_project_path, self.cache_dir)
logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse)
if latest_partial_parse is not None:
cache.copy_partial_parse_to_project(latest_partial_parse, tmpdir_path)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
@@ -288,6 +296,11 @@ def load_via_dbt_ls(self) -> None:
self.nodes = nodes
self.filtered_nodes = nodes

if self.project.partial_parse and self.cache_dir:
partial_parse_file = get_partial_parse_path(tmpdir_path)
if partial_parse_file.exists():
cache.update_partial_parse_cache(partial_parse_file, self.cache_dir)

def load_via_dbt_ls_file(self) -> None:
"""
This is between dbt ls and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a
9 changes: 8 additions & 1 deletion cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -20,8 +20,15 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def get_partial_parse_path(project_dir_path: Path) -> Path:
    """
    Build the path of the partial parse file for a given dbt project directory.

    :param project_dir_path: Path to the dbt project directory
    :return: Path to the partial parse file inside the project's target directory
    """
    target_dir = project_dir_path / DBT_TARGET_DIR_NAME
    return target_dir / DBT_PARTIAL_PARSE_FILE_NAME


def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
tatiana marked this conversation as resolved.
Show resolved Hide resolved
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME
partial_parse_file = get_partial_parse_path(project_path)

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
9 changes: 9 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import tempfile
from pathlib import Path

from airflow.configuration import conf

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME

# Default location of the Cosmos cache: a dedicated folder inside the system temporary directory.
default_cache_dir = Path(tempfile.gettempdir()) / DEFAULT_COSMOS_CACHE_DIR_NAME

# Users may override the location through the Airflow configuration ([cosmos] cache_dir).
# The trailing `or` also falls back to the default when the option is set to an empty value.
cache_dir = Path(
    conf.get("cosmos", "cache_dir", fallback=default_cache_dir)
    or default_cache_dir
)