-
Notifications
You must be signed in to change notification settings - Fork 175
/
incremental.sql
111 lines (85 loc) · 4.63 KB
/
incremental.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
{% macro dbt_snowflake_get_tmp_relation_type(strategy, unique_key, language) %}
/* {#
If we are running multiple statements (DELETE + INSERT),
we must first save the model query results as a temporary table
in order to guarantee consistent inputs to both statements.
If we are running a single statement (MERGE or INSERT alone),
we can save the model query definition as a view instead,
for faster overall incremental processing.
#} */
{% if language == 'sql' and (strategy in ('default', 'append', 'merge') or (unique_key is none)) %}
{{ return('view') }}
{% else %} {#-- play it safe -- #}
{{ return('table') }}
{% endif %}
{% endmacro %}
{% materialization incremental, adapter='snowflake', supported_languages=['sql', 'python'] -%}
{% set original_query_tag = set_query_tag() %}
{#-- Set vars --#}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{%- set language = model['language'] -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set tmp_relation_type = dbt_snowflake_get_tmp_relation_type(incremental_strategy, unique_key, language) %}
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}
{% set grant_config = config.get('grants') %}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{{ run_hooks(pre_hooks) }}
{% if existing_relation is none %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% else %}
{#-- Create the temp relation, either as a view or as a temp table --#}
{% if tmp_relation_type == 'view' %}
{%- call statement('create_tmp_relation') -%}
{{ create_view_as(tmp_relation, compiled_code) }}
{%- endcall -%}
{% else %}
{%- call statement('create_tmp_relation', language=language) -%}
{{ create_table_as(True, tmp_relation, compiled_code, language) }}
{%- endcall -%}
{% endif %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}
{% do drop_relation_if_exists(tmp_relation) %}
{{ run_hooks(post_hooks) }}
{% set target_relation = target_relation.incorporate(type='table') %}
{% set should_revoke =
should_revoke(existing_relation.is_table, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}
{% do unset_query_tag(original_query_tag) %}
{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}
{% macro snowflake__get_incremental_default_sql(arg_dict) %}
{{ return(get_incremental_merge_sql(arg_dict)) }}
{% endmacro %}