Skip to content

Commit

Permalink
Prepare the codebase for spark release (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilias1111 authored Nov 14, 2024
1 parent 79f5eaf commit 541e2e0
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 37 deletions.
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ on-run-end:
models:
snowplow_normalize:
+materialized: table
+file_format: delta
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
+bind: false
base:
manifest:
Expand Down
12 changes: 5 additions & 7 deletions integration_tests/.scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ do
esac
done

declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake")
declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "spark_iceberg")

# set to lower case
DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
Expand All @@ -23,13 +23,11 @@ fi

for db in ${DATABASES[@]}; do

if [ $db == 'bigquery' ]; then
echo "Snowplow web integration tests: Seeding data and doing first run"

eval "dbt seed --target $db --full-refresh" || exit 1;

eval "dbt run --target $db --full-refresh" || exit 1;
if [[ "$db" == "bigquery" ]]; then
echo "Snowplow integration tests: Seeding data and doing first run"

eval "dbt seed --target $db --full-refresh" || exit 1
eval "dbt run --target $db --full-refresh" || exit 1
fi

echo "Snowplow normalize integration tests: snakeify case"
Expand Down
19 changes: 10 additions & 9 deletions integration_tests/ci/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ integration_tests:
token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
threads: 4

# spark:
# type: spark
# method: odbc
# driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
# schema: "github_snwplow_web_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
# host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
# token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
# endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}"
# threads: 4
spark_iceberg:
type: spark
method: thrift
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
connect_retries: 5
connect_timeout: 60
threads: 4
8 changes: 7 additions & 1 deletion integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ clean-targets:
quoting:
identifier: false
schema: false
database: "{{ true if target.type in ['bigquery','databricks'] else false }}"



vars:
snowplow_normalize:
Expand All @@ -34,15 +37,18 @@ vars:

models:
snowplow_normalize_integration_tests:
+materialized: table
bind: false
+schema: "sp_normalize_int_test"
dummy_model:
bigquery:
+enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
+enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+enabled: "{{ target.type == 'databricks' | as_bool() }}"
snowflake:
+enabled: "{{ target.type == 'snowflake' | as_bool() }}"
spark:
+enabled: "{{ target.type == 'spark' | as_bool() }}"

seeds:
quote_columns: false
Expand Down
47 changes: 47 additions & 0 deletions integration_tests/macros/test_normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,53 @@ It runs 9 tests:
{% endmacro %}
{% macro spark__test_normalize_events() %}
-- Main difference here is that spark doesnt need the catalog in the from clause
{% set expected_dict = {
"flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
} %}
{% set results_dict ={
"flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '),
"sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '),
"sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '),
"sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '),
"sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '),
"context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ')
}
%}
{# {{ print(results_dict['flat_cols_only'])}} #}
{# {{ print(results_dict['sde_plus_cols'])}} #}
{# {{ print(results_dict['sde_plus_cols_w_alias'])}} #}
{# {{ print(results_dict['sde_plus_1_context'])}} #}
{# {{ print(results_dict['sde_plus_2_context'])}} #}
{# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #}
{# {{ print(results_dict['context_only'])}} #}
{# {{ print(results_dict['multiple_base_events'])}} #}
{# {{ print(results_dict['multiple_sde_events'])}} #}
{{ dbt_unittest.assert_dict_equals(expected_dict, results_dict) }}
{% endmacro %}
{% macro snowflake__test_normalize_events() %}
{% set expected_dict = {
Expand Down
Loading

0 comments on commit 541e2e0

Please sign in to comment.