-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Speed up LoadMode.DBT_LS
by caching dbt ls output in Airflow Variable
#1014
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
cosmos/dbt/graph.py
Outdated
""" | ||
logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_identifier}...") | ||
if settings.enable_cache and settings.experimental_cache: | ||
dbt_ls_cache = Variable.get(self.cache_identifier, "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to query the db directly, so you bypass any configured secrets backend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jedcunningham how do you advise us to do this?
Wouldn't there be a risk that with this, we'd create a larger coupling of Cosmos to Airflow that could be more sensitive to different versions of Airflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jed, since this has been long standing, I'll be merging as it is - and I can make a follow up PR to address after your feedback.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1014 +/- ##
==========================================
+ Coverage 95.82% 96.05% +0.22%
==========================================
Files 62 62
Lines 3020 3196 +176
==========================================
+ Hits 2894 3070 +176
Misses 126 126 ☔ View full report in Codecov by Sentry. |
Looks promising! |
5a25d9d
to
cfed136
Compare
cfed136
to
9656788
Compare
e3988cb
to
327192a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. There are some minor cosmetic suggestions for the documentation that we can address iteratively in a subsequent PR.
Great feature support! Thank you! 👏🏽
@@ -82,6 +82,7 @@ repos: | |||
types-PyYAML, | |||
types-attrs, | |||
attrs, | |||
types-pytz, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
types-pytz, |
Would we still need it now?
@@ -81,6 +81,7 @@ tests = [ | |||
"pytest-cov", | |||
"pytest-describe", | |||
"sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html | |||
"types-pytz", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"types-pytz", |
same
|
||
Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output | ||
of this command as an `Airflow Variable <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html>`_. | ||
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time. | |
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time. |
Caching the partial parse file | ||
~~~~~~~~~~~~~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caching the partial parse file | |
~~~~~~~~~~~~~ | |
Caching the partial parse file | |
~~~~~~~~~~~~~~~~~~~~~ |
Caching the dbt ls output | ||
~~~~~~~~~~~~~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caching the dbt ls output | |
~~~~~~~~~~~~~ | |
Caching the dbt ls output | |
~~~~~~~~~~~~~~~~~~~ |
@@ -30,6 +30,22 @@ This page lists all available Airflow configurations that affect ``astronomer-co | |||
- Default: ``True`` | |||
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE`` | |||
|
|||
.. enable_cache_dbt_ls: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. enable_cache_dbt_ls: | |
.. _enable_cache_dbt_ls: |
@tatiana A little late to this PR, but not late to 1.5.0: I'm thinking the I think, going into something like Cosmos 2.0, it makes sense to consolidate the class DbtGraph:
load_method_mapping: dict[LoadMode, Callable[[], None]] = {}
def __init__(
self,
project_config: ProjectConfig | None = None,
profile_config: ProfileConfig | None = None,
execution_config: ExecutionConfig | None = None,
render_config: RenderConfig | None = None,
**kwargs
): as per the code example in #895. Adding more kwargs means either we cannot do this, or we need to add more deprecations. I think doing this will also help make the API cleaner, more consistent, and easier for users to reason about. Right now there are just a lot of things for users to tweak and it can be overwhelming. Keeping it consistent and locked inside the configs can reduce confusion. WDYT? A few other notes:
|
New Features * Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014 * Support to cache profiles created via ``ProfileMapping`` by @pankajastro in #1046 * Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe * Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016 * Add node config to TaskInstance Context by @linchun3 in #1044 Bug fixes * Support partial parsing when cache is disabled by @tatiana in #1070 * Fix disk permission error in restricted env by @pankajastro in #1051 * Add CSP header to iframe contents by @dwreeves in #1055 * Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047 Enhancements * Support ``static_index.html`` docs by @dwreeves in #999 * Support deep linking dbt docs via Airflow UI by @dwreeves in #1038 * Add ability to specify host/port for Snowflake connection by @whummer in #1063 Docs * Fix rendering for env ``enable_cache_dbt_ls`` by @pankajastro in #1069 Others * Update documentation for DbtDocs generator by @arjunanan6 in #1043 * Use uv in CI by @dwreeves in #1013 * Cache hatch folder in the CI by @tatiana in #1056 * Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054 * Mark plugin integration tests as integration by @tatiana in #1057 * Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049 * Pre-commit hook updates in #1039, #1050, #1064 * Remove duplicates in changelog by @jedcunningham in #1068
…stronomer#1054) Using the Airflow metadata database connection as an example connection is misleading. The mismatch in the environment variable value used in the Cosmos integration tests, particularly with sqlite as the Airflow metadata database, is an issue that can hide other underlining problems. This PR decouples the test connection used by Cosmos example DAGs from the Airflow metadata Database connection. Since this change affects the Github action configuration, it will only work for the branch-triggered GH action runs, such as: https://github.com/astronomer/astronomer-cosmos/actions/runs/9596066209 Because this is a breaking change to the CI script itself, all the tests `pull_request_target` are expected to fail during the PR - and will pass once this is merged to `main`. This improvement was originally part of astronomer#1014 --------- Co-authored-by: Pankaj Koti <[email protected]>
…le (astronomer#1014) Improve significantly the `LoadMode.DBT_LS` performance. The example DAGs tested reduced the task queueing time significantly (from ~30s to ~0.5s) and the total DAG run time for Jaffle Shop from 1 min 25s to 40s (by more than 50%). Some users[ reported improvements of 84%](astronomer#1014 (comment)) in the DAG run time when trying out these changes. This difference can be even more significant on larger dbt projects. The improvement was accomplished by caching the dbt ls output as an Airflow Variable. This is an alternative to astronomer#992, when we cached the pickled DAG/TaskGroup into a local file in the Airflow node. Unlike astronomer#992, this approach works well for distributed deployments of Airflow. As with any caching solution, this strategy does not guarantee optimal performance on every run—whenever the cache is regenerated, the scheduler or DAG processor will experience a delay. It was also observed that the key value could change across platforms (e.g., `Darwin` and `Linux`). Therefore, if using a deployment with heterogeneous OS, the key may be regenerated often. Closes: astronomer#990 Closes: astronomer#1061 **Enabling/disabling this feature** This feature is enabled by default. Users can disable it by setting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`. **How the cache is refreshed** Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key. The cache will be automatically refreshed in case any files of the dbt project change. Changes are calculated using the SHA256 of all the files in the directory. Initially, this feature was implemented using the files' modified timestamp, but this did not work well for some Airflow deployments (e.g., `astro --dags` since the timestamp was changed during deployments). Additionally, if any of the following DAG configurations are changed, we'll automatically purge the cache of the DAGs that use that specific configuration: * `ProjectConfig.dbt_vars` * `ProjectConfig.env_vars` * `ProjectConfig.partial_parse` * `RenderConfig.env_vars` * `RenderConfig.exclude` * `RenderConfig.select` * `RenderConfig.selector` The following argument was introduced in case users would like to define Airflow variables that could be used to refresh the cache (it expects a list with Airflow variable names): * `RenderConfig.airflow_vars_to_purge_cache` Example: ``` RenderConfig( airflow_vars_to_purge_cache==["refresh_cache"] ) ``` **Cache key** The Airflow variables that represent the dbt ls cache are prefixed by `cosmos_cache`. When using `DbtDag`, the keys use the DAG name. When using `DbtTaskGroup`, they consider the TaskGroup and parent task groups and DAG. Examples: 1. The `DbtDag` "cosmos_dag" will have the cache represented by `"cosmos_cache__basic_cosmos_dag"`. 2. The `DbtTaskGroup` "customers" declared inside teh DAG "basic_cosmos_task_group" will have the cache key `"cosmos_cache__basic_cosmos_task_group__customers"`. **Cache value** The cache values contain a few properties: - `last_modified` timestamp, represented using the ISO 8601 format. - `version` is a hash that represents the version of the dbt project and arguments used to run dbt ls by the time the cache was created - `dbt_ls_compressed` represents the dbt ls output compressed using zlib and encoded to base64 to be recorded as a string to the Airflow metadata database. Steps used to compress: ``` compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) encoded_data = base64.b64encode(compressed_data) dbt_ls_compressed = encoded_data.decode("utf-8") ``` We are compressing this value because it will be significant for larger dbt projects, depending on the selectors used, and we wanted this approach to be safe and not clutter the Airflow metadata database. Some numbers on the compression * A dbt project with 100 models can lead to a dbt ls output of 257k characters when using JSON. Zlib could compress it by 20x. * Another [real-life dbt project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads) with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It reduces to 489 KB after being compressed using `zlib` and encoded using `base64` - to 6% of the original size. * Maximum cell size in Postgres: 20MB The latency used to compress is in the order of milliseconds, not interfering in the performance of this solution. **Future work** * How this will affect the Airflow db in the long term * How does this performance compare to `ObjectStorage`? **Example of results before and after this change** Task queue times in Astro before the change: <img width="1488" alt="Screenshot 2024-06-03 at 11 15 26" src="https://github.com/astronomer/astronomer-cosmos/assets/272048/20f6ae8f-02e0-4974-b445-740925ab1b3c"> Task queue times in Astro after the change on the second run of the DAG: <img width="1624" alt="Screenshot 2024-06-03 at 11 15 44" src="https://github.com/astronomer/astronomer-cosmos/assets/272048/c7b8a821-8751-4d2c-8feb-1d0c9bbba97e"> This feature will be available in `astronomer-cosmos==1.5.0a8`.
New Features * Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in astronomer#1014 * Support to cache profiles created via ``ProfileMapping`` by @pankajastro in astronomer#1046 * Support for running dbt tasks in AWS EKS in astronomer#944 by @VolkerSchiewe * Add Clickhouse profile mapping by @roadan and @pankajastro in astronomer#353 and astronomer#1016 * Add node config to TaskInstance Context by @linchun3 in astronomer#1044 Bug fixes * Support partial parsing when cache is disabled by @tatiana in astronomer#1070 * Fix disk permission error in restricted env by @pankajastro in astronomer#1051 * Add CSP header to iframe contents by @dwreeves in astronomer#1055 * Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in astronomer#1047 Enhancements * Support ``static_index.html`` docs by @dwreeves in astronomer#999 * Support deep linking dbt docs via Airflow UI by @dwreeves in astronomer#1038 * Add ability to specify host/port for Snowflake connection by @whummer in astronomer#1063 Docs * Fix rendering for env ``enable_cache_dbt_ls`` by @pankajastro in astronomer#1069 Others * Update documentation for DbtDocs generator by @arjunanan6 in astronomer#1043 * Use uv in CI by @dwreeves in astronomer#1013 * Cache hatch folder in the CI by @tatiana in astronomer#1056 * Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in astronomer#1054 * Mark plugin integration tests as integration by @tatiana in astronomer#1057 * Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in astronomer#1049 * Pre-commit hook updates in astronomer#1039, astronomer#1050, astronomer#1064 * Remove duplicates in changelog by @jedcunningham in astronomer#1068
New Features * Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014 * Support to cache profiles created via ``ProfileMapping`` by @pankajastro in #1046 * Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe * Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016 * Add node config to TaskInstance Context by @linchun3 in #1044 Bug fixes * Support partial parsing when cache is disabled by @tatiana in #1070 * Fix disk permission error in restricted env by @pankajastro in #1051 * Add CSP header to iframe contents by @dwreeves in #1055 * Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047 Enhancements * Support ``static_index.html`` docs by @dwreeves in #999 * Support deep linking dbt docs via Airflow UI by @dwreeves in #1038 * Add ability to specify host/port for Snowflake connection by @whummer in #1063 Docs * Fix rendering for env ``enable_cache_dbt_ls`` by @pankajastro in #1069 Others * Update documentation for DbtDocs generator by @arjunanan6 in #1043 * Use uv in CI by @dwreeves in #1013 * Cache hatch folder in the CI by @tatiana in #1056 * Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054 * Mark plugin integration tests as integration by @tatiana in #1057 * Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049 * Pre-commit hook updates in #1039, #1050, #1064 * Remove duplicates in changelog by @jedcunningham in #1068 (cherry picked from commit 18d2c90)
Hi @dwreeves, Thanks a lot for all the very relevant feedback! I'm sorry I missed it, as it was made after the PR was merged.
We can expose them there as part of a follow-up PR / future version of Cosmos. Just added: #1110 I'm also happy with the proposal to refactor the I created a PR to address the feedback on the dead code: #1111. Your proposal to refactor the load implementation in #1001 is very good as well. Would you like to work on it? |
I would, but I've been busy... still trying to find time to contribute to this project. |
Addressing the late feedback on a previously merged PR: #1014 (comment)
Addressing the late feedback on a previously merged PR: astronomer#1014 (comment)
This PR introduces the functionality to store and retrieve the `dbt ls` output cache in remote storage systems. This enhancement improves the efficiency and scalability of cache management for Cosmos dbt projects that use the `dbt ls` cache option (enabled by default) introduced in PR #1014 ## Key Changes 1. **`dbt ls` Cache Storage in Remote Stores**: Added support to store the dbt ls cache as a JSON file in remote storage paths configured in the Airflow settings under the `cosmos` section. The cache is saved in the specified remote storage path & it includes the `cosmos_cache__` prefix. 2. **Cache Retrieval from Remote Stores**: Implemented logic to check the existence of the cache in the remote storage path before falling back to the Variable cache. If the `remote_cache_dir` is specified and it exists in the remote store, it is read and used; We try creating the specified path if it does not exist. 3. **Backward Compatibility**: Maintained backward compatibility by allowing users to continue using local cache storage through Airflow Variables if a `remote_cache_dir` is not specified. ## Impact 1. **Scalability**: Enables the use of remote, scalable storage systems for dbt cache management. 2. **Performance**: Reduces the load on Airflow's metadata database by offloading cache storage to external systems. 3. **Flexibility**: Provides users with the option to choose between local (Airflow metadata using Variables) and remote cache storage based on their infrastructure needs. ## Configuration To leverage this feature, users need to set the `remote_cache_dir` in their Airflow settings in the `cosmos` section. This path should point to a compatible remote storage location. You can also specify the `remote_cache_dir_conn_id` which is your Airflow connection that can connect to your remote store. If it's not specified, Cosmos will aim to identify the scheme for the specified path and use the default Airflow connection ID as per the scheme. ## Testing 1. Tested with various remote storage backends (AWS S3 and GCP GS) to ensure compatibility and reliability 2. Verified that cache retrieval falls back to Variable based caching approach if the `remote_cache_dir` is not configured. ## Documentation Updated the documentation to include instructions on configuring `remote_cache_dir`. ## Limitations 1. Users must be on Airflow version 2.8 or higher because the underlying Airflow Object Store feature we utilise to access remote stores was introduced in this version. If users attempt to specify a `remote_cache_dir` on an older Airflow version, they will encounter an error indicating the version requirement. 2. Users would observe a slight delay for the tasks being in queued state (approx 1-2 seconds queued duration vs the 0-1 seconds previously in the Variable approach) due to remote storage calls to retrieve the cache from. Closes: #1072
Improve significantly the
LoadMode.DBT_LS
performance. The example DAGs tested reduced the task queueing time significantly (from ~30s to ~0.5s) and the total DAG run time for Jaffle Shop from 1 min 25s to 40s (by more than 50%). Some users reported improvements of 84% in the DAG run time when trying out these changes. This difference can be even more significant on larger dbt projects.The improvement was accomplished by caching the dbt ls output as an Airflow Variable. This is an alternative to #992, when we cached the pickled DAG/TaskGroup into a local file in the Airflow node. Unlike #992, this approach works well for distributed deployments of Airflow.
As with any caching solution, this strategy does not guarantee optimal performance on every run—whenever the cache is regenerated, the scheduler or DAG processor will experience a delay. It was also observed that the key value could change across platforms (e.g.,
Darwin
andLinux
). Therefore, if using a deployment with heterogeneous OS, the key may be regenerated often.Closes: #990
Closes: #1061
Enabling/disabling this feature
This feature is enabled by default.
Users can disable it by setting the environment variable
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0
.How the cache is refreshed
Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key.
The cache will be automatically refreshed in case any files of the dbt project change. Changes are calculated using the SHA256 of all the files in the directory. Initially, this feature was implemented using the files' modified timestamp, but this did not work well for some Airflow deployments (e.g.,
astro --dags
since the timestamp was changed during deployments).Additionally, if any of the following DAG configurations are changed, we'll automatically purge the cache of the DAGs that use that specific configuration:
ProjectConfig.dbt_vars
ProjectConfig.env_vars
ProjectConfig.partial_parse
RenderConfig.env_vars
RenderConfig.exclude
RenderConfig.select
RenderConfig.selector
The following argument was introduced in case users would like to define Airflow variables that could be used to refresh the cache (it expects a list with Airflow variable names):
RenderConfig.airflow_vars_to_purge_cache
Example:
Cache key
The Airflow variables that represent the dbt ls cache are prefixed by
cosmos_cache
. When usingDbtDag
, the keys use the DAG name. When usingDbtTaskGroup
, they consider the TaskGroup and parent task groups and DAG.Examples:
DbtDag
"cosmos_dag" will have the cache represented by"cosmos_cache__basic_cosmos_dag"
.DbtTaskGroup
"customers" declared inside teh DAG "basic_cosmos_task_group" will have the cache key"cosmos_cache__basic_cosmos_task_group__customers"
.Cache value
The cache values contain a few properties:
last_modified
timestamp, represented using the ISO 8601 format.version
is a hash that represents the version of the dbt project and arguments used to run dbt ls by the time the cache was createddbt_ls_compressed
represents the dbt ls output compressed using zlib and encoded to base64 to be recorded as a string to the Airflow metadata database.Steps used to compress:
We are compressing this value because it will be significant for larger dbt projects, depending on the selectors used, and we wanted this approach to be safe and not clutter the Airflow metadata database.
Some numbers on the compression
zlib
and encoded usingbase64
- to 6% of the original size.The latency used to compress is in the order of milliseconds, not interfering in the performance of this solution.
Future work
LoadMode.DBT_LS
cache #1090ObjectStorage
? [Feature] Allow storing dbt ls cache into Object Store #1072Example of results before and after this change
Task queue times in Astro before the change:
Task queue times in Astro after the change on the second run of the DAG:
This feature is available in
astronomer-cosmos==1.5.0a8
.The previous screenshots were taken when trying out the alpha release using the following Astro CLI project:
https://github.com/astronomer/cosmos-demo
The same was reproduced by running the DAG using Airflow standalone.