From 665519b57e53640ce752e7a046d7ad0e23fba39d Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 31 Oct 2024 15:32:20 -0700 Subject: [PATCH 1/4] downgrade to concrete required version of core so that we are one step closer to release --- dbt/adapters/databricks/impl.py | 12 +++++++++++- .../macros/materializations/incremental/validate.sql | 2 +- requirements.txt | 4 ++-- setup.py | 6 +++--- .../functional/adapter/microbatch/test_microbatch.py | 8 ++++++++ 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 2b4ea7a1..e781a568 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -84,10 +84,16 @@ from dbt_common.exceptions import DbtInternalError from dbt_common.contracts.config.base import BaseConfig +import pkg_resources +from packaging import version + if TYPE_CHECKING: from agate import Row from agate import Table +dbt_version = pkg_resources.get_distribution("dbt-core").version +SUPPORT_MICROBATCH = version.parse(dbt_version) >= version.parse("1.9.0b1") + CURRENT_CATALOG_MACRO_NAME = "current_catalog" USE_CATALOG_MACRO_NAME = "use_catalog" GET_CATALOG_MACRO_NAME = "get_catalog" @@ -660,7 +666,11 @@ def run_sql_for_tests( conn.transaction_open = False def valid_incremental_strategies(self) -> list[str]: - return ["append", "merge", "insert_overwrite", "replace_where", "microbatch"] + valid_strategies = ["append", "merge", "insert_overwrite", "replace_where"] + if SUPPORT_MICROBATCH: + valid_strategies.append("microbatch") + + return valid_strategies @property def python_submission_helpers(self) -> dict[str, type[PythonJobHelper]]: diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql index 7b5c5bd7..6e53957b 100644 --- a/dbt/include/databricks/macros/materializations/incremental/validate.sql +++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql @@ -35,7 +35,7 @@ Use the 'merge' or 'replace_where' strategy instead {%- endset %} - {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where', 'microbatch'] %} + {% if raw_strategy not in adapter.valid_strategies %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %} diff --git a/requirements.txt b/requirements.txt index fb9c7f54..49a8fd3c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ databricks-sql-connector>=3.5.0, <4.0 -dbt-spark>=1.9.0b1, <2.0 -dbt-core>=1.9.0b1, <2.0 +dbt-spark>=1.8.0, <2.0 +dbt-core>=1.8.7, <2.0 dbt-common>=1.10.0, <2.0 dbt-adapters>=1.7.0, <2.0 databricks-sdk==0.17.0 diff --git a/setup.py b/setup.py index 5c236324..f22997e8 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ import os import sys -# require python 3.8 or newer +# require python 3.9 or newer if sys.version_info < (3, 9): print("Error: dbt does not support this version of Python.") print("Please upgrade to Python 3.9 or higher.") @@ -54,8 +54,8 @@ def _get_plugin_version() -> str: packages=find_namespace_packages(include=["dbt", "dbt.*"]), include_package_data=True, install_requires=[ - "dbt-spark>=1.9.0b1, <2.0", - "dbt-core>=1.9.0b1, <2.0", + "dbt-spark>=1.8.0, <2.0", + "dbt-core>=1.8.7, <2.0", "dbt-adapters>=1.7.0, <2.0", "dbt-common>=1.10.0, <2.0", "databricks-sql-connector>=3.5.0, <4.0.0", diff --git a/tests/functional/adapter/microbatch/test_microbatch.py b/tests/functional/adapter/microbatch/test_microbatch.py index 4bf66a22..a8cf1b64 100644 --- a/tests/functional/adapter/microbatch/test_microbatch.py +++ b/tests/functional/adapter/microbatch/test_microbatch.py @@ -2,10 +2,18 @@ BaseMicrobatch, ) import pytest +from packaging import version +import pkg_resources from tests.functional.adapter.microbatch import fixtures +dbt_version = pkg_resources.get_distribution("dbt-core").version + +@pytest.mark.skipif( + version.parse(dbt_version) < version.parse("1.9.0b1"), + reason="Microbatch is not supported with this version of core", +) class TestDatabricksMicrobatch(BaseMicrobatch): @pytest.fixture(scope="class") def models(self, microbatch_model_sql, input_model_sql): From a79840229767d2080db9292ac149cc87d9862375 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 31 Oct 2024 15:47:32 -0700 Subject: [PATCH 2/4] move to a python 3.12 compat approach --- dbt/adapters/databricks/impl.py | 4 ++-- tests/functional/adapter/microbatch/test_microbatch.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index e781a568..f15a2eca 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -84,14 +84,14 @@ from dbt_common.exceptions import DbtInternalError from dbt_common.contracts.config.base import BaseConfig -import pkg_resources +from importlib import metadata from packaging import version if TYPE_CHECKING: from agate import Row from agate import Table -dbt_version = pkg_resources.get_distribution("dbt-core").version +dbt_version = metadata.version("dbt-core") SUPPORT_MICROBATCH = version.parse(dbt_version) >= version.parse("1.9.0b1") CURRENT_CATALOG_MACRO_NAME = "current_catalog" diff --git a/tests/functional/adapter/microbatch/test_microbatch.py b/tests/functional/adapter/microbatch/test_microbatch.py index a8cf1b64..89b7f11e 100644 --- a/tests/functional/adapter/microbatch/test_microbatch.py +++ b/tests/functional/adapter/microbatch/test_microbatch.py @@ -3,11 +3,11 @@ ) import pytest from packaging import version -import pkg_resources +from importlib import metadata from tests.functional.adapter.microbatch import fixtures -dbt_version = pkg_resources.get_distribution("dbt-core").version +dbt_version = metadata.version("dbt-core") @pytest.mark.skipif( From 388f7bed93252376da4b471a2d44055712259a14 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 31 Oct 2024 15:48:46 -0700 Subject: [PATCH 3/4] fix the bug Jacky found --- .../databricks/macros/materializations/incremental/validate.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql index 6e53957b..cfc85e5d 100644 --- a/dbt/include/databricks/macros/materializations/incremental/validate.sql +++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql @@ -35,7 +35,7 @@ Use the 'merge' or 'replace_where' strategy instead {%- endset %} - {% if raw_strategy not in adapter.valid_strategies %} + {% if raw_strategy not in adapter.valid_incremental_strategies %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %} From 9b68640da3aa8975c79354b62bca8d76c57a7225 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 31 Oct 2024 16:13:35 -0700 Subject: [PATCH 4/4] fix --- dbt/adapters/databricks/impl.py | 1 + .../databricks/macros/materializations/incremental/validate.sql | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index f15a2eca..094b0950 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -665,6 +665,7 @@ def run_sql_for_tests( cursor.close() conn.transaction_open = False + @available def valid_incremental_strategies(self) -> list[str]: valid_strategies = ["append", "merge", "insert_overwrite", "replace_where"] if SUPPORT_MICROBATCH: diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql index cfc85e5d..68c60af3 100644 --- a/dbt/include/databricks/macros/materializations/incremental/validate.sql +++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql @@ -35,7 +35,7 @@ Use the 'merge' or 'replace_where' strategy instead {%- endset %} - {% if raw_strategy not in adapter.valid_incremental_strategies %} + {% if raw_strategy not in adapter.valid_incremental_strategies() %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}