Skip to content

Commit

Permalink
Revert renaming of manifest columns
Browse files Browse the repository at this point in the history
  • Loading branch information
agnessnowplow committed Dec 19, 2024
1 parent 56f6fd7 commit 7cf7fde
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 64 deletions.
2 changes: 1 addition & 1 deletion integration_tests/.scripts/unit_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ for db in ${DATABASES[@]}; do
eval "dbt test --select test_get_incremental_manifest_status_macro --store-failures --target $db" || exit 1;
fi

# This macro returns the array: [min_first_processed_load_tstamp, max_first_processed_load_tstamp, min_last_processed_load_tstamp, max_last_processed_load_tstamp, models_matched_from_manifest, sync_count, has_matched_all_models]
# This macro returns the array: [min_first_success, max_first_success, min_last_success, max_last_success, models_matched_from_manifest, sync_count, has_matched_all_models]
# Not too important to test: it effectively returns a min/max/count of values in the manifest based on the models in the run
# Inputs are read from a seed file; we can selectively test the different inputs depending on the models in run array, so there is no need for it to contain exact scenarios upfront

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
model,first_processed_load_tstamp,last_processed_load_tstamp
model,first_success,last_success
a,2020-01-01 00:00:00,2020-01-02 00:00:00
b,2020-01-02 00:00:00,2020-01-03 00:00:00
c,2020-01-03 00:00:00,2020-01-04 00:00:00
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
test_case,min_first_processed_load_tstamp,max_first_processed_load_tstamp,min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,sync_count,has_matched_all_models
test_case,min_first_success,max_first_success,min_last_success,max_last_success,models_matched_from_manifest,sync_count,has_matched_all_models
all model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-04 00:00:00,3,3,true
some model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-03 00:00:00,2,2,false
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit
min_last_success,max_last_success,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit
,,0,FALSE,0,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00
2021-03-01 00:00:00+00:00,2021-03-01 00:00:00+00:00,10,FALSE,1,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00
2021-03-01 18:00:00+00:00,2021-03-01 18:00:00+00:00,10,TRUE,1,2021-01-01,2021-03-01 18:00:00+00:00,2021-03-31 18:00:00+00:00
Expand Down
12 changes: 6 additions & 6 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ seeds:
max_last_success: timestamp
data_get_incremental_manifest_status_t:
+column_types:
first_processed_load_tstamp: timestamp
last_processed_load_tstamp: timestamp
first_success: timestamp
last_success: timestamp
data_get_incremental_manifest_status_t_expected:
+column_types:
min_first_processed_load_tstamp: timestamp
max_first_processed_load_tstamp: timestamp
min_last_processed_load_tstamp: timestamp
max_last_processed_load_tstamp: timestamp
min_first_success: timestamp
max_first_success: timestamp
min_last_success: timestamp
max_last_success: timestamp
data_get_run_limits:
+column_types:
min_last_success: timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
'all model_in_run exist in manifest' as test_case,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_success,
{{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_success,
{{all_models[4]}} as models_matched_from_manifest,
{{all_models[5]}} as sync_count,
{{all_models[6]}} as has_matched_all_models
Expand All @@ -23,10 +23,10 @@ union all

select
'some model_in_run exist in manifest' as test_case,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success,
{{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_success,
{{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_success,
{{partial_models[4]}} as models_matched_from_manifest,
{{partial_models[5]}} as sync_count,
{{partial_models[6]}} as has_matched_all_models
Expand All @@ -35,10 +35,10 @@ select

select
test_case,
min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
cast(has_matched_all_models as {{ dbt.type_boolean() }}) as has_matched_all_models
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do test_data.update({key.lower(): value}) %}
{% endfor %}

{% for i in range(test_data.min_last_processed_load_tstamp|length) %}
{% for i in range(test_data.min_last_success|length) %}

{# iteratively pass each row of test data into get_run_limits_t() and execute returned query
min_first_processed_load_tstamp and max_first_processed_load_tstamp are not yet used, placeholder in place #}
min_first_success and max_first_success are not yet used, placeholder in place #}
{%- set results = run_query(snowplow_utils.get_run_limits_t('9999-01-01 00:00:00',
'9999-01-01 00:00:00',
test_data.min_last_processed_load_tstamp[i],
test_data.max_last_processed_load_tstamp[i],
test_data.min_last_success[i],
test_data.max_last_success[i],
test_data.models_matched_from_manifest[i],
test_data.sync_count[i],
test_data.has_matched_all_models[i],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) model,
cast('1970-01-01' as {{ type_timestamp() }}) as first_processed_load_tstamp,
cast('1970-01-01' as {{ type_timestamp() }}) as last_processed_load_tstamp
cast('1970-01-01' as {{ type_timestamp() }}) as first_success,
cast('1970-01-01' as {{ type_timestamp() }}) as last_success
)

select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% set status_query %}
select
min(first_processed_load_tstamp) as min_first_processed_load_tstamp,
max(first_processed_load_tstamp) as max_first_processed_load_tstamp,
min(last_processed_load_tstamp) as min_last_processed_load_tstamp,
max(last_processed_load_tstamp) as max_last_processed_load_tstamp,
min(first_success) as min_first_success,
max(first_success) as max_first_success,
min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models,
count(distinct last_processed_load_tstamp) as sync_count
count(distinct last_success) as sync_count
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}
Expand All @@ -40,18 +40,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% if execute %}

{% set min_first_processed_load_tstamp = results.columns[0].values()[0] %}
{% set max_first_processed_load_tstamp = results.columns[1].values()[0] %}
{% set min_last_processed_load_tstamp = results.columns[2].values()[0] %}
{% set max_last_processed_load_tstamp = results.columns[3].values()[0] %}
{% set min_first_success = results.columns[0].values()[0] %}
{% set max_first_success = results.columns[1].values()[0] %}
{% set min_last_success = results.columns[2].values()[0] %}
{% set max_last_success = results.columns[3].values()[0] %}
{% set models_matched_from_manifest = results.columns[4].values()[0] %}
{% set sync_count = results.columns[5].values()[0] %}
{% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %}

{{ return([min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{{ return([min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models]) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{# Returns the sql to calculate the lower/upper limits of the run #}

{% macro get_run_limits_t(min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{% macro get_run_limits_t(min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models,
start_date) -%}

{% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %}
{% set min_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(min_last_processed_load_tstamp) %}
{% set max_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(max_last_processed_load_tstamp) %}
{% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %}
{% set max_last_success = snowplow_utils.cast_to_tstamp(max_last_success) %}

{% if not execute %}
{{ return('') }}
Expand Down Expand Up @@ -53,7 +53,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: New Snowplow incremental model. Backfilling") %}
{% set run_limits_query %}
select {{ start_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit
{% endset %}
{% endif %}
Expand All @@ -77,9 +77,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: Snowplow incremental models out of sync. Syncing") %}

{% set run_limits_query %}
select {{ min_last_processed_load_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }}) as upper_limit
select {{ min_last_success }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}) as upper_limit
{% endset %}

{# State 5: If all models in the run exists in the manifest, none are out of sync, it is a standard incremental run #}
Expand All @@ -92,16 +92,16 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
select

{% if var("snowplow__run_type", "incremental") == 'incremental' %}
{{ min_last_processed_load_tstamp }} as lower_limit,
{{ min_last_success }} as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'current_day_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_success }}) as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'last_n_days_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_success }}) as lower_limit,
{% else %}
{{ exceptions.raise_compiler_error("Snowplow Error: Input for variable snowplow__run_type not recognised. Input must be 'incremental', 'current_day_incremental' or 'last_n_days_incremental''. Input given: " ~ var("snowplow__run_type")) }}
{% endif %}

least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }},
least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }},
{{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit
{% endset %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% set last_success_query %}
select
b.model,
a.last_processed_load_tstamp,
a.first_processed_load_tstamp
a.last_success,
a.first_success

from
(select max(load_tstamp) as last_processed_load_tstamp,
min(load_tstamp) as first_processed_load_tstamp from {{ base_events_table }}) a,
(select max(load_tstamp) as last_success,
min(load_tstamp) as first_success from {{ base_events_table }}) a,
({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b

where a.last_processed_load_tstamp is not null -- if run contains no data don't add to manifest
where a.last_success is not null -- if run contains no data don't add to manifest
{% endset %}

merge into {{ manifest_table }} m
using ( {{ last_success_query }} ) s
on m.model = s.model
when matched then
update set last_processed_load_tstamp = greatest(m.last_processed_load_tstamp, s.last_processed_load_tstamp),
first_processed_load_tstamp = coalesce(m.first_processed_load_tstamp, s.first_processed_load_tstamp)
update set last_success = greatest(m.last_success, s.last_success),
first_success = coalesce(m.first_success, s.first_success)

when not matched then
insert (model, last_processed_load_tstamp, first_processed_load_tstamp)
values (s.model, s.last_processed_load_tstamp, s.first_processed_load_tstamp);
insert (model, last_success, first_success)
values (s.model, s.last_success, s.first_success);

{% if target.type == 'snowflake' %}
commit;
Expand Down

0 comments on commit 7cf7fde

Please sign in to comment.