Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate downstream analytics #14

Merged
merged 6 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analytics_dbt/target/manifest.json

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions dagster_proj/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
import os
import os

from dagster import Definitions, load_assets_from_modules, EnvVar

from .assets import activities, dbt, energy_prediction, asset_checks
from .assets import activities, dbt, energy_prediction, weekly_totals, asset_checks
from .resources import database_resource, dbt_resource, strava_api_resouce
from .jobs import activities_update_job
from .schedules import activities_update_schedule

DAGSTER_ENVIRONMENT = EnvVar('DAGSTER_ENVIRONMENT').get_value()
DAGSTER_ENVIRONMENT = EnvVar("DAGSTER_ENVIRONMENT").get_value()

# load in assets from assets/
activities_assets = load_assets_from_modules([activities])
ml_assets = load_assets_from_modules([energy_prediction], group_name="ml_pipeline")
dashboard_assets = load_assets_from_modules([weekly_totals], group_name="analytics")
analytics_dbt_assets = load_assets_from_modules([dbt], group_name="dbt_duckdb")

# load in jobs from jobs/
all_jobs = [activities_update_job]
all_schedules = [activities_update_schedule]

defs = Definitions(
assets=activities_assets + ml_assets + analytics_dbt_assets + ml_assets,
asset_checks=asset_checks, # defined in assets/__init__.py
assets=activities_assets
+ ml_assets
+ analytics_dbt_assets
+ ml_assets
+ dashboard_assets,
asset_checks=asset_checks, # defined in assets/__init__.py
resources={
"database": database_resource[DAGSTER_ENVIRONMENT],
"dbt": dbt_resource,
Expand All @@ -29,4 +34,3 @@
jobs=all_jobs,
schedules=all_schedules,
)

61 changes: 58 additions & 3 deletions dagster_proj/assets/energy_prediction.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
from pathlib import Path

from dagster import (
asset,
asset_check,
Expand All @@ -13,6 +16,7 @@

from ..utils import dynamic_query

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
Expand All @@ -25,6 +29,8 @@
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor

import plotly.graph_objects as go

logger = get_dagster_logger()

DAGSTER_ENVIRONMENT = EnvVar("DAGSTER_ENVIRONMENT").get_value()
Expand Down Expand Up @@ -153,7 +159,7 @@ def trained_model(training_data, random_forest_pipeline):
return rf_pipeline


@asset
@multi_asset(outs={"results_df": AssetOut(), "model_evaluation": AssetOut()})
def evaluate_model(trained_model, test_data):
"""Predict and evaluate on testing data"""
X_test, y_test = test_data
Expand All @@ -163,6 +169,55 @@ def evaluate_model(trained_model, test_data):
logger.info(f"Mean Squared Error: {mse}")
logger.info(f"R-squared Score: {r2}")

return MaterializeResult(
metadata={"mse": MetadataValue.float(mse), "r2": MetadataValue.float(r2)}
results_df = pd.DataFrame(X_test)
results_df['Predicted'] = y_pred

return (
results_df,
MaterializeResult(
asset_key="model_evaluation",
metadata={"mse": MetadataValue.float(mse), "r2": MetadataValue.float(r2)}
)
)

@asset
def ml_results_scatter_plot(context: AssetExecutionContext, results_df):
"""Scatterplot of X_test and y_pred"""
fig = go.Figure()

fig.add_trace(go.Scatter(
x=results_df.index,
y=results_df.iloc[:, 0],
mode='markers',
name='X_test',
marker=dict(size=8)
))

fig.add_trace(go.Scatter(
x=results_df.index,
y=results_df['Predicted'],
mode='markers',
name='Predicted',
marker=dict(color='red', size=10)
))

fig.update_layout(
title='Scatter Plot of X_test and Predicted Values',
xaxis_title='Index',
yaxis_title='Values',
legend=dict(title='Legend'),
template='plotly_white'
)

# path to save the HTML file
save_chart_path = Path(context.instance.storage_directory()).joinpath("ml_results.html")

# save the figure as an HTML file
fig.write_html(save_chart_path, auto_open=False)

# add metadata to make the HTML file accessible from the Dagster UI
context.add_output_metadata({
"plot_url": MetadataValue.url("file://" + os.fspath(save_chart_path))
})

return fig
177 changes: 177 additions & 0 deletions dagster_proj/assets/weekly_totals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import os
from datetime import datetime, timedelta
from pathlib import Path

from dagster import (
asset,
get_dagster_logger,
EnvVar,
AssetExecutionContext,
MetadataValue
)

import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio

from ..utils import dynamic_query

DAGSTER_ENVIRONMENT = EnvVar("DAGSTER_ENVIRONMENT").get_value()

logger = get_dagster_logger()

@asset(
deps=["sport_type_weekly_totals"],
compute_kind="Snowflake",
required_resource_keys={"database"},
)
def sport_type_totals(
context: AssetExecutionContext,
):
"""Gets weekly sport_type totals data"""
query = """
select *
from dbt.sport_type_weekly_totals
"""
df = dynamic_query(
dagster_environment=DAGSTER_ENVIRONMENT,
context=context,
query=query,
)

context.log.info(f"Size of data: {df.shape}")

return df

@asset
def sports_activity_dashboard(context: AssetExecutionContext, sport_type_totals):
df = sport_type_totals

# week_start to datetime
df['week_start'] = pd.to_datetime(df['week_start'])

most_recent_date = df['week_start'].max()

# get the last 8 weeks
eight_weeks_ago = most_recent_date - timedelta(weeks=7)
df = df[df['week_start'] > eight_weeks_ago]

# totals
total_prs = df['pr_count'].sum()
total_achievements = df['achievement_count'].sum()

pio.templates.default = "plotly"

# main plotly dashboard
fig = make_subplots(
rows=4, cols=2,
specs=[
[{"type": "indicator"}, {"type": "indicator"}],
[{"type": "pie"}, {"type": "scatter"}],
[{"type": "bar", "colspan": 2}, None],
[{"type": "bar", "colspan": 2}, None]
],
subplot_titles=("Total PRs", "Total Achievements",
"Sport Type Distribution", "Distance per Week",
"Total Hours per Week",
"Number of Activities per Week"),
vertical_spacing=0.1,
row_heights=[0.2, 0.3, 0.25, 0.25]
)

# 1. Total PRs indicator
fig.add_trace(
go.Indicator(
mode="number",
value=total_prs,
# title={"text": "Total PRs", "font": {"size": 20}},
number={"font": {"size": 40}},
domain={"row": 0, "column": 0}
),
row=1, col=1
)

# 2. Total Achievements indicator
fig.add_trace(
go.Indicator(
mode="number",
value=total_achievements,
# title={"text": "Total Achievements", "font": {"size": 20}},
number={"font": {"size": 40}},
domain={"row": 0, "column": 1}
),
row=1, col=2
)

# 3. Pie chart for sport type distribution
sport_type_counts = df['sport_type'].value_counts()
fig.add_trace(
go.Pie(labels=sport_type_counts.index, values=sport_type_counts.values, name="Sport Types"),
row=2, col=1
)

# 4. Line graph for distance per week with average
df_weekly = df.groupby('week_start').agg({
'distance_miles': 'sum',
'moving_time_minutes': 'sum',
'num_activities': 'sum'
}).reset_index()

avg_distance = df_weekly['distance_miles'].mean()

fig.add_trace(
go.Scatter(x=df_weekly['week_start'], y=df_weekly['distance_miles'],
name="Distance (miles)", line=dict(color='red')),
row=2, col=2
)
fig.add_trace(
go.Scatter(x=[df_weekly['week_start'].min(), df_weekly['week_start'].max()],
y=[avg_distance, avg_distance],
name="Avg Distance", line=dict(color='red', dash='dash')),
row=2, col=2
)

# 5. Bar chart for total hours per week with average
df_weekly['total_hours'] = df_weekly['moving_time_minutes'] / 60
avg_hours = df_weekly['total_hours'].mean()

fig.add_trace(
go.Bar(x=df_weekly['week_start'], y=df_weekly['total_hours'], name="Total Hours"),
row=3, col=1
)
fig.add_trace(
go.Scatter(x=[df_weekly['week_start'].min(), df_weekly['week_start'].max()],
y=[avg_hours, avg_hours],
name="Avg Hours", line=dict(color='orange', dash='dash')),
row=3, col=1
)

#6. Bar chart of total activities per week
fig.add_trace(
go.Bar(x=df_weekly['week_start'], y=df_weekly['num_activities'], name="Number of Activities"),
row=4, col=1
)

fig.update_xaxes(title_text="Week", row=4, col=1)
fig.update_yaxes(title_text="Number of Activities", row=4, col=1)

# final layout updates
fig.update_layout(height=1200, width=1200, title_text="Activity Totals (Last 8 Weeks)")
fig.update_xaxes(title_text="Week", row=3, col=1)
fig.update_yaxes(title_text="Hours", row=3, col=1)
fig.update_xaxes(title_text="Week", row=2, col=2)
fig.update_yaxes(title_text="Distance (miles)", row=2, col=2)

# HTML file path to save
save_chart_path = Path(context.instance.storage_directory()).joinpath("sports_activity_dashboard.html")

# save file
fig.write_html(save_chart_path, auto_open=False)

# this is needed to allow generated filepath to be accessible to UI
context.add_output_metadata({
"plot_url": MetadataValue.url("file://" + os.fspath(save_chart_path))
})

return fig
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"dagster-cloud>=1.9.3",
"dbt-snowflake>=1.8.4",
"dagster-snowflake>=0.25.3",
"plotly>=5.24.1",
]

[project.optional-dependencies]
Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ packaging==24.2
# dagster-dbt
# dbt-core
# dlt
# plotly
# requirements-parser
# snowflake-connector-python
# streamlit
Expand All @@ -310,6 +311,8 @@ pillow==11.0.0
# via streamlit
platformdirs==4.3.6
# via snowflake-connector-python
plotly==5.24.1
# via dagster-proj (pyproject.toml)
pluggy==1.5.0
# via dlt
ply==3.11
Expand Down Expand Up @@ -475,6 +478,7 @@ tabulate==0.9.0
tenacity==9.0.0
# via
# dlt
# plotly
# streamlit
text-unidecode==1.3
# via python-slugify
Expand Down
15 changes: 15 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading