Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert renaming of manifest columns #192

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/.scripts/unit_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ for db in ${DATABASES[@]}; do
eval "dbt test --select test_get_incremental_manifest_status_macro --store-failures --target $db" || exit 1;
fi

# This macro returns the array: [min_first_processed_load_tstamp, max_first_processed_load_tstamp, min_last_processed_load_tstamp, max_last_processed_load_tstamp, models_matched_from_manifest, sync_count, has_matched_all_models]
# This macro returns the array: [min_first_success, max_first_success, min_last_success, max_last_success, models_matched_from_manifest, sync_count, has_matched_all_models]
# Not too important to test: it effectively returns a min/max/count of values in the manifest based on the models in the run
# Inputs are read from a seed file, we can selectively test the different inputs depending on the models in run array so no need for it to contain exact scenarios upfront

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
model,first_processed_load_tstamp,last_processed_load_tstamp
model,first_success,last_success
a,2020-01-01 00:00:00,2020-01-02 00:00:00
b,2020-01-02 00:00:00,2020-01-03 00:00:00
c,2020-01-03 00:00:00,2020-01-04 00:00:00
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
test_case,min_first_processed_load_tstamp,max_first_processed_load_tstamp,min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,sync_count,has_matched_all_models
test_case,min_first_success,max_first_success,min_last_success,max_last_success,models_matched_from_manifest,sync_count,has_matched_all_models
all model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-04 00:00:00,3,3,true
some model_in_run exist in manifest,2020-01-01 00:00:00,2020-01-03 00:00:00,2020-01-02 00:00:00,2020-01-03 00:00:00,2,2,false
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
min_last_processed_load_tstamp,max_last_processed_load_tstamp,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit
min_last_success,max_last_success,models_matched_from_manifest,has_matched_all_models,sync_count,start_date,lower_limit,upper_limit
,,0,FALSE,0,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00
2021-03-01 00:00:00+00:00,2021-03-01 00:00:00+00:00,10,FALSE,1,2021-01-01,2021-01-01 00:00:00+00:00,2021-01-31 00:00:00+00:00
2021-03-01 18:00:00+00:00,2021-03-01 18:00:00+00:00,10,TRUE,1,2021-01-01,2021-03-01 18:00:00+00:00,2021-03-31 18:00:00+00:00
Expand Down
12 changes: 6 additions & 6 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ seeds:
max_last_success: timestamp
data_get_incremental_manifest_status_t:
+column_types:
first_processed_load_tstamp: timestamp
last_processed_load_tstamp: timestamp
first_success: timestamp
last_success: timestamp
data_get_incremental_manifest_status_t_expected:
+column_types:
min_first_processed_load_tstamp: timestamp
max_first_processed_load_tstamp: timestamp
min_last_processed_load_tstamp: timestamp
max_last_processed_load_tstamp: timestamp
min_first_success: timestamp
max_first_success: timestamp
min_last_success: timestamp
max_last_success: timestamp
data_get_run_limits:
+column_types:
min_last_success: timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
'all model_in_run exist in manifest' as test_case,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[2]) }} as min_last_success,
{{ snowplow_utils.cast_to_tstamp(all_models[3]) }} as max_last_success,
{{all_models[4]}} as models_matched_from_manifest,
{{all_models[5]}} as sync_count,
{{all_models[6]}} as has_matched_all_models
Expand All @@ -23,10 +23,10 @@ union all

select
'some model_in_run exist in manifest' as test_case,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_processed_load_tstamp,
{{ snowplow_utils.cast_to_tstamp(all_models[0]) }} as min_first_success,
{{ snowplow_utils.cast_to_tstamp(all_models[1]) }} as max_first_success,
{{ snowplow_utils.cast_to_tstamp(partial_models[2]) }} as min_last_success,
{{ snowplow_utils.cast_to_tstamp(partial_models[3]) }} as max_last_success,
{{partial_models[4]}} as models_matched_from_manifest,
{{partial_models[5]}} as sync_count,
{{partial_models[6]}} as has_matched_all_models
Expand All @@ -35,10 +35,10 @@ select

select
test_case,
min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
cast(has_matched_all_models as {{ dbt.type_boolean() }}) as has_matched_all_models
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do test_data.update({key.lower(): value}) %}
{% endfor %}

{% for i in range(test_data.min_last_processed_load_tstamp|length) %}
{% for i in range(test_data.min_last_success|length) %}

{# iteratively pass each row of test data into get_run_limits_t() and execute returned query
min_first_processed_load_tstamp and max_first_processed_load_tstamp are not yet used, placeholder in place #}
min_first_success and max_first_success are not yet used, placeholder in place #}
{%- set results = run_query(snowplow_utils.get_run_limits_t('9999-01-01 00:00:00',
'9999-01-01 00:00:00',
test_data.min_last_processed_load_tstamp[i],
test_data.max_last_processed_load_tstamp[i],
test_data.min_last_success[i],
test_data.max_last_success[i],
test_data.models_matched_from_manifest[i],
test_data.sync_count[i],
test_data.has_matched_all_models[i],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

where load_tstamp >= {{ lower_limit }} and load_tstamp <= {{ upper_limit }}

and derived_tstamp >= {{ lower_limit }} and derived_tstamp <= {{ upper_limit }}

and {{ snowplow_utils.app_id_filter(app_ids) }}

and {{ snowplow_utils.event_name_filter(event_names) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) model,
cast('1970-01-01' as {{ type_timestamp() }}) as first_processed_load_tstamp,
cast('1970-01-01' as {{ type_timestamp() }}) as last_processed_load_tstamp
cast('1970-01-01' as {{ type_timestamp() }}) as first_success,
cast('1970-01-01' as {{ type_timestamp() }}) as last_success
)

select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% set status_query %}
select
min(first_processed_load_tstamp) as min_first_processed_load_tstamp,
max(first_processed_load_tstamp) as max_first_processed_load_tstamp,
min(last_processed_load_tstamp) as min_last_processed_load_tstamp,
max(last_processed_load_tstamp) as max_last_processed_load_tstamp,
min(first_success) as min_first_success,
max(first_success) as max_first_success,
min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models,
count(distinct last_processed_load_tstamp) as sync_count
count(distinct last_success) as sync_count
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}
Expand All @@ -40,18 +40,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% if execute %}

{% set min_first_processed_load_tstamp = results.columns[0].values()[0] %}
{% set max_first_processed_load_tstamp = results.columns[1].values()[0] %}
{% set min_last_processed_load_tstamp = results.columns[2].values()[0] %}
{% set max_last_processed_load_tstamp = results.columns[3].values()[0] %}
{% set min_first_success = results.columns[0].values()[0] %}
{% set max_first_success = results.columns[1].values()[0] %}
{% set min_last_success = results.columns[2].values()[0] %}
{% set max_last_success = results.columns[3].values()[0] %}
{% set models_matched_from_manifest = results.columns[4].values()[0] %}
{% set sync_count = results.columns[5].values()[0] %}
{% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %}

{{ return([min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{{ return([min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models]) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{# Returns the sql to calculate the lower/upper limits of the run #}

{% macro get_run_limits_t(min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{% macro get_run_limits_t(min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models,
start_date) -%}

{% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %}
{% set min_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(min_last_processed_load_tstamp) %}
{% set max_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(max_last_processed_load_tstamp) %}
{% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %}
{% set max_last_success = snowplow_utils.cast_to_tstamp(max_last_success) %}

{% if not execute %}
{{ return('') }}
Expand Down Expand Up @@ -53,7 +53,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: New Snowplow incremental model. Backfilling") %}
{% set run_limits_query %}
select {{ start_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit
{% endset %}
{% endif %}
Expand All @@ -77,9 +77,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: Snowplow incremental models out of sync. Syncing") %}

{% set run_limits_query %}
select {{ min_last_processed_load_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }}) as upper_limit
select {{ min_last_success }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}) as upper_limit
{% endset %}

{# State 5: If all models in the run exists in the manifest, none are out of sync, it is a standard incremental run #}
Expand All @@ -92,16 +92,16 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
select

{% if var("snowplow__run_type", "incremental") == 'incremental' %}
{{ min_last_processed_load_tstamp }} as lower_limit,
{{ min_last_success }} as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'current_day_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_success }}) as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'last_n_days_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_success }}) as lower_limit,
{% else %}
{{ exceptions.raise_compiler_error("Snowplow Error: Input for variable snowplow__run_type not recognised. Input must be 'incremental', 'current_day_incremental' or 'last_n_days_incremental''. Input given: " ~ var("snowplow__run_type")) }}
{% endif %}

least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }},
least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }},
{{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit
{% endset %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% set last_success_query %}
select
b.model,
a.last_processed_load_tstamp,
a.first_processed_load_tstamp
a.last_success,
a.first_success

from
(select max(load_tstamp) as last_processed_load_tstamp,
min(load_tstamp) as first_processed_load_tstamp from {{ base_events_table }}) a,
(select max(load_tstamp) as last_success,
min(load_tstamp) as first_success from {{ base_events_table }}) a,
({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b

where a.last_processed_load_tstamp is not null -- if run contains no data don't add to manifest
where a.last_success is not null -- if run contains no data don't add to manifest
{% endset %}

merge into {{ manifest_table }} m
using ( {{ last_success_query }} ) s
on m.model = s.model
when matched then
update set last_processed_load_tstamp = greatest(m.last_processed_load_tstamp, s.last_processed_load_tstamp),
first_processed_load_tstamp = coalesce(m.first_processed_load_tstamp, s.first_processed_load_tstamp)
update set last_success = greatest(m.last_success, s.last_success),
first_success = coalesce(m.first_success, s.first_success)

when not matched then
insert (model, last_processed_load_tstamp, first_processed_load_tstamp)
values (s.model, s.last_processed_load_tstamp, s.first_processed_load_tstamp);
insert (model, last_success, first_success)
values (s.model, s.last_success, s.first_success);

{% if target.type == 'snowflake' %}
commit;
Expand Down
Loading