Assign nullable dtypes to dataframe columns #7

Merged: 5 commits, Apr 17, 2024
Changes from all commits
40 changes: 33 additions & 7 deletions mbta-performance/chalicelib/lamp/ingest.py
@@ -58,12 +58,23 @@ def _local_save(s3_key, stop_events):
def _process_arrival_departure_times(pq_df: pd.DataFrame) -> pd.DataFrame:
"""Process and collate arrivals and departures for a timetable of events.

-    Before: TODO add example
-    After: TODO add example
This does two things:
1. Convert Epoch Unix timestamps to Eastern Time datetimes
2. Set departures' stop ids to those of their previous stops (see example below)

Before (Green-D):
STOP_SEQUENCE | STOP        | ARRIVAL    | DEPARTURE
360           | place-newto | 4:55:59 AM | (blah)
370           | place-chhil | 4:59:36 AM | 4:56:41 AM
380           | place-rsmnl | (blah)     | 5:00:30 AM

It's more interpretable to move up the departures so they act as follows:

After (Green-D):
STOP_SEQUENCE | STOP        | ARRIVAL    | DEPARTURE
360           | place-newto | 4:55:59 AM | 4:56:41 AM
370           | place-chhil | 4:59:36 AM | 5:00:30 AM
380           | place-rsmnl | (blah)     | (blah)
"""

Review comment (Member): Thanks so much for documenting everything 👏
# NB: While we can generally trust df dtypes fetched from parquet files, since the files are compressed with columnar metadata,
# there are some numerical imprecisions that numpy seems to be throwing on M1 machines
# that affect how epoch timestamps are cast to datetimes. Maybe not a problem on the AWS machines, though?
pq_df["dep_time"] = pd.to_datetime(pq_df["move_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")
pq_df["arr_time"] = pd.to_datetime(pq_df["stop_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")

@@ -112,7 +123,16 @@ def fetch_pq_file_from_remote(service_date: date) -> pd.DataFrame:
if result.status_code != 200:
raise ValueError(f"Failed to fetch LAMP parquet file from {url}. Status code: {result.status_code}")

-    return pd.read_parquet(io.BytesIO(result.content), columns=INPUT_COLUMNS, engine="pyarrow")
return pd.read_parquet(
io.BytesIO(result.content),
columns=INPUT_COLUMNS,
engine="pyarrow",
# NB: Even though parquet files are compressed with columnar metadata, pandas will sometimes override those dtypes
# if the columns contain nulls. This is important as the move/stop times are nullable int64 epoch timestamps,
# which will overflow if read in as floats.
# https://pandas.pydata.org/docs/user_guide/integer_na.html#nullable-integer-data-type
dtype_backend="numpy_nullable",
)
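To illustrate the failure mode the comment above describes, here is a toy round trip (an assumed example, not from this PR; dtype_backend requires pandas 2.0+). A nullable int64 column written without pandas metadata comes back as float64 by default, but stays Int64 with the nullable backend:

import io
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# A nullable int64 column written by a non-pandas producer, mimicking the LAMP files.
table = pa.table({"move_timestamp": pa.array([1707299801, None], type=pa.int64())})
buf = io.BytesIO()
pq.write_table(table, buf)

buf.seek(0)
print(pd.read_parquet(buf).dtypes)  # move_timestamp: float64 (nulls force a float cast)
buf.seek(0)
print(pd.read_parquet(buf, dtype_backend="numpy_nullable").dtypes)  # move_timestamp: Int64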


def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:
@@ -125,7 +145,13 @@ def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:


def upload_to_s3(stop_id_and_events: Tuple[str, pd.DataFrame], service_date: date) -> None:
"""Upload events to s3 as a .csv file."""
"""Upload events to s3 as a .csv file.

Args:
stop_id_and_events: tuple of a stop id, and all the arrival/departure events that occurred at this
stop over the course of a day (so far)
service_date: service date corresponding to the events.
"""
# unpack from iterable
stop_id, stop_events = stop_id_and_events

Binary file mbta-performance/chalicelib/lamp/tests/sample_data/2024-02-07-subway-on-time-performance-v1.parquet not shown.
52 changes: 52 additions & 0 deletions mbta-performance/chalicelib/lamp/tests/test_ingest.py
@@ -0,0 +1,52 @@
from datetime import date
import unittest
from unittest import mock

import pandas as pd

from .. import ingest


DATA_PATH = "mbta-performance/chalicelib/lamp/tests/sample_data/2024-02-07-subway-on-time-performance-v1.parquet"


class TestIngest(unittest.TestCase):
def setUp(self):
with open(DATA_PATH, "rb") as f:
self.data = f.read()

def _mock_s3_upload(self):
# mock upload of s3.upload_df_as_csv() to a fake bucket
pass

def test__process_arrival_departure_times(self):
pass
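# A hypothetical filling-in of the stub above (not part of this PR): build the
# docstring's Green-D example and check that departures move up one stop.
# The trip_id column and the exact shift behavior are assumptions.
def test__process_arrival_departure_times_sketch(self):
    df = pd.DataFrame(
        {
            "trip_id": ["t1", "t1", "t1"],
            "stop_sequence": pd.array([360, 370, 380], dtype="Int16"),
            "stop_id": ["place-newto", "place-chhil", "place-rsmnl"],
            "move_timestamp": pd.array([pd.NA, 1707299801, 1707300030], dtype="Int64"),
            "stop_timestamp": pd.array([1707299759, 1707299976, pd.NA], dtype="Int64"),
        }
    )
    out = ingest._process_arrival_departure_times(df)
    # Per the docstring's "After" table, the departure observed at stop 370
    # should now sit on the stop 360 row.
    self.assertEqual(
        out.loc[out["stop_sequence"] == 360, "dep_time"].iloc[0],
        pd.Timestamp(1707299801, unit="s", tz="UTC").tz_convert("US/Eastern"),
    )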

def test_fetch_pq_file_from_remote(self):
mock_response = mock.Mock(status_code=200, content=self.data)
with mock.patch("requests.get", return_value=mock_response):
initial_df = ingest.fetch_pq_file_from_remote(date(2024, 2, 7))
self.assertListEqual(
list(initial_df.dtypes),
[
pd.Int64Dtype(), # service_date
"string[python]", # route_id
"string[python]", # trip_id
"string[python]", # stop_id
pd.BooleanDtype(), # direction_id
pd.Int16Dtype(), # stop_sequence
"string[python]", # vehicle_id
"string[python]", # vehicle_label
pd.Int64Dtype(), # move_timestamp
pd.Int64Dtype(), # stop_timestamp
],
)

def test_ingest_pq_file(self):
pass

def test_upload_to_s3(self):
pass
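# Hypothetical sketch (not part of this PR) of how _mock_s3_upload and
# test_upload_to_s3 might fit together: patch the s3 helper mentioned in
# _mock_s3_upload's comment so no real bucket is touched. The ingest.s3
# attribute and the upload_df_as_csv call are assumptions about the repo.
def test_upload_to_s3_sketch(self):
    with mock.patch.object(ingest, "s3") as mock_s3:
        ingest.upload_to_s3(("place-newto", pd.DataFrame()), date(2024, 2, 7))
        mock_s3.upload_df_as_csv.assert_called_once()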

def test_ingest_today_lamp_data(self):
pass