diff --git a/.changes/unreleased/Features-20230307-103324.yaml b/.changes/unreleased/Features-20230307-103324.yaml new file mode 100644 index 00000000..73c7bac2 --- /dev/null +++ b/.changes/unreleased/Features-20230307-103324.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Add Materialized View materialization +time: 2023-03-07T10:33:24.990266+01:00 +custom: + Author: Jay-code0 damian3031 + Issue: "258" + PR: "260" diff --git a/dbt/include/trino/macros/materializations/materialized_view.sql b/dbt/include/trino/macros/materializations/materialized_view.sql new file mode 100644 index 00000000..a99f0627 --- /dev/null +++ b/dbt/include/trino/macros/materializations/materialized_view.sql @@ -0,0 +1,74 @@ +{% materialization materialized_view, adapter="trino" %} + + {% set full_refresh_mode = (should_full_refresh()) %} + + {%- set target_relation = this %} + {%- set existing_relation = load_relation(this) -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + + {% if existing_relation is none %} + {{ log("No existing materialized view found, creating materialized view...", info=true) }} + {%- set build_sql = create_materialized_view_as(target_relation) %} + + {% elif full_refresh_mode or existing_relation.type != "materializedview" %} + {{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }} + {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} + {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% do adapter.drop_relation(backup_relation) %} + + {% do adapter.rename_relation(existing_relation, backup_relation) %} + {%- set build_sql = create_materialized_view_as(target_relation) %} + {% do to_drop.append(backup_relation) %} + + {% else %} + {{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }} + {%- set build_sql = refresh_materialized_view(target_relation) %} + {% endif %} + + {% if build_sql %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {% do persist_docs(target_relation, model) %} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% else %} + + {{ store_result('main', 'SKIP') }} + + {% endif %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} + + +{%- macro create_materialized_view_as(target_relation) -%} + {%- set sqlcode = "CREATE OR REPLACE MATERIALIZED VIEW " ~ target_relation ~ " AS " ~ sql %} + {{ sqlcode }} +{%- endmacro -%} + + +{%- macro refresh_materialized_view(target_relation) -%} + {%- set sqlcode = "REFRESH MATERIALIZED VIEW " ~ target_relation %} + {{ sqlcode }} +{%- endmacro -%} diff --git a/tests/functional/adapter/materialization/test_materialized_view.py b/tests/functional/adapter/materialization/test_materialized_view.py index df157fdd..6dff09f1 100644 --- a/tests/functional/adapter/materialization/test_materialized_view.py +++ b/tests/functional/adapter/materialization/test_materialized_view.py @@ -1,5 +1,12 @@ import pytest -from dbt.tests.util import check_relation_types, run_dbt +from dbt.tests.util import ( + check_relation_types, + check_relations_equal, + run_dbt, + run_sql_with_adapter, +) + +from tests.functional.adapter.materialization.fixtures import model_sql, seed_csv @pytest.mark.iceberg @@ -40,3 +47,129 @@ def test_mv_is_dropped_when_model_runs_view(self, project): "my_table": "table", } check_relation_types(project.adapter, expected) + + +@pytest.mark.iceberg +class TestIcebergMaterializedViewCreate: + # Configuration in dbt_project.yml + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "mv_test", + "models": { + "+materialized": "materialized_view", + }, + "seeds": { + "+column_types": {"some_date": "timestamp(6)"}, + }, + } + + # Everything that goes in the "seeds" directory + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + # Everything that goes in the "models" directory + @pytest.fixture(scope="class") + def models(self): + return { + "mat_view.sql": model_sql, + } + + def test_mv_is_created_and_refreshed(self, project): + catalog = project.adapter.config.credentials.database + schema = project.adapter.config.credentials.schema + + # Seed seed + results = run_dbt(["seed"], expect_pass=True) + assert len(results) == 1 + + # Create MV + results = run_dbt(["run"], expect_pass=True) + assert len(results) == 1 + + # Check if the data was loaded correctly + check_relations_equal(project.adapter, ["seed", "mat_view"]) + + # Add one row to seed + sql = f"""INSERT INTO {catalog}.{schema}.seed + VALUES (5, 'Mateo', timestamp '2014-09-07 17:04:27')""" + run_sql_with_adapter(project.adapter, sql, fetch="all") + + # Refresh MV + results = run_dbt(["run"], expect_pass=True) + assert len(results) == 1 + + # Check if one row is added in MV + sql = f"select * from {catalog}.{schema}.mat_view" + results = run_sql_with_adapter(project.adapter, sql, fetch="all") + assert len(results) == 5 + + +@pytest.mark.iceberg +class TestIcebergMaterializedViewDropAndCreate: + # Configuration in dbt_project.yml + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "mv_test", + "models": { + "+materialized": "materialized_view", + "+full_refresh": True, + }, + "seeds": { + "+column_types": {"some_date": "timestamp(6)"}, + }, + } + + # Everything that goes in the "seeds" directory + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + # Everything that goes in the "models" directory + @pytest.fixture(scope="class") + def models(self): + return { + "mat_view_overrides_table.sql": model_sql, + "mat_view_overrides_view.sql": model_sql, + "mat_view_overrides_materializedview.sql": model_sql, + } + + def test_mv_overrides_relation(self, project): + # Create relation with same name + project.adapter.execute("CREATE VIEW mat_view_overrides_view AS SELECT 3 c") + project.adapter.execute("CREATE TABLE mat_view_overrides_table AS SELECT 4 d") + project.adapter.execute( + "CREATE MATERIALIZED VIEW mat_view_overrides_materializedview AS SELECT 5 e" + ) + + # Seed seed + results = run_dbt(["seed"], expect_pass=True) + assert len(results) == 1 + + # Create MVs, already existing relations with same name should be dropped + results = run_dbt(["run"], expect_pass=True) + assert len(results) == 3 + + # Check if MVs were created correctly + expected = { + "mat_view_overrides_view": "materializedview", + "mat_view_overrides_table": "materializedview", + "mat_view_overrides_materializedview": "materializedview", + } + check_relation_types(project.adapter, expected) + + check_relations_equal( + project.adapter, + [ + "seed", + "mat_view_overrides_view", + "mat_view_overrides_table", + "mat_view_overrides_materializedview", + ], + )