diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d24afeef8..1df10c1c1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,8 +10,8 @@ New Features * Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016 * Support ``static_index.html`` docs by @dwreeves in #999 * (WIP) Support caching dbt ls output in Airflow variable in #1014 by @tatiana - - different approach than 1.5.0a1 and 1.5.0a2 - - fix log for TaskGroup + - a3 & a4: different approach than 1.5.0a1 and 1.5.0a2 + - a4: introduce CachePurgeConfig Others diff --git a/cosmos/cache.py b/cosmos/cache.py index 1e0b341f0..2ba6b8d14 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,13 +1,19 @@ from __future__ import annotations +import functools +import hashlib +import os import shutil +import time from pathlib import Path import msgpack +from airflow.models import Variable from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup from cosmos import settings +from cosmos.config import CachePurgeConfig from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME from cosmos.dbt.project import get_partial_parse_path from cosmos.log import get_logger @@ -171,3 +177,32 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P if source_manifest_filepath.exists(): shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath)) + + +@functools.lru_cache +def should_use_cache() -> bool: + return settings.enable_cache and settings.experimental_cache + + +def calculate_current_version(cache_identifier: str, project_dir: Path, cache_purge_config: CachePurgeConfig) -> str: + start_time = time.process_time() + + # Combined value for when the dbt project directory files were last modified + # This is fast (e.g. 
0.01s for jaffle shop, 0.135s for a 5k models dbt folder) + dbt_combined_last_modified = sum([path.stat().st_mtime for path in project_dir.glob("**/*")]) + + # The performance for the following will depend on the user's configuration + files_modified_time = sum(path.stat().st_mtime for path in cache_purge_config.file_paths) + airflow_vars = [Variable.get(var_name, "") for var_name in cache_purge_config.airflow_vars] + hash_airflow_vars = hashlib.md5("".join(airflow_vars).encode()).hexdigest() + envvars = [os.getenv(envvar_name, "") for envvar_name in cache_purge_config.env_vars] + hash_envvars = hashlib.md5("".join(envvars).encode()).hexdigest() + + elapsed_time = time.process_time() - start_time + logger.info(f"Cosmos performance: time to calculate {cache_identifier} current version: {elapsed_time}") + return f"{dbt_combined_last_modified},{files_modified_time},{hash_airflow_vars},{hash_envvars}" + + +@functools.lru_cache +def was_project_modified(previous_version: str, current_version: str) -> bool: + return previous_version != current_version diff --git a/cosmos/config.py b/cosmos/config.py index 820833e6c..3479f4b63 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -215,6 +215,13 @@ def is_manifest_available(self) -> bool: return self.manifest_path.exists() +@dataclass +class CachePurgeConfig: + file_paths: list[Path] = field(default_factory=lambda: list()) + airflow_vars: list[str] = field(default_factory=lambda: list()) + env_vars: list[str] = field(default_factory=lambda: list()) + + @dataclass class ProfileConfig: """ diff --git a/cosmos/converter.py b/cosmos/converter.py index 43eb338e5..7c492a43a 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -16,7 +16,7 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import CachePurgeConfig, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig 
from cosmos.constants import ExecutionMode from cosmos.dbt.graph import DbtGraph from cosmos.dbt.selector import retrieve_by_label @@ -204,6 +204,7 @@ def __init__( profile_config: ProfileConfig | None = None, execution_config: ExecutionConfig | None = None, render_config: RenderConfig | None = None, + cache_purge_config: CachePurgeConfig = CachePurgeConfig(), dag: DAG | None = None, task_group: TaskGroup | None = None, operator_args: dict[str, Any] | None = None, @@ -241,6 +242,7 @@ def __init__( profile_config=profile_config, cache_dir=cache_dir, cache_identifier=cache_identifier, + cache_purge_config=cache_purge_config, dbt_vars=dbt_vars, ) self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index d812bd35b..2a34471ba 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -6,6 +6,7 @@ import platform import tempfile from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen from typing import Any @@ -14,7 +15,7 @@ from airflow.models import Variable from cosmos import cache, settings -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import CachePurgeConfig, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, @@ -133,6 +134,7 @@ class DbtGraph: nodes: dict[str, DbtNode] = dict() filtered_nodes: dict[str, DbtNode] = dict() load_method: LoadMode = LoadMode.AUTOMATIC + current_version: str = "" def __init__( self, @@ -142,6 +144,7 @@ def __init__( profile_config: ProfileConfig | None = None, cache_dir: Path | None = None, cache_identifier: str = "UNDEFINED", + cache_purge_config: CachePurgeConfig | None = None, # dbt_vars only supported for LegacyDbtProject dbt_vars: dict[str, str] | None = None, ): @@ -151,6 +154,7 @@ def __init__( self.execution_config 
= execution_config self.cache_dir = cache_dir self.cache_identifier = cache_identifier + self.cache_purge_config = cache_purge_config or CachePurgeConfig() self.dbt_vars = dbt_vars or {} def load( @@ -202,13 +206,29 @@ def load_via_dbt_ls_cache(self) -> bool: """ logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_identifier}...") if settings.enable_cache and settings.experimental_cache: - dbt_ls_cache = Variable.get(self.cache_identifier, "") - if dbt_ls_cache: + project_path = self.render_config.project_path + + try: + cache_dict = Variable.get(self.cache_identifier, deserialize_json=True) + except (json.decoder.JSONDecodeError, KeyError): + logger.info(f"Cosmos performance: Cache miss for {self.cache_identifier}") + return False + else: + logger.info(f"Cosmos performance: Cache hit for {self.cache_identifier}") + + cache_version = cache_dict["version"] + dbt_ls_cache = cache_dict["dbt_ls"] + + current_version = cache.calculate_current_version( + self.cache_identifier, project_path, self.cache_purge_config # type: ignore + ) + + if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version): logger.info( f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_identifier} is {len(dbt_ls_cache.encode('utf-8'))}" ) self.load_method = LoadMode.DBT_LS_CACHE - project_path = self.render_config.project_path + nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache) self.nodes = nodes self.filtered_nodes = nodes @@ -249,7 +269,14 @@ def run_dbt_ls( logger.debug(line.strip()) if settings.enable_cache and settings.experimental_cache: - Variable.set(self.cache_identifier, stdout) + cache_dict = { + "version": cache.calculate_current_version( + self.cache_identifier, project_path, self.cache_purge_config + ), + "dbt_ls": stdout, + "last_modified": datetime.now().isoformat(), + } + Variable.set(self.cache_identifier, cache_dict, serialize_json=True) nodes = 
parse_dbt_ls_output(project_path, stdout) return nodes diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index f230e87d4..1a7c5af8a 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -10,6 +10,7 @@ from airflow.operators.empty import EmptyOperator from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import CachePurgeConfig from cosmos.constants import InvocationMode from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -29,6 +30,8 @@ invocation_mode=InvocationMode.SUBPROCESS, ) +cache_purge_config = CachePurgeConfig(file_paths=[Path(__file__)], airflow_vars=["purge_cache"], env_vars=["PURGE_CACHE"]) + @dag( schedule_interval="@daily",