Skip to content

Commit

Permalink
Improve performance by 22-35% or more by caching partial parse artefa…
Browse files Browse the repository at this point in the history
…ct (#904)

Improve the performance to run the benchmark DAG with 100 tasks by 34%
and the benchmark DAG with 10 tasks by 22%, by persisting the dbt
partial parse artifact in Airflow nodes. This performance can be even
higher in the case of dbt projects that take more time to be parsed.

With the introduction of #800, Cosmos supports using dbt partial parsing
files. This feature has led to a substantial performance improvement,
particularly for large dbt projects, both during Airflow DAG parsing
(using LoadMode.DBT_LS) and also Airflow task execution (when using
`ExecutionMode.LOCAL` and `ExecutionMode.VIRTUALENV`).

There were two limitations with the initial support to partial parsing,
which the current PR aims to address:

1. DAGs using Cosmos `ProfileMapping` classes could not leverage this
feature. This is because the partial parsing relies on profile files not
changing, and by default, Cosmos would mock the dbt profile in several
parts of the code. The consequence is that users trying Cosmos 1.4.0a1
will see the following message:
```
13:33:16  Unable to do partial parsing because profile has changed
13:33:16  Unable to do partial parsing because env vars used in profiles.yml have changed
```

2. The user had to explicitly provide a `partial_parse.msgpack` file in
the original project folder for their Airflow deployment - and if, for
any reason, this became outdated, the user would not leverage the
partial parsing feature. Since Cosmos runs dbt tasks from within a
temporary directory, the partial parse would be stale for some users, it
would be updated in the temporary directory, but the next time the task
was run, Cosmos/dbt would not leverage the recently updated
`partial_parse.msgpack` file.

The current PR addresses these two issues respectfully by:

1. Allowing users that want to leverage Cosmos `ProfileMapping` and
partial parsing to use `RenderConfig(enable_mock_profile=False)`

2. Introducing a Cosmos cache directory where we are persisting partial
parsing files. This feature is enabled by default, but users can opt out
by setting the Airflow configuration `[cosmos][enable_cache] = False`
(exporting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE=0`).
Users can also define the temporary directory used to store these files
using the `[cosmos][cache_dir]` Airflow configuration. By default,
Cosmos will create and use a folder `cosmos` inside the system's
temporary directory:
https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir .

This PR affects both DAG parsing and task execution. Although it does
not introduce an optimisation per se, it makes the partial parse feature
implemented #800 available to more users.

Closes: #722

I updated the documentation in the PR: #898

Some future steps related to optimization associated to caching to be
addressed in separate PRs:
i. Change how we create mocked profiles, to create the file itself in
the same way, referencing an environment variable with the same name -
and only changing the value of the environment variable (#924)
ii. Extend caching to the `profiles.yml` created by Cosmos in the newly
introduced `tmp/cosmos` without the need to recreate it every time
(#925).
iii. Extend caching to the Airflow DAG/Task group as a pickle file -
this approach is more generic and would work for every type of DAG
parsing and executor. (#926)
iv. Support persisting/fetching the cache from remote storage so we
don't have to replicate it for every Airflow scheduler and worker node.
(#927)
v. Cache dbt deps lock file/avoid installing dbt steps every time. We
can leverage `package-lock.yml` introduced in dbt t 1.7
(https://docs.getdbt.com/reference/commands/deps#predictable-package-installs),
but ideally, we'd have a strategy to support older versions of dbt as
well. (#930)
vi. Support caching `partial_parse.msgpack` even when vars change:
https://medium.com/@sebastian.daum89/how-to-speed-up-single-dbt-invocations-when-using-changing-dbt-variables-b9d91ce3fb0d
vii. Support partial parsing in Docker and Kubernetes Cosmos executors
(#929)
viii. Centralise all the Airflow-based config into Cosmos settings.py &
create a dedicated docs page containing information about these (#928)

**How to validate this change**

Run the performance benchmark against this and the `main` branch,
checking the value of `/tmp/performance_results.txt`.

Example of commands run locally:

```
# Setup
AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:[email protected]:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance-setup

# Run test for 100 dbt models per DAG:
MODEL_COUNT=100 AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:[email protected]:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance
```

An example of output when running 100 with the main branch:
```
NUM_MODELS=100
TIME=114.18614888191223
MODELS_PER_SECOND=0.8757629623135543
DBT_VERSION=1.7.13
```

And with the current PR:
```
NUM_MODELS=100
TIME=75.17766404151917
MODELS_PER_SECOND=1.33018232576064
DBT_VERSION=1.7.13
```
  • Loading branch information
tatiana authored May 1, 2024
1 parent 626efdc commit 458be0c
Show file tree
Hide file tree
Showing 18 changed files with 426 additions and 70 deletions.
124 changes: 124 additions & 0 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

import shutil
from pathlib import Path

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path


# It was considered to create a cache identifier based on the dbt project path, as opposed
# to where it is used in Airflow. However, we could have concurrency issues if the same
# dbt cached directory was being used by different dbt task groups or DAGs within the same
# node. For this reason, as a starting point, the cache is identified by where it is used.
# This can be reviewed in the future.
def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
"""
Given a DAG name and a (optional) task_group_name, create the identifier for caching.
:param dag_name: Name of the Cosmos DbtDag being cached
:param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached
:return: Unique identifier representing the cache
"""
if task_group:
if task_group.dag_id is not None:
cache_identifiers_list = [task_group.dag_id]
if task_group.group_id is not None:
cache_identifiers_list.extend([task_group.group_id.replace(".", "__")])
cache_identifier = "__".join(cache_identifiers_list)
else:
cache_identifier = dag.dag_id

return cache_identifier


def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
"""
Return a directory used to cache a specific Cosmos DbtDag or DbtTaskGroup. If the directory
does not exist, create it.
:param cache_identifier: Unique key used as a cache identifier
:param base_dir: Root directory where cache will be stored
:return: Path to directory used to cache this specific Cosmos DbtDag or DbtTaskGroup
"""
cache_dir_path = base_dir / cache_identifier
tmp_target_dir = cache_dir_path / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(parents=True, exist_ok=True)
return cache_dir_path


def _get_timestamp(path: Path) -> float:
"""
Return the timestamp of a path or 0, if it does not exist.
:param path: Path to the file or directory of interest
:return: File or directory timestamp
"""
try:
timestamp = path.stat().st_mtime
except FileNotFoundError:
timestamp = 0
return timestamp


def _get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | None:
"""
Return the path to the latest partial parse file, if defined.
:param dbt_project_path: Original dbt project path
:param cache_dir: Path to the Cosmos project cache directory
:return: Either return the Path to the latest partial parse file, or None.
"""
project_partial_parse_path = get_partial_parse_path(dbt_project_path)
cosmos_cached_partial_parse_filepath = get_partial_parse_path(cache_dir)

age_project_partial_parse = _get_timestamp(project_partial_parse_path)
age_cosmos_cached_partial_parse_filepath = _get_timestamp(cosmos_cached_partial_parse_filepath)

if age_project_partial_parse and age_cosmos_cached_partial_parse_filepath:
if age_project_partial_parse > age_cosmos_cached_partial_parse_filepath:
return project_partial_parse_path
else:
return cosmos_cached_partial_parse_filepath
elif age_project_partial_parse:
return project_partial_parse_path
elif age_cosmos_cached_partial_parse_filepath:
return cosmos_cached_partial_parse_filepath

return None


def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None:
"""
Update the cache to have the latest partial parse file contents.
:param latest_partial_parse_filepath: Path to the most up-to-date partial parse file
:param cache_dir: Path to the Cosmos project cache directory
"""
cache_path = get_partial_parse_path(cache_dir)
manifest_path = get_partial_parse_path(cache_dir).parent / DBT_MANIFEST_FILE_NAME
latest_manifest_filepath = latest_partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME

shutil.copy(str(latest_partial_parse_filepath), str(cache_path))
shutil.copy(str(latest_manifest_filepath), str(manifest_path))


def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: Path) -> None:
"""
Update target dbt project directory to have the latest partial parse file contents.
:param partial_parse_filepath: Path to the most up-to-date partial parse file
:param project_path: Path to the target dbt project directory
"""
target_partial_parse_file = get_partial_parse_path(project_path)
tmp_target_dir = project_path / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(exist_ok=True)

source_manifest_filepath = partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME
target_manifest_filepath = target_partial_parse_file.parent / DBT_MANIFEST_FILE_NAME
shutil.copy(str(partial_parse_filepath), str(target_partial_parse_file))
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))
6 changes: 4 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class RenderConfig:
:param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
"""

emit_datasets: bool = True
Expand All @@ -68,8 +69,8 @@ class RenderConfig:
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
dbt_ls_path: Path | None = None

project_path: Path | None = field(init=False)
enable_mock_profile: bool = True

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down Expand Up @@ -288,7 +289,8 @@ def ensure_profile(
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
logger.info(
"Creating temporary profiles.yml at %s with the following contents:\n%s",
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
profile_contents,
)
Expand Down
2 changes: 2 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
19 changes: 7 additions & 12 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
Expand Down Expand Up @@ -214,8 +215,6 @@ def __init__(

validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)

Expand All @@ -224,21 +223,16 @@ def __init__(
env_vars = project_config.env_vars or operator_args.get("env")
dbt_vars = project_config.dbt_vars or operator_args.get("vars")

# Previously, we were creating a cosmos.dbt.project.DbtProject
# DbtProject has now been replaced with ProjectConfig directly
# since the interface of the two classes were effectively the same
# Under this previous implementation, we were passing:
# - name, root dir, models dir, snapshots dir and manifest path
# Internally in the dbtProject class, we were defaulting the profile_path
# To be root dir/profiles.yml
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjceConfig class?
cache_dir = None
if settings.enable_cache:
cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache._create_cache_identifier(dag, task_group))

self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
cache_dir=cache_dir,
dbt_vars=dbt_vars,
)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
Expand All @@ -251,6 +245,7 @@ def __init__(
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
"vars": dbt_vars,
"cache_dir": cache_dir,
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
Expand Down
29 changes: 21 additions & 8 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import yaml

from cosmos import cache
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_LOG_DIR_NAME,
Expand All @@ -23,7 +24,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -73,7 +74,7 @@ def name(self) -> str:
def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env_vars.keys())
logger.debug("Environment variable keys: %s", env_vars.keys())
process = Popen(
command,
stdout=PIPE,
Expand Down Expand Up @@ -136,13 +137,15 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
cache_dir: Path | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.cache_dir = cache_dir
self.dbt_vars = dbt_vars or {}

def load(
Expand Down Expand Up @@ -250,14 +253,19 @@ def load_via_dbt_ls(self) -> None:
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
)
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse:
copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path)
abs_project_path = self.render_config.project_path.absolute()
create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
if self.project.partial_parse and self.cache_dir:
latest_partial_parse = cache._get_latest_partial_parse(abs_project_path, self.cache_dir)
logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse)
if latest_partial_parse is not None:
cache._copy_partial_parse_to_project(latest_partial_parse, tmpdir_path)

with self.profile_config.ensure_profile(
use_mock_values=self.render_config.enable_mock_profile
) as profile_values, environ(self.project.env_vars or self.render_config.env_vars or {}):
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)
Expand Down Expand Up @@ -288,6 +296,11 @@ def load_via_dbt_ls(self) -> None:
self.nodes = nodes
self.filtered_nodes = nodes

if self.project.partial_parse and self.cache_dir:
partial_parse_file = get_partial_parse_path(tmpdir_path)
if partial_parse_file.exists():
cache._update_partial_parse_cache(partial_parse_file, self.cache_dir)

def load_via_dbt_ls_file(self) -> None:
"""
This is between dbt ls and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a
Expand Down
13 changes: 5 additions & 8 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(exist_ok=True)

shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME))
def get_partial_parse_path(project_dir_path: Path) -> Path:
"""
Return the partial parse (partial_parse.msgpack) path for a given dbt project directory.
"""
return project_dir_path / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME


@contextmanager
Expand Down
6 changes: 6 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
from abc import ABCMeta, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any, Sequence, Tuple

import yaml
Expand All @@ -10,6 +12,7 @@
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.strings import to_boolean

from cosmos import cache
from cosmos.dbt.executable import get_system_dbt
from cosmos.log import get_logger

Expand Down Expand Up @@ -61,6 +64,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
(i.e. /home/astro/.pyenv/versions/dbt_venv/bin/dbt)
:param dbt_cmd_flags: List of flags to pass to dbt command
:param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command
:param cache_dir: Directory used to cache Cosmos/dbt artifacts in Airflow worker nodes
"""

template_fields: Sequence[str] = ("env", "select", "exclude", "selector", "vars", "models")
Expand Down Expand Up @@ -108,6 +112,7 @@ def __init__(
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
dbt_cmd_global_flags: list[str] | None = None,
cache_dir: Path | None = None,
**kwargs: Any,
) -> None:
self.project_dir = project_dir
Expand Down Expand Up @@ -135,6 +140,7 @@ def __init__(
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
self.dbt_cmd_global_flags = dbt_cmd_global_flags or []
self.cache_dir = cache_dir
super().__init__(**kwargs)

def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]:
Expand Down
Loading

0 comments on commit 458be0c

Please sign in to comment.