Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade PJM production parser #4871

Merged
18 commits merged into from
Dec 19, 2022
226 changes: 108 additions & 118 deletions parsers/US_PJM.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

"""Parser for the PJM area of the United States."""

import json
import re
from datetime import datetime
from datetime import datetime, timedelta
from logging import Logger, getLogger
from typing import List, Optional, Union

import arrow
import demjson3 as demjson
import pandas as pd
import pytz
from bs4 import BeautifulSoup
from dateutil import parser, tz
from requests import Session, get
from requests import Response, Session

from .lib.utils import get_token
from parsers.lib.config import refetch_frequency
from parsers.lib.exceptions import ParserException

# Used for consumption forecast data.
API_ENDPOINT = "https://api.pjm.com/api/v1/"
Expand Down Expand Up @@ -48,86 +50,56 @@
"cpl east": "SOUTHIMP|CPLE",
}

FUEL_MAPPING = {
"Coal": "coal",
"Gas": "gas",
"Hydro": "hydro",
"Multiple Fuels": "unknown",
"Nuclear": "nuclear",
"Oil": "oil",
"Other": "unknown",
"Other Renewables": "unknown",
"Solar": "solar",
"Storage": "battery",
"Wind": "wind",
}

def extract_data(session: Optional[Session] = None) -> tuple:
"""
Makes a request to the PJM data url.
Finds timestamp of current data and converts into a useful form.
Finds generation data inside script tag.

:return: tuple of generation data and datetime.
"""

s = session or Session()
req = s.get(url)
soup = BeautifulSoup(req.content, "html.parser")

try:
time_div = soup.find("div", id="asOfDate").text
except AttributeError:
raise LookupError("No data is available for US-PJM.")

time_pattern = re.compile(
r"""(\d{1,2} #Hour can be 1/2 digits.
: #Separator.
\d{2})\s #Minutes must be 2 digits with a space after.
(a.m.|p.m.) #Either am or pm allowed.""",
re.X,
def get_api_subscription_key(session: Session) -> str:
pjm_settings: Response = session.get(
"https://dataminer2.pjm.com/config/settings.json"
)
if pjm_settings.status_code == 200:
return pjm_settings.json()["subscriptionKey"]
raise ParserException(
parser="US_PJM.py",
message=f"Could not get API key",
)

latest_time = re.search(time_pattern, time_div)

time_data = latest_time.group(1).split(":")
am_or_pm = latest_time.group(2)
hour = int(time_data[0])
minute = int(time_data[1])

# Time format used by PJM is slightly unusual and needs to be converted so arrow can use it.
if am_or_pm == "p.m." and hour != 12:
# Time needs to be in 24hr format
hour += 12
elif am_or_pm == "a.m." and hour == 12:
# Midnight is 12 a.m.
hour = 0

arr_dt = arrow.now("America/New_York").replace(hour=hour, minute=minute)
future_check = arrow.now("America/New_York")

if arr_dt > future_check:
# Generation mix lags 1-2hrs behind present.
# This check prevents data near midnight being given the wrong date.
arr_dt = arr_dt.shift(days=-1)

dt = arr_dt.floor("minute").datetime

generation_mix_div = soup.find("div", id="rtschartallfuelspjmGenFuelM_container")
generation_mix_script = generation_mix_div.next_sibling

pattern = r"series: \[(.*)\]"
script_data = re.search(pattern, str(generation_mix_script)).group(1)

# demjson is required because script data is javascript not valid json.
raw_data = demjson.decode(script_data)
data = raw_data["data"]

return data, dt


def data_processer(data) -> dict:
"""Takes a list of dictionaries and extracts generation type and value from each."""

production = {}
for point in data:
gen_type = mapping[point["name"]]
gen_value = float(point["y"])
production[gen_type] = production.get(gen_type, 0.0) + gen_value
def fetch_api_data(kind: str, params: dict, session: Session) -> list:

return production
headers = {
"Host": "api.pjm.com",
"Ocp-Apim-Subscription-Key": get_api_subscription_key(session=session),
"Origin": "http://dataminer2.pjm.com",
"Referer": "http://dataminer2.pjm.com/",
}
url = API_ENDPOINT + kind
resp: Response = session.get(url=url, params=params, headers=headers)
if resp.status_code == 200:
data = resp.json()
return data
else:
raise ParserException(
parser="US_PJM.py",
message=f"{kind} data is not available in the API",
)


def fetch_consumption_forecast_7_days(
zone_key: str = "US-PJM",
session: Optional[Session] = None,
session: Session = Session(),
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> list:
Expand All @@ -138,16 +110,12 @@ def fetch_consumption_forecast_7_days(
if not session:
session = Session()

headers = {"Ocp-Apim-Subscription-Key": get_token("PJM_TOKEN")}

# startRow must be set if forecast_area is set.
# RTO_COMBINED is area for whole PJM zone.
params = {"download": True, "startRow": 1, "forecast_area": "RTO_COMBINED"}

# query API
url = API_ENDPOINT + "load_frcstd_7_day"
resp = get(url, params, headers=headers)
data = json.loads(resp.content)
data = fetch_api_data(kind="load_frcstd_7_day", params=params, session=session)

data_points = list()
for elem in data:
Expand All @@ -163,28 +131,53 @@ def fetch_consumption_forecast_7_days(
return data_points


@refetch_frequency(timedelta(days=1))
def fetch_production(
zone_key: str = "US-PJM",
session: Optional[Session] = None,
session: Session = Session(),
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> dict:
"""Requests the last known production mix (in MW) of a given country."""
if target_datetime is not None:
raise NotImplementedError("This parser is not yet able to parse past dates")

extracted = extract_data(session=None)
production = data_processer(extracted[0])

datapoint = {
"zoneKey": zone_key,
"datetime": extracted[1],
"production": production,
"storage": {"hydro": None, "battery": None},
"source": "pjm.com",
) -> list:
"""uses PJM API to get generation by fuel. we assume that storage is battery storage (see https://learn.pjm.com/energy-innovations/energy-storage)"""
if target_datetime is None:
target_datetime = arrow.utcnow().datetime

params = {
"download": True,
"startRow": 1,
"fields": "datetime_beginning_ept,fuel_type,mw",
"datetime_beginning_ept": target_datetime.strftime("%Y-%m-%dT%H:00:00.0000000"),
}

return datapoint
resp_data = fetch_api_data(kind="gen_by_fuel", params=params, session=session)

data = pd.DataFrame(resp_data)
data.datetime_beginning_ept = pd.to_datetime(data.datetime_beginning_ept)
data = data.set_index("datetime_beginning_ept")
data.fuel_type = data.fuel_type.map(FUEL_MAPPING)

all_data_points = []
for dt in data.index.unique():
production = {}
storage = {}
data_dt = data.loc[data.index == dt]
for i in range(len(data_dt)):
row = data_dt.iloc[i]
if row["fuel_type"] == "battery":
storage["battery"] = row.get("mw")
else:
mode = row["fuel_type"]
production[mode] = row.get("mw")
data_point = {
"zoneKey": zone_key,
"datetime": arrow.get(dt).datetime.replace(
tzinfo=pytz.timezone("US/Eastern")
),
"production": production,
"storage": storage,
"source": "pjm.com",
}
all_data_points += [data_point]
return all_data_points


def add_default_tz(timestamp):
Expand All @@ -196,17 +189,16 @@ def add_default_tz(timestamp):
return modified_timestamp


def get_miso_exchange(session: Optional[Session] = None) -> tuple:
def get_miso_exchange(session: Session) -> tuple:
"""
Current exchange status between PJM and MISO.
:return: tuple containing flow and timestamp.
"""

map_url = "http://pjm.com/markets-and-operations/interregional-map.aspx"

s = session or Session()
req = s.get(map_url)
soup = BeautifulSoup(req.content, "html.parser")
res: Response = session.get(map_url)
soup = BeautifulSoup(res.text, "html.parser")

find_div = soup.find("div", {"id": "body_0_flow1", "class": "flow"})

Expand All @@ -233,7 +225,7 @@ def get_miso_exchange(session: Optional[Session] = None) -> tuple:
return flow, dt_aware


def get_exchange_data(interface, session: Optional[Session] = None) -> list:
def get_exchange_data(interface, session: Session) -> list:
"""
This function can fetch 5min data for any PJM interface in the current day.
Extracts load and timestamp data from html source then joins them together.
Expand All @@ -242,9 +234,8 @@ def get_exchange_data(interface, session: Optional[Session] = None) -> list:
base_url = "http://www.pjm.com/Charts/InterfaceChart.aspx?open="
url = base_url + exchange_mapping[interface]

s = session or Session()
req = s.get(url)
soup = BeautifulSoup(req.content, "html.parser")
res: Response = session.get(url)
soup = BeautifulSoup(res.text, "html.parser")

scripts = soup.find(
"script",
Expand Down Expand Up @@ -281,16 +272,16 @@ def get_exchange_data(interface, session: Optional[Session] = None) -> list:
return converted_flows


def combine_NY_exchanges() -> list:
def combine_NY_exchanges(session: Session) -> list:
"""
Combination function for the 4 New York interfaces.
Timestamps are checked to ensure correct combination.
"""

nyiso = get_exchange_data("nyiso", session=None)
neptune = get_exchange_data("neptune", session=None)
linden = get_exchange_data("linden", session=None)
hudson = get_exchange_data("hudson", session=None)
nyiso = get_exchange_data("nyiso", session)
neptune = get_exchange_data("neptune", session)
linden = get_exchange_data("linden", session)
hudson = get_exchange_data("hudson", session)

combined_flows = zip(nyiso, neptune, linden, hudson)

Expand All @@ -314,7 +305,7 @@ def combine_NY_exchanges() -> list:
def fetch_exchange(
zone_key1: str,
zone_key2: str,
session: Optional[Session] = None,
session: Session = Session(),
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> Union[List[dict], dict]:
Expand All @@ -326,12 +317,12 @@ def fetch_exchange(
sortedcodes = "->".join(sorted([zone_key1, zone_key2]))

if sortedcodes == "US-NY->US-PJM":
flows = combine_NY_exchanges()
flows = combine_NY_exchanges(session)
elif sortedcodes == "US-MIDA-PJM->US-NY-NYIS":
flows = combine_NY_exchanges()
flows = combine_NY_exchanges(session)
flows = [(-total, dt) for total, dt in flows]
elif sortedcodes == "US-MISO->US-PJM":
flow = get_miso_exchange()
flow = get_miso_exchange(session)
exchange = {
"sortedZoneKeys": sortedcodes,
"datetime": flow[1],
Expand All @@ -340,7 +331,7 @@ def fetch_exchange(
}
return exchange
elif sortedcodes == "US-MIDA-PJM->US-MIDW-MISO":
flow = get_miso_exchange()
flow = get_miso_exchange(session)
exchange = {
"sortedZoneKeys": sortedcodes,
"datetime": flow[1],
Expand All @@ -366,17 +357,16 @@ def fetch_exchange(

def fetch_price(
zone_key: str = "US-PJM",
session: Optional[Session] = None,
session: Session = Session(),
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> dict:
"""Requests the last known power price of a given country."""
if target_datetime is not None:
raise NotImplementedError("This parser is not yet able to parse past dates")

s = session or Session()
req = s.get(url)
soup = BeautifulSoup(req.content, "html.parser")
res: Response = session.get(url)
soup = BeautifulSoup(res.text, "html.parser")

price_tag = soup.find("span", class_="rtolmpico")
price_data = price_tag.find_next("h2")
Expand Down