Introduce CachePurgeConfig
tatiana committed Jun 10, 2024
1 parent 0395fe1 commit cfed136
Showing 6 changed files with 82 additions and 8 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.rst
@@ -10,8 +10,8 @@ New Features
* Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016
* Support ``static_index.html`` docs by @dwreeves in #999
* (WIP) Support caching dbt ls output in Airflow variable in #1014 by @tatiana
- different approach than 1.5.0a1 and 1.5.0a2
- fix log for TaskGroup
- a3 & a4: different approach than 1.5.0a1 and 1.5.0a2
- a4: introduce CachePurgeConfig

Others

35 changes: 35 additions & 0 deletions cosmos/cache.py
@@ -1,13 +1,19 @@
from __future__ import annotations

import functools
import hashlib
import os
import shutil
import time
from pathlib import Path

import msgpack
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.config import CachePurgeConfig
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger
@@ -171,3 +177,32 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P

if source_manifest_filepath.exists():
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


@functools.lru_cache
def should_use_cache() -> bool:
return settings.enable_cache and settings.experimental_cache


def calculate_current_version(cache_identifier: str, project_dir: Path, cache_purge_config: CachePurgeConfig) -> str:
start_time = time.process_time()

# Combined value for when the dbt project directory files were last modified
# This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
dbt_combined_last_modified = sum([path.stat().st_mtime for path in project_dir.glob("**/*")])

# The performance for the following will depend on the user's configuration
files_modified_time = sum(path.stat().st_mtime for path in cache_purge_config.file_paths)
airflow_vars = [Variable.get(var_name, "") for var_name in cache_purge_config.airflow_vars]
hash_airflow_vars = hashlib.md5("".join(airflow_vars).encode()).hexdigest()
envvars = [os.getenv(envvar_name, "") for envvar_name in cache_purge_config.env_vars]
hash_envvars = hashlib.md5("".join(envvars).encode()).hexdigest()

elapsed_time = time.process_time() - start_time
logger.info(f"Cosmos performance: time to calculate {cache_identifier} current version: {elapsed_time}")
return f"{dbt_combined_last_modified},{files_modified_time},{hash_airflow_vars},{hash_envvars}"


@functools.lru_cache
def was_project_modified(previous_version: str, current_version: str) -> bool:
return previous_version != current_version
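
A minimal usage sketch of the helpers above (not taken from the repository): the project path, cache identifier, and previous version string are illustrative, CachePurgeConfig is the dataclass added in cosmos/config.py below, and resolving airflow_vars via Variable.get assumes a reachable Airflow metadata database.

from pathlib import Path

from cosmos.cache import calculate_current_version, should_use_cache, was_project_modified
from cosmos.config import CachePurgeConfig

# Illustrative values; a real DAG would use its own project path and identifier.
project_dir = Path("/usr/local/airflow/dags/dbt/jaffle_shop")
purge_config = CachePurgeConfig(
    file_paths=[project_dir / "dbt_project.yml"],  # extra files whose mtime busts the cache
    airflow_vars=["purge_cache"],                  # Airflow Variables hashed into the version
    env_vars=["PURGE_CACHE"],                      # environment variables hashed into the version
)

if should_use_cache():
    current_version = calculate_current_version("basic_cosmos_task_group", project_dir, purge_config)
    previous_version = "1718000000.0,1718000000.0,<md5>,<md5>"  # normally read back from the Airflow Variable
    if was_project_modified(previous_version, current_version):
        print("cache is stale, re-run dbt ls")
    else:
        print("cache is fresh, reuse the stored dbt ls output")
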
7 changes: 7 additions & 0 deletions cosmos/config.py
@@ -215,6 +215,13 @@ def is_manifest_available(self) -> bool:
return self.manifest_path.exists()


@dataclass
class CachePurgeConfig:
file_paths: list[Path] = field(default_factory=lambda: list())
airflow_vars: list[str] = field(default_factory=lambda: list())
env_vars: list[str] = field(default_factory=lambda: list())


@dataclass
class ProfileConfig:
"""
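
With the default CachePurgeConfig (all three lists empty), the file, Airflow Variable, and environment variable components of calculate_current_version reduce to constants, so only the modification times under the dbt project directory decide whether the cached dbt ls output is reused.
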
4 changes: 3 additions & 1 deletion cosmos/converter.py
@@ -16,7 +16,7 @@

from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.config import CachePurgeConfig, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.selector import retrieve_by_label
@@ -204,6 +204,7 @@ def __init__(
profile_config: ProfileConfig | None = None,
execution_config: ExecutionConfig | None = None,
render_config: RenderConfig | None = None,
cache_purge_config: CachePurgeConfig = CachePurgeConfig(),
dag: DAG | None = None,
task_group: TaskGroup | None = None,
operator_args: dict[str, Any] | None = None,
@@ -241,6 +242,7 @@
profile_config=profile_config,
cache_dir=cache_dir,
cache_identifier=cache_identifier,
cache_purge_config=cache_purge_config,
dbt_vars=dbt_vars,
)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
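
A sketch of how a DAG author could pass the new argument end to end, assuming DbtDag forwards the keyword argument to DbtToAirflowConverter just as DbtTaskGroup does; the paths, profile names, and scheduling settings are illustrative.

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.config import CachePurgeConfig

DBT_ROOT = Path("/usr/local/airflow/dags/dbt")  # illustrative location

jaffle_shop_dag = DbtDag(
    project_config=ProjectConfig(DBT_ROOT / "jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath=DBT_ROOT / "jaffle_shop" / "profiles.yml",
    ),
    # Purge the cached dbt ls output whenever the "purge_cache" Airflow Variable changes.
    cache_purge_config=CachePurgeConfig(airflow_vars=["purge_cache"]),
    dag_id="jaffle_shop",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
)
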
37 changes: 32 additions & 5 deletions cosmos/dbt/graph.py
@@ -6,6 +6,7 @@
import platform
import tempfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any
@@ -14,7 +15,7 @@
from airflow.models import Variable

from cosmos import cache, settings
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.config import CachePurgeConfig, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_LOG_FILENAME,
@@ -133,6 +134,7 @@ class DbtGraph:
nodes: dict[str, DbtNode] = dict()
filtered_nodes: dict[str, DbtNode] = dict()
load_method: LoadMode = LoadMode.AUTOMATIC
current_version: str = ""

def __init__(
self,
@@ -142,6 +144,7 @@ def __init__(
profile_config: ProfileConfig | None = None,
cache_dir: Path | None = None,
cache_identifier: str = "UNDEFINED",
cache_purge_config: CachePurgeConfig | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
):
@@ -151,6 +154,7 @@
self.execution_config = execution_config
self.cache_dir = cache_dir
self.cache_identifier = cache_identifier
self.cache_purge_config = cache_purge_config or CachePurgeConfig()
self.dbt_vars = dbt_vars or {}

def load(
@@ -202,13 +206,29 @@ def load_via_dbt_ls_cache(self) -> bool:
"""
logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_identifier}...")
if settings.enable_cache and settings.experimental_cache:
dbt_ls_cache = Variable.get(self.cache_identifier, "")
if dbt_ls_cache:
project_path = self.render_config.project_path

try:
cache_dict = Variable.get(self.cache_identifier, deserialize_json=True)
except (json.decoder.JSONDecodeError, KeyError):
logger.info(f"Cosmos performance: Cache miss for {self.cache_identifier}")
return False
else:
logger.info(f"Cosmos performance: Cache hit for {self.cache_identifier}")

cache_version = cache_dict["version"]
dbt_ls_cache = cache_dict["dbt_ls"]

current_version = cache.calculate_current_version(
self.cache_identifier, project_path, self.cache_purge_config # type: ignore
)

if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version):
logger.info(
f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_identifier} is {len(dbt_ls_cache.encode('utf-8'))}"
)
self.load_method = LoadMode.DBT_LS_CACHE
project_path = self.render_config.project_path

nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache)
self.nodes = nodes
self.filtered_nodes = nodes
@@ -249,7 +269,14 @@ def run_dbt_ls(
logger.debug(line.strip())

if settings.enable_cache and settings.experimental_cache:
Variable.set(self.cache_identifier, stdout)
cache_dict = {
"version": cache.calculate_current_version(
self.cache_identifier, project_path, self.cache_purge_config
),
"dbt_ls": stdout,
"last_modified": datetime.now().isoformat(),
}
Variable.set(self.cache_identifier, cache_dict, serialize_json=True)

nodes = parse_dbt_ls_output(project_path, stdout)
return nodes
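
For reference, an illustration of the payload that run_dbt_ls now serializes into the Airflow Variable named by cache_identifier and that load_via_dbt_ls_cache deserializes; the concrete values are invented.

cache_dict = {
    # Output of cache.calculate_current_version(): dbt project mtime sum, watched-file mtime sum,
    # MD5 of the watched Airflow Variables, MD5 of the watched environment variables.
    "version": "3436892193.7,1718000000.0,d41d8cd98f00b204e9800998ecf8427e,d41d8cd98f00b204e9800998ecf8427e",
    # Raw `dbt ls` stdout, later fed to parse_dbt_ls_output().
    "dbt_ls": '{"name": "stg_customers", "resource_type": "model", ...}',
    # Timestamp recording when the cache entry was written.
    "last_modified": "2024-06-10T12:00:00.000000",
}
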
3 changes: 3 additions & 0 deletions dev/dags/basic_cosmos_task_group.py
@@ -10,6 +10,7 @@
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.config import CachePurgeConfig
from cosmos.constants import InvocationMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

@@ -29,6 +30,8 @@
invocation_mode=InvocationMode.SUBPROCESS,
)

cache_purge_config = CachePurgeConfig(file_paths=[Path(__file__)], airflow_vars=["purge_cache"], env_vars=["PURGE_CACHE"])


@dag(
schedule_interval="@daily",