Skip to content

Commit

Permalink
Add support for incremental models
Browse files Browse the repository at this point in the history
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
  • Loading branch information
findinpath committed Oct 7, 2021
1 parent 8872312 commit 3617a2c
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 8 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ on your Trino instance.
Due to the nature of Trino, not all core `dbt` functionality is supported.
The following features of dbt are not implemented in `dbt-trino`:
- Snapshot
- Incremental models

Also, note that upper or mixed case schema names will cause catalog queries to fail.
Please only use lower case schema names with this adapter.
Expand All @@ -100,6 +99,17 @@ hive.allow-drop-table=true
hive.allow-rename-table=true
```

#### Incremental models

The incremental strategy currently supported by this adapter is to append new records
without updating/overwriting any existing data from the target model.

```
{{
config(materialized = 'incremental')
}}
```

#### Use table properties to configure connector specifics

Trino connectors use table properties to configure connector specifics.
Expand Down
44 changes: 40 additions & 4 deletions dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
{% macro dbt_trino_get_append_sql(tmp_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into {{ target_relation }}
select {{dest_cols_csv}} from {{ tmp_relation.include(database=false, schema=false) }};

drop table if exists {{ tmp_relation }};

{% endmacro %}

{% materialization incremental, adapter='trino' -%}
{{ exceptions.raise_not_implemented(
'incremental materialization not implemented for '+adapter.type())
}}
{% endmaterialization %}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %}
{% do run_query(drop_tmp_relation_sql) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = dbt_trino_get_append_sql(tmp_relation, target_relation) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
17 changes: 17 additions & 0 deletions docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{{
config(materialized = 'incremental')
}}

with orders as (

select * from {{ ref('stg_orders') }}

)

select distinct order_date
from orders

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where order_date > (select max(order_date) from {{ this }})
{% endif %}
10 changes: 10 additions & 0 deletions docker/dbt/jaffle_shop/models/marts/core/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ models:
- name: total_order_amount
description: Total value (AUD) of a customer's orders

- name: dim_order_dates
description: This table is built in an incremental fashion and contains all the dates on which orders have been placed.

columns:
- name: order_date
description: A unique order date
tests:
- unique
- not_null

- name: fct_orders
description: This table has basic information about orders, as well as some derived facts based on payments

Expand Down
7 changes: 6 additions & 1 deletion docker/run_tests.bash
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
cd "$(dirname "${BASH_SOURCE[0]}")"

set -exo pipefail
# run the dim_order_dates model two times in order to test incremental functionality
docker run \
--network="dbt-net" \
-v $PWD/dbt:/root/.dbt \
dbt-trino \
"cd /jaffle_shop && dbt seed && dbt run && dbt test"
"cd /jaffle_shop \
&& dbt seed \
&& dbt run \
&& dbt run --model dim_order_dates \
&& dbt test"
4 changes: 2 additions & 2 deletions test/integration/trino.dbtspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ sequences:
test_dbt_empty: empty
test_dbt_base: base
test_dbt_ephemeral: ephemeral
# no incrementals, no snapshots
# test_dbt_incremental: incremental
test_dbt_incremental: incremental
# no snapshots
# test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
# test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
test_dbt_data_test: data_test
Expand Down

0 comments on commit 3617a2c

Please sign in to comment.