Commit

Merge branch 'main' of github.com-ghec:databricks/dbt-databricks into bump_python_sdk_version
eric-wang-1990 committed Nov 16, 2024
2 parents 8dcba15 + 711607e commit bfe05a9
Showing 13 changed files with 109 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,10 @@
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825))

### Fixes

- Replace array indexing with 'get' in split_part so as not to raise exception when indexing beyond bounds ([839](https://github.com/databricks/dbt-databricks/pull/839))

### Under the Hood

- Significant refactoring and increased testing of python_submissions ([830](https://github.com/databricks/dbt-databricks/pull/830))
@@ -26,6 +30,7 @@
- Upgrade databricks-sql-connector dependency to 3.5.0 ([833](https://github.com/databricks/dbt-databricks/pull/833))
- Prepare for python typing deprecations ([837](https://github.com/databricks/dbt-databricks/pull/837))
- Fix behavior flag use in init of DatabricksAdapter (thanks @VersusFacit!) ([836](https://github.com/databricks/dbt-databricks/pull/836))
- Restrict pydantic to V1 per dbt Labs' request ([843](https://github.com/databricks/dbt-databricks/pull/843))

## dbt-databricks 1.8.7 (October 10, 2024)

36 changes: 12 additions & 24 deletions dbt/adapters/databricks/credentials.py
@@ -118,9 +118,7 @@ def __post_init__(self) -> None:
"_user_agent_entry",
):
if key in connection_parameters:
raise DbtValidationError(
f"The connection parameter `{key}` is reserved."
)
raise DbtValidationError(f"The connection parameter `{key}` is reserved.")
if "http_headers" in connection_parameters:
http_headers = connection_parameters["http_headers"]
if not isinstance(http_headers, dict) or any(
@@ -145,9 +143,7 @@ def validate_creds(self) -> None:

if not self.token and self.auth_type != "oauth":
raise DbtConfigError(
(
"The config `auth_type: oauth` is required when not using access token"
)
("The config `auth_type: oauth` is required when not using access token")
)

if not self.client_id and self.client_secret:
@@ -175,15 +171,11 @@ def get_invocation_env(cls) -> Optional[str]:
# Thrift doesn't allow nested () so we need to ensure
# that the passed user agent is valid.
if not DBT_DATABRICKS_INVOCATION_ENV_REGEX.search(invocation_env):
raise DbtValidationError(
f"Invalid invocation environment: {invocation_env}"
)
raise DbtValidationError(f"Invalid invocation environment: {invocation_env}")
return invocation_env

@classmethod
def get_all_http_headers(
cls, user_http_session_headers: dict[str, str]
) -> dict[str, str]:
def get_all_http_headers(cls, user_http_session_headers: dict[str, str]) -> dict[str, str]:
http_session_headers_str: Optional[str] = os.environ.get(
DBT_DATABRICKS_HTTP_SESSION_HEADERS
)
@@ -218,17 +210,13 @@ def type(self) -> str:
def unique_field(self) -> str:
return cast(str, self.host)

def connection_info(
self, *, with_aliases: bool = False
) -> Iterable[tuple[str, Any]]:
def connection_info(self, *, with_aliases: bool = False) -> Iterable[tuple[str, Any]]:
as_dict = self.to_dict(omit_none=False)
connection_keys = set(self._connection_keys(with_aliases=with_aliases))
aliases: list[str] = []
if with_aliases:
aliases = [k for k, v in self._ALIASES.items() if v in connection_keys]
for key in itertools.chain(
self._connection_keys(with_aliases=with_aliases), aliases
):
for key in itertools.chain(self._connection_keys(with_aliases=with_aliases), aliases):
if key in as_dict:
yield key, as_dict[key]

@@ -302,13 +290,9 @@ class DatabricksCredentialManager(DataClassDictMixin):
auth_type: Optional[str] = None

@classmethod
def create_from(
cls, credentials: DatabricksCredentials
) -> "DatabricksCredentialManager":
if credentials.host is None:
raise ValueError("host cannot be None")
def create_from(cls, credentials: DatabricksCredentials) -> "DatabricksCredentialManager":
return DatabricksCredentialManager(
host=credentials.host,
host=credentials.host or "",
token=credentials.token,
client_id=credentials.client_id or CLIENT_ID,
client_secret=credentials.client_secret or "",
@@ -426,10 +410,14 @@ def inner() -> Callable[[], Dict[str, str]]:

@property
def header_factory(self) -> CredentialsProvider:
if self._config is None:
raise RuntimeError("Config is not initialized")
header_factory = self._config._header_factory
assert header_factory is not None, "Header factory is not set."
return header_factory

@property
def config(self) -> Config:
if self._config is None:
raise RuntimeError("Config is not initialized")
return self._config
13 changes: 12 additions & 1 deletion dbt/adapters/databricks/impl.py
@@ -84,10 +84,16 @@
from dbt_common.exceptions import DbtInternalError
from dbt_common.contracts.config.base import BaseConfig

from importlib import metadata
from packaging import version

if TYPE_CHECKING:
from agate import Row
from agate import Table

dbt_version = metadata.version("dbt-core")
SUPPORT_MICROBATCH = version.parse(dbt_version) >= version.parse("1.9.0b1")

CURRENT_CATALOG_MACRO_NAME = "current_catalog"
USE_CATALOG_MACRO_NAME = "use_catalog"
GET_CATALOG_MACRO_NAME = "get_catalog"
@@ -659,8 +665,13 @@ def run_sql_for_tests(
cursor.close()
conn.transaction_open = False

@available
def valid_incremental_strategies(self) -> list[str]:
return ["append", "merge", "insert_overwrite", "replace_where", "microbatch"]
valid_strategies = ["append", "merge", "insert_overwrite", "replace_where"]
if SUPPORT_MICROBATCH:
valid_strategies.append("microbatch")

return valid_strategies

@property
def python_submission_helpers(self) -> dict[str, type[PythonJobHelper]]:
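For orientation, a minimal sketch of a model that opts into the now version-gated `microbatch` strategy, assuming dbt-core 1.9's standard microbatch configs (`event_time`, `batch_size`, `begin`); the model, column, and values are illustrative only:

```sql
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'microbatch',
        event_time = 'event_ts',   -- column used to slice the input into batches
        batch_size = 'day',        -- one batch per day of event_ts
        begin = '2024-01-01'       -- earliest batch built on a full refresh
    )
}}

-- hypothetical upstream model; each batch is written via replace_where on the event_ts window
select * from {{ ref('raw_events') }}
```

On dbt-core versions below 1.9.0b1 the adapter omits `microbatch` from `valid_incremental_strategies`, so such a model would fail strategy validation.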
@@ -72,8 +72,8 @@ select {{source_cols_csv}} from {{ source_relation }}
{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{# need dest_columns for merge_exclude_columns, default to use "*" #}

{%- set target_alias = config.get('target_alias', 'tgt') -%}
{%- set source_alias = config.get('source_alias', 'src') -%}
{%- set target_alias = config.get('target_alias', 'DBT_INTERNAL_DEST') -%}
{%- set source_alias = config.get('source_alias', 'DBT_INTERNAL_SOURCE') -%}

{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
@@ -146,7 +146,7 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endif %}
{% endmacro %}

{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='src') %}
{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %}
{%- if update_columns -%}
{%- for column_name in update_columns -%}
{{ column_name }} = {{ source_alias }}.{{ column_name }}{%- if not loop.last %}, {% endif -%}
@@ -160,7 +160,7 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endif -%}
{% endmacro %}

{% macro get_merge_insert(on_schema_change, source_columns, source_alias='src') %}
{% macro get_merge_insert(on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %}
{%- if on_schema_change == 'ignore' -%}
*
{%- else -%}
@@ -35,7 +35,7 @@
Use the 'merge' or 'replace_where' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where', 'microbatch'] %}
{% if raw_strategy not in adapter.valid_incremental_strategies() %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
43 changes: 43 additions & 0 deletions dbt/include/databricks/macros/utils/split_part.sql
@@ -0,0 +1,43 @@
{% macro databricks__split_part(string_text, delimiter_text, part_number) %}

{% set delimiter_expr %}

-- escape if starts with a special character
case when regexp_extract({{ delimiter_text }}, '([^A-Za-z0-9])(.*)', 1) != '_'
then concat('\\', {{ delimiter_text }})
else {{ delimiter_text }} end

{% endset %}

{% if part_number >= 0 %}

{% set split_part_expr %}

get(split(
{{ string_text }},
{{ delimiter_expr }}
), {{ part_number - 1 if part_number > 0 else part_number }})

{% endset %}

{% else %}

{% set split_part_expr %}

get(split(
{{ string_text }},
{{ delimiter_expr }}
),
length({{ string_text }})
- length(
replace({{ string_text }}, {{ delimiter_text }}, '')
) + 1 + {{ part_number }}
)

{% endset %}

{% endif %}

{{ return(split_part_expr) }}

{% endmacro %}
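A rough behavior sketch of why the macro uses `get` rather than array indexing; the literals are illustrative and assume Databricks SQL's `get()` semantics (zero-based index, NULL for out-of-range indexes instead of an error):

```sql
select
    get(split('a,b,c', ','), 1) as second_part,   -- 'b' (zero-based index)
    get(split('a,b,c', ','), 9) as missing_part   -- NULL rather than an out-of-bounds failure
```

For negative part numbers, the branch above converts the request into a position from the end by counting delimiter occurrences before calling `get`.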
21 changes: 11 additions & 10 deletions docs/databricks-merge.md
@@ -6,26 +6,27 @@ The merge incremental strategy requires:
- Databricks Runtime 5.1 and above for delta file format
- Apache Spark for hudi file format

dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery.
If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column.
If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).

Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified.

From v1.9 onwards, `merge` behavior can be tuned by setting additional parameters (a sample model configuration follows the parameter list below).

- Step control parameters that tweak the default behaviour:

- `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement.
- `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped.
- `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement.
- `merge_with_schema_evolution`: when set to `true`, dbt generates the merge statement with the `WITH SCHEMA EVOLUTION` clause.

- Step conditions, expressed as explicit SQL predicates, allow the corresponding action to run only when the conditions are met, in addition to matching by the `unique_key`.
- `matched_condition`: applies to `when matched` step.
In order to define such conditions one may use `tgt` and `src` as aliases for the target and source tables respectively, e.g. `tgt.col1 = hash(src.col2, src.col3)`.
In order to define such conditions one may use `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` as aliases for the target and source tables respectively, e.g. `DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3)`.
- `not_matched_condition`: applies to `when not matched` step.
- `not_matched_by_source_condition`: applies to `when not matched by source` step.
- `target_alias`, `source_alias`: string values that will be used instead of `tgt` and `src` to distinguish between source and target tables in the merge statement.
- `target_alias`, `source_alias`: string values that will be used instead of `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` to distinguish between source and target tables in the merge statement.
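
For reference, a hypothetical model configuration exercising these parameters; the model, columns, and condition expressions below are illustrative and merely chosen to line up with the generated statement shown further down:

```sql
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'id',
        target_alias = 't',
        source_alias = 's',
        merge_with_schema_evolution = true,
        matched_condition = 's.tech_change_ts > t.tech_change_ts',
        not_matched_condition = 's.attr1 is not null',
        not_matched_by_source_action = 'delete',
        not_matched_by_source_condition = 't.tech_change_ts < current_timestamp()'
    )
}}

select id, attr1, attr2, tech_change_ts from {{ ref('upstream_table') }}
```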

Example below illustrates how these parameters affect the merge statement generation:

@@ -53,7 +54,7 @@ from
```

```sql
merge
with schema evolution
into
target_table as t
@@ -89,8 +90,8 @@ when not matched
s.attr2,
s.tech_change_ts
)

when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
```
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,8 +1,9 @@
databricks-sql-connector>=3.5.0, <4.0
dbt-spark>=1.9.0b1, <2.0
dbt-core>=1.9.0b1, <2.0
dbt-spark>=1.8.0, <2.0
dbt-core>=1.8.7, <2.0
dbt-common>=1.10.0, <2.0
dbt-adapters>=1.7.0, <2.0
databricks-sdk==0.36.0
keyring>=23.13.0
protobuf<5.0.0
pydantic>=1.10.0, <2
8 changes: 4 additions & 4 deletions setup.py
@@ -2,7 +2,7 @@
import os
import sys

# require python 3.8 or newer
# require python 3.9 or newer
if sys.version_info < (3, 9):
print("Error: dbt does not support this version of Python.")
print("Please upgrade to Python 3.9 or higher.")
@@ -54,16 +54,16 @@ def _get_plugin_version() -> str:
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
include_package_data=True,
install_requires=[
"dbt-spark>=1.9.0b1, <2.0",
"dbt-core>=1.9.0b1, <2.0",
"dbt-spark>=1.8.0, <2.0",
"dbt-core>=1.8.7, <2.0",
"dbt-adapters>=1.7.0, <2.0",
"dbt-common>=1.10.0, <2.0",
"databricks-sql-connector>=3.5.0, <4.0.0",
"databricks-sdk==0.36.0",
"keyring>=23.13.0",
"pandas<2.2.0",
"protobuf<5.0.0",
"pydantic~=2.7.0",
"pydantic>=1.10.0, <2",
],
zip_safe=False,
classifiers=[
6 changes: 4 additions & 2 deletions tests/functional/adapter/iceberg/test_iceberg_support.py
@@ -5,7 +5,8 @@
from dbt.artifacts.schemas.results import RunStatus


@pytest.mark.skip_profile("databricks_cluster")
# @pytest.mark.skip_profile("databricks_cluster")
@pytest.mark.skip("Skip for now as it is broken in prod")
class TestIcebergTables:
@pytest.fixture(scope="class")
def models(self):
@@ -20,7 +21,8 @@ def test_iceberg_refs(self, project):
assert len(run_results) == 3


@pytest.mark.skip_profile("databricks_cluster")
# @pytest.mark.skip_profile("databricks_cluster")
@pytest.mark.skip("Skip for now as it is broken in prod")
class TestIcebergSwap:
@pytest.fixture(scope="class")
def models(self):
1 change: 1 addition & 0 deletions tests/functional/adapter/incremental/fixtures.py
@@ -378,6 +378,7 @@
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
source_alias='src',
target_alias='t',
matched_condition='src.V > t.V and hash(src.first, src.second) <> hash(t.first, t.second)',
not_matched_condition='src.V > 0',
8 changes: 8 additions & 0 deletions tests/functional/adapter/microbatch/test_microbatch.py
@@ -2,10 +2,18 @@
BaseMicrobatch,
)
import pytest
from packaging import version
from importlib import metadata

from tests.functional.adapter.microbatch import fixtures

dbt_version = metadata.version("dbt-core")


@pytest.mark.skipif(
version.parse(dbt_version) < version.parse("1.9.0b1"),
reason="Microbatch is not supported with this version of core",
)
class TestDatabricksMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def models(self, microbatch_model_sql, input_model_sql):
2 changes: 1 addition & 1 deletion tests/unit/python/test_python_config.py
@@ -101,7 +101,7 @@ def test_parsed_model__run_name(self):
assert model.run_name.startswith("hive_metastore-default-test-")

def test_parsed_model__invalid_config(self):
parsed_model = {"alias": "test", "config": []}
parsed_model = {"alias": "test", "config": 1}
with pytest.raises(ValidationError):
ParsedPythonModel(**parsed_model)
