Skip to content

Commit

Permalink
Merge pull request #10 from jairus-m/hotfix/missing-data
Browse files Browse the repository at this point in the history
Hotfix: address missing data
  • Loading branch information
jairus-m authored Dec 23, 2024
2 parents bc790e7 + 28c1ef2 commit a4dc220
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 13 deletions.
3 changes: 2 additions & 1 deletion analytics_dbt/models/marts/sport_type_weekly_totals.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
select
da.week_start
, da.sport_type
, count(fa.id) as num_activities
, sum(distance_miles) as distance_miles
, sum(moving_time_minutes) as moving_time_minutes
, sum(elapsed_time_minutes) as elapsed_time_minutes
Expand All @@ -13,4 +14,4 @@ from {{ ref('fct_activities') }} as fa
left join {{ ref('dim_activities') }} as da
on fa.id = da.id
group by week_start, sport_type
order by week_start desc
order by week_start desc, sport_type
2 changes: 1 addition & 1 deletion analytics_dbt/models/src/_src_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2

sources:
- name: strava
schema: activities_staging
schema: activities
tables:
- name: activities
description: Source data containing all activities and fields
Expand Down
2 changes: 1 addition & 1 deletion analytics_dbt/models/src/src_activities.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ with raw_activities AS (
)

SELECT *
FROM raw_activities
FROM raw_activities
2 changes: 1 addition & 1 deletion analytics_dbt/target/manifest.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions analytics_dbt/tests/test_activity_counts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
with

activity_totals as (
select count(*) as total_activities from {{ ref('fct_activities') }}
)

select *
from activity_totals
where total_activities < 2000

14 changes: 14 additions & 0 deletions analytics_dbt/tests/test_cycling_counts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
with

cycling_totals as (
select count(*) as total_activities
from {{ ref('fct_activities') }} as fct
left join {{ ref('dim_activities') }} as dim
on fct.id = dim.id
where sport_type = 'Cycling'
and average_watts is not null
)

select *
from cycling_totals
where total_activities < 1000
5 changes: 3 additions & 2 deletions dagster_proj/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dagster import Definitions, load_assets_from_modules

from .assets import activities, dbt, energy_prediction
from .assets import activities, dbt, energy_prediction, asset_checks
from .resources import duckdb_resource, dbt_resource, strava_api_resouce
from .jobs import activities_update_job
from .schedules import activities_update_schedule
Expand All @@ -18,8 +18,9 @@

defs = Definitions(
assets=activities_assets + ml_assets + analytics_dbt_assets + ml_assets,
asset_checks=asset_checks, # defined in assets/__init__.py
resources={
"database": duckdb_resource,
"duckdb": duckdb_resource,
"dbt": dbt_resource,
"strava": strava_api_resouce,
},
Expand Down
11 changes: 11 additions & 0 deletions dagster_proj/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import sys
from dagster import AssetChecksDefinition

from .energy_prediction import *

asset_checks = [
getattr(sys.modules[__name__], name)
for name in dir(sys.modules[__name__])
if isinstance(getattr(sys.modules[__name__], name), AssetChecksDefinition)
]

27 changes: 20 additions & 7 deletions dagster_proj/assets/energy_prediction.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from dagster import (
asset,
asset_check,
AssetCheckResult,
multi_asset,
AssetOut,
MaterializeResult,
MetadataValue,
get_dagster_logger,
AssetExecutionContext
)

from dagster_duckdb import DuckDBResource

import numpy as np

from sklearn.model_selection import train_test_split
Expand All @@ -25,10 +26,11 @@


@asset(
deps=["fct_activities"],
deps=["fct_activities", "dim_activities"],
compute_kind="DuckDB",
required_resource_keys={"duckdb"},
)
def cycling_data(database: DuckDBResource):
def cycling_data(context: AssetExecutionContext): # duckdb matches the string, 'duckdb'defines in Definitions resource
"""Gets cycling data from DuckDB"""
query = """
select
Expand All @@ -40,15 +42,26 @@ def cycling_data(database: DuckDBResource):
, weighted_average_watts
, kilojoules
, average_heartrate
from strava.obt_clean_activities
where sport_type in ('Virtual Ride', 'Ride')
from strava.fct_activities as fct
left join strava.dim_activities as dim
on fct.id = dim.id
where sport_type = 'Cycling'
and average_watts is not null
"""
with database.get_connection() as conn:
with context.resources.duckdb.get_connection() as conn:
df = conn.execute(query).fetch_df()
logger.info(f"Size of data: {df.shape}")
return df

@asset_check(asset=cycling_data)
def check_cycling_data_size(cycling_data):
"""Check that the cycling data has more than 1500 rows."""
num_rows = cycling_data.shape[0]
passed = num_rows > 1500
return AssetCheckResult(
passed=passed,
metadata={"num_rows": num_rows}
)

@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def preprocess_data(cycling_data):
Expand Down
Binary file modified data/dev/strava.duckdb
Binary file not shown.

0 comments on commit a4dc220

Please sign in to comment.