Skip to content

Commit

Permalink
Handle non 200 responses
Browse files Browse the repository at this point in the history
  • Loading branch information
devinmatte committed Apr 15, 2024
1 parent fe1a874 commit 883614d
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 149 deletions.
3 changes: 3 additions & 0 deletions .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"recommendations": ["ms-python.black-formatter", "ms-python.flake8"]
}
8 changes: 8 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"python.formatting.blackPath": ".venv/bin/black",
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# 🏮 MBTA Performance Processing (LAMPLighter)

Scripts for processing MBTA performance data both from LAMP and from monthly historical files
Scripts for processing MBTA performance data both from LAMP and from monthly historical files

## Requirements to develop locally

- Python 3.12 with recent poetry (1.7.0 or later)
- Verify with `python --version && poetry --version`
- `poetry self update` to update poetry

## Instructions to run locally

1. Add your AWS credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) to your shell environment, OR add them to a .boto config file with awscli command `aws configure`.
3 changes: 2 additions & 1 deletion mbta-performance/.chalice/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"DD_SITE": "datadoghq.com",
"DD_ENV": "prod",
"DD_SERVICE": "mbta-performance",
"DD_TRACE_ENABLED": "true"
"DD_TRACE_ENABLED": "true",
"DD_PROFILING_ENABLED": "true"
},
"tags": {
"service": "mbta-performance",
Expand Down
2 changes: 1 addition & 1 deletion mbta-performance/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# Runs every 60 minutes from either 4 AM -> 1:55AM or 5 AM -> 2:55 AM depending on DST
@app.schedule(Cron("0", "0-6,9-23", "*", "*", "?", "*"))
def process_daily_lamp(event):
lamp.ingest_lamp_data()
lamp.ingest_today_lamp_data()
4 changes: 2 additions & 2 deletions mbta-performance/chalicelib/lamp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__all__ = ["ingest_lamp_data"]
__all__ = ["ingest_today_lamp_data"]

from .ingest import ingest_lamp_data
from .ingest import ingest_today_lamp_data
31 changes: 31 additions & 0 deletions mbta-performance/chalicelib/lamp/backfill/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pandas as pd
from ..ingest import LAMP_INDEX_URL, fetch_pq_file_from_remote, ingest_pq_file, upload_to_s3
from ... import parallel


_parallel_upload = parallel.make_parallel(upload_to_s3)


def backfill_all_in_index():
    """Backfill all the dates in the LAMP index.

    Reads the LAMP index CSV, derives each service date, fetches the
    corresponding daily parquet file, processes it into stop events, and
    uploads the events to S3 grouped by stop_id. Dates whose parquet file
    cannot be fetched are logged and skipped.
    """
    # Load the index of available LAMP files
    index = pd.read_csv(LAMP_INDEX_URL)
    # Get the dates in the index
    dates = pd.to_datetime(index["service_date"]).dt.date
    # Backfill each date
    for date_to_backfill in dates:
        try:
            pq_df = fetch_pq_file_from_remote(date_to_backfill)
        except ValueError as e:
            # If we can't fetch the file, we can't process it — skip to the
            # next date. Without this `continue` the loop would fall through
            # and either raise a NameError (first iteration, pq_df unbound)
            # or silently re-process the previous date's dataframe.
            print(f"Failed to fetch {date_to_backfill}: {e}")
            continue
        processed_daily_events = ingest_pq_file(pq_df)

        # split daily events by stop_id and parallel upload to s3
        stop_event_groups = processed_daily_events.groupby("stop_id")
        _parallel_upload(stop_event_groups, date_to_backfill)


# Allow running the backfill directly as a standalone script.
if __name__ == "__main__":
    backfill_all_in_index()
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@


def service_date(ts: datetime) -> date:
"""
Return the service date for a given timestamp in Eastern time.
"""
# In practice a None TZ is UTC, but we want to be explicit
# In many places we have an implied eastern
ts = ts.replace(tzinfo=EASTERN_TIME)
Expand All @@ -17,6 +20,9 @@ def service_date(ts: datetime) -> date:


def get_current_service_date() -> date:
    """Return the service date for the current moment in Eastern time."""
    now_eastern = datetime.now(EASTERN_TIME)
    return service_date(now_eastern)


Expand Down
17 changes: 13 additions & 4 deletions mbta-performance/chalicelib/lamp/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import requests
import pandas as pd

from .utils import format_dateint, get_current_service_date
from .date import format_dateint, get_current_service_date
from .. import parallel
from .. import s3

Expand Down Expand Up @@ -108,6 +108,10 @@ def fetch_pq_file_from_remote(service_date: date) -> pd.DataFrame:
# TODO(check if file exists in index, throw if it doesn't)
url = RAPID_DAILY_URL_TEMPLATE.format(YYYY_MM_DD=service_date.strftime("%Y-%m-%d"))
result = requests.get(url)

if result.status_code != 200:
raise ValueError(f"Failed to fetch LAMP parquet file from {url}. Status code: {result.status_code}")

return pd.read_parquet(io.BytesIO(result.content), columns=INPUT_COLUMNS, engine="pyarrow")


Expand Down Expand Up @@ -135,10 +139,15 @@ def upload_to_s3(stop_id_and_events: Tuple[str, pd.DataFrame], service_date: dat
_parallel_upload = parallel.make_parallel(upload_to_s3)


def ingest_lamp_data():
def ingest_today_lamp_data():
"""Ingest and upload today's LAMP data."""
service_date = get_current_service_date()
pq_df = fetch_pq_file_from_remote(service_date)
try:
pq_df = fetch_pq_file_from_remote(service_date)
except ValueError as e:
# If we can't fetch the file, we can't process it
print(e)
return
processed_daily_events = ingest_pq_file(pq_df)

# split daily events by stop_id and parallel upload to s3
Expand All @@ -147,4 +156,4 @@ def ingest_lamp_data():


if __name__ == "__main__":
ingest_lamp_data()
ingest_today_lamp_data()
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import date, datetime
from ..utils import EASTERN_TIME, service_date
from ..date import EASTERN_TIME, service_date


def test_service_date():
Expand Down
Loading

0 comments on commit 883614d

Please sign in to comment.