From a89389de32fd3fc140722ca925d5a28526fd8172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Rojas=20Ben=C3=ADtez?= Date: Wed, 14 Aug 2024 03:53:16 -0500 Subject: [PATCH] Add default source nodes rendering (#1107) Re-Opening of PR #661 This PR features a new way of rendering source nodes: - Check freshness for sources with freshness checks - Source tests - Empty operators for nodes without tests or freshness. One of the main limitations I found while using the `custom_callback` functions on source nodes to check freshness is that nodes were being created for 100% of sources even though not all of them required freshness checks, which made workers waste compute time. I'm adding a new variable to the DbtNode class called `has_freshness`, which is True for sources with freshness checks and False for any other resource type. If this feature is enabled with the option `ALL`, all sources with `has_freshness == False` will be rendered as Empty Operators, to keep dbt's behavior of showing sources, as suggested in issue #630. A new rendered template field is included too: `freshness`, which is the sources.json generated by dbt when running `dbt source freshness`. This adds a new node type (source), which changes the behavior of some tests. This PR also updates the dev dbt project jaffle_shop to include source nodes when enabled. ![image](https://github.com/user-attachments/assets/e972ac58-8741-4c13-9905-e78775f9cc80) As seen in the image, source nodes with freshness checks are rendered with a blue color, while the ones rendered as EmptyOperator show a white/light green color. ## Related Issue(s) Closes: #630 Closes: #572 Closes: https://github.com/astronomer/astronomer-cosmos/issues/875 ## Breaking Change? This won't be a breaking change, since the default behavior still ignores this new feature. That can be changed with the new RenderConfig variable called `source_rendering_behavior`. 
Co-authored-by: Pankaj Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> --- .github/workflows/test.yml | 3 + cosmos/airflow/graph.py | 34 +++- cosmos/config.py | 3 + cosmos/constants.py | 10 + cosmos/core/airflow.py | 2 +- cosmos/dbt/graph.py | 59 +++++- cosmos/operators/aws_eks.py | 7 + cosmos/operators/azure_container_instance.py | 7 + cosmos/operators/base.py | 9 + cosmos/operators/docker.py | 7 + cosmos/operators/kubernetes.py | 7 + cosmos/operators/local.py | 36 +++- cosmos/operators/virtualenv.py | 8 + .../jaffle_shop/models/staging/sources.yml | 31 ++++ .../models/staging/stg_customers.sql | 9 +- .../jaffle_shop/models/staging/stg_orders.sql | 9 +- .../models/staging/stg_payments.sql | 11 +- docs/configuration/cosmos-conf.rst | 1 - docs/configuration/index.rst | 1 + docs/configuration/source-nodes-rendering.rst | 36 ++++ scripts/test/integration-dbt-1-5-4.sh | 1 + scripts/test/integration-expensive.sh | 1 + scripts/test/integration.sh | 2 + tests/airflow/test_graph.py | 75 +++++++- tests/dbt/test_graph.py | 172 ++++++++++++++---- tests/operators/test_local.py | 84 ++++++++- 26 files changed, 564 insertions(+), 61 deletions(-) create mode 100644 dev/dags/dbt/jaffle_shop/models/staging/sources.yml create mode 100644 docs/configuration/source-nodes-rendering.rst diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f19831676..219ea2019 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -163,6 +163,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -234,6 +235,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -377,6 +379,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - 
name: Upload coverage to Github uses: actions/upload-artifact@v2 diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5639d3bcb..17ee22c95 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -12,6 +12,7 @@ TESTABLE_DBT_RESOURCES, DbtResourceType, ExecutionMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -127,7 +128,11 @@ def create_test_task_metadata( def create_task_metadata( - node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False + node: DbtNode, + execution_mode: ExecutionMode, + args: dict[str, Any], + use_task_group: bool = False, + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -145,6 +150,7 @@ def create_task_metadata( DbtResourceType.SNAPSHOT: "DbtSnapshot", DbtResourceType.SEED: "DbtSeed", DbtResourceType.TEST: "DbtTest", + DbtResourceType.SOURCE: "DbtSource", } args = {**args, **{"models": node.resource_name}} @@ -154,6 +160,23 @@ def create_task_metadata( task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" + elif node.resource_type == DbtResourceType.SOURCE: + if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( + source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS + and node.has_freshness is False + and node.has_test is False + ): + return None + # TODO: https://github.com/astronomer/astronomer-cosmos + # pragma: no cover + task_id = f"{node.name}_source" + args["select"] = f"source:{node.resource_name}" + args.pop("models") + if use_task_group is True: + task_id = node.resource_type.value + if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: + # render sources without freshness as empty operators + return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator") else: 
task_id = f"{node.name}_{node.resource_type.value}" if use_task_group is True: @@ -185,6 +208,7 @@ def generate_task_or_group( execution_mode: ExecutionMode, task_args: dict[str, Any], test_behavior: TestBehavior, + source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, **kwargs: Any, @@ -198,7 +222,11 @@ def generate_task_or_group( ) task_meta = create_task_metadata( - node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group + node=node, + execution_mode=execution_mode, + args=task_args, + use_task_group=use_task_group, + source_rendering_behavior=source_rendering_behavior, ) # In most cases, we'll map one DBT node to one Airflow task @@ -260,6 +288,7 @@ def build_airflow_graph( """ node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior + source_rendering_behavior = render_config.source_rendering_behavior tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -278,6 +307,7 @@ def build_airflow_graph( execution_mode=execution_mode, task_args=task_args, test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, node=node, diff --git a/cosmos/config.py b/cosmos/config.py index 62557de63..f2c07bb98 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -20,6 +20,7 @@ ExecutionMode, InvocationMode, LoadMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -59,6 +60,7 @@ class RenderConfig: :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. 
Required when using ``load_method=LoadMode.DBT_LS_FILE``. :param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4). + :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). """ emit_datasets: bool = True @@ -75,6 +77,7 @@ class RenderConfig: dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) enable_mock_profile: bool = True + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) def __post_init__(self, dbt_project_path: str | Path | None) -> None: diff --git a/cosmos/constants.py b/cosmos/constants.py index 25b33f28a..6e96551e4 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -113,6 +113,16 @@ class TestIndirectSelection(Enum): EMPTY = "empty" +class SourceRenderingBehavior(Enum): + """ + Modes to configure the source rendering behavior. + """ + + NONE = "none" + ALL = "all" + WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness" + + class DbtResourceType(aenum.Enum): # type: ignore """ Type of dbt node. 
diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 38d5113ec..acff5d012 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,7 +35,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None dag=dag, task_group=task_group, owner=task_owner, - extra_context=task.extra_context, + **({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}), **task.arguments, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 5240a8ed6..f60ac2ece 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -13,7 +13,7 @@ from functools import cached_property from pathlib import Path from subprocess import PIPE, Popen -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional from airflow.models import Variable @@ -33,6 +33,7 @@ DbtResourceType, ExecutionMode, LoadMode, + SourceRenderingBehavior, ) from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file @@ -53,7 +54,7 @@ class CosmosLoadDbtException(Exception): @dataclass class DbtNode: """ - Metadata related to a dbt node (e.g. model, seed, snapshot). + Metadata related to a dbt node (e.g. model, seed, snapshot, source). """ unique_id: str @@ -62,6 +63,7 @@ class DbtNode: file_path: Path tags: list[str] = field(default_factory=lambda: []) config: dict[str, Any] = field(default_factory=lambda: {}) + has_freshness: bool = False has_test: bool = False @property @@ -104,6 +106,30 @@ def context_dict(self) -> dict[str, Any]: } +def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool: + """Function to find if a source has null freshness. 
Scenarios where freshness + looks like: + "freshness": { + "warn_after": { + "count": null, + "period": null + }, + "error_after": { + "count": null, + "period": null + }, + "filter": null + } + should be considered as null, this function ensures that.""" + if freshness is None: + return False + for _, value in freshness.items(): + if isinstance(value, dict): + if any(subvalue is not None for subvalue in value.values()): + return True + return False + + def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -147,6 +173,11 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, file_path=project_path / node_dict["original_file_path"], tags=node_dict.get("tags", []), config=node_dict.get("config", {}), + has_freshness=( + is_freshness_effective(node_dict.get("freshness")) + if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE + else False + ), ) nodes[node.unique_id] = node logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) @@ -353,7 +384,24 @@ def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] ) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [dbt_cmd, "ls", "--output", "json"] + if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE: + ls_command = [ + dbt_cmd, + "ls", + "--output", + "json", + "--output-keys", + "name", + "unique_id", + "resource_type", + "depends_on", + "original_file_path", + "tags", + "config", + "freshness", + ] + else: + ls_command = [dbt_cmd, "ls", "--output", "json"] ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) @@ -636,6 +684,11 @@ def load_from_dbt_manifest(self) -> None: file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), 
tags=node_dict["tags"], config=node_dict["config"], + has_freshness=( + is_freshness_effective(node_dict.get("freshness")) + if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE + else False + ), ) nodes[node.unique_id] = node diff --git a/cosmos/operators/aws_eks.py b/cosmos/operators/aws_eks.py index 180028378..0d98b2c63 100644 --- a/cosmos/operators/aws_eks.py +++ b/cosmos/operators/aws_eks.py @@ -14,6 +14,7 @@ DbtRunOperationKubernetesOperator, DbtSeedKubernetesOperator, DbtSnapshotKubernetesOperator, + DbtSourceKubernetesOperator, DbtTestKubernetesOperator, ) @@ -101,6 +102,12 @@ class DbtSnapshotAwsEksOperator(DbtAwsEksBaseOperator, DbtSnapshotKubernetesOper """ +class DbtSourceAzureContainerInstanceOperator(DbtAwsEksBaseOperator, DbtSourceKubernetesOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunAwsEksOperator(DbtAwsEksBaseOperator, DbtRunKubernetesOperator): """ Executes a dbt core run command. diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index d8427b2fb..993d4315f 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -13,6 +13,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -102,6 +103,12 @@ class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContai """ +class DbtSourceAzureContainerInstanceOperator(DbtSourceMixin, DbtAzureContainerInstanceBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core run command. 
diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index d0cbdd282..37aae7a81 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -344,6 +344,15 @@ class DbtSnapshotMixin: ui_color = "#964B00" +class DbtSourceMixin: + """ + Executes a dbt source freshness command. + """ + + base_cmd = ["source", "freshness"] + ui_color = "#34CCEB" + + class DbtRunMixin: """ Mixin for dbt run command. diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 532de380e..4abf9e994 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -13,6 +13,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -94,6 +95,12 @@ class DbtSnapshotDockerOperator(DbtSnapshotMixin, DbtDockerBaseOperator): """ +class DbtSourceDockerOperator(DbtSourceMixin, DbtDockerBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunDockerOperator(DbtRunMixin, DbtDockerBaseOperator): """ Executes a dbt core run command. diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index f84219199..ef69cd561 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -17,6 +17,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -125,6 +126,12 @@ class DbtSnapshotKubernetesOperator(DbtSnapshotMixin, DbtKubernetesBaseOperator) """ +class DbtSourceKubernetesOperator(DbtSourceMixin, DbtKubernetesBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator): """ Executes a dbt core run command. 
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 8a4ea1ba2..8baa42716 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import tempfile import warnings @@ -67,6 +68,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -115,9 +117,10 @@ class DbtLocalBaseOperator(AbstractDbtBaseOperator): and does not inherit the current process environment. """ - template_fields: Sequence[str] = AbstractDbtBaseOperator.template_fields + ("compiled_sql",) # type: ignore[operator] + template_fields: Sequence[str] = AbstractDbtBaseOperator.template_fields + ("compiled_sql", "freshness") # type: ignore[operator] template_fields_renderers = { "compiled_sql": "sql", + "freshness": "json", } def __init__( @@ -133,6 +136,7 @@ def __init__( self.profile_config = profile_config self.callback = callback self.compiled_sql = "" + self.freshness = "" self.should_store_compiled_sql = should_store_compiled_sql self.openlineage_events_completes: list[RunEvent] = [] self.invocation_mode = invocation_mode @@ -248,6 +252,29 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context, session: Se else: logger.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") + @provide_session + def store_freshness_json(self, tmp_project_dir: str, context: Context, session: Session = NEW_SESSION) -> None: + """ + Takes the compiled sources.json file from the dbt source freshness and stores it in the freshness rendered template. + Gets called after every dbt run / source freshness. 
+ """ + if not self.should_store_compiled_sql: + return + + sources_json_path = Path(os.path.join(tmp_project_dir, "target", "sources.json")) + + if sources_json_path.exists(): + sources_json_content = sources_json_path.read_text(encoding="utf-8").strip() + + sources_data = json.loads(sources_json_content) + + formatted_sources_json = json.dumps(sources_data, indent=4) + + self.freshness = formatted_sources_json + + else: + self.freshness = "" + def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( @@ -368,6 +395,7 @@ def run_command( if partial_parse_file.exists(): cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) + self.store_freshness_json(tmp_project_dir, context) self.store_compiled_sql(tmp_project_dir, context) self.handle_exception(result) if self.callback: @@ -538,6 +566,12 @@ class DbtSnapshotLocalOperator(DbtSnapshotMixin, DbtLocalBaseOperator): """ +class DbtSourceLocalOperator(DbtSourceMixin, DbtLocalBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunLocalOperator(DbtRunMixin, DbtLocalBaseOperator): """ Executes a dbt core run command. 
diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index cbe8c67e9..4b1a97e6c 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -18,6 +18,7 @@ DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, + DbtSourceLocalOperator, DbtTestLocalOperator, ) @@ -136,6 +137,13 @@ class DbtSnapshotVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSnapshotLocalO """ +class DbtSourceVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSourceLocalOperator): + """ + Executes `dbt source freshness` command within a Python Virtual Environment, that is created before running the dbt + command and deleted just after. + """ + + class DbtRunVirtualenvOperator(DbtVirtualenvBaseOperator, DbtRunLocalOperator): # type: ignore[misc] """ Executes a dbt core run command within a Python Virtual Environment, that is created before running the dbt command diff --git a/dev/dags/dbt/jaffle_shop/models/staging/sources.yml b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml new file mode 100644 index 000000000..a3139b585 --- /dev/null +++ b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml @@ -0,0 +1,31 @@ + +version: 2 + +sources: + - name: postgres_db + database: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + tables: + - name: raw_customers + columns: + - name: id + tests: + - unique + - not_null + - name: raw_payments + columns: + - name: id + tests: + - unique + - not_null + - name: raw_orders + columns: + - name: id + tests: + - unique + - not_null + freshness: + warn_after: + count: 3650 + period: day + loaded_at_field: CAST(order_date AS TIMESTAMP) diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql index cad047269..71b6c7c0a 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql @@ -4,8 +4,15 @@ with source as ( Normally we 
would select from the table here, but we are using seeds to load our data in this project #} - select * from {{ ref('raw_customers') }} + select * from {{ source('postgres_db', 'raw_customers') }} + +), +force_seed_dep as ( + {#- + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none + #} + select * from {{ ref('raw_customers') }} ), renamed as ( diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql index a654dcb94..b6c13a33f 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql @@ -4,10 +4,17 @@ with source as ( Normally we would select from the table here, but we are using seeds to load our data in this project #} - select * from {{ ref('raw_orders') }} + select * from {{ source('postgres_db', 'raw_orders') }} ), +force_seed_dep as ( + {#- + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none + #} + select * from {{ ref('raw_customers') }} +), + renamed as ( select diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql index f718596ad..3ff1fbece 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql @@ -1,11 +1,14 @@ with source as ( + select * from {{ source('postgres_db', 'raw_payments') }} + +), + +force_seed_dep as ( {#- - Normally we would select from the table here, but we are using seeds to load - our data in this project + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none #} - select * from {{ ref('raw_payments') }} - + select * from {{ ref('raw_customers') }} ), renamed as ( diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 037a43d3b..5b8ee210b 100644 --- a/docs/configuration/cosmos-conf.rst +++ 
b/docs/configuration/cosmos-conf.rst @@ -94,7 +94,6 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` - [openlineage] ~~~~~~~~~~~~~ diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index 90f195938..f6e60f61b 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -22,6 +22,7 @@ Cosmos offers a number of configuration options to customize its behavior. For m Testing Behavior Selecting & Excluding Partial Parsing + Source Nodes Rendering Operator Args Compiled SQL Logging diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst new file mode 100644 index 000000000..ae1417361 --- /dev/null +++ b/docs/configuration/source-nodes-rendering.rst @@ -0,0 +1,36 @@ +.. _source-nodes-rendering: + +Source Nodes Rendering +====================== + +.. note:: + This feature is only available for dbt-core >= 1.5 and cosmos >= 1.6.0. + +By default, Cosmos does not render dbt sources automatically. Instead, you need to configure the rendering of sources explicitly. +You can control this behavior using the ``source_rendering_behavior`` field in the ``RenderConfig`` object. This is how it works: + +- **all**: + When set to ``all``, Cosmos renders all sources in the dbt project. It uses three different node types for this: + - ``EmptyOperator``: For sources that do not have tests or freshness checks. + - ``DbtSourceOperator``: For sources that have freshness checks. + - ``DbtTestOperator``: For sources that have tests. + + This approach aims to create a comprehensive DAG that aligns with dbt documentation, allowing for the rendering of both sources and models for a more detailed visual representation. + It also ensures that dependent models do not run if their sources are not fresh, thus preventing models from being built on stale or incomplete data. 
+ +- **none** (default): When set to ``none``, Cosmos does not automatically render any sources. Note that if node converters are being used for sources, they will still function as intended. + +- **with_tests_or_freshness**: When set to ``with_tests_or_freshness``, Cosmos only renders sources that have either tests or freshness checks. + +Example: + +.. code-block:: python + + from cosmos import DbtTaskGroup, RenderConfig + from cosmos.constants import SourceRenderingBehavior + + jaffle_shop = DbtTaskGroup( + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, + ) + ) diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index 087533082..284f60517 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -1,5 +1,6 @@ pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 +export SOURCE_RENDERING_BEHAVIOR=all rm -rf airflow.*; \ airflow db init; \ pytest -vv \ diff --git a/scripts/test/integration-expensive.sh b/scripts/test/integration-expensive.sh index 24bace86d..96c2388cf 100644 --- a/scripts/test/integration-expensive.sh +++ b/scripts/test/integration-expensive.sh @@ -1,3 +1,4 @@ +export SOURCE_RENDERING_BEHAVIOR=all pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 1d8264768..a39ef63b1 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -3,6 +3,8 @@ set -x set -e +export SOURCE_RENDERING_BEHAVIOR=all + pip freeze | grep airflow echo $AIRFLOW_HOME ls $AIRFLOW_HOME diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index f9a62106d..ddffa226c 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,3 +1,4 @@ +import os from datetime import datetime from pathlib import Path from 
unittest.mock import patch @@ -21,6 +22,7 @@ from cosmos.constants import ( DbtResourceType, ExecutionMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -29,6 +31,7 @@ from cosmos.profiles import PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") +SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) parent_seed = DbtNode( unique_id=f"{DbtResourceType.SEED.value}.{SAMPLE_PROJ_PATH.stem}.seed_parent", @@ -100,6 +103,7 @@ def test_build_airflow_graph_with_after_each(): task_args=task_args, render_config=RenderConfig( test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), dbt_project_name="astro_shop", ) @@ -169,6 +173,7 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource }, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) assert isinstance(output, TaskGroup) assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"] @@ -196,6 +201,7 @@ def test_build_airflow_graph_with_after_all(): render_config = RenderConfig( select=["tag:some"], test_behavior=TestBehavior.AFTER_ALL, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) build_airflow_graph( nodes=sample_nodes, @@ -309,12 +315,12 @@ def test_create_task_metadata_unsupported(caplog): ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, - "my_source_run", - "cosmos.operators.local.DbtRunLocalOperator", - {"models": "my_source"}, + "my_source_source", + "cosmos.operators.local.DbtSourceLocalOperator", + {"select": "source:my_source"}, { "dbt_node_config": { - "unique_id": "model.my_folder.my_source", + "unique_id": "source.my_folder.my_source", "resource_type": "source", "depends_on": [], "file_path": ".", @@ -364,6 +370,7 @@ def test_create_task_metadata_model( file_path=Path(""), tags=[], config={}, + 
has_freshness=True, ) metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) @@ -402,6 +409,64 @@ def test_create_task_metadata_model_use_task_group(caplog): assert metadata.id == "run" +@pytest.mark.parametrize( + "unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class", + [ + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + True, + SOURCE_RENDERING_BEHAVIOR, + "my_source_source", + "cosmos.operators.local.DbtSourceLocalOperator", + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SOURCE_RENDERING_BEHAVIOR, + "my_source_source", + "airflow.operators.empty.EmptyOperator", + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + True, + SourceRenderingBehavior.NONE, + None, + None, + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SourceRenderingBehavior.NONE, + None, + None, + ), + ], +) +def test_create_task_metadata_source_with_rendering_options( + unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class, caplog +): + child_node = DbtNode( + unique_id=unique_id, + resource_type=resource_type, + depends_on=[], + file_path=Path(""), + tags=[], + config={}, + has_freshness=has_freshness, + ) + + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, source_rendering_behavior=source_rendering_behavior, args={} + ) + if metadata: + assert metadata.id == expected_id + assert metadata.operator_class == expected_operator_class + + @pytest.mark.parametrize("use_task_group", (None, True, False)) def test_create_task_metadata_seed(caplog, use_task_group): sample_node = DbtNode( @@ -524,7 +589,7 @@ def test_airflow_kwargs_generation(): "group_id": "fake_group_id", "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", - "render_config": 
RenderConfig(select=["fake-render"]), + "render_config": RenderConfig(select=["fake-render"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR), "default_args": {"retries": 2}, "profile_config": ProfileConfig( profile_name="default", diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 0e7b0b05a..8b526a8ab 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -14,7 +14,7 @@ from cosmos import settings from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode +from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode, SourceRenderingBehavior from cosmos.dbt.graph import ( CosmosLoadDbtException, DbtGraph, @@ -32,6 +32,7 @@ SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt" +SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) @pytest.fixture @@ -131,7 +132,10 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(exclude=["config.materialized:table"]) + render_config = RenderConfig( + exclude=["config.materialized:table"], + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -170,7 +174,7 @@ def test_load_via_manifest_with_select(project_name, manifest_filepath, model_fi target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = 
RenderConfig(select=["+customers"]) + render_config = RenderConfig(select=["+customers"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -251,7 +255,10 @@ def test_load_automatic_dbt_ls_file_is_available(mock_load_via_dbt_ls_file): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT) + render_config = RenderConfig( + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL) assert mock_load_via_dbt_ls_file.called @@ -264,7 +271,7 @@ def test_load_dbt_ls_file_without_file(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=None) + render_config = RenderConfig(dbt_ls_path=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE) @@ -278,7 +285,11 @@ def test_load_dbt_ls_file_without_project_path(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=None) + render_config = RenderConfig( + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + dbt_project_path=None, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) dbt_graph = DbtGraph( project=project_config, profile_config=profile_config, @@ -419,7 +430,10 @@ def 
test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder( assert not (tmp_dbt_project_dir / "logs").exists() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -440,7 +454,10 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder( def test_load_via_dbt_ls_with_exclude(postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["*customers*"], exclude=["*orders*"] + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + select=["*customers*"], + exclude=["*orders*"], + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -453,13 +470,15 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): dbt_graph.load_via_dbt_ls() assert dbt_graph.nodes == dbt_graph.filtered_nodes # This test is dependent upon dbt >= 1.5.4 - assert len(dbt_graph.nodes) == 7 + assert len(dbt_graph.nodes) == 9 expected_keys = [ "model.jaffle_shop.customers", "model.jaffle_shop.stg_customers", "seed.jaffle_shop.raw_customers", "test.jaffle_shop.not_null_customers_customer_id.5c9bf9911d", "test.jaffle_shop.not_null_stg_customers_customer_id.e2cfb1f9aa", + "test.jaffle_shop.source_not_null_postgres_db_raw_customers_id.de3e9fff76", + "test.jaffle_shop.source_unique_postgres_db_raw_customers_id.6e5ad1d707", "test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1", 
"test.jaffle_shop.unique_stg_customers_customer_id.c7614daada", ] @@ -481,7 +500,10 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -492,13 +514,16 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): dbt_graph.load_via_dbt_ls() assert dbt_graph.nodes == dbt_graph.filtered_nodes - assert len(dbt_graph.nodes) == 28 + assert len(dbt_graph.nodes) == 37 def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig( + dbt_executable_path="/inexistent/dbt", + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) dbt_graph = DbtGraph( project=project_config, execution_config=execution_config, @@ -516,7 +541,9 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + dbt_executable_path="existing-dbt-cmd", + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + 
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -535,7 +562,9 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + dbt_executable_path="/inexistent/dbt", + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( @@ -569,6 +598,7 @@ def test_load_via_dbt_ls_with_sources(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, env_vars={"DBT_SQLITE_PATH": str(DBT_PROJECTS_ROOT_DIR / "data")}, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -586,7 +616,11 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_deps=False) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + dbt_deps=False, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -629,7 +663,11 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages( stdout, stderr = process.communicate() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / 
DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=False) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=False, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -657,7 +695,10 @@ def test_load_via_dbt_ls_caching_partial_parsing( project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=True, + enable_mock_profile=False, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -697,7 +738,10 @@ def test_load_via_dbt_ls_uses_partial_parse_when_cache_is_disabled( caplog.set_level(logging.DEBUG) project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=True, + enable_mock_profile=False, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -738,7 +782,10 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr( mock_popen().returncode = 0 project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + 
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -757,7 +804,10 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen, postgres_profile_c mock_popen().returncode = 1 project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -775,7 +825,10 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen, postgres_profile_c def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, postgres_profile_config): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -794,7 +847,10 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, po def test_load_via_load_via_custom_parser(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / 
DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -816,7 +872,11 @@ def test_load_via_load_via_custom_parser(project_name): def test_load_via_load_via_custom_parser_select_rendering_config(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "jaffle_shop") execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["customers"]) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + select=["customers"], + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -884,7 +944,10 @@ def test_update_node_dependency_test_not_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(exclude=["config.materialized:test"]) + render_config = RenderConfig( + exclude=["config.materialized:test"], + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -907,7 +970,7 @@ def test_tag_selected_node_test_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(select=["tag:test_tag"]) + render_config = RenderConfig(select=["tag:test_tag"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -932,7 +995,10 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method, postgres_profi 
dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), - render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), + render_config=RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), profile_config=postgres_profile_config, ) @@ -980,7 +1046,9 @@ def test_load_via_dbt_ls_file(): profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) render_config = RenderConfig( - dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -1078,7 +1146,10 @@ def test_load_via_dbt_ls_project_config_env_vars( mock_popen().returncode = 0 env_vars = {"MY_ENV_VAR": "my_value"} project_config = ProjectConfig(env_vars=env_vars) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1115,7 +1186,10 @@ def test_profile_created_correctly_with_profile_mapping( mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig(env_vars={}) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = postgres_profile_config execution_config = 
ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -1140,7 +1214,10 @@ def test_load_via_dbt_ls_project_config_dbt_vars( mock_popen().returncode = 0 dbt_vars = {"my_var1": "my_value1", "my_var2": "my_value2"} project_config = ProjectConfig(dbt_vars=dbt_vars) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1175,6 +1252,7 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, selector=selector, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -1205,7 +1283,11 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig(partial_parse=False) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + load_method=LoadMode.DBT_LS, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1231,7 +1313,11 @@ def test_load_method_with_unsupported_render_config_selector_arg(load_method): f"RenderConfig.selector is not yet supported when loading dbt projects using the {load_method} parser." 
) dbt_graph = DbtGraph( - render_config=RenderConfig(load_method=load_method, selector="my_selector"), + render_config=RenderConfig( + load_method=load_method, + selector="my_selector", + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ), project=MagicMock(), ) with pytest.raises(CosmosLoadDbtException, match=expected_error_msg): @@ -1255,6 +1341,7 @@ def test_load_via_dbt_ls_with_project_config_vars(): render_config=RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -1291,6 +1378,7 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, selector="stage_customers", + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( @@ -1302,11 +1390,13 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile dbt_graph.load_via_dbt_ls() filtered_nodes = dbt_graph.filtered_nodes.keys() - assert len(filtered_nodes) == 4 + assert len(filtered_nodes) == 7 assert "model.jaffle_shop.stg_customers" in filtered_nodes assert "seed.jaffle_shop.raw_customers" in filtered_nodes - # Two tests should be filtered - assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 2 + if SOURCE_RENDERING_BEHAVIOR == SourceRenderingBehavior.ALL: + assert "source.jaffle_shop.postgres_db.raw_customers" in filtered_nodes + # Four tests should be filtered + assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 4 @pytest.mark.parametrize( @@ -1404,7 +1494,13 @@ def airflow_variable(): @pytest.mark.integration def test_dbt_ls_cache_key_args_uses_airflow_vars_to_purge_dbt_ls_cache(airflow_variable): key, value = airflow_variable - graph = DbtGraph(project=ProjectConfig(), 
render_config=RenderConfig(airflow_vars_to_purge_dbt_ls_cache=[key])) + graph = DbtGraph( + project=ProjectConfig(), + render_config=RenderConfig( + airflow_vars_to_purge_dbt_ls_cache=[key], + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ), + ) assert graph.dbt_ls_cache_key_args == [key, value] @@ -1422,9 +1518,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "65595448aded2c2b52878a801c1d9c59" + assert hash_dir == "a9879ec2ec503b0fe023d059caf50d41" else: - assert hash_dir == "4c826c84a94b0f1f5508c4e425170677" + assert hash_dir == "9001bedf4aa8a329f7b669c89f337c24" @pytest.mark.integration diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index ed46caf88..54d9d01da 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,4 @@ +import json import logging import os import shutil @@ -36,6 +37,7 @@ DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, + DbtSourceLocalOperator, DbtTestLocalOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -444,7 +446,7 @@ def test_run_operator_dataset_inlets_and_outlets(caplog): run_test_dag(dag) - assert run_operator.inlets == [] + assert run_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.raw_customers", extra=None)] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.outlets == [] @@ -770,15 +772,19 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo [ ( DbtSeedLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", 
"exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), ), ( DbtRunLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), ), ( DbtBuildLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), + ), + ( + DbtSourceLocalOperator, + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness"), ), ], ) @@ -941,3 +947,73 @@ def test_handle_exception_subprocess(caplog): operator.handle_exception_subprocess(result) assert len(str(err_context.value)) < 100 # Ensure the error message is not too long assert len(caplog.text) > 1000 # Ensure the log message is not truncated + + +@pytest.fixture +def mock_context(): + return MagicMock() + + +@pytest.fixture +def mock_session(): + return MagicMock() + + +@patch("cosmos.operators.local.Path") +def test_store_freshness_json(mock_path_class, mock_context, mock_session): + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + ) + + # Mock the behavior of Path.exists() and Path.read_text() + mock_sources_json_path = MagicMock() + mock_path_class.return_value = mock_sources_json_path + mock_sources_json_path.exists.return_value = True + mock_sources_json_path.read_text.return_value = '{"key": "value"}' + + # Expected formatted JSON content + expected_freshness = json.dumps({"key": "value"}, indent=4) + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == expected_freshness + + +@patch("cosmos.operators.local.Path") +def 
test_store_freshness_json_no_file(mock_path_class, mock_context, mock_session): + # Create an instance of the class that contains the method + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + ) + + # Mock the behavior of Path.exists() and Path.read_text() + mock_sources_json_path = MagicMock() + mock_path_class.return_value = mock_sources_json_path + mock_sources_json_path.exists.return_value = False + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == "" + + +def test_store_freshness_not_store_compiled_sql(mock_context, mock_session): + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == ""