From 3617a2c5b15c198a00de1b6c088777c20851be6e Mon Sep 17 00:00:00 2001 From: findinpath Date: Tue, 21 Sep 2021 08:46:06 +0200 Subject: [PATCH] Add support for incremental models The incremental strategy supported is to insert new records into target table, without updating or overwriting. Resolves: #1 --- README.md | 12 ++++- .../macros/materializations/incremental.sql | 44 +++++++++++++++++-- .../models/marts/core/dim_order_dates.sql | 17 +++++++ .../jaffle_shop/models/marts/core/schema.yml | 10 +++++ docker/run_tests.bash | 7 ++- test/integration/trino.dbtspec | 4 +- 6 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql diff --git a/README.md b/README.md index 906081d3..588e482b 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,6 @@ on your Trino instance. Due to the nature of Trino, not all core `dbt` functionality is supported. The following features of dbt are not implemented in `dbt-trino`: - Snapshot -- Incremental models Also, note that upper or mixed case schema names will cause catalog queries to fail. Please only use lower case schema names with this adapter. @@ -100,6 +99,17 @@ hive.allow-drop-table=true hive.allow-rename-table=true ``` +#### Incremental models + +The incremental strategy currently supported by this adapter is to append new records +without updating/overwriting any existing data from the target model. + +``` +{{ + config(materialized = 'incremental') +}} +``` + #### Use table properties to configure connector specifics Trino connectors use table properties to configure connector specifics. diff --git a/dbt/include/trino/macros/materializations/incremental.sql b/dbt/include/trino/macros/materializations/incremental.sql index 42ed148f..01548ffd 100644 --- a/dbt/include/trino/macros/materializations/incremental.sql +++ b/dbt/include/trino/macros/materializations/incremental.sql @@ -1,6 +1,42 @@ +{% macro dbt_trino_get_append_sql(tmp_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert into {{ target_relation }} + select {{dest_cols_csv}} from {{ tmp_relation.include(database=false, schema=false) }}; + + drop table if exists {{ tmp_relation }}; + +{% endmacro %} {% materialization incremental, adapter='trino' -%} - {{ exceptions.raise_not_implemented( - 'incremental materialization not implemented for '+adapter.type()) - }} -{% endmaterialization %} + + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} + + {{ run_hooks(pre_hooks) }} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} + {% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %} + {% do run_query(drop_tmp_relation_sql) %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% set build_sql = dbt_trino_get_append_sql(tmp_relation, target_relation) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {%- endcall -%} + + {{ run_hooks(post_hooks) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} diff --git a/docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql b/docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql new file mode 100644 index 00000000..2a6be756 --- /dev/null +++ b/docker/dbt/jaffle_shop/models/marts/core/dim_order_dates.sql @@ -0,0 +1,17 @@ +{{ + config(materialized = 'incremental') +}} + +with orders as ( + + select * from {{ ref('stg_orders') }} + +) + +select distinct order_date +from orders + +{% if is_incremental() %} + -- this filter will only be applied on an incremental run + where order_date > (select max(order_date) from {{ this }}) +{% endif %} diff --git a/docker/dbt/jaffle_shop/models/marts/core/schema.yml b/docker/dbt/jaffle_shop/models/marts/core/schema.yml index c85fbf36..dfcc80d2 100644 --- a/docker/dbt/jaffle_shop/models/marts/core/schema.yml +++ b/docker/dbt/jaffle_shop/models/marts/core/schema.yml @@ -32,6 +32,16 @@ models: - name: total_order_amount description: Total value (AUD) of a customer's orders + - name: dim_order_dates + description: This table is built in an incremental fashion and contains all the dates on which orders have been placed. + + columns: + - name: order_date + description: A unique order date + tests: + - unique + - not_null + - name: fct_orders description: This table has basic information about orders, as well as some derived facts based on payments diff --git a/docker/run_tests.bash b/docker/run_tests.bash index 8bd79fd3..85077b4f 100755 --- a/docker/run_tests.bash +++ b/docker/run_tests.bash @@ -4,8 +4,13 @@ cd "$(dirname "${BASH_SOURCE[0]}")" set -exo pipefail +# run the dim_order_dates model two times in order to test incremental functionality docker run \ --network="dbt-net" \ -v $PWD/dbt:/root/.dbt \ dbt-trino \ - "cd /jaffle_shop && dbt seed && dbt run && dbt test" + "cd /jaffle_shop \ + && dbt seed \ + && dbt run \ + && dbt run --model dim_order_dates \ + && dbt test" diff --git a/test/integration/trino.dbtspec b/test/integration/trino.dbtspec index 5081873a..778c89dd 100644 --- a/test/integration/trino.dbtspec +++ b/test/integration/trino.dbtspec @@ -21,8 +21,8 @@ sequences: test_dbt_empty: empty test_dbt_base: base test_dbt_ephemeral: ephemeral - # no incrementals, no snapshots - # test_dbt_incremental: incremental + test_dbt_incremental: incremental + # no snapshots # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols test_dbt_data_test: data_test