From 7cf7fde151983c7f75e2c4093b03d8a79f71cc34 Mon Sep 17 00:00:00 2001 From: Agnes Kiss Date: Thu, 19 Dec 2024 09:00:12 +0000 Subject: [PATCH 1/2] Revert renaming of manifest columns --- integration_tests/.scripts/unit_tests.sh | 2 +- ...data_get_incremental_manifest_status_t.csv | 2 +- ...incremental_manifest_status_t_expected.csv | 2 +- .../data_get_run_limits_t.csv | 2 +- integration_tests/dbt_project.yml | 12 ++++---- ...et_incremental_manifest_status_t_macro.sql | 24 ++++++++-------- .../test_get_run_limits_t_macro.sql | 8 +++--- ...create_snowplow_incremental_manifest_t.sql | 4 +-- .../get_incremental_manifest_status_t.sql | 26 ++++++++--------- .../incremental_hooks/get_run_limits_t.sql | 28 +++++++++---------- .../update_incremental_manifest_table_t.sql | 18 ++++++------ 11 files changed, 64 insertions(+), 64 deletions(-) diff --git a/integration_tests/.scripts/unit_tests.sh b/integration_tests/.scripts/unit_tests.sh index 3f74e396..3b4f1738 100644 --- a/integration_tests/.scripts/unit_tests.sh +++ b/integration_tests/.scripts/unit_tests.sh @@ -75,7 +75,7 @@ for db in ${DATABASES[@]}; do eval "dbt test --select test_get_incremental_manifest_status_macro --store-failures --target $db" || exit 1; fi - # This macro returns returns the array: [min_first_processed_load_tstamp, max_first_processed_load_tstamp, min_last_processed_load_tstamp, max_last_processed_load_tstamp, models_matched_from_manifest, sync_count, has_matched_all_models] + # This macro returns the array: [min_first_success, max_first_success, min_last_success, max_last_success, models_matched_from_manifest, sync_count, has_matched_all_models] # Not too important to test, it is effectively returns a min/max/count from values in the manifest based on the models in the run # Inputs are read from a seed file, we can selectively test the different inputs depending on the models in run array so no need for it to contain exact scenarios upfront diff --git 
a/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t.csv b/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t.csv index 33048b85..bd96e93a 100644 --- a/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t.csv +++ b/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t.csv @@ -1,4 +1,4 @@ -model,first_processed_load_tstamp,last_processed_load_tstamp +model,first_success,last_success a,2020-01-01 00:00:00,2020-01-02 00:00:00 b,2020-01-02 00:00:00,2020-01-03 00:00:00 c,2020-01-03 00:00:00,2020-01-04 00:00:00 diff --git a/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t_expected.csv b/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t_expected.csv index 3c70e017..ab911589 100644 --- a/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t_expected.csv +++ b/integration_tests/data/incremental_hooks/data_get_incremental_manifest_status_t_expected.csv @@ -1,3 +1,3 @@ -test_case,min_first_processed_load_tstamp,max_first_processed_load_tstamp,min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,sync_count,has_matched_all_models +test_case,min_first_success,max_first_success,min_last_success,max_last_success,models_matched_from_manifest,sync_count,has_matched_all_models all model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-04 00:00:00,3,3,true some model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-03 00:00:00,2,2,false diff --git a/integration_tests/data/incremental_hooks/data_get_run_limits_t.csv b/integration_tests/data/incremental_hooks/data_get_run_limits_t.csv index 2c43e469..6f0a9f29 100644 --- a/integration_tests/data/incremental_hooks/data_get_run_limits_t.csv +++ b/integration_tests/data/incremental_hooks/data_get_run_limits_t.csv @@ -1,4 
+1,4 @@ -min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit +min_last_success,max_last_success,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit ,,0,FALSE,0,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00 2021-03-01 00:00:00+00:00,2021-03-01 00:00:00+00:00,10,FALSE,1,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00 2021-03-01 18:00:00+00:00,2021-03-01 18:00:00+00:00,10,TRUE,1,2021-01-01,2021-03-01 18:00:00+00:00,2021-03-31 18:00:00+00:00 diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 2100b2f4..83392302 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -141,14 +141,14 @@ seeds: max_last_success: timestamp data_get_incremental_manifest_status_t: +column_types: - first_processed_load_tstamp: timestamp - last_processed_load_tstamp: timestamp + first_success: timestamp + last_success: timestamp data_get_incremental_manifest_status_t_expected: +column_types: - min_first_processed_load_tstamp: timestamp - max_first_processed_load_tstamp: timestamp - min_last_processed_load_tstamp: timestamp - max_last_processed_load_tstamp: timestamp + min_first_success: timestamp + max_first_success: timestamp + min_last_success: timestamp + max_last_success: timestamp data_get_run_limits: +column_types: min_last_success: timestamp diff --git a/integration_tests/models/unit_tests/test_get_incremental_manifest_status_t_macro/test_get_incremental_manifest_status_t_macro.sql b/integration_tests/models/unit_tests/test_get_incremental_manifest_status_t_macro/test_get_incremental_manifest_status_t_macro.sql index 7f74736a..fea64acd 100644 --- a/integration_tests/models/unit_tests/test_get_incremental_manifest_status_t_macro/test_get_incremental_manifest_status_t_macro.sql +++ 
b/integration_tests/models/unit_tests/test_get_incremental_manifest_status_t_macro/test_get_incremental_manifest_status_t_macro.sql @@ -11,10 +11,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select 'all model_in_run exist in manifest' as test_case, - {{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_processed_load_tstamp, + {{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success, + {{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success, + {{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_success, + {{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_success, {{all_models[4]}} as models_matched_from_manifest, {{all_models[5]}} as sync_count, {{all_models[6]}} as has_matched_all_models @@ -23,10 +23,10 @@ union all select 'some model_in_run exist in manifest' as test_case, - {{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_processed_load_tstamp, - {{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_processed_load_tstamp, + {{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success, + {{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success, + {{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_success, + {{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_success, {{partial_models[4]}} as models_matched_from_manifest, {{partial_models[5]}} as sync_count, {{partial_models[6]}} as has_matched_all_models @@ -35,10 +35,10 
@@ select select test_case, - min_first_processed_load_tstamp, - max_first_processed_load_tstamp, - min_last_processed_load_tstamp, - max_last_processed_load_tstamp, + min_first_success, + max_first_success, + min_last_success, + max_last_success, models_matched_from_manifest, sync_count, cast(has_matched_all_models as {{ dbt.type_boolean() }}) as has_matched_all_models diff --git a/integration_tests/models/unit_tests/test_get_run_limits_t_macro/test_get_run_limits_t_macro.sql b/integration_tests/models/unit_tests/test_get_run_limits_t_macro/test_get_run_limits_t_macro.sql index 9bfc80cf..8b213b71 100644 --- a/integration_tests/models/unit_tests/test_get_run_limits_t_macro/test_get_run_limits_t_macro.sql +++ b/integration_tests/models/unit_tests/test_get_run_limits_t_macro/test_get_run_limits_t_macro.sql @@ -18,14 +18,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% do test_data.update({key.lower(): value}) %} {% endfor %} -{% for i in range(test_data.min_last_processed_load_tstamp|length) %} +{% for i in range(test_data.min_last_success|length) %} {# iteratively pass each row of test data into get_run_limits_t() and execute returned query - min_first_processed_load_tstamp and max_first_processed_load_tstamp are not yet used, placeholder in place #} + min_first_success and max_first_success are not yet used, placeholder in place #} {%- set results = run_query(snowplow_utils.get_run_limits_t('9999-01-01 00:00:00', '9999-01-01 00:00:00', - test_data.min_last_processed_load_tstamp[i], - test_data.max_last_processed_load_tstamp[i], + test_data.min_last_success[i], + test_data.max_last_success[i], test_data.models_matched_from_manifest[i], test_data.sync_count[i], test_data.has_matched_all_models[i], diff --git a/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_incremental_manifest_t.sql b/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_incremental_manifest_t.sql index 
6bf9ed69..72466deb 100644 --- a/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_incremental_manifest_t.sql +++ b/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_incremental_manifest_t.sql @@ -11,8 +11,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select cast(null as {{ snowplow_utils.type_max_string() }}) model, - cast('1970-01-01' as {{ type_timestamp() }}) as first_processed_load_tstamp, - cast('1970-01-01' as {{ type_timestamp() }}) as last_processed_load_tstamp + cast('1970-01-01' as {{ type_timestamp() }}) as first_success, + cast('1970-01-01' as {{ type_timestamp() }}) as last_success ) select * diff --git a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_incremental_manifest_status_t.sql b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_incremental_manifest_status_t.sql index 582ef407..90e151d6 100644 --- a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_incremental_manifest_status_t.sql +++ b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_incremental_manifest_status_t.sql @@ -26,12 +26,12 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set status_query %} select - min(first_processed_load_tstamp) as min_first_processed_load_tstamp, - max(first_processed_load_tstamp) as max_first_processed_load_tstamp, - min(last_processed_load_tstamp) as min_last_processed_load_tstamp, - max(last_processed_load_tstamp) as max_last_processed_load_tstamp, + min(first_success) as min_first_success, + max(first_success) as max_first_success, + min(last_success) as min_last_success, + max(last_success) as max_last_success, coalesce(count(*), 0) as models, - count(distinct last_processed_load_tstamp) as sync_count + count(distinct last_success) as sync_count from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(models_in_run) 
}}) {% endset %} @@ -40,18 +40,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% if execute %} - {% set min_first_processed_load_tstamp = results.columns[0].values()[0] %} - {% set max_first_processed_load_tstamp = results.columns[1].values()[0] %} - {% set min_last_processed_load_tstamp = results.columns[2].values()[0] %} - {% set max_last_processed_load_tstamp = results.columns[3].values()[0] %} + {% set min_first_success = results.columns[0].values()[0] %} + {% set max_first_success = results.columns[1].values()[0] %} + {% set min_last_success = results.columns[2].values()[0] %} + {% set max_last_success = results.columns[3].values()[0] %} {% set models_matched_from_manifest = results.columns[4].values()[0] %} {% set sync_count = results.columns[5].values()[0] %} {% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %} - {{ return([min_first_processed_load_tstamp, - max_first_processed_load_tstamp, - min_last_processed_load_tstamp, - max_last_processed_load_tstamp, + {{ return([min_first_success, + max_first_success, + min_last_success, + max_last_success, models_matched_from_manifest, sync_count, has_matched_all_models]) }} diff --git a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_run_limits_t.sql b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_run_limits_t.sql index fc4a7012..b5747c64 100644 --- a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_run_limits_t.sql +++ b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/get_run_limits_t.sql @@ -7,18 +7,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {# Returns the sql to calculate the lower/upper limits of the run #} -{% macro get_run_limits_t(min_first_processed_load_tstamp, - max_first_processed_load_tstamp, - min_last_processed_load_tstamp, - max_last_processed_load_tstamp, +{% macro 
get_run_limits_t(min_first_success, + max_first_success, + min_last_success, + max_last_success, models_matched_from_manifest, sync_count, has_matched_all_models, start_date) -%} {% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %} - {% set min_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(min_last_processed_load_tstamp) %} - {% set max_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(max_last_processed_load_tstamp) %} + {% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %} + {% set max_last_success = snowplow_utils.cast_to_tstamp(max_last_success) %} {% if not execute %} {{ return('') }} @@ -53,7 +53,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% do snowplow_utils.log_message("Snowplow: New Snowplow incremental model. Backfilling") %} {% set run_limits_query %} select {{ start_tstamp }} as lower_limit, - least({{ max_last_processed_load_tstamp }}, + least({{ max_last_success }}, {{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit {% endset %} {% endif %} @@ -77,9 +77,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% do snowplow_utils.log_message("Snowplow: Snowplow incremental models out of sync. 
Syncing") %} {% set run_limits_query %} - select {{ min_last_processed_load_tstamp }} as lower_limit, - least({{ max_last_processed_load_tstamp }}, - {{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }}) as upper_limit + select {{ min_last_success }} as lower_limit, + least({{ max_last_success }}, + {{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}) as upper_limit {% endset %} {# State 5: If all models in the run exists in the manifest, none are out of sync, it is a standard incremental run #} @@ -92,16 +92,16 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 select {% if var("snowplow__run_type", "incremental") == 'incremental' %} - {{ min_last_processed_load_tstamp }} as lower_limit, + {{ min_last_success }} as lower_limit, {% elif var("snowplow__run_type", "incremental") == 'current_day_incremental'%} - least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_processed_load_tstamp }}) as lower_limit, + least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_success }}) as lower_limit, {% elif var("snowplow__run_type", "incremental") == 'last_n_days_incremental'%} - least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_processed_load_tstamp }}) as lower_limit, + least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_success }}) as lower_limit, {% else %} {{ exceptions.raise_compiler_error("Snowplow Error: Input for variable snowplow__run_type not recognised. Input must be 'incremental', 'current_day_incremental' or 'last_n_days_incremental''. 
Input given: " ~ var("snowplow__run_type")) }} {% endif %} - least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }}, + least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}, {{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit {% endset %} {% endif %} diff --git a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/update_incremental_manifest_table_t.sql b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/update_incremental_manifest_table_t.sql index 8ef4a14a..a4cb04cd 100644 --- a/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/update_incremental_manifest_table_t.sql +++ b/macros/incrementalize/incrementalize_by_tstamp/incremental_hooks/update_incremental_manifest_table_t.sql @@ -18,27 +18,27 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set last_success_query %} select b.model, - a.last_processed_load_tstamp, - a.first_processed_load_tstamp + a.last_success, + a.first_success from - (select max(load_tstamp) as last_processed_load_tstamp, - min(load_tstamp) as first_processed_load_tstamp from {{ base_events_table }}) a, + (select max(load_tstamp) as last_success, + min(load_tstamp) as first_success from {{ base_events_table }}) a, ({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b - where a.last_processed_load_tstamp is not null -- if run contains no data don't add to manifest + where a.last_success is not null -- if run contains no data don't add to manifest {% endset %} merge into {{ manifest_table }} m using ( {{ last_success_query }} ) s on m.model = s.model when matched then - update set last_processed_load_tstamp = greatest(m.last_processed_load_tstamp, s.last_processed_load_tstamp), - first_processed_load_tstamp = coalesce(m.first_processed_load_tstamp, 
s.first_processed_load_tstamp) + update set last_success = greatest(m.last_success, s.last_success), + first_success = coalesce(m.first_success, s.first_success) when not matched then - insert (model, last_processed_load_tstamp, first_processed_load_tstamp) - values (s.model, s.last_processed_load_tstamp, s.first_processed_load_tstamp); + insert (model, last_success, first_success) + values (s.model, s.last_success, s.first_success); {% if target.type == 'snowflake' %} commit; From 7eea43b6a9086073643c73dfe86a3e0e32a2cbb2 Mon Sep 17 00:00:00 2001 From: Agnes Kiss Date: Thu, 19 Dec 2024 09:36:53 +0000 Subject: [PATCH 2/2] Remove limit from this run --- .../base/base_create_snowplow_events_this_run_t.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_events_this_run_t.sql b/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_events_this_run_t.sql index 3ad2ebbd..d0640bc6 100644 --- a/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_events_this_run_t.sql +++ b/macros/incrementalize/incrementalize_by_tstamp/base/base_create_snowplow_events_this_run_t.sql @@ -24,8 +24,6 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 where load_tstamp >= {{ lower_limit }} and load_tstamp <= {{ upper_limit }} - and derived_tstamp >= {{ lower_limit }} and derived_tstamp <= {{ upper_limit }} - and {{ snowplow_utils.app_id_filter(app_ids) }} and {{ snowplow_utils.event_name_filter(event_names) }}