Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding replace_where incremental strategy (#293) #310

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def run_sql_for_tests(
conn.transaction_open = False

def valid_incremental_strategies(self) -> List[str]:
return ["append", "merge", "insert_overwrite"]
return ["append", "merge", "insert_overwrite", "replace_where"]

@property
def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{%- set raw_strategy = config.get('incremental_strategy') or 'merge' -%}
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{%- set file_format = dbt_databricks_validate_get_file_format(raw_file_format) -%}
{%- set incremental_strategy = dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{#-- Set vars --#}

Expand All @@ -17,7 +17,7 @@
{%- set target_relation = this -%}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}

{#-- Set Overwrite Mode --#}
{#-- Set Overwrite Mode - does not work for warehouses --#}
{%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
{%- call statement() -%}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,95 @@
{% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
{% endmacro %}

{% macro databricks__get_incremental_replace_where_sql(arg_dict) %}
  {#-- Databricks implementation of the replace_where strategy: passes arg_dict unchanged to get_replace_where_sql and returns the result. --#}
  {{ return(get_replace_where_sql(arg_dict)) }}
{% endmacro %}

{% macro get_incremental_replace_where_sql(arg_dict) %}
  {#-- Looks up the implementation of this macro through adapter.dispatch (namespace 'dbt'), calls it with arg_dict and returns what it produces. --#}
  {%- set strategy_impl = adapter.dispatch('get_incremental_replace_where_sql', 'dbt') -%}
  {% do return(strategy_impl(arg_dict)) %}
{% endmacro %}

{% macro databricks__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
  {#-- Returns the statement built by get_insert_overwrite_sql. Only target and source are used; dest_columns, predicates and include_sql_header are accepted but not read. --#}
  {% do return(get_insert_overwrite_sql(source_relation=source, target_relation=target)) %}
{% endmacro %}


{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
  {#-- Renders an `insert overwrite table` statement that selects the target's columns (by their quoted names) from source_relation. The partition clause comes from partition_cols(label="partition"). --#}
  {%- set quoted_columns = adapter.get_columns_in_relation(target_relation) | map(attribute='quoted') | list -%}
  insert overwrite table {{ target_relation }}
  {{ partition_cols(label="partition") }}
  select {{ quoted_columns | join(', ') }} from {{ source_relation }}
{% endmacro %}

{% macro get_replace_where_sql(args_dict) -%}
  {#-- Renders `insert into <target> [replace where <predicates>] table <temp>`.
       Reads 'target_relation', 'temp_relation' and 'incremental_predicates' from args_dict.
       A non-string sequence of predicates is joined with ' and '; any other truthy value is rendered as-is.
       When the predicates are empty or missing, the `replace where` clause is omitted. --#}
  {%- set destination = args_dict['target_relation'] -%}
  {%- set staged_rows = args_dict['temp_relation'] -%}
  {%- set filters = args_dict['incremental_predicates'] -%}

  insert into {{ destination }}
  {% if filters -%}
    replace where {{ filters if (filters is string or filters is not sequence) else (filters | join(' and ')) }}
  {%- endif %}
  table {{ staged_rows }}

{% endmacro %}

{% macro get_insert_into_sql(source_relation, target_relation) %}
  {#-- Renders an `insert into table` statement that selects the target's columns (by their quoted names) from source_relation. --#}
  {%- set quoted_columns = adapter.get_columns_in_relation(target_relation) | map(attribute='quoted') | list -%}
  insert into table {{ target_relation }}
  select {{ quoted_columns | join(', ') }} from {{ source_relation }}
{% endmacro %}

{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
  {#--
    Renders a `merge into` statement from source into target.

    target / source        : relations aliased DBT_INTERNAL_DEST / DBT_INTERNAL_SOURCE.
    unique_key             : a column name or a list of column names; each adds an equality
                             predicate to the ON clause. When empty, 'FALSE' is added instead,
                             so no row matches and every source row goes to the insert branch.
    dest_columns           : ignored as passed in; it is re-read from the target relation below.
    incremental_predicates : none, a single SQL string, or a list of SQL strings that are
                             AND-ed into the ON clause ahead of the key predicates.
  --#}
  {# need dest_columns for merge_exclude_columns, default to use "*" #}
  {#-- Always build a fresh list so the appends below never mutate the caller's value.
       A bare string is wrapped rather than concatenated (list + string raises), and a
       blank string is dropped so it cannot produce a dangling ' and ' in the ON clause. --#}
  {%- if incremental_predicates is none -%}
    {%- set predicates = [] -%}
  {%- elif incremental_predicates is string -%}
    {%- set predicates = [incremental_predicates] if incremental_predicates | trim else [] -%}
  {%- else -%}
    {%- set predicates = [] + incremental_predicates -%}
  {%- endif -%}
  {%- set dest_columns = adapter.get_columns_in_relation(target) -%}
  {%- set merge_update_columns = config.get('merge_update_columns') -%}
  {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
  {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

  {% if unique_key %}
      {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
          {% for key in unique_key %}
              {% set this_key_match %}
                  DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
              {% endset %}
              {% do predicates.append(this_key_match) %}
          {% endfor %}
      {% else %}
          {% set unique_key_match %}
              DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
          {% endset %}
          {% do predicates.append(unique_key_match) %}
      {% endif %}
  {% else %}
      {% do predicates.append('FALSE') %}
  {% endif %}

  {#-- NOTE(review): sql_header is neither a parameter nor set in this macro, so this line
       only emits something if the name resolves from an outer context. TODO confirm. --#}
  {{ sql_header if sql_header is not none }}

  merge into {{ target }} as DBT_INTERNAL_DEST
      using {{ source }} as DBT_INTERNAL_SOURCE
      on {{ predicates | join(' and ') }}

      when matched then update set
        {% if update_columns -%}{%- for column_name in update_columns %}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
        {%- else %} * {% endif %}

      when not matched then insert *
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{% macro dbt_databricks_validate_get_file_format(raw_file_format) %}
  {#-- Checks raw_file_format against the supported formats. Raises a compiler error when it is not listed; otherwise returns it unchanged. --#}

  {% set supported_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

  {#-- The message lines stay flush-left: whitespace inside the set block becomes part of the error text. --#}
  {% set rejection_msg -%}
Invalid file format provided: {{ raw_file_format }}
Expected one of: {{ supported_formats | join(', ') }}
  {%- endset %}

  {% if raw_file_format not in supported_formats %}
    {% do exceptions.raise_compiler_error(rejection_msg) %}
  {% endif %}

  {{ return(raw_file_format) }}
{% endmacro %}


{% macro dbt_databricks_validate_get_incremental_strategy(raw_strategy, file_format) %}
  {#--
    Validates the incremental strategy and returns it unchanged.

    Raises a compiler error when:
      - raw_strategy is not one of append / merge / insert_overwrite / replace_where;
      - raw_strategy is 'merge' and file_format is neither 'delta' nor 'hudi';
      - raw_strategy is 'replace_where' and file_format is not 'delta';
      - raw_strategy is 'insert_overwrite' and target.endpoint is truthy.

    The message lines stay flush-left: whitespace inside a set block becomes part of the error text.
  --#}

  {% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'merge', 'replace_where', 'append', 'insert_overwrite'
  {%- endset %}

  {#-- merge has its own message because it accepts two file formats, not only delta. --#}
  {% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
  {%- endset %}

  {% set invalid_delta_only_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
  {%- endset %}

  {% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when connecting via warehouse
Use the 'merge' or 'replace_where' strategy instead
  {%- endset %}

  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {%-else %}
    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
      {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
    {% endif %}
    {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
    {% if raw_strategy == 'insert_overwrite' and target.endpoint %}
      {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
    {% endif %}
  {% endif %}

  {% do return(raw_strategy) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{# Incremental model using insert_overwrite, partitioned by id. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = 'id',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{# Incremental model using replace_where with the predicate "id >= 2". The non-incremental build emits ids 1 and 2; incremental runs emit id 3. #}
{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
        incremental_strategy = 'replace_where',
        incremental_predicates = "id >= 2"
    )
}}

{% if is_incremental() %}

select
    cast(3 as bigint) as id,
    'anyway' as msg,
    'purple' as color

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg,
    'blue' as color
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg,
    'red' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{# Incremental model using the append strategy. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'append',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{# Incremental delta model using merge on id, with msg listed in merge_exclude_columns. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        file_format = 'delta',
        unique_key = 'id',
        merge_exclude_columns = ['msg'],
    )
}}

{% if is_incremental() %}

-- msg will be ignored, color will be updated
select
    cast(2 as bigint) as id,
    'yo' as msg,
    'green' as color
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg,
    'purple' as color

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg,
    'blue' as color
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg,
    'red' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{# Incremental model with no explicit strategy and no unique_key. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{# Incremental model with unique_key = 'id' and no explicit strategy. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{# Incremental model keyed on id, with msg listed in merge_update_columns. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
        merge_update_columns = ['msg'],
    )
}}

{% if is_incremental() %}

-- msg will be updated, color will be ignored
select
    cast(2 as bigint) as id,
    'yo' as msg,
    'green' as color
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg,
    'purple' as color

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg,
    'blue' as color
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg,
    'red' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{# Incremental model using replace_where with the predicate "id >= 2". The non-incremental build emits ids 1 and 2; incremental runs emit id 3. #}
{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
        incremental_strategy = 'replace_where',
        incremental_predicates = "id >= 2"
    )
}}

{% if is_incremental() %}

select
    cast(3 as bigint) as id,
    'anyway' as msg,
    'purple' as color

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg,
    'blue' as color
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg,
    'red' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{# Incremental model using the append strategy. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'append',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{# Incremental delta model using merge on id, with msg listed in merge_exclude_columns. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        file_format = 'delta',
        unique_key = 'id',
        merge_exclude_columns = ['msg'],
    )
}}

{% if is_incremental() %}

-- msg will be ignored, color will be updated
select
    cast(2 as bigint) as id,
    'yo' as msg,
    'green' as color
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg,
    'purple' as color

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg,
    'blue' as color
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg,
    'red' as color

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{# Incremental model with no explicit strategy and no unique_key. The non-incremental build emits ids 1 and 2; incremental runs emit ids 2 and 3. #}
{{
    config(
        materialized = 'incremental',
    )
}}

{% if is_incremental() %}

select
    cast(2 as bigint) as id,
    'yo' as msg
union all
select
    cast(3 as bigint) as id,
    'anyway' as msg

{% else %}

select
    cast(1 as bigint) as id,
    'hello' as msg
union all
select
    cast(2 as bigint) as id,
    'goodbye' as msg

{% endif %}
Loading