From edfde4c48f128342c7488ca9d8af584e460add7c Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 25 Nov 2024 11:36:24 +0200 Subject: [PATCH] Ability to change the partition key #45 Co-authored-by: Oscar --- dbt_project.yml | 1 + integration_tests/dbt_project.yml | 1 + .../macros/test_normalize_events.sql | 18 +++++----- macros/normalize_events.sql | 2 +- macros/rename_partition_tstamp_date.sql | 14 ++++++++ macros/schema.yml | 2 ++ utils/snowplow_normalize_model_gen.py | 16 ++++----- utils/tests/expected/custom_table_name2_1.sql | 6 ++-- utils/tests/expected/custom_table_name3_2.sql | 6 ++-- utils/tests/expected/custom_table_name4_1.sql | 6 ++-- utils/tests/expected/custom_table_name5_9.sql | 6 ++-- utils/tests/expected/custom_table_name6_6.sql | 6 ++-- utils/tests/expected/custom_table_name7_6.sql | 6 ++-- utils/tests/expected/event_name1_1.sql | 6 ++-- .../tests/expected/test_normalized_events.sql | 34 +++++++++---------- 15 files changed, 74 insertions(+), 56 deletions(-) create mode 100644 macros/rename_partition_tstamp_date.sql diff --git a/dbt_project.yml b/dbt_project.yml index 00de1bf..3a4158a 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -41,6 +41,7 @@ vars: snowplow__dev_target_name: 'dev' snowplow__allow_refresh: false snowplow__session_timestamp: 'collector_tstamp' + snowplow__partition_tstamp: 'collector_tstamp' # This is the column that will be used to partition the data in the derived tables; it should be a timestamp column that is present in the data # Variables - Databricks Only # Add the following variable to your dbt project's dbt_project.yml file # Depending on the use case it should either be the catalog (for Unity Catalog users from databricks connector 1.1.1 onwards) or the same value as your snowplow__atomic_schema (unless changed it should be 'atomic') diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 96515d6..c92a6fe 100644 --- a/integration_tests/dbt_project.yml +++
b/integration_tests/dbt_project.yml @@ -34,6 +34,7 @@ vars: snowplow__backfill_limit_days: 2 snowplow__derived_tstamp_partitioned: false snowplow__atomic_schema: "{{ target.schema ~ 'sp_normalize_int_test' }}" + snowplow__partition_tstamp: "load_tstamp" models: snowplow_normalize_integration_tests: diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index e40407c..151185a 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -78,15 +78,15 @@ It runs 9 tests: {% macro databricks__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - 
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run 
where event_name in ('event_name')", - "context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", - "multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from 
`"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" + "flat_cols_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_1_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , 
CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self 
describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "multiple_base_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", + "multiple_sde_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in 
('event_name')" } %} {% set results_dict ={ diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index 7e35ca2..b3c86a7 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -195,7 +195,7 @@ select event_id , collector_tstamp {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} -- Flat columns from event table {% if flat_cols|length > 0 %} diff --git a/macros/rename_partition_tstamp_date.sql b/macros/rename_partition_tstamp_date.sql new file mode 100644 index 0000000..408a260 --- /dev/null +++ b/macros/rename_partition_tstamp_date.sql @@ -0,0 +1,14 @@ +{% macro rename_partition_tstamp_date() %} + {{ return(adapter.dispatch('rename_partition_tstamp_date', 'snowplow_normalize')()) }} +{% endmacro %} + +{% macro default__rename_partition_tstamp_date() %} + + + {% set rename_partition_tstamp_date = var('snowplow__partition_tstamp')~"_date" %} + + {{ log("Rename partition to: " ~ rename_partition_tstamp_date)}} + + {{ return(rename_partition_tstamp_date) }} + +{% endmacro %} diff --git a/macros/schema.yml b/macros/schema.yml index 3fc4402..8b2f2cb 100644 --- a/macros/schema.yml +++ b/macros/schema.yml @@ -73,3 +73,5 @@ macros: - name: text type: string description: the text to convert to snakecase + - name: rename_partition_tstamp_date + description: Returns the name of the date partition column, formed by appending the _date suffix to the value of the snowplow__partition_tstamp variable, currently only used in Databricks \ No newline at end of file diff --git a/utils/snowplow_normalize_model_gen.py b/utils/snowplow_normalize_model_gen.py index fdbd102..a31e9ad 100644 --- a/utils/snowplow_normalize_model_gen.py +++ b/utils/snowplow_normalize_model_gen.py @@ -195,11 +195,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key =
var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }}, databricks_val='collector_tstamp_date'), + }}, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={{ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -251,11 +251,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "unique_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }}, databricks_val='collector_tstamp_date'), + }}, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={{ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -270,9 +270,9 @@ filtered_model_content += f""" select event_id - , collector_tstamp + , {{{{var("snowplow__partition_tstamp")}}}} {{% if target.type in ['databricks', 'spark'] -%}} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{{{var("snowplow__partition_tstamp")}}}}) as {{{{var("snowplow__partition_tstamp")}}}}_date {{%- endif %}} , event_name , '{model}' as event_table_name diff --git a/utils/tests/expected/custom_table_name2_1.sql b/utils/tests/expected/custom_table_name2_1.sql index bf3b770..83ffd6c 100644 --- a/utils/tests/expected/custom_table_name2_1.sql +++ b/utils/tests/expected/custom_table_name2_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = 
snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name3_2.sql b/utils/tests/expected/custom_table_name3_2.sql index b81c68a..c4b3365 100644 --- a/utils/tests/expected/custom_table_name3_2.sql +++ b/utils/tests/expected/custom_table_name3_2.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name4_1.sql b/utils/tests/expected/custom_table_name4_1.sql index 8d12527..e520cb7 100644 --- a/utils/tests/expected/custom_table_name4_1.sql +++ b/utils/tests/expected/custom_table_name4_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), 
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name5_9.sql b/utils/tests/expected/custom_table_name5_9.sql index 66d66d6..6553244 100644 --- a/utils/tests/expected/custom_table_name5_9.sql +++ b/utils/tests/expected/custom_table_name5_9.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name6_6.sql b/utils/tests/expected/custom_table_name6_6.sql index b1bde78..dafb91b 100644 --- a/utils/tests/expected/custom_table_name6_6.sql +++ b/utils/tests/expected/custom_table_name6_6.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name7_6.sql b/utils/tests/expected/custom_table_name7_6.sql index 
608287f..6003c42 100644 --- a/utils/tests/expected/custom_table_name7_6.sql +++ b/utils/tests/expected/custom_table_name7_6.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/event_name1_1.sql b/utils/tests/expected/event_name1_1.sql index 1f2f83c..fb6040e 100644 --- a/utils/tests/expected/event_name1_1.sql +++ b/utils/tests/expected/event_name1_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/test_normalized_events.sql b/utils/tests/expected/test_normalized_events.sql index d74e64d..04dd4c4 100644 --- a/utils/tests/expected/test_normalized_events.sql +++ b/utils/tests/expected/test_normalized_events.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "unique_id", - upsert_date_key = "collector_tstamp", + 
upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -17,9 +17,9 @@ select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'itsaprefix_event_name1_1' as event_table_name @@ -34,9 +34,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name2_1' as event_table_name @@ -51,9 +51,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name3_2' as event_table_name @@ -68,9 +68,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name4_1' as event_table_name @@ -85,9 +85,9 @@ UNION ALL select event_id 
- , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name5_9' as event_table_name @@ -102,9 +102,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name6_6' as event_table_name @@ -119,9 +119,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name7_6' as event_table_name