diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py
index 07d4e9ad..870244d3 100644
--- a/dbt/adapters/databricks/impl.py
+++ b/dbt/adapters/databricks/impl.py
@@ -454,7 +454,7 @@ def run_sql_for_tests(
         conn.transaction_open = False

     def valid_incremental_strategies(self) -> List[str]:
-        return ["append", "merge", "insert_overwrite"]
+        return ["append", "merge", "insert_overwrite", "replace_where"]

     @property
     def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
index 5c7058ee..9aa3c258 100644
--- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
@@ -4,8 +4,8 @@
   {%- set raw_strategy = config.get('incremental_strategy') or 'merge' -%}
   {%- set grant_config = config.get('grants') -%}

-  {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
-  {%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
+  {%- set file_format = dbt_databricks_validate_get_file_format(raw_file_format) -%}
+  {%- set incremental_strategy = dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) -%}

   {#-- Set vars --#}

@@ -17,7 +17,7 @@
   {%- set target_relation = this -%}
   {%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}

-  {#-- Set Overwrite Mode --#}
+  {#-- Set Overwrite Mode - does not work for warehouses --#}
   {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
     {%- call statement() -%}
       set spark.sql.sources.partitionOverwriteMode = DYNAMIC
diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql
index 98025a44..d81e8f8f 100644
--- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql
@@ -6,6 +6,95 @@
   {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
 {% endmacro %}

+{% macro databricks__get_incremental_replace_where_sql(arg_dict) %}
+  {% do return(get_replace_where_sql(arg_dict)) %}
+{% endmacro %}
+
+{% macro get_incremental_replace_where_sql(arg_dict) %}
+
+  {{ return(adapter.dispatch('get_incremental_replace_where_sql', 'dbt')(arg_dict)) }}
+
+{% endmacro %}
+
 {% macro databricks__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
   {{ return(get_insert_overwrite_sql(source, target)) }}
 {% endmacro %}
+
+
+{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
+
+    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
+    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    insert overwrite table {{ target_relation }}
+    {{ partition_cols(label="partition") }}
+    select {{dest_cols_csv}} from {{ source_relation }}
+
+{% endmacro %}
+
+{% macro get_replace_where_sql(args_dict) -%}
+    {%- set predicates = args_dict['incremental_predicates'] -%}
+    {%- set target_relation = args_dict['target_relation'] -%}
+    {%- set temp_relation = args_dict['temp_relation'] -%}
+
+    insert into {{ target_relation }}
+    {% if predicates %}
+      {% if predicates is sequence and predicates is not string %}
+    replace where {{ predicates | join(' and ') }}
+      {% else %}
+    replace where {{ predicates }}
+      {% endif %}
+    {% endif %}
+    table {{ temp_relation }}
+
+{% endmacro %}
+
+{% macro get_insert_into_sql(source_relation, target_relation) %}
+
+    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
+    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    insert into table {{ target_relation }}
+    select {{dest_cols_csv}} from {{ source_relation }}
+
+{% endmacro %}
+
+{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
+  {# need dest_columns for merge_exclude_columns, default to use "*" #}
+  {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
+  {%- set dest_columns = adapter.get_columns_in_relation(target) -%}
+  {%- set merge_update_columns = config.get('merge_update_columns') -%}
+  {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
+  {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
+
+  {% if unique_key %}
+    {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
+      {% for key in unique_key %}
+        {% set this_key_match %}
+          DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
+        {% endset %}
+        {% do predicates.append(this_key_match) %}
+      {% endfor %}
+    {% else %}
+      {% set unique_key_match %}
+        DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+      {% endset %}
+      {% do predicates.append(unique_key_match) %}
+    {% endif %}
+  {% else %}
+    {% do predicates.append('FALSE') %}
+  {% endif %}
+
+  {{ sql_header if sql_header is not none }}
+
+  merge into {{ target }} as DBT_INTERNAL_DEST
+  using {{ source }} as DBT_INTERNAL_SOURCE
+  on {{ predicates | join(' and ') }}
+
+  when matched then update set
+    {% if update_columns -%}{%- for column_name in update_columns %}
+      {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
+      {%- if not loop.last %}, {%- endif %}
+    {%- endfor %}
+    {%- else %} * {% endif %}
+
+  when not matched then insert *
+{% endmacro %}
diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql
new file mode 100644
index 00000000..d599a12e
--- /dev/null
+++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql
@@ -0,0 +1,53 @@
+{% macro dbt_databricks_validate_get_file_format(raw_file_format) %}
+  {#-- Validate the file format #}
+
+  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
+
+  {% set invalid_file_format_msg -%}
+    Invalid file format provided: {{ raw_file_format }}
+    Expected one of: {{ accepted_formats | join(', ') }}
+  {%- endset %}
+
+  {% if raw_file_format not in accepted_formats %}
+    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
+  {% endif %}
+
+  {% do return(raw_file_format) %}
+{% endmacro %}
+
+
+{% macro dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) %}
+  {#-- Validate the incremental strategy #}
+
+  {% set invalid_strategy_msg -%}
+    Invalid incremental strategy provided: {{ raw_strategy }}
+    Expected one of: 'merge', 'replace_where', 'append', 'insert_overwrite'
+  {%- endset %}
+
+  {% set invalid_delta_only_msg -%}
+    Invalid incremental strategy provided: {{ raw_strategy }}
+    You can only choose this strategy when file_format is set to 'delta'
+  {%- endset %}
+
+  {% set invalid_insert_overwrite_endpoint_msg -%}
+    Invalid incremental strategy provided: {{ raw_strategy }}
+    You cannot use this strategy when connecting via warehouse
+    Use the 'merge' or 'replace_where' strategy instead
+  {%- endset %}
+
+  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
+    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
+  {%-else %}
+    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
+      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
+    {% endif %}
+    {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %}
+      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
+    {% endif %}
+    {% if raw_strategy == 'insert_overwrite' and target.endpoint %}
+      {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
+    {% endif %}
+  {% endif %}
+
+  {% do return(raw_strategy) %}
+{% endmacro %}
diff --git a/tests/integration/incremental_strategies/models_delta/append_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster/append_delta.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_delta/append_delta.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/append_delta.sql
diff --git a/tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_no_partitions.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_no_partitions.sql
diff --git a/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql
new file mode 100644
index 00000000..a188dd3e
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql
@@ -0,0 +1,19 @@
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'insert_overwrite',
+    partition_by = 'id',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_exclude_columns.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_delta/merge_exclude_columns.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/merge_exclude_columns.sql
diff --git a/tests/integration/incremental_strategies/models_delta/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_no_key.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_delta/merge_no_key.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/merge_no_key.sql
diff --git a/tests/integration/incremental_strategies/models_delta/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_unique_key.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_delta/merge_unique_key.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/merge_unique_key.sql
diff --git a/tests/integration/incremental_strategies/models_delta/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_update_columns.sql
similarity index 100%
rename from tests/integration/incremental_strategies/models_delta/merge_update_columns.sql
rename to tests/integration/incremental_strategies/models_delta_cluster/merge_update_columns.sql
diff --git a/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql b/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql
new file mode 100644
index 00000000..b1696b8f
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    incremental_strategy = 'replace_where',
+    incremental_predicates = "id >= 2"
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql
new file mode 100644
index 00000000..39762411
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'append',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql
new file mode 100644
index 00000000..5d8560aa
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql
@@ -0,0 +1,22 @@
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'merge',
+    file_format = 'delta',
+    unique_key = 'id',
+    merge_exclude_columns = ['msg'],
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- msg will be ignored, color will be updated
+select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
+union all
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql
new file mode 100644
index 00000000..e2a10393
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql
@@ -0,0 +1,17 @@
+{{ config(
+    materialized = 'incremental',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql
new file mode 100644
index 00000000..5a662240
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql
new file mode 100644
index 00000000..658c3251
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql
@@ -0,0 +1,20 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    merge_update_columns = ['msg'],
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- msg will be updated, color will be ignored
+select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
+union all
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql
new file mode 100644
index 00000000..b1696b8f
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    incremental_strategy = 'replace_where',
+    incremental_predicates = "id >= 2"
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql b/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql
new file mode 100644
index 00000000..39762411
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'append',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql
new file mode 100644
index 00000000..5d8560aa
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql
@@ -0,0 +1,22 @@
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'merge',
+    file_format = 'delta',
+    unique_key = 'id',
+    merge_exclude_columns = ['msg'],
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- msg will be ignored, color will be updated
+select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
+union all
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql
new file mode 100644
index 00000000..e2a10393
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql
@@ -0,0 +1,17 @@
+{{ config(
+    materialized = 'incremental',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql
new file mode 100644
index 00000000..5a662240
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql
new file mode 100644
index 00000000..658c3251
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql
@@ -0,0 +1,20 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    merge_update_columns = ['msg'],
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- msg will be updated, color will be ignored
+select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
+union all
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql b/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql
new file mode 100644
index 00000000..b1696b8f
--- /dev/null
+++ b/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql
@@ -0,0 +1,18 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    incremental_strategy = 'replace_where',
+    incremental_predicates = "id >= 2"
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
diff --git a/tests/integration/incremental_strategies/seeds/expected_replace_where.csv b/tests/integration/incremental_strategies/seeds/expected_replace_where.csv
new file mode 100644
index 00000000..051ba4e8
--- /dev/null
+++ b/tests/integration/incremental_strategies/seeds/expected_replace_where.csv
@@ -0,0 +1,3 @@
+id,msg,color
+1,hello,blue
+3,anyway,purple
\ No newline at end of file
diff --git a/tests/integration/incremental_strategies/test_incremental_strategies.py b/tests/integration/incremental_strategies/test_incremental_strategies.py
index e6addd05..e882b3f4 100644
--- a/tests/integration/incremental_strategies/test_incremental_strategies.py
+++ b/tests/integration/incremental_strategies/test_incremental_strategies.py
@@ -53,55 +53,61 @@ def test_insert_overwrite_databricks_cluster(self):
         self.run_and_test()


-class TestDeltaStrategies(TestIncrementalStrategies):
+class TestDeltaStrategiesWarehouse(TestIncrementalStrategies):
     @property
     def models(self):
-        return "models_delta"
+        return "models_delta_warehouse"

-    def run_and_test(self):
+    def run_and_test_warehouse(self):
         self.seed_and_run_twice()
         self.assertTablesEqual("append_delta", "expected_append")
         self.assertTablesEqual("merge_no_key", "expected_append")
         self.assertTablesEqual("merge_unique_key", "expected_upsert")
         self.assertTablesEqual("merge_update_columns", "expected_partial_upsert")
         self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert")
-
-    @use_profile("databricks_cluster")
-    def test_delta_strategies_databricks_cluster(self):
-        self.run_and_test()
-
-    @use_profile("databricks_uc_cluster")
-    def test_delta_strategies_databricks_uc_cluster(self):
-        self.run_and_test()
+        self.assertTablesEqual("replace_where", "expected_replace_where")

     @use_profile("databricks_sql_endpoint")
     def test_delta_strategies_databricks_sql_endpoint(self):
-        self.run_and_test()
+        self.run_and_test_warehouse()

     @use_profile("databricks_uc_sql_endpoint")
     def test_delta_strategies_databricks_uc_sql_endpoint(self):
-        self.run_and_test()
+        self.run_and_test_warehouse()


-# Uncomment this hudi integration test after the hudi 0.10.0 release to make it work.
-# class TestHudiStrategies(TestIncrementalStrategies):
-#     @property
-#     def models(self):
-#         return "models_hudi"
-#
-#     def run_and_test(self):
-#         self.seed_and_run_once()
-#         self.assertTablesEqual("append", "expected_append")
-#         self.assertTablesEqual("merge_no_key", "expected_append")
-#         self.assertTablesEqual("merge_unique_key", "expected_upsert")
-#         self.assertTablesEqual(
-#             "insert_overwrite_no_partitions", "expected_overwrite")
-#         self.assertTablesEqual(
-#             "insert_overwrite_partitions", "expected_upsert")
-#
-#     @use_profile("apache_spark")
-#     def test_hudi_strategies_apache_spark(self):
-#         self.run_and_test()
+class TestDeltaStrategiesCluster(TestIncrementalStrategies):
+    @property
+    def models(self):
+        return "models_delta_cluster"
+
+    @use_profile("databricks_cluster")
+    def test_delta_strategies_databricks_cluster(self):
+        self.seed_and_run_twice()
+        self.assertTablesEqual("append_delta", "expected_append")
+        self.assertTablesEqual("merge_no_key", "expected_append")
+        self.assertTablesEqual("merge_unique_key", "expected_upsert")
+        self.assertTablesEqual("merge_update_columns", "expected_partial_upsert")
+        self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert")
+        self.assertTablesEqual("insert_overwrite_no_partitions", "expected_overwrite")
+        self.assertTablesEqual("insert_overwrite_partitions", "expected_upsert")
+        self.assertTablesEqual("replace_where", "expected_replace_where")
+
+
+class TestDeltaStrategiesClusterUC(TestIncrementalStrategies):
+    @property
+    def models(self):
+        return "models_delta_cluster_uc"
+
+    @use_profile("databricks_uc_cluster")
+    def test_delta_strategies_databricks_uc_cluster(self):
+        self.seed_and_run_twice()
+        self.assertTablesEqual("append_delta", "expected_append")
+        self.assertTablesEqual("merge_no_key", "expected_append")
+        self.assertTablesEqual("merge_unique_key", "expected_upsert")
+        self.assertTablesEqual("merge_update_columns", "expected_partial_upsert")
+        self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert")
+        self.assertTablesEqual("replace_where", "expected_replace_where")


 class TestBadStrategies(TestIncrementalStrategies):
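Reviewer note (not part of the patch): a minimal sketch of what the new replace_where strategy compiles to. Assuming a model configured like the replace_where.sql test models above, with incremental_predicates = "id >= 2", get_replace_where_sql renders roughly the following statement on an incremental run; the relation names here are hypothetical stand-ins for the target table and the temporary view dbt builds from the model SQL.

    -- illustrative rendering only: rows matching the predicate are replaced,
    -- rows outside it are left untouched
    insert into analytics.replace_where
    replace where id >= 2
    table replace_where__dbt_tmp

If incremental_predicates is supplied as a list rather than a single string, the macro joins the entries with ' and ' before emitting the replace where clause.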