Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ✨ OpenAI parser #245

Merged
merged 11 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ By default, there is a `GenericProvider` that support a `SimpleProcessor` using

> Note: Because these providers do not support the BCOP standard natively, maybe there are some gaps on the implemented parser that will be refined with new test cases. We encourage you to report related **issues**!

#### LLM-powered Parsers

The library supports an optional parser option leveraging Large Language Model (LLM) to provide a best-effort parsing when the specific parsers have not been successful.

These LLM parsers are automatically appended as a processor option after the already available for all the Providers when the integration environmental variable is set (check the below integrations).
chadell marked this conversation as resolved.
Show resolved Hide resolved

> These integrations may involve some costs for API usage. Use it carefully! As an order of magnitude, a parsing of an email with OpenAI GPT gpt-3.5-turbo model costs $0.004.

These are the current supported LLM integrations:

- [OpenAI](https://openai.com/product), these are the supported ENVs:
- `OPENAI_TOKEN` (Required): OpenAI token.
- `OPENAI_MODEL` (Optional): Model to use, it defaults to "gpt-3.5-turbo".

## Installation

The library is available as a Python package in pypi and can be installed with pip:
Expand Down Expand Up @@ -319,6 +333,7 @@ The project is following Network to Code software development guidelines and is
...omitted debug logs...
====================================================== 99 passed, 174 deselected, 17 warnings in 10.35s ======================================================
```

7. Run some final CI tests locally to ensure that there is no linting/formatting issues with your changes. You should look to get a code score of 10/10. See the example below: `invoke tests --local`

```
Expand Down
165 changes: 154 additions & 11 deletions circuit_maintenance_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import quopri
from typing import Dict, List
from email.utils import parsedate_tz, mktime_tz
import hashlib

import bs4 # type: ignore
from bs4.element import ResultSet # type: ignore

from pydantic import BaseModel, Extra
from pydantic import BaseModel
from icalendar import Calendar # type: ignore

from circuit_maintenance_parser.errors import ParserError
Expand All @@ -23,7 +24,7 @@
logger = logging.getLogger(__name__)


class Parser(BaseModel, extra=Extra.forbid):
class Parser(BaseModel):
"""Parser class.

A Parser handles one or more specific data type(s) (specified in `data_types`).
Expand All @@ -34,14 +35,15 @@ class Parser(BaseModel, extra=Extra.forbid):
# _data_types are used to match the Parser to to each type of DataPart
_data_types = ["text/plain", "plain"]

# TODO: move it to where it is used, Cogent parser
_geolocator = Geolocator()

@classmethod
def get_data_types(cls) -> List[str]:
"""Return the expected data type."""
return cls._data_types

def parser_hook(self, raw: bytes) -> List[Dict]:
def parser_hook(self, raw: bytes, content_type: str) -> List[Dict]:
"""Custom parser logic.

This method is used by the main `Parser` classes (such as `ICal` or `Html` parser) to define a shared
Expand All @@ -53,14 +55,14 @@ def parser_hook(self, raw: bytes) -> List[Dict]:
"""
raise NotImplementedError

def parse(self, raw: bytes) -> List[Dict]:
def parse(self, raw: bytes, content_type: str) -> List[Dict]:
"""Execute parsing.

Do not override this method!
Instead, each main `Parser` class should implement its own custom logic within the `parser_hook` method.
"""
try:
result = self.parser_hook(raw)
result = self.parser_hook(raw, content_type)
except Exception as exc:
raise ParserError from exc
if any(not partial_result for partial_result in result):
Expand All @@ -86,7 +88,7 @@ class ICal(Parser):

_data_types = ["text/calendar", "ical", "icalendar"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
# iCalendar data sometimes comes encoded with base64
# TODO: add a test case
Expand Down Expand Up @@ -159,7 +161,7 @@ def remove_hex_characters(string):
"""Convert any hex characters to standard ascii."""
return string.encode("ascii", errors="ignore").decode("utf-8")

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
Expand Down Expand Up @@ -191,7 +193,7 @@ class EmailDateParser(Parser):

_data_types = [EMAIL_HEADER_DATE]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
parsed_date = parsedate_tz(raw.decode())
if parsed_date:
Expand All @@ -204,7 +206,7 @@ class EmailSubjectParser(Parser):

_data_types = [EMAIL_HEADER_SUBJECT]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_subject(self.bytes_to_string(raw).replace("\r", "").replace("\n", "")):
Expand All @@ -226,7 +228,7 @@ class Csv(Parser):

_data_types = ["application/csv", "text/csv", "application/octet-stream"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_csv(raw):
Expand All @@ -245,7 +247,7 @@ class Text(Parser):

_data_types = ["text/plain"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
text = self.get_text_hook(raw)
Expand All @@ -261,3 +263,144 @@ def get_text_hook(raw: bytes) -> str:
def parse_text(self, text) -> List[Dict]:
"""Custom text parsing."""
raise NotImplementedError


class LLM(Parser):
"""LLM parser."""

_data_types = ["text/html", "html", "text/plain"]

_llm_question = (
"Can you extract the maintenance_id, the account_id, the impact, the status "
"(e.g., confirmed, cancelled, rescheduled), the summary, the circuit ids (also defined as service or order), "
"and the global start_time and end_time as EPOCH timestamps in JSON format with keys in "
"lowercase underscore format? Reply with only the answer in JSON form and "
"include no other commentary"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved to the OpenAPI parser specifically or at least made overridable? I could see that different LLMs might need variant phrasings of the question to get the desired outcome.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is to have a generic one that is used by default, and then, every LLM could overwrite as needed


def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
if content_type in ["html", "text/html"]:
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
content = soup.text
else:
content = self.get_text_hook(raw)
for data in self.parse_content(content):
result.append(data)
return result

@staticmethod
def get_text_hook(raw: bytes) -> str:
"""Can be overwritten by subclasses."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What purpose does this method serve? When would subclasses need/want to override it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it more explicit why it is used. the final parsers could have a different way to decode bytes to string

return raw.decode()

@staticmethod
def get_key_with_string(dictionary: dict, string: str):
"""Returns the key in the dictionary that contains the given string."""
for key in dictionary.keys():
if string in key:
return key
return None

def get_llm_response(self, content):
"""Method to retrieve the response from the LLM for some content."""
raise NotImplementedError

def _get_impact(self, generated_json: dict):
"""Method to get a general Impact for all Circuits."""
impact_key = self.get_key_with_string(generated_json, "impact")
if impact_key:
if "no impact" in generated_json[impact_key].lower():
return Impact.NO_IMPACT
if "partial" in generated_json[impact_key].lower():
return Impact.DEGRADED

return Impact.OUTAGE

def _get_circuit_ids(self, generated_json: dict, impact: Impact):
"""Method to get the Circuit IDs and use a general Impact."""
circuits = []
circuits_ids_key = self.get_key_with_string(generated_json, "circuit")
for circuit in generated_json[circuits_ids_key]:
if isinstance(circuit, str):
circuits.append(CircuitImpact(circuit_id=circuit, impact=impact))
elif isinstance(circuit, dict):
circuit_key = self.get_key_with_string(circuit, "circuit")
circuits.append(CircuitImpact(circuit_id=circuit[circuit_key], impact=impact))

return circuits

def _get_start(self, generated_json: dict):
"""Method to get the Start Time."""
return generated_json[self.get_key_with_string(generated_json, "start_time")]

def _get_end(self, generated_json: dict):
"""Method to get the End Time."""
return generated_json[self.get_key_with_string(generated_json, "end_time")]

def _get_summary(self, generated_json: dict):
"""Method to get the Summary."""
return generated_json[self.get_key_with_string(generated_json, "summary")]

def _get_status(self, generated_json: dict):
"""Method to get the Status."""
status_key = self.get_key_with_string(generated_json, "status")

if "confirmed" in generated_json[status_key].lower():
return Status.CONFIRMED
if "rescheduled" in generated_json[status_key].lower():
return Status.RE_SCHEDULED
if "cancelled" in generated_json[status_key].lower():
return Status.CANCELLED
if "ongoing" in generated_json[status_key].lower():
return Status.IN_PROCESS
if "completed" in generated_json[status_key].lower():
return Status.COMPLETED

return Status.CONFIRMED

def _get_account(self, generated_json: dict):
"""Method to get the Account."""
account = generated_json[self.get_key_with_string(generated_json, "account")]
if not account:
return "Not found"

return account

def _get_maintenance_id(self, generated_json: dict, start, end, circuits):
"""Method to get the Maintenance ID."""
maintenance_key = self.get_key_with_string(generated_json, "maintenance")
if maintenance_key and generated_json["maintenance_id"] != "N/A":
return generated_json["maintenance_id"]

maintenace_id = str(start) + str(end) + "".join(list(circuits))
return hashlib.md5(maintenace_id.encode("utf-8")).hexdigest() # nosec
chadell marked this conversation as resolved.
Show resolved Hide resolved

def parse_content(self, content):
"""Parse content via LLM."""
generated_json = self.get_llm_response(content)
if not generated_json:
return []

impact = self._get_impact(generated_json)

data = {
"circuits": self._get_circuit_ids(generated_json, impact),
"start": int(self._get_start(generated_json)),
"end": int(self._get_end(generated_json)),
"summary": str(self._get_summary(generated_json)),
"status": self._get_status(generated_json),
"account": str(self._get_account(generated_json)),
}

data["maintenance_id"] = str(
self._get_maintenance_id(
generated_json,
data["start"],
data["end"],
data["circuits"],
)
)

return [data]
48 changes: 48 additions & 0 deletions circuit_maintenance_parser/parsers/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""OpenAI Parser."""
import os
import logging
import json
from typing import List, Optional

import openai

from circuit_maintenance_parser.parser import LLM

logger = logging.getLogger(__name__)


class OpenAIParser(LLM):
"""Notifications Parser powered by OpenAI ChatGPT."""

def get_llm_response(self, content) -> Optional[List]:
"""Get LLM processing from OpenAI."""
openai.api_key = os.getenv("OPENAI_TOKEN")
openai_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
try:
response = openai.ChatCompletion.create(
model=openai_model,
messages=[
{
"role": "system",
"content": content,
},
{
"role": "user",
"content": self._llm_question,
},
],
)
except openai.error.InvalidRequestError as err:
logger.error(err)
return None

logger.info("Used OpenAI tokens: %s", response["usage"])
generated_text = response.choices[0].message["content"]
logger.info("Response from LLM: %s", generated_text)
try:
return json.loads(generated_text)
except ValueError as err:
logger.error(err)
return None

return None
18 changes: 9 additions & 9 deletions circuit_maintenance_parser/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
notification data may have in order to create the `Maintenance` object.

There are 2 hooks available, to be implemented by custom `Processors`:
process_hook: Method that recieves the parsed output and manipulates the extracted data. It could create
process_hook: Method that receives the parsed output and manipulates the extracted data. It could create
the final `Maintenances` or just accumulate them.
post_process_hook (optional): Used to be able to do a final action on the extracted data before returing
post_process_hook (optional): Used to be able to do a final action on the extracted data before returning
the final `Maintenances`.

Attributes:
Expand All @@ -54,13 +54,13 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
self.extended_data = extended_data
maintenances_data: List = []

# First, we generate a list of tuples with a `DataPart` and `Parser` if the data type from the first is
# supported by the second.
data_part_and_parser_combinations = [
(data_part, data_parser)
# First, we generate a set with the key `Parser` and `DataPart` if the data type from the first is
chadell marked this conversation as resolved.
Show resolved Hide resolved
# supported by the second. This avoids reusing the same Parser for different data types if supported.
data_part_and_parser_combinations = {
data_parser: data_part
for (data_part, data_parser) in itertools.product(data.data_parts, self.data_parsers)
if data_part.type in data_parser.get_data_types()
]
}

if not data_part_and_parser_combinations:
error_message = (
Expand All @@ -72,9 +72,9 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
logger.debug(error_message)
raise ProcessorError(error_message)

for data_part, data_parser in data_part_and_parser_combinations:
for data_parser, data_part in data_part_and_parser_combinations.items():
try:
self.process_hook(data_parser().parse(data_part.content), maintenances_data)
self.process_hook(data_parser().parse(data_part.content, data_part.type), maintenances_data)

except (ParserError, ValidationError) as exc:
error_message = "Parser class %s from %s was not successful.\n%s"
Expand Down
Loading
Loading