From 541e2e0e1460a3024e7bd5aa11533bda215f1690 Mon Sep 17 00:00:00 2001
From: Ilias Xenogiannis
Date: Thu, 14 Nov 2024 17:30:12 +0200
Subject: [PATCH] Prepare the codebase for spark release (#46)

---
 dbt_project.yml                               |  2 +-
 .../.scripts/integration_tests.sh             | 12 ++---
 integration_tests/ci/profiles.yml             | 19 ++++----
 integration_tests/dbt_project.yml             |  8 +++-
 .../macros/test_normalize_events.sql          | 47 +++++++++++++++++++
 integration_tests/macros/test_users_table.sql | 43 +++++++++++++++++
 .../spark/int_test_dummy_model.sql            | 12 +++++
 .../spark/snowplow_normalize_stg.sql          | 10 ++++
 macros/normalize_events.sql                   |  2 +-
 macros/users_table.sql                        |  2 +-
 ...nowplow_normalize_base_events_this_run.sql | 46 +++++++++++-------
 packages.yml                                  |  2 +-
 12 files changed, 168 insertions(+), 37 deletions(-)
 create mode 100644 integration_tests/models/dummy_model/spark/int_test_dummy_model.sql
 create mode 100644 integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql

diff --git a/dbt_project.yml b/dbt_project.yml
index 8904d0e..00de1bf 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -63,7 +63,7 @@ on-run-end:
 models:
   snowplow_normalize:
     +materialized: table
-    +file_format: delta
+    +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg' }}"
     +bind: false
     base:
       manifest:
diff --git a/integration_tests/.scripts/integration_tests.sh b/integration_tests/.scripts/integration_tests.sh
index 096d8cb..11d722b 100755
--- a/integration_tests/.scripts/integration_tests.sh
+++ b/integration_tests/.scripts/integration_tests.sh
@@ -10,7 +10,7 @@ do
   esac
 done

-declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake")
+declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake" "spark_iceberg")

 # set to lower case
 DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
@@ -23,13 +23,11 @@ fi

 for db in ${DATABASES[@]}; do

-  if [ $db == 'bigquery' ]; then
-    echo "Snowplow web integration tests: Seeding data and doing first run"
-
-    eval "dbt seed --target $db --full-refresh" || exit 1;
-
-    eval "dbt run --target $db --full-refresh" || exit 1;
+  if [[ "$db" == "bigquery" ]]; then
+    echo "Snowplow integration tests: Seeding data and doing first run"
+    eval "dbt seed --target $db --full-refresh" || exit 1
+    eval "dbt run --target $db --full-refresh" || exit 1
   fi

   echo "Snowplow normalize integration tests: snakeify case"
diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml
index 5476748..76fe5ef 100644
--- a/integration_tests/ci/profiles.yml
+++ b/integration_tests/ci/profiles.yml
@@ -67,12 +67,13 @@ integration_tests:
       token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
       threads: 4

-    # spark:
-    #   type: spark
-    #   method: odbc
-    #   driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
-    #   schema: "github_snwplow_web_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
-    #   host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
-    #   token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
-    #   endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}"
-    #   threads: 4
+    spark_iceberg:
+      type: spark
+      method: thrift
+      host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
+      port: 10000
+      user: "{{ env_var('SPARK_USER', 'spark') }}"
+      schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
+      connect_retries: 5
+      connect_timeout: 60
+      threads: 4
\ No newline at end of file
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 3663550..96515d6 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -23,6 +23,9 @@ clean-targets:
 quoting:
   identifier: false
   schema: false
+  database: "{{ true if target.type in ['bigquery','databricks'] else false }}"
+
+

 vars:
   snowplow_normalize:
@@ -34,15 +37,18 @@ vars:

 models:
   snowplow_normalize_integration_tests:
+    +materialized: table
     bind: false
     +schema: "sp_normalize_int_test"
     dummy_model:
       bigquery:
         +enabled: "{{ target.type == 'bigquery' | as_bool() }}"
       databricks:
-        +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+        +enabled: "{{ target.type == 'databricks' | as_bool() }}"
       snowflake:
         +enabled: "{{ target.type == 'snowflake' | as_bool() }}"
+      spark:
+        +enabled: "{{ target.type == 'spark' | as_bool() }}"

 seeds:
   quote_columns: false
diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql
index b063d74..342f496 100644
--- a/integration_tests/macros/test_normalize_events.sql
+++ b/integration_tests/macros/test_normalize_events.sql
@@ -120,6 +120,53 @@ It runs 9 tests:

 {% endmacro %}

+{% macro spark__test_normalize_events() %}
+
+    -- Main difference here is that spark doesn't need the catalog in the from clause
+
+    {% set expected_dict = {
+        "flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
+        "sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
+        "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
+        "sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
+        "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", + "multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" + } %} + + {% set results_dict ={ + "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), + "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), + "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), + "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], 
+        "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '),
+        "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
+        "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '),
+        "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
+        "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
+        "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ')
+        }
+    %}
+
+
+    {# {{ print(results_dict['flat_cols_only'])}} #}
+    {# {{ print(results_dict['sde_plus_cols'])}} #}
+    {# {{ print(results_dict['sde_plus_cols_w_alias'])}} #}
+    {# {{ print(results_dict['sde_plus_1_context'])}} #}
+    {# {{ print(results_dict['sde_plus_2_context'])}} #}
+    {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #}
+    {# {{ print(results_dict['context_only'])}} #}
+    {# {{ print(results_dict['multiple_base_events'])}} #}
+    {# {{ print(results_dict['multiple_sde_events'])}} #}
+
+
+    {{ dbt_unittest.assert_dict_equals(expected_dict, results_dict) }}
+
+
+{% endmacro %}
+
+
 {% macro snowflake__test_normalize_events() %}

     {% set expected_dict = {
diff --git a/integration_tests/macros/test_users_table.sql b/integration_tests/macros/test_users_table.sql
index 3be634f..519692f 100644
--- a/integration_tests/macros/test_users_table.sql
+++ b/integration_tests/macros/test_users_table.sql
@@ -71,6 +71,49 @@ It runs 6 tests:

 {% endmacro %}

+{% macro spark__test_users_table() %}
+    -- Main difference here is that spark doesn't need the catalog in the from clause
+    {% set expected_dict = {
+        "1_context" : "with defined_user_id as ( select user_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "2_context" : "with defined_user_id as ( select user_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field" : "with defined_user_id as ( select test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field_sde" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field_context" : "with defined_user_id as ( select CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1[0].test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field_both" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field_both_w_alias" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as my_user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by my_user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where my_user_id is not null ) select * except (rn) from users_ordering where rn = 1",
+        "custom_user_field_both_w_alias_and_flat" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as my_user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table , app_id , network_user_id -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by my_user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where my_user_id is not null ) select * except (rn) from users_ordering where rn = 1"
+    } %}
+
+
+
+    {% set results_dict = {
+        "1_context" : snowplow_normalize.users_table('user_id', '', '', ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], remove_new_event_check = true).split()|join(' '),
+        "2_context" : snowplow_normalize.users_table('user_id', '', '',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '),
+        "custom_user_field" : snowplow_normalize.users_table('testId', '', '',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '),
+        "custom_user_field_sde" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', '',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '),
+        "custom_user_field_context" : snowplow_normalize.users_table('testId', '', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '),
+        "custom_user_field_both" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '),
+        "custom_user_field_both_w_alias" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], 'my_user_id', remove_new_event_check = true).split()|join(' '),
+        "custom_user_field_both_w_alias_and_flat" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], 'my_user_id', ['app_id', 'network_user_id'], remove_new_event_check = true).split()|join(' '),
+        }
+    %}
+
+
+    {# {{ print(results_dict['1_context'])}} #}
+    {# {{ print(results_dict['2_context'])}} #}
+    {# {{ print(results_dict['custom_user_field'])}} #}
+    {# {{ print(results_dict['custom_user_field_sde'])}} #}
+    {# {{ print(results_dict['custom_user_field_context'])}} #}
+    {# {{ print(results_dict['custom_user_field_both'])}} #}
+    {# {{ print(results_dict['custom_user_field_both_w_alias'])}} #}
+    {# {{ print(results_dict['custom_user_field_both_w_alias_and_flat'])}} #}
+
+
+    {{ dbt_unittest.assert_equals(expected_dict, results_dict) }}
+
+
+{% endmacro %}
+
 {% macro databricks__test_users_table() %}

     {% set expected_dict = {
diff --git a/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql b/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql
new file mode 100644
index 0000000..39d0df8
--- /dev/null
+++ b/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql
@@ -0,0 +1,12 @@
+{#
+Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved.
+This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
+and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
+You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
+#}
+
+{{ config(
+    tags = "snowplow_normalize_incremental",
+) }}
+
+select 1 as dummy from {{ ref('snowplow_normalize_base_events_this_run') }}
diff --git a/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql
new file mode 100644
index 0000000..6923b1a
--- /dev/null
+++ b/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql
@@ -0,0 +1,10 @@
+{#
+Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved.
+This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
+and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
+You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
+#}
+
+-- we don't actually use this data at all, we just need the model so the graph can build
+select *
+from {{ ref('snowplow_norm_dummy_events') }}
diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql
index ee0f784..7e35ca2 100644
--- a/macros/normalize_events.sql
+++ b/macros/normalize_events.sql
@@ -159,7 +159,7 @@ where
     {%- endif -%}
 {% endmacro %}

-{% macro databricks__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %}
+{% macro spark__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %}
 {# Remove down to major version for Databricks columns, drop 2 last _X values #}
 {%- set sde_cols_clean = [] -%}
 {%- for ind in range(sde_cols|length) -%}
diff --git a/macros/users_table.sql b/macros/users_table.sql
index b389c7b..d0d0194 100644
--- a/macros/users_table.sql
+++ b/macros/users_table.sql
@@ -167,7 +167,7 @@ where rn = 1

 {% endmacro %}

-{% macro databricks__users_table(user_id_field = 'user_id', user_id_sde = '', user_id_context = '', user_cols = [], user_keys = [], user_types = [], user_id_alias = 'user_id', flat_cols = [], remove_new_event_check = false) %}
+{% macro spark__users_table(user_id_field = 'user_id', user_id_sde = '', user_id_context = '', user_cols = [], user_keys = [], user_types = [], user_id_alias = 'user_id', flat_cols = [], remove_new_event_check = false) %}
 {# Remove down to major version for Databricks columns, drop 2 last _X values #}
 {%- set user_cols_clean = [] -%}
 {%- for ind in range(user_cols|length) -%}
diff --git a/models/base/scratch/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowplow_normalize_base_events_this_run.sql
index 46acd94..966ebd5 100644
--- a/models/base/scratch/snowplow_normalize_base_events_this_run.sql
+++ b/models/base/scratch/snowplow_normalize_base_events_this_run.sql
@@ -14,24 +14,38 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
 {%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %}

-select
-    a.*
+with prep as (

-from {{ var('snowplow__events') }} as a
+    select
+        a.*

-where
-    {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #}
-    {% if var("snowplow__days_late_allowed") == -1 %}
-    1 = 1
-    {% else %}
-    a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }}
+    {% if target.type not in ['databricks','snowflake','bigquery'] %}
+    , row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.dvce_sent_tstamp {%- endif %}) as rn
     {% endif %}
-    and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} >= {{ lower_limit }}
-    and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} <= {{ upper_limit }}
-    {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %}
-    and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
-    and a.derived_tstamp <= {{ upper_limit }}
+
+    from {{ var('snowplow__events') }} as a
+    where
+    {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, so this filter needs to be optional #}
+    {% if var("snowplow__days_late_allowed") == -1 %}
+    1 = 1
+    {% else %}
+    a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }}
+    {% endif %}
+    and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} >= {{ lower_limit }}
+    and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} <= {{ upper_limit }}
+    {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %}
+    and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
+    and a.derived_tstamp <= {{ upper_limit }}
+    {% endif %}
+    and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }}
+    {# We branch here to avoid using qualify on Spark, which does not support it #}
+    {% if target.type in ['databricks','snowflake','bigquery'] %}
+    qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1
     {% endif %}
-    and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }}
+)

-qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1
+select *
+from prep
+{% if target.type not in ['databricks','snowflake','bigquery'] %}
+where rn = 1
+{% endif %}
\ No newline at end of file
diff --git a/packages.yml b/packages.yml
index 6da1d8c..bf44732 100644
--- a/packages.yml
+++ b/packages.yml
@@ -1,3 +1,3 @@
 packages:
   - package: snowplow/snowplow_utils
-    version: [">=0.16.2", "<0.17.0"]
+    version: [">=0.17.0", "<0.18.0"]
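
Note on the deduplication rewrite in models/base/scratch/snowplow_normalize_base_events_this_run.sql: Spark SQL does not support the qualify clause, which is why the model now branches between two equivalent forms. A minimal sketch of the two patterns, assuming a hypothetical events table with event_id and collector_tstamp columns (the real model partitions and orders on the same keys):

    -- warehouses with qualify support (BigQuery, Snowflake, Databricks):
    -- filter on the window function inline
    select a.*
    from events as a
    qualify row_number() over (partition by a.event_id order by a.collector_tstamp) = 1

    -- spark: materialise the row number in a CTE, then filter it in an outer query
    with prep as (
        select
            a.*
            , row_number() over (partition by a.event_id order by a.collector_tstamp) as rn
        from events as a
    )
    select * from prep where rn = 1

Both forms keep exactly one row per event_id, the one with the earliest collector_tstamp.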