From ed59030a15ec6d0bcd89b313851c027e62a281fc Mon Sep 17 00:00:00 2001 From: Andre Furlan Date: Thu, 13 Apr 2023 11:06:31 -0700 Subject: [PATCH] Adding replace_where incremental strategy (#293) (#310) This PR adds a new incremental strategy `replace_where`. The strategy resolves to an `INSERT INTO ... REPLACE WHERE` statement. It completes the feature set explained here: https://docs.databricks.com/delta/selective-overwrite.html#replace-where&language-python A lot of the code change is to bring macros from dbt-spark over to dbt-databricks. The only real code change was in validating incremental strategies and adding the replace_where strategy. #### Why do we need it? It enables use cases where part of the data is always replaced and where MERGE is not possible, such as when there is no primary key. E.g.: an events table where we want to always replace the last 3 days. #### Difference from insert_overwrite Insert overwrite only works with the dynamic partition overwrite Spark setting (`spark.sql.sources.partitionOverwriteMode = DYNAMIC`), which is not available in SQL warehouses or any Unity Catalog-enabled cluster. It also only works with whole partitions, making it difficult to set up and to ensure that the correct data is dropped. 
Signed-off-by: Andre Furlan --- dbt/adapters/databricks/impl.py | 2 +- .../incremental/incremental.sql | 6 +- .../incremental/strategies.sql | 89 +++++++++++++++++++ .../materializations/incremental/validate.sql | 53 +++++++++++ .../append_delta.sql | 0 .../insert_overwrite_no_partitions.sql} | 0 .../insert_overwrite_partitions.sql | 19 ++++ .../merge_exclude_columns.sql | 0 .../merge_no_key.sql | 0 .../merge_unique_key.sql | 0 .../merge_update_columns.sql | 0 .../models_delta_cluster/replace_where.sql | 18 ++++ .../models_delta_cluster_uc/append_delta.sql | 18 ++++ .../merge_exclude_columns.sql | 22 +++++ .../models_delta_cluster_uc/merge_no_key.sql | 17 ++++ .../merge_unique_key.sql | 18 ++++ .../merge_update_columns.sql | 20 +++++ .../models_delta_cluster_uc/replace_where.sql | 18 ++++ .../models_delta_warehouse/append_delta.sql | 18 ++++ .../merge_exclude_columns.sql | 22 +++++ .../models_delta_warehouse/merge_no_key.sql | 17 ++++ .../merge_unique_key.sql | 18 ++++ .../merge_update_columns.sql | 20 +++++ .../models_delta_warehouse/replace_where.sql | 18 ++++ .../seeds/expected_replace_where.csv | 3 + .../test_incremental_strategies.py | 70 ++++++++------- 26 files changed, 450 insertions(+), 36 deletions(-) create mode 100644 dbt/include/databricks/macros/materializations/incremental/validate.sql rename tests/integration/incremental_strategies/{models_delta => models_delta_cluster}/append_delta.sql (100%) rename tests/integration/incremental_strategies/{models_bad/bad_insert_overwrite_delta.sql => models_delta_cluster/insert_overwrite_no_partitions.sql} (100%) create mode 100644 tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql rename tests/integration/incremental_strategies/{models_delta => models_delta_cluster}/merge_exclude_columns.sql (100%) rename tests/integration/incremental_strategies/{models_delta => models_delta_cluster}/merge_no_key.sql (100%) rename tests/integration/incremental_strategies/{models_delta => 
models_delta_cluster}/merge_unique_key.sql (100%) rename tests/integration/incremental_strategies/{models_delta => models_delta_cluster}/merge_update_columns.sql (100%) create mode 100644 tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql create mode 100644 tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql create mode 100644 tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql create mode 100644 tests/integration/incremental_strategies/seeds/expected_replace_where.csv diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 07d4e9ad..870244d3 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -454,7 +454,7 @@ def run_sql_for_tests( conn.transaction_open = False def valid_incremental_strategies(self) -> List[str]: - return ["append", "merge", "insert_overwrite"] + return ["append", 
"merge", "insert_overwrite", "replace_where"] @property def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]: diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index 5c7058ee..9aa3c258 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -4,8 +4,8 @@ {%- set raw_strategy = config.get('incremental_strategy') or 'merge' -%} {%- set grant_config = config.get('grants') -%} - {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} - {%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} + {%- set file_format = dbt_databricks_validate_get_file_format(raw_file_format) -%} + {%- set incremental_strategy = dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) -%} {#-- Set vars --#} @@ -17,7 +17,7 @@ {%- set target_relation = this -%} {%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%} - {#-- Set Overwrite Mode --#} + {#-- Set Overwrite Mode - does not work for warehouses --#} {%- if incremental_strategy == 'insert_overwrite' and partition_by -%} {%- call statement() -%} set spark.sql.sources.partitionOverwriteMode = DYNAMIC diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index 98025a44..d81e8f8f 100644 --- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -6,6 +6,95 @@ {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %} {% endmacro %} +{% macro 
databricks__get_incremental_replace_where_sql(arg_dict) %} + {% do return(get_replace_where_sql(arg_dict)) %} +{% endmacro %} + +{% macro get_incremental_replace_where_sql(arg_dict) %} + + {{ return(adapter.dispatch('get_incremental_replace_where_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + {% macro databricks__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %} {{ return(get_insert_overwrite_sql(source, target)) }} {% endmacro %} + + +{% macro get_insert_overwrite_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert overwrite table {{ target_relation }} + {{ partition_cols(label="partition") }} + select {{dest_cols_csv}} from {{ source_relation }} + +{% endmacro %} + +{% macro get_replace_where_sql(args_dict) -%} + {%- set predicates = args_dict['incremental_predicates'] -%} + {%- set target_relation = args_dict['target_relation'] -%} + {%- set temp_relation = args_dict['temp_relation'] -%} + + insert into {{ target_relation }} + {% if predicates %} + {% if predicates is sequence and predicates is not string %} + replace where {{ predicates | join(' and ') }} + {% else %} + replace where {{ predicates }} + {% endif %} + {% endif %} + table {{ temp_relation }} + +{% endmacro %} + +{% macro get_insert_into_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert into table {{ target_relation }} + select {{dest_cols_csv}} from {{ source_relation }} + +{% endmacro %} + +{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} + {# need dest_columns for merge_exclude_columns, default to use "*" #} + {%- set predicates = [] if incremental_predicates is none else [] + 
incremental_predicates -%} + {%- set dest_columns = adapter.get_columns_in_relation(target) -%} + {%- set merge_update_columns = config.get('merge_update_columns') -%} + {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} + {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in unique_key %} + {% set this_key_match %} + DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} + {% endset %} + {% do predicates.append(this_key_match) %} + {% endfor %} + {% else %} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} + {% endif %} + {% else %} + {% do predicates.append('FALSE') %} + {% endif %} + + {{ sql_header if sql_header is not none }} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on {{ predicates | join(' and ') }} + + when matched then update set + {% if update_columns -%}{%- for column_name in update_columns %} + {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} + {%- if not loop.last %}, {%- endif %} + {%- endfor %} + {%- else %} * {% endif %} + + when not matched then insert * +{% endmacro %} diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql new file mode 100644 index 00000000..d599a12e --- /dev/null +++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql @@ -0,0 +1,53 @@ +{% macro dbt_databricks_validate_get_file_format(raw_file_format) %} + {#-- Validate the file format #} + + {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %} + + {% set invalid_file_format_msg -%} + Invalid file format 
provided: {{ raw_file_format }} + Expected one of: {{ accepted_formats | join(', ') }} + {%- endset %} + + {% if raw_file_format not in accepted_formats %} + {% do exceptions.raise_compiler_error(invalid_file_format_msg) %} + {% endif %} + + {% do return(raw_file_format) %} +{% endmacro %} + + +{% macro dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) %} + {#-- Validate the incremental strategy #} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'merge', 'replace_where', 'append', 'insert_overwrite' + {%- endset %} + + {% set invalid_delta_only_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You can only choose this strategy when file_format is set to 'delta' + {%- endset %} + + {% set invalid_insert_overwrite_endpoint_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when connecting via warehouse + Use the 'merge' or 'replace_where' strategy instead + {%- endset %} + + {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {%-else %} + {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %} + {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %} + {% endif %} + {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %} + {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and target.endpoint %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} + {% endif %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} diff --git a/tests/integration/incremental_strategies/models_delta/append_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster/append_delta.sql similarity index 100% rename from 
tests/integration/incremental_strategies/models_delta/append_delta.sql rename to tests/integration/incremental_strategies/models_delta_cluster/append_delta.sql diff --git a/tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_no_partitions.sql similarity index 100% rename from tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql rename to tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_no_partitions.sql diff --git a/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql new file mode 100644 index 00000000..a188dd3e --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster/insert_overwrite_partitions.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', + partition_by = 'id', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_exclude_columns.sql similarity index 100% rename from tests/integration/incremental_strategies/models_delta/merge_exclude_columns.sql rename to tests/integration/incremental_strategies/models_delta_cluster/merge_exclude_columns.sql diff --git a/tests/integration/incremental_strategies/models_delta/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_no_key.sql similarity index 100% rename from 
tests/integration/incremental_strategies/models_delta/merge_no_key.sql rename to tests/integration/incremental_strategies/models_delta_cluster/merge_no_key.sql diff --git a/tests/integration/incremental_strategies/models_delta/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_unique_key.sql similarity index 100% rename from tests/integration/incremental_strategies/models_delta/merge_unique_key.sql rename to tests/integration/incremental_strategies/models_delta_cluster/merge_unique_key.sql diff --git a/tests/integration/incremental_strategies/models_delta/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster/merge_update_columns.sql similarity index 100% rename from tests/integration/incremental_strategies/models_delta/merge_update_columns.sql rename to tests/integration/incremental_strategies/models_delta_cluster/merge_update_columns.sql diff --git a/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql b/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql new file mode 100644 index 00000000..b1696b8f --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster/replace_where.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy = 'replace_where', + incremental_predicates = "id >= 2" +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql new file mode 100644 index 00000000..39762411 --- /dev/null +++ 
b/tests/integration/incremental_strategies/models_delta_cluster_uc/append_delta.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'append', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql new file mode 100644 index 00000000..5d8560aa --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_exclude_columns.sql @@ -0,0 +1,22 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'delta', + unique_key = 'id', + merge_exclude_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be ignored, color will be updated +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql new file mode 100644 index 00000000..e2a10393 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_no_key.sql @@ -0,0 +1,17 @@ +{{ config( + materialized = 'incremental', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg 
+union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql new file mode 100644 index 00000000..5a662240 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_unique_key.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql new file mode 100644 index 00000000..658c3251 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/merge_update_columns.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', + merge_update_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be updated, color will be ignored +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql b/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql new file mode 100644 index 00000000..b1696b8f --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_cluster_uc/replace_where.sql @@ -0,0 +1,18 @@ 
+{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy = 'replace_where', + incremental_predicates = "id >= 2" +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql b/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql new file mode 100644 index 00000000..39762411 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/append_delta.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'append', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql new file mode 100644 index 00000000..5d8560aa --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_exclude_columns.sql @@ -0,0 +1,22 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'delta', + unique_key = 'id', + merge_exclude_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be ignored, color will be updated +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as 
bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql new file mode 100644 index 00000000..e2a10393 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_no_key.sql @@ -0,0 +1,17 @@ +{{ config( + materialized = 'incremental', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql new file mode 100644 index 00000000..5a662240 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_unique_key.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql new file mode 100644 index 00000000..658c3251 --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/merge_update_columns.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', + merge_update_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 
'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be updated, color will be ignored +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql b/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql new file mode 100644 index 00000000..b1696b8f --- /dev/null +++ b/tests/integration/incremental_strategies/models_delta_warehouse/replace_where.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy = 'replace_where', + incremental_predicates = "id >= 2" +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/seeds/expected_replace_where.csv b/tests/integration/incremental_strategies/seeds/expected_replace_where.csv new file mode 100644 index 00000000..051ba4e8 --- /dev/null +++ b/tests/integration/incremental_strategies/seeds/expected_replace_where.csv @@ -0,0 +1,3 @@ +id,msg,color +1,hello,blue +3,anyway,purple \ No newline at end of file diff --git a/tests/integration/incremental_strategies/test_incremental_strategies.py b/tests/integration/incremental_strategies/test_incremental_strategies.py index e6addd05..e882b3f4 100644 --- a/tests/integration/incremental_strategies/test_incremental_strategies.py +++ b/tests/integration/incremental_strategies/test_incremental_strategies.py @@ -53,55 +53,61 @@ def test_insert_overwrite_databricks_cluster(self): self.run_and_test() -class TestDeltaStrategies(TestIncrementalStrategies): +class 
TestDeltaStrategiesWarehouse(TestIncrementalStrategies): @property def models(self): - return "models_delta" + return "models_delta_warehouse" - def run_and_test(self): + def run_and_test_warehouse(self): self.seed_and_run_twice() self.assertTablesEqual("append_delta", "expected_append") self.assertTablesEqual("merge_no_key", "expected_append") self.assertTablesEqual("merge_unique_key", "expected_upsert") self.assertTablesEqual("merge_update_columns", "expected_partial_upsert") self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert") - - @use_profile("databricks_cluster") - def test_delta_strategies_databricks_cluster(self): - self.run_and_test() - - @use_profile("databricks_uc_cluster") - def test_delta_strategies_databricks_uc_cluster(self): - self.run_and_test() + self.assertTablesEqual("replace_where", "expected_replace_where") @use_profile("databricks_sql_endpoint") def test_delta_strategies_databricks_sql_endpoint(self): - self.run_and_test() + self.run_and_test_warehouse() @use_profile("databricks_uc_sql_endpoint") def test_delta_strategies_databricks_uc_sql_endpoint(self): - self.run_and_test() + self.run_and_test_warehouse() -# Uncomment this hudi integration test after the hudi 0.10.0 release to make it work. 
-# class TestHudiStrategies(TestIncrementalStrategies): -# @property -# def models(self): -# return "models_hudi" -# -# def run_and_test(self): -# self.seed_and_run_once() -# self.assertTablesEqual("append", "expected_append") -# self.assertTablesEqual("merge_no_key", "expected_append") -# self.assertTablesEqual("merge_unique_key", "expected_upsert") -# self.assertTablesEqual( -# "insert_overwrite_no_partitions", "expected_overwrite") -# self.assertTablesEqual( -# "insert_overwrite_partitions", "expected_upsert") -# -# @use_profile("apache_spark") -# def test_hudi_strategies_apache_spark(self): -# self.run_and_test() +class TestDeltaStrategiesCluster(TestIncrementalStrategies): + @property + def models(self): + return "models_delta_cluster" + + @use_profile("databricks_cluster") + def test_delta_strategies_databricks_cluster(self): + self.seed_and_run_twice() + self.assertTablesEqual("append_delta", "expected_append") + self.assertTablesEqual("merge_no_key", "expected_append") + self.assertTablesEqual("merge_unique_key", "expected_upsert") + self.assertTablesEqual("merge_update_columns", "expected_partial_upsert") + self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert") + self.assertTablesEqual("insert_overwrite_no_partitions", "expected_overwrite") + self.assertTablesEqual("insert_overwrite_partitions", "expected_upsert") + self.assertTablesEqual("replace_where", "expected_replace_where") + + +class TestDeltaStrategiesClusterUC(TestIncrementalStrategies): + @property + def models(self): + return "models_delta_cluster_uc" + + @use_profile("databricks_uc_cluster") + def test_delta_strategies_databricks_uc_cluster(self): + self.seed_and_run_twice() + self.assertTablesEqual("append_delta", "expected_append") + self.assertTablesEqual("merge_no_key", "expected_append") + self.assertTablesEqual("merge_unique_key", "expected_upsert") + self.assertTablesEqual("merge_update_columns", "expected_partial_upsert") + 
self.assertTablesEqual("merge_exclude_columns", "expected_exclude_upsert") + self.assertTablesEqual("replace_where", "expected_replace_where") class TestBadStrategies(TestIncrementalStrategies):