Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ✨ OpenAI parser #245

Merged
merged 11 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ By default, there is a `GenericProvider` that support a `SimpleProcessor` using

> Note: Because these providers do not support the BCOP standard natively, maybe there are some gaps on the implemented parser that will be refined with new test cases. We encourage you to report related **issues**!

#### LLM-powered Parsers

The library supports an optional parser option leveraging Large Language Model (LLM) to provide a best-effort parsing when the specific parsers have not been successful.

When the appropriate environment variable(s) are set (see below), these LLM parsers are automatically appended after all existing processors for each defined Provider.

> These integrations may involve some costs for API usage. Use it carefully! As an order of magnitude, a parsing of an email with OpenAI GPT gpt-3.5-turbo model costs $0.004.

These are the current supported LLM integrations:

- [OpenAI](https://openai.com/product), these are the supported ENVs:
- `OPENAI_TOKEN` (Required): OpenAI token.
- `OPENAI_MODEL` (Optional): Model to use, it defaults to "gpt-3.5-turbo".

## Installation

The library is available as a Python package in pypi and can be installed with pip:
Expand Down Expand Up @@ -319,6 +333,7 @@ The project is following Network to Code software development guidelines and is
...omitted debug logs...
====================================================== 99 passed, 174 deselected, 17 warnings in 10.35s ======================================================
```

7. Run some final CI tests locally to ensure that there is no linting/formatting issues with your changes. You should look to get a code score of 10/10. See the example below: `invoke tests --local`

```
Expand Down
199 changes: 188 additions & 11 deletions circuit_maintenance_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import quopri
from typing import Dict, List
from email.utils import parsedate_tz, mktime_tz
import hashlib

import bs4 # type: ignore
from bs4.element import ResultSet # type: ignore

from pydantic import BaseModel, Extra
from pydantic import BaseModel
from icalendar import Calendar # type: ignore

from circuit_maintenance_parser.errors import ParserError
Expand All @@ -23,7 +24,7 @@
logger = logging.getLogger(__name__)


class Parser(BaseModel, extra=Extra.forbid):
class Parser(BaseModel):
"""Parser class.

A Parser handles one or more specific data type(s) (specified in `data_types`).
Expand All @@ -34,14 +35,15 @@ class Parser(BaseModel, extra=Extra.forbid):
# _data_types are used to match the Parser to to each type of DataPart
_data_types = ["text/plain", "plain"]

# TODO: move it to where it is used, Cogent parser
_geolocator = Geolocator()

@classmethod
def get_data_types(cls) -> List[str]:
"""Return the expected data type."""
return cls._data_types

def parser_hook(self, raw: bytes) -> List[Dict]:
def parser_hook(self, raw: bytes, content_type: str) -> List[Dict]:
"""Custom parser logic.

This method is used by the main `Parser` classes (such as `ICal` or `Html` parser) to define a shared
Expand All @@ -53,14 +55,14 @@ def parser_hook(self, raw: bytes) -> List[Dict]:
"""
raise NotImplementedError

def parse(self, raw: bytes) -> List[Dict]:
def parse(self, raw: bytes, content_type: str) -> List[Dict]:
"""Execute parsing.

Do not override this method!
Instead, each main `Parser` class should implement its own custom logic within the `parser_hook` method.
"""
try:
result = self.parser_hook(raw)
result = self.parser_hook(raw, content_type)
except Exception as exc:
raise ParserError from exc
if any(not partial_result for partial_result in result):
Expand All @@ -86,7 +88,7 @@ class ICal(Parser):

_data_types = ["text/calendar", "ical", "icalendar"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
# iCalendar data sometimes comes encoded with base64
# TODO: add a test case
Expand Down Expand Up @@ -159,7 +161,7 @@ def remove_hex_characters(string):
"""Convert any hex characters to standard ascii."""
return string.encode("ascii", errors="ignore").decode("utf-8")

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
Expand Down Expand Up @@ -191,7 +193,7 @@ class EmailDateParser(Parser):

_data_types = [EMAIL_HEADER_DATE]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
parsed_date = parsedate_tz(raw.decode())
if parsed_date:
Expand All @@ -204,7 +206,7 @@ class EmailSubjectParser(Parser):

_data_types = [EMAIL_HEADER_SUBJECT]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_subject(self.bytes_to_string(raw).replace("\r", "").replace("\n", "")):
Expand All @@ -226,7 +228,7 @@ class Csv(Parser):

_data_types = ["application/csv", "text/csv", "application/octet-stream"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_csv(raw):
Expand All @@ -245,7 +247,7 @@ class Text(Parser):

_data_types = ["text/plain"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
text = self.get_text_hook(raw)
Expand All @@ -261,3 +263,178 @@ def get_text_hook(raw: bytes) -> str:
def parse_text(self, text) -> List[Dict]:
"""Custom text parsing."""
raise NotImplementedError


class LLM(Parser):
"""LLM parser."""

_data_types = ["text/html", "html", "text/plain"]

_llm_question = """Please, could you extract a JSON form without any other comment,
with the following JSON schema (timestamps in EPOCH):
{
"type": "object",
"properties": {
"start": {
"type": "int",
},
"end": {
"type": "int",
},
"account": {
"type": "string",
},
"summary": {
"type": "string",
},
"maintenance_id": {
"type": "string",
},
"account": {
"type": "string",
},
"status": {
"type": "string",
},
"impact": {
"type": "string",
},
"circuit_ids": {
"type": "array",
"items": {
"type": "string",
}
}
}
More context:
* Circuit IDs are also known as service or order
* Status could be confirmed, ongoing, cancelled, completed or rescheduled
"""

def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
if content_type in ["html", "text/html"]:
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
content = soup.text
elif content_type in ["text/plain"]:
content = self.get_text_hook(raw)

for data in self.parse_content(content):
result.append(data)
return result

@staticmethod
def get_text_hook(raw: bytes) -> str:
"""Can be overwritten by subclasses."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What purpose does this method serve? When would subclasses need/want to override it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it more explicit why it is used. the final parsers could have a different way to decode bytes to string

return raw.decode()

@staticmethod
def get_key_with_string(dictionary: dict, string: str):
"""Returns the key in the dictionary that contains the given string."""
for key in dictionary.keys():
if string in key:
return key
return None

def get_llm_response(self, content):
"""Method to retrieve the response from the LLM for some content."""
raise NotImplementedError

def _get_impact(self, generated_json: dict):
"""Method to get a general Impact for all Circuits."""
impact_key = self.get_key_with_string(generated_json, "impact")
if impact_key:
if "no impact" in generated_json[impact_key].lower():
return Impact.NO_IMPACT
if "partial" in generated_json[impact_key].lower():
return Impact.DEGRADED

return Impact.OUTAGE

def _get_circuit_ids(self, generated_json: dict, impact: Impact):
"""Method to get the Circuit IDs and use a general Impact."""
circuits = []
circuits_ids_key = self.get_key_with_string(generated_json, "circuit")
for circuit in generated_json[circuits_ids_key]:
if isinstance(circuit, str):
circuits.append(CircuitImpact(circuit_id=circuit, impact=impact))
elif isinstance(circuit, dict):
circuit_key = self.get_key_with_string(circuit, "circuit")
circuits.append(CircuitImpact(circuit_id=circuit[circuit_key], impact=impact))

return circuits

def _get_start(self, generated_json: dict):
"""Method to get the Start Time."""
return generated_json[self.get_key_with_string(generated_json, "start")]

def _get_end(self, generated_json: dict):
"""Method to get the End Time."""
return generated_json[self.get_key_with_string(generated_json, "end")]

def _get_summary(self, generated_json: dict):
"""Method to get the Summary."""
return generated_json[self.get_key_with_string(generated_json, "summary")]

def _get_status(self, generated_json: dict):
"""Method to get the Status."""
status_key = self.get_key_with_string(generated_json, "status")

if "confirmed" in generated_json[status_key].lower():
return Status.CONFIRMED
if "rescheduled" in generated_json[status_key].lower():
return Status.RE_SCHEDULED
if "cancelled" in generated_json[status_key].lower():
return Status.CANCELLED
if "ongoing" in generated_json[status_key].lower():
return Status.IN_PROCESS
if "completed" in generated_json[status_key].lower():
return Status.COMPLETED

return Status.CONFIRMED

def _get_account(self, generated_json: dict):
"""Method to get the Account."""
account = generated_json[self.get_key_with_string(generated_json, "account")]
if not account:
return "Not found"

return account

def _get_maintenance_id(self, generated_json: dict, start, end, circuits):
"""Method to get the Maintenance ID."""
maintenance_key = self.get_key_with_string(generated_json, "maintenance")
if maintenance_key and generated_json["maintenance_id"] != "N/A":
return generated_json["maintenance_id"]

maintenance_id = str(start) + str(end) + "".join(list(circuits))
return hashlib.md5(maintenance_id.encode("utf-8")).hexdigest() # nosec

def parse_content(self, content):
"""Parse content via LLM."""
generated_json = self.get_llm_response(content)
if not generated_json:
return []

impact = self._get_impact(generated_json)

data = {
"circuits": self._get_circuit_ids(generated_json, impact),
"start": int(self._get_start(generated_json)),
"end": int(self._get_end(generated_json)),
"summary": str(self._get_summary(generated_json)),
"status": self._get_status(generated_json),
"account": str(self._get_account(generated_json)),
}

data["maintenance_id"] = str(
self._get_maintenance_id(
generated_json,
data["start"],
data["end"],
data["circuits"],
)
)

return [data]
51 changes: 51 additions & 0 deletions circuit_maintenance_parser/parsers/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""OpenAI Parser."""
import os
import logging
import json
from typing import List, Optional

import openai

from circuit_maintenance_parser.parser import LLM

logger = logging.getLogger(__name__)


class OpenAIParser(LLM):
"""Notifications Parser powered by OpenAI ChatGPT."""

def get_llm_response(self, content) -> Optional[List]:
"""Get LLM processing from OpenAI."""
openai.api_key = os.getenv("OPENAI_TOKEN")
openai_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
try:
response = openai.ChatCompletion.create(
model=openai_model,
messages=[
{
"role": "system",
"content": self._llm_question,
},
{
"role": "user",
"content": content,
},
],
)

# TODO: Maybe asking again about the generated response could refine it

except openai.error.InvalidRequestError as err:
logger.error(err)
return None

logger.info("Used OpenAI tokens: %s", response["usage"])
generated_text = response.choices[0].message["content"]
logger.info("Response from LLM: %s", generated_text)
try:
return json.loads(generated_text)
except ValueError as err:
logger.error(err)
return None

return None
Loading