diff --git a/dbt/include/greenplum/macros/materializations/snapshot_merge.sql b/dbt/include/greenplum/macros/materializations/snapshot_merge.sql index f7f64b4..7e5b157 100644 --- a/dbt/include/greenplum/macros/materializations/snapshot_merge.sql +++ b/dbt/include/greenplum/macros/materializations/snapshot_merge.sql @@ -1,4 +1,132 @@ - {% macro greenplum__snapshot_merge_sql(target, source, insert_cols) -%} - {{ return(greenplum__snapshot_merge_sql(target, source, insert_cols)) }} + {%- set insert_cols_csv = insert_cols | join(', ') -%} + + update {{ target }} + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + from {{ source }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text + and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text) + and {{ target }}.dbt_valid_to is null; + + insert into {{ target }} ({{ insert_cols_csv }}) + select {% for column in insert_cols -%} + DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %} + {%- endfor %} + from {{ source }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text; {% endmacro %} + +{% macro greenplum__snapshot_staging_table(strategy, source_sql, target_relation) -%} + + with snapshot_query as ( + + {{ source_sql }} + + ), + + snapshotted_data as ( + + select *, + {{ strategy.unique_key }} as dbt_unique_key + + from {{ target_relation }} + where dbt_valid_to is null + + ), + + insertions_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, + {{ strategy.scd_id }} as dbt_scd_id + + from snapshot_query + ), + + updates_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + {{ strategy.updated_at }} as dbt_valid_to + + from snapshot_query + ), + + {%- if strategy.invalidate_hard_deletes %} + + deletes_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key + from snapshot_query + ), + {% endif %} + + insertions as ( + + select + 'insert'::text as dbt_change_type, + source_data.* + + from insertions_source_data as source_data + left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where snapshotted_data.dbt_unique_key is null + or ( + snapshotted_data.dbt_unique_key is not null + and ( + {{ strategy.row_changed }} + ) + ) + + ), + + updates as ( + + select + 'update'::text as dbt_change_type, + source_data.*, + snapshotted_data.dbt_scd_id + + from updates_source_data as source_data + join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where ( + {{ strategy.row_changed }} + ) + ) + + {%- if strategy.invalidate_hard_deletes -%} + , + + deletes as ( + + select + 'delete'::text as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as dbt_valid_from, + {{ snapshot_get_time() }} as dbt_updated_at, + {{ snapshot_get_time() }} as dbt_valid_to, + snapshotted_data.dbt_scd_id + + from snapshotted_data + left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where source_data.dbt_unique_key is null + ) + {%- endif %} + + select * from insertions + union all + select * from updates + {%- if strategy.invalidate_hard_deletes %} + union all + select * from deletes + {%- endif %} + +{%- endmacro %} \ No newline at end of file