Add support for incremental models #10

Merged
merged 1 commit into from
Oct 7, 2021
12 changes: 11 additions & 1 deletion README.md
@@ -83,7 +83,6 @@ on your Trino instance.
Due to the nature of Trino, not all core `dbt` functionality is supported.
The following features of dbt are not implemented in `dbt-trino`:
- Snapshot
- Incremental models

Also, note that upper or mixed case schema names will cause catalog queries to fail.
Please only use lower case schema names with this adapter.
@@ -100,6 +99,17 @@ hive.allow-drop-table=true
hive.allow-rename-table=true
```

#### Incremental models

This adapter currently supports a single incremental strategy: new records are appended
to the target model without updating or overwriting any of its existing data.

```
{{
config(materialized = 'incremental')
}}
```
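
To make the append-only behaviour concrete, here is a minimal sketch in Python that uses the standard-library `sqlite3` module as a stand-in for Trino. The table names `orders` and `orders__tmp` and the helper `append_only` are hypothetical and chosen only for illustration; the point is that a changed source row is added as a second row rather than replacing the existing one.

```python
import sqlite3


def append_only(conn, target, tmp):
    """Append every row of `tmp` to `target`, then drop `tmp`.

    Existing rows in `target` are never updated or deleted.
    """
    # Read the target's column names so the insert uses an explicit column list.
    cols = [row[1] for row in conn.execute(f"PRAGMA table_info({target})")]
    col_csv = ", ".join(f'"{c}"' for c in cols)
    conn.execute(f"INSERT INTO {target} ({col_csv}) SELECT {col_csv} FROM {tmp}")
    conn.execute(f"DROP TABLE IF EXISTS {tmp}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
conn.execute("INSERT INTO orders VALUES (1, 'placed')")

# The new batch contains an updated version of order 1 and a new order 2.
conn.execute("CREATE TABLE orders__tmp (order_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders__tmp VALUES (?, ?)", [(1, "shipped"), (2, "placed")]
)

append_only(conn, "orders", "orders__tmp")
rows = conn.execute(
    "SELECT order_id, status FROM orders ORDER BY order_id, status"
).fetchall()
print(rows)  # order 1 now appears twice: once as 'placed', once as 'shipped'
```

Because nothing is merged, it is up to the model's own `where` clause to keep already-loaded rows out of each incremental run.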

#### Use table properties to configure connector specifics

Trino connectors use table properties to configure connector specifics.
44 changes: 40 additions & 4 deletions dbt/include/trino/macros/materializations/incremental.sql
@@ -1,6 +1,42 @@
{% macro dbt_trino_get_append_sql(tmp_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into {{ target_relation }}
select {{dest_cols_csv}} from {{ tmp_relation.include(database=false, schema=false) }};

drop table if exists {{ tmp_relation }};

{% endmacro %}

{% materialization incremental, adapter='trino' -%}
-{{ exceptions.raise_not_implemented(
-'incremental materialization not implemented for '+adapter.type())
-}}
-{% endmaterialization %}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
  {% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
  {% do adapter.drop_relation(existing_relation) %}
  {% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
  {% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %}
  {% do run_query(drop_tmp_relation_sql) %}
  {% do run_query(create_table_as(True, tmp_relation, sql)) %}
  {% set build_sql = dbt_trino_get_append_sql(tmp_relation, target_relation) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
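
The three branches of the materialization above can be summarised with a small Python sketch that only builds the list of SQL statements each branch would issue. The function `plan_incremental_build`, its parameters, and the string encoding of the relation type are hypothetical; the real macro works on dbt relation objects and delegates to `create_table_as`, so the generated SQL may differ in detail.

```python
from typing import List, Optional


def plan_incremental_build(
    existing_type: Optional[str],  # None, "table" or "view" (hypothetical encoding)
    full_refresh: bool,
    target: str,
    tmp: str,
    columns: List[str],
    select_sql: str,
) -> List[str]:
    """Return the SQL statements for one run, in execution order."""
    create_target = f"create table {target} as {select_sql}"

    # Branch 1: the target does not exist yet, so build it from scratch.
    if existing_type is None:
        return [create_target]

    # Branch 2: the target is a view, or a full refresh was requested.
    if existing_type == "view" or full_refresh:
        return [f"drop {existing_type} if exists {target}", create_target]

    # Branch 3: incremental run. Stage the new rows, append them, clean up.
    cols_csv = ", ".join(f'"{c}"' for c in columns)
    return [
        f"drop table if exists {tmp}",
        f"create table {tmp} as {select_sql}",
        f"insert into {target} select {cols_csv} from {tmp}",
        f"drop table if exists {tmp}",
    ]


steps = plan_incremental_build(
    "table",
    False,
    "dim_order_dates",
    "dim_order_dates__dbt_tmp",  # illustrative temp table name
    ["order_date"],
    "select 1 as order_date",
)
for statement in steps:
    print(statement)
```

In the macro itself the first two statements of the incremental branch are run eagerly through `run_query`, while the insert and the final drop are sent together as the `main` statement.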
17 changes: 17 additions & 0 deletions docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql
@@ -0,0 +1,17 @@
{{
config(materialized = 'incremental')
}}

with orders as (

select * from {{ ref('stg_orders') }}

)

select distinct order_date
from orders

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where order_date > (select max(order_date) from {{ this }})
{% endif %}
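
The effect of the `is_incremental()` filter over two consecutive runs can be simulated with `sqlite3` as a stand-in for Trino. This is a sketch under simplifying assumptions: the helper `run_dim_order_dates` is hypothetical, and it treats "the target table already exists" as the only condition for an incremental run, whereas dbt also takes the full-refresh flag into account.

```python
import sqlite3


def run_dim_order_dates(conn):
    """Simulate one `dbt run` of the dim_order_dates model."""
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master "
        "WHERE type = 'table' AND name = 'dim_order_dates'"
    ).fetchone() is not None

    select_sql = "SELECT DISTINCT order_date FROM stg_orders"
    if exists:
        # Incremental run: only dates newer than the latest one already loaded.
        select_sql += (
            " WHERE order_date > (SELECT max(order_date) FROM dim_order_dates)"
        )
        conn.execute(f"INSERT INTO dim_order_dates {select_sql}")
    else:
        # First run: build the table from the full result set.
        conn.execute(f"CREATE TABLE dim_order_dates AS {select_sql}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, order_date TEXT)")
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?)",
    [(1, "2018-01-01"), (2, "2018-01-01"), (3, "2018-01-02")],
)
run_dim_order_dates(conn)  # first run creates the table

# New orders arrive: one on an already-known date, one on a new date.
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?)",
    [(4, "2018-01-02"), (5, "2018-01-03")],
)
run_dim_order_dates(conn)  # second run appends only the new date

dates = [
    row[0]
    for row in conn.execute(
        "SELECT order_date FROM dim_order_dates ORDER BY order_date"
    )
]
print(dates)
```

Since the strategy is append-only, the strict `>` comparison is what keeps `order_date` unique: a date that is already in the table is never selected again.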
10 changes: 10 additions & 0 deletions docker/dbt/jaffle_shop/models/marts/core/schema.yml
@@ -32,6 +32,16 @@ models:
      - name: total_order_amount
        description: Total value (AUD) of a customer's orders

  - name: dim_order_dates
    description: This table is built in an incremental fashion and contains all the dates on which orders have been placed.

    columns:
      - name: order_date
        description: A unique order date
        tests:
          - unique
          - not_null

  - name: fct_orders
    description: This table has basic information about orders, as well as some derived facts based on payments
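
The `unique` and `not_null` tests declared on `order_date` are what catch a faulty append, for example a date inserted twice by a second run. The following sketch shows roughly equivalent checks, using `sqlite3` as a stand-in for Trino. The helper `failing_rows` is hypothetical, and the SQL that dbt actually generates for these tests may differ.

```python
import sqlite3


def failing_rows(conn, table, column):
    """Count violations of a uniqueness and a not-null check on one column."""
    # Values that occur more than once violate the uniqueness check.
    duplicates = conn.execute(
        f"SELECT {column} FROM {table} WHERE {column} IS NOT NULL "
        f"GROUP BY {column} HAVING count(*) > 1"
    ).fetchall()
    # Rows where the column is NULL violate the not-null check.
    nulls = conn.execute(
        f"SELECT count(*) FROM {table} WHERE {column} IS NULL"
    ).fetchone()[0]
    return {"unique": len(duplicates), "not_null": nulls}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_order_dates (order_date TEXT)")
conn.executemany(
    "INSERT INTO dim_order_dates VALUES (?)", [("2018-01-01",), ("2018-01-02",)]
)
clean = failing_rows(conn, "dim_order_dates", "order_date")

# Simulate a bad append: one duplicated date and one missing date.
conn.executemany(
    "INSERT INTO dim_order_dates VALUES (?)", [("2018-01-02",), (None,)]
)
broken = failing_rows(conn, "dim_order_dates", "order_date")
print(clean, broken)
```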

7 changes: 6 additions & 1 deletion docker/run_tests.bash
@@ -4,8 +4,13 @@
cd "$(dirname "${BASH_SOURCE[0]}")"

set -exo pipefail
# run the dim_order_dates model two times in order to test incremental functionality
docker run \
--network="dbt-net" \
-v $PWD/dbt:/root/.dbt \
dbt-trino \
-"cd /jaffle_shop && dbt seed && dbt run && dbt test"
"cd /jaffle_shop \
&& dbt seed \
&& dbt run \
&& dbt run --model dim_order_dates \
&& dbt test"
4 changes: 2 additions & 2 deletions test/integration/trino.dbtspec
@@ -21,8 +21,8 @@ sequences:
test_dbt_empty: empty
test_dbt_base: base
test_dbt_ephemeral: ephemeral
-# no incrementals, no snapshots
-# test_dbt_incremental: incremental
test_dbt_incremental: incremental
# no snapshots
# test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
# test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
test_dbt_data_test: data_test