diff --git a/.changes/unreleased/Features-20220823-085727.yaml b/.changes/unreleased/Features-20220823-085727.yaml
new file mode 100644
index 00000000000..4d8daebbf5e
--- /dev/null
+++ b/.changes/unreleased/Features-20220823-085727.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: incremental predicates
+time: 2022-08-23T08:57:27.640804-05:00
+custom:
+  Author: dave-connors-3
+  Issue: "5680"
+  PR: "5702"
diff --git a/core/dbt/docs/build/doctrees/environment.pickle b/core/dbt/docs/build/doctrees/environment.pickle
index 73d18c236ad..8aaad5e25b0 100644
Binary files a/core/dbt/docs/build/doctrees/environment.pickle and b/core/dbt/docs/build/doctrees/environment.pickle differ
diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql
index 602067616d2..e8ff5c1ea4f 100644
--- a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql
+++ b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql
@@ -50,9 +50,9 @@
     {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
     {% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
-    {% set incremental_predicates = config.get('incremental_predicates', none) %}
+    {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
     {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
-    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
+    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
     {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

 {% endif %}
diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql
index 836d768d01a..5033178be49 100644
--- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql
+++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql
@@ -1,9 +1,9 @@
-{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
-  {{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
+{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+  {{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
 {%- endmacro %}

-{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
-    {%- set predicates = [] if predicates is none else [] + predicates -%}
+{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
     {%- set merge_update_columns = config.get('merge_update_columns') -%}
     {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
@@ -32,7 +32,7 @@
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE
-        on {{ predicates | join(' and ') }}
+        on {{"(" ~ predicates | join(") and (") ~ ")"}}

     {% if unique_key %}
     when matched then update set
@@ -50,11 +50,11 @@
 {% endmacro %}

-{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
-  {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
+{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+  {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
 {%- endmacro %}

-{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
+{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}

     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
@@ -65,8 +65,13 @@
             where (
                 {% for key in unique_key %}
                     {{ source }}.{{ key }} = {{ target }}.{{ key }}
-                    {{ "and " if not loop.last }}
+                    {{ "and " if not loop.last}}
                 {% endfor %}
+                {% if incremental_predicates %}
+                    {% for predicate in incremental_predicates %}
+                        and {{ predicate }}
+                    {% endfor %}
+                {% endif %}
             );
         {% else %}
             delete from {{ target }}
@@ -74,7 +79,12 @@
                 {{ unique_key }}) in (
                 select ({{ unique_key }})
                 from {{ source }}
-            );
+            )
+            {%- if incremental_predicates %}
+                {% for predicate in incremental_predicates %}
+                    and {{ predicate }}
+                {% endfor %}
+            {%- endif -%};

         {% endif %}
     {% endif %}
diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql
index 5226d01de16..72082ccad32 100644
--- a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql
+++ b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql
@@ -21,7 +21,7 @@
 {% macro default__get_incremental_delete_insert_sql(arg_dict) %}

-  {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
+  {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

 {% endmacro %}
@@ -35,7 +35,7 @@
 {% macro default__get_incremental_merge_sql(arg_dict) %}

-  {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
+  {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

 {% endmacro %}
@@ -48,7 +48,7 @@
 {% macro default__get_incremental_insert_overwrite_sql(arg_dict) %}

-  {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["predicates"])) %}
+  {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

 {% endmacro %}
diff --git a/tests/adapter/dbt/tests/adapter/incremental/test_incremental_predicates.py b/tests/adapter/dbt/tests/adapter/incremental/test_incremental_predicates.py
new file mode 100644
index 00000000000..11a4b6c0384
--- /dev/null
+++ b/tests/adapter/dbt/tests/adapter/incremental/test_incremental_predicates.py
@@ -0,0 +1,154 @@
+import pytest
+from dbt.tests.util import run_dbt, check_relations_equal
+from collections import namedtuple
+
+
+models__delete_insert_incremental_predicates_sql = """
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id'
+) }}
+
+{% if not is_incremental() %}
+
+select 1 as id, 'hello' as msg, 'blue' as color
+union all
+select 2 as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- delete will not happen on the above record where id = 2, so new record will be inserted instead
+select 1 as id, 'hey' as msg, 'blue' as color
+union all
+select 2 as id, 'yo' as msg, 'green' as color
+union all
+select 3 as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
+"""
+
+seeds__expected_delete_insert_incremental_predicates_csv = """id,msg,color
+1,hey,blue
+2,goodbye,red
+2,yo,green
+3,anyway,purple
+"""
+
+ResultHolder = namedtuple(
+    "ResultHolder",
+    [
+        "seed_count",
+        "model_count",
+        "seed_rows",
+        "inc_test_model_count",
+        "opt_model_count",
+        "relation",
+    ],
+)
+
+
+class BaseIncrementalPredicates:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
+        }
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+incremental_predicates": [
+                    "id != 2"
+                ],
+                "+incremental_strategy": "delete+insert"
+            }
+        }
+
+    def update_incremental_model(self, incremental_model):
+        """update incremental model after the seed table has been updated"""
+        model_result_set = run_dbt(["run", "--select", incremental_model])
+        return len(model_result_set)
+
+    def get_test_fields(
+        self, project, seed, incremental_model, update_sql_file, opt_model_count=None
+    ):
+
+        seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))
+
+        model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))
+        # pass on kwarg
+        relation = incremental_model
+        # update seed in anticipation of incremental model update
+        row_count_query = "select * from {}.{}".format(project.test_schema, seed)
+        # project.run_sql_file(Path("seeds") / Path(update_sql_file + ".sql"))
+        seed_rows = len(project.run_sql(row_count_query, fetch="all"))
+
+        # propagate seed state to incremental model according to unique keys
+        inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)
+
+        return ResultHolder(
+            seed_count, model_count, seed_rows, inc_test_model_count, opt_model_count, relation
+        )
+
+    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
+        """Invoke assertions to verify correct build functionality"""
+        # 1. test seed(s) should build afresh
+        assert expected_fields.seed_count == test_case_fields.seed_count
+        # 2. test model(s) should build afresh
+        assert expected_fields.model_count == test_case_fields.model_count
+        # 3. seeds should have intended row counts post update
+        assert expected_fields.seed_rows == test_case_fields.seed_rows
+        # 4. incremental test model(s) should be updated
+        assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
+        # 5. extra incremental model(s) should be built; optional since
+        #    comparison may be between an incremental model and seed
+        if expected_fields.opt_model_count and test_case_fields.opt_model_count:
+            assert expected_fields.opt_model_count == test_case_fields.opt_model_count
+        # 6. result table should match intended result set (itself a relation)
+        check_relations_equal(
+            project.adapter, [expected_fields.relation, test_case_fields.relation]
+        )
+
+    def get_expected_fields(self, relation, seed_rows, opt_model_count=None):
+        return ResultHolder(
+            seed_count=1,
+            model_count=1,
+            inc_test_model_count=1,
+            seed_rows=seed_rows,
+            opt_model_count=opt_model_count,
+            relation=relation
+        )
+
+    # no unique_key test
+    def test__incremental_predicates(self, project):
+        """seed should match model after two incremental runs"""
+
+        expected_fields = self.get_expected_fields(relation="expected_delete_insert_incremental_predicates", seed_rows=4)
+        test_case_fields = self.get_test_fields(
+            project, seed="expected_delete_insert_incremental_predicates", incremental_model="delete_insert_incremental_predicates", update_sql_file=None
+        )
+        self.check_scenario_correctness(expected_fields, test_case_fields, project)
+
+
+class TestIncrementalPredicatesDeleteInsert(BaseIncrementalPredicates):
+    pass
+
+
+class TestPredicatesDeleteInsert(BaseIncrementalPredicates):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+predicates": [
+                    "id != 2"
+                ],
+                "+incremental_strategy": "delete+insert"
+            }
+        }