Skip to content

Commit

Permalink
Merge pull request #8 from jairus-m/feat/add-ruff
Browse files Browse the repository at this point in the history
Feat/add ruff
  • Loading branch information
jairus-m authored Dec 21, 2024
2 parents 72ac665 + 24a28ff commit d8290eb
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 144 deletions.
Binary file removed .DS_Store
Binary file not shown.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ logs
dagster_proj.egg-info
.vscode
*.pyc
__pyache__
__pyache__
.ruff_cache
.DS_Store
2 changes: 1 addition & 1 deletion analytics_dbt/target/manifest.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dagster_proj/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
from .schedules import activities_update_schedule

activities_assets = load_assets_from_modules([activities])
ml_assets = load_assets_from_modules([energy_prediction], group_name='ml_pipeline')
analytics_dbt_assets = load_assets_from_modules([dbt], group_name='dbt_duckdb')
ml_assets = load_assets_from_modules([energy_prediction], group_name="ml_pipeline")
analytics_dbt_assets = load_assets_from_modules([dbt], group_name="dbt_duckdb")

all_jobs = [activities_update_job]
all_schedules = [activities_update_schedule]

defs = Definitions(
assets=activities_assets+ml_assets+analytics_dbt_assets+ml_assets,
assets=activities_assets + ml_assets + analytics_dbt_assets + ml_assets,
resources={
"database": database_resource,
"dbt": dbt_resource,
'strava': strava_api_resouce,
"strava": strava_api_resouce,
},
jobs=all_jobs,
schedules=all_schedules,
)
)
34 changes: 15 additions & 19 deletions dagster_proj/assets/activities.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
from dagster import (
asset,
EnvVar,
load_assets_from_modules,
asset,
EnvVar,
get_dagster_logger,
AssetCheckExecutionContext,
)
from dagster_dbt import get_asset_key_for_source
import dlt
from dlt.sources.helpers import requests
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources
import pendulum
import sys

from ..resources import StravaAPIResource, strava_api_resouce

logger = get_dagster_logger()


@dlt.source
def strava_rest_api_config(strava_resource: StravaAPIResource):
logger.info("Extracting Strava data source")
access_token = strava_api_resouce.get_access_token()
access_token = strava_resource.get_access_token()

config: RESTAPIConfig = {
"client": {
Expand All @@ -28,11 +24,7 @@ def strava_rest_api_config(strava_resource: StravaAPIResource):
"type": "bearer",
"token": access_token,
},
"paginator": {
"type": "page_number",
"base_page": 1,
"total_path": None
},
"paginator": {"type": "page_number", "base_page": 1, "total_path": None},
},
"resources": [
{
Expand All @@ -43,31 +35,35 @@ def strava_rest_api_config(strava_resource: StravaAPIResource):
"type": "incremental",
"cursor_path": "start_date_local",
"initial_value": "2010-01-01 00:00:00+00:00",
"convert": lambda dt_str: int(pendulum.parse(dt_str).timestamp()),
"convert": lambda dt_str: int(
pendulum.parse(dt_str).timestamp()
),
},
}
}
},
"primary_key": "id",
"write_disposition": "merge",
}
]
],
}

logger.info("RESTAPIConfig set up, starting to yield resources...")

yield from rest_api_resources(config)

@asset(key=['strava', 'activities'], group_name='dltHub')

@asset(key=["strava", "activities"], group_name="dltHub")
def load_strava_activities():
"""
dlt EL pipeline based off declarative Rest API Config
to load raw Strava activities into DuckDB
"""
pipeline = dlt.pipeline(
pipeline_name="strava_rest_config",
pipeline_name="strava_rest_config",
destination=dlt.destinations.duckdb(EnvVar("DUCKDB_DATABASE").get_value()),
dataset_name="activities",
progress="log")
progress="log",
)

source = strava_rest_api_config(strava_api_resouce)

Expand Down
9 changes: 3 additions & 6 deletions dagster_proj/assets/dbt.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import json

from dagster import AssetExecutionContext, AssetKey
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from ..project import dbt_project


@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=DagsterDbtTranslator()
manifest=dbt_project.manifest_path, dagster_dbt_translator=DagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
    """Materialize every model in the dbt project as Dagster assets.

    Invokes ``dbt build`` through the configured ``DbtCliResource`` and
    streams the CLI's structured events back to Dagster, so each dbt
    model/test result is reported as it completes rather than after the
    whole run finishes.

    Args:
        context: Dagster execution context for this dbt asset run.
        dbt: dbt CLI resource (configured elsewhere in the project) used
            to launch the subprocess.

    Yields:
        Dagster events produced by ``dbt.cli([...]).stream()``.
    """
    yield from dbt.cli(["build"], context=context).stream()
102 changes: 66 additions & 36 deletions dagster_proj/assets/energy_prediction.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from dagster import (
asset,
AssetKey,
multi_asset,
asset,
multi_asset,
AssetOut,
MaterializeResult,
MetadataValue,
get_dagster_logger
)
MaterializeResult,
MetadataValue,
get_dagster_logger,
)

from dagster_duckdb import DuckDBResource

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
Expand All @@ -25,6 +23,7 @@

logger = get_dagster_logger()


@asset(
deps=["fct_activities"],
compute_kind="DuckDB",
Expand All @@ -47,43 +46,76 @@ def cycling_data(database: DuckDBResource):
"""
with database.get_connection() as conn:
df = conn.execute(query).fetch_df()
logger.info(f'Size of data: {df.shape}')
logger.info(f"Size of data: {df.shape}")
return df


@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def preprocess_data(cycling_data):
"""Preprocesses data (train/test split of X/y)"""
X = cycling_data.loc[:, ~cycling_data.columns.isin(['kilojoules'])]
y = np.sqrt(cycling_data.loc[:, cycling_data.columns.isin(['kilojoules'])]).values.ravel()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1998)
X = cycling_data.loc[:, ~cycling_data.columns.isin(["kilojoules"])]
y = np.sqrt(
cycling_data.loc[:, cycling_data.columns.isin(["kilojoules"])]
).values.ravel()
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=1998
)
return (X_train, y_train), (X_test, y_test)


@asset
def sklearn_preprocessor():
"""Implement Sklearn data preprocessor pipeline"""
sqrt_pipeline = Pipeline([
('impute_regression', IterativeImputer(estimator=LinearRegression())),
('sqrt_transform', FunctionTransformer(np.sqrt, feature_names_out='one-to-one')),
('standard_scaler', StandardScaler())
])

numeric_pipeline = Pipeline([
('impute_regression', IterativeImputer(estimator=LinearRegression())),
('standard_scaler', StandardScaler())
])

return ColumnTransformer([
('nums', numeric_pipeline, ['distance', 'moving_time', 'average_speed', 'average_watts', 'weighted_average_watts']),
('sqrt_transform', sqrt_pipeline, ['total_elevation_gain'])
], remainder='drop')
sqrt_pipeline = Pipeline(
[
("impute_regression", IterativeImputer(estimator=LinearRegression())),
(
"sqrt_transform",
FunctionTransformer(np.sqrt, feature_names_out="one-to-one"),
),
("standard_scaler", StandardScaler()),
]
)

numeric_pipeline = Pipeline(
[
("impute_regression", IterativeImputer(estimator=LinearRegression())),
("standard_scaler", StandardScaler()),
]
)

return ColumnTransformer(
[
(
"nums",
numeric_pipeline,
[
"distance",
"moving_time",
"average_speed",
"average_watts",
"weighted_average_watts",
],
),
("sqrt_transform", sqrt_pipeline, ["total_elevation_gain"]),
],
remainder="drop",
)


@asset
def random_forest_pipeline(sklearn_preprocessor):
"""Sklearn preprocessor + random forest pipeline"""
return Pipeline([
('preprocess', sklearn_preprocessor),
('random_forest', RandomForestRegressor(n_estimators=100, random_state=1998)) # hehe :3
])
return Pipeline(
[
("preprocess", sklearn_preprocessor),
(
"random_forest",
RandomForestRegressor(n_estimators=100, random_state=1998),
), # hehe :3
]
)


@asset
def trained_model(training_data, random_forest_pipeline):
Expand All @@ -93,6 +125,7 @@ def trained_model(training_data, random_forest_pipeline):
rf_pipeline.fit(X_train, y_train)
return rf_pipeline


@asset
def evaluate_model(trained_model, test_data):
"""Predict and evaluate on testing data"""
Expand All @@ -102,10 +135,7 @@ def evaluate_model(trained_model, test_data):
r2 = r2_score(y_test, y_pred)
logger.info(f"Mean Squared Error: {mse}")
logger.info(f"R-squared Score: {r2}")

return MaterializeResult(
metadata={
"mse": MetadataValue.float(mse),
"r2": MetadataValue.float(r2)
}
metadata={"mse": MetadataValue.float(mse), "r2": MetadataValue.float(r2)}
)
5 changes: 2 additions & 3 deletions dagster_proj/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import define_asset_job, AssetSelection

activities_update_job = define_asset_job(
name="activities_update_job",
selection=AssetSelection.all()
)
name="activities_update_job", selection=AssetSelection.all()
)
4 changes: 2 additions & 2 deletions dagster_proj/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from dagster_dbt import DbtProject

dbt_project = DbtProject(
project_dir=Path(__file__).joinpath("..", "..", "analytics_dbt").resolve(),
project_dir=Path(__file__).joinpath("..", "..", "analytics_dbt").resolve(),
)

dbt_project.prepare_if_dev()

project_dir = str(Path(__file__).parent.parent.joinpath("analytics_dbt").resolve())
project_dir = str(Path(__file__).parent.parent.joinpath("analytics_dbt").resolve())
2 changes: 1 addition & 1 deletion dagster_proj/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

dbt_resource = DbtCliResource(
project_dir=dbt_project,
profiles_dir='analytics_dbt',
profiles_dir="analytics_dbt",
)

database_resource = DuckDBResource(
Expand Down
16 changes: 7 additions & 9 deletions dagster_proj/resources/configured_resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import ConfigurableResource, asset, Definitions
from dagster import ConfigurableResource
import requests
from typing import Dict, Any

class StravaAPIResource(ConfigurableResource):
client_id: str
Expand All @@ -16,12 +15,11 @@ def get_access_token(self) -> str:
def _refresh_access_token(self) -> str:
auth_url = "https://www.strava.com/oauth/token"
payload = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token,
'grant_type': 'refresh_token'
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
}
response = requests.post(auth_url, data=payload, timeout=100)
response.raise_for_status()
return response.json()['access_token']

response.raise_for_status()
return response.json()["access_token"]
4 changes: 2 additions & 2 deletions dagster_proj/schedules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

activities_update_schedule = ScheduleDefinition(
job=activities_update_job,
cron_schedule="0 0 * * *", # every day, at midnight
)
cron_schedule="0 0 * * *", # every day, at midnight
)
Loading

0 comments on commit d8290eb

Please sign in to comment.