From 0a125e8cb2d4590dfa0003ef3b2fcacf87e285bd Mon Sep 17 00:00:00 2001 From: the4thamigo-uk Date: Thu, 18 Jul 2024 12:15:00 +0100 Subject: [PATCH] Integration test for adding new unique key column - fixes #322 --- .../incremental/incremental.sql | 15 ++++++++++-- .../adapter/incremental/test_schema_change.py | 23 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index b5ac37ab..b27e60da 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -154,12 +154,23 @@ -- table. {%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%} + + -- Existing table does not have any of the new columns that have been added to the unique_key, so we remove them + {% set old_unique_key = [] %} + {%- for source_column in source_columns -%} + {%- if source_column.name in ( unique_key.split(',') | map('trim') ) -%} + {{ old_unique_key.append(source_column.quoted) }} + {%- endif -%} + {%- endfor -%} + + {%- set old_unique_key_csv = old_unique_key | join(', ') -%} + {% call statement('insert_existing_data') %} insert into {{ inserted_relation }} ({{ source_columns_csv }}) select {{ source_columns_csv }} from {{ existing_relation }} - where ({{ unique_key }}) not in ( - select {{ unique_key }} + where ({{ old_unique_key_csv }}) not in ( + select {{ old_unique_key_csv }} from {{ inserting_relation }} ) {{ adapter.get_model_query_settings(model) }} diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 9bccaf4e..a9a93f89 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -5,7 +5,7 @@ {{ config( materialized='incremental', - unique_key='col_1', + unique_key='col_1' + var('extra_unique_keys',''), on_schema_change='%schema_change%' ) }} @@ -61,7 +61,7 @@ def test_fail(self, project): assert 'out of sync' in log_output.lower() def test_append(self, project): - run_dbt(["run", "--select", "schema_change_append"]) + run_dbt(["run", "--full-refresh", "--select", "schema_change_append"]) result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 @@ -69,3 +69,22 @@ def test_append(self, project): result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") assert result[0][2] == 0 assert result[3][2] == 5 + + def test_append_unique_key(self, project): + run_dbt(["run", "--full-refresh", "--select", "schema_change_append"]) + result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + run_dbt( + [ + "--debug", + "run", + "--select", + "schema_change_append", + "--vars", + '{"extra_unique_keys": ",col_3"}', + ] + ) + result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + assert result[0][2] == 0 + assert result[3][2] == 5