Skip to content

Commit

Permalink
Adding replace_where incremental strategy (#293) (#310)
Browse files Browse the repository at this point in the history
This PR adds a new incremental strategy `replace_where`. The strategy resolves to an `INSERT INTO ... REPLACE WHERE` statement. It completes the feature set explained here: https://docs.databricks.com/delta/selective-overwrite.html#replace-where

A lot of the code change is to bring macros from dbt-spark over to dbt-databricks. The only real code change was in validating incremental strategies and adding the replace_where strategy.

#### Why do we need it?
It enables use cases where part of the data is always replaced and where MERGE is not possible, such as when there is no primary key. E.g.: events table where we want to always replace the last 3 days.

#### Difference from insert_overwrite
Insert overwrite only works with the dynamic partition overwrite Spark setting (`spark.sql.sources.partitionOverwriteMode = DYNAMIC`), which is not available in SQL warehouses or any Unity Catalog-enabled cluster. It also only works on whole partitions, making it difficult to set up and to ensure that the correct data is dropped.


Signed-off-by: Andre Furlan <[email protected]>
  • Loading branch information
andrefurlan-db authored Apr 13, 2023
1 parent f7b74e4 commit ed59030
Show file tree
Hide file tree
Showing 26 changed files with 450 additions and 36 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def run_sql_for_tests(
conn.transaction_open = False

def valid_incremental_strategies(self) -> List[str]:
    """Incremental strategies accepted by this adapter for incremental models.

    Adds 'replace_where' (Delta selective overwrite) to the strategies
    previously supported.
    """
    # Single return: the duplicated pre-change return statement above it
    # would have made the updated list (with 'replace_where') unreachable.
    return ["append", "merge", "insert_overwrite", "replace_where"]

@property
def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{%- set raw_strategy = config.get('incremental_strategy') or 'merge' -%}
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{%- set file_format = dbt_databricks_validate_get_file_format(raw_file_format) -%}
{%- set incremental_strategy = dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{#-- Set vars --#}

Expand All @@ -17,7 +17,7 @@
{%- set target_relation = this -%}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}

{#-- Set Overwrite Mode --#}
{#-- Set Overwrite Mode - does not work for warehouses --#}
{%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
{%- call statement() -%}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,95 @@
{% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
{% endmacro %}

{#- Databricks implementation for the 'replace_where' incremental strategy;
    delegates to get_replace_where_sql to build the statement. -#}
{% macro databricks__get_incremental_replace_where_sql(arg_dict) %}
  {{ return(get_replace_where_sql(arg_dict)) }}
{% endmacro %}

{#- Dispatch entry point so adapters can override how the 'replace_where'
    incremental SQL is rendered. -#}
{% macro get_incremental_replace_where_sql(arg_dict) %}
  {% do return(adapter.dispatch('get_incremental_replace_where_sql', 'dbt')(arg_dict)) %}
{% endmacro %}

{#- Databricks override of dbt's insert-overwrite "merge" builder: renders a
    plain INSERT OVERWRITE instead. dest_columns, predicates and
    include_sql_header are accepted for signature compatibility but unused
    here. -#}
{% macro databricks__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
  {% do return(get_insert_overwrite_sql(source, target)) %}
{% endmacro %}


{% macro get_insert_overwrite_sql(source_relation, target_relation) %}

{#- Render an INSERT OVERWRITE TABLE statement replacing the target's data
    with the rows of source_relation. With
    spark.sql.sources.partitionOverwriteMode = DYNAMIC (set by the
    materialization when partition_by is configured) only matching
    partitions are replaced. -#}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{#- Select the target's own column list (quoted, in target order) so the
    source's column order does not need to match. -#}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
select {{dest_cols_csv}} from {{ source_relation }}

{% endmacro %}

{% macro get_replace_where_sql(args_dict) -%}
{#- Render an INSERT INTO ... REPLACE WHERE statement (Delta selective
    overwrite): target rows matching the predicate(s) are replaced by the
    rows of the staged temp_relation. Without predicates this degrades to a
    plain INSERT INTO ... TABLE append. -#}
{%- set predicates = args_dict['incremental_predicates'] -%}
{%- set target_relation = args_dict['target_relation'] -%}
{%- set temp_relation = args_dict['temp_relation'] -%}

insert into {{ target_relation }}
{% if predicates %}
{#- incremental_predicates may be a list (joined with AND) or a single
    string predicate. -#}
{% if predicates is sequence and predicates is not string %}
replace where {{ predicates | join(' and ') }}
{% else %}
replace where {{ predicates }}
{% endif %}
{% endif %}
table {{ temp_relation }}

{% endmacro %}

{#- Append strategy: INSERT INTO the target, selecting the target's own
    column list (quoted, in target order) from the staged source relation
    so the source's column order does not need to match. -#}
{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set column_list_csv = target_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{ column_list_csv }} from {{ source_relation }}

{% endmacro %}

{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{# need dest_columns for merge_exclude_columns, default to use "*" #}
{# Purpose: render a MERGE INTO statement matching on unique_key plus any
   incremental_predicates. '[] + ...' copies the caller's predicate list so
   the appends below do not mutate it. #}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

{# unique_key may be a list of columns (all must match) or a single column
   name; with no unique_key the ON clause is FALSE, so every source row
   falls through to the insert branch (append-like behavior). #}
{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}

{# With merge_update_columns / merge_exclude_columns configured, only the
   resolved update_columns are set; otherwise 'update set *' updates all. #}
when matched then update set
{% if update_columns -%}{%- for column_name in update_columns %}
{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{%- else %} * {% endif %}

when not matched then insert *
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{#- Validate the configured file_format against the set this adapter
    accepts; raises a compiler error for anything else and otherwise
    returns the value unchanged. -#}
{% macro dbt_databricks_validate_get_file_format(raw_file_format) %}

  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

  {% if raw_file_format not in accepted_formats %}
    {#- Build the message only on the failure path. -#}
    {% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
Expected one of: {{ accepted_formats | join(', ') }}
    {%- endset %}
    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
  {% endif %}

  {% do return(raw_file_format) %}
{% endmacro %}


{% macro dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) %}
{#- Validate the configured incremental_strategy against the strategies
    supported by dbt-databricks, the chosen file_format, and the connection
    type; raises a compiler error on any invalid combination, otherwise
    returns the strategy unchanged. -#}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'merge', 'replace_where', 'append', 'insert_overwrite'
{%- endset %}

{% set invalid_delta_only_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when connecting via warehouse
Use the 'merge' or 'replace_where' strategy instead
{%- endset %}

{#- Guard clauses: raise_compiler_error aborts compilation, so no 'else'
    nesting is required between the checks. -#}
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
  {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}
{#- 'merge' requires delta or hudi; 'replace_where' is delta-only. -#}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
  {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
{% endif %}
{% if raw_strategy == 'replace_where' and file_format != 'delta' %}
  {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
{% endif %}
{#- insert_overwrite relies on a session setting not available when
    connecting to a SQL warehouse endpoint. -#}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
  {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Test model: 'insert_overwrite' incremental strategy, partitioned by id.
-- NOTE(review): appears to be a functional-test fixture for the adapter's
-- incremental strategies - confirm against the test that reads it.
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = 'id',
) }}

-- First (non-incremental) run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

-- Incremental runs emit ids 2 and 3; with insert_overwrite these should
-- replace the matching id partitions.
select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test model: 'replace_where' incremental strategy with a single string
-- predicate.
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'replace_where',
incremental_predicates = "id >= 2"
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- Incremental run emits only id 3; replace_where should drop existing rows
-- matching "id >= 2" and insert this one.
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test model: explicit 'append' incremental strategy.
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

-- Incremental runs append ids 2 and 3; a duplicate id 2 is expected, since
-- append performs no dedup.
select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Test model: 'merge' strategy with merge_exclude_columns = ['msg'], so
-- matched rows update every column except msg.
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'delta',
unique_key = 'id',
merge_exclude_columns = ['msg'],
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- msg will be ignored, color will be updated
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Test model: no incremental_strategy configured; the materialization
-- defaults to 'merge'. With no unique_key the merge matches nothing, so
-- incremental rows are effectively appended.
{{ config(
materialized = 'incremental',
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

-- Incremental runs emit ids 2 and 3.
select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test model: default ('merge') strategy with unique_key = 'id'; the
-- incremental run should update id 2 and insert id 3.
{{ config(
materialized = 'incremental',
unique_key = 'id',
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Test model: 'merge' (default strategy) with merge_update_columns =
-- ['msg'], so matched rows update only the msg column.
{{ config(
materialized = 'incremental',
unique_key = 'id',
merge_update_columns = ['msg'],
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- msg will be updated, color will be ignored
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test model: 'replace_where' incremental strategy with a single string
-- predicate.
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'replace_where',
incremental_predicates = "id >= 2"
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- Incremental run emits only id 3; replace_where should drop existing rows
-- matching "id >= 2" and insert this one.
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test model: explicit 'append' incremental strategy.
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

-- Incremental runs append ids 2 and 3; a duplicate id 2 is expected, since
-- append performs no dedup.
select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Test model: 'merge' strategy with merge_exclude_columns = ['msg'], so
-- matched rows update every column except msg.
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'delta',
unique_key = 'id',
merge_exclude_columns = ['msg'],
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- msg will be ignored, color will be updated
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Test model: no incremental_strategy configured; the materialization
-- defaults to 'merge'. With no unique_key the merge matches nothing, so
-- incremental rows are effectively appended.
{{ config(
materialized = 'incremental',
) }}

-- First run seeds ids 1 and 2.
{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

-- Incremental runs emit ids 2 and 3.
select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
Loading

0 comments on commit ed59030

Please sign in to comment.