diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 264baa93b..e38cd7157 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -38,11 +38,25 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: ["3.8", "3.9", "3.10", "3.11"]
+ python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]
exclude:
- python-version: "3.11"
airflow-version: "2.4"
+ # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12.
+ # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
+ # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
+ # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
+ - python-version: "3.12"
+ airflow-version: "2.4"
+ - python-version: "3.12"
+ airflow-version: "2.5"
+ - python-version: "3.12"
+ airflow-version: "2.6"
+ - python-version: "3.12"
+ airflow-version: "2.7"
+ - python-version: "3.12"
+ airflow-version: "2.8"
steps:
- uses: actions/checkout@v3
with:
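
Note: the exclude list above hand-enumerates every pre-2.9 Airflow version against Python 3.12. A minimal sketch for sanity-checking that list, assuming the cutoff is exactly Airflow 2.9.0 as the README links in the comment state:

```python
# Hypothetical helper to regenerate the matrix exclusions; the version lists
# mirror the workflow matrix above.
python_versions = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow_versions = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]

excluded = [
    {"python-version": py, "airflow-version": af}
    for py in python_versions
    for af in airflow_versions
    if py == "3.12" and tuple(map(int, af.split("."))) < (2, 9)
]
assert len(excluded) == 5  # 2.4 through 2.8, matching the five entries above
```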
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 18c22f889..ad4bb2c65 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -54,7 +54,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
- rev: v0.4.5
+ rev: v0.4.7
hooks:
- id: ruff
args:
diff --git a/cosmos/config.py b/cosmos/config.py
index 64a7acd08..820833e6c 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -319,6 +319,18 @@ class ExecutionConfig:
project_path: Path | None = field(init=False)
def __post_init__(self, dbt_project_path: str | Path | None) -> None:
- if self.invocation_mode and self.execution_mode != ExecutionMode.LOCAL:
- raise CosmosValueError("ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL.")
+ if self.invocation_mode and self.execution_mode not in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
+ raise CosmosValueError(
+ "ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV."
+ )
+ if self.execution_mode == ExecutionMode.VIRTUALENV:
+ if self.invocation_mode == InvocationMode.DBT_RUNNER:
+ raise CosmosValueError(
+ "InvocationMode.DBT_RUNNER has not been implemented for ExecutionMode.VIRTUALENV"
+ )
+ elif self.invocation_mode is None:
+ logger.debug(
+ "Defaulting to InvocationMode.SUBPROCESS as it is the only supported invocation mode for ExecutionMode.VIRTUALENV"
+ )
+ self.invocation_mode = InvocationMode.SUBPROCESS
self.project_path = Path(dbt_project_path) if dbt_project_path else None
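
Note: with this change, ``invocation_mode`` is accepted for both local and virtualenv execution, and virtualenv defaults to subprocess. A usage sketch, assuming the usual ``cosmos`` import paths:

```python
from cosmos.config import ExecutionConfig
from cosmos.constants import ExecutionMode, InvocationMode

# Virtualenv defaults to SUBPROCESS, the only invocation mode implemented for it:
config = ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV)
assert config.invocation_mode == InvocationMode.SUBPROCESS

# Explicitly requesting the unimplemented runner raises CosmosValueError:
# ExecutionConfig(
#     execution_mode=ExecutionMode.VIRTUALENV,
#     invocation_mode=InvocationMode.DBT_RUNNER,
# )
```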
diff --git a/cosmos/profiles/__init__.py b/cosmos/profiles/__init__.py
index fa8e5c370..5cc3109cc 100644
--- a/cosmos/profiles/__init__.py
+++ b/cosmos/profiles/__init__.py
@@ -9,6 +9,7 @@
from .bigquery.oauth import GoogleCloudOauthProfileMapping
from .bigquery.service_account_file import GoogleCloudServiceAccountFileProfileMapping
from .bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping
+from .clickhouse.user_pass import ClickhouseUserPasswordProfileMapping
from .databricks.token import DatabricksTokenProfileMapping
from .exasol.user_pass import ExasolUserPasswordProfileMapping
from .postgres.user_pass import PostgresUserPasswordProfileMapping
@@ -25,6 +26,7 @@
profile_mappings: list[Type[BaseProfileMapping]] = [
AthenaAccessKeyProfileMapping,
+ ClickhouseUserPasswordProfileMapping,
GoogleCloudServiceAccountFileProfileMapping,
GoogleCloudServiceAccountDictProfileMapping,
GoogleCloudOauthProfileMapping,
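
Note: registration here is what makes automatic selection work; ``get_automatic_profile_mapping`` effectively walks ``profile_mappings`` in order and returns the first mapping whose ``can_claim_connection()`` passes. A sketch, assuming an Airflow connection named ``clickhouse_connection`` is defined:

```python
from cosmos.profiles import get_automatic_profile_mapping

# For a "generic" Airflow connection with host/login/password/schema set and
# extra={"clickhouse": "True"}, the Clickhouse mapping is the one that claims it:
mapping = get_automatic_profile_mapping("clickhouse_connection")
print(type(mapping).__name__)  # ClickhouseUserPasswordProfileMapping
```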
diff --git a/cosmos/profiles/athena/access_key.py b/cosmos/profiles/athena/access_key.py
index 02de2be24..8dc14f839 100644
--- a/cosmos/profiles/athena/access_key.py
+++ b/cosmos/profiles/athena/access_key.py
@@ -66,9 +66,11 @@ def profile(self) -> dict[str, Any | None]:
**self.profile_args,
"aws_access_key_id": self.temporary_credentials.access_key,
"aws_secret_access_key": self.get_env_var_format("aws_secret_access_key"),
- "aws_session_token": self.get_env_var_format("aws_session_token"),
}
+ if self.temporary_credentials.token:
+ profile["aws_session_token"] = self.get_env_var_format("aws_session_token")
+
return self.filter_null(profile)
@property
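
Note: the fix above only emits ``aws_session_token`` when the temporary credentials actually carry one, so long-lived keys no longer produce a reference to an unset env var. A self-contained sketch of the behaviour (the namedtuple mirrors botocore's ``ReadOnlyCredentials``):

```python
from collections import namedtuple

Credentials = namedtuple("Credentials", ["access_key", "secret_key", "token"])

def build_profile(creds: Credentials) -> dict:
    profile = {
        "aws_access_key_id": creds.access_key,
        "aws_secret_access_key": "{{ env_var('COSMOS_CONN_AWS_AWS_SECRET_ACCESS_KEY') }}",
    }
    if creds.token:  # only reference the session-token env var when a token exists
        profile["aws_session_token"] = "{{ env_var('COSMOS_CONN_AWS_AWS_SESSION_TOKEN') }}"
    return profile

print(build_profile(Credentials("key", "secret", None)))   # no aws_session_token key
print(build_profile(Credentials("key", "secret", "tok")))  # includes aws_session_token
```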
diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py
index 038b34153..17858d7bb 100644
--- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py
+++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py
@@ -22,7 +22,6 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping):
required_fields = [
"project",
- "dataset",
"keyfile_json",
]
@@ -45,12 +44,14 @@ def profile(self) -> dict[str, Any | None]:
Even though the Airflow connection contains hard-coded Service account credentials,
we generate a temporary file and the DBT profile uses it.
"""
- return {
+ profile_dict = {
**self.mapped_params,
"threads": 1,
**self.profile_args,
}
+ return self.filter_null(profile_dict)
+
@property
def mock_profile(self) -> dict[str, Any | None]:
"Generates mock profile. Defaults `threads` to 1."
diff --git a/cosmos/profiles/clickhouse/__init__.py b/cosmos/profiles/clickhouse/__init__.py
new file mode 100644
index 000000000..bd94af5fe
--- /dev/null
+++ b/cosmos/profiles/clickhouse/__init__.py
@@ -0,0 +1,5 @@
+"""Generic Airflow connection -> dbt profile mappings"""
+
+from .user_pass import ClickhouseUserPasswordProfileMapping
+
+__all__ = ["ClickhouseUserPasswordProfileMapping"]
diff --git a/cosmos/profiles/clickhouse/user_pass.py b/cosmos/profiles/clickhouse/user_pass.py
new file mode 100644
index 000000000..7d168895a
--- /dev/null
+++ b/cosmos/profiles/clickhouse/user_pass.py
@@ -0,0 +1,70 @@
+"""Maps Airflow Postgres connections using user + password authentication to dbt profiles."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from ..base import BaseProfileMapping
+
+
+class ClickhouseUserPasswordProfileMapping(BaseProfileMapping):
+ """
+ Maps Airflow generic connections using user + password authentication to dbt Clickhouse profiles.
+ https://docs.getdbt.com/docs/core/connect-data-platform/clickhouse-setup
+ """
+
+ airflow_connection_type: str = "generic"
+ dbt_profile_type: str = "clickhouse"
+ default_port = 9000
+ is_community = True
+
+ required_fields = [
+ "host",
+ "login",
+ "schema",
+ "clickhouse",
+ ]
+ secret_fields = [
+ "password",
+ ]
+ airflow_param_mapping = {
+ "host": "host",
+ "login": "login",
+ "password": "password",
+ "port": "port",
+ "schema": "schema",
+ "clickhouse": "extra.clickhouse",
+ }
+
+ def _set_default_param(self, profile_dict: dict[str, Any]) -> dict[str, Any]:
+ if not profile_dict.get("driver"):
+ profile_dict["driver"] = "native"
+
+ if not profile_dict.get("port"):
+ profile_dict["port"] = self.default_port
+
+ if not profile_dict.get("secure"):
+ profile_dict["secure"] = False
+ return profile_dict
+
+ @property
+ def profile(self) -> dict[str, Any | None]:
+ """Gets profile. The password is stored in an environment variable."""
+ profile_dict = {
+ **self.mapped_params,
+ **self.profile_args,
+ # password should always get set as env var
+ "password": self.get_env_var_format("password"),
+ }
+
+ return self.filter_null(self._set_default_param(profile_dict))
+
+ @property
+ def mock_profile(self) -> dict[str, Any | None]:
+ """Gets mock profile."""
+
+ profile_dict = {
+ **super().mock_profile,
+ }
+
+ return self._set_default_param(profile_dict)
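
Note: a usage sketch for the new mapping, assuming an Airflow connection named ``clickhouse_connection`` of type ``generic`` with ``extra={"clickhouse": "True"}``:

```python
from cosmos import ProfileConfig
from cosmos.profiles import ClickhouseUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="clickhouse_profile",
    target_name="dev",
    profile_mapping=ClickhouseUserPasswordProfileMapping(
        conn_id="clickhouse_connection",
        profile_args={"secure": True},  # overrides the `secure: False` default above
    ),
)
```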
diff --git a/cosmos/settings.py b/cosmos/settings.py
index 2ea0dd4fa..fc5954131 100644
--- a/cosmos/settings.py
+++ b/cosmos/settings.py
@@ -10,7 +10,7 @@
# On macOS, users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
-enable_cache = conf.get("cosmos", "enable_cache", fallback=True)
+enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True)
propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True)
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
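
Note: the one-word fix above matters because ``conf.get`` returns the raw string, and any non-empty string is truthy, so ``enable_cache = False`` never actually disabled the cache. In miniature (illustrative parser; Airflow's ``getboolean`` accepts a similar set of literals):

```python
raw = "False"     # what conf.get("cosmos", "enable_cache", ...) returned
print(bool(raw))  # True -- the cache could never be switched off

def getboolean(value: str) -> bool:
    # Illustrative stand-in for AirflowConfigParser.getboolean()
    return value.strip().lower() in ("t", "true", "1")

print(getboolean(raw))  # False
```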
diff --git a/docs/getting_started/execution-modes-local-conflicts.rst b/docs/getting_started/execution-modes-local-conflicts.rst
index 96921b6f7..3f537baba 100644
--- a/docs/getting_started/execution-modes-local-conflicts.rst
+++ b/docs/getting_started/execution-modes-local-conflicts.rst
@@ -10,24 +10,25 @@ If you find errors, we recommend users look into using `alternative execution mo
In the following table, ``x`` represents combinations that lead to conflicts (vanilla ``apache-airflow`` and ``dbt-core`` packages):
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| Airflow / DBT | 1.0 | 1.1 | 1.2 | 1.3 | 1.4 | 1.5 | 1.6 | 1.7 |
-+===============+=====+=====+=====+=====+=====+=====+=====+=====+
-| 2.2           |     |     |     | x   | x   | x   | x   | x   |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.3           | x   | x   |     | x   | x   | x   | x   | x   |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.4           | x   | x   | x   |     |     |     |     |     |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.5           | x   | x   | x   |     |     |     |     |     |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.6           | x   | x   | x   | x   | x   |     |     |     |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.7           | x   | x   | x   | x   | x   |     |     |     |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-| 2.8           | x   | x   | x   | x   | x   |     | x   | x   |
-+---------------+-----+-----+-----+-----+-----+-----+-----+-----+
-
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| Airflow / DBT | 1.0 | 1.1 | 1.2 | 1.3 | 1.4 | 1.5 | 1.6 | 1.7 | 1.8 |
++===============+=====+=====+=====+=====+=====+=====+=====+=====+=====+
+| 2.2           |     |     |     | x   | x   | x   | x   | x   | x   |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.3           | x   | x   |     | x   | x   | x   | x   | x   | x   |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.4           | x   | x   | x   |     |     |     |     |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.5           | x   | x   | x   |     |     |     |     |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.6           | x   | x   | x   | x   | x   |     |     |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.7           | x   | x   | x   | x   | x   |     |     |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.8           | x   | x   | x   | x   | x   |     | x   |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+| 2.9           | x   | x   | x   | x   | x   |     |     |     |     |
++---------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
Examples of errors
-----------------------------------
@@ -92,9 +93,11 @@ The table was created by running `nox `__ wi
@nox.session(python=["3.10"])
@nox.parametrize(
- "dbt_version", ["1.0", "1.1", "1.2", "1.3", "1.4", "1.5", "1.6", "1.7"]
+ "dbt_version", ["1.0", "1.1", "1.2", "1.3", "1.4", "1.5", "1.6", "1.7", "1.8"]
+ )
+ @nox.parametrize(
+ "airflow_version", ["2.2.4", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]
)
- @nox.parametrize("airflow_version", ["2.2.4", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8"])
def compatibility(session: nox.Session, airflow_version, dbt_version) -> None:
"""Run both unit and integration tests."""
session.run(
diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index 1b1a35cb9..266b40f32 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -92,6 +92,7 @@ Some drawbacks of this approach:
- It is slower than ``local`` because it creates a new Python virtual environment for each Cosmos dbt task run.
- If dbt is unavailable in the Airflow scheduler, the default ``LoadMode.DBT_LS`` will not work. In this scenario, users must use a `parsing method `_ that does not rely on dbt, such as ``LoadMode.MANIFEST``.
+- Currently, only ``InvocationMode.SUBPROCESS`` is supported; attempting to use ``InvocationMode.DBT_RUNNER`` raises an error.
Example of how to use:
diff --git a/pyproject.toml b/pyproject.toml
index 577fa5047..c91b6abb3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
]
dependencies = [
"aenum",
@@ -43,6 +44,7 @@ dependencies = [
dbt-all = [
"dbt-athena",
"dbt-bigquery",
+ "dbt-clickhouse",
"dbt-databricks",
"dbt-exasol",
"dbt-postgres",
@@ -53,6 +55,7 @@ dbt-all = [
]
dbt-athena = ["dbt-athena-community", "apache-airflow-providers-amazon>=8.0.0"]
dbt-bigquery = ["dbt-bigquery"]
+dbt-clickhouse = ["dbt-clickhouse"]
dbt-databricks = ["dbt-databricks"]
dbt-exasol = ["dbt-exasol"]
dbt-postgres = ["dbt-postgres"]
@@ -138,7 +141,7 @@ dependencies = [
pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]
[[tool.hatch.envs.tests.matrix]]
-python = ["3.8", "3.9", "3.10", "3.11"]
+python = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]
[tool.hatch.envs.tests.overrides]
diff --git a/tests/operators/test_base.py b/tests/operators/test_base.py
index 70e4059e7..3d39d43a7 100644
--- a/tests/operators/test_base.py
+++ b/tests/operators/test_base.py
@@ -1,3 +1,4 @@
+import sys
from unittest.mock import patch
import pytest
@@ -14,6 +15,10 @@
)
+@pytest.mark.skipif(
+ (sys.version_info.major, sys.version_info.minor) == (3, 12),
+ reason="The error message for the abstract class instantiation seems to have changed between Python 3.11 and 3.12",
+)
def test_dbt_base_operator_is_abstract():
"""Tests that the abstract base operator cannot be instantiated since the base_cmd is not defined."""
expected_error = (
@@ -23,6 +28,20 @@ def test_dbt_base_operator_is_abstract():
AbstractDbtBaseOperator()
+@pytest.mark.skipif(
+ (sys.version_info.major, sys.version_info.minor) != (3, 12),
+ reason="The error message for the abstract class instantiation seems to have changed between Python 3.11 and 3.12",
+)
+def test_dbt_base_operator_is_abstract_py12():
+ """Tests that the abstract base operator cannot be instantiated since the base_cmd is not defined."""
+ expected_error = (
+ "Can't instantiate abstract class AbstractDbtBaseOperator without an implementation for abstract methods "
+ "'base_cmd', 'build_and_run_cmd'"
+ )
+ with pytest.raises(TypeError, match=expected_error):
+ AbstractDbtBaseOperator()
+
+
@pytest.mark.parametrize("cmd_flags", [["--some-flag"], []])
@patch("cosmos.operators.base.AbstractDbtBaseOperator.build_and_run_cmd")
def test_dbt_base_operator_execute(mock_build_and_run_cmd, cmd_flags, monkeypatch):
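
Note: the version gate above exists because CPython 3.12 reworded the ``TypeError`` raised when instantiating an abstract class. A quick demonstration:

```python
import abc

class Base(abc.ABC):
    @abc.abstractmethod
    def base_cmd(self): ...

try:
    Base()
except TypeError as exc:
    # <= 3.11: "Can't instantiate abstract class Base with abstract method base_cmd"
    # >= 3.12: "Can't instantiate abstract class Base without an implementation
    #           for abstract method 'base_cmd'"
    print(exc)
```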
diff --git a/tests/profiles/athena/test_athena_access_key.py b/tests/profiles/athena/test_athena_access_key.py
index 71ba1eb05..c0a25b7e9 100644
--- a/tests/profiles/athena/test_athena_access_key.py
+++ b/tests/profiles/athena/test_athena_access_key.py
@@ -1,8 +1,10 @@
"Tests for the Athena profile."
+from __future__ import annotations
import json
import sys
from collections import namedtuple
+from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
@@ -39,12 +41,7 @@ def get_credentials(self) -> Credentials:
yield mock_aws_hook
-@pytest.fixture()
-def mock_athena_conn(): # type: ignore
- """
- Sets the connection as an environment variable.
- """
-
+def mock_conn_value(token: str | None = None) -> Connection:
conn = Connection(
conn_id="my_athena_connection",
conn_type="aws",
@@ -52,7 +49,7 @@ def mock_athena_conn(): # type: ignore
password="my_aws_secret_key",
extra=json.dumps(
{
- "aws_session_token": "token123",
+ "aws_session_token": token,
"database": "my_database",
"region_name": "us-east-1",
"s3_staging_dir": "s3://my_bucket/dbt/",
@@ -60,7 +57,25 @@ def mock_athena_conn(): # type: ignore
}
),
)
+ return conn
+
+@pytest.fixture()
+def mock_athena_conn(): # type: ignore
+ """
+ Sets the connection as an environment variable.
+ """
+ conn = mock_conn_value(token="token123")
+ with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
+ yield conn
+
+
+@pytest.fixture()
+def mock_athena_conn_without_token(): # type: ignore
+ """
+ Sets the connection as an environment variable.
+ """
+ conn = mock_conn_value(token=None)
with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
yield conn
@@ -151,6 +166,28 @@ def test_athena_profile_args(
}
+@mock.patch("cosmos.profiles.athena.access_key.AthenaAccessKeyProfileMapping._get_temporary_credentials")
+def test_athena_profile_args_without_token(mock_temp_cred, mock_athena_conn_without_token: Connection) -> None:
+ """
+ Tests that the profile values get set correctly for Athena.
+ """
+ ReadOnlyCredentials = namedtuple("ReadOnlyCredentials", ["access_key", "secret_key", "token"])
+ credentials = ReadOnlyCredentials(access_key="my_aws_access_key", secret_key="my_aws_secret_key", token=None)
+ mock_temp_cred.return_value = credentials
+
+ profile_mapping = get_automatic_profile_mapping(mock_athena_conn_without_token.conn_id)
+
+ assert profile_mapping.profile == {
+ "type": "athena",
+ "aws_access_key_id": "my_aws_access_key",
+ "aws_secret_access_key": "{{ env_var('COSMOS_CONN_AWS_AWS_SECRET_ACCESS_KEY') }}",
+ "database": mock_athena_conn_without_token.extra_dejson.get("database"),
+ "region_name": mock_athena_conn_without_token.extra_dejson.get("region_name"),
+ "s3_staging_dir": mock_athena_conn_without_token.extra_dejson.get("s3_staging_dir"),
+ "schema": mock_athena_conn_without_token.extra_dejson.get("schema"),
+ }
+
+
def test_athena_profile_args_overrides(
mock_athena_conn: Connection,
) -> None:
diff --git a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py
index 4e56f5ba1..6f0d60b8d 100755
--- a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py
+++ b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py
@@ -64,8 +64,8 @@ def test_connection_claiming_succeeds(mock_bigquery_conn_with_dict: Connection):
def test_connection_claiming_fails(mock_bigquery_conn_with_dict: Connection):
- # Remove the `dataset` key, which is mandatory
- mock_bigquery_conn_with_dict.extra = json.dumps({"project": "my_project", "keyfile_dict": sample_keyfile_dict})
+ # Remove the `project` key, which is mandatory
+ mock_bigquery_conn_with_dict.extra = json.dumps({"dataset": "my_dataset", "keyfile_dict": sample_keyfile_dict})
profile_mapping = GoogleCloudServiceAccountDictProfileMapping(mock_bigquery_conn_with_dict, {})
assert not profile_mapping.can_claim_connection()
@@ -96,7 +96,6 @@ def test_mock_profile(mock_bigquery_conn_with_dict: Connection):
"type": "bigquery",
"method": "service-account-json",
"project": "mock_value",
- "dataset": "mock_value",
"threads": 1,
"keyfile_json": None,
}
diff --git a/tests/profiles/clickhouse/__init__.py b/tests/profiles/clickhouse/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/profiles/clickhouse/test_clickhouse_userpass.py b/tests/profiles/clickhouse/test_clickhouse_userpass.py
new file mode 100644
index 000000000..1f623c803
--- /dev/null
+++ b/tests/profiles/clickhouse/test_clickhouse_userpass.py
@@ -0,0 +1,117 @@
+"""Tests for the clickhouse profile."""
+
+from unittest.mock import patch
+
+import pytest
+from airflow.models.connection import Connection
+
+from cosmos.profiles import get_automatic_profile_mapping
+from cosmos.profiles.clickhouse.user_pass import (
+ ClickhouseUserPasswordProfileMapping,
+)
+
+
+@pytest.fixture()
+def mock_clickhouse_conn(): # type: ignore
+ """Sets the connection as an environment variable."""
+ conn = Connection(
+ conn_id="clickhouse_connection",
+ conn_type="generic",
+ host="my_host",
+ login="my_user",
+ password="my_password",
+ schema="my_database",
+ extra='{"clickhouse": "True"}',
+ )
+
+ with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
+ yield conn
+
+
+def test_connection_claiming() -> None:
+    """
+    Tests that the Clickhouse profile mapping claims the correct connection type.
+
+    It should only claim the connection when:
+    - conn_type == generic
+    and all of the following exist:
+ - host
+ - login
+ - password
+ - schema
+ - extra.clickhouse
+ """
+ required_values = {
+ "conn_type": "generic",
+ "host": "my_host",
+ "login": "my_user",
+ "schema": "my_database",
+ "extra": '{"clickhouse": "True"}',
+ }
+
+ def can_claim_with_missing_key(missing_key: str) -> bool:
+ values = required_values.copy()
+ del values[missing_key]
+ conn = Connection(**values) # type: ignore
+ with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
+ profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
+ return profile_mapping.can_claim_connection()
+
+ # if we're missing any of the required values, it shouldn't claim
+ for key in required_values:
+ assert not can_claim_with_missing_key(key), f"Failed when missing {key}"
+
+ # if we have all the required values, it should claim
+ conn = Connection(**required_values) # type: ignore
+ with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
+ profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
+ assert profile_mapping.can_claim_connection()
+
+
+def test_profile_mapping_selected(
+ mock_clickhouse_conn: Connection,
+) -> None:
+ """Tests that the correct profile mapping is selected."""
+ profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, {})
+ assert isinstance(profile_mapping, ClickhouseUserPasswordProfileMapping)
+
+
+def test_profile_args(mock_clickhouse_conn: Connection) -> None:
+ """Tests that the profile values get set correctly."""
+ profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={})
+
+ assert profile_mapping.profile == {
+ "type": "clickhouse",
+ "schema": mock_clickhouse_conn.schema,
+ "login": mock_clickhouse_conn.login,
+ "password": "{{ env_var('COSMOS_CONN_GENERIC_PASSWORD') }}",
+ "driver": "native",
+ "port": 9000,
+ "host": mock_clickhouse_conn.host,
+ "secure": False,
+ "clickhouse": "True",
+ }
+
+
+def test_mock_profile() -> None:
+ """Tests that the mock_profile values get set correctly."""
+    profile_mapping = ClickhouseUserPasswordProfileMapping("conn_id")
+
+ assert profile_mapping.mock_profile == {
+ "type": "clickhouse",
+ "schema": "mock_value",
+ "login": "mock_value",
+ "driver": "native",
+ "port": 9000,
+ "host": "mock_value",
+ "secure": False,
+ "clickhouse": "mock_value",
+ }
+
+
+def test_profile_env_vars(mock_clickhouse_conn: Connection) -> None:
+ """Tests that the environment variables get set correctly."""
+ profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={})
+ assert profile_mapping.env_vars == {"COSMOS_CONN_GENERIC_PASSWORD": mock_clickhouse_conn.password}
diff --git a/tests/test_config.py b/tests/test_config.py
index acca546be..d7dc24cbe 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -199,15 +199,34 @@ def test_render_config_env_vars_deprecated():
@pytest.mark.parametrize(
- "execution_mode, expectation",
+ "execution_mode, invocation_mode, expectation",
[
- (ExecutionMode.LOCAL, does_not_raise()),
- (ExecutionMode.VIRTUALENV, pytest.raises(CosmosValueError)),
- (ExecutionMode.KUBERNETES, pytest.raises(CosmosValueError)),
- (ExecutionMode.DOCKER, pytest.raises(CosmosValueError)),
- (ExecutionMode.AZURE_CONTAINER_INSTANCE, pytest.raises(CosmosValueError)),
+ (ExecutionMode.LOCAL, InvocationMode.DBT_RUNNER, does_not_raise()),
+ (ExecutionMode.LOCAL, InvocationMode.SUBPROCESS, does_not_raise()),
+ (ExecutionMode.LOCAL, None, does_not_raise()),
+ (ExecutionMode.VIRTUALENV, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
+ (ExecutionMode.VIRTUALENV, InvocationMode.SUBPROCESS, does_not_raise()),
+ (ExecutionMode.VIRTUALENV, None, does_not_raise()),
+ (ExecutionMode.KUBERNETES, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
+ (ExecutionMode.DOCKER, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
+ (ExecutionMode.AZURE_CONTAINER_INSTANCE, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
],
)
-def test_execution_config_with_invocation_option(execution_mode, expectation):
+def test_execution_config_with_invocation_option(execution_mode, invocation_mode, expectation):
with expectation:
- ExecutionConfig(execution_mode=execution_mode, invocation_mode=InvocationMode.DBT_RUNNER)
+ ExecutionConfig(execution_mode=execution_mode, invocation_mode=invocation_mode)
+
+
+@pytest.mark.parametrize(
+ "execution_mode, expected_invocation_mode",
+ [
+ (ExecutionMode.LOCAL, None),
+ (ExecutionMode.VIRTUALENV, InvocationMode.SUBPROCESS),
+ (ExecutionMode.KUBERNETES, None),
+ (ExecutionMode.DOCKER, None),
+ (ExecutionMode.AZURE_CONTAINER_INSTANCE, None),
+ ],
+)
+def test_execution_config_default_config(execution_mode, expected_invocation_mode):
+ execution_config = ExecutionConfig(execution_mode=execution_mode)
+ assert execution_config.invocation_mode == expected_invocation_mode
diff --git a/tests/test_settings.py b/tests/test_settings.py
new file mode 100644
index 000000000..d9f5e0f6e
--- /dev/null
+++ b/tests/test_settings.py
@@ -0,0 +1,11 @@
+import os
+from importlib import reload
+from unittest.mock import patch
+
+from cosmos import settings
+
+
+@patch.dict(os.environ, {"AIRFLOW__COSMOS__ENABLE_CACHE": "False"}, clear=True)
+def test_enable_cache_env_var():
+ reload(settings)
+ assert settings.enable_cache is False