forked from openai/chatgpt-retrieval-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
file.py
117 lines (95 loc) · 3.59 KB
/
file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import csv
import mimetypes
import os
import tempfile
from io import BufferedReader
from typing import Optional

import docx2txt
import pptx
from fastapi import UploadFile
from loguru import logger
from PyPDF2 import PdfReader

from models.models import Document, DocumentMetadata
async def get_document_from_file(
    file: UploadFile, metadata: DocumentMetadata
) -> Document:
    """Build a ``Document`` from an uploaded file and the given metadata.

    The text is obtained by awaiting ``extract_text_from_form_file`` on the
    upload; ``metadata`` is attached to the resulting document unchanged.
    """
    text = await extract_text_from_form_file(file)
    return Document(text=text, metadata=metadata)
def extract_text_from_filepath(filepath: str, mimetype: Optional[str] = None) -> str:
    """Return the text content of the file located at ``filepath``.

    Args:
        filepath: Path of the file to read; it is opened in binary mode.
        mimetype: Mimetype to use for extraction. When ``None`` it is guessed
            from the file extension.

    Returns:
        The text produced by ``extract_text_from_file`` for this file.

    Raises:
        Exception: With the message "Unsupported file type" when no mimetype
            is available and the path does not end in ".md". Any error raised
            while opening or parsing the file is logged and re-raised.
    """
    resolved_type = mimetype
    if resolved_type is None:
        # Only consult the extension when the caller supplied nothing at all.
        resolved_type = mimetypes.guess_type(filepath)[0]

    if not resolved_type:
        # Last resort: treat ".md" paths as markdown, reject everything else.
        if not filepath.endswith(".md"):
            raise Exception("Unsupported file type")
        resolved_type = "text/markdown"

    try:
        with open(filepath, "rb") as handle:
            return extract_text_from_file(handle, resolved_type)
    except Exception as error:
        logger.error(error)
        raise error
def extract_text_from_file(file: BufferedReader, mimetype: str) -> str:
    """Return the text content of an open binary file, selected by ``mimetype``.

    Handles PDF, plain text, markdown, docx, csv and pptx input.

    Args:
        file: File object opened in binary mode, positioned at its start.
        mimetype: Mimetype that selects the extraction strategy.

    Returns:
        The extracted text. For csv, the cells of each row are joined with a
        space and every row ends with a newline. For pptx, every shape that
        has a text frame contributes its runs (each followed by a space) and
        then a newline.

    Raises:
        ValueError: If ``mimetype`` is not one of the supported types.
        UnicodeDecodeError: If text, markdown or csv content is not valid
            UTF-8.
    """
    if mimetype == "application/pdf":
        # Extract text from pdf using PyPDF2
        reader = PdfReader(file)
        extracted_text = " ".join(page.extract_text() for page in reader.pages)
    elif mimetype in ("text/plain", "text/markdown"):
        # "utf-8-sig" decodes ordinary UTF-8 unchanged but drops a leading
        # byte-order mark, so it does not leak into the text as "\ufeff".
        extracted_text = file.read().decode("utf-8-sig")
    elif (
        mimetype
        == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    ):
        # Extract text from docx using docx2txt
        extracted_text = docx2txt.process(file)
    elif mimetype == "text/csv":
        # Decode lazily, line by line; only the first line can carry a BOM.
        decoded_lines = (
            raw_line.decode("utf-8-sig" if index == 0 else "utf-8")
            for index, raw_line in enumerate(file)
        )
        extracted_text = "".join(
            " ".join(row) + "\n" for row in csv.reader(decoded_lines)
        )
    elif (
        mimetype
        == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
    ):
        # Extract text from pptx using python-pptx
        presentation = pptx.Presentation(file)
        shape_texts = []
        for slide in presentation.slides:
            for shape in slide.shapes:
                if not shape.has_text_frame:
                    continue
                runs_text = "".join(
                    run.text + " "
                    for paragraph in shape.text_frame.paragraphs
                    for run in paragraph.runs
                )
                shape_texts.append(runs_text + "\n")
        extracted_text = "".join(shape_texts)
    else:
        # Unsupported file type
        raise ValueError(f"Unsupported file type: {mimetype}")
    return extracted_text
# Extract text from an uploaded form file based on its reported mimetype
async def extract_text_from_form_file(file: UploadFile):
    """Return the text content of an uploaded file.

    The upload is read fully into memory, written to a temporary file, and
    passed to ``extract_text_from_filepath`` together with the content type
    reported on the upload.

    Args:
        file: The uploaded file to extract text from.

    Returns:
        The extracted text.

    Raises:
        Exception: Any error raised while writing the temporary file or
            extracting its text is logged and re-raised.
    """
    # get the file body from the upload file object
    mimetype = file.content_type
    logger.info(f"mimetype: {mimetype}")
    logger.info(f"file.file: {file.file}")
    # loguru formats the message with str.format, so the object has to be
    # interpolated into the message; a bare extra argument is dropped.
    logger.info(f"file: {file}")

    file_stream = await file.read()

    # A unique temporary file per call: a fixed shared path would let
    # concurrent uploads overwrite or delete each other's data.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file_path = temp_file.name
    try:
        # Close the handle before the path is reopened for reading.
        with temp_file:
            temp_file.write(file_stream)
        extracted_text = extract_text_from_filepath(temp_file_path, mimetype)
    except Exception as e:
        logger.error(e)
        raise
    finally:
        # remove file from temp location on success and on failure
        os.remove(temp_file_path)

    return extracted_text