From 3b21ac2b7388072a611811ff979798df97e973f3 Mon Sep 17 00:00:00 2001 From: Devin Matte Date: Sun, 9 Jun 2024 19:41:00 -0400 Subject: [PATCH 1/3] Support LAMP based monthly csvs --- .../chalicelib/historic/backfill/main.py | 8 ++++++-- .../chalicelib/historic/constants.py | 17 +++++++++++++++-- mbta-performance/chalicelib/historic/process.py | 11 +++++++---- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/mbta-performance/chalicelib/historic/backfill/main.py b/mbta-performance/chalicelib/historic/backfill/main.py index 1093054..1861d43 100644 --- a/mbta-performance/chalicelib/historic/backfill/main.py +++ b/mbta-performance/chalicelib/historic/backfill/main.py @@ -1,4 +1,4 @@ -from ..constants import ARCGIS_IDS +from ..constants import ARCGIS_IDS, HISTORIC_COLUMNS_LAMP, HISTORIC_COLUMNS_PRE_LAMP from ..download import download_historic_data, list_files_in_dir, prep_local_dir, unzip_historic_data from ..process import process_events @@ -11,7 +11,11 @@ def backfill_single_year(year: str): input_dir = unzip_historic_data(zip_file, f"data/input/{year}") # process the data for file in list_files_in_dir(input_dir): - process_events(file, "data/output") + # in 2024 data moved to LAMP and the format changed + if int(year) >= 2024: + process_events(file, "data/output", columns=HISTORIC_COLUMNS_LAMP) + else: + process_events(file, "data/output", columns=HISTORIC_COLUMNS_PRE_LAMP) print(f"Finished backfilling year {year}") diff --git a/mbta-performance/chalicelib/historic/constants.py b/mbta-performance/chalicelib/historic/constants.py index 0c869b7..f115e88 100644 --- a/mbta-performance/chalicelib/historic/constants.py +++ b/mbta-performance/chalicelib/historic/constants.py @@ -7,11 +7,11 @@ "2021": "611b8c77f30245a0af0c62e2859e8b49", "2022": "99094a0c59e443cdbdaefa071c6df609", "2023": "9a7f5634db72459ab731b6a9b274a1d4", - "2024": "4adbec39db40498a8530496d8c63a924", + "2024": "0711756aa5e1400891e79b984a94b495", } -HISTORIC_COLUMNS = [ +HISTORIC_COLUMNS_PRE_LAMP = [ "service_date", "route_id", "trip_id", @@ -23,3 +23,16 @@ "event_type", "event_time_sec", ] + +HISTORIC_COLUMNS_LAMP = [ + "service_date", + "route_id", + "trip_id", + "direction_id", + "stop_id", + "sync_stop_sequence", + "vehicle_id", + "vehicle_label", + "event_type", + "event_time_sec", +] diff --git a/mbta-performance/chalicelib/historic/process.py b/mbta-performance/chalicelib/historic/process.py index 02a8206..cc2ce7f 100644 --- a/mbta-performance/chalicelib/historic/process.py +++ b/mbta-performance/chalicelib/historic/process.py @@ -1,13 +1,13 @@ import pandas as pd import pathlib -from .constants import HISTORIC_COLUMNS +from .constants import HISTORIC_COLUMNS_PRE_LAMP as HISTORIC_COLUMNS from .gtfs_archive import add_gtfs_headways -def process_events(input_csv: str, outdir: str, nozip: bool = False): +def process_events(input_csv: str, outdir: str, nozip: bool = False, columns: list = HISTORIC_COLUMNS): df = pd.read_csv( input_csv, - usecols=HISTORIC_COLUMNS, + usecols=columns, parse_dates=["service_date"], dtype={ "route_id": "str", @@ -22,13 +22,16 @@ def process_events(input_csv: str, outdir: str, nozip: bool = False): df["event_time"] = df["service_date"] + pd.to_timedelta(df["event_time_sec"], unit="s") df.drop(columns=["event_time_sec"], inplace=True) + if "sync_stop_sequence" in df.columns: + df.rename(columns={"sync_stop_sequence": "stop_sequence"}, inplace=True) + try: df = add_gtfs_headways(df) except IndexError: # failure to add gtfs benchmarks pass - service_date_month = pd.Grouper(key="service_date", freq="1M") + service_date_month = pd.Grouper(key="service_date", freq="1ME") grouped = df.groupby([service_date_month, "stop_id"]) for name, events in grouped: From 8721f9dc1b85c5161f8dff9e4dcc114ddec82ea0 Mon Sep 17 00:00:00 2001 From: Devin Matte Date: Fri, 21 Jun 2024 20:15:08 -0400 Subject: [PATCH 2/3] Up timeout --- mbta-performance/.chalice/config.json | 2 +- mbta-performance/chalicelib/historic/process.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/mbta-performance/.chalice/config.json b/mbta-performance/.chalice/config.json index 89fefce..b0a3cc6 100644 --- a/mbta-performance/.chalice/config.json +++ b/mbta-performance/.chalice/config.json @@ -20,7 +20,7 @@ "lambda_functions": { "process_daily_lamp": { "iam_policy_file": "policy-lamp-ingest.json", - "lambda_timeout": 900, + "lambda_timeout": 2000, "lambda_memory_size": 2048 } } diff --git a/mbta-performance/chalicelib/historic/process.py b/mbta-performance/chalicelib/historic/process.py index cc2ce7f..5d9caaf 100644 --- a/mbta-performance/chalicelib/historic/process.py +++ b/mbta-performance/chalicelib/historic/process.py @@ -31,6 +31,14 @@ def process_events(input_csv: str, outdir: str, nozip: bool = False, columns: li # failure to add gtfs benchmarks pass + # Write to disk + to_disk(df, outdir, nozip) + + +def to_disk(df: pd.DataFrame, outdir, nozip=False): + """ + For each service_date/stop_id/direction/route group, we write the events to disk. + """ service_date_month = pd.Grouper(key="service_date", freq="1ME") grouped = df.groupby([service_date_month, "stop_id"]) From fc510170fc3f7f4dc4d2e71a09dbe94caa7ef08d Mon Sep 17 00:00:00 2001 From: Devin Matte Date: Fri, 21 Jun 2024 20:20:50 -0400 Subject: [PATCH 3/3] Reduce timeout --- mbta-performance/.chalice/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mbta-performance/.chalice/config.json b/mbta-performance/.chalice/config.json index b0a3cc6..89fefce 100644 --- a/mbta-performance/.chalice/config.json +++ b/mbta-performance/.chalice/config.json @@ -20,7 +20,7 @@ "lambda_functions": { "process_daily_lamp": { "iam_policy_file": "policy-lamp-ingest.json", - "lambda_timeout": 2000, + "lambda_timeout": 900, "lambda_memory_size": 2048 } }