Skip to content

Commit

Permalink
Add materialized view materialization
Browse files Browse the repository at this point in the history
Co-authored-by: Jay-code0
  • Loading branch information
damian3031 committed Mar 8, 2023
1 parent 7f72784 commit e1c4cde
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20230307-103324.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Add Materialized View materialization
time: 2023-03-07T10:33:24.990266+01:00
custom:
Author: Jay-code0 damian3031
Issue: "258"
PR: "260"
74 changes: 74 additions & 0 deletions dbt/include/trino/macros/materializations/materialized_view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
{% materialization materialized_view, adapter="trino" %}
{#-
  `materialized_view` materialization for the "trino" adapter.

  Strategy implemented below:
    * nothing exists at `this`
        -> build with create_materialized_view_as
    * full refresh requested, OR the existing relation's type is not "materializedview"
        -> rename the existing relation to "<identifier>__dbt_backup",
           build with create_materialized_view_as, drop the backup after the main statement
    * otherwise
        -> build with refresh_materialized_view

  Returns {'relations': [target_relation]}.
#}

{#- Result of should_full_refresh(); when truthy an existing relation is always replaced. #}
{% set full_refresh_mode = (should_full_refresh()) %}

{#- `this` is both the build target and the key used to look up an already existing relation. #}
{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}

{#- Pre-hooks are dispatched twice: first with inside_transaction=False, then with inside_transaction=True. #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{#- Relations collected here are dropped after the main statement has been executed. #}
{% set to_drop = [] %}

{% if existing_relation is none %}
{{ log("No existing materialized view found, creating materialized view...", info=true) }}
{%- set build_sql = create_materialized_view_as(target_relation) %}

{% elif full_refresh_mode or existing_relation.type != "materializedview" %}
{#- NOTE(review): this branch (and its log message) is also taken on full refresh when the
    existing relation already is a materialized view. #}
{{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %}

{#- Move the existing relation out of the way; it is only dropped once the new view has been built. #}
{% do adapter.rename_relation(existing_relation, backup_relation) %}
{%- set build_sql = create_materialized_view_as(target_relation) %}
{% do to_drop.append(backup_relation) %}

{% else %}
{#- Existing relation is a materialized view and no full refresh was requested: refresh in place. #}
{{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }}
{%- set build_sql = refresh_materialized_view(target_relation) %}
{% endif %}

{#- Every branch above assigns build_sql, so the SKIP branch is reached only when that value is falsy. #}
{% if build_sql %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% else %}

{{ store_result('main', 'SKIP') }}

{% endif %}

{#- Clean up backups created above; this loop runs whether or not the main statement was executed. #}
{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}


{%- macro create_materialized_view_as(target_relation) -%}
{#- Renders the CREATE OR REPLACE MATERIALIZED VIEW statement for `target_relation`.
    The view body is taken from the `sql` variable of the calling context. #}
CREATE OR REPLACE MATERIALIZED VIEW {{ target_relation }} AS {{ sql }}
{%- endmacro -%}


{%- macro refresh_materialized_view(target_relation) -%}
{#- Renders the REFRESH MATERIALIZED VIEW statement for `target_relation`. #}
REFRESH MATERIALIZED VIEW {{ target_relation }}
{%- endmacro -%}
135 changes: 134 additions & 1 deletion tests/functional/adapter/materialization/test_materialized_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import pytest
from dbt.tests.util import check_relation_types, run_dbt
from dbt.tests.util import (
check_relation_types,
check_relations_equal,
run_dbt,
run_sql_with_adapter,
)

from tests.functional.adapter.materialization.fixtures import model_sql, seed_csv


@pytest.mark.iceberg
Expand Down Expand Up @@ -40,3 +47,129 @@ def test_mv_is_dropped_when_model_runs_view(self, project):
"my_table": "table",
}
check_relation_types(project.adapter, expected)


@pytest.mark.iceberg
class TestIcebergMaterializedViewCreate:
    """Create a materialized view model, then run it again to refresh it.

    The first ``dbt run`` must produce a relation equal to the seed; after a
    row is inserted into the seed, a second ``dbt run`` must make that row
    visible through the materialized view.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        """Return the overrides applied to dbt_project.yml."""
        model_config = {
            "+materialized": "materialized_view",
        }
        seed_config = {
            "+column_types": {"some_date": "timestamp(6)"},
        }
        return {
            "name": "mv_test",
            "models": model_config,
            "seeds": seed_config,
        }

    @pytest.fixture(scope="class")
    def seeds(self):
        """Return the files that go into the "seeds" directory."""
        return {"seed.csv": seed_csv}

    @pytest.fixture(scope="class")
    def models(self):
        """Return the files that go into the "models" directory."""
        return {"mat_view.sql": model_sql}

    def test_mv_is_created_and_refreshed(self, project):
        """Seed, build the MV, insert a row into the seed, rebuild, count rows."""
        adapter = project.adapter
        catalog = adapter.config.credentials.database
        schema = adapter.config.credentials.schema
        namespace = f"{catalog}.{schema}"

        # Load the single seed.
        assert len(run_dbt(["seed"], expect_pass=True)) == 1

        # First run: the materialized view is created.
        assert len(run_dbt(["run"], expect_pass=True)) == 1

        # The freshly created MV must hold the same data as the seed.
        check_relations_equal(adapter, ["seed", "mat_view"])

        # Append one row to the seed table behind the MV.
        insert_sql = (
            f"INSERT INTO {namespace}.seed\n"
            "VALUES (5, 'Mateo', timestamp '2014-09-07 17:04:27')"
        )
        run_sql_with_adapter(adapter, insert_sql, fetch="all")

        # Second run: the existing MV is run again.
        assert len(run_dbt(["run"], expect_pass=True)) == 1

        # The MV is expected to expose 5 rows now (presumably the original
        # seed rows plus the inserted one; seed_csv is defined elsewhere).
        mv_rows = run_sql_with_adapter(
            adapter, f"select * from {namespace}.mat_view", fetch="all"
        )
        assert len(mv_rows) == 5


@pytest.mark.iceberg
class TestIcebergMaterializedViewDropAndCreate:
    """Run materialized view models over pre-existing relations of the same name.

    A view, a table and a materialized view are created up front; with
    ``+full_refresh`` enabled, ``dbt run`` must leave a materialized view
    under each of those names, each equal to the seed.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        """Return the overrides applied to dbt_project.yml."""
        model_config = {
            "+materialized": "materialized_view",
            "+full_refresh": True,
        }
        seed_config = {
            "+column_types": {"some_date": "timestamp(6)"},
        }
        return {
            "name": "mv_test",
            "models": model_config,
            "seeds": seed_config,
        }

    @pytest.fixture(scope="class")
    def seeds(self):
        """Return the files that go into the "seeds" directory."""
        return {"seed.csv": seed_csv}

    @pytest.fixture(scope="class")
    def models(self):
        """Return the files that go into the "models" directory.

        All three models share the same SQL; only the names differ.
        """
        return {
            "mat_view_overrides_table.sql": model_sql,
            "mat_view_overrides_view.sql": model_sql,
            "mat_view_overrides_materializedview.sql": model_sql,
        }

    def test_mv_overrides_relation(self, project):
        """Pre-create conflicting relations, run dbt, verify types and contents."""
        adapter = project.adapter

        # Relations that occupy the model names before dbt runs.
        conflicting_ddl = (
            "CREATE VIEW mat_view_overrides_view AS SELECT 3 c",
            "CREATE TABLE mat_view_overrides_table AS SELECT 4 d",
            "CREATE MATERIALIZED VIEW mat_view_overrides_materializedview AS SELECT 5 e",
        )
        for ddl in conflicting_ddl:
            adapter.execute(ddl)

        # Load the single seed.
        assert len(run_dbt(["seed"], expect_pass=True)) == 1

        # Build the three MVs; the relations created above should be replaced.
        assert len(run_dbt(["run"], expect_pass=True)) == 3

        mv_names = [
            "mat_view_overrides_view",
            "mat_view_overrides_table",
            "mat_view_overrides_materializedview",
        ]

        # Every name must now resolve to a materialized view ...
        check_relation_types(adapter, dict.fromkeys(mv_names, "materializedview"))

        # ... and each of them must hold the same data as the seed.
        check_relations_equal(adapter, ["seed", *mv_names])

0 comments on commit e1c4cde

Please sign in to comment.