Skip to content

Commit

Permalink
Add materialized view materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
damian3031 committed Mar 7, 2023
1 parent 7f72784 commit 8054838
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20230307-103324.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Add Materialized View materialization
time: 2023-03-07T10:33:24.990266+01:00
custom:
Author: Jay-code0 damian3031
Issue: "258"
PR: "260"
41 changes: 41 additions & 0 deletions dbt/include/trino/macros/materializations/materialized_view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{% materialization materialized_view, adapter="trino" %}
{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{{ log("No existing materialized view found, creating materialized view...", info=true) }}
{%- set build_sql = build_materialized_view(target_relation) %}

{% elif existing_relation.type != "materializedview" %}
{{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }}
{{ adapter.drop_relation(existing_relation) }}
{%- set build_sql = build_materialized_view(target_relation) %}

{% elif existing_relation.type == "materializedview" %}
{{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }}
{%- set build_sql = refresh_materialized_view(target_relation) %}
{% endif %}

{#-- build model #}
{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}


{%- macro build_materialized_view(target_relation) -%}
{%- set sqlcode= "CREATE OR REPLACE MATERIALIZED VIEW " ~ target_relation ~ " AS " ~ sql %}
{{ sqlcode }}
{%- endmacro -%}


{%- macro refresh_materialized_view(target_relation) -%}
{%- set sqlcode= "REFRESH MATERIALIZED VIEW " ~ target_relation %}
{{ sqlcode }}
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import pytest
from dbt.tests.util import check_relation_types, run_dbt
from dbt.tests.util import check_relation_types, run_dbt, run_sql_with_adapter, check_relations_equal

from tests.functional.adapter.materialization.fixtures import (
model_sql,
seed_csv,
)


@pytest.mark.iceberg
Expand Down Expand Up @@ -40,3 +45,92 @@ def test_mv_is_dropped_when_model_runs_view(self, project):
"my_table": "table",
}
check_relation_types(project.adapter, expected)


@pytest.mark.iceberg
class TestIcebergMaterializedViewCreate:
# Configuration in dbt_project.yml
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "mv_test",
"models": {
"+materialized": "materialized_view",
},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# Everything that goes in the "seeds" directory
@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

# Everything that goes in the "models" directory
@pytest.fixture(scope="class")
def models(self):
return {
"mat_view.sql": model_sql,
}


def test_mv_is_created_and_refreshed(self, project):
catalog = project.adapter.config.credentials.database
schema = project.adapter.config.credentials.schema

# Seed seed
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

# Create MV
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

# Check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "mat_view"])

# Add one row to seed
sql = f"""INSERT INTO {catalog}.{schema}.seed
VALUES (5, 'Mateo', timestamp '2014-09-07 17:04:27')""".strip()
run_sql_with_adapter(project.adapter, sql, fetch="all")

# Refresh MV
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

# Check if one row is added in MV
sql = f"""select * from {catalog}.{schema}.mat_view
""".strip()
results = run_sql_with_adapter(project.adapter, sql, fetch="all")
assert len(results) == 5

# Cleanup
project.adapter.execute("DROP MATERIALIZED VIEW mat_view")

def mv_overrides_relation(self, project, relation_type):
# Create relation with same name
project.adapter.execute(f"CREATE {relation_type} mat_view AS SELECT 3 c")
expected = {"mat_view": relation_type}
check_relation_types(project.adapter, expected)

# Seed seed
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

# Create MV, table/view should be dropped beforehand
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1
expected = {"mat_view": "materializedview"}
check_relation_types(project.adapter, expected)

# Cleanup
project.adapter.execute("DROP MATERIALIZED VIEW mat_view")

def test_mv_overrides_table2(self, project):
self.mv_overrides_relation(project, 'table')

def test_mv_overrides_view2(self, project):
self.mv_overrides_relation(project, 'view')

0 comments on commit 8054838

Please sign in to comment.