diff --git a/dbt/context/common.py b/dbt/context/common.py index 5eddcc860ca..7c5ecc1d44d 100644 --- a/dbt/context/common.py +++ b/dbt/context/common.py @@ -305,11 +305,8 @@ def _return(value): def get_this_relation(db_wrapper, project_cfg, profile, model): - table_name = dbt.utils.model_immediate_name( - model, dbt.flags.NON_DESTRUCTIVE) - return db_wrapper.adapter.Relation.create_from_node( - profile, model, table_name=table_name) + profile, model) def create_relation(relation_type, quoting_config): diff --git a/dbt/include/global_project/macros/adapters/snowflake.sql b/dbt/include/global_project/macros/adapters/snowflake.sql index a28c13484da..d63875b648b 100644 --- a/dbt/include/global_project/macros/adapters/snowflake.sql +++ b/dbt/include/global_project/macros/adapters/snowflake.sql @@ -5,3 +5,9 @@ {{ default__create_table_as(temporary, relation, sql) }} {% endmacro %} + +{% macro snowflake__create_view_as(relation, sql) -%} + create or replace view {{ relation }} as ( + {{ sql }} + ); +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/table/snowflake_table.sql b/dbt/include/global_project/macros/materializations/table/snowflake_table.sql new file mode 100644 index 00000000000..5a753babfed --- /dev/null +++ b/dbt/include/global_project/macros/materializations/table/snowflake_table.sql @@ -0,0 +1,93 @@ +{% materialization table, adapter='snowflake' %} + {%- set identifier = model['alias'] -%} + {%- set tmp_identifier = identifier + '__dbt_tmp' -%} + {%- set backup_identifier = identifier + '__dbt_backup' -%} + {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} + + {%- set existing_relations = adapter.list_relations(schema=schema) -%} + {%- set old_relation = adapter.get_relation(relations_list=existing_relations, + schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, + schema=schema, type='table') -%} + {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier, + schema=schema, type='table') -%} + + /* + See ../view/view.sql for more information about this relation. 
+ */ + {%- set backup_relation = api.Relation.create(identifier=backup_identifier, + schema=schema, type=(old_relation.type or 'table')) -%} + + {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} + {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} + {%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%} + + + -- drop the temp relations if they exists for some reason + {{ adapter.drop_relation(intermediate_relation) }} + {{ adapter.drop_relation(backup_relation) }} + + -- setup: if the target relation already exists, truncate or drop it (if it's a view) + {% if non_destructive_mode -%} + {% if exists_as_table -%} + {{ adapter.truncate_relation(old_relation) }} + {% elif exists_as_view -%} + {{ adapter.drop_relation(old_relation) }} + {%- set old_relation = none -%} + {%- endif %} + {%- endif %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% call statement('main') -%} + {%- if non_destructive_mode -%} + {%- if old_relation is not none -%} + {{ create_table_as(create_as_temporary, intermediate_relation, sql) }} + + {% set dest_columns = adapter.get_columns_in_table(schema, identifier) %} + {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) ( + select {{ dest_cols_csv }} + from {{ intermediate_relation.include(schema=(not create_as_temporary)) }} + ); + {%- else -%} + {{ create_table_as(create_as_temporary, target_relation, sql) }} + {%- endif -%} + {%- else -%} + {{ create_table_as(create_as_temporary, intermediate_relation, sql) }} + {%- endif -%} + {%- endcall %} + + -- cleanup + {% if non_destructive_mode -%} + -- noop + {%- else -%} + {% if old_relation is not none %} + {% if old_relation.type == 'view' %} + {#-- This is the primary difference between Snowflake and Redshift. 
Renaming this view + -- would cause an error if the view has become invalid due to upstream schema changes #} + {{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }} + {{ drop_relation_if_exists(old_relation) }} + {% else %} + {{ adapter.rename_relation(target_relation, backup_relation) }} + {% endif %} + {% endif %} + + {{ adapter.rename_relation(intermediate_relation, target_relation) }} + {%- endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + -- finally, drop the existing/backup relation after the commit + {{ drop_relation_if_exists(backup_relation) }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} +{% endmaterialization %} diff --git a/dbt/include/global_project/macros/materializations/table/table.sql b/dbt/include/global_project/macros/materializations/table/table.sql index 2f861d56843..c99df1121fe 100644 --- a/dbt/include/global_project/macros/materializations/table/table.sql +++ b/dbt/include/global_project/macros/materializations/table/table.sql @@ -63,20 +63,19 @@ {%- endif -%} {%- endcall %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - -- cleanup {% if non_destructive_mode -%} -- noop {%- else -%} {% if old_relation is not none %} - -- move the existing relation out of the way - {{ adapter.rename_relation(target_relation, backup_relation) }} + {{ adapter.rename_relation(target_relation, backup_relation) }} {% endif %} {{ adapter.rename_relation(intermediate_relation, target_relation) }} {%- endif %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + -- `COMMIT` happens here {{ adapter.commit() }} diff --git a/dbt/include/global_project/macros/materializations/view/bigquery_view.sql b/dbt/include/global_project/macros/materializations/view/bigquery_view.sql deleted file mode 100644 index c8ca7eb0a5e..00000000000 --- a/dbt/include/global_project/macros/materializations/view/bigquery_view.sql +++ /dev/null @@ -1,41 +0,0 @@ -{% materialization view, adapter='bigquery' -%} - - {%- set identifier = model['alias'] -%} - {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} - - {%- set old_relation = adapter.get_relation( - schema=schema, identifier=identifier) -%} - - {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} - - {%- set target_relation = api.Relation.create( - identifier=identifier, schema=schema, - type='view') -%} - - {{ run_hooks(pre_hooks) }} - - -- If there's a table with the same name and we weren't told to full refresh, - -- that's an error. If we were told to full refresh, drop it. 
- {%- if old_relation is not none and old_relation.is_table -%} - {%- if flags.FULL_REFRESH and not non_destructive_mode -%} - {{ adapter.drop_relation(old_relation) }} - {%- else -%} - {{ exceptions.relation_wrong_type(old_relation, 'view') }} - {%- endif -%} - {%- endif -%} - - -- build model - {% if exists_as_view and non_destructive_mode -%} - {% call noop_statement('main', status="PASS", res=None) -%} - -- Not running : non-destructive mode - {{ sql }} - {%- endcall %} - {%- else -%} - {% call statement('main') -%} - {{ create_view_as(target_relation, sql) }} - {%- endcall %} - {%- endif %} - - {{ run_hooks(post_hooks) }} - -{%- endmaterialization %} diff --git a/dbt/include/global_project/macros/materializations/view/bq_snowflake_view.sql b/dbt/include/global_project/macros/materializations/view/bq_snowflake_view.sql new file mode 100644 index 00000000000..d5ceafec383 --- /dev/null +++ b/dbt/include/global_project/macros/materializations/view/bq_snowflake_view.sql @@ -0,0 +1,98 @@ + +{% macro handle_existing_table(full_refresh, non_destructive_mode, old_relation) %} + {{ adapter_macro("dbt.handle_existing_table", full_refresh, non_destructive_mode, old_relation) }} +{% endmacro %} + +{% macro default__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %} + {%- if not non_destructive_mode -%} + {{ adapter.drop_relation(old_relation) }} + {%- endif -%} +{% endmacro %} + +{% macro bigquery__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %} + {%- if full_refresh and not non_destructive_mode -%} + {{ adapter.drop_relation(old_relation) }} + {%- else -%} + {{ exceptions.relation_wrong_type(old_relation, 'view') }} + {%- endif -%} +{% endmacro %} + + +{# /* + Core materialization implementation. BigQuery and Snowflake are similar + because both can use `create or replace view` where the resulting view schema + is not necessarily the same as the existing view. On Redshift, this would + result in: ERROR: cannot change number of columns in view + + This implementation is superior to the create_temp, swap_with_existing, drop_old + paradigm because transactions don't run DDL queries atomically on Snowflake. By using + `create or replace view`, the materialization becomes atomic in nature. + */ +#} + +{% macro impl_view_materialization(run_outside_transaction_hooks=True) %} + {%- set identifier = model['alias'] -%} + {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} + + {%- set old_relation = adapter.get_relation( + schema=schema, identifier=identifier) -%} + + {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} + + {%- set target_relation = api.Relation.create( + identifier=identifier, schema=schema, + type='view') -%} + + {%- set should_ignore = non_destructive_mode and exists_as_view %} + {%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %} + + {% if run_outside_transaction_hooks %} + -- no transactions on BigQuery + {{ run_hooks(pre_hooks, inside_transaction=False) }} + {% endif %} + + -- `BEGIN` happens here on Snowflake + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- If there's a table with the same name and we weren't told to full refresh, + -- that's an error. If we were told to full refresh, drop it. This behavior differs + -- for Snowflake and BigQuery, so multiple dispatch is used. 
+ {%- if old_relation is not none and old_relation.is_table -%} + {{ handle_existing_table(flags.FULL_REFRESH, non_destructive_mode, old_relation) }} + {%- endif -%} + + -- build model + {% if non_destructive_mode -%} + {% call noop_statement('main', status="PASS", res=None) -%} + -- Not running : non-destructive mode + {{ sql }} + {%- endcall %} + {%- else -%} + {% call statement('main') -%} + {{ create_view_as(target_relation, sql) }} + {%- endcall %} + {%- endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {# + -- Don't commit in non-destructive mode _unless_ there are in-transaction hooks + -- TODO : Figure out some other way of doing this that isn't as fragile + #} + {% if has_transactional_hooks or not should_ignore %} + {{ adapter.commit() }} + {% endif %} + + {% if run_outside_transaction_hooks %} + -- No transactions on BigQuery + {{ run_hooks(post_hooks, inside_transaction=False) }} + {% endif %} +{% endmacro %} + +{% materialization view, adapter='bigquery' -%} + {{ impl_view_materialization(run_outside_transaction_hooks=False) }} +{%- endmaterialization %} + +{% materialization view, adapter='snowflake' -%} + {{ impl_view_materialization() }} +{%- endmaterialization %} diff --git a/dbt/include/global_project/macros/materializations/view/view.sql b/dbt/include/global_project/macros/materializations/view/view.sql index f618963d71f..489bf60fa63 100644 --- a/dbt/include/global_project/macros/materializations/view/view.sql +++ b/dbt/include/global_project/macros/materializations/view/view.sql @@ -61,8 +61,6 @@ {%- endcall %} {%- endif %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - -- cleanup {% if not should_ignore -%} -- move the existing view out of the way @@ -72,6 +70,8 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {%- endif %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + {# -- Don't commit in non-destructive mode _unless_ there are in-transaction hooks -- TODO : Figure out some other way of doing this that isn't as fragile diff --git a/dbt/utils.py b/dbt/utils.py index e167451da7b..c7f225d1251 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -83,24 +83,6 @@ def compiler_warning(model, msg, resource_type='model'): ) -def model_immediate_name(model, non_destructive): - """ - Returns the name of the model relation within the transaction. This is - useful for referencing the model in pre or post hooks. Non-destructive - models aren't created with temp suffixes, nor are incremental models or - seeds. 
- """ - - model_name = model['alias'] - is_incremental = (get_materialization(model) == 'incremental') - is_seed = is_type(model, 'seed') - - if non_destructive or is_incremental or is_seed: - return model_name - else: - return "{}__dbt_tmp".format(model_name) - - def find_operation_by_name(flat_graph, target_name, target_package): return find_by_name(flat_graph, target_name, target_package, 'macros', [NodeType.Operation]) diff --git a/test/integration/013_context_var_tests/test_context_vars.py b/test/integration/013_context_var_tests/test_context_vars.py index ff3f7c35f3c..0f3f5008483 100644 --- a/test/integration/013_context_var_tests/test_context_vars.py +++ b/test/integration/013_context_var_tests/test_context_vars.py @@ -92,11 +92,11 @@ def test_env_vars_dev(self): self.assertEqual( ctx['this'], - '"{}"."context__dbt_tmp"'.format(self.unique_schema())) + '"{}"."context"'.format(self.unique_schema())) self.assertEqual(ctx['this.name'], 'context') self.assertEqual(ctx['this.schema'], self.unique_schema()) - self.assertEqual(ctx['this.table'], 'context__dbt_tmp') + self.assertEqual(ctx['this.table'], 'context') self.assertEqual(ctx['target.dbname'], 'dbt') self.assertEqual(ctx['target.host'], 'database') @@ -118,11 +118,11 @@ def test_env_vars_prod(self): self.assertEqual( ctx['this'], - '"{}"."context__dbt_tmp"'.format(self.unique_schema())) + '"{}"."context"'.format(self.unique_schema())) self.assertEqual(ctx['this.name'], 'context') self.assertEqual(ctx['this.schema'], self.unique_schema()) - self.assertEqual(ctx['this.table'], 'context__dbt_tmp') + self.assertEqual(ctx['this.table'], 'context') self.assertEqual(ctx['target.dbname'], 'dbt') self.assertEqual(ctx['target.host'], 'database') diff --git a/test/integration/036_snowflake_view_dependency_test/data/people.csv b/test/integration/036_snowflake_view_dependency_test/data/people.csv new file mode 100644 index 00000000000..36a69d8ca27 --- /dev/null +++ b/test/integration/036_snowflake_view_dependency_test/data/people.csv @@ -0,0 +1,4 @@ +id,name +1,Drew +2,Jake +3,Connor diff --git a/test/integration/036_snowflake_view_dependency_test/models/.gitkeep b/test/integration/036_snowflake_view_dependency_test/models/.gitkeep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/integration/036_snowflake_view_dependency_test/models/base_table.sql b/test/integration/036_snowflake_view_dependency_test/models/base_table.sql new file mode 100644 index 00000000000..23079234f9b --- /dev/null +++ b/test/integration/036_snowflake_view_dependency_test/models/base_table.sql @@ -0,0 +1,9 @@ + +{{ config(materialized='table') }} + +select * + {% if var('add_table_field', False) %} + , 1 as new_field + {% endif %} + +from {{ ref('people') }} diff --git a/test/integration/036_snowflake_view_dependency_test/models/dependent_model.sql b/test/integration/036_snowflake_view_dependency_test/models/dependent_model.sql new file mode 100644 index 00000000000..9fef9d88f12 --- /dev/null +++ b/test/integration/036_snowflake_view_dependency_test/models/dependent_model.sql @@ -0,0 +1,8 @@ + +{% if var('dependent_type', 'view') == 'view' %} + {{ config(materialized='view') }} +{% else %} + {{ config(materialized='table') }} +{% endif %} + +select * from {{ ref('base_table') }} diff --git a/test/integration/036_snowflake_view_dependency_test/test_view_binding_dependency.py b/test/integration/036_snowflake_view_dependency_test/test_view_binding_dependency.py new file mode 100644 index 00000000000..66aa5529a49 --- /dev/null +++ 
b/test/integration/036_snowflake_view_dependency_test/test_view_binding_dependency.py
@@ -0,0 +1,85 @@
+from test.integration.base import DBTIntegrationTest, use_profile
+
+class TestSnowflakeLateBindingViewDependency(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "snowflake_view_dependency_test_036"
+
+    @property
+    def models(self):
+        return "test/integration/036_snowflake_view_dependency_test/models"
+
+    @property
+    def project_config(self):
+        return {
+            "data-paths": ["test/integration/036_snowflake_view_dependency_test/data"],
+            "quoting": {
+                "schema": False,
+                "identifier": False
+            }
+        }
+
+    """
+    Snowflake views are not bound to the relations they select from. A Snowflake view
+    can have entirely invalid SQL if, for example, the table it selects from is dropped
+    and recreated with a different schema. In these scenarios, Snowflake will raise an error if:
+      1) The view is queried
+      2) The view is altered
+
+    dbt's logic should avoid running these types of queries against views in situations
+    where they _may_ have invalid definitions. These tests assert that views are handled
+    correctly in each of these scenarios.
+    """
+
+    @use_profile('snowflake')
+    def test__snowflake__changed_table_schema_for_downstream_view(self):
+        results = self.run_dbt(["seed"])
+        self.assertEqual(len(results), 1)
+
+        results = self.run_dbt(["run"])
+        self.assertEqual(len(results), 2)
+        self.assertManyTablesEqual(["PEOPLE", "BASE_TABLE", "DEPENDENT_MODEL"])
+
+        # Change the schema of base_table, assert that dependent_model doesn't fail
+        results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: view}"])
+        self.assertEqual(len(results), 2)
+        self.assertManyTablesEqual(["BASE_TABLE", "DEPENDENT_MODEL"])
+
+    """
+    This test is similar to the one above, except the downstream model starts as a view
+    and is then changed to a table. This checks that the table materialization does not
+    errantly rename a view that might have an invalid definition, which would cause an error.
+    """
+    @use_profile('snowflake')
+    def test__snowflake__changed_table_schema_for_downstream_view_changed_to_table(self):
+        results = self.run_dbt(["seed"])
+        self.assertEqual(len(results), 1)
+
+        results = self.run_dbt(["run"])
+        self.assertEqual(len(results), 2)
+        self.assertManyTablesEqual(["PEOPLE", "BASE_TABLE", "DEPENDENT_MODEL"])
+
+        expected_types = {
+            'base_table': 'table',
+            'dependent_model': 'view'
+        }
+
+        # ensure that the models were materialized with the expected types
+        for result in results:
+            node_name = result.node.name
+            self.assertEqual(result.node.config['materialized'], expected_types[node_name])
+
+        results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: table}"])
+        self.assertEqual(len(results), 2)
+        self.assertManyTablesEqual(["BASE_TABLE", "DEPENDENT_MODEL"])
+
+        expected_types = {
+            'base_table': 'table',
+            'dependent_model': 'table'
+        }
+
+        # ensure that both models are now materialized as tables
+        for result in results:
+            node_name = result.node.name
+            self.assertEqual(result.node.config['materialized'], expected_types[node_name])
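
A minimal sketch of the Snowflake behavior this change works around (the schema and relation names below are hypothetical and are not part of the patch): once an upstream table is rebuilt with a different column set, a standard view defined over it with `select *` keeps its stale column list, so querying it or renaming it can fail. `create or replace view`, as used by the new snowflake__create_view_as macro, swaps in a fresh definition in a single statement instead.

-- Hypothetical names, for illustration only
create table analytics.base_table as
select 1 as id, 'Drew' as name;

create view analytics.dependent_model as
select * from analytics.base_table;

-- Rebuild the upstream table with an extra column, as the test's
-- add_table_field var does; dependent_model's definition is now stale
create or replace table analytics.base_table as
select 1 as id, 'Drew' as name, 1 as new_field;

-- Selecting from the stale view, or renaming it the way the generic table
-- materialization used to, can now raise a column-mismatch error on Snowflake;
-- replacing it never touches the stale definition
create or replace view analytics.dependent_model as
select * from analytics.base_table;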