diff --git a/parsers/US_PJM.py b/parsers/US_PJM.py index 4708d769cd..7a7e4b04ea 100644 --- a/parsers/US_PJM.py +++ b/parsers/US_PJM.py @@ -2,19 +2,21 @@ """Parser for the PJM area of the United States.""" -import json import re -from datetime import datetime +from datetime import datetime, timedelta from logging import Logger, getLogger from typing import List, Optional, Union import arrow import demjson3 as demjson +import pandas as pd +import pytz from bs4 import BeautifulSoup from dateutil import parser, tz -from requests import Session, get +from requests import Response, Session -from .lib.utils import get_token +from parsers.lib.config import refetch_frequency +from parsers.lib.exceptions import ParserException # Used for consumption forecast data. API_ENDPOINT = "https://api.pjm.com/api/v1/" @@ -48,86 +50,56 @@ "cpl east": "SOUTHIMP|CPLE", } +FUEL_MAPPING = { + "Coal": "coal", + "Gas": "gas", + "Hydro": "hydro", + "Multiple Fuels": "unknown", + "Nuclear": "nuclear", + "Oil": "oil", + "Other": "unknown", + "Other Renewables": "unknown", + "Solar": "solar", + "Storage": "battery", + "Wind": "wind", +} -def extract_data(session: Optional[Session] = None) -> tuple: - """ - Makes a request to the PJM data url. - Finds timestamp of current data and converts into a useful form. - Finds generation data inside script tag. - - :return: tuple of generation data and datetime. - """ - s = session or Session() - req = s.get(url) - soup = BeautifulSoup(req.content, "html.parser") - - try: - time_div = soup.find("div", id="asOfDate").text - except AttributeError: - raise LookupError("No data is available for US-PJM.") - - time_pattern = re.compile( - r"""(\d{1,2} #Hour can be 1/2 digits. - : #Separator. - \d{2})\s #Minutes must be 2 digits with a space after. - (a.m.|p.m.) #Either am or pm allowed.""", - re.X, +def get_api_subscription_key(session: Session) -> str: + pjm_settings: Response = session.get( + "https://dataminer2.pjm.com/config/settings.json" + ) + if pjm_settings.status_code == 200: + return pjm_settings.json()["subscriptionKey"] + raise ParserException( + parser="US_PJM.py", + message=f"Could not get API key", ) - latest_time = re.search(time_pattern, time_div) - - time_data = latest_time.group(1).split(":") - am_or_pm = latest_time.group(2) - hour = int(time_data[0]) - minute = int(time_data[1]) - - # Time format used by PJM is slightly unusual and needs to be converted so arrow can use it. - if am_or_pm == "p.m." and hour != 12: - # Time needs to be in 24hr format - hour += 12 - elif am_or_pm == "a.m." and hour == 12: - # Midnight is 12 a.m. - hour = 0 - - arr_dt = arrow.now("America/New_York").replace(hour=hour, minute=minute) - future_check = arrow.now("America/New_York") - - if arr_dt > future_check: - # Generation mix lags 1-2hrs behind present. - # This check prevents data near midnight being given the wrong date. - arr_dt = arr_dt.shift(days=-1) - - dt = arr_dt.floor("minute").datetime - - generation_mix_div = soup.find("div", id="rtschartallfuelspjmGenFuelM_container") - generation_mix_script = generation_mix_div.next_sibling - - pattern = r"series: \[(.*)\]" - script_data = re.search(pattern, str(generation_mix_script)).group(1) - - # demjson is required because script data is javascript not valid json. - raw_data = demjson.decode(script_data) - data = raw_data["data"] - - return data, dt - - -def data_processer(data) -> dict: - """Takes a list of dictionaries and extracts generation type and value from each.""" - production = {} - for point in data: - gen_type = mapping[point["name"]] - gen_value = float(point["y"]) - production[gen_type] = production.get(gen_type, 0.0) + gen_value +def fetch_api_data(kind: str, params: dict, session: Session) -> list: - return production + headers = { + "Host": "api.pjm.com", + "Ocp-Apim-Subscription-Key": get_api_subscription_key(session=session), + "Origin": "http://dataminer2.pjm.com", + "Referer": "http://dataminer2.pjm.com/", + } + url = API_ENDPOINT + kind + resp: Response = session.get(url=url, params=params, headers=headers) + if resp.status_code == 200: + data = resp.json() + return data + else: + raise ParserException( + parser="US_PJM.py", + message=f"{kind} data is not available in the API", + ) def fetch_consumption_forecast_7_days( zone_key: str = "US-PJM", - session: Optional[Session] = None, + session: Session = Session(), target_datetime: Optional[datetime] = None, logger: Logger = getLogger(__name__), ) -> list: @@ -138,16 +110,12 @@ def fetch_consumption_forecast_7_days( if not session: session = Session() - headers = {"Ocp-Apim-Subscription-Key": get_token("PJM_TOKEN")} - # startRow must be set if forecast_area is set. # RTO_COMBINED is area for whole PJM zone. params = {"download": True, "startRow": 1, "forecast_area": "RTO_COMBINED"} # query API - url = API_ENDPOINT + "load_frcstd_7_day" - resp = get(url, params, headers=headers) - data = json.loads(resp.content) + data = fetch_api_data(kind="load_frcstd_7_day", params=params, session=session) data_points = list() for elem in data: @@ -163,28 +131,53 @@ def fetch_consumption_forecast_7_days( return data_points +@refetch_frequency(timedelta(days=1)) def fetch_production( zone_key: str = "US-PJM", - session: Optional[Session] = None, + session: Session = Session(), target_datetime: Optional[datetime] = None, logger: Logger = getLogger(__name__), -) -> dict: - """Requests the last known production mix (in MW) of a given country.""" - if target_datetime is not None: - raise NotImplementedError("This parser is not yet able to parse past dates") - - extracted = extract_data(session=None) - production = data_processer(extracted[0]) - - datapoint = { - "zoneKey": zone_key, - "datetime": extracted[1], - "production": production, - "storage": {"hydro": None, "battery": None}, - "source": "pjm.com", +) -> list: + """uses PJM API to get generation by fuel. we assume that storage is battery storage (see https://learn.pjm.com/energy-innovations/energy-storage)""" + if target_datetime is None: + target_datetime = arrow.utcnow().datetime + + params = { + "download": True, + "startRow": 1, + "fields": "datetime_beginning_ept,fuel_type,mw", + "datetime_beginning_ept": target_datetime.strftime("%Y-%m-%dT%H:00:00.0000000"), } - - return datapoint + resp_data = fetch_api_data(kind="gen_by_fuel", params=params, session=session) + + data = pd.DataFrame(resp_data) + data.datetime_beginning_ept = pd.to_datetime(data.datetime_beginning_ept) + data = data.set_index("datetime_beginning_ept") + data.fuel_type = data.fuel_type.map(FUEL_MAPPING) + + all_data_points = [] + for dt in data.index.unique(): + production = {} + storage = {} + data_dt = data.loc[data.index == dt] + for i in range(len(data_dt)): + row = data_dt.iloc[i] + if row["fuel_type"] == "battery": + storage["battery"] = row.get("mw") + else: + mode = row["fuel_type"] + production[mode] = row.get("mw") + data_point = { + "zoneKey": zone_key, + "datetime": arrow.get(dt).datetime.replace( + tzinfo=pytz.timezone("US/Eastern") + ), + "production": production, + "storage": storage, + "source": "pjm.com", + } + all_data_points += [data_point] + return all_data_points def add_default_tz(timestamp): @@ -196,7 +189,7 @@ def add_default_tz(timestamp): return modified_timestamp -def get_miso_exchange(session: Optional[Session] = None) -> tuple: +def get_miso_exchange(session: Session) -> tuple: """ Current exchange status between PJM and MISO. :return: tuple containing flow and timestamp. @@ -204,9 +197,8 @@ def get_miso_exchange(session: Optional[Session] = None) -> tuple: map_url = "http://pjm.com/markets-and-operations/interregional-map.aspx" - s = session or Session() - req = s.get(map_url) - soup = BeautifulSoup(req.content, "html.parser") + res: Response = session.get(map_url) + soup = BeautifulSoup(res.text, "html.parser") find_div = soup.find("div", {"id": "body_0_flow1", "class": "flow"}) @@ -233,7 +225,7 @@ def get_miso_exchange(session: Optional[Session] = None) -> tuple: return flow, dt_aware -def get_exchange_data(interface, session: Optional[Session] = None) -> list: +def get_exchange_data(interface, session: Session) -> list: """ This function can fetch 5min data for any PJM interface in the current day. Extracts load and timestamp data from html source then joins them together. @@ -242,9 +234,8 @@ def get_exchange_data(interface, session: Optional[Session] = None) -> list: base_url = "http://www.pjm.com/Charts/InterfaceChart.aspx?open=" url = base_url + exchange_mapping[interface] - s = session or Session() - req = s.get(url) - soup = BeautifulSoup(req.content, "html.parser") + res: Response = session.get(url) + soup = BeautifulSoup(res.text, "html.parser") scripts = soup.find( "script", @@ -281,16 +272,16 @@ def get_exchange_data(interface, session: Optional[Session] = None) -> list: return converted_flows -def combine_NY_exchanges() -> list: +def combine_NY_exchanges(session: Session) -> list: """ Combination function for the 4 New York interfaces. Timestamps are checked to ensure correct combination. """ - nyiso = get_exchange_data("nyiso", session=None) - neptune = get_exchange_data("neptune", session=None) - linden = get_exchange_data("linden", session=None) - hudson = get_exchange_data("hudson", session=None) + nyiso = get_exchange_data("nyiso", session) + neptune = get_exchange_data("neptune", session) + linden = get_exchange_data("linden", session) + hudson = get_exchange_data("hudson", session) combined_flows = zip(nyiso, neptune, linden, hudson) @@ -314,7 +305,7 @@ def combine_NY_exchanges() -> list: def fetch_exchange( zone_key1: str, zone_key2: str, - session: Optional[Session] = None, + session: Session = Session(), target_datetime: Optional[datetime] = None, logger: Logger = getLogger(__name__), ) -> Union[List[dict], dict]: @@ -326,12 +317,12 @@ def fetch_exchange( sortedcodes = "->".join(sorted([zone_key1, zone_key2])) if sortedcodes == "US-NY->US-PJM": - flows = combine_NY_exchanges() + flows = combine_NY_exchanges(session) elif sortedcodes == "US-MIDA-PJM->US-NY-NYIS": - flows = combine_NY_exchanges() + flows = combine_NY_exchanges(session) flows = [(-total, dt) for total, dt in flows] elif sortedcodes == "US-MISO->US-PJM": - flow = get_miso_exchange() + flow = get_miso_exchange(session) exchange = { "sortedZoneKeys": sortedcodes, "datetime": flow[1], @@ -340,7 +331,7 @@ def fetch_exchange( } return exchange elif sortedcodes == "US-MIDA-PJM->US-MIDW-MISO": - flow = get_miso_exchange() + flow = get_miso_exchange(session) exchange = { "sortedZoneKeys": sortedcodes, "datetime": flow[1], @@ -366,7 +357,7 @@ def fetch_exchange( def fetch_price( zone_key: str = "US-PJM", - session: Optional[Session] = None, + session: Session = Session(), target_datetime: Optional[datetime] = None, logger: Logger = getLogger(__name__), ) -> dict: @@ -374,9 +365,8 @@ def fetch_price( if target_datetime is not None: raise NotImplementedError("This parser is not yet able to parse past dates") - s = session or Session() - req = s.get(url) - soup = BeautifulSoup(req.content, "html.parser") + res: Response = session.get(url) + soup = BeautifulSoup(res.text, "html.parser") price_tag = soup.find("span", class_="rtolmpico") price_data = price_tag.find_next("h2")