Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add ruff #7

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .DS_Store
Binary file not shown.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ logs
dagster_proj.egg-info
.vscode
*.pyc
__pyache__
__pyache__
.ruff_cache
.DS_Store
2 changes: 1 addition & 1 deletion analytics_dbt/target/manifest.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dagster_proj/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
from .schedules import activities_update_schedule

activities_assets = load_assets_from_modules([activities])
ml_assets = load_assets_from_modules([energy_prediction], group_name='ml_pipeline')
analytics_dbt_assets = load_assets_from_modules([dbt], group_name='dbt_duckdb')
ml_assets = load_assets_from_modules([energy_prediction], group_name="ml_pipeline")
analytics_dbt_assets = load_assets_from_modules([dbt], group_name="dbt_duckdb")

all_jobs = [activities_update_job]
all_schedules = [activities_update_schedule]

defs = Definitions(
assets=activities_assets+ml_assets+analytics_dbt_assets+ml_assets,
assets=activities_assets + ml_assets + analytics_dbt_assets + ml_assets,
resources={
"database": database_resource,
"dbt": dbt_resource,
'strava': strava_api_resouce,
"strava": strava_api_resouce,
},
jobs=all_jobs,
schedules=all_schedules,
)
)
34 changes: 15 additions & 19 deletions dagster_proj/assets/activities.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
from dagster import (
asset,
EnvVar,
load_assets_from_modules,
asset,
EnvVar,
get_dagster_logger,
AssetCheckExecutionContext,
)
from dagster_dbt import get_asset_key_for_source
import dlt
from dlt.sources.helpers import requests
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources
import pendulum
import sys

from ..resources import StravaAPIResource, strava_api_resouce

logger = get_dagster_logger()


@dlt.source
def strava_rest_api_config(strava_resource: StravaAPIResource):
logger.info("Extracting Strava data source")
access_token = strava_api_resouce.get_access_token()
access_token = strava_resource.get_access_token()

config: RESTAPIConfig = {
"client": {
Expand All @@ -28,11 +24,7 @@ def strava_rest_api_config(strava_resource: StravaAPIResource):
"type": "bearer",
"token": access_token,
},
"paginator": {
"type": "page_number",
"base_page": 1,
"total_path": None
},
"paginator": {"type": "page_number", "base_page": 1, "total_path": None},
},
"resources": [
{
Expand All @@ -43,31 +35,35 @@ def strava_rest_api_config(strava_resource: StravaAPIResource):
"type": "incremental",
"cursor_path": "start_date_local",
"initial_value": "2010-01-01 00:00:00+00:00",
"convert": lambda dt_str: int(pendulum.parse(dt_str).timestamp()),
"convert": lambda dt_str: int(
pendulum.parse(dt_str).timestamp()
),
},
}
}
},
"primary_key": "id",
"write_disposition": "merge",
}
]
],
}

logger.info("RESTAPIConfig set up, starting to yield resources...")

yield from rest_api_resources(config)

@asset(key=['strava', 'activities'], group_name='dltHub')

@asset(key=["strava", "activities"], group_name="dltHub")
def load_strava_activities():
"""
dlt EL pipeline based off declarative Rest API Config
to load raw Strava activities into DuckDB
"""
pipeline = dlt.pipeline(
pipeline_name="strava_rest_config",
pipeline_name="strava_rest_config",
destination=dlt.destinations.duckdb(EnvVar("DUCKDB_DATABASE").get_value()),
dataset_name="activities",
progress="log")
progress="log",
)

source = strava_rest_api_config(strava_api_resouce)

Expand Down
9 changes: 3 additions & 6 deletions dagster_proj/assets/dbt.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import json

from dagster import AssetExecutionContext, AssetKey
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from ..project import dbt_project


@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=DagsterDbtTranslator()
manifest=dbt_project.manifest_path, dagster_dbt_translator=DagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
102 changes: 66 additions & 36 deletions dagster_proj/assets/energy_prediction.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from dagster import (
asset,
AssetKey,
multi_asset,
asset,
multi_asset,
AssetOut,
MaterializeResult,
MetadataValue,
get_dagster_logger
)
MaterializeResult,
MetadataValue,
get_dagster_logger,
)

from dagster_duckdb import DuckDBResource

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
Expand All @@ -25,6 +23,7 @@

logger = get_dagster_logger()


@asset(
deps=["fct_activities"],
compute_kind="DuckDB",
Expand All @@ -47,43 +46,76 @@ def cycling_data(database: DuckDBResource):
"""
with database.get_connection() as conn:
df = conn.execute(query).fetch_df()
logger.info(f'Size of data: {df.shape}')
logger.info(f"Size of data: {df.shape}")
return df


@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def preprocess_data(cycling_data):
"""Preprocesses data (train/test split of X/y)"""
X = cycling_data.loc[:, ~cycling_data.columns.isin(['kilojoules'])]
y = np.sqrt(cycling_data.loc[:, cycling_data.columns.isin(['kilojoules'])]).values.ravel()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1998)
X = cycling_data.loc[:, ~cycling_data.columns.isin(["kilojoules"])]
y = np.sqrt(
cycling_data.loc[:, cycling_data.columns.isin(["kilojoules"])]
).values.ravel()
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=1998
)
return (X_train, y_train), (X_test, y_test)


@asset
def sklearn_preprocessor():
"""Implement Sklearn data preprocessor pipeline"""
sqrt_pipeline = Pipeline([
('impute_regression', IterativeImputer(estimator=LinearRegression())),
('sqrt_transform', FunctionTransformer(np.sqrt, feature_names_out='one-to-one')),
('standard_scaler', StandardScaler())
])

numeric_pipeline = Pipeline([
('impute_regression', IterativeImputer(estimator=LinearRegression())),
('standard_scaler', StandardScaler())
])

return ColumnTransformer([
('nums', numeric_pipeline, ['distance', 'moving_time', 'average_speed', 'average_watts', 'weighted_average_watts']),
('sqrt_transform', sqrt_pipeline, ['total_elevation_gain'])
], remainder='drop')
sqrt_pipeline = Pipeline(
[
("impute_regression", IterativeImputer(estimator=LinearRegression())),
(
"sqrt_transform",
FunctionTransformer(np.sqrt, feature_names_out="one-to-one"),
),
("standard_scaler", StandardScaler()),
]
)

numeric_pipeline = Pipeline(
[
("impute_regression", IterativeImputer(estimator=LinearRegression())),
("standard_scaler", StandardScaler()),
]
)

return ColumnTransformer(
[
(
"nums",
numeric_pipeline,
[
"distance",
"moving_time",
"average_speed",
"average_watts",
"weighted_average_watts",
],
),
("sqrt_transform", sqrt_pipeline, ["total_elevation_gain"]),
],
remainder="drop",
)


@asset
def random_forest_pipeline(sklearn_preprocessor):
"""Sklearn preprocessor + random forest pipeline"""
return Pipeline([
('preprocess', sklearn_preprocessor),
('random_forest', RandomForestRegressor(n_estimators=100, random_state=1998)) # hehe :3
])
return Pipeline(
[
("preprocess", sklearn_preprocessor),
(
"random_forest",
RandomForestRegressor(n_estimators=100, random_state=1998),
), # hehe :3
]
)


@asset
def trained_model(training_data, random_forest_pipeline):
Expand All @@ -93,6 +125,7 @@ def trained_model(training_data, random_forest_pipeline):
rf_pipeline.fit(X_train, y_train)
return rf_pipeline


@asset
def evaluate_model(trained_model, test_data):
"""Predict and evaluate on testing data"""
Expand All @@ -102,10 +135,7 @@ def evaluate_model(trained_model, test_data):
r2 = r2_score(y_test, y_pred)
logger.info(f"Mean Squared Error: {mse}")
logger.info(f"R-squared Score: {r2}")

return MaterializeResult(
metadata={
"mse": MetadataValue.float(mse),
"r2": MetadataValue.float(r2)
}
metadata={"mse": MetadataValue.float(mse), "r2": MetadataValue.float(r2)}
)
5 changes: 2 additions & 3 deletions dagster_proj/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import define_asset_job, AssetSelection

activities_update_job = define_asset_job(
name="activities_update_job",
selection=AssetSelection.all()
)
name="activities_update_job", selection=AssetSelection.all()
)
4 changes: 2 additions & 2 deletions dagster_proj/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from dagster_dbt import DbtProject

dbt_project = DbtProject(
project_dir=Path(__file__).joinpath("..", "..", "analytics_dbt").resolve(),
project_dir=Path(__file__).joinpath("..", "..", "analytics_dbt").resolve(),
)

dbt_project.prepare_if_dev()

project_dir = str(Path(__file__).parent.parent.joinpath("analytics_dbt").resolve())
project_dir = str(Path(__file__).parent.parent.joinpath("analytics_dbt").resolve())
2 changes: 1 addition & 1 deletion dagster_proj/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

dbt_resource = DbtCliResource(
project_dir=dbt_project,
profiles_dir='analytics_dbt',
profiles_dir="analytics_dbt",
)

database_resource = DuckDBResource(
Expand Down
16 changes: 7 additions & 9 deletions dagster_proj/resources/configured_resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import ConfigurableResource, asset, Definitions
from dagster import ConfigurableResource
import requests
from typing import Dict, Any

class StravaAPIResource(ConfigurableResource):
client_id: str
Expand All @@ -16,12 +15,11 @@ def get_access_token(self) -> str:
def _refresh_access_token(self) -> str:
auth_url = "https://www.strava.com/oauth/token"
payload = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token,
'grant_type': 'refresh_token'
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
}
response = requests.post(auth_url, data=payload, timeout=100)
response.raise_for_status()
return response.json()['access_token']

response.raise_for_status()
return response.json()["access_token"]
4 changes: 2 additions & 2 deletions dagster_proj/schedules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

activities_update_schedule = ScheduleDefinition(
job=activities_update_job,
cron_schedule="0 0 * * *", # every day, at midnight
)
cron_schedule="0 0 * * *", # every day, at midnight
)
Loading
Loading