From 640719b5c1a7ac4b90e64399041b5c933e700798 Mon Sep 17 00:00:00 2001 From: Satish Chinthanippu Date: Mon, 1 Jul 2024 06:01:51 -0700 Subject: [PATCH 1/4] Implemented new profile mapping class for teradata --- cosmos/profiles/__init__.py | 3 + cosmos/profiles/teradata/__init__.py | 0 cosmos/profiles/teradata/user_pass.py | 60 ++++++ pyproject.toml | 2 + tests/profiles/teradata/__init__.py | 0 .../teradata/test_teradata_user_pass.py | 172 ++++++++++++++++++ 6 files changed, 237 insertions(+) create mode 100644 cosmos/profiles/teradata/__init__.py create mode 100644 cosmos/profiles/teradata/user_pass.py create mode 100644 tests/profiles/teradata/__init__.py create mode 100644 tests/profiles/teradata/test_teradata_user_pass.py diff --git a/cosmos/profiles/__init__.py b/cosmos/profiles/__init__.py index b182bacf7..392b4f78b 100644 --- a/cosmos/profiles/__init__.py +++ b/cosmos/profiles/__init__.py @@ -19,6 +19,7 @@ from .snowflake.user_pass import SnowflakeUserPasswordProfileMapping from .snowflake.user_privatekey import SnowflakePrivateKeyPemProfileMapping from .spark.thrift import SparkThriftProfileMapping +from .teradata.user_pass import TeradataUserPasswordProfileMapping from .trino.certificate import TrinoCertificateProfileMapping from .trino.jwt import TrinoJWTProfileMapping from .trino.ldap import TrinoLDAPProfileMapping @@ -39,6 +40,7 @@ SnowflakePrivateKeyPemProfileMapping, SparkThriftProfileMapping, ExasolUserPasswordProfileMapping, + TeradataUserPasswordProfileMapping, TrinoLDAPProfileMapping, TrinoCertificateProfileMapping, TrinoJWTProfileMapping, @@ -79,6 +81,7 @@ def get_automatic_profile_mapping( "SnowflakeEncryptedPrivateKeyFilePemProfileMapping", "SparkThriftProfileMapping", "ExasolUserPasswordProfileMapping", + "TeradataUserPasswordProfileMapping", "TrinoLDAPProfileMapping", "TrinoCertificateProfileMapping", "TrinoJWTProfileMapping", diff --git a/cosmos/profiles/teradata/__init__.py b/cosmos/profiles/teradata/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cosmos/profiles/teradata/user_pass.py b/cosmos/profiles/teradata/user_pass.py new file mode 100644 index 000000000..f9cb6dc3a --- /dev/null +++ b/cosmos/profiles/teradata/user_pass.py @@ -0,0 +1,60 @@ +"""Maps Airflow Snowflake connections to dbt profiles if they use a user/password.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from ..base import BaseProfileMapping + +if TYPE_CHECKING: + pass + + +class TeradataUserPasswordProfileMapping(BaseProfileMapping): + """ + Maps Airflow Teradata connections using user + password authentication to dbt profiles. + https://docs.getdbt.com/docs/core/connect-data-platform/teradata-setup + https://airflow.apache.org/docs/apache-airflow-providers-teradata/stable/connections/teradata.html + """ + + airflow_connection_type: str = "teradata" + dbt_profile_type: str = "teradata" + + required_fields = [ + "host", + "user", + "password", + ] + secret_fields = [ + "password", + ] + airflow_param_mapping = { + "host": "host", + "user": "login", + "password": "password", + "schema": "schema", + "tmode": "extra.tmode", + } + + @property + def profile(self) -> dict[str, Any | None]: + """Gets profile. The password is stored in an environment variable.""" + profile = { + "port": 1025, + **self.mapped_params, + **self.profile_args, + # password should always get set as env var + "password": self.get_env_var_format("password"), + } + + return self.filter_null(profile) + + @property + def mock_profile(self) -> dict[str, Any | None]: + """Gets mock profile. Defaults port to 1025.""" + parent_mock = super().mock_profile + + return { + "port": 1025, + **parent_mock, + } diff --git a/pyproject.toml b/pyproject.toml index 6c518613b..5cbc93a98 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dbt-all = [ "dbt-redshift", "dbt-snowflake", "dbt-spark", + "dbt-teradata", "dbt-vertica", ] dbt-athena = ["dbt-athena-community", "apache-airflow-providers-amazon>=8.0.0"] @@ -62,6 +63,7 @@ dbt-postgres = ["dbt-postgres"] dbt-redshift = ["dbt-redshift"] dbt-snowflake = ["dbt-snowflake"] dbt-spark = ["dbt-spark"] +dbt-teradata = ["dbt-teradata"] dbt-vertica = ["dbt-vertica<=1.5.4"] openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"] all = ["astronomer-cosmos[dbt-all]", "astronomer-cosmos[openlineage]"] diff --git a/tests/profiles/teradata/__init__.py b/tests/profiles/teradata/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/profiles/teradata/test_teradata_user_pass.py b/tests/profiles/teradata/test_teradata_user_pass.py new file mode 100644 index 000000000..8542ec547 --- /dev/null +++ b/tests/profiles/teradata/test_teradata_user_pass.py @@ -0,0 +1,172 @@ +"""Tests for the postgres profile.""" + +from unittest.mock import patch + +import pytest +from airflow.models.connection import Connection + +from cosmos.profiles import get_automatic_profile_mapping +from cosmos.profiles.teradata.user_pass import TeradataUserPasswordProfileMapping + + +@pytest.fixture() +def mock_teradata_conn(): # type: ignore + """ + Sets the connection as an environment variable. + """ + conn = Connection( + conn_id="my_teradata_connection", + conn_type="teradata", + host="my_host", + login="my_user", + password="my_password", + ) + + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + yield conn + + +@pytest.fixture() +def mock_teradata_conn_custom_tmode(): # type: ignore + """ + Sets the connection as an environment variable. + """ + conn = Connection( + conn_id="my_teradata_connection", + conn_type="teradata", + host="my_host", + login="my_user", + password="my_password", + port=1025, + schema="my_database", + extra='{"tmode": "TERA"}', + ) + + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + yield conn + + +def test_connection_claiming() -> None: + """ + Tests that the teradata profile mapping claims the correct connection type. + """ + # should only claim when: + # - conn_type == teradata + # and the following exist: + # - host + # - user + # - password + potential_values = { + "conn_type": "teradata", + "host": "my_host", + "login": "my_user", + "password": "my_password", + } + + # if we're missing any of the values, it shouldn't claim + for key in potential_values: + values = potential_values.copy() + del values[key] + conn = Connection(**values) # type: ignore + + print("testing with", values) + + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + profile_mapping = TeradataUserPasswordProfileMapping(conn) + assert not profile_mapping.can_claim_connection() + + # if we have them all, it should claim + conn = Connection(**potential_values) # type: ignore + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + profile_mapping = TeradataUserPasswordProfileMapping(conn, {"schema": "my_schema"}) + assert profile_mapping.can_claim_connection() + + +def test_profile_mapping_selected( + mock_teradata_conn: Connection, +) -> None: + """ + Tests that the correct profile mapping is selected. + """ + profile_mapping = get_automatic_profile_mapping( + mock_teradata_conn.conn_id, + ) + print(profile_mapping) + print(profile_mapping.profile) + assert isinstance(profile_mapping, TeradataUserPasswordProfileMapping) + + +def test_profile_mapping_keeps_custom_tmode(mock_teradata_conn_custom_tmode: Connection) -> None: + profile = TeradataUserPasswordProfileMapping(mock_teradata_conn_custom_tmode.conn_id) + assert profile.profile["tmode"] == "TERA" + + +def test_profile_args( + mock_teradata_conn: Connection, +) -> None: + """ + Tests that the profile values get set correctly. + """ + profile_mapping = get_automatic_profile_mapping( + mock_teradata_conn.conn_id, + profile_args={"schema": "my_database"}, + ) + assert profile_mapping.profile_args == { + "schema": "my_database", + } + + assert profile_mapping.profile == { + "type": mock_teradata_conn.conn_type, + "host": mock_teradata_conn.host, + "user": mock_teradata_conn.login, + "password": "{{ env_var('COSMOS_CONN_TERADATA_PASSWORD') }}", + "port": mock_teradata_conn.port, + "schema": "my_database", + } + + +def test_profile_args_overrides( + mock_teradata_conn: Connection, +) -> None: + """ + Tests that you can override the profile values. + """ + profile_mapping = get_automatic_profile_mapping( + mock_teradata_conn.conn_id, + profile_args={"schema": "my_schema"}, + ) + assert profile_mapping.profile_args == { + "schema": "my_schema", + } + + assert profile_mapping.profile == { + "type": mock_teradata_conn.conn_type, + "host": mock_teradata_conn.host, + "user": mock_teradata_conn.login, + "password": "{{ env_var('COSMOS_CONN_TERADATA_PASSWORD') }}", + "port": mock_teradata_conn.port, + "schema": "my_schema", + } + + +def test_profile_env_vars( + mock_teradata_conn: Connection, +) -> None: + """ + Tests that the environment variables get set correctly. + """ + profile_mapping = get_automatic_profile_mapping( + mock_teradata_conn.conn_id, + profile_args={"schema": "my_schema"}, + ) + assert profile_mapping.env_vars == { + "COSMOS_CONN_TERADATA_PASSWORD": mock_teradata_conn.password, + } + + +def test_mock_profile() -> None: + """ + Tests that the mock profile port value get set correctly. + """ + profile = TeradataUserPasswordProfileMapping("mock_conn_id") + assert profile.mock_profile.get("port") == 1025 From e574457534891780e10d5c54dcccd3085d564d11 Mon Sep 17 00:00:00 2001 From: Satish Chinthanippu Date: Tue, 2 Jul 2024 04:12:36 -0700 Subject: [PATCH 2/4] Added profile mapping for teradata --- cosmos/profiles/teradata/user_pass.py | 8 ++++--- .../teradata/test_teradata_user_pass.py | 22 +++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cosmos/profiles/teradata/user_pass.py b/cosmos/profiles/teradata/user_pass.py index f9cb6dc3a..a744bdfa9 100644 --- a/cosmos/profiles/teradata/user_pass.py +++ b/cosmos/profiles/teradata/user_pass.py @@ -40,21 +40,23 @@ class TeradataUserPasswordProfileMapping(BaseProfileMapping): def profile(self) -> dict[str, Any | None]: """Gets profile. The password is stored in an environment variable.""" profile = { - "port": 1025, **self.mapped_params, **self.profile_args, # password should always get set as env var "password": self.get_env_var_format("password"), } + # schema is not mandatory in teradata. In teradata user itself a database so if schema is not mentioned + # in both airflow connection and profile_args then treating user as schema. + if "schema" not in self.profile_args and self.mapped_params.get("schema") is None: + profile["schema"] = profile["user"] return self.filter_null(profile) @property def mock_profile(self) -> dict[str, Any | None]: - """Gets mock profile. Defaults port to 1025.""" + """Gets mock profile.""" parent_mock = super().mock_profile return { - "port": 1025, **parent_mock, } diff --git a/tests/profiles/teradata/test_teradata_user_pass.py b/tests/profiles/teradata/test_teradata_user_pass.py index 8542ec547..ff28977fe 100644 --- a/tests/profiles/teradata/test_teradata_user_pass.py +++ b/tests/profiles/teradata/test_teradata_user_pass.py @@ -37,7 +37,6 @@ def mock_teradata_conn_custom_tmode(): # type: ignore host="my_host", login="my_user", password="my_password", - port=1025, schema="my_database", extra='{"tmode": "TERA"}', ) @@ -56,7 +55,7 @@ def test_connection_claiming() -> None: # - host # - user # - password - potential_values = { + potential_values: dict[str, str] = { "conn_type": "teradata", "host": "my_host", "login": "my_user", @@ -69,12 +68,15 @@ def test_connection_claiming() -> None: del values[key] conn = Connection(**values) # type: ignore - print("testing with", values) - with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): profile_mapping = TeradataUserPasswordProfileMapping(conn) assert not profile_mapping.can_claim_connection() + # Even there is no schema, making user as schema as user itself schema in teradata + conn = Connection(**{k: v for k, v in potential_values.items() if k != "schema"}) + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + profile_mapping = TeradataUserPasswordProfileMapping(conn, {"schema": None}) + assert profile_mapping.can_claim_connection() # if we have them all, it should claim conn = Connection(**potential_values) # type: ignore with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): @@ -91,11 +93,15 @@ def test_profile_mapping_selected( profile_mapping = get_automatic_profile_mapping( mock_teradata_conn.conn_id, ) - print(profile_mapping) - print(profile_mapping.profile) assert isinstance(profile_mapping, TeradataUserPasswordProfileMapping) +def test_profile_mapping_keeps_port(mock_teradata_conn: Connection) -> None: + # port is not handled in airflow connection so adding it as profile_args + profile = TeradataUserPasswordProfileMapping(mock_teradata_conn.conn_id, profile_args={"port": 1025}) + assert profile.profile["port"] == 1025 + + def test_profile_mapping_keeps_custom_tmode(mock_teradata_conn_custom_tmode: Connection) -> None: profile = TeradataUserPasswordProfileMapping(mock_teradata_conn_custom_tmode.conn_id) assert profile.profile["tmode"] == "TERA" @@ -120,7 +126,6 @@ def test_profile_args( "host": mock_teradata_conn.host, "user": mock_teradata_conn.login, "password": "{{ env_var('COSMOS_CONN_TERADATA_PASSWORD') }}", - "port": mock_teradata_conn.port, "schema": "my_database", } @@ -144,7 +149,6 @@ def test_profile_args_overrides( "host": mock_teradata_conn.host, "user": mock_teradata_conn.login, "password": "{{ env_var('COSMOS_CONN_TERADATA_PASSWORD') }}", - "port": mock_teradata_conn.port, "schema": "my_schema", } @@ -169,4 +173,4 @@ def test_mock_profile() -> None: Tests that the mock profile port value get set correctly. """ profile = TeradataUserPasswordProfileMapping("mock_conn_id") - assert profile.mock_profile.get("port") == 1025 + assert profile.mock_profile.get("host") == "mock_value" From b0410a3d22c851e0f453a5a8d645224aa5062138 Mon Sep 17 00:00:00 2001 From: Satish Chinthanippu Date: Tue, 2 Jul 2024 07:48:39 -0700 Subject: [PATCH 3/4] Added is_community and removed unwanted code 1. Removed mock_profile as it inherits the parent mock_profile 2. Removed TYPE_CHECKING --- cosmos/profiles/teradata/user_pass.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/cosmos/profiles/teradata/user_pass.py b/cosmos/profiles/teradata/user_pass.py index a744bdfa9..e3ad6e5a6 100644 --- a/cosmos/profiles/teradata/user_pass.py +++ b/cosmos/profiles/teradata/user_pass.py @@ -2,13 +2,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any from ..base import BaseProfileMapping -if TYPE_CHECKING: - pass - class TeradataUserPasswordProfileMapping(BaseProfileMapping): """ @@ -19,6 +16,7 @@ class TeradataUserPasswordProfileMapping(BaseProfileMapping): airflow_connection_type: str = "teradata" dbt_profile_type: str = "teradata" + is_community = True required_fields = [ "host", @@ -51,12 +49,3 @@ def profile(self) -> dict[str, Any | None]: profile["schema"] = profile["user"] return self.filter_null(profile) - - @property - def mock_profile(self) -> dict[str, Any | None]: - """Gets mock profile.""" - parent_mock = super().mock_profile - - return { - **parent_mock, - } From 2bb37a1187ef7067ec8088da3303a5e42d2640c3 Mon Sep 17 00:00:00 2001 From: Satish Chinthanippu Date: Wed, 3 Jul 2024 13:27:27 +0530 Subject: [PATCH 4/4] Removed None as method doesn't return None from filter_null Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> --- cosmos/profiles/teradata/user_pass.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/profiles/teradata/user_pass.py b/cosmos/profiles/teradata/user_pass.py index e3ad6e5a6..059e4a9f0 100644 --- a/cosmos/profiles/teradata/user_pass.py +++ b/cosmos/profiles/teradata/user_pass.py @@ -35,7 +35,7 @@ class TeradataUserPasswordProfileMapping(BaseProfileMapping): } @property - def profile(self) -> dict[str, Any | None]: + def profile(self) -> dict[str, Any]: """Gets profile. The password is stored in an environment variable.""" profile = { **self.mapped_params,