From f8521e339ffa78329ca114bc10d6a76893b9932c Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Tue, 3 Dec 2024 15:40:44 +0200 Subject: [PATCH] Pe 6540 bq new loader (#48) --- .../source/snowplow_norm_dummy_events.csv | 4 +- integration_tests/dbt_project.yml | 1 + .../macros/test_normalize_events.sql | 5 +- .../bigquery/snowplow_normalize_stg.sql | 20 +++++-- macros/normalize_events.sql | 53 ++++++++++++++----- 5 files changed, 60 insertions(+), 23 deletions(-) diff --git a/integration_tests/data/source/snowplow_norm_dummy_events.csv b/integration_tests/data/source/snowplow_norm_dummy_events.csv index b80f0be..3840de2 100644 --- a/integration_tests/data/source/snowplow_norm_dummy_events.csv +++ b/integration_tests/data/source/snowplow_norm_dummy_events.csv @@ -1,2 +1,2 @@ -event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5 -'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" +event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2 +'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index c92a6fe..a457d14 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -75,3 +75,4 @@ seeds: contexts_test2_1_0_3: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_4: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_5: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" + contexts_test4_1: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index 151185a..e00e8fa 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -31,13 +31,13 @@ It runs 9 tests: {% macro bigquery__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as my_alias_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as my_alias_test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_1_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} @@ -45,13 +45,13 @@ It runs 9 tests: {% set results_dict ={ - "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '), "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } @@ -65,6 +65,7 @@ It runs 9 tests: {# {{ print(results_dict['sde_plus_2_context'])}} #} {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #} {# {{ print(results_dict['context_only'])}} #} + {# {{ print(results_dict['context_only_new_loader'])}} #} {# {{ print(results_dict['multiple_base_events'])}} #} {# {{ print(results_dict['multiple_sde_events'])}} #} diff --git a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql index 1f79d0c..03b4f8c 100644 --- a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql +++ b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql @@ -8,21 +8,23 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select * - except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1,contexts_test4_2), JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1, JSON_EXTRACT_ARRAY(contexts_test2_1_0_2) AS contexts_test2_1_0_2, JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3, JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4, - JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5 + JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5, + JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1, + JSON_EXTRACT_ARRAY(contexts_test4_2) AS contexts_test4_2 from {{ ref('snowplow_norm_dummy_events') }} ) -- recreate repeated record field i.e. array of structs as is originally in BQ events table select - * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2), -- order is reversed to test the aliasing of the coalesced columns struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0, struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1, @@ -56,7 +58,15 @@ select array( select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) from unnest(contexts_test2_1_0_5) as json_array - ) as contexts_test2_1_0_5 - + ) as contexts_test2_1_0_5, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_1) as json_array + ) as contexts_test4_1, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_2) as json_array + ) as contexts_test4_2 + from prep diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index b3c86a7..ca07fd9 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -65,16 +65,42 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} -{# Remove down to major version for bigquery combine columns macro, drop 2 last _X values #} -{%- set sde_cols_clean = [] -%} -{%- for ind in range(sde_cols|length) -%} - {% do sde_cols_clean.append('_'.join(sde_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} -{%- set context_cols_clean = [] -%} -{%- for ind in range(context_cols|length) -%} - {% do context_cols_clean.append('_'.join(context_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} + {# Handle both versioned and unversioned column names #} + {%- set re = modules.re -%} + + {# + This regex pattern handles column versioning in Snowplow contexts and self-describing events. + It specifically targets three-part semantic versions (e.g., field_1_2_3) while preserving + one-part (field_1) and two-part (field_1_2) versions. + + Pattern breakdown: '(_\\d+)_\\d+_\\d+$' + - (_\\d+) : Capture group that matches an underscore followed by one or more digits + This captures the major version number (e.g., "_1" in "field_1_2_3") + - _\\d+ : Matches an underscore and one or more digits (minor version) + - _\\d+ : Matches an underscore and one or more digits (patch version) + - $ : Ensures the pattern only matches at the end of the string + The replacement pattern '\\1' keeps only the captured major version. + + Examples: + - field_1 -> field_1 (no change - only has major version) + - field_1_2 -> field_1_2 (no change - has major and minor versions) + - field_1_2_3 -> field_1 (transforms - removes minor and patch versions) + #} + {%- set version_pattern = '(_\\d+)_\\d+_\\d+$' -%} + + {%- set sde_cols_clean = [] -%} + {%- for col in sde_cols -%} + {# Get the base name for combine_column_versions to work with #} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} + {% do sde_cols_clean.append(clean_name) -%} + {%- endfor -%} + + {%- set context_cols_clean = [] -%} + {%- for col in context_cols -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} + {% do context_cols_clean.append(clean_name) -%} + {%- endfor -%} {# Replace keys with snake_case where needed #} {%- set sde_keys_clean = [] -%} {%- set context_keys_clean = [] -%} @@ -86,7 +112,6 @@ where {%- endfor -%} {% do sde_keys_clean.append(sde_key_clean) -%} {%- endfor -%} - {%- for ind1 in range(context_keys|length) -%} {%- set context_key_clean = [] -%} {%- for ind2 in range(context_keys[ind1]|length) -%} @@ -119,10 +144,10 @@ select {%- set required_aliases = sde_keys_clean[col_ind] -%} {%- endif -%} {%- set sde_col_list = snowplow_utils.combine_column_versions( - relation=ref('snowplow_normalize_base_events_this_run'), - column_prefix=col.lower(), - required_fields = zip(sde_keys_clean[col_ind], required_aliases) - ) -%} + relation=ref('snowplow_normalize_base_events_this_run'), + column_prefix=col.lower(), + required_fields = zip(sde_keys_clean[col_ind], required_aliases) + ) -%} {%- for field, key_ind in zip(sde_col_list, range(sde_col_list|length)) -%} {# Loop over each key within the column, appling the bespoke alias as needed #} , {{field}} {% endfor -%}