Skip to content

Commit

Permalink
Ability to change the partition key #45
Browse files Browse the repository at this point in the history
Co-authored-by: Oscar <[email protected]>
  • Loading branch information
ilias1111 and OscarSnowPlow committed Dec 4, 2024
1 parent 63264ac commit edfde4c
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 56 deletions.
1 change: 1 addition & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ vars:
snowplow__dev_target_name: 'dev'
snowplow__allow_refresh: false
snowplow__session_timestamp: 'collector_tstamp'
snowplow__partition_tstamp: 'collector_tstamp' # This is the column that will be used to partition the data in the derived tables, it should be a timestamp column that is present in the data
# Variables - Databricks Only
# Add the following variable to your dbt project's dbt_project.yml file
# Depending on the use case it should either be the catalog (for Unity Catalog users from databricks connector 1.1.1 onwards) or the same value as your snowplow__atomic_schema (unless changed it should be 'atomic')
Expand Down
1 change: 1 addition & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vars:
snowplow__backfill_limit_days: 2
snowplow__derived_tstamp_partitioned: false
snowplow__atomic_schema: "{{ target.schema ~ 'sp_normalize_int_test' }}"
snowplow__partition_tstamp: "load_tstamp"

models:
snowplow_normalize_integration_tests:
Expand Down
18 changes: 9 additions & 9 deletions integration_tests/macros/test_normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ It runs 9 tests:
{% macro databricks__test_normalize_events() %}
{% set expected_dict = {
"flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
"flat_cols_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
} %}
{% set results_dict ={
Expand Down
2 changes: 1 addition & 1 deletion macros/normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ select
event_id
, collector_tstamp
{% if target.type in ['databricks', 'spark'] -%}
, DATE(collector_tstamp) as collector_tstamp_date
, DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date
{%- endif %}
-- Flat columns from event table
{% if flat_cols|length > 0 %}
Expand Down
14 changes: 14 additions & 0 deletions macros/rename_partition_tstamp_date.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro rename_partition_tstamp_date() %}
{{ return(adapter.dispatch('rename_partition_tstamp_date', 'snowplow_normalize')()) }}
{% endmacro %}

{% macro default__rename_partition_tstamp_date() %}


{% set rename_partition_tstamp_date = var('snowplow__partition_tstamp')~"_date" %}

{{ log("Rename partition to: " ~ rename_partition_tstamp_date)}}

{{ return(rename_partition_tstamp_date) }}

{% endmacro %}
2 changes: 2 additions & 0 deletions macros/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,5 @@ macros:
- name: text
type: string
description: the text to convert to snakecase
- name: rename_partition_tstamp_date
description: Takes the partition_tstamp column and renames it to add the date suffix, currently only used in Databricks
16 changes: 8 additions & 8 deletions utils/snowplow_normalize_model_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}}, databricks_val='collector_tstamp_date'),
}}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={{
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down Expand Up @@ -251,11 +251,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "unique_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}}, databricks_val='collector_tstamp_date'),
}}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={{
'delta.autoOptimize.optimizeWrite' : 'true',
Expand All @@ -270,9 +270,9 @@
filtered_model_content += f"""
select
event_id
, collector_tstamp
, {{{{var("snowplow__partition_tstamp")}}}}
{{% if target.type in ['databricks', 'spark'] -%}}
, DATE(collector_tstamp) as collector_tstamp_date
, DATE({{{{var("snowplow__partition_tstamp")}}}}) as {{{{var("snowplow__partition_tstamp")}}}}_date
{{%- endif %}}
, event_name
, '{model}' as event_table_name
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name2_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name3_2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name4_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name5_9.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name6_6.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name7_6.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/event_name1_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_tstamp"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_tstamp"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=rename_partition_tstamp_date()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
Loading

0 comments on commit edfde4c

Please sign in to comment.