Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
CorsettiS authored Aug 14, 2024
2 parents eb3ec85 + a89389d commit 897090e
Show file tree
Hide file tree
Showing 26 changed files with 564 additions and 61 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down Expand Up @@ -234,6 +235,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down Expand Up @@ -377,6 +379,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down
34 changes: 32 additions & 2 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TESTABLE_DBT_RESOURCES,
DbtResourceType,
ExecutionMode,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
)
Expand Down Expand Up @@ -127,7 +128,11 @@ def create_test_task_metadata(


def create_task_metadata(
node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False
node: DbtNode,
execution_mode: ExecutionMode,
args: dict[str, Any],
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand All @@ -145,6 +150,7 @@ def create_task_metadata(
DbtResourceType.SNAPSHOT: "DbtSnapshot",
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
DbtResourceType.SOURCE: "DbtSource",
}
args = {**args, **{"models": node.resource_name}}

Expand All @@ -154,6 +160,23 @@ def create_task_metadata(
task_id = f"{node.name}_run"
if use_task_group is True:
task_id = "run"
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
and node.has_freshness is False
and node.has_test is False
):
return None
# TODO: https://github.com/astronomer/astronomer-cosmos
# pragma: no cover
task_id = f"{node.name}_source"
args["select"] = f"source:{node.resource_name}"
args.pop("models")
if use_task_group is True:
task_id = node.resource_type.value
if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL:
# render sources without freshness as empty operators
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator")
else:
task_id = f"{node.name}_{node.resource_type.value}"
if use_task_group is True:
Expand Down Expand Up @@ -185,6 +208,7 @@ def generate_task_or_group(
execution_mode: ExecutionMode,
task_args: dict[str, Any],
test_behavior: TestBehavior,
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
on_warning_callback: Callable[..., Any] | None,
**kwargs: Any,
Expand All @@ -198,7 +222,11 @@ def generate_task_or_group(
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
node=node,
execution_mode=execution_mode,
args=task_args,
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -260,6 +288,7 @@ def build_airflow_graph(
"""
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

Expand All @@ -278,6 +307,7 @@ def build_airflow_graph(
execution_mode=execution_mode,
task_args=task_args,
test_behavior=test_behavior,
source_rendering_behavior=source_rendering_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
node=node,
Expand Down
3 changes: 3 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ExecutionMode,
InvocationMode,
LoadMode,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
)
Expand Down Expand Up @@ -59,6 +60,7 @@ class RenderConfig:
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
"""

emit_datasets: bool = True
Expand All @@ -75,6 +77,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
Expand Down
10 changes: 10 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class TestIndirectSelection(Enum):
EMPTY = "empty"


class SourceRenderingBehavior(Enum):
"""
Modes to configure the source rendering behavior.
"""

NONE = "none"
ALL = "all"
WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness"


class DbtResourceType(aenum.Enum): # type: ignore
"""
Type of dbt node.
Expand Down
2 changes: 1 addition & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
dag=dag,
task_group=task_group,
owner=task_owner,
extra_context=task.extra_context,
**({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}),
**task.arguments,
)

Expand Down
59 changes: 56 additions & 3 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

from airflow.models import Variable

Expand All @@ -33,6 +33,7 @@
DbtResourceType,
ExecutionMode,
LoadMode,
SourceRenderingBehavior,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file
Expand All @@ -53,7 +54,7 @@ class CosmosLoadDbtException(Exception):
@dataclass
class DbtNode:
"""
Metadata related to a dbt node (e.g. model, seed, snapshot).
Metadata related to a dbt node (e.g. model, seed, snapshot, source).
"""

unique_id: str
Expand All @@ -62,6 +63,7 @@ class DbtNode:
file_path: Path
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
has_freshness: bool = False
has_test: bool = False

@property
Expand Down Expand Up @@ -104,6 +106,30 @@ def context_dict(self) -> dict[str, Any]:
}


def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool:
"""Function to find if a source has null freshness. Scenarios where freshness
looks like:
"freshness": {
"warn_after": {
"count": null,
"period": null
},
"error_after": {
"count": null,
"period": null
},
"filter": null
}
should be considered as null, this function ensures that."""
if freshness is None:
return False
for _, value in freshness.items():
if isinstance(value, dict):
if any(subvalue is not None for subvalue in value.values()):
return True
return False


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
Expand Down Expand Up @@ -147,6 +173,11 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
file_path=project_path / node_dict["original_file_path"],
tags=node_dict.get("tags", []),
config=node_dict.get("config", {}),
has_freshness=(
is_freshness_effective(node_dict.get("freshness"))
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
)
nodes[node.unique_id] = node
logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
Expand Down Expand Up @@ -353,7 +384,24 @@ def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [dbt_cmd, "ls", "--output", "json"]
if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE:
ls_command = [
dbt_cmd,
"ls",
"--output",
"json",
"--output-keys",
"name",
"unique_id",
"resource_type",
"depends_on",
"original_file_path",
"tags",
"config",
"freshness",
]
else:
ls_command = [dbt_cmd, "ls", "--output", "json"]

ls_args = self.dbt_ls_args
ls_command.extend(self.local_flags)
Expand Down Expand Up @@ -636,6 +684,11 @@ def load_from_dbt_manifest(self) -> None:
file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]),
tags=node_dict["tags"],
config=node_dict["config"],
has_freshness=(
is_freshness_effective(node_dict.get("freshness"))
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
)

nodes[node.unique_id] = node
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/aws_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DbtRunOperationKubernetesOperator,
DbtSeedKubernetesOperator,
DbtSnapshotKubernetesOperator,
DbtSourceKubernetesOperator,
DbtTestKubernetesOperator,
)

Expand Down Expand Up @@ -101,6 +102,12 @@ class DbtSnapshotAwsEksOperator(DbtAwsEksBaseOperator, DbtSnapshotKubernetesOper
"""


class DbtSourceAzureContainerInstanceOperator(DbtAwsEksBaseOperator, DbtSourceKubernetesOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunAwsEksOperator(DbtAwsEksBaseOperator, DbtRunKubernetesOperator):
"""
Executes a dbt core run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -102,6 +103,12 @@ class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContai
"""


class DbtSourceAzureContainerInstanceOperator(DbtSourceMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core run command.
Expand Down
9 changes: 9 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ class DbtSnapshotMixin:
ui_color = "#964B00"


class DbtSourceMixin:
"""
Executes a dbt source freshness command.
"""

base_cmd = ["source", "freshness"]
ui_color = "#34CCEB"


class DbtRunMixin:
"""
Mixin for dbt run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -94,6 +95,12 @@ class DbtSnapshotDockerOperator(DbtSnapshotMixin, DbtDockerBaseOperator):
"""


class DbtSourceDockerOperator(DbtSourceMixin, DbtDockerBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunDockerOperator(DbtRunMixin, DbtDockerBaseOperator):
"""
Executes a dbt core run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -125,6 +126,12 @@ class DbtSnapshotKubernetesOperator(DbtSnapshotMixin, DbtKubernetesBaseOperator)
"""


class DbtSourceKubernetesOperator(DbtSourceMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt core run command.
Expand Down
Loading

0 comments on commit 897090e

Please sign in to comment.